diff options
author | Erik Johnston <erik@matrix.org> | 2023-01-20 18:02:18 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-20 18:02:18 +0000 |
commit | 65d03866936adb144631d263a8539a2cb060fd43 (patch) | |
tree | 894f71640642a5bf444d475bbb7831cc512d9b13 /synapse/notifier.py | |
parent | Dockerfile: Bump Python version from 3.9 to 3.11 (#14875) (diff) | |
download | synapse-65d03866936adb144631d263a8539a2cb060fd43.tar.xz |
Always notify replication when a stream advances (#14877)
This ensures that all other workers are told about stream updates in a timely manner, without having to remember to manually poke replication.
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 31 |
1 files changed, 26 insertions, 5 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 26b97cf766..28f0d4a25a 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -226,8 +226,7 @@ class Notifier: self.store = hs.get_datastores().main self.pending_new_room_events: List[_PendingRoomEventEntry] = [] - # Called when there are new things to stream over replication - self.replication_callbacks: List[Callable[[], None]] = [] + self._replication_notifier = hs.get_replication_notifier() self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = [] self._federation_client = hs.get_federation_http_client() @@ -279,7 +278,7 @@ class Notifier: it needs to do any asynchronous work, a background thread should be started and wrapped with run_as_background_process. """ - self.replication_callbacks.append(cb) + self._replication_notifier.add_replication_callback(cb) def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None: """Add a callback that will be called when a user joins a room. @@ -741,8 +740,7 @@ class Notifier: def notify_replication(self) -> None: """Notify the any replication listeners that there's a new event""" - for cb in self.replication_callbacks: - cb() + self._replication_notifier.notify_replication() def notify_user_joined_room(self, event_id: str, room_id: str) -> None: for cb in self._new_join_in_room_callbacks: @@ -759,3 +757,26 @@ class Notifier: # Tell the federation client about the fact the server is back up, so # that any in flight requests can be immediately retried. self._federation_client.wake_destination(server) + + +@attr.s(auto_attribs=True) +class ReplicationNotifier: + """Tracks callbacks for things that need to know about stream changes. + + This is separate from the notifier to avoid circular dependencies. + """ + + _replication_callbacks: List[Callable[[], None]] = attr.Factory(list) + + def add_replication_callback(self, cb: Callable[[], None]) -> None: + """Add a callback that will be called when some new data is available. + Callback is not given any arguments. It should *not* return a Deferred - if + it needs to do any asynchronous work, a background thread should be started and + wrapped with run_as_background_process. + """ + self._replication_callbacks.append(cb) + + def notify_replication(self) -> None: + """Notify the any replication listeners that there's a new event""" + for cb in self._replication_callbacks: + cb() |