diff --git a/synapse/notifier.py b/synapse/notifier.py
index 26b97cf766..28f0d4a25a 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -226,8 +226,7 @@ class Notifier:
self.store = hs.get_datastores().main
self.pending_new_room_events: List[_PendingRoomEventEntry] = []
- # Called when there are new things to stream over replication
- self.replication_callbacks: List[Callable[[], None]] = []
+ self._replication_notifier = hs.get_replication_notifier()
self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = []
self._federation_client = hs.get_federation_http_client()
@@ -279,7 +278,7 @@ class Notifier:
it needs to do any asynchronous work, a background thread should be started and
wrapped with run_as_background_process.
"""
- self.replication_callbacks.append(cb)
+ self._replication_notifier.add_replication_callback(cb)
def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None:
"""Add a callback that will be called when a user joins a room.
@@ -741,8 +740,7 @@ class Notifier:
def notify_replication(self) -> None:
"""Notify the any replication listeners that there's a new event"""
- for cb in self.replication_callbacks:
- cb()
+ self._replication_notifier.notify_replication()
def notify_user_joined_room(self, event_id: str, room_id: str) -> None:
for cb in self._new_join_in_room_callbacks:
@@ -759,3 +757,26 @@ class Notifier:
# Tell the federation client about the fact the server is back up, so
# that any in flight requests can be immediately retried.
self._federation_client.wake_destination(server)
+
+
+@attr.s(auto_attribs=True)
+class ReplicationNotifier:
+ """Tracks callbacks for things that need to know about stream changes.
+
+ This is separate from the notifier to avoid circular dependencies.
+ """
+
+ _replication_callbacks: List[Callable[[], None]] = attr.Factory(list)
+
+ def add_replication_callback(self, cb: Callable[[], None]) -> None:
+ """Add a callback that will be called when some new data is available.
+ Callback is not given any arguments. It should *not* return a Deferred - if
+ it needs to do any asynchronous work, a background thread should be started and
+ wrapped with run_as_background_process.
+ """
+ self._replication_callbacks.append(cb)
+
+ def notify_replication(self) -> None:
+ """Notify the any replication listeners that there's a new event"""
+ for cb in self._replication_callbacks:
+ cb()
|