From 8f4a9bbc16e6b54f1ab110085e42884fd16abb6a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Jun 2016 16:43:45 +0100 Subject: Linearize some federation endpoints based on (origin, room_id) --- synapse/federation/federation_server.py | 143 +++++++++++++++++--------------- synapse/federation/transport/server.py | 2 +- 2 files changed, 78 insertions(+), 67 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 2a589524a4..85f5e752fe 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -49,6 +49,7 @@ class FederationServer(FederationBase): super(FederationServer, self).__init__(hs) self._room_pdu_linearizer = Linearizer() + self._server_linearizer = Linearizer() def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate @@ -89,11 +90,14 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_backfill_request(self, origin, room_id, versions, limit): - pdus = yield self.handler.on_backfill_request( - origin, room_id, versions, limit - ) + with (yield self._server_linearizer.queue((origin, room_id))): + pdus = yield self.handler.on_backfill_request( + origin, room_id, versions, limit + ) + + res = self._transaction_from_pdus(pdus).get_dict() - defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) + defer.returnValue((200, res)) @defer.inlineCallbacks @log_function @@ -184,27 +188,28 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_context_state_request(self, origin, room_id, event_id): - if event_id: - pdus = yield self.handler.get_state_for_pdu( - origin, room_id, event_id, - ) - auth_chain = yield self.store.get_auth_chain( - [pdu.event_id for pdu in pdus] - ) + with (yield self._server_linearizer.queue((origin, room_id))): + if event_id: + pdus = yield self.handler.get_state_for_pdu( + origin, room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) - for event in auth_chain: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.hs.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] + for event in auth_chain: + # We sign these again because there was a bug where we + # incorrectly signed things the first time round + if self.hs.is_mine_id(event.event_id): + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) ) - ) - else: - raise NotImplementedError("Specify an event") + else: + raise NotImplementedError("Specify an event") defer.returnValue((200, { "pdus": [pdu.get_pdu_json() for pdu in pdus], @@ -283,14 +288,16 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_event_auth(self, origin, room_id, event_id): - time_now = self._clock.time_msec() - auth_pdus = yield self.handler.on_event_auth(event_id) - defer.returnValue((200, { - "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], - })) + with (yield self._server_linearizer.queue((origin, room_id))): + time_now = self._clock.time_msec() + auth_pdus = yield self.handler.on_event_auth(event_id) + res = { + "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], + } + defer.returnValue((200, res)) @defer.inlineCallbacks - def on_query_auth_request(self, origin, content, event_id): + def on_query_auth_request(self, origin, content, room_id, event_id): """ Content is a dict with keys:: auth_chain (list): A list of events that give the auth chain. @@ -309,32 +316,33 @@ class FederationServer(FederationBase): Returns: Deferred: Results in `dict` with the same format as `content` """ - auth_chain = [ - self.event_from_pdu_json(e) - for e in content["auth_chain"] - ] - - signed_auth = yield self._check_sigs_and_hash_and_fetch( - origin, auth_chain, outlier=True - ) + with (yield self._server_linearizer.queue((origin, room_id))): + auth_chain = [ + self.event_from_pdu_json(e) + for e in content["auth_chain"] + ] + + signed_auth = yield self._check_sigs_and_hash_and_fetch( + origin, auth_chain, outlier=True + ) - ret = yield self.handler.on_query_auth( - origin, - event_id, - signed_auth, - content.get("rejects", []), - content.get("missing", []), - ) + ret = yield self.handler.on_query_auth( + origin, + event_id, + signed_auth, + content.get("rejects", []), + content.get("missing", []), + ) - time_now = self._clock.time_msec() - send_content = { - "auth_chain": [ - e.get_pdu_json(time_now) - for e in ret["auth_chain"] - ], - "rejects": ret.get("rejects", []), - "missing": ret.get("missing", []), - } + time_now = self._clock.time_msec() + send_content = { + "auth_chain": [ + e.get_pdu_json(time_now) + for e in ret["auth_chain"] + ], + "rejects": ret.get("rejects", []), + "missing": ret.get("missing", []), + } defer.returnValue( (200, send_content) @@ -386,21 +394,24 @@ class FederationServer(FederationBase): @log_function def on_get_missing_events(self, origin, room_id, earliest_events, latest_events, limit, min_depth): - logger.info( - "on_get_missing_events: earliest_events: %r, latest_events: %r," - " limit: %d, min_depth: %d", - earliest_events, latest_events, limit, min_depth - ) - missing_events = yield self.handler.on_get_missing_events( - origin, room_id, earliest_events, latest_events, limit, min_depth - ) + with (yield self._server_linearizer.queue((origin, room_id))): + logger.info( + "on_get_missing_events: earliest_events: %r, latest_events: %r," + " limit: %d, min_depth: %d", + earliest_events, latest_events, limit, min_depth + ) + missing_events = yield self.handler.on_get_missing_events( + origin, room_id, earliest_events, latest_events, limit, min_depth + ) - if len(missing_events) < 5: - logger.info("Returning %d events: %r", len(missing_events), missing_events) - else: - logger.info("Returning %d events", len(missing_events)) + if len(missing_events) < 5: + logger.info( + "Returning %d events: %r", len(missing_events), missing_events + ) + else: + logger.info("Returning %d events", len(missing_events)) - time_now = self._clock.time_msec() + time_now = self._clock.time_msec() defer.returnValue({ "events": [ev.get_pdu_json(time_now) for ev in missing_events], diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 8a1965f45a..26fa88ae84 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -388,7 +388,7 @@ class FederationQueryAuthServlet(BaseFederationServlet): @defer.inlineCallbacks def on_POST(self, origin, content, query, context, event_id): new_content = yield self.handler.on_query_auth_request( - origin, content, event_id + origin, content, context, event_id ) defer.returnValue((200, new_content)) -- cgit 1.4.1