diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 83dab32bcb..5ac55e10f3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -280,6 +280,10 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def backfill(self, dest, room_id, limit, extremities=[]):
""" Trigger a backfill request to `dest` for the given `room_id`
+
+ This will attempt to get more events from the remote. This may return
+ be successfull and still return no events if the other side has no new
+ events to offer.
"""
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
@@ -294,6 +298,16 @@ class FederationHandler(BaseHandler):
extremities=extremities,
)
+ # Don't bother processing events we already have.
+ seen_events = yield self.store.have_events_in_timeline(
+ set(e.event_id for e in events)
+ )
+
+ events = [e for e in events if e.event_id not in seen_events]
+
+ if not events:
+ defer.returnValue([])
+
event_map = {e.event_id: e for e in events}
event_ids = set(e.event_id for e in events)
@@ -353,6 +367,7 @@ class FederationHandler(BaseHandler):
for a in auth_events.values():
if a.event_id in seen_events:
continue
+ a.internal_metadata.outlier = True
ev_infos.append({
"event": a,
"auth_events": {
@@ -373,20 +388,23 @@ class FederationHandler(BaseHandler):
}
})
+ yield self._handle_new_events(
+ dest, ev_infos,
+ backfilled=True,
+ )
+
events.sort(key=lambda e: e.depth)
for event in events:
if event in events_to_state:
continue
- ev_infos.append({
- "event": event,
- })
-
- yield self._handle_new_events(
- dest, ev_infos,
- backfilled=True,
- )
+ # We store these one at a time since each event depends on the
+ # previous to work out the state.
+ # TODO: We can probably do something more clever here.
+ yield self._handle_new_event(
+ dest, event
+ )
defer.returnValue(events)
@@ -458,11 +476,15 @@ class FederationHandler(BaseHandler):
# TODO: Should we try multiple of these at a time?
for dom in domains:
try:
- events = yield self.backfill(
+ yield self.backfill(
dom, room_id,
limit=100,
extremities=[e for e in extremities.keys()]
)
+ # If this succeeded then we probably already have the
+ # appropriate stuff.
+ # TODO: We can probably do something more intelligent here.
+ defer.returnValue(True)
except SynapseError as e:
logger.info(
"Failed to backfill from %s because %s",
@@ -488,8 +510,6 @@ class FederationHandler(BaseHandler):
)
continue
- if events:
- defer.returnValue(True)
defer.returnValue(False)
success = yield try_backfill(likely_domains)
@@ -1076,7 +1096,8 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def _handle_new_event(self, origin, event, state=None, auth_events=None):
+ def _handle_new_event(self, origin, event, state=None, auth_events=None,
+ backfilled=False):
context = yield self._prep_event(
origin, event,
state=state,
@@ -1092,6 +1113,7 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
+ backfilled=backfilled,
)
# this intentionally does not yield: we don't care about the result
@@ -1104,6 +1126,11 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False):
+ """Creates the appropriate contexts and persists events. The events
+ should not depend on one another, e.g. this should be used to persist
+ a bunch of outliers, but not a chunk of individual events that depend
+ on each other for state calculations.
+ """
contexts = yield defer.gatherResults(
[
self._prep_event(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 308a2c9b02..21487724ed 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -118,7 +118,7 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, context, current_state=None):
+ def persist_event(self, event, context, current_state=None, backfilled=False):
try:
with self._stream_id_gen.get_next() as stream_ordering:
@@ -131,6 +131,7 @@ class EventsStore(SQLBaseStore):
event=event,
context=context,
current_state=current_state,
+ backfilled=backfilled,
)
except _RollbackButIsFineException:
pass
@@ -195,7 +196,7 @@ class EventsStore(SQLBaseStore):
defer.returnValue({e.event_id: e for e in events})
@log_function
- def _persist_event_txn(self, txn, event, context, current_state):
+ def _persist_event_txn(self, txn, event, context, current_state, backfilled=False):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
@@ -238,7 +239,7 @@ class EventsStore(SQLBaseStore):
return self._persist_events_txn(
txn,
[(event, context)],
- backfilled=False,
+ backfilled=backfilled,
)
@log_function
@@ -543,6 +544,22 @@ class EventsStore(SQLBaseStore):
(event.event_id, event.redacts)
)
+ @defer.inlineCallbacks
+ def have_events_in_timeline(self, event_ids):
+ """Given a list of event ids, check if we have already processed and
+ stored them as non outliers.
+ """
+ rows = yield self._simple_select_many_batch(
+ table="events",
+ retcols=("event_id",),
+ column="event_id",
+ iterable=list(event_ids),
+ keyvalues={"outlier": False},
+ desc="have_events_in_timeline",
+ )
+
+ defer.returnValue(set(r["event_id"] for r in rows))
+
def have_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
|