diff options
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r-- | synapse/replication/tcp/resource.py | 13 |
1 files changed, 10 insertions, 3 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 687984e7a8..623d7fff3f 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -24,6 +24,7 @@ from twisted.internet.protocol import Factory from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol +from synapse.replication.tcp.streams import EventsStream from synapse.util.metrics import Measure stream_updates_counter = Counter( @@ -84,6 +85,9 @@ class ReplicationStreamer: # Set of streams to replicate. self.streams = self.command_handler.get_streams_to_replicate() + if self.streams: + self.clock.looping_call(self.on_notifier_poke, 1000.0) + def on_notifier_poke(self): """Checks if there is actually any new data and sends it to the connections if there are. @@ -126,9 +130,7 @@ class ReplicationStreamer: random.shuffle(all_streams) for stream in all_streams: - if stream.last_token == stream.current_token( - self._instance_name - ): + if not stream.has_updates(): continue if self._replication_torture_level: @@ -174,6 +176,11 @@ class ReplicationStreamer: except Exception: logger.exception("Failed to replicate") + # for command in stream.extra_commands( + # sent_updates=bool(updates) + # ): + # self.command_handler.send_command(command) + logger.debug("No more pending updates, breaking poke loop") finally: self.pending_updates = False |