summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-01-20 18:02:18 +0000
committerGitHub <noreply@github.com>2023-01-20 18:02:18 +0000
commit65d03866936adb144631d263a8539a2cb060fd43 (patch)
tree894f71640642a5bf444d475bbb7831cc512d9b13 /synapse/storage/databases/main
parentDockerfile: Bump Python version from 3.9 to 3.11 (#14875) (diff)
downloadsynapse-65d03866936adb144631d263a8539a2cb060fd43.tar.xz
Always notify replication when a stream advances (#14877)
This ensures that all other workers are told about stream updates in a timely manner, without having to remember to manually poke replication.
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/account_data.py2
-rw-r--r--synapse/storage/databases/main/cache.py1
-rw-r--r--synapse/storage/databases/main/deviceinbox.py3
-rw-r--r--synapse/storage/databases/main/devices.py1
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py5
-rw-r--r--synapse/storage/databases/main/events_worker.py10
-rw-r--r--synapse/storage/databases/main/presence.py3
-rw-r--r--synapse/storage/databases/main/push_rule.py1
-rw-r--r--synapse/storage/databases/main/pusher.py1
-rw-r--r--synapse/storage/databases/main/receipts.py2
-rw-r--r--synapse/storage/databases/main/room.py6
11 files changed, 30 insertions, 5 deletions
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 881d7089db..8a359d7eb8 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -75,6 +75,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
             self._account_data_id_gen = MultiWriterIdGenerator(
                 db_conn=db_conn,
                 db=database,
+                notifier=hs.get_replication_notifier(),
                 stream_name="account_data",
                 instance_name=self._instance_name,
                 tables=[
@@ -95,6 +96,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
             # SQLite).
             self._account_data_id_gen = StreamIdGenerator(
                 db_conn,
+                hs.get_replication_notifier(),
                 "room_account_data",
                 "stream_id",
                 extra_tables=[("room_tags_revisions", "stream_id")],
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2179a8bf59..5b66431691 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -75,6 +75,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             self._cache_id_gen = MultiWriterIdGenerator(
                 db_conn,
                 database,
+                notifier=hs.get_replication_notifier(),
                 stream_name="caches",
                 instance_name=hs.get_instance_name(),
                 tables=[
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 713be91c5d..8e61aba454 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -91,6 +91,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 MultiWriterIdGenerator(
                     db_conn=db_conn,
                     db=database,
+                    notifier=hs.get_replication_notifier(),
                     stream_name="to_device",
                     instance_name=self._instance_name,
                     tables=[("device_inbox", "instance_name", "stream_id")],
@@ -101,7 +102,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         else:
             self._can_write_to_device = True
             self._device_inbox_id_gen = StreamIdGenerator(
-                db_conn, "device_inbox", "stream_id"
+                db_conn, hs.get_replication_notifier(), "device_inbox", "stream_id"
             )
 
         max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index cd186c8472..903606fb46 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -92,6 +92,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
         # class below that is used on the main process.
         self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
             db_conn,
+            hs.get_replication_notifier(),
             "device_lists_stream",
             "stream_id",
             extra_tables=[
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 4c691642e2..c4ac6c33ba 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1181,7 +1181,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
         super().__init__(database, db_conn, hs)
 
         self._cross_signing_id_gen = StreamIdGenerator(
-            db_conn, "e2e_cross_signing_keys", "stream_id"
+            db_conn,
+            hs.get_replication_notifier(),
+            "e2e_cross_signing_keys",
+            "stream_id",
         )
 
     async def set_e2e_device_keys(
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index d150fa8a94..d8a8bcafb6 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -191,6 +191,7 @@ class EventsWorkerStore(SQLBaseStore):
             self._stream_id_gen = MultiWriterIdGenerator(
                 db_conn=db_conn,
                 db=database,
+                notifier=hs.get_replication_notifier(),
                 stream_name="events",
                 instance_name=hs.get_instance_name(),
                 tables=[("events", "instance_name", "stream_ordering")],
@@ -200,6 +201,7 @@ class EventsWorkerStore(SQLBaseStore):
             self._backfill_id_gen = MultiWriterIdGenerator(
                 db_conn=db_conn,
                 db=database,
+                notifier=hs.get_replication_notifier(),
                 stream_name="backfill",
                 instance_name=hs.get_instance_name(),
                 tables=[("events", "instance_name", "stream_ordering")],
@@ -217,12 +219,14 @@ class EventsWorkerStore(SQLBaseStore):
             # SQLite).
             self._stream_id_gen = StreamIdGenerator(
                 db_conn,
+                hs.get_replication_notifier(),
                 "events",
                 "stream_ordering",
                 is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
             )
             self._backfill_id_gen = StreamIdGenerator(
                 db_conn,
+                hs.get_replication_notifier(),
                 "events",
                 "stream_ordering",
                 step=-1,
@@ -300,6 +304,7 @@ class EventsWorkerStore(SQLBaseStore):
             self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
                 db_conn=db_conn,
                 db=database,
+                notifier=hs.get_replication_notifier(),
                 stream_name="un_partial_stated_event_stream",
                 instance_name=hs.get_instance_name(),
                 tables=[
@@ -311,7 +316,10 @@ class EventsWorkerStore(SQLBaseStore):
             )
         else:
             self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
-                db_conn, "un_partial_stated_event_stream", "stream_id"
+                db_conn,
+                hs.get_replication_notifier(),
+                "un_partial_stated_event_stream",
+                "stream_id",
             )
 
     def get_un_partial_stated_events_token(self) -> int:
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 7b60815043..beb210f8ee 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -77,6 +77,7 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
             self._presence_id_gen = MultiWriterIdGenerator(
                 db_conn=db_conn,
                 db=database,
+                notifier=hs.get_replication_notifier(),
                 stream_name="presence_stream",
                 instance_name=self._instance_name,
                 tables=[("presence_stream", "instance_name", "stream_id")],
@@ -85,7 +86,7 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
             )
         else:
             self._presence_id_gen = StreamIdGenerator(
-                db_conn, "presence_stream", "stream_id"
+                db_conn, hs.get_replication_notifier(), "presence_stream", "stream_id"
             )
 
         self.hs = hs
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 03182887d1..14ca167b34 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -118,6 +118,7 @@ class PushRulesWorkerStore(
         # class below that is used on the main process.
         self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
             db_conn,
+            hs.get_replication_notifier(),
             "push_rules_stream",
             "stream_id",
             is_writer=hs.config.worker.worker_app is None,
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 7f24a3b6ec..df53e726e6 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -62,6 +62,7 @@ class PusherWorkerStore(SQLBaseStore):
         # class below that is used on the main process.
         self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
             db_conn,
+            hs.get_replication_notifier(),
             "pushers",
             "id",
             extra_tables=[("deleted_pushers", "stream_id")],
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 86f5bce5f0..3468f354e6 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -73,6 +73,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             self._receipts_id_gen = MultiWriterIdGenerator(
                 db_conn=db_conn,
                 db=database,
+                notifier=hs.get_replication_notifier(),
                 stream_name="receipts",
                 instance_name=self._instance_name,
                 tables=[("receipts_linearized", "instance_name", "stream_id")],
@@ -91,6 +92,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             # SQLite).
             self._receipts_id_gen = StreamIdGenerator(
                 db_conn,
+                hs.get_replication_notifier(),
                 "receipts_linearized",
                 "stream_id",
                 is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts,
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 78906a5e1d..7264a33cd4 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -126,6 +126,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
                 db_conn=db_conn,
                 db=database,
+                notifier=hs.get_replication_notifier(),
                 stream_name="un_partial_stated_room_stream",
                 instance_name=self._instance_name,
                 tables=[
@@ -137,7 +138,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             )
         else:
             self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
-                db_conn, "un_partial_stated_room_stream", "stream_id"
+                db_conn,
+                hs.get_replication_notifier(),
+                "un_partial_stated_room_stream",
+                "stream_id",
             )
 
     async def store_room(