From 4c131b2c78bae793509bea776107a8183274a709 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 11 Nov 2019 15:47:47 +0000 Subject: Implement v2 API for send_join --- synapse/federation/federation_client.py | 46 ++++++++++++++++++++++++++++----- 1 file changed, 39 insertions(+), 7 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 545d719652..50ae40504d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -664,13 +664,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def send_request(destination): - time_now = self._clock.time_msec() - _, content = yield self.transport_layer.send_join( - destination=destination, - room_id=pdu.room_id, - event_id=pdu.event_id, - content=pdu.get_pdu_json(time_now), - ) + content = self._do_send_join(destination, pdu) logger.debug("Got content: %s", content) @@ -737,6 +731,44 @@ class FederationClient(FederationBase): return self._try_destination_list("send_join", destinations, send_request) + @defer.inlineCallbacks + def _do_send_join(self, destination, pdu): + time_now = self._clock.time_msec() + + try: + content = yield self.transport_layer.send_join_v2( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) + + return content + except HttpResponseException as e: + if e.code in [400, 404]: + err = e.to_synapse_error() + + # If we receive an error response that isn't a generic error, or an + # unrecognised endpoint error, we assume that the remote understands + # the v2 invite API and this is a legitimate error. + if not err.errcode in [Codes.UNKNOWN, Codes.UNRECOGNIZED]: + raise err + else: + raise e.to_synapse_error() + + logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API") + + resp = yield self.transport_layer.send_join_v1( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) + + # We expect the v1 API to respond with [200, content], so we only return the + # content. + return resp[1] + @defer.inlineCallbacks def send_invite(self, destination, room_id, event_id, pdu): room_version = yield self.store.get_room_version(room_id) -- cgit 1.5.1 From 92527d7b2186a06c204c3c4bff47207252c5dea2 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 11 Nov 2019 16:20:53 +0000 Subject: Add missing yield --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 50ae40504d..4a8e65c292 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -664,7 +664,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def send_request(destination): - content = self._do_send_join(destination, pdu) + content = yield self._do_send_join(destination, pdu) logger.debug("Got content: %s", content) -- cgit 1.5.1 From 1e202a90f15a8f518e4350075d40d0423b64318d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 11 Nov 2019 16:26:53 +0000 Subject: Implement v2 API for send_leave --- synapse/federation/federation_client.py | 41 +++++++++++++++++++++++++++++---- synapse/federation/transport/client.py | 20 +++++++++++++++- 2 files changed, 56 insertions(+), 5 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 4a8e65c292..289017b2ed 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -879,17 +879,50 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def send_request(destination): time_now = self._clock.time_msec() - _, content = yield self.transport_layer.send_leave( + content = yield self._do_send_leave(destination, pdu) + + logger.debug("Got content: %s", content) + return None + + return self._try_destination_list("send_leave", destinations, send_request) + + @defer.inlineCallbacks + def _do_send_leave(self, destination, pdu): + time_now = self._clock.time_msec() + + try: + content = yield self.transport_layer.send_leave_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, content=pdu.get_pdu_json(time_now), ) - logger.debug("Got content: %s", content) - return None + return content + except HttpResponseException as e: + if e.code in [400, 404]: + err = e.to_synapse_error() - return self._try_destination_list("send_leave", destinations, send_request) + # If we receive an error response that isn't a generic error, or an + # unrecognised endpoint error, we assume that the remote understands + # the v2 invite API and this is a legitimate error. + if not err.errcode in [Codes.UNKNOWN, Codes.UNRECOGNIZED]: + raise err + else: + raise e.to_synapse_error() + + logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API") + + resp = yield self.transport_layer.send_leave_v1( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) + + # We expect the v1 API to respond with [200, content], so we only return the + # content. + return resp[1] def get_public_rooms( self, diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index ba68f7c0b4..df2b5dc91b 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -289,7 +289,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def send_leave(self, destination, room_id, event_id, content): + def send_leave_v1(self, destination, room_id, event_id, content): path = _create_v1_path("/send_leave/%s/%s", room_id, event_id) response = yield self.client.put_json( @@ -305,6 +305,24 @@ class TransportLayerClient(object): return response + @defer.inlineCallbacks + @log_function + def send_leave_v2(self, destination, room_id, event_id, content): + path = _create_v2_path("/send_leave/%s/%s", room_id, event_id) + + response = yield self.client.put_json( + destination=destination, + path=path, + data=content, + # we want to do our best to send this through. The problem is + # that if it fails, we won't retry it later, so if the remote + # server was just having a momentary blip, the room will be out of + # sync. + ignore_backoff=True, + ) + + return response + @defer.inlineCallbacks @log_function def send_invite_v1(self, destination, room_id, event_id, content): -- cgit 1.5.1 From 94cdd6fffed90cceaf0396a66174ddd5c990c8eb Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 11 Nov 2019 16:56:55 +0000 Subject: Lint --- synapse/federation/federation_client.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 289017b2ed..23c08104b3 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -751,7 +751,7 @@ class FederationClient(FederationBase): # If we receive an error response that isn't a generic error, or an # unrecognised endpoint error, we assume that the remote understands # the v2 invite API and this is a legitimate error. - if not err.errcode in [Codes.UNKNOWN, Codes.UNRECOGNIZED]: + if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]: raise err else: raise e.to_synapse_error() @@ -878,7 +878,6 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def send_request(destination): - time_now = self._clock.time_msec() content = yield self._do_send_leave(destination, pdu) logger.debug("Got content: %s", content) @@ -906,7 +905,7 @@ class FederationClient(FederationBase): # If we receive an error response that isn't a generic error, or an # unrecognised endpoint error, we assume that the remote understands # the v2 invite API and this is a legitimate error. - if not err.errcode in [Codes.UNKNOWN, Codes.UNRECOGNIZED]: + if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]: raise err else: raise e.to_synapse_error() -- cgit 1.5.1 From c3dda2874d78790525f47e502aaed22b64961873 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 10 Dec 2019 16:22:00 +0000 Subject: Refactor get_events_from_store_or_dest to return a dict (#6501) There was a bunch of unnecessary conversion back and forth between dict and list going on here. We can simplify a bunch of the code. --- changelog.d/6501.misc | 1 + synapse/federation/federation_client.py | 44 +++++++++++---------------------- 2 files changed, 16 insertions(+), 29 deletions(-) create mode 100644 changelog.d/6501.misc (limited to 'synapse/federation/federation_client.py') diff --git a/changelog.d/6501.misc b/changelog.d/6501.misc new file mode 100644 index 0000000000..255f45a9c3 --- /dev/null +++ b/changelog.d/6501.misc @@ -0,0 +1 @@ +Refactor get_events_from_store_or_dest to return a dict. diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 709449c9e3..73e1dda6a3 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -18,8 +18,6 @@ import copy import itertools import logging -from six.moves import range - from prometheus_client import Counter from twisted.internet import defer @@ -41,7 +39,7 @@ from synapse.events import builder, room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.utils import log_function -from synapse.util import unwrapFirstError +from synapse.util import batch_iter, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination @@ -331,10 +329,12 @@ class FederationClient(FederationBase): state_event_ids = result["pdu_ids"] auth_event_ids = result.get("auth_chain_ids", []) - fetched_events, failed_to_fetch = yield self.get_events_from_store_or_dest( - destination, room_id, set(state_event_ids + auth_event_ids) + desired_events = set(state_event_ids + auth_event_ids) + event_map = yield self.get_events_from_store_or_dest( + destination, room_id, desired_events ) + failed_to_fetch = desired_events - event_map.keys() if failed_to_fetch: logger.warning( "Failed to fetch missing state/auth events for %s: %s", @@ -342,8 +342,6 @@ class FederationClient(FederationBase): failed_to_fetch, ) - event_map = {ev.event_id: ev for ev in fetched_events} - pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] @@ -358,23 +356,18 @@ class FederationClient(FederationBase): Args: destination (str) room_id (str) - event_ids (list) + event_ids (Iterable[str]) Returns: - Deferred: A deferred resolving to a 2-tuple where the first is a list of - events and the second is a list of event ids that we failed to fetch. + Deferred[dict[str, EventBase]]: A deferred resolving to a map + from event_id to event """ - seen_events = yield self.store.get_events(event_ids, allow_rejected=True) - signed_events = list(seen_events.values()) - - failed_to_fetch = set() + fetched_events = yield self.store.get_events(event_ids, allow_rejected=True) - missing_events = set(event_ids) - for k in seen_events: - missing_events.discard(k) + missing_events = set(event_ids) - fetched_events.keys() if not missing_events: - return signed_events, failed_to_fetch + return fetched_events logger.debug( "Fetching unknown state/auth events %s for room %s", @@ -384,11 +377,8 @@ class FederationClient(FederationBase): room_version = yield self.store.get_room_version(room_id) - batch_size = 20 - missing_events = list(missing_events) - for i in range(0, len(missing_events), batch_size): - batch = set(missing_events[i : i + batch_size]) - + # XXX 20 requests at once? really? + for batch in batch_iter(missing_events, 20): deferreds = [ run_in_background( self.get_pdu, @@ -404,13 +394,9 @@ class FederationClient(FederationBase): ) for success, result in res: if success and result: - signed_events.append(result) - batch.discard(result.event_id) - - # We removed all events we successfully fetched from `batch` - failed_to_fetch.update(batch) + fetched_events[result.event_id] = result - return signed_events, failed_to_fetch + return fetched_events @defer.inlineCallbacks @log_function -- cgit 1.5.1 From f8bc2ae8830615698ae683cafe4fdddb9a05a1f9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 10 Dec 2019 17:42:46 +0000 Subject: Move get_state methods into FederationHandler (#6503) This is a non-functional refactor as a precursor to some other work. --- changelog.d/6503.misc | 1 + synapse/federation/federation_client.py | 91 ++++------------------------ synapse/handlers/federation.py | 101 ++++++++++++++++++++++++++++++-- 3 files changed, 107 insertions(+), 86 deletions(-) create mode 100644 changelog.d/6503.misc (limited to 'synapse/federation/federation_client.py') diff --git a/changelog.d/6503.misc b/changelog.d/6503.misc new file mode 100644 index 0000000000..e4e9a5a3d4 --- /dev/null +++ b/changelog.d/6503.misc @@ -0,0 +1 @@ +Move get_state methods into FederationHandler. diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 73e1dda6a3..d396e6564f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -37,9 +37,9 @@ from synapse.api.room_versions import ( ) from synapse.events import builder, room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json -from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.context import make_deferred_yieldable from synapse.logging.utils import log_function -from synapse.util import batch_iter, unwrapFirstError +from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination @@ -308,19 +308,12 @@ class FederationClient(FederationBase): return signed_pdu @defer.inlineCallbacks - @log_function - def get_state_for_room(self, destination, room_id, event_id): - """Requests all of the room state at a given event from a remote homeserver. - - Args: - destination (str): The remote homeserver to query for the state. - room_id (str): The id of the room we're interested in. - event_id (str): The id of the event we want the state at. + def get_room_state_ids(self, destination: str, room_id: str, event_id: str): + """Calls the /state_ids endpoint to fetch the state at a particular point + in the room, and the auth events for the given event Returns: - Deferred[Tuple[List[EventBase], List[EventBase]]]: - A list of events in the state, and a list of events in the auth chain - for the given event. + Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids) """ result = yield self.transport_layer.get_room_state_ids( destination, room_id, event_id=event_id @@ -329,74 +322,12 @@ class FederationClient(FederationBase): state_event_ids = result["pdu_ids"] auth_event_ids = result.get("auth_chain_ids", []) - desired_events = set(state_event_ids + auth_event_ids) - event_map = yield self.get_events_from_store_or_dest( - destination, room_id, desired_events - ) - - failed_to_fetch = desired_events - event_map.keys() - if failed_to_fetch: - logger.warning( - "Failed to fetch missing state/auth events for %s: %s", - room_id, - failed_to_fetch, - ) - - pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] - auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] - - auth_chain.sort(key=lambda e: e.depth) - - return pdus, auth_chain - - @defer.inlineCallbacks - def get_events_from_store_or_dest(self, destination, room_id, event_ids): - """Fetch events from a remote destination, checking if we already have them. - - Args: - destination (str) - room_id (str) - event_ids (Iterable[str]) - - Returns: - Deferred[dict[str, EventBase]]: A deferred resolving to a map - from event_id to event - """ - fetched_events = yield self.store.get_events(event_ids, allow_rejected=True) - - missing_events = set(event_ids) - fetched_events.keys() - - if not missing_events: - return fetched_events - - logger.debug( - "Fetching unknown state/auth events %s for room %s", - missing_events, - event_ids, - ) - - room_version = yield self.store.get_room_version(room_id) - - # XXX 20 requests at once? really? - for batch in batch_iter(missing_events, 20): - deferreds = [ - run_in_background( - self.get_pdu, - destinations=[destination], - event_id=e_id, - room_version=room_version, - ) - for e_id in batch - ] - - res = yield make_deferred_yieldable( - defer.DeferredList(deferreds, consumeErrors=True) - ) - for success, result in res: - if success and result: - fetched_events[result.event_id] = result + if not isinstance(state_event_ids, list) or not isinstance( + auth_event_ids, list + ): + raise Exception("invalid response from /state_ids") - return fetched_events + return state_event_ids, auth_event_ids @defer.inlineCallbacks @log_function diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bc26921768..c0dcf9abf8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -64,7 +64,7 @@ from synapse.replication.http.federation import ( from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.types import UserID, get_domain_from_id -from synapse.util import unwrapFirstError +from synapse.util import batch_iter, unwrapFirstError from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room from synapse.util.retryutils import NotRetryingDestination @@ -379,11 +379,9 @@ class FederationHandler(BaseHandler): ( remote_state, got_auth_chain, - ) = yield self.federation_client.get_state_for_room( - origin, room_id, p - ) + ) = yield self._get_state_for_room(origin, room_id, p) - # we want the state *after* p; get_state_for_room returns the + # we want the state *after* p; _get_state_for_room returns the # state *before* p. remote_event = yield self.federation_client.get_pdu( [origin], p, room_version, outlier=True @@ -583,6 +581,97 @@ class FederationHandler(BaseHandler): else: raise + @defer.inlineCallbacks + @log_function + def _get_state_for_room(self, destination, room_id, event_id): + """Requests all of the room state at a given event from a remote homeserver. + + Args: + destination (str): The remote homeserver to query for the state. + room_id (str): The id of the room we're interested in. + event_id (str): The id of the event we want the state at. + + Returns: + Deferred[Tuple[List[EventBase], List[EventBase]]]: + A list of events in the state, and a list of events in the auth chain + for the given event. + """ + ( + state_event_ids, + auth_event_ids, + ) = yield self.federation_client.get_room_state_ids( + destination, room_id, event_id=event_id + ) + + desired_events = set(state_event_ids + auth_event_ids) + event_map = yield self._get_events_from_store_or_dest( + destination, room_id, desired_events + ) + + failed_to_fetch = desired_events - event_map.keys() + if failed_to_fetch: + logger.warning( + "Failed to fetch missing state/auth events for %s: %s", + room_id, + failed_to_fetch, + ) + + pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] + auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] + + auth_chain.sort(key=lambda e: e.depth) + + return pdus, auth_chain + + @defer.inlineCallbacks + def _get_events_from_store_or_dest(self, destination, room_id, event_ids): + """Fetch events from a remote destination, checking if we already have them. + + Args: + destination (str) + room_id (str) + event_ids (Iterable[str]) + + Returns: + Deferred[dict[str, EventBase]]: A deferred resolving to a map + from event_id to event + """ + fetched_events = yield self.store.get_events(event_ids, allow_rejected=True) + + missing_events = set(event_ids) - fetched_events.keys() + + if not missing_events: + return fetched_events + + logger.debug( + "Fetching unknown state/auth events %s for room %s", + missing_events, + event_ids, + ) + + room_version = yield self.store.get_room_version(room_id) + + # XXX 20 requests at once? really? + for batch in batch_iter(missing_events, 20): + deferreds = [ + run_in_background( + self.federation_client.get_pdu, + destinations=[destination], + event_id=e_id, + room_version=room_version, + ) + for e_id in batch + ] + + res = yield make_deferred_yieldable( + defer.DeferredList(deferreds, consumeErrors=True) + ) + for success, result in res: + if success and result: + fetched_events[result.event_id] = result + + return fetched_events + @defer.inlineCallbacks def _process_received_pdu(self, origin, event, state, auth_chain): """ Called when we have a new pdu. We need to do auth checks and put it @@ -723,7 +812,7 @@ class FederationHandler(BaseHandler): state_events = {} events_to_state = {} for e_id in edges: - state, auth = yield self.federation_client.get_state_for_room( + state, auth = yield self._get_state_for_room( destination=dest, room_id=room_id, event_id=e_id ) auth_events.update({a.event_id: a for a in auth}) -- cgit 1.5.1