summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-06-17 16:43:45 +0100
committerErik Johnston <erik@matrix.org>2016-06-17 16:43:45 +0100
commit8f4a9bbc16e6b54f1ab110085e42884fd16abb6a (patch)
treed002608685ff5f081347eb74ab26dd2fd1b24520
parentMerge pull request #878 from matrix-org/erikj/ujson (diff)
downloadsynapse-8f4a9bbc16e6b54f1ab110085e42884fd16abb6a.tar.xz
Linearize some federation endpoints based on (origin, room_id)
-rw-r--r--synapse/federation/federation_server.py143
-rw-r--r--synapse/federation/transport/server.py2
2 files changed, 78 insertions, 67 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 2a589524a4..85f5e752fe 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -49,6 +49,7 @@ class FederationServer(FederationBase):
         super(FederationServer, self).__init__(hs)
 
         self._room_pdu_linearizer = Linearizer()
+        self._server_linearizer = Linearizer()
 
     def set_handler(self, handler):
         """Sets the handler that the replication layer will use to communicate
@@ -89,11 +90,14 @@ class FederationServer(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def on_backfill_request(self, origin, room_id, versions, limit):
-        pdus = yield self.handler.on_backfill_request(
-            origin, room_id, versions, limit
-        )
+        with (yield self._server_linearizer.queue((origin, room_id))):
+            pdus = yield self.handler.on_backfill_request(
+                origin, room_id, versions, limit
+            )
+
+            res = self._transaction_from_pdus(pdus).get_dict()
 
-        defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
+        defer.returnValue((200, res))
 
     @defer.inlineCallbacks
     @log_function
@@ -184,27 +188,28 @@ class FederationServer(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def on_context_state_request(self, origin, room_id, event_id):
-        if event_id:
-            pdus = yield self.handler.get_state_for_pdu(
-                origin, room_id, event_id,
-            )
-            auth_chain = yield self.store.get_auth_chain(
-                [pdu.event_id for pdu in pdus]
-            )
+        with (yield self._server_linearizer.queue((origin, room_id))):
+            if event_id:
+                pdus = yield self.handler.get_state_for_pdu(
+                    origin, room_id, event_id,
+                )
+                auth_chain = yield self.store.get_auth_chain(
+                    [pdu.event_id for pdu in pdus]
+                )
 
-            for event in auth_chain:
-                # We sign these again because there was a bug where we
-                # incorrectly signed things the first time round
-                if self.hs.is_mine_id(event.event_id):
-                    event.signatures.update(
-                        compute_event_signature(
-                            event,
-                            self.hs.hostname,
-                            self.hs.config.signing_key[0]
+                for event in auth_chain:
+                    # We sign these again because there was a bug where we
+                    # incorrectly signed things the first time round
+                    if self.hs.is_mine_id(event.event_id):
+                        event.signatures.update(
+                            compute_event_signature(
+                                event,
+                                self.hs.hostname,
+                                self.hs.config.signing_key[0]
+                            )
                         )
-                    )
-        else:
-            raise NotImplementedError("Specify an event")
+            else:
+                raise NotImplementedError("Specify an event")
 
         defer.returnValue((200, {
             "pdus": [pdu.get_pdu_json() for pdu in pdus],
@@ -283,14 +288,16 @@ class FederationServer(FederationBase):
 
     @defer.inlineCallbacks
     def on_event_auth(self, origin, room_id, event_id):
-        time_now = self._clock.time_msec()
-        auth_pdus = yield self.handler.on_event_auth(event_id)
-        defer.returnValue((200, {
-            "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
-        }))
+        with (yield self._server_linearizer.queue((origin, room_id))):
+            time_now = self._clock.time_msec()
+            auth_pdus = yield self.handler.on_event_auth(event_id)
+            res = {
+                "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
+            }
+        defer.returnValue((200, res))
 
     @defer.inlineCallbacks
-    def on_query_auth_request(self, origin, content, event_id):
+    def on_query_auth_request(self, origin, content, room_id, event_id):
         """
         Content is a dict with keys::
             auth_chain (list): A list of events that give the auth chain.
