diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_client.py | 89 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 26 | ||||
-rw-r--r-- | synapse/federation/persistence.py | 4 | ||||
-rw-r--r-- | synapse/federation/sender/__init__.py | 2 | ||||
-rw-r--r-- | synapse/federation/sender/transaction_manager.py | 4 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 24 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 6 |
7 files changed, 42 insertions, 113 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 27f6aff004..709449c9e3 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -324,87 +324,32 @@ class FederationClient(FederationBase): A list of events in the state, and a list of events in the auth chain for the given event. """ - try: - # First we try and ask for just the IDs, as thats far quicker if - # we have most of the state and auth_chain already. - # However, this may 404 if the other side has an old synapse. - result = yield self.transport_layer.get_room_state_ids( - destination, room_id, event_id=event_id - ) - - state_event_ids = result["pdu_ids"] - auth_event_ids = result.get("auth_chain_ids", []) - - fetched_events, failed_to_fetch = yield self.get_events_from_store_or_dest( - destination, room_id, set(state_event_ids + auth_event_ids) - ) - - if failed_to_fetch: - logger.warning( - "Failed to fetch missing state/auth events for %s: %s", - room_id, - failed_to_fetch, - ) - - event_map = {ev.event_id: ev for ev in fetched_events} - - pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] - auth_chain = [ - event_map[e_id] for e_id in auth_event_ids if e_id in event_map - ] - - auth_chain.sort(key=lambda e: e.depth) - - return pdus, auth_chain - except HttpResponseException as e: - if e.code == 400 or e.code == 404: - logger.info("Failed to use get_room_state_ids API, falling back") - else: - raise e - - result = yield self.transport_layer.get_room_state( + result = yield self.transport_layer.get_room_state_ids( destination, room_id, event_id=event_id ) - room_version = yield self.store.get_room_version(room_id) - format_ver = room_version_to_event_format(room_version) - - pdus = [ - event_from_pdu_json(p, format_ver, outlier=True) for p in result["pdus"] - ] + state_event_ids = result["pdu_ids"] + auth_event_ids = result.get("auth_chain_ids", []) - auth_chain = [ - event_from_pdu_json(p, format_ver, outlier=True) - for p in result.get("auth_chain", []) - ] - - seen_events = yield self.store.get_events( - [ev.event_id for ev in itertools.chain(pdus, auth_chain)] + fetched_events, failed_to_fetch = yield self.get_events_from_store_or_dest( + destination, room_id, set(state_event_ids + auth_event_ids) ) - signed_pdus = yield self._check_sigs_and_hash_and_fetch( - destination, - [p for p in pdus if p.event_id not in seen_events], - outlier=True, - room_version=room_version, - ) - signed_pdus.extend( - seen_events[p.event_id] for p in pdus if p.event_id in seen_events - ) + if failed_to_fetch: + logger.warning( + "Failed to fetch missing state/auth events for %s: %s", + room_id, + failed_to_fetch, + ) - signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, - [p for p in auth_chain if p.event_id not in seen_events], - outlier=True, - room_version=room_version, - ) - signed_auth.extend( - seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events - ) + event_map = {ev.event_id: ev for ev in fetched_events} - signed_auth.sort(key=lambda e: e.depth) + pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] + auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] + + auth_chain.sort(key=lambda e: e.depth) - return signed_pdus, signed_auth + return pdus, auth_chain @defer.inlineCallbacks def get_events_from_store_or_dest(self, destination, room_id, event_ids): diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d942d77a72..84d4eca041 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd +# Copyright 2019 Matrix.org Federation C.I.C # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -73,6 +74,7 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() self.handler = hs.get_handlers().federation_handler + self.state = hs.get_state_handler() self._server_linearizer = Linearizer("fed_server") self._transaction_linearizer = Linearizer("fed_txn_handler") @@ -264,9 +266,6 @@ class FederationServer(FederationBase): await self.registry.on_edu(edu_type, origin, content) async def on_context_state_request(self, origin, room_id, event_id): - if not event_id: - raise NotImplementedError("Specify an event") - origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) @@ -280,13 +279,18 @@ class FederationServer(FederationBase): # - but that's non-trivial to get right, and anyway somewhat defeats # the point of the linearizer. with (await self._server_linearizer.queue((origin, room_id))): - resp = await self._state_resp_cache.wrap( - (room_id, event_id), - self._on_context_state_request_compute, - room_id, - event_id, + resp = dict( + await self._state_resp_cache.wrap( + (room_id, event_id), + self._on_context_state_request_compute, + room_id, + event_id, + ) ) + room_version = await self.store.get_room_version(room_id) + resp["room_version"] = room_version + return 200, resp async def on_state_ids_request(self, origin, room_id, event_id): @@ -306,7 +310,11 @@ class FederationServer(FederationBase): return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids} async def _on_context_state_request_compute(self, room_id, event_id): - pdus = await self.handler.get_state_for_pdu(room_id, event_id) + if event_id: + pdus = await self.handler.get_state_for_pdu(room_id, event_id) + else: + pdus = (await self.state.get_current_state(room_id)).values() + auth_chain = await self.store.get_auth_chain([pdu.event_id for pdu in pdus]) return { diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 44edcabed4..d68b4bd670 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -44,7 +44,7 @@ class TransactionActions(object): response code and response body. """ if not transaction.transaction_id: - raise RuntimeError("Cannot persist a transaction with no " "transaction_id") + raise RuntimeError("Cannot persist a transaction with no transaction_id") return self.store.get_received_txn_response(transaction.transaction_id, origin) @@ -56,7 +56,7 @@ class TransactionActions(object): Deferred """ if not transaction.transaction_id: - raise RuntimeError("Cannot persist a transaction with no " "transaction_id") + raise RuntimeError("Cannot persist a transaction with no transaction_id") return self.store.set_received_txn_response( transaction.transaction_id, origin, code, response diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 2b2ee8612a..4ebb0e8bc0 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -49,7 +49,7 @@ sent_pdus_destination_dist_count = Counter( sent_pdus_destination_dist_total = Counter( "synapse_federation_client_sent_pdu_destinations:total", - "" "Total number of PDUs queued for sending across all destinations", + "Total number of PDUs queued for sending across all destinations", ) diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 67b3e1ab6e..5fed626d5b 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -84,7 +84,7 @@ class TransactionManager(object): txn_id = str(self._next_txn_id) logger.debug( - "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)", + "TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)", destination, txn_id, len(pdus), @@ -103,7 +103,7 @@ class TransactionManager(object): self._next_txn_id += 1 logger.info( - "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)", + "TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)", destination, txn_id, transaction.transaction_id, diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index dc95ab2113..46dba84cac 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -39,30 +39,6 @@ class TransportLayerClient(object): self.client = hs.get_http_client() @log_function - def get_room_state(self, destination, room_id, event_id): - """ Requests all state for a given room from the given server at the - given event. - - Args: - destination (str): The host name of the remote homeserver we want - to get the state from. - context (str): The name of the context we want the state of - event_id (str): The event we want the context at. - - Returns: - Deferred: Results in a dict received from the remote homeserver. - """ - logger.debug("get_room_state dest=%s, room=%s", destination, room_id) - - path = _create_v1_path("/state/%s", room_id) - return self.client.get_json( - destination, - path=path, - args={"event_id": event_id}, - try_trailing_slash_on_400=True, - ) - - @log_function def get_room_state_ids(self, destination, room_id, event_id): """ Requests all state for a given room from the given server at the given event. Returns the state's event_id's diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 09baa9c57d..fefc789c85 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -421,7 +421,7 @@ class FederationEventServlet(BaseFederationServlet): return await self.handler.on_pdu_request(origin, event_id) -class FederationStateServlet(BaseFederationServlet): +class FederationStateV1Servlet(BaseFederationServlet): PATH = "/state/(?P<context>[^/]*)/?" # This is when someone asks for all data for a given context. @@ -429,7 +429,7 @@ class FederationStateServlet(BaseFederationServlet): return await self.handler.on_context_state_request( origin, context, - parse_string_from_args(query, "event_id", None, required=True), + parse_string_from_args(query, "event_id", None, required=False), ) @@ -1360,7 +1360,7 @@ class RoomComplexityServlet(BaseFederationServlet): FEDERATION_SERVLET_CLASSES = ( FederationSendServlet, FederationEventServlet, - FederationStateServlet, + FederationStateV1Servlet, FederationStateIdsServlet, FederationBackfillServlet, FederationQueryServlet, |