1 files changed, 10 insertions, 3 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 687984e7a8..623d7fff3f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -24,6 +24,7 @@ from twisted.internet.protocol import Factory
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
+from synapse.replication.tcp.streams import EventsStream
from synapse.util.metrics import Measure
stream_updates_counter = Counter(
@@ -84,6 +85,9 @@ class ReplicationStreamer:
# Set of streams to replicate.
self.streams = self.command_handler.get_streams_to_replicate()
+ if self.streams:
+ self.clock.looping_call(self.on_notifier_poke, 1000.0)
+
def on_notifier_poke(self):
"""Checks if there is actually any new data and sends it to the
connections if there are.
@@ -126,9 +130,7 @@ class ReplicationStreamer:
random.shuffle(all_streams)
for stream in all_streams:
- if stream.last_token == stream.current_token(
- self._instance_name
- ):
+ if not stream.has_updates():
continue
if self._replication_torture_level:
@@ -174,6 +176,11 @@ class ReplicationStreamer:
except Exception:
logger.exception("Failed to replicate")
+ # for command in stream.extra_commands(
+ # sent_updates=bool(updates)
+ # ):
+ # self.command_handler.send_command(command)
+
logger.debug("No more pending updates, breaking poke loop")
finally:
self.pending_updates = False
|