summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/replication/slave/storage/events.py2
-rw-r--r--synapse/storage/databases/main/events.py7
-rw-r--r--synapse/storage/databases/main/events_worker.py37
-rw-r--r--synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres6
-rw-r--r--synapse/storage/util/id_generators.py6
5 files changed, 41 insertions, 17 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index da1cc836cf..fff6b0511c 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -80,4 +80,4 @@ class SlavedEventStore(
         return self._stream_id_gen.get_current_token()
 
     def get_room_min_stream_ordering(self):
-        return self._backfill_id_gen.get_current_token()
+        return -self._backfill_id_gen.get_current_token()
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index bf2afc3dd7..af794018ad 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -155,8 +155,8 @@ class PersistEventsStore:
         # Note: Multiple instances of this function cannot be in flight at
         # the same time for the same room.
         if backfilled:
-            stream_ordering_manager = self._backfill_id_gen.get_next_mult(
-                len(events_and_contexts)
+            stream_ordering_manager = await maybe_awaitable(
+                self._backfill_id_gen.get_next_mult(len(events_and_contexts))
             )
         else:
             stream_ordering_manager = await maybe_awaitable(
@@ -164,6 +164,9 @@ class PersistEventsStore:
             )
 
         with stream_ordering_manager as stream_orderings:
+            if backfilled:
+                stream_orderings = [-s for s in stream_orderings]
+
             for (event, context), stream in zip(events_and_contexts, stream_orderings):
                 event.internal_metadata.stream_ordering = stream
 
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index bda189b923..fdbe256471 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -37,7 +37,6 @@ from synapse.events import EventBase, make_event_from_dict
 from synapse.events.utils import prune_event
 from synapse.logging.context import PreserveLoggingContext, current_context
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -92,18 +91,26 @@ class EventsWorkerStore(SQLBaseStore):
                     id_column="stream_ordering",
                     sequence_name="events_stream_seq",
                 )
+                self._backfill_id_gen = MultiWriterIdGenerator(
+                    db_conn=db_conn,
+                    db=database,
+                    instance_name=hs.get_instance_name(),
+                    table="events",
+                    instance_column="instance_name",
+                    id_column="-stream_ordering",
+                    sequence_name="events_backfill_stream_seq",
+                )
             else:
                 self._stream_id_gen = StreamIdGenerator(
                     db_conn, "events", "stream_ordering",
                 )
 
-            self._backfill_id_gen = StreamIdGenerator(
-                db_conn,
-                "events",
-                "stream_ordering",
-                step=-1,
-                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-            )
+                self._backfill_id_gen = StreamIdGenerator(
+                    db_conn,
+                    "events",
+                    "-stream_ordering",
+                    # extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+                )
         else:
             # Another process is in charge of persisting events and generating
             # stream IDs: rely on the replication streams to let us know which
@@ -117,8 +124,14 @@ class EventsWorkerStore(SQLBaseStore):
                 id_column="stream_ordering",
                 sequence_name="events_stream_seq",
             )
-            self._backfill_id_gen = SlavedIdTracker(
-                db_conn, "events", "stream_ordering", step=-1
+            self._backfill_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                instance_name=hs.get_instance_name(),
+                table="events",
+                instance_column="instance_name",
+                id_column="-stream_ordering",
+                sequence_name="events_backfill_stream_seq",
             )
 
         self._get_event_cache = Cache(
@@ -136,7 +149,7 @@ class EventsWorkerStore(SQLBaseStore):
         if stream_name == EventsStream.NAME:
             self._stream_id_gen.advance(instance_name, token)
         elif stream_name == BackfillStream.NAME:
-            self._backfill_id_gen.advance(-token)
+            self._backfill_id_gen.advance(instance_name, token)
 
         super().process_replication_rows(stream_name, instance_name, token, rows)
 
@@ -987,7 +1000,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
-        return -self._backfill_id_gen.get_current_token()
+        return self._backfill_id_gen.get_current_token()
 
     def get_current_events_token(self):
         """The current maximum token that events have reached"""
diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
index 7901f89941..97c1e6a0c5 100644
--- a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
+++ b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
@@ -18,3 +18,9 @@ CREATE SEQUENCE IF NOT EXISTS events_stream_seq;
 SELECT setval('events_stream_seq', (
     SELECT COALESCE(MAX(stream_ordering), 1) FROM events
 ));
+
+CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq;
+
+SELECT setval('events_backfill_stream_seq', (
+    SELECT COALESCE(-MIN(stream_ordering), 1) FROM events
+));
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 90663f3443..1557decff8 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -202,6 +202,7 @@ class MultiWriterIdGenerator:
     ):
         self._db = db
         self._instance_name = instance_name
+        self._sequence_name = sequence_name
 
         # We lock as some functions may be called from DB threads.
         self._lock = threading.Lock()
@@ -219,7 +220,7 @@ class MultiWriterIdGenerator:
         # and b) noting that if we have seen a run of persisted positions
         # without gaps (e.g. 5, 6, 7) then we can skip forward (e.g. to 7).
         self._persisted_upto_position = (
-            min(self._current_positions.values()) if self._current_positions else 0
+            min(self._current_positions.values()) if self._current_positions else 1
         )
         self._known_persisted_positions = []  # type: List[int]
 
@@ -415,7 +416,8 @@ class MultiWriterIdGenerator:
                 break
 
         logger.info(
-            "Got new_id: %s, setting persited pos to %s",
+            "Got new_id %s: %s, setting persited pos to %s",
+            self._sequence_name,
             new_id,
             self._persisted_upto_position,
         )