diff options
Diffstat (limited to 'synapse/storage')
7 files changed, 26 insertions, 100 deletions
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index c73d54fb67..0ac854aee2 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -68,7 +68,7 @@ class Databases(object): # If we're on a process that can persist events also # instantiate a `PersistEventsStore` - if hs.get_instance_name() in hs.config.worker.writers.events: + if hs.config.worker.writers.events == hs.get_instance_name(): persist_events = PersistEventsStore(hs, database, main) if "state" in database_config.databases: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 4c3c162acf..0b69aa6a94 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -438,7 +438,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas """ if stream_ordering <= self.stream_ordering_month_ago: - raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,)) + raise StoreError(400, "stream_ordering too old") sql = """ SELECT event_id FROM stream_ordering_to_exterm diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index b94fe7ac17..b3d27a2ee7 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -97,7 +97,6 @@ class PersistEventsStore: self.store = main_data_store self.database_engine = db.engine self._clock = hs.get_clock() - self._instance_name = hs.get_instance_name() self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages self.is_mine_id = hs.is_mine_id @@ -109,7 +108,7 @@ class PersistEventsStore: # This should only exist on instances that are configured to write assert ( - hs.get_instance_name() in hs.config.worker.writers.events + hs.config.worker.writers.events == hs.get_instance_name() ), "Can only instantiate EventsStore on master" async def _persist_events_and_state_updates( @@ -801,7 +800,6 @@ class PersistEventsStore: table="events", values=[ { - "instance_name": self._instance_name, "stream_ordering": event.internal_metadata.stream_ordering, "topological_ordering": event.depth, "depth": event.depth, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 17f5997b89..a7a73cc3d8 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -42,8 +42,7 @@ from synapse.replication.tcp.streams import BackfillStream from synapse.replication.tcp.streams.events import EventsStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool -from synapse.storage.engines import PostgresEngine -from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator +from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import Collection, get_domain_from_id from synapse.util.caches.descriptors import Cache, cached from synapse.util.iterutils import batch_iter @@ -79,54 +78,27 @@ class EventsWorkerStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super(EventsWorkerStore, self).__init__(database, db_conn, hs) - if isinstance(database.engine, PostgresEngine): - # If we're using Postgres than we can use `MultiWriterIdGenerator` - # regardless of whether this process writes to the streams or not. - self._stream_id_gen = MultiWriterIdGenerator( - db_conn=db_conn, - db=database, - instance_name=hs.get_instance_name(), - table="events", - instance_column="instance_name", - id_column="stream_ordering", - sequence_name="events_stream_seq", + if hs.config.worker.writers.events == hs.get_instance_name(): + # We are the process in charge of generating stream ids for events, + # so instantiate ID generators based on the database + self._stream_id_gen = StreamIdGenerator( + db_conn, "events", "stream_ordering", ) - self._backfill_id_gen = MultiWriterIdGenerator( - db_conn=db_conn, - db=database, - instance_name=hs.get_instance_name(), - table="events", - instance_column="instance_name", - id_column="stream_ordering", - sequence_name="events_backfill_stream_seq", - positive=False, + self._backfill_id_gen = StreamIdGenerator( + db_conn, + "events", + "stream_ordering", + step=-1, + extra_tables=[("ex_outlier_stream", "event_stream_ordering")], ) else: - # We shouldn't be running in worker mode with SQLite, but its useful - # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). - if hs.get_instance_name() in hs.config.worker.writers.events: - self._stream_id_gen = StreamIdGenerator( - db_conn, "events", "stream_ordering", - ) - self._backfill_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - step=-1, - extra_tables=[("ex_outlier_stream", "event_stream_ordering")], - ) - else: - self._stream_id_gen = SlavedIdTracker( - db_conn, "events", "stream_ordering" - ) - self._backfill_id_gen = SlavedIdTracker( - db_conn, "events", "stream_ordering", step=-1 - ) + # Another process is in charge of persisting events and generating + # stream IDs: rely on the replication streams to let us know which + # IDs we can process. + self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering") + self._backfill_id_gen = SlavedIdTracker( + db_conn, "events", "stream_ordering", step=-1 + ) self._get_event_cache = Cache( "*getEvent*", diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql deleted file mode 100644 index 98ff76d709..0000000000 --- a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql +++ /dev/null @@ -1,16 +0,0 @@ -/* Copyright 2020 The Matrix.org Foundation C.I.C. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -ALTER TABLE events ADD COLUMN instance_name TEXT; diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres deleted file mode 100644 index 97c1e6a0c5..0000000000 --- a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres +++ /dev/null @@ -1,26 +0,0 @@ -/* Copyright 2020 The Matrix.org Foundation C.I.C. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE SEQUENCE IF NOT EXISTS events_stream_seq; - -SELECT setval('events_stream_seq', ( - SELECT COALESCE(MAX(stream_ordering), 1) FROM events -)); - -CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq; - -SELECT setval('events_backfill_stream_seq', ( - SELECT COALESCE(-MIN(stream_ordering), 1) FROM events -)); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 8fd21c2bf8..9f3d23f0a5 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -231,12 +231,8 @@ class MultiWriterIdGenerator: # gaps should be relatively rare it's still worth doing the book keeping # that allows us to skip forwards when there are gapless runs of # positions. - # - # We start at 1 here as a) the first generated stream ID will be 2, and - # b) other parts of the code assume that stream IDs are strictly greater - # than 0. self._persisted_upto_position = ( - min(self._current_positions.values()) if self._current_positions else 1 + min(self._current_positions.values()) if self._current_positions else 0 ) self._known_persisted_positions = [] # type: List[int] @@ -366,7 +362,9 @@ class MultiWriterIdGenerator: equal to it have been successfully persisted. """ - return self.get_persisted_upto_position() + # Currently we don't support this operation, as it's not obvious how to + # condense the stream positions of multiple writers into a single int. + raise NotImplementedError() def get_current_token_for_writer(self, instance_name: str) -> int: """Returns the position of the given writer. |