1 files changed, 17 insertions, 1 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index fd59f1595f..47cdf30bd3 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -16,6 +16,7 @@
"""
import logging
+import random
from six import itervalues
@@ -74,6 +75,8 @@ class ReplicationStreamer(object):
self.notifier = hs.get_notifier()
self._server_notices_sender = hs.get_server_notices_sender()
+ self._replication_torture_level = hs.config.replication_torture_level
+
# Current connections.
self.connections = []
@@ -157,10 +160,23 @@ class ReplicationStreamer(object):
for stream in self.streams:
stream.advance_current_token()
- for stream in self.streams:
+ all_streams = self.streams
+
+ if self._replication_torture_level is not None:
+ # there is no guarantee about ordering between the streams,
+ # so let's shuffle them around a bit when we are in torture mode.
+ all_streams = list(all_streams)
+ random.shuffle(all_streams)
+
+ for stream in all_streams:
if stream.last_token == stream.upto_token:
continue
+ if self._replication_torture_level:
+ yield self.clock.sleep(
+ self._replication_torture_level / 1000.0
+ )
+
logger.debug(
"Getting stream: %s: %s -> %s",
stream.NAME, stream.last_token, stream.upto_token
|