summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-01-20 18:02:18 +0000
committerGitHub <noreply@github.com>2023-01-20 18:02:18 +0000
commit65d03866936adb144631d263a8539a2cb060fd43 (patch)
tree894f71640642a5bf444d475bbb7831cc512d9b13 /synapse/notifier.py
parentDockerfile: Bump Python version from 3.9 to 3.11 (#14875) (diff)
downloadsynapse-65d03866936adb144631d263a8539a2cb060fd43.tar.xz
Always notify replication when a stream advances (#14877)
This ensures that all other workers are told about stream updates in a timely manner, without having to remember to manually poke replication.
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py31
1 files changed, 26 insertions, 5 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 26b97cf766..28f0d4a25a 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -226,8 +226,7 @@ class Notifier:
         self.store = hs.get_datastores().main
         self.pending_new_room_events: List[_PendingRoomEventEntry] = []
 
-        # Called when there are new things to stream over replication
-        self.replication_callbacks: List[Callable[[], None]] = []
+        self._replication_notifier = hs.get_replication_notifier()
         self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = []
 
         self._federation_client = hs.get_federation_http_client()
@@ -279,7 +278,7 @@ class Notifier:
         it needs to do any asynchronous work, a background thread should be started and
         wrapped with run_as_background_process.
         """
-        self.replication_callbacks.append(cb)
+        self._replication_notifier.add_replication_callback(cb)
 
     def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None:
         """Add a callback that will be called when a user joins a room.
@@ -741,8 +740,7 @@ class Notifier:
 
     def notify_replication(self) -> None:
         """Notify the any replication listeners that there's a new event"""
-        for cb in self.replication_callbacks:
-            cb()
+        self._replication_notifier.notify_replication()
 
     def notify_user_joined_room(self, event_id: str, room_id: str) -> None:
         for cb in self._new_join_in_room_callbacks:
@@ -759,3 +757,26 @@ class Notifier:
         # Tell the federation client about the fact the server is back up, so
         # that any in flight requests can be immediately retried.
         self._federation_client.wake_destination(server)
+
+
+@attr.s(auto_attribs=True)
+class ReplicationNotifier:
+    """Tracks callbacks for things that need to know about stream changes.
+
+    This is separate from the notifier to avoid circular dependencies.
+    """
+
+    _replication_callbacks: List[Callable[[], None]] = attr.Factory(list)
+
+    def add_replication_callback(self, cb: Callable[[], None]) -> None:
+        """Add a callback that will be called when some new data is available.
+        Callback is not given any arguments. It should *not* return a Deferred - if
+        it needs to do any asynchronous work, a background thread should be started and
+        wrapped with run_as_background_process.
+        """
+        self._replication_callbacks.append(cb)
+
+    def notify_replication(self) -> None:
+        """Notify the any replication listeners that there's a new event"""
+        for cb in self._replication_callbacks:
+            cb()