summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/federation.py51
-rw-r--r--synapse/storage/events.py23
2 files changed, 59 insertions, 15 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 83dab32bcb..5ac55e10f3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -280,6 +280,10 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def backfill(self, dest, room_id, limit, extremities=[]):
         """ Trigger a backfill request to `dest` for the given `room_id`
+
+        This will attempt to get more events from the remote. This may return
+        be successfull and still return no events if the other side has no new
+        events to offer.
         """
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
@@ -294,6 +298,16 @@ class FederationHandler(BaseHandler):
             extremities=extremities,
         )
 
+        # Don't bother processing events we already have.
+        seen_events = yield self.store.have_events_in_timeline(
+            set(e.event_id for e in events)
+        )
+
+        events = [e for e in events if e.event_id not in seen_events]
+
+        if not events:
+            defer.returnValue([])
+
         event_map = {e.event_id: e for e in events}
 
         event_ids = set(e.event_id for e in events)
@@ -353,6 +367,7 @@ class FederationHandler(BaseHandler):
         for a in auth_events.values():
             if a.event_id in seen_events:
                 continue
+            a.internal_metadata.outlier = True
             ev_infos.append({
                 "event": a,
                 "auth_events": {
@@ -373,20 +388,23 @@ class FederationHandler(BaseHandler):
                 }
             })
 
+        yield self._handle_new_events(
+            dest, ev_infos,
+            backfilled=True,
+        )
+
         events.sort(key=lambda e: e.depth)
 
         for event in events:
             if event in events_to_state:
                 continue
 
-            ev_infos.append({
-                "event": event,
-            })
-
-        yield self._handle_new_events(
-            dest, ev_infos,
-            backfilled=True,
-        )
+            # We store these one at a time since each event depends on the
+            # previous to work out the state.
+            # TODO: We can probably do something more clever here.
+            yield self._handle_new_event(
+                dest, event
+            )
 
         defer.returnValue(events)
 
@@ -458,11 +476,15 @@ class FederationHandler(BaseHandler):
             # TODO: Should we try multiple of these at a time?
             for dom in domains:
                 try:
-                    events = yield self.backfill(
+                    yield self.backfill(
                         dom, room_id,
                         limit=100,
                         extremities=[e for e in extremities.keys()]
                     )
+                    # If this succeeded then we probably already have the
+                    # appropriate stuff.
+                    # TODO: We can probably do something more intelligent here.
+                    defer.returnValue(True)
                 except SynapseError as e:
                     logger.info(
                         "Failed to backfill from %s because %s",
@@ -488,8 +510,6 @@ class FederationHandler(BaseHandler):
                     )
                     continue
 
-                if events:
-                    defer.returnValue(True)
             defer.returnValue(False)
 
         success = yield try_backfill(likely_domains)
@@ -1076,7 +1096,8 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def _handle_new_event(self, origin, event, state=None, auth_events=None):
+    def _handle_new_event(self, origin, event, state=None, auth_events=None,
+                          backfilled=False):
         context = yield self._prep_event(
             origin, event,
             state=state,
@@ -1092,6 +1113,7 @@ class FederationHandler(BaseHandler):
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
+            backfilled=backfilled,
         )
 
         # this intentionally does not yield: we don't care about the result
@@ -1104,6 +1126,11 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _handle_new_events(self, origin, event_infos, backfilled=False):
+        """Creates the appropriate contexts and persists events. The events
+        should not depend on one another, e.g. this should be used to persist
+        a bunch of outliers, but not a chunk of individual events that depend
+        on each other for state calculations.
+        """
         contexts = yield defer.gatherResults(
             [
                 self._prep_event(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 308a2c9b02..21487724ed 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -118,7 +118,7 @@ class EventsStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, context, current_state=None):
+    def persist_event(self, event, context, current_state=None, backfilled=False):
 
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
@@ -131,6 +131,7 @@ class EventsStore(SQLBaseStore):
                         event=event,
                         context=context,
                         current_state=current_state,
+                        backfilled=backfilled,
                     )
         except _RollbackButIsFineException:
             pass
@@ -195,7 +196,7 @@ class EventsStore(SQLBaseStore):
         defer.returnValue({e.event_id: e for e in events})
 
     @log_function
-    def _persist_event_txn(self, txn, event, context, current_state):
+    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
@@ -238,7 +239,7 @@ class EventsStore(SQLBaseStore):
         return self._persist_events_txn(
             txn,
             [(event, context)],
-            backfilled=False,
+            backfilled=backfilled,
         )
 
     @log_function
@@ -543,6 +544,22 @@ class EventsStore(SQLBaseStore):
             (event.event_id, event.redacts)
         )
 
+    @defer.inlineCallbacks
+    def have_events_in_timeline(self, event_ids):
+        """Given a list of event ids, check if we have already processed and
+        stored them as non outliers.
+        """
+        rows = yield self._simple_select_many_batch(
+            table="events",
+            retcols=("event_id",),
+            column="event_id",
+            iterable=list(event_ids),
+            keyvalues={"outlier": False},
+            desc="have_events_in_timeline",
+        )
+
+        defer.returnValue(set(r["event_id"] for r in rows))
+
     def have_events(self, event_ids):
         """Given a list of event ids, check if we have already processed them.