diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/federation/federation_client.py | 95 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 23 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 22 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 12 |
4 files changed, 151 insertions, 1 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b06387051c..c6ed720166 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -314,6 +314,40 @@ class FederationClient(FederationBase): Deferred: Results in a list of PDUs. """ + try: + # First we try and ask for just the IDs, as thats far quicker if + # we have most of the state and auth_chain already. + # However, this may 404 if the other side has an old synapse. + result = yield self.transport_layer.get_room_state_ids( + destination, room_id, event_id=event_id, + ) + + state_event_ids = result["pdu_ids"] + auth_event_ids = result.get("auth_chain_ids", []) + + fetched_events, failed_to_fetch = yield self.get_events( + [destination], room_id, set(state_event_ids + auth_event_ids) + ) + + if failed_to_fetch: + logger.warn("Failed to get %r", failed_to_fetch) + + event_map = { + ev.event_id: ev for ev in fetched_events + } + + pdus = [event_map[e_id] for e_id in state_event_ids] + auth_chain = [event_map[e_id] for e_id in auth_event_ids] + + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue((pdus, auth_chain)) + except HttpResponseException as e: + if e.code == 404: + logger.info("Failed to use get_room_state_ids API, falling back") + else: + raise e + result = yield self.transport_layer.get_room_state( destination, room_id, event_id=event_id, ) @@ -340,6 +374,67 @@ class FederationClient(FederationBase): defer.returnValue((signed_pdus, signed_auth)) @defer.inlineCallbacks + def get_events(self, destinations, room_id, event_ids, return_local=True): + """Fetch events from some remote destinations, checking if we already + have them. + + Args: + destinations (list) + room_id (str) + event_ids (list) + return_local (bool): Whether to include events we already have in + the DB in the returned list of events + + Returns: + Deferred: A deferred resolving to a 2-tuple where the first is a list of + events and the second is a list of event ids that we failed to fetch. + """ + if return_local: + seen_events = yield self.store.get_events(event_ids) + signed_events = seen_events.values() + else: + seen_events = yield self.store.have_events(event_ids) + signed_events = [] + + failed_to_fetch = set() + + missing_events = set(event_ids) + for k in seen_events: + missing_events.discard(k) + + if not missing_events: + defer.returnValue((signed_events, failed_to_fetch)) + + def random_server_list(): + srvs = list(destinations) + random.shuffle(srvs) + return srvs + + batch_size = 20 + missing_events = list(missing_events) + for i in xrange(0, len(missing_events), batch_size): + batch = set(missing_events[i:i + batch_size]) + + deferreds = [ + self.get_pdu( + destinations=random_server_list(), + event_id=e_id, + ) + for e_id in batch + ] + + res = yield defer.DeferredList(deferreds, consumeErrors=True) + for success, result in res: + if success: + signed_events.append(result) + batch.discard(result.event_id) + + # We removed all events we successfully fetched from `batch` + failed_to_fetch.update(batch) + + defer.returnValue((signed_events, failed_to_fetch)) + + @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): res = yield self.transport_layer.get_event_auth( diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8ec5b190c8..aba19639c7 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -215,6 +215,27 @@ class FederationServer(FederationBase): defer.returnValue((200, resp)) @defer.inlineCallbacks + def on_state_ids_request(self, origin, room_id, event_id): + if not event_id: + raise NotImplementedError("Specify an event") + + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + + pdus = yield self.handler.get_state_for_pdu( + room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + + defer.returnValue((200, { + "pdu_ids": [pdu.event_id for pdu in pdus], + "auth_chain_ids": [pdu.event_id for pdu in auth_chain], + })) + + @defer.inlineCallbacks def _on_context_state_request_compute(self, room_id, event_id): pdus = yield self.handler.get_state_for_pdu( room_id, event_id, @@ -584,7 +605,7 @@ class FederationServer(FederationBase): origin, pdu.room_id, pdu.event_id, ) except: - logger.warn("Failed to get state for event: %s", pdu.event_id) + logger.exception("Failed to get state for event: %s", pdu.event_id) yield self.handler.on_receive_pdu( origin, diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index ebb698e278..3d088e43cb 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -55,6 +55,28 @@ class TransportLayerClient(object): ) @log_function + def get_room_state_ids(self, destination, room_id, event_id): + """ Requests all state for a given room from the given server at the + given event. Returns the state's event_id's + + Args: + destination (str): The host name of the remote home server we want + to get the state from. + context (str): The name of the context we want the state of + event_id (str): The event we want the context at. + + Returns: + Deferred: Results in a dict received from the remote homeserver. + """ + logger.debug("get_room_state_ids dest=%s, room=%s", + destination, room_id) + + path = PREFIX + "/state_ids/%s/" % room_id + return self.client.get_json( + destination, path=path, args={"event_id": event_id}, + ) + + @log_function def get_event(self, destination, event_id, timeout=None): """ Requests the pdu with give id and origin from the given server. diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 1a88413d18..0bc6e0801d 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -271,6 +271,17 @@ class FederationStateServlet(BaseFederationServlet): ) +class FederationStateIdsServlet(BaseFederationServlet): + PATH = "/state_ids/(?P<room_id>[^/]*)/" + + def on_GET(self, origin, content, query, room_id): + return self.handler.on_state_ids_request( + origin, + room_id, + query.get("event_id", [None])[0], + ) + + class FederationBackfillServlet(BaseFederationServlet): PATH = "/backfill/(?P<context>[^/]*)/" @@ -536,6 +547,7 @@ SERVLET_CLASSES = ( FederationPullServlet, FederationEventServlet, FederationStateServlet, + FederationStateIdsServlet, FederationBackfillServlet, FederationQueryServlet, FederationMakeJoinServlet, |