diff options
author | Erik Johnston <erik@matrix.org> | 2016-06-20 14:18:04 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-06-20 14:18:04 +0100 |
commit | bc72d381b2d5322982204bb214453e2af56f70d5 (patch) | |
tree | 4f6932f9a32e5e25b80e8202be75d04e50a9686e /synapse/federation/federation_server.py | |
parent | point to the CAPTCHA docs (diff) | |
parent | Bump version and changelog (diff) | |
download | synapse-bc72d381b2d5322982204bb214453e2af56f70d5.tar.xz |
Merge branch 'release-v0.16.1' of github.com:matrix-org/synapse v0.16.1
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r-- | synapse/federation/federation_server.py | 112 |
1 files changed, 74 insertions, 38 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index f1d231b9d8..2a589524a4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Transaction, Edu +from synapse.util.async import Linearizer from synapse.util.logutils import log_function from synapse.events import FrozenEvent import synapse.metrics @@ -44,6 +45,11 @@ received_queries_counter = metrics.register_counter("received_queries", labels=[ class FederationServer(FederationBase): + def __init__(self, hs): + super(FederationServer, self).__init__(hs) + + self._room_pdu_linearizer = Linearizer() + def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -187,13 +193,16 @@ class FederationServer(FederationBase): ) for event in auth_chain: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] + # We sign these again because there was a bug where we + # incorrectly signed things the first time round + if self.hs.is_mine_id(event.event_id): + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) ) - ) else: raise NotImplementedError("Specify an event") @@ -377,10 +386,20 @@ class FederationServer(FederationBase): @log_function def on_get_missing_events(self, origin, room_id, earliest_events, latest_events, limit, min_depth): + logger.info( + "on_get_missing_events: earliest_events: %r, latest_events: %r," + " limit: %d, min_depth: %d", + earliest_events, latest_events, limit, min_depth + ) missing_events = yield self.handler.on_get_missing_events( origin, room_id, earliest_events, latest_events, limit, min_depth ) + if len(missing_events) < 5: + logger.info("Returning %d events: %r", len(missing_events), missing_events) + else: + logger.info("Returning %d events", len(missing_events)) + time_now = self._clock.time_msec() defer.returnValue({ @@ -481,42 +500,59 @@ class FederationServer(FederationBase): pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: if get_missing and prevs - seen: - latest = yield self.store.get_latest_event_ids_in_room( - pdu.room_id - ) - - # We add the prev events that we have seen to the latest - # list to ensure the remote server doesn't give them to us - latest = set(latest) - latest |= seen - - missing_events = yield self.get_missing_events( - origin, - pdu.room_id, - earliest_events_ids=list(latest), - latest_events=[pdu], - limit=10, - min_depth=min_depth, - ) - - # We want to sort these by depth so we process them and - # tell clients about them in order. - missing_events.sort(key=lambda x: x.depth) - - for e in missing_events: - yield self._handle_new_pdu( - origin, - e, - get_missing=False - ) - - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) + # If we're missing stuff, ensure we only fetch stuff one + # at a time. + with (yield self._room_pdu_linearizer.queue(pdu.room_id)): + # We recalculate seen, since it may have changed. + have_seen = yield self.store.have_events(prevs) + seen = set(have_seen.keys()) + + if prevs - seen: + latest = yield self.store.get_latest_event_ids_in_room( + pdu.room_id + ) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(latest) + latest |= seen + + logger.info( + "Missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) + + missing_events = yield self.get_missing_events( + origin, + pdu.room_id, + earliest_events_ids=list(latest), + latest_events=[pdu], + limit=10, + min_depth=min_depth, + ) + + # We want to sort these by depth so we process them and + # tell clients about them in order. + missing_events.sort(key=lambda x: x.depth) + + for e in missing_events: + yield self._handle_new_pdu( + origin, + e, + get_missing=False + ) + + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) prevs = {e_id for e_id, _ in pdu.prev_events} seen = set(have_seen.keys()) if prevs - seen: + logger.info( + "Still missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) fetch_state = True if fetch_state: |