diff options
author | Erik Johnston <erik@matrix.org> | 2015-05-11 18:01:31 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-05-11 18:01:31 +0100 |
commit | 84e6b4001f22b0e8c2f806053189fcdb1e85205b (patch) | |
tree | ade52ca08e4af76b9f4c361bdbb3b2e0ebc15db7 /synapse/handlers/federation.py | |
parent | Move storage.stream._StreamToken to types.RoomStreamToken (diff) | |
download | synapse-84e6b4001f22b0e8c2f806053189fcdb1e85205b.tar.xz |
Initial hack at wiring together pagination and backfill
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 108 |
1 files changed, 106 insertions, 2 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 85e2757227..4d39cd4b30 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -218,10 +218,11 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def backfill(self, dest, room_id, limit): + def backfill(self, dest, room_id, limit, extremities=[]): """ Trigger a backfill request to `dest` for the given `room_id` """ - extremities = yield self.store.get_oldest_events_in_room(room_id) + if not extremities: + extremities = yield self.store.get_oldest_events_in_room(room_id) pdus = yield self.replication_layer.backfill( dest, @@ -249,6 +250,109 @@ class FederationHandler(BaseHandler): defer.returnValue(events) @defer.inlineCallbacks + def maybe_backfill(self, room_id, current_depth): + """Checks the database to see if we should backfill before paginating + """ + extremities = yield self.store.get_oldest_events_with_depth_in_room( + room_id + ) + + logger.debug("Got extremeties: %r", extremities) + + if not extremities: + return + + # Check if we reached a point where we should start backfilling. + sorted_extremeties_tuple = sorted( + extremities.items(), + key=lambda e: -int(e[1]) + ) + max_depth = sorted_extremeties_tuple[0][1] + + logger.debug("max_depth: %r", max_depth) + if current_depth > max_depth: + return + + # Now we need to decide which hosts to hit first. + + # First we try hosts that are already in the room, that were around + # at the time. TODO: HEURISTIC ALERT. + + curr_state = yield self.state_handler.get_current_state(room_id) + + def get_domains_from_state(state): + joined_users = [ + (state_key, int(event.depth)) + for (e_type, state_key), event in state.items() + if e_type == EventTypes.Member + and event.membership == Membership.JOIN + ] + + joined_domains = {} + for u, d in joined_users: + try: + dom = UserID.from_string(u).domain + old_d = joined_domains.get(dom) + if old_d: + joined_domains[dom] = min(d, old_d) + else: + joined_domains[dom] = d + except: + pass + + return sorted(joined_domains.items(), key=lambda d: d[1]) + + curr_domains = get_domains_from_state(curr_state) + + logger.debug("curr_domains: %r", curr_domains) + + likely_domains = [ + domain for domain, depth in curr_domains + ] + + @defer.inlineCallbacks + def try_backfill(domains): + # TODO: Should we try multiple of these at a time? + for dom in domains: + events = yield self.backfill( + dom, room_id, + limit=100, + extremities=[e for e in extremities.keys()] + ) + + if events: + defer.returnValue(True) + defer.returnValue(False) + + success = yield try_backfill(likely_domains) + if success: + defer.returnValue(True) + + # Huh, well *those* domains didn't work out. Lets try some domains + # from the time. + + tried_domains = set(likely_domains) + + states = yield defer.gatherResults({ + e: self.state_handler.resolve_state_groups([e])[1] + for e in extremities.keys() + }) + + for e_id, _ in sorted_extremeties_tuple: + likely_domains = get_domains_from_state(states[e_id])[0] + + success = yield try_backfill([ + dom for dom in likely_domains + if dom not in tried_domains + ]) + if success: + defer.returnValue(True) + + tried_domains.update(likely_domains) + + defer.returnValue(False) + + @defer.inlineCallbacks def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. |