summary refs log tree commit diff
diff options
context:
space:
mode:
author Erik Johnston <erik@matrix.org> 2018-11-13 14:49:49 +0000
committer Erik Johnston <erik@matrix.org> 2018-11-13 14:56:38 +0000
commit 08395c7f8946bd2c4eec2ac5f438b96922062ecb (patch)
tree ca7fc44225657ad68126cbb60aa2b6e849622ea8
parent Add thread_id to filter (diff)
download synapse-08395c7f8946bd2c4eec2ac5f438b96922062ecb.tar.xz
Implemented thread support for backfills
-rw-r--r-- synapse/handlers/federation.py 54
-rw-r--r-- synapse/rest/client/v2_alpha/sync.py 4
-rw-r--r-- synapse/storage/event_federation.py 18
3 files changed, 64 insertions, 12 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f6fec0afdd..24189d85c9 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -691,7 +691,12 @@ class FederationHandler(BaseHandler):
 
         # Don't bother processing events we already have.
         seen_events = yield self.store.have_events_in_timeline(
-            set(e.event_id for e in events)
+            set(
+                itertools.chain.from_iterable(
+                    itertools.chain([e.event_id], e.prev_event_ids(),)
+                    for e in events
+                )
+            )
         )
 
         events = [e for e in events if e.event_id not in seen_events]
@@ -706,7 +711,7 @@ class FederationHandler(BaseHandler):
         edges = [
             ev.event_id
             for ev in events
-            if set(ev.prev_event_ids()) - event_ids
+            if set(ev.prev_event_ids()) - event_ids - seen_events
         ]
 
         logger.info(
@@ -740,18 +745,26 @@ class FederationHandler(BaseHandler):
         })
         missing_auth = required_auth - set(auth_events)
         failed_to_fetch = set()
+        not_in_db = set()
 
         # Try and fetch any missing auth events from both DB and remote servers.
         # We repeatedly do this until we stop finding new auth events.
         while missing_auth - failed_to_fetch:
             logger.info("Missing auth for backfill: %r", missing_auth)
-            ret_events = yield self.store.get_events(missing_auth - failed_to_fetch)
-            auth_events.update(ret_events)
 
-            required_auth.update(
-                a_id for event in ret_events.values() for a_id in event.auth_event_ids()
-            )
-            missing_auth = required_auth - set(auth_events)
+            to_fetch_from_db = missing_auth - failed_to_fetch
+            while to_fetch_from_db - not_in_db:
+                ret_events = yield self.store.get_events(missing_auth - failed_to_fetch)
+                auth_events.update(ret_events)
+
+                required_auth.update(
+                    a_id
+                    for event in ret_events.values()
+                    for a_id in event.auth_event_ids()
+                )
+                missing_auth = required_auth - set(auth_events)
+
+                to_fetch_from_db = required_auth - set(auth_events) - not_in_db
 
             if missing_auth - failed_to_fetch:
                 logger.info(
@@ -820,6 +833,25 @@ class FederationHandler(BaseHandler):
 
         events.sort(key=lambda e: e.depth)
 
+        event_id_to_thread = {}
+        event_to_parents = {}
+        for event in reversed(events):
+            threads = yield self.store.get_threads_for_backfill_event(event.event_id)
+            parents = event_to_parents.get(event.event_id, [])
+            for p in parents:
+                t = event_id_to_thread.get(p)
+                if t is not None:
+                    threads.append(t)
+
+            if threads:
+                thread_id = min(threads)
+            else:
+                thread_id = 0
+
+            event_id_to_thread[event.event_id] = thread_id
+            for c in event.prev_event_ids():
+                event_to_parents.setdefault(c, set()).add(event.event_id)
+
         for event in events:
             if event in events_to_state:
                 continue
@@ -829,6 +861,7 @@ class FederationHandler(BaseHandler):
             # TODO: We can probably do something more clever here.
             yield self._handle_new_event(
                 dest, event, backfilled=True,
+                thread_id=event_id_to_thread[event.event_id],
             )
 
         defer.returnValue(events)
@@ -838,12 +871,13 @@ class FederationHandler(BaseHandler):
         """Checks the database to see if we should backfill before paginating,
         and if so do.
         """
+        logger.info("Backfilling")
         extremities = yield self.store.get_oldest_events_with_depth_in_room(
             room_id
         )
 
         if not extremities:
-            logger.debug("Not backfilling as no extremeties found.")
+            logger.info("Not backfilling as no extremeties found.")
             return
 
         # Check if we reached a point where we should start backfilling.
@@ -858,7 +892,7 @@ class FederationHandler(BaseHandler):
         extremities = dict(sorted_extremeties_tuple[:5])
 
         if current_depth > max_depth:
-            logger.debug(
+            logger.info(
                 "Not backfilling as we don't need to. %d < %d",
                 max_depth, current_depth,
             )
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index a6b1121e8c..20a031fcec 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -28,7 +28,6 @@ from synapse.events.utils import (
     format_event_raw,
     serialize_event,
 )
-from synapse.events import FrozenEvent
 from synapse.handlers.presence import format_user_presence_state
 from synapse.handlers.sync import SyncConfig
 from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
@@ -246,7 +245,8 @@ class SyncRestServlet(RestServlet):
         }
 
     @staticmethod
-    def encode_joined(rooms, time_now, token_id, event_fields, event_formatter, exclude_threaded):
+    def encode_joined(rooms, time_now, token_id, event_fields, event_formatter,
+                      exclude_threaded):
         """
         Encode the joined rooms in a sync result
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index d3b9dea1d6..a5f72aaa13 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -34,6 +34,24 @@ logger = logging.getLogger(__name__)
 
 class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
                                  SQLBaseStore):
+
+    def get_threads_for_backfill_event(self, event_id):
+        def _get_thread_for_backfill_event_txn(txn):
+            sql = """
+                SELECT thread_id
+                FROM event_edges
+                INNER JOIN events USING (event_id)
+                WHERE prev_event_id = ?
+            """
+            # Select thread_id for joined rows whose prev_event_id is event_id.
+            txn.execute(sql, (event_id,))
+            return [thread_id for thread_id, in txn]  # rows are 1-tuples
+        # Hand the txn function to runInteraction under the given description.
+        return self.runInteraction(
+            "get_thread_for_backfill_event",
+            _get_thread_for_backfill_event_txn,
+        )
+
     def get_auth_chain(self, event_ids, include_given=False):
         """Get auth events for given event_ids. The events *must* be state events.