summary refs log tree commit diff
path: root/synapse/server.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-22 14:21:54 +0100
committerGitHub <noreply@github.com>2020-05-22 14:21:54 +0100
commit1531b214fc57714c14046a8f66c7b5fe5ec5dcdd (patch)
treefd150f21a14dcc6f5cf3f373984478dd12d43c95 /synapse/server.py
parentConvert sending mail to async/await. (#7557) (diff)
downloadsynapse-1531b214fc57714c14046a8f66c7b5fe5ec5dcdd.tar.xz
Add ability to wait for replication streams (#7542)
The idea here is that if an instance persists an event via the replication HTTP API it can return before we receive that event over replication, which can lead to races where code assumes that persisting an event immediately updates various caches (e.g. current state of the room).

Most of Synapse doesn't hit such races, so we don't do the waiting automagically, instead we do so where necessary to avoid unnecessary delays. We may decide to change our minds here if it turns out there are a lot of subtle races going on.

People probably want to look at this commit by commit.
Diffstat (limited to 'synapse/server.py')
-rw-r--r--synapse/server.py5
1 files changed, 5 insertions, 0 deletions
diff --git a/synapse/server.py b/synapse/server.py
index c530f1aa1a..ca2deb49bb 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -90,6 +90,7 @@ from synapse.push.pusherpool import PusherPool
 from synapse.replication.tcp.client import ReplicationDataHandler
 from synapse.replication.tcp.handler import ReplicationCommandHandler
 from synapse.replication.tcp.resource import ReplicationStreamer
+from synapse.replication.tcp.streams import STREAMS_MAP
 from synapse.rest.media.v1.media_repository import (
     MediaRepository,
     MediaRepositoryResource,
@@ -210,6 +211,7 @@ class HomeServer(object):
         "storage",
         "replication_streamer",
         "replication_data_handler",
+        "replication_streams",
     ]
 
     REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"]
@@ -583,6 +585,9 @@ class HomeServer(object):
     def build_replication_data_handler(self):
         return ReplicationDataHandler(self)
 
+    def build_replication_streams(self):
+        return {stream.NAME: stream(self) for stream in STREAMS_MAP.values()}
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)