summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-10-12 15:51:41 +0100
committerGitHub <noreply@github.com>2020-10-12 15:51:41 +0100
commit8de3703d214c814ad637793a0cc2220e20579ffa (patch)
tree3daec0f9a8d7baf92f8746b12c8b54d92b8d7d6d /synapse/replication/tcp/client.py
parentDocker: support passing additional commandline args to synapse (#8390) (diff)
downloadsynapse-8de3703d214c814ad637793a0cc2220e20579ffa.tar.xz
Make event persisters periodically announce position over replication. (#8499)
Currently background proccesses stream the events stream use the "minimum persisted position" (i.e. `get_current_token()`) rather than the vector clock style tokens. This is broadly fine as it doesn't matter if the background processes lag a small amount. However, in extreme cases (i.e. SyTests) where we only write to one event persister the background processes will never make progress.

This PR changes it so that the `MultiWriterIDGenerator` keeps the current position of a given instance as up to date as possible (i.e using the latest token it sees if its not in the process of persisting anything), and then periodically announces that over replication. This then allows the "minimum persisted position" to advance, albeit with a small lag.
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--synapse/replication/tcp/client.py4
1 files changed, 4 insertions, 0 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e165429cad..e27ee216f0 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -191,6 +191,10 @@ class ReplicationDataHandler:
     async def on_position(self, stream_name: str, instance_name: str, token: int):
         self.store.process_replication_rows(stream_name, instance_name, token, [])
 
+        # We poke the generic "replication" notifier to wake anything up that
+        # may be streaming.
+        self.notifier.notify_replication()
+
     def on_remote_server_up(self, server: str):
         """Called when get a new REMOTE_SERVER_UP command."""