summary refs log tree commit diff
path: root/synapse/storage/databases/main/events_worker.py
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2022-11-16 22:16:46 +0000
committerGitHub <noreply@github.com>2022-11-16 22:16:46 +0000
commit115f0eb2334b13665e5c112bd87f95ea393c9047 (patch)
treeba2c16605e2ed5d9fe0849b04da49ff63b106bdf /synapse/storage/databases/main/events_worker.py
parentRemove need for `worker_main_http_uri` setting to use /keys/upload. (#14400) (diff)
downloadsynapse-115f0eb2334b13665e5c112bd87f95ea393c9047.tar.xz
Reintroduce #14376, with bugfix for monoliths (#14468)
* Add tests for StreamIdGenerator

* Drive-by: annotate all defs

* Revert "Revert "Remove slaved id tracker (#14376)" (#14463)"

This reverts commit d63814fd736fed5d3d45ff3af5e6d3bfae50c439, which in
turn reverted 36097e88c4da51fce6556a58c49bd675f4cf20ab. This restores
the latter.

* Fix StreamIdGenerator not handling unpersisted IDs

Spotted by @erikjohnston.

Closes #14456.

* Changelog

Co-authored-by: Nick Mills-Barrett <nick@fizzadar.com>
Co-authored-by: Erik Johnston <erik@matrix.org>
Diffstat (limited to 'synapse/storage/databases/main/events_worker.py')
-rw-r--r--synapse/storage/databases/main/events_worker.py35
1 files changed, 14 insertions, 21 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 8a104f7e93..01e935edef 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -59,7 +59,6 @@ from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
 )
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -213,26 +212,20 @@ class EventsWorkerStore(SQLBaseStore):
             # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
             # updated over replication. (Multiple writers are not supported for
             # SQLite).
-            if hs.get_instance_name() in hs.config.worker.writers.events:
-                self._stream_id_gen = StreamIdGenerator(
-                    db_conn,
-                    "events",
-                    "stream_ordering",
-                )
-                self._backfill_id_gen = StreamIdGenerator(
-                    db_conn,
-                    "events",
-                    "stream_ordering",
-                    step=-1,
-                    extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-                )
-            else:
-                self._stream_id_gen = SlavedIdTracker(
-                    db_conn, "events", "stream_ordering"
-                )
-                self._backfill_id_gen = SlavedIdTracker(
-                    db_conn, "events", "stream_ordering", step=-1
-                )
+            self._stream_id_gen = StreamIdGenerator(
+                db_conn,
+                "events",
+                "stream_ordering",
+                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
+            )
+            self._backfill_id_gen = StreamIdGenerator(
+                db_conn,
+                "events",
+                "stream_ordering",
+                step=-1,
+                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
+            )
 
         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(