summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/4902.misc1
-rw-r--r--synapse/config/server.py5
-rw-r--r--synapse/replication/tcp/resource.py18
3 files changed, 23 insertions, 1 deletions
diff --git a/changelog.d/4902.misc b/changelog.d/4902.misc
new file mode 100644
index 0000000000..fecc06a6e8
--- /dev/null
+++ b/changelog.d/4902.misc
@@ -0,0 +1 @@
+Add a config option for torture-testing worker replication.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 499eb30bea..08e4e45482 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -126,6 +126,11 @@ class ServerConfig(Config):
                 self.public_baseurl += '/'
         self.start_pushers = config.get("start_pushers", True)
 
+        # (undocumented) option for torturing the worker-mode replication a bit,
+        # for testing. The value defines the number of milliseconds to pause before
+        # sending out any replication updates.
+        self.replication_torture_level = config.get("replication_torture_level")
+
         self.listeners = []
         for listener in config.get("listeners", []):
             if not isinstance(listener.get("port", None), int):
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index fd59f1595f..47cdf30bd3 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -16,6 +16,7 @@
 """
 
 import logging
+import random
 
 from six import itervalues
 
@@ -74,6 +75,8 @@ class ReplicationStreamer(object):
         self.notifier = hs.get_notifier()
         self._server_notices_sender = hs.get_server_notices_sender()
 
+        self._replication_torture_level = hs.config.replication_torture_level
+
         # Current connections.
         self.connections = []
 
@@ -157,10 +160,23 @@ class ReplicationStreamer(object):
                     for stream in self.streams:
                         stream.advance_current_token()
 
-                    for stream in self.streams:
+                    all_streams = self.streams
+
+                    if self._replication_torture_level is not None:
+                        # there is no guarantee about ordering between the streams,
+                        # so let's shuffle them around a bit when we are in torture mode.
+                        all_streams = list(all_streams)
+                        random.shuffle(all_streams)
+
+                    for stream in all_streams:
                         if stream.last_token == stream.upto_token:
                             continue
 
+                        if self._replication_torture_level:
+                            yield self.clock.sleep(
+                                self._replication_torture_level / 1000.0
+                            )
+
                         logger.debug(
                             "Getting stream: %s: %s -> %s",
                             stream.NAME, stream.last_token, stream.upto_token