diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_base.py | 3 | ||||
-rw-r--r-- | synapse/federation/federation_client.py | 2 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 112 | ||||
-rw-r--r-- | synapse/federation/replication.py | 2 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 9 |
5 files changed, 83 insertions, 45 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index a0b7cb7963..da2f5e8cfd 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -31,6 +31,9 @@ logger = logging.getLogger(__name__) class FederationBase(object): + def __init__(self, hs): + pass + @defer.inlineCallbacks def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, include_none=False): diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d835c1b038..b06387051c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -52,6 +52,8 @@ sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) class FederationClient(FederationBase): + def __init__(self, hs): + super(FederationClient, self).__init__(hs) def start_get_pdu_cache(self): self._get_pdu_cache = ExpiringCache( diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index f1d231b9d8..2a589524a4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Transaction, Edu +from synapse.util.async import Linearizer from synapse.util.logutils import log_function from synapse.events import FrozenEvent import synapse.metrics @@ -44,6 +45,11 @@ received_queries_counter = metrics.register_counter("received_queries", labels=[ class FederationServer(FederationBase): + def __init__(self, hs): + super(FederationServer, self).__init__(hs) + + self._room_pdu_linearizer = Linearizer() + def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -187,13 +193,16 @@ class FederationServer(FederationBase): ) for event in auth_chain: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] + # We sign these again because there was a bug where we + # incorrectly signed things the first time round + if self.hs.is_mine_id(event.event_id): + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) ) - ) else: raise NotImplementedError("Specify an event") @@ -377,10 +386,20 @@ class FederationServer(FederationBase): @log_function def on_get_missing_events(self, origin, room_id, earliest_events, latest_events, limit, min_depth): + logger.info( + "on_get_missing_events: earliest_events: %r, latest_events: %r," + " limit: %d, min_depth: %d", + earliest_events, latest_events, limit, min_depth + ) missing_events = yield self.handler.on_get_missing_events( origin, room_id, earliest_events, latest_events, limit, min_depth ) + if len(missing_events) < 5: + logger.info("Returning %d events: %r", len(missing_events), missing_events) + else: + logger.info("Returning %d events", len(missing_events)) + time_now = self._clock.time_msec() defer.returnValue({ @@ -481,42 +500,59 @@ class FederationServer(FederationBase): pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: if get_missing and prevs - seen: - latest = yield self.store.get_latest_event_ids_in_room( - pdu.room_id - ) - - # We add the prev events that we have seen to the latest - # list to ensure the remote server doesn't give them to us - latest = set(latest) - latest |= seen - - missing_events = yield self.get_missing_events( - origin, - pdu.room_id, - earliest_events_ids=list(latest), - latest_events=[pdu], - limit=10, - min_depth=min_depth, - ) - - # We want to sort these by depth so we process them and - # tell clients about them in order. - missing_events.sort(key=lambda x: x.depth) - - for e in missing_events: - yield self._handle_new_pdu( - origin, - e, - get_missing=False - ) - - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) + # If we're missing stuff, ensure we only fetch stuff one + # at a time. + with (yield self._room_pdu_linearizer.queue(pdu.room_id)): + # We recalculate seen, since it may have changed. + have_seen = yield self.store.have_events(prevs) + seen = set(have_seen.keys()) + + if prevs - seen: + latest = yield self.store.get_latest_event_ids_in_room( + pdu.room_id + ) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(latest) + latest |= seen + + logger.info( + "Missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) + + missing_events = yield self.get_missing_events( + origin, + pdu.room_id, + earliest_events_ids=list(latest), + latest_events=[pdu], + limit=10, + min_depth=min_depth, + ) + + # We want to sort these by depth so we process them and + # tell clients about them in order. + missing_events.sort(key=lambda x: x.depth) + + for e in missing_events: + yield self._handle_new_pdu( + origin, + e, + get_missing=False + ) + + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) prevs = {e_id for e_id, _ in pdu.prev_events} seen = set(have_seen.keys()) if prevs - seen: + logger.info( + "Still missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) fetch_state = True if fetch_state: diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 3e062a5eab..ea66a5dcbc 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer): self.hs = hs + super(ReplicationLayer, self).__init__(hs) + def __str__(self): return "<ReplicationLayer(%s)>" % self.server_name diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a1a334955f..8a1965f45a 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -37,7 +37,7 @@ class TransportLayerServer(JsonResource): self.hs = hs self.clock = hs.get_clock() - super(TransportLayerServer, self).__init__(hs) + super(TransportLayerServer, self).__init__(hs, canonical_json=False) self.authenticator = Authenticator(hs) self.ratelimiter = FederationRateLimiter( @@ -528,15 +528,10 @@ class PublicRoomList(BaseFederationServlet): PATH = "/publicRooms" @defer.inlineCallbacks - def on_GET(self, request): + def on_GET(self, origin, content, query): data = yield self.room_list_handler.get_local_public_room_list() defer.returnValue((200, data)) - # Avoid doing remote HS authorization checks which are done by default by - # BaseFederationServlet. - def _wrap(self, code): - return code - SERVLET_CLASSES = ( FederationSendServlet, |