From 71472bac9109fd2b773b119767312e2f6f32caa6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2023 11:36:06 +0000 Subject: Create a separate ReplicationNotifier --- synapse/notifier.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index 26b97cf766..e7b011125b 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -226,8 +226,7 @@ class Notifier: self.store = hs.get_datastores().main self.pending_new_room_events: List[_PendingRoomEventEntry] = [] - # Called when there are new things to stream over replication - self.replication_callbacks: List[Callable[[], None]] = [] + self._replication_notifier = hs.get_replication_notifier() self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = [] self._federation_client = hs.get_federation_http_client() @@ -279,7 +278,7 @@ class Notifier: it needs to do any asynchronous work, a background thread should be started and wrapped with run_as_background_process. """ - self.replication_callbacks.append(cb) + self._replication_notifier.add_replication_callback(cb) def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None: """Add a callback that will be called when a user joins a room. @@ -741,8 +740,7 @@ class Notifier: def notify_replication(self) -> None: """Notify the any replication listeners that there's a new event""" - for cb in self.replication_callbacks: - cb() + self._replication_notifier.notify_replication() def notify_user_joined_room(self, event_id: str, room_id: str) -> None: for cb in self._new_join_in_room_callbacks: @@ -759,3 +757,26 @@ class Notifier: # Tell the federation client about the fact the server is back up, so # that any in flight requests can be immediately retried. self._federation_client.wake_destination(server) + + +@attr.s +class ReplicationNotifier: + """Tracks callbacks for things that need to know about stream changes. + + This is separate from the notifier to avoid circular dependencies. + """ + + _replication_callbacks: List[Callable[[], None]] = attr.Factory(list) + + def add_replication_callback(self, cb: Callable[[], None]) -> None: + """Add a callback that will be called when some new data is available. + Callback is not given any arguments. It should *not* return a Deferred - if + it needs to do any asynchronous work, a background thread should be started and + wrapped with run_as_background_process. + """ + self._replication_callbacks.append(cb) + + def notify_replication(self) -> None: + """Notify the any replication listeners that there's a new event""" + for cb in self._replication_callbacks: + cb() -- cgit 1.5.1 From b72b698701f1f6ed89c2aef1dffbebb1648a6d52 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2023 14:17:18 +0000 Subject: Fix test --- synapse/notifier.py | 2 +- tests/replication/tcp/test_handler.py | 23 +++++++---------------- 2 files changed, 8 insertions(+), 17 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index e7b011125b..28f0d4a25a 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -759,7 +759,7 @@ class Notifier: self._federation_client.wake_destination(server) -@attr.s +@attr.s(auto_attribs=True) class ReplicationNotifier: """Tracks callbacks for things that need to know about stream changes. diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py index 555922409d..6e4055cc21 100644 --- a/tests/replication/tcp/test_handler.py +++ b/tests/replication/tcp/test_handler.py @@ -14,7 +14,7 @@ from twisted.internet import defer -from synapse.replication.tcp.commands import PositionCommand, RdataCommand +from synapse.replication.tcp.commands import PositionCommand from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -111,20 +111,14 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase): next_token = self.get_success(ctx.__aenter__()) self.get_success(ctx.__aexit__(None, None, None)) - cmd_handler.send_command( - RdataCommand("caches", "worker1", next_token, ("func_name", [], 0)) - ) - self.replicate() - self.get_success( data_handler.wait_for_stream_position("worker1", "caches", next_token) ) - # `wait_for_stream_position` should only return once master receives an - # RDATA from the worker - ctx = cache_id_gen.get_next() - next_token = self.get_success(ctx.__aenter__()) - self.get_success(ctx.__aexit__(None, None, None)) + # `wait_for_stream_position` should only return once master receives a + # notification that `next_token` has persisted. + ctx_worker1 = cache_id_gen.get_next() + next_token = self.get_success(ctx_worker1.__aenter__()) d = defer.ensureDeferred( data_handler.wait_for_stream_position("worker1", "caches", next_token) @@ -142,10 +136,7 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase): ) self.assertFalse(d.called) - # ... but receiving the RDATA should - cmd_handler.send_command( - RdataCommand("caches", "worker1", next_token, ("func_name", [], 0)) - ) - self.replicate() + # ... but worker1 finishing (and so sending an update) should. + self.get_success(ctx_worker1.__aexit__(None, None, None)) self.assertTrue(d.called) -- cgit 1.5.1