summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py108
-rw-r--r--synapse/handlers/message.py10
2 files changed, 115 insertions, 3 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 85e2757227..4d39cd4b30 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -218,10 +218,11 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def backfill(self, dest, room_id, limit):
+    def backfill(self, dest, room_id, limit, extremities=[]):
         """ Trigger a backfill request to `dest` for the given `room_id`
         """
-        extremities = yield self.store.get_oldest_events_in_room(room_id)
+        if not extremities:
+            extremities = yield self.store.get_oldest_events_in_room(room_id)
 
         pdus = yield self.replication_layer.backfill(
             dest,
@@ -249,6 +250,109 @@ class FederationHandler(BaseHandler):
         defer.returnValue(events)
 
     @defer.inlineCallbacks
+    def maybe_backfill(self, room_id, current_depth):
+        """Checks the database to see if we should backfill before paginating
+        """
+        extremities = yield self.store.get_oldest_events_with_depth_in_room(
+            room_id
+        )
+
+        logger.debug("Got extremeties: %r", extremities)
+
+        if not extremities:
+            return
+
+        # Check if we reached a point where we should start backfilling.
+        sorted_extremeties_tuple = sorted(
+            extremities.items(),
+            key=lambda e: -int(e[1])
+        )
+        max_depth = sorted_extremeties_tuple[0][1]
+
+        logger.debug("max_depth: %r", max_depth)
+        if current_depth > max_depth:
+            return
+
+        # Now we need to decide which hosts to hit first.
+
+        # First we try hosts that are already in the room, that were around
+        # at the time. TODO: HEURISTIC ALERT.
+
+        curr_state = yield self.state_handler.get_current_state(room_id)
+
+        def get_domains_from_state(state):
+            joined_users = [
+                (state_key, int(event.depth))
+                for (e_type, state_key), event in state.items()
+                if e_type == EventTypes.Member
+                and event.membership == Membership.JOIN
+            ]
+
+            joined_domains = {}
+            for u, d in joined_users:
+                try:
+                    dom = UserID.from_string(u).domain
+                    old_d = joined_domains.get(dom)
+                    if old_d:
+                        joined_domains[dom] = min(d, old_d)
+                    else:
+                        joined_domains[dom] = d
+                except:
+                    pass
+
+            return sorted(joined_domains.items(), key=lambda d: d[1])
+
+        curr_domains = get_domains_from_state(curr_state)
+
+        logger.debug("curr_domains: %r", curr_domains)
+
+        likely_domains = [
+            domain for domain, depth in curr_domains
+        ]
+
+        @defer.inlineCallbacks
+        def try_backfill(domains):
+            # TODO: Should we try multiple of these at a time?
+            for dom in domains:
+                events = yield self.backfill(
+                    dom, room_id,
+                    limit=100,
+                    extremities=[e for e in extremities.keys()]
+                )
+
+                if events:
+                    defer.returnValue(True)
+            defer.returnValue(False)
+
+        success = yield try_backfill(likely_domains)
+        if success:
+            defer.returnValue(True)
+
+        # Huh, well *those* domains didn't work out. Lets try some domains
+        # from the time.
+
+        tried_domains = set(likely_domains)
+
+        states = yield defer.gatherResults({
+            e: self.state_handler.resolve_state_groups([e])[1]
+            for e in extremities.keys()
+        })
+
+        for e_id, _ in sorted_extremeties_tuple:
+            likely_domains = get_domains_from_state(states[e_id])[0]
+
+            success = yield try_backfill([
+                dom for dom in likely_domains
+                if dom not in tried_domains
+            ])
+            if success:
+                defer.returnValue(True)
+
+            tried_domains.update(likely_domains)
+
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
     def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 22e19af17f..38e375f86a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -21,7 +21,7 @@ from synapse.streams.config import PaginationConfig
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.util.logcontext import PreserveLoggingContext
-from synapse.types import UserID
+from synapse.types import UserID, RoomStreamToken
 
 from ._base import BaseHandler
 
@@ -92,6 +92,14 @@ class MessageHandler(BaseHandler):
                 yield self.hs.get_event_sources().get_current_token()
             )
 
+        room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
+        if room_token.topological is None:
+            raise SynapseError(400, "Invalid token")
+
+        yield self.hs.get_handlers().federation_handler.maybe_backfill(
+            room_id, room_token.topological
+        )
+
         user = UserID.from_string(user_id)
 
         events, next_key = yield data_source.get_pagination_rows(