summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py13
1 files changed, 10 insertions, 3 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 687984e7a8..623d7fff3f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -24,6 +24,7 @@ from twisted.internet.protocol import Factory
 
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
+from synapse.replication.tcp.streams import EventsStream
 from synapse.util.metrics import Measure
 
 stream_updates_counter = Counter(
@@ -84,6 +85,9 @@ class ReplicationStreamer:
         # Set of streams to replicate.
         self.streams = self.command_handler.get_streams_to_replicate()
 
+        if self.streams:
+            self.clock.looping_call(self.on_notifier_poke, 1000.0)
+
     def on_notifier_poke(self):
         """Checks if there is actually any new data and sends it to the
         connections if there are.
@@ -126,9 +130,7 @@ class ReplicationStreamer:
                         random.shuffle(all_streams)
 
                     for stream in all_streams:
-                        if stream.last_token == stream.current_token(
-                            self._instance_name
-                        ):
+                        if not stream.has_updates():
                             continue
 
                         if self._replication_torture_level:
@@ -174,6 +176,11 @@ class ReplicationStreamer:
                             except Exception:
                                 logger.exception("Failed to replicate")
 
+                        # for command in stream.extra_commands(
+                        #     sent_updates=bool(updates)
+                        # ):
+                        #     self.command_handler.send_command(command)
+
             logger.debug("No more pending updates, breaking poke loop")
         finally:
             self.pending_updates = False