summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-09-16 11:48:42 +0100
committerErik Johnston <erik@matrix.org>2020-09-29 14:45:42 +0100
commit1e05b033af47c9858b5db49982b5b43b3c415729 (patch)
tree27812f121475cd9599ef721953176ac21ef7dfe6 /synapse/replication/tcp/resource.py
parentWire up token (diff)
downloadsynapse-1e05b033af47c9858b5db49982b5b43b3c415729.tar.xz
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py13
1 files changed, 10 insertions, 3 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py

index 687984e7a8..623d7fff3f 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -24,6 +24,7 @@ from twisted.internet.protocol import Factory from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol +from synapse.replication.tcp.streams import EventsStream from synapse.util.metrics import Measure stream_updates_counter = Counter( @@ -84,6 +85,9 @@ class ReplicationStreamer: # Set of streams to replicate. self.streams = self.command_handler.get_streams_to_replicate() + if self.streams: + self.clock.looping_call(self.on_notifier_poke, 1000.0) + def on_notifier_poke(self): """Checks if there is actually any new data and sends it to the connections if there are. @@ -126,9 +130,7 @@ class ReplicationStreamer: random.shuffle(all_streams) for stream in all_streams: - if stream.last_token == stream.current_token( - self._instance_name - ): + if not stream.has_updates(): continue if self._replication_torture_level: @@ -174,6 +176,11 @@ class ReplicationStreamer: except Exception: logger.exception("Failed to replicate") + # for command in stream.extra_commands( + # sent_updates=bool(updates) + # ): + # self.command_handler.send_command(command) + logger.debug("No more pending updates, breaking poke loop") finally: self.pending_updates = False