diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 4c3c162acf..0b69aa6a94 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -438,7 +438,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
"""
if stream_ordering <= self.stream_ordering_month_ago:
- raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))
+ raise StoreError(400, "stream_ordering too old")
sql = """
SELECT event_id FROM stream_ordering_to_exterm
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b94fe7ac17..b3d27a2ee7 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -97,7 +97,6 @@ class PersistEventsStore:
self.store = main_data_store
self.database_engine = db.engine
self._clock = hs.get_clock()
- self._instance_name = hs.get_instance_name()
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
self.is_mine_id = hs.is_mine_id
@@ -109,7 +108,7 @@ class PersistEventsStore:
# This should only exist on instances that are configured to write
assert (
- hs.get_instance_name() in hs.config.worker.writers.events
+ hs.config.worker.writers.events == hs.get_instance_name()
), "Can only instantiate EventsStore on master"
async def _persist_events_and_state_updates(
@@ -801,7 +800,6 @@ class PersistEventsStore:
table="events",
values=[
{
- "instance_name": self._instance_name,
"stream_ordering": event.internal_metadata.stream_ordering,
"topological_ordering": event.depth,
"depth": event.depth,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 17f5997b89..a7a73cc3d8 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -42,8 +42,7 @@ from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams.events import EventsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
-from synapse.storage.engines import PostgresEngine
-from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
+from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import Collection, get_domain_from_id
from synapse.util.caches.descriptors import Cache, cached
from synapse.util.iterutils import batch_iter
@@ -79,54 +78,27 @@ class EventsWorkerStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super(EventsWorkerStore, self).__init__(database, db_conn, hs)
- if isinstance(database.engine, PostgresEngine):
- # If we're using Postgres than we can use `MultiWriterIdGenerator`
- # regardless of whether this process writes to the streams or not.
- self._stream_id_gen = MultiWriterIdGenerator(
- db_conn=db_conn,
- db=database,
- instance_name=hs.get_instance_name(),
- table="events",
- instance_column="instance_name",
- id_column="stream_ordering",
- sequence_name="events_stream_seq",
+ if hs.config.worker.writers.events == hs.get_instance_name():
+ # We are the process in charge of generating stream ids for events,
+ # so instantiate ID generators based on the database
+ self._stream_id_gen = StreamIdGenerator(
+ db_conn, "events", "stream_ordering",
)
- self._backfill_id_gen = MultiWriterIdGenerator(
- db_conn=db_conn,
- db=database,
- instance_name=hs.get_instance_name(),
- table="events",
- instance_column="instance_name",
- id_column="stream_ordering",
- sequence_name="events_backfill_stream_seq",
- positive=False,
+ self._backfill_id_gen = StreamIdGenerator(
+ db_conn,
+ "events",
+ "stream_ordering",
+ step=-1,
+ extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
)
else:
- # We shouldn't be running in worker mode with SQLite, but its useful
- # to support it for unit tests.
- #
- # If this process is the writer than we need to use
- # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
- # updated over replication. (Multiple writers are not supported for
- # SQLite).
- if hs.get_instance_name() in hs.config.worker.writers.events:
- self._stream_id_gen = StreamIdGenerator(
- db_conn, "events", "stream_ordering",
- )
- self._backfill_id_gen = StreamIdGenerator(
- db_conn,
- "events",
- "stream_ordering",
- step=-1,
- extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
- )
- else:
- self._stream_id_gen = SlavedIdTracker(
- db_conn, "events", "stream_ordering"
- )
- self._backfill_id_gen = SlavedIdTracker(
- db_conn, "events", "stream_ordering", step=-1
- )
+ # Another process is in charge of persisting events and generating
+ # stream IDs: rely on the replication streams to let us know which
+ # IDs we can process.
+ self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
+ self._backfill_id_gen = SlavedIdTracker(
+ db_conn, "events", "stream_ordering", step=-1
+ )
self._get_event_cache = Cache(
"*getEvent*",
diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql
deleted file mode 100644
index 98ff76d709..0000000000
--- a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql
+++ /dev/null
@@ -1,16 +0,0 @@
-/* Copyright 2020 The Matrix.org Foundation C.I.C.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-ALTER TABLE events ADD COLUMN instance_name TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
deleted file mode 100644
index 97c1e6a0c5..0000000000
--- a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
+++ /dev/null
@@ -1,26 +0,0 @@
-/* Copyright 2020 The Matrix.org Foundation C.I.C.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-CREATE SEQUENCE IF NOT EXISTS events_stream_seq;
-
-SELECT setval('events_stream_seq', (
- SELECT COALESCE(MAX(stream_ordering), 1) FROM events
-));
-
-CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq;
-
-SELECT setval('events_backfill_stream_seq', (
- SELECT COALESCE(-MIN(stream_ordering), 1) FROM events
-));
|