summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py19
1 files changed, 17 insertions, 2 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py

index fd59f1595f..7fc346c7b6 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -16,6 +16,7 @@ """ import logging +import random from six import itervalues @@ -56,7 +57,6 @@ class ReplicationStreamProtocolFactory(Factory): self.server_name, self.clock, self.streamer, - addr ) @@ -74,6 +74,8 @@ class ReplicationStreamer(object): self.notifier = hs.get_notifier() self._server_notices_sender = hs.get_server_notices_sender() + self._replication_torture_level = hs.config.replication_torture_level + # Current connections. self.connections = [] @@ -157,10 +159,23 @@ class ReplicationStreamer(object): for stream in self.streams: stream.advance_current_token() - for stream in self.streams: + all_streams = self.streams + + if self._replication_torture_level is not None: + # there is no guarantee about ordering between the streams, + # so let's shuffle them around a bit when we are in torture mode. + all_streams = list(all_streams) + random.shuffle(all_streams) + + for stream in all_streams: if stream.last_token == stream.upto_token: continue + if self._replication_torture_level: + yield self.clock.sleep( + self._replication_torture_level / 1000.0 + ) + logger.debug( "Getting stream: %s: %s -> %s", stream.NAME, stream.last_token, stream.upto_token