@@ -309,32 +316,33 @@ class FederationServer(FederationBase):
         Returns:
             Deferred: Results in `dict` with the same format as `content`
         """
-        auth_chain = [
-            self.event_from_pdu_json(e)
-            for e in content["auth_chain"]
-        ]
-
-        signed_auth = yield self._check_sigs_and_hash_and_fetch(
-            origin, auth_chain, outlier=True
-        )
+        with (yield self._server_linearizer.queue((origin, room_id))):
+            auth_chain = [
+                self.event_from_pdu_json(e)
+                for e in content["auth_chain"]
+            ]
+
+            signed_auth = yield self._check_sigs_and_hash_and_fetch(
+                origin, auth_chain, outlier=True
+            )
 
-        ret = yield self.handler.on_query_auth(
-            origin,
-            event_id,
-            signed_auth,
-            content.get("rejects", []),
-            content.get("missing", []),
-        )
+            ret = yield self.handler.on_query_auth(
+                origin,
+                event_id,
+                signed_auth,
+                content.get("rejects", []),
+                content.get("missing", []),
+            )
 
-        time_now = self._clock.time_msec()
-        send_content = {
-            "auth_chain": [
-                e.get_pdu_json(time_now)
-                for e in ret["auth_chain"]
-            ],
-            "rejects": ret.get("rejects", []),
-            "missing": ret.get("missing", []),
-        }
+            time_now = self._clock.time_msec()
+            send_content = {
+                "auth_chain": [
+                    e.get_pdu_json(time_now)
+                    for e in ret["auth_chain"]
+                ],
+                "rejects": ret.get("rejects", []),
+                "missing": ret.get("missing", []),
+            }
 
         defer.returnValue(
             (200, send_content)
@@ -386,21 +394,24 @@ class FederationServer(FederationBase):
     @log_function
     def on_get_missing_events(self, origin, room_id, earliest_events,
                               latest_events, limit, min_depth):
-        logger.info(
-            "on_get_missing_events: earliest_events: %r, latest_events: %r,"
-            " limit: %d, min_depth: %d",
-            earliest_events, latest_events, limit, min_depth
-        )
-        missing_events = yield self.handler.on_get_missing_events(
-            origin, room_id, earliest_events, latest_events, limit, min_depth
-        )
+        with (yield self._server_linearizer.queue((origin, room_id))):
+            logger.info(
+                "on_get_missing_events: earliest_events: %r, latest_events: %r,"
+                " limit: %d, min_depth: %d",
+                earliest_events, latest_events, limit, min_depth
+            )
+            missing_events = yield self.handler.on_get_missing_events(
+                origin, room_id, earliest_events, latest_events, limit, min_depth
+            )
 
-        if len(missing_events) < 5:
-            logger.info("Returning %d events: %r", len(missing_events), missing_events)
-        else:
-            logger.info("Returning %d events", len(missing_events))
+            if len(missing_events) < 5:
+                logger.info(
+                    "Returning %d events: %r", len(missing_events), missing_events
+                )
+            else:
+                logger.info("Returning %d events", len(missing_events))
 
-        time_now = self._clock.time_msec()
+            time_now = self._clock.time_msec()
 
         defer.returnValue({
             "events": [ev.get_pdu_json(time_now) for ev in missing_events],
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 8a1965f45a..26fa88ae84 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -388,7 +388,7 @@ class FederationQueryAuthServlet(BaseFederationServlet):
     @defer.inlineCallbacks
     def on_POST(self, origin, content, query, context, event_id):
         new_content = yield self.handler.on_query_auth_request(
-            origin, content, event_id
+            origin, content, context, event_id
         )
 
         defer.returnValue((200, new_content))