From d240796dedcfae1f6929c1501e7e335df417cfaf Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 17:20:07 +0100 Subject: Basic, un-cached support for secondary_directory_servers --- synapse/federation/federation_client.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 37ee469fa2..ba8d71c050 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -24,6 +24,7 @@ from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) from synapse.util import unwrapFirstError +from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -550,6 +551,26 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") + @defer.inlineCallbacks + def get_public_rooms(self, destinations): + results_by_server = {} + + @defer.inlineCallbacks + def _get_result(s): + if s == self.server_name: + defer.returnValue() + + try: + result = yield self.transport_layer.get_public_rooms(s) + results_by_server[s] = result + except: + logger.exception("Error getting room list from server %r", s) + + + yield concurrently_execute(_get_result, destinations, 3) + + defer.returnValue(results_by_server) + @defer.inlineCallbacks def query_auth(self, destination, room_id, event_id, local_auth): """ -- cgit 1.4.1 From 963e3ed2828f6d1e704678af971ceffff3076115 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 17:22:53 +0100 Subject: Apparently I am not permitted to have two blank lines here --- synapse/federation/federation_client.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ba8d71c050..d835c1b038 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -566,7 +566,6 @@ class FederationClient(FederationBase): except: logger.exception("Error getting room list from server %r", s) - yield concurrently_execute(_get_result, destinations, 3) defer.returnValue(results_by_server) -- cgit 1.4.1 From d41a1a91d3cce28e5416a91b7494d079e4c765f0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Jun 2016 15:12:59 +0100 Subject: Linearize fetching of gaps on incoming events This potentially stops the server from doing multiple requests for the same data. --- synapse/federation/federation_base.py | 3 ++ synapse/federation/federation_client.py | 2 + synapse/federation/federation_server.py | 88 +++++++++++++++++++-------------- synapse/federation/replication.py | 2 + 4 files changed, 58 insertions(+), 37 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index a0b7cb7963..da2f5e8cfd 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -31,6 +31,9 @@ logger = logging.getLogger(__name__) class FederationBase(object): + def __init__(self, hs): + pass + @defer.inlineCallbacks def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, include_none=False): diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d835c1b038..b06387051c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -52,6 +52,8 @@ sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) class FederationClient(FederationBase): + def __init__(self, hs): + super(FederationClient, self).__init__(hs) def start_get_pdu_cache(self): self._get_pdu_cache = ExpiringCache( diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9f2a64dede..fe92457ba1 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Transaction, Edu +from synapse.util.async import Linearizer from synapse.util.logutils import log_function from synapse.events import FrozenEvent import synapse.metrics @@ -44,6 +45,11 @@ received_queries_counter = metrics.register_counter("received_queries", labels=[ class FederationServer(FederationBase): + def __init__(self, hs): + super(FederationServer, self).__init__(hs) + + self._room_pdu_linearizer = Linearizer() + def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -491,43 +497,51 @@ class FederationServer(FederationBase): pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: if get_missing and prevs - seen: - latest = yield self.store.get_latest_event_ids_in_room( - pdu.room_id - ) - - # We add the prev events that we have seen to the latest - # list to ensure the remote server doesn't give them to us - latest = set(latest) - latest |= seen - - logger.info( - "Missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] - ) - - missing_events = yield self.get_missing_events( - origin, - pdu.room_id, - earliest_events_ids=list(latest), - latest_events=[pdu], - limit=10, - min_depth=min_depth, - ) - - # We want to sort these by depth so we process them and - # tell clients about them in order. - missing_events.sort(key=lambda x: x.depth) - - for e in missing_events: - yield self._handle_new_pdu( - origin, - e, - get_missing=False - ) - - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) + # If we're missing stuff, ensure we only fetch stuff one + # at a time. + with (yield self._room_pdu_linearizer.queue(pdu.room_id)): + # We recalculate seen, since it may have changed. + have_seen = yield self.store.have_events(prevs) + seen = set(have_seen.keys()) + + if prevs - seen: + latest = yield self.store.get_latest_event_ids_in_room( + pdu.room_id + ) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(latest) + latest |= seen + + logger.info( + "Missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) + + missing_events = yield self.get_missing_events( + origin, + pdu.room_id, + earliest_events_ids=list(latest), + latest_events=[pdu], + limit=10, + min_depth=min_depth, + ) + + # We want to sort these by depth so we process them and + # tell clients about them in order. + missing_events.sort(key=lambda x: x.depth) + + for e in missing_events: + yield self._handle_new_pdu( + origin, + e, + get_missing=False + ) + + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) prevs = {e_id for e_id, _ in pdu.prev_events} seen = set(have_seen.keys()) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 3e062a5eab..ea66a5dcbc 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer): self.hs = hs + super(ReplicationLayer, self).__init__(hs) + def __str__(self): return "" % self.server_name -- cgit 1.4.1 From e3a720217a9d200a7c3db8305df53ef8bf76f565 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 14:47:37 +0100 Subject: Add /state_ids federation API The new API only returns the event_ids for the state, as most requesters will already have the vast majority of the events already. --- synapse/federation/federation_client.py | 73 +++++++++++++++++++++++++++++++-- synapse/federation/federation_server.py | 21 ++++++++++ synapse/federation/transport/client.py | 22 ++++++++++ synapse/federation/transport/server.py | 12 ++++++ 4 files changed, 125 insertions(+), 3 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b06387051c..03f6133e61 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -314,9 +314,32 @@ class FederationClient(FederationBase): Deferred: Results in a list of PDUs. """ - result = yield self.transport_layer.get_room_state( - destination, room_id, event_id=event_id, - ) + try: + # First we try and ask for just the IDs, as thats far quicker if + # we have most of the state and auth_chain already. + # However, this may 404 if the other side has an old synapse. + result = yield self.transport_layer.get_room_state_ids( + destination, room_id, event_id=event_id, + ) + + state_event_ids = result["pdus"] + auth_event_ids = result.get("auth_chain", []) + + event_map, _failed_to_fetch = yield self.get_events( + [destination], room_id, set(state_event_ids + auth_event_ids) + ) + + pdus = [event_map[e_id] for e_id in state_event_ids] + auth_chain = [event_map[e_id] for e_id in auth_event_ids] + + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue((pdus, auth_chain)) + except HttpResponseException as e: + if e.code == 404: + logger.info("Failed to use get_room_state_ids API, falling back") + else: + raise e pdus = [ self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] @@ -339,6 +362,50 @@ class FederationClient(FederationBase): defer.returnValue((signed_pdus, signed_auth)) + @defer.inlineCallbacks + def get_events(self, destinations, room_id, event_ids, return_local=True): + if return_local: + seen_events = yield self.store.get_events(event_ids) + signed_events = seen_events.values() + else: + seen_events = yield self.store.have_events(event_ids) + signed_events = [] + + failed_to_fetch = [] + + missing_events = set(event_ids) + for k in seen_events: + missing_events.discard(k) + + if not missing_events: + defer.returnValue((signed_events, failed_to_fetch)) + + def random_server_list(): + srvs = list(destinations) + random.shuffle(srvs) + return srvs + + batch_size = 20 + for i in xrange(0, len(missing_events), batch_size): + batch = missing_events[i:i + batch_size] + + deferreds = [ + self.get_pdu( + destinations=random_server_list(), + event_id=e_id, + ).addBoth(lambda r, e: (r, e), e_id) + for e_id in batch + ] + + res = yield defer.DeferredList(deferreds, consumeErrors=True) + for (result, val), (e_id, _) in res: + if result and val: + signed_events.append(val) + else: + failed_to_fetch.add(e_id) + + defer.returnValue((signed_events, failed_to_fetch)) + @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 612d274bdb..40e9fda0eb 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -214,6 +214,27 @@ class FederationServer(FederationBase): defer.returnValue((200, resp)) + @defer.inlineCallbacks + def on_state_ids_request(self, origin, room_id, event_id): + if not event_id: + raise NotImplementedError("Specify an event") + + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + + pdus = yield self.handler.get_state_for_pdu( + room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + + defer.returnValue((200, { + "pdus": [pdu.event_id for pdu in pdus], + "auth_chain": [pdu.event_id for pdu in auth_chain], + })) + @defer.inlineCallbacks def _on_context_state_request_compute(self, room_id, event_id): pdus = yield self.handler.get_state_for_pdu( diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index ebb698e278..3d088e43cb 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -54,6 +54,28 @@ class TransportLayerClient(object): destination, path=path, args={"event_id": event_id}, ) + @log_function + def get_room_state_ids(self, destination, room_id, event_id): + """ Requests all state for a given room from the given server at the + given event. Returns the state's event_id's + + Args: + destination (str): The host name of the remote home server we want + to get the state from. + context (str): The name of the context we want the state of + event_id (str): The event we want the context at. + + Returns: + Deferred: Results in a dict received from the remote homeserver. + """ + logger.debug("get_room_state_ids dest=%s, room=%s", + destination, room_id) + + path = PREFIX + "/state_ids/%s/" % room_id + return self.client.get_json( + destination, path=path, args={"event_id": event_id}, + ) + @log_function def get_event(self, destination, event_id, timeout=None): """ Requests the pdu with give id and origin from the given server. diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 26fa88ae84..3ae7c48457 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -271,6 +271,17 @@ class FederationStateServlet(BaseFederationServlet): ) +class FederationStateIdsServlet(BaseFederationServlet): + PATH = "/state_ids/(?P[^/]*)/" + + def on_GET(self, origin, content, query, room_id): + return self.handler.on_state_ids_request( + origin, + room_id, + query.get("event_id", [None])[0], + ) + + class FederationBackfillServlet(BaseFederationServlet): PATH = "/backfill/(?P[^/]*)/" @@ -538,6 +549,7 @@ SERVLET_CLASSES = ( FederationPullServlet, FederationEventServlet, FederationStateServlet, + FederationStateIdsServlet, FederationBackfillServlet, FederationQueryServlet, FederationMakeJoinServlet, -- cgit 1.4.1 From a60a2eaa02f454dbc450cf821f6cd1c6b0b93993 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 14:52:43 +0100 Subject: Comment --- synapse/federation/federation_client.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 03f6133e61..0491f1c3fe 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -364,6 +364,20 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def get_events(self, destinations, room_id, event_ids, return_local=True): + """Fetch events from some remote destinations, checking if we already + have them. + + Args: + destinations (list) + room_id (str) + event_ids (list) + return_local (bool): Whether to include events we already have in + the DB in the returned list of events + + Returns: + Deferred: A deferred resolving to a 2-tuple where the first is a list of + events and the second is a list of event ids that we failed to fetch. + """ if return_local: seen_events = yield self.store.get_events(event_ids) signed_events = seen_events.values() -- cgit 1.4.1 From 520ee9bd2c91a75eb1dc7ed923016967856c6bdf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 15:03:15 +0100 Subject: Fix syntax error --- synapse/federation/federation_client.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 0491f1c3fe..6c626cf12c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -325,10 +325,17 @@ class FederationClient(FederationBase): state_event_ids = result["pdus"] auth_event_ids = result.get("auth_chain", []) - event_map, _failed_to_fetch = yield self.get_events( + fetched_events, failed_to_fetch = yield self.get_events( [destination], room_id, set(state_event_ids + auth_event_ids) ) + if failed_to_fetch: + logger.warn("Failed to get %r", failed_to_fetch) + + event_map = { + ev.event_id: ev for ev in fetched_events + } + pdus = [event_map[e_id] for e_id in state_event_ids] auth_chain = [event_map[e_id] for e_id in auth_event_ids] -- cgit 1.4.1 From 4c56bedee3bb63d7035fca4a1a092b11de0b18cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 15:04:29 +0100 Subject: Actually call get_room_state --- synapse/federation/federation_client.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6c626cf12c..7eadcdd28c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -348,6 +348,10 @@ class FederationClient(FederationBase): else: raise e + result = yield self.transport_layer.get_room_state( + destination, room_id, event_id=event_id, + ) + pdus = [ self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] ] -- cgit 1.4.1 From bcc9cda8ca75b5cc381ce10ba9b8e4af56c6bdaa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 17:17:26 +0100 Subject: Fix copy + paste fails --- synapse/federation/federation_client.py | 15 ++++++++++----- synapse/federation/federation_server.py | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7eadcdd28c..dde10967be 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -411,8 +411,13 @@ class FederationClient(FederationBase): return srvs batch_size = 20 - for i in xrange(0, len(missing_events), batch_size): - batch = missing_events[i:i + batch_size] + while missing_events: + batch = [] + try: + for _ in range(0, batch_size): + batch.append(missing_events.pop()) + except KeyError: + pass deferreds = [ self.get_pdu( @@ -423,9 +428,9 @@ class FederationClient(FederationBase): ] res = yield defer.DeferredList(deferreds, consumeErrors=True) - for (result, val), (e_id, _) in res: - if result and val: - signed_events.append(val) + for success, (result, e_id) in res: + if success and result: + signed_events.append(result) else: failed_to_fetch.add(e_id) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 40e9fda0eb..35a01eecca 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -623,7 +623,7 @@ class FederationServer(FederationBase): origin, pdu.room_id, pdu.event_id, ) except: - logger.warn("Failed to get state for event: %s", pdu.event_id) + logger.exception("Failed to get state for event: %s", pdu.event_id) yield self.handler.on_receive_pdu( origin, -- cgit 1.4.1 From edb33eb163b6c60bfd2c3cab78a6bd13a47b6702 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 17:19:15 +0100 Subject: Rename fields to _ids --- synapse/federation/federation_client.py | 4 ++-- synapse/federation/federation_server.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index dde10967be..264f3c0aec 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -322,8 +322,8 @@ class FederationClient(FederationBase): destination, room_id, event_id=event_id, ) - state_event_ids = result["pdus"] - auth_event_ids = result.get("auth_chain", []) + state_event_ids = result["pdu_ids"] + auth_event_ids = result.get("auth_chain_ids", []) fetched_events, failed_to_fetch = yield self.get_events( [destination], room_id, set(state_event_ids + auth_event_ids) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 35a01eecca..2b91f93e09 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -231,8 +231,8 @@ class FederationServer(FederationBase): ) defer.returnValue((200, { - "pdus": [pdu.event_id for pdu in pdus], - "auth_chain": [pdu.event_id for pdu in auth_chain], + "pdu_ids": [pdu.event_id for pdu in pdus], + "auth_chain_ids": [pdu.event_id for pdu in auth_chain], })) @defer.inlineCallbacks -- cgit 1.4.1 From e3ee63578f335037c73675209bb7861045c2027a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Aug 2016 14:01:18 +0100 Subject: Tidy up get_events --- synapse/federation/federation_client.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 264f3c0aec..ae0d650700 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -411,28 +411,26 @@ class FederationClient(FederationBase): return srvs batch_size = 20 - while missing_events: - batch = [] - try: - for _ in range(0, batch_size): - batch.append(missing_events.pop()) - except KeyError: - pass + missing_events = len(missing_events) + for i in xrange(0, batch_size, batch_size): + batch = set(missing_events[i:i + batch_size]) deferreds = [ self.get_pdu( destinations=random_server_list(), event_id=e_id, - ).addBoth(lambda r, e: (r, e), e_id) + ) for e_id in batch ] res = yield defer.DeferredList(deferreds, consumeErrors=True) - for success, (result, e_id) in res: - if success and result: + for success, result in res: + if success: signed_events.append(result) - else: - failed_to_fetch.add(e_id) + batch.discard(result.event_id) + + # We removed all events we successfully fetched from `batch` + failed_to_fetch.update(batch) defer.returnValue((signed_events, failed_to_fetch)) -- cgit 1.4.1 From 257c41cc2e7163c42b8bcfa3a29d42ae5ac087b9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Aug 2016 14:05:45 +0100 Subject: Fix typos. --- synapse/federation/federation_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ae0d650700..c6ed720166 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -396,7 +396,7 @@ class FederationClient(FederationBase): seen_events = yield self.store.have_events(event_ids) signed_events = [] - failed_to_fetch = [] + failed_to_fetch = set() missing_events = set(event_ids) for k in seen_events: @@ -411,8 +411,8 @@ class FederationClient(FederationBase): return srvs batch_size = 20 - missing_events = len(missing_events) - for i in xrange(0, batch_size, batch_size): + missing_events = list(missing_events) + for i in xrange(0, len(missing_events), batch_size): batch = set(missing_events[i:i + batch_size]) deferreds = [ -- cgit 1.4.1 From a2b7102eea23572c8a3619704f5f5c3f21a8edcd Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Thu, 4 Aug 2016 20:38:08 -0700 Subject: Tweak integrity error recovery to work as intended --- synapse/federation/federation_client.py | 2 +- synapse/storage/events.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c6ed720166..c5e99cebf7 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -343,7 +343,7 @@ class FederationClient(FederationBase): defer.returnValue((pdus, auth_chain)) except HttpResponseException as e: - if e.code == 404: + if e.code == 400: logger.info("Failed to use get_room_state_ids API, falling back") else: raise e diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 340c0621cc..643c3aed2a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -580,6 +580,7 @@ class EventsStore(SQLBaseStore): for table in ( "events", + "event_auth", "event_json", "event_content_hashes", "event_destinations", @@ -593,6 +594,8 @@ class EventsStore(SQLBaseStore): "event_to_state_groups", "rejections", "redactions", + "room_memberships", + "state_events" ): txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), -- cgit 1.4.1 From 1515d1b581158e98c6513cf8b3b9548787854a64 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 10:24:23 +0100 Subject: Fallback to /state/ on both 400 and 404 --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c5e99cebf7..7e1690b0fb 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -343,7 +343,7 @@ class FederationClient(FederationBase): defer.returnValue((pdus, auth_chain)) except HttpResponseException as e: - if e.code == 400: + if e.code == 400 or e.code == 404: logger.info("Failed to use get_room_state_ids API, falling back") else: raise e -- cgit 1.4.1 From fccadb7719f872ecdabd6ebfb299156eca208454 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 10:43:47 +0100 Subject: Check if we already have the events returned by /state/ --- synapse/federation/federation_client.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7e1690b0fb..b751d26de1 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -361,12 +361,26 @@ class FederationClient(FederationBase): for p in result.get("auth_chain", []) ] + seen_events = yield self.store.get_events([ + ev.event_id for ev in itertools.chain(pdus, auth_chain) + ]) + signed_pdus = yield self._check_sigs_and_hash_and_fetch( - destination, pdus, outlier=True + destination, + [p for p in pdus if p.event_id not in seen_events], + outlier=True + ) + signed_pdus.extend( + seen_events[p.event_id] for p in pdus if p.event_id in seen_events ) signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True + destination, + [p for p in pdus if p.event_id not in auth_chain], + outlier=True + ) + signed_auth.extend( + seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events ) signed_auth.sort(key=lambda e: e.depth) -- cgit 1.4.1 From 2d4de61fb7b6276255fac3c07ef4c242f5a93c71 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 10:48:56 +0100 Subject: Fix typo --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b751d26de1..65778fd4ee 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -376,7 +376,7 @@ class FederationClient(FederationBase): signed_auth = yield self._check_sigs_and_hash_and_fetch( destination, - [p for p in pdus if p.event_id not in auth_chain], + [p for p in auth_chain if p.event_id not in seen_events], outlier=True ) signed_auth.extend( -- cgit 1.4.1 From 46453bfc2f203bae8ac7845f04ef64c04e172c92 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 18:02:03 +0100 Subject: Retry joining via other servers if first one failed --- synapse/federation/federation_client.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 65778fd4ee..92332cfdcf 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -523,14 +523,19 @@ class FederationClient(FederationBase): (destination, self.event_from_pdu_json(pdu_dict)) ) break - except CodeMessageException: - raise + except CodeMessageException as e: + if not 500 <= e.code < 600: + raise + else: + logger.warn( + "Failed to make_%s via %s: %s", + membership, destination, e.message + ) except Exception as e: logger.warn( "Failed to make_%s via %s: %s", membership, destination, e.message ) - raise raise RuntimeError("Failed to send to any server.") @@ -602,8 +607,14 @@ class FederationClient(FederationBase): "auth_chain": signed_auth, "origin": destination, }) - except CodeMessageException: - raise + except CodeMessageException as e: + if not 500 <= e.code < 600: + raise + else: + logger.exception( + "Failed to send_join via %s: %s", + destination, e.message + ) except Exception as e: logger.exception( "Failed to send_join via %s: %s", -- cgit 1.4.1 From 5f360182c65d1ef24417ee284c5a66239beca9fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 18:08:32 +0100 Subject: Fix a couple of python bugs --- synapse/federation/federation_client.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 92332cfdcf..da95c2ad6d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -236,9 +236,9 @@ class FederationClient(FederationBase): # TODO: Rate limit the number of times we try and get the same event. if self._get_pdu_cache: - e = self._get_pdu_cache.get(event_id) - if e: - defer.returnValue(e) + ev = self._get_pdu_cache.get(event_id) + if ev: + defer.returnValue(ev) pdu = None for destination in destinations: @@ -269,7 +269,7 @@ class FederationClient(FederationBase): break - except SynapseError: + except SynapseError as e: logger.info( "Failed to get PDU %s from %s because %s", event_id, destination, e, @@ -336,8 +336,10 @@ class FederationClient(FederationBase): ev.event_id: ev for ev in fetched_events } - pdus = [event_map[e_id] for e_id in state_event_ids] - auth_chain = [event_map[e_id] for e_id in auth_event_ids] + pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] + auth_chain = [ + event_map[e_id] for e_id in auth_event_ids if e_id in event_map + ] auth_chain.sort(key=lambda e: e.depth) -- cgit 1.4.1 From f91df1f761b1e9e4da184560b0e7d9557129d064 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Aug 2016 11:31:46 +0100 Subject: Store if we fail to fetch an event from a destination --- synapse/federation/federation_client.py | 37 ++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index da95c2ad6d..baa672c4ac 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -51,10 +51,34 @@ sent_edus_counter = metrics.register_counter("sent_edus") sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) +PDU_RETRY_TIME_MS = 1 * 60 * 1000 + + class FederationClient(FederationBase): def __init__(self, hs): super(FederationClient, self).__init__(hs) + self.pdu_destination_tried = {} + self._clock.looping_call( + self._clear_tried_cache, 60 * 1000, + ) + + def _clear_tried_cache(self): + """Clear pdu_destination_tried cache""" + now = self._clock.time_msec() + + old_dict = self.pdu_destination_tried + self.pdu_destination_tried = {} + + for event_id, destination_dict in old_dict.items(): + destination_dict = { + dest: time + for dest, time in destination_dict.items() + if time + PDU_RETRY_TIME_MS > now + } + if destination_dict: + self.pdu_destination_tried[event_id] = destination_dict + def start_get_pdu_cache(self): self._get_pdu_cache = ExpiringCache( cache_name="get_pdu_cache", @@ -240,8 +264,15 @@ class FederationClient(FederationBase): if ev: defer.returnValue(ev) + pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) + pdu = None for destination in destinations: + now = self._clock.time_msec() + last_attempt = pdu_attempts.get(destination, 0) + if last_attempt + PDU_RETRY_TIME_MS > now: + continue + try: limiter = yield get_retry_limiter( destination, @@ -276,9 +307,11 @@ class FederationClient(FederationBase): ) continue except CodeMessageException as e: - if 400 <= e.code < 500: + if 400 <= e.code < 500 and e.code != 404: raise + pdu_attempts[destination] = now + logger.info( "Failed to get PDU %s from %s because %s", event_id, destination, e, @@ -288,6 +321,8 @@ class FederationClient(FederationBase): logger.info(e.message) continue except Exception as e: + pdu_attempts[destination] = now + logger.info( "Failed to get PDU %s from %s because %s", event_id, destination, e, -- cgit 1.4.1 From ea8c4094dbaa9cec30c543a03f451d2555d1d23d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Aug 2016 13:26:13 +0100 Subject: Also pull out rejected events --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index baa672c4ac..56115a87d7 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -441,7 +441,7 @@ class FederationClient(FederationBase): events and the second is a list of event ids that we failed to fetch. """ if return_local: - seen_events = yield self.store.get_events(event_ids) + seen_events = yield self.store.get_events(event_ids, allow_rejected=True) signed_events = seen_events.values() else: seen_events = yield self.store.have_events(event_ids) -- cgit 1.4.1 From 487bc49bf8dcaadd6abd9cee1ef762f1bf0d35a7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Aug 2016 13:39:12 +0100 Subject: Don't stop on 4xx series errors --- synapse/federation/federation_client.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 56115a87d7..9ba3151713 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -300,23 +300,13 @@ class FederationClient(FederationBase): break - except SynapseError as e: - logger.info( - "Failed to get PDU %s from %s because %s", - event_id, destination, e, - ) - continue - except CodeMessageException as e: - if 400 <= e.code < 500 and e.code != 404: - raise - pdu_attempts[destination] = now + except SynapseError as e: logger.info( "Failed to get PDU %s from %s because %s", event_id, destination, e, ) - continue except NotRetryingDestination as e: logger.info(e.message) continue -- cgit 1.4.1