summary refs log tree commit diff
path: root/synapse/replication/http
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-22 14:21:54 +0100
committerGitHub <noreply@github.com>2020-05-22 14:21:54 +0100
commit1531b214fc57714c14046a8f66c7b5fe5ec5dcdd (patch)
treefd150f21a14dcc6f5cf3f373984478dd12d43c95 /synapse/replication/http
parentConvert sending mail to async/await. (#7557) (diff)
downloadsynapse-1531b214fc57714c14046a8f66c7b5fe5ec5dcdd.tar.xz
Add ability to wait for replication streams (#7542)
The idea here is that if an instance persists an event via the replication HTTP API it can return before we receive that event over replication, which can lead to races where code assumes that persisting an event immediately updates various caches (e.g. current state of the room).

Most of Synapse doesn't hit such races, so we don't do the waiting automagically, instead we do so where necessary to avoid unnecessary delays. We may decide to change our minds here if it turns out there are a lot of subtle races going on.

People probably want to look at this commit by commit.
Diffstat (limited to 'synapse/replication/http')
-rw-r--r--synapse/replication/http/federation.py13
-rw-r--r--synapse/replication/http/membership.py14
-rw-r--r--synapse/replication/http/send_event.py4
-rw-r--r--synapse/replication/http/streams.py5
4 files changed, 20 insertions, 16 deletions
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 7e23b565b9..c287c4e269 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
 
 class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
     """Handles events newly received from federation, including persisting and
-    notifying.
+    notifying. Returns the maximum stream ID of the persisted events.
 
     The API looks like:
 
@@ -46,6 +46,13 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
                 "context": { .. serialized event context .. },
             }],
             "backfilled": false
+        }
+
+        200 OK
+
+        {
+            "max_stream_id": 32443,
+        }
     """
 
     NAME = "fed_send_events"
@@ -115,11 +122,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
 
         logger.info("Got %d events from federation", len(event_and_contexts))
 
-        await self.federation_handler.persist_events_and_notify(
+        max_stream_id = await self.federation_handler.persist_events_and_notify(
             event_and_contexts, backfilled
         )
 
-        return 200, {}
+        return 200, {"max_stream_id": max_stream_id}
 
 
 class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 3577611fd7..050fd34562 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -76,11 +76,11 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
 
         logger.info("remote_join: %s into room: %s", user_id, room_id)
 
-        await self.federation_handler.do_invite_join(
+        event_id, stream_id = await self.federation_handler.do_invite_join(
             remote_room_hosts, room_id, user_id, event_content
         )
 
-        return 200, {}
+        return 200, {"event_id": event_id, "stream_id": stream_id}
 
 
 class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
@@ -136,10 +136,10 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
         logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id)
 
         try:
-            event = await self.federation_handler.do_remotely_reject_invite(
+            event, stream_id = await self.federation_handler.do_remotely_reject_invite(
                 remote_room_hosts, room_id, user_id, event_content,
             )
-            ret = event.get_pdu_json()
+            event_id = event.event_id
         except Exception as e:
             # if we were unable to reject the exception, just mark
             # it as rejected on our end and plough ahead.
@@ -149,10 +149,10 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
             #
             logger.warning("Failed to reject invite: %s", e)
 
-            await self.store.locally_reject_invite(user_id, room_id)
-            ret = {}
+            stream_id = await self.store.locally_reject_invite(user_id, room_id)
+            event_id = None
 
-        return 200, ret
+        return 200, {"event_id": event_id, "stream_id": stream_id}
 
 
 class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index b74b088ff4..c981723c1a 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -119,11 +119,11 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             "Got event to send with ID: %s into room: %s", event.event_id, event.room_id
         )
 
-        await self.event_creation_handler.persist_and_notify_client_event(
+        stream_id = await self.event_creation_handler.persist_and_notify_client_event(
             requester, event, context, ratelimit=ratelimit, extra_users=extra_users
         )
 
-        return 200, {}
+        return 200, {"stream_id": stream_id}
 
 
 def register_servlets(hs, http_server):
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index b705a8e16c..bde97eef32 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -51,10 +51,7 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
         super().__init__(hs)
 
         self._instance_name = hs.get_instance_name()
-
-        # We pull the streams from the replication handler (if we try and make
-        # them ourselves we end up in an import loop).
-        self.streams = hs.get_tcp_replication().get_streams()
+        self.streams = hs.get_replication_streams()
 
     @staticmethod
     def _serialize_payload(stream_name, from_token, upto_token):