summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2019-03-20 16:04:35 +0000
committerGitHub <noreply@github.com>2019-03-20 16:04:35 +0000
commitcdb803616195c76306b30328af9da7cb2b32c960 (patch)
tree69d932c9342dabfe24521baf03c505ee075b8ca2 /synapse/replication/tcp/resource.py
parentBatch up outgoing read-receipts to reduce federation traffic. (#4890) (diff)
downloadsynapse-cdb803616195c76306b30328af9da7cb2b32c960.tar.xz
Add a config option for torture-testing worker replication. (#4902)
Setting this to 50 or so makes a bunch of sytests fail in worker mode.
Diffstat (limited to '')
-rw-r--r--synapse/replication/tcp/resource.py18
1 files changed, 17 insertions, 1 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index fd59f1595f..47cdf30bd3 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -16,6 +16,7 @@
 """
 
 import logging
+import random
 
 from six import itervalues
 
@@ -74,6 +75,8 @@ class ReplicationStreamer(object):
         self.notifier = hs.get_notifier()
         self._server_notices_sender = hs.get_server_notices_sender()
 
+        self._replication_torture_level = hs.config.replication_torture_level
+
         # Current connections.
         self.connections = []
 
@@ -157,10 +160,23 @@ class ReplicationStreamer(object):
                     for stream in self.streams:
                         stream.advance_current_token()
 
-                    for stream in self.streams:
+                    all_streams = self.streams
+
+                    if self._replication_torture_level is not None:
+                        # there is no guarantee about ordering between the streams,
+                        # so let's shuffle them around a bit when we are in torture mode.
+                        all_streams = list(all_streams)
+                        random.shuffle(all_streams)
+
+                    for stream in all_streams:
                         if stream.last_token == stream.upto_token:
                             continue
 
+                        if self._replication_torture_level:
+                            yield self.clock.sleep(
+                                self._replication_torture_level / 1000.0
+                            )
+
                         logger.debug(
                             "Getting stream: %s: %s -> %s",
                             stream.NAME, stream.last_token, stream.upto_token