diff options
5 files changed, 41 insertions, 17 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index da1cc836cf..fff6b0511c 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -80,4 +80,4 @@ class SlavedEventStore( return self._stream_id_gen.get_current_token() def get_room_min_stream_ordering(self): - return self._backfill_id_gen.get_current_token() + return -self._backfill_id_gen.get_current_token() diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index bf2afc3dd7..af794018ad 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -155,8 +155,8 @@ class PersistEventsStore: # Note: Multiple instances of this function cannot be in flight at # the same time for the same room. if backfilled: - stream_ordering_manager = self._backfill_id_gen.get_next_mult( - len(events_and_contexts) + stream_ordering_manager = await maybe_awaitable( + self._backfill_id_gen.get_next_mult(len(events_and_contexts)) ) else: stream_ordering_manager = await maybe_awaitable( @@ -164,6 +164,9 @@ class PersistEventsStore: ) with stream_ordering_manager as stream_orderings: + if backfilled: + stream_orderings = [-s for s in stream_orderings] + for (event, context), stream in zip(events_and_contexts, stream_orderings): event.internal_metadata.stream_ordering = stream diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index bda189b923..fdbe256471 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -37,7 +37,6 @@ from synapse.events import EventBase, make_event_from_dict from synapse.events.utils import prune_event from synapse.logging.context import PreserveLoggingContext, current_context from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.replication.tcp.streams import BackfillStream from synapse.replication.tcp.streams.events import EventsStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -92,18 +91,26 @@ class EventsWorkerStore(SQLBaseStore): id_column="stream_ordering", sequence_name="events_stream_seq", ) + self._backfill_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + instance_name=hs.get_instance_name(), + table="events", + instance_column="instance_name", + id_column="-stream_ordering", + sequence_name="events_backfill_stream_seq", + ) else: self._stream_id_gen = StreamIdGenerator( db_conn, "events", "stream_ordering", ) - self._backfill_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - step=-1, - extra_tables=[("ex_outlier_stream", "event_stream_ordering")], - ) + self._backfill_id_gen = StreamIdGenerator( + db_conn, + "events", + "-stream_ordering", + # extra_tables=[("ex_outlier_stream", "event_stream_ordering")], + ) else: # Another process is in charge of persisting events and generating # stream IDs: rely on the replication streams to let us know which @@ -117,8 +124,14 @@ class EventsWorkerStore(SQLBaseStore): id_column="stream_ordering", sequence_name="events_stream_seq", ) - self._backfill_id_gen = SlavedIdTracker( - db_conn, "events", "stream_ordering", step=-1 + self._backfill_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + instance_name=hs.get_instance_name(), + table="events", + instance_column="instance_name", + id_column="-stream_ordering", + sequence_name="events_backfill_stream_seq", ) self._get_event_cache = Cache( @@ -136,7 +149,7 @@ class EventsWorkerStore(SQLBaseStore): if stream_name == EventsStream.NAME: self._stream_id_gen.advance(instance_name, token) elif stream_name == BackfillStream.NAME: - self._backfill_id_gen.advance(-token) + self._backfill_id_gen.advance(instance_name, token) super().process_replication_rows(stream_name, instance_name, token, rows) @@ -987,7 +1000,7 @@ class EventsWorkerStore(SQLBaseStore): def get_current_backfill_token(self): """The current minimum token that backfilled events have reached""" - return -self._backfill_id_gen.get_current_token() + return self._backfill_id_gen.get_current_token() def get_current_events_token(self): """The current maximum token that events have reached""" diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres index 7901f89941..97c1e6a0c5 100644 --- a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres +++ b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres @@ -18,3 +18,9 @@ CREATE SEQUENCE IF NOT EXISTS events_stream_seq; SELECT setval('events_stream_seq', ( SELECT COALESCE(MAX(stream_ordering), 1) FROM events )); + +CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq; + +SELECT setval('events_backfill_stream_seq', ( + SELECT COALESCE(-MIN(stream_ordering), 1) FROM events +)); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 90663f3443..1557decff8 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -202,6 +202,7 @@ class MultiWriterIdGenerator: ): self._db = db self._instance_name = instance_name + self._sequence_name = sequence_name # We lock as some functions may be called from DB threads. self._lock = threading.Lock() @@ -219,7 +220,7 @@ class MultiWriterIdGenerator: # and b) noting that if we have seen a run of persisted positions # without gaps (e.g. 5, 6, 7) then we can skip forward (e.g. to 7). self._persisted_upto_position = ( - min(self._current_positions.values()) if self._current_positions else 0 + min(self._current_positions.values()) if self._current_positions else 1 ) self._known_persisted_positions = [] # type: List[int] @@ -415,7 +416,8 @@ class MultiWriterIdGenerator: break logger.info( - "Got new_id: %s, setting persited pos to %s", + "Got new_id %s: %s, setting persited pos to %s", + self._sequence_name, new_id, self._persisted_upto_position, ) |