summary refs log tree commit diff
path: root/synapse/storage/databases/main/pusher.py
diff options
context:
space:
mode:
authorNick Mills-Barrett <nick@beeper.com>2022-11-11 10:51:49 +0000
committerGitHub <noreply@github.com>2022-11-11 10:51:49 +0000
commit3a4f80f8c6f39c5549c56c044e10b35064d8d22f (patch)
treea3d5baf8775d304c63caa7bbb00a28c854dad778 /synapse/storage/databases/main/pusher.py
parentRemove duplicated code to evict entries. (#14410) (diff)
downloadsynapse-3a4f80f8c6f39c5549c56c044e10b35064d8d22f.tar.xz
Merge/remove `Slaved*` stores into `WorkerStores` (#14375)
Diffstat (limited to 'synapse/storage/databases/main/pusher.py')
-rw-r--r--synapse/storage/databases/main/pusher.py41
1 files changed, 35 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 01206950a9..4a01562d45 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -27,13 +27,19 @@ from typing import (
 )
 
 from synapse.push import PusherConfig, ThrottleParams
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.replication.tcp.streams import PushersStream
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
-from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.storage.util.id_generators import (
+    AbstractStreamIdGenerator,
+    AbstractStreamIdTracker,
+    StreamIdGenerator,
+)
 from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
@@ -52,9 +58,21 @@ class PusherWorkerStore(SQLBaseStore):
         hs: "HomeServer",
     ):
         super().__init__(database, db_conn, hs)
-        self._pushers_id_gen = StreamIdGenerator(
-            db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
-        )
+
+        if hs.config.worker.worker_app is None:
+            self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+                db_conn,
+                "pushers",
+                "id",
+                extra_tables=[("deleted_pushers", "stream_id")],
+            )
+        else:
+            self._pushers_id_gen = SlavedIdTracker(
+                db_conn,
+                "pushers",
+                "id",
+                extra_tables=[("deleted_pushers", "stream_id")],
+            )
 
         self.db_pool.updates.register_background_update_handler(
             "remove_deactivated_pushers",
@@ -96,6 +114,16 @@ class PusherWorkerStore(SQLBaseStore):
 
             yield PusherConfig(**r)
 
+    def get_pushers_stream_token(self) -> int:
+        return self._pushers_id_gen.get_current_token()
+
+    def process_replication_rows(
+        self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
+    ) -> None:
+        if stream_name == PushersStream.NAME:
+            self._pushers_id_gen.advance(instance_name, token)
+        return super().process_replication_rows(stream_name, instance_name, token, rows)
+
     async def get_pushers_by_app_id_and_pushkey(
         self, app_id: str, pushkey: str
     ) -> Iterator[PusherConfig]:
@@ -545,8 +573,9 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
 
 
 class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
-    def get_pushers_stream_token(self) -> int:
-        return self._pushers_id_gen.get_current_token()
+    # Because we have write access, this will be a StreamIdGenerator
+    # (see PusherWorkerStore.__init__)
+    _pushers_id_gen: AbstractStreamIdGenerator
 
     async def add_pusher(
         self,