diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 2a589524a4..85f5e752fe 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -49,6 +49,7 @@ class FederationServer(FederationBase):
super(FederationServer, self).__init__(hs)
self._room_pdu_linearizer = Linearizer()
+ self._server_linearizer = Linearizer()
def set_handler(self, handler):
"""Sets the handler that the replication layer will use to communicate
@@ -89,11 +90,14 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_backfill_request(self, origin, room_id, versions, limit):
- pdus = yield self.handler.on_backfill_request(
- origin, room_id, versions, limit
- )
+ with (yield self._server_linearizer.queue((origin, room_id))):
+ pdus = yield self.handler.on_backfill_request(
+ origin, room_id, versions, limit
+ )
+
+ res = self._transaction_from_pdus(pdus).get_dict()
- defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
+ defer.returnValue((200, res))
@defer.inlineCallbacks
@log_function
@@ -184,27 +188,28 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_context_state_request(self, origin, room_id, event_id):
- if event_id:
- pdus = yield self.handler.get_state_for_pdu(
- origin, room_id, event_id,
- )
- auth_chain = yield self.store.get_auth_chain(
- [pdu.event_id for pdu in pdus]
- )
+ with (yield self._server_linearizer.queue((origin, room_id))):
+ if event_id:
+ pdus = yield self.handler.get_state_for_pdu(
+ origin, room_id, event_id,
+ )
+ auth_chain = yield self.store.get_auth_chain(
+ [pdu.event_id for pdu in pdus]
+ )
- for event in auth_chain:
- # We sign these again because there was a bug where we
- # incorrectly signed things the first time round
- if self.hs.is_mine_id(event.event_id):
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
+ for event in auth_chain:
+ # We sign these again because there was a bug where we
+ # incorrectly signed things the first time round
+ if self.hs.is_mine_id(event.event_id):
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
)
- )
- else:
- raise NotImplementedError("Specify an event")
+ else:
+ raise NotImplementedError("Specify an event")
defer.returnValue((200, {
"pdus": [pdu.get_pdu_json() for pdu in pdus],
@@ -283,14 +288,16 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_event_auth(self, origin, room_id, event_id):
- time_now = self._clock.time_msec()
- auth_pdus = yield self.handler.on_event_auth(event_id)
- defer.returnValue((200, {
- "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
- }))
+ with (yield self._server_linearizer.queue((origin, room_id))):
+ time_now = self._clock.time_msec()
+ auth_pdus = yield self.handler.on_event_auth(event_id)
+ res = {
+ "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
+ }
+ defer.returnValue((200, res))
@defer.inlineCallbacks
- def on_query_auth_request(self, origin, content, event_id):
+ def on_query_auth_request(self, origin, content, room_id, event_id):
"""
Content is a dict with keys::
auth_chain (list): A list of events that give the auth chain.
@@ -309,32 +316,33 @@ class FederationServer(FederationBase):
Returns:
Deferred: Results in `dict` with the same format as `content`
"""
- auth_chain = [
- self.event_from_pdu_json(e)
- for e in content["auth_chain"]
- ]
-
- signed_auth = yield self._check_sigs_and_hash_and_fetch(
- origin, auth_chain, outlier=True
- )
+ with (yield self._server_linearizer.queue((origin, room_id))):
+ auth_chain = [
+ self.event_from_pdu_json(e)
+ for e in content["auth_chain"]
+ ]
+
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ origin, auth_chain, outlier=True
+ )
- ret = yield self.handler.on_query_auth(
- origin,
- event_id,
- signed_auth,
- content.get("rejects", []),
- content.get("missing", []),
- )
+ ret = yield self.handler.on_query_auth(
+ origin,
+ event_id,
+ signed_auth,
+ content.get("rejects", []),
+ content.get("missing", []),
+ )
- time_now = self._clock.time_msec()
- send_content = {
- "auth_chain": [
- e.get_pdu_json(time_now)
- for e in ret["auth_chain"]
- ],
- "rejects": ret.get("rejects", []),
- "missing": ret.get("missing", []),
- }
+ time_now = self._clock.time_msec()
+ send_content = {
+ "auth_chain": [
+ e.get_pdu_json(time_now)
+ for e in ret["auth_chain"]
+ ],
+ "rejects": ret.get("rejects", []),
+ "missing": ret.get("missing", []),
+ }
defer.returnValue(
(200, send_content)
@@ -386,21 +394,24 @@ class FederationServer(FederationBase):
@log_function
def on_get_missing_events(self, origin, room_id, earliest_events,
latest_events, limit, min_depth):
- logger.info(
- "on_get_missing_events: earliest_events: %r, latest_events: %r,"
- " limit: %d, min_depth: %d",
- earliest_events, latest_events, limit, min_depth
- )
- missing_events = yield self.handler.on_get_missing_events(
- origin, room_id, earliest_events, latest_events, limit, min_depth
- )
+ with (yield self._server_linearizer.queue((origin, room_id))):
+ logger.info(
+ "on_get_missing_events: earliest_events: %r, latest_events: %r,"
+ " limit: %d, min_depth: %d",
+ earliest_events, latest_events, limit, min_depth
+ )
+ missing_events = yield self.handler.on_get_missing_events(
+ origin, room_id, earliest_events, latest_events, limit, min_depth
+ )
- if len(missing_events) < 5:
- logger.info("Returning %d events: %r", len(missing_events), missing_events)
- else:
- logger.info("Returning %d events", len(missing_events))
+ if len(missing_events) < 5:
+ logger.info(
+ "Returning %d events: %r", len(missing_events), missing_events
+ )
+ else:
+ logger.info("Returning %d events", len(missing_events))
- time_now = self._clock.time_msec()
+ time_now = self._clock.time_msec()
defer.returnValue({
"events": [ev.get_pdu_json(time_now) for ev in missing_events],
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 8a1965f45a..26fa88ae84 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -388,7 +388,7 @@ class FederationQueryAuthServlet(BaseFederationServlet):
@defer.inlineCallbacks
def on_POST(self, origin, content, query, context, event_id):
new_content = yield self.handler.on_query_auth_request(
- origin, content, event_id
+ origin, content, context, event_id
)
defer.returnValue((200, new_content))
|