From 84e6b4001f22b0e8c2f806053189fcdb1e85205b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 11 May 2015 18:01:31 +0100 Subject: Initial hack at wiring together pagination and backfill --- synapse/handlers/federation.py | 108 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 106 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 85e2757227..4d39cd4b30 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -218,10 +218,11 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def backfill(self, dest, room_id, limit): + def backfill(self, dest, room_id, limit, extremities=[]): """ Trigger a backfill request to `dest` for the given `room_id` """ - extremities = yield self.store.get_oldest_events_in_room(room_id) + if not extremities: + extremities = yield self.store.get_oldest_events_in_room(room_id) pdus = yield self.replication_layer.backfill( dest, @@ -248,6 +249,109 @@ class FederationHandler(BaseHandler): defer.returnValue(events) + @defer.inlineCallbacks + def maybe_backfill(self, room_id, current_depth): + """Checks the database to see if we should backfill before paginating + """ + extremities = yield self.store.get_oldest_events_with_depth_in_room( + room_id + ) + + logger.debug("Got extremeties: %r", extremities) + + if not extremities: + return + + # Check if we reached a point where we should start backfilling. + sorted_extremeties_tuple = sorted( + extremities.items(), + key=lambda e: -int(e[1]) + ) + max_depth = sorted_extremeties_tuple[0][1] + + logger.debug("max_depth: %r", max_depth) + if current_depth > max_depth: + return + + # Now we need to decide which hosts to hit first. + + # First we try hosts that are already in the room, that were around + # at the time. TODO: HEURISTIC ALERT. + + curr_state = yield self.state_handler.get_current_state(room_id) + + def get_domains_from_state(state): + joined_users = [ + (state_key, int(event.depth)) + for (e_type, state_key), event in state.items() + if e_type == EventTypes.Member + and event.membership == Membership.JOIN + ] + + joined_domains = {} + for u, d in joined_users: + try: + dom = UserID.from_string(u).domain + old_d = joined_domains.get(dom) + if old_d: + joined_domains[dom] = min(d, old_d) + else: + joined_domains[dom] = d + except: + pass + + return sorted(joined_domains.items(), key=lambda d: d[1]) + + curr_domains = get_domains_from_state(curr_state) + + logger.debug("curr_domains: %r", curr_domains) + + likely_domains = [ + domain for domain, depth in curr_domains + ] + + @defer.inlineCallbacks + def try_backfill(domains): + # TODO: Should we try multiple of these at a time? + for dom in domains: + events = yield self.backfill( + dom, room_id, + limit=100, + extremities=[e for e in extremities.keys()] + ) + + if events: + defer.returnValue(True) + defer.returnValue(False) + + success = yield try_backfill(likely_domains) + if success: + defer.returnValue(True) + + # Huh, well *those* domains didn't work out. Lets try some domains + # from the time. + + tried_domains = set(likely_domains) + + states = yield defer.gatherResults({ + e: self.state_handler.resolve_state_groups([e])[1] + for e in extremities.keys() + }) + + for e_id, _ in sorted_extremeties_tuple: + likely_domains = get_domains_from_state(states[e_id])[0] + + success = yield try_backfill([ + dom for dom in likely_domains + if dom not in tried_domains + ]) + if success: + defer.returnValue(True) + + tried_domains.update(likely_domains) + + defer.returnValue(False) + @defer.inlineCallbacks def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. -- cgit 1.4.1 From 367382b575a61f780f3e70a62cc01a790dcc9375 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 10:35:45 +0100 Subject: Handle the case where the other side is unreachable when backfilling --- synapse/handlers/federation.py | 56 +++++++++++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 14 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4d39cd4b30..8b5ac5d6c4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -18,7 +18,7 @@ from ._base import BaseHandler from synapse.api.errors import ( - AuthError, FederationError, StoreError, + AuthError, FederationError, StoreError, CodeMessageException, SynapseError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.util.logutils import log_function @@ -29,6 +29,8 @@ from synapse.crypto.event_signing import ( ) from synapse.types import UserID +from synapse.util.retryutils import NotRetryingDestination + from twisted.internet import defer import itertools @@ -251,15 +253,15 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def maybe_backfill(self, room_id, current_depth): - """Checks the database to see if we should backfill before paginating + """Checks the database to see if we should backfill before paginating, + and if so do. """ extremities = yield self.store.get_oldest_events_with_depth_in_room( room_id ) - logger.debug("Got extremeties: %r", extremities) - if not extremities: + logger.debug("Not backfilling as no extremeties found.") return # Check if we reached a point where we should start backfilling. @@ -269,14 +271,17 @@ class FederationHandler(BaseHandler): ) max_depth = sorted_extremeties_tuple[0][1] - logger.debug("max_depth: %r", max_depth) if current_depth > max_depth: + logger.debug( + "Not backfilling as we don't need to. %d < %d", + current_depth, max_depth, + ) return # Now we need to decide which hosts to hit first. - # First we try hosts that are already in the room, that were around - # at the time. TODO: HEURISTIC ALERT. + # First we try hosts that are already in the room + # TODO: HEURISTIC ALERT. curr_state = yield self.state_handler.get_current_state(room_id) @@ -304,8 +309,6 @@ class FederationHandler(BaseHandler): curr_domains = get_domains_from_state(curr_state) - logger.debug("curr_domains: %r", curr_domains) - likely_domains = [ domain for domain, depth in curr_domains ] @@ -314,11 +317,36 @@ class FederationHandler(BaseHandler): def try_backfill(domains): # TODO: Should we try multiple of these at a time? for dom in domains: - events = yield self.backfill( - dom, room_id, - limit=100, - extremities=[e for e in extremities.keys()] - ) + try: + events = yield self.backfill( + dom, room_id, + limit=100, + extremities=[e for e in extremities.keys()] + ) + except SynapseError: + logger.info( + "Failed to backfil from %s because %s", + dom, e, + ) + continue + except CodeMessageException as e: + if 400 <= e.code < 500: + raise + + logger.info( + "Failed to backfil from %s because %s", + dom, e, + ) + continue + except NotRetryingDestination as e: + logger.info(e.message) + continue + except Exception as e: + logger.info( + "Failed to backfil from %s because %s", + dom, e, + ) + continue if events: defer.returnValue(True) -- cgit 1.4.1 From 6e5ac4a28fe79162e62b68cc62aa4e37badcc8b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 13:58:14 +0100 Subject: Err, gatherResults doesn't take a dict... --- synapse/handlers/federation.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8b5ac5d6c4..31c09365e3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -361,10 +361,13 @@ class FederationHandler(BaseHandler): tried_domains = set(likely_domains) - states = yield defer.gatherResults({ - e: self.state_handler.resolve_state_groups([e])[1] - for e in extremities.keys() - }) + event_ids = list(extremities.keys()) + + states = yield defer.gatherResults([ + self.state_handler.resolve_state_groups([e])[1] + for e in event_ids + ]) + states = dict(zip(event_ids, states)) for e_id, _ in sorted_extremeties_tuple: likely_domains = get_domains_from_state(states[e_id])[0] -- cgit 1.4.1 From a0dfffb33cf8ca721526be0c6a1e05199f2b6258 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 14:00:31 +0100 Subject: And another typo. --- synapse/handlers/federation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 31c09365e3..6f97127aec 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -274,7 +274,7 @@ class FederationHandler(BaseHandler): if current_depth > max_depth: logger.debug( "Not backfilling as we don't need to. %d < %d", - current_depth, max_depth, + max_depth, current_depth, ) return @@ -364,10 +364,10 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) states = yield defer.gatherResults([ - self.state_handler.resolve_state_groups([e])[1] + self.state_handler.resolve_state_groups([e]) for e in event_ids ]) - states = dict(zip(event_ids, states)) + states = dict(zip(event_ids, [s[1] for s in states])) for e_id, _ in sorted_extremeties_tuple: likely_domains = get_domains_from_state(states[e_id])[0] -- cgit 1.4.1 From 0d31ad5101546380308e7735d4543102b7e60bca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 14:02:01 +0100 Subject: Typos everywhere --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6f97127aec..7b7b998f05 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -370,7 +370,7 @@ class FederationHandler(BaseHandler): states = dict(zip(event_ids, [s[1] for s in states])) for e_id, _ in sorted_extremeties_tuple: - likely_domains = get_domains_from_state(states[e_id])[0] + likely_domains = get_domains_from_state(states[e_id]) success = yield try_backfill([ dom for dom in likely_domains -- cgit 1.4.1 From 07a12231569189be1699f50d71b38414ba822bdc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 14:09:54 +0100 Subject: s/backfil/backfill/ --- synapse/handlers/federation.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7b7b998f05..1093112587 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -325,7 +325,7 @@ class FederationHandler(BaseHandler): ) except SynapseError: logger.info( - "Failed to backfil from %s because %s", + "Failed to backfill from %s because %s", dom, e, ) continue @@ -334,7 +334,7 @@ class FederationHandler(BaseHandler): raise logger.info( - "Failed to backfil from %s because %s", + "Failed to backfill from %s because %s", dom, e, ) continue @@ -342,8 +342,8 @@ class FederationHandler(BaseHandler): logger.info(e.message) continue except Exception as e: - logger.info( - "Failed to backfil from %s because %s", + logger.warn( + "Failed to backfill from %s because %s", dom, e, ) continue -- cgit 1.4.1