diff options
author | Erik Johnston <erik@matrix.org> | 2016-04-12 13:30:30 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-04-12 13:30:30 +0100 |
commit | 318cb1f2078353525137ca095ba7624735301a0c (patch) | |
tree | 688d57f0af0403757482f01c1bb0c53694b0387d | |
parent | Fix the rule_id for .m.rule.invite_for_me (#715) (diff) | |
parent | More comments (diff) | |
download | synapse-318cb1f2078353525137ca095ba7624735301a0c.tar.xz |
Merge pull request #717 from matrix-org/erikj/backfill_state
Check if we've already backfilled events
-rw-r--r-- | synapse/handlers/federation.py | 51 | ||||
-rw-r--r-- | synapse/storage/events.py | 23 |
2 files changed, 59 insertions, 15 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 83dab32bcb..5ac55e10f3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -280,6 +280,10 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities=[]): """ Trigger a backfill request to `dest` for the given `room_id` + + This will attempt to get more events from the remote. This may return + be successfull and still return no events if the other side has no new + events to offer. """ if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") @@ -294,6 +298,16 @@ class FederationHandler(BaseHandler): extremities=extremities, ) + # Don't bother processing events we already have. + seen_events = yield self.store.have_events_in_timeline( + set(e.event_id for e in events) + ) + + events = [e for e in events if e.event_id not in seen_events] + + if not events: + defer.returnValue([]) + event_map = {e.event_id: e for e in events} event_ids = set(e.event_id for e in events) @@ -353,6 +367,7 @@ class FederationHandler(BaseHandler): for a in auth_events.values(): if a.event_id in seen_events: continue + a.internal_metadata.outlier = True ev_infos.append({ "event": a, "auth_events": { @@ -373,20 +388,23 @@ class FederationHandler(BaseHandler): } }) + yield self._handle_new_events( + dest, ev_infos, + backfilled=True, + ) + events.sort(key=lambda e: e.depth) for event in events: if event in events_to_state: continue - ev_infos.append({ - "event": event, - }) - - yield self._handle_new_events( - dest, ev_infos, - backfilled=True, - ) + # We store these one at a time since each event depends on the + # previous to work out the state. + # TODO: We can probably do something more clever here. + yield self._handle_new_event( + dest, event + ) defer.returnValue(events) @@ -458,11 +476,15 @@ class FederationHandler(BaseHandler): # TODO: Should we try multiple of these at a time? for dom in domains: try: - events = yield self.backfill( + yield self.backfill( dom, room_id, limit=100, extremities=[e for e in extremities.keys()] ) + # If this succeeded then we probably already have the + # appropriate stuff. + # TODO: We can probably do something more intelligent here. + defer.returnValue(True) except SynapseError as e: logger.info( "Failed to backfill from %s because %s", @@ -488,8 +510,6 @@ class FederationHandler(BaseHandler): ) continue - if events: - defer.returnValue(True) defer.returnValue(False) success = yield try_backfill(likely_domains) @@ -1076,7 +1096,8 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def _handle_new_event(self, origin, event, state=None, auth_events=None): + def _handle_new_event(self, origin, event, state=None, auth_events=None, + backfilled=False): context = yield self._prep_event( origin, event, state=state, @@ -1092,6 +1113,7 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, + backfilled=backfilled, ) # this intentionally does not yield: we don't care about the result @@ -1104,6 +1126,11 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _handle_new_events(self, origin, event_infos, backfilled=False): + """Creates the appropriate contexts and persists events. The events + should not depend on one another, e.g. this should be used to persist + a bunch of outliers, but not a chunk of individual events that depend + on each other for state calculations. + """ contexts = yield defer.gatherResults( [ self._prep_event( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 308a2c9b02..21487724ed 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -118,7 +118,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function - def persist_event(self, event, context, current_state=None): + def persist_event(self, event, context, current_state=None, backfilled=False): try: with self._stream_id_gen.get_next() as stream_ordering: @@ -131,6 +131,7 @@ class EventsStore(SQLBaseStore): event=event, context=context, current_state=current_state, + backfilled=backfilled, ) except _RollbackButIsFineException: pass @@ -195,7 +196,7 @@ class EventsStore(SQLBaseStore): defer.returnValue({e.event_id: e for e in events}) @log_function - def _persist_event_txn(self, txn, event, context, current_state): + def _persist_event_txn(self, txn, event, context, current_state, backfilled=False): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: @@ -238,7 +239,7 @@ class EventsStore(SQLBaseStore): return self._persist_events_txn( txn, [(event, context)], - backfilled=False, + backfilled=backfilled, ) @log_function @@ -543,6 +544,22 @@ class EventsStore(SQLBaseStore): (event.event_id, event.redacts) ) + @defer.inlineCallbacks + def have_events_in_timeline(self, event_ids): + """Given a list of event ids, check if we have already processed and + stored them as non outliers. + """ + rows = yield self._simple_select_many_batch( + table="events", + retcols=("event_id",), + column="event_id", + iterable=list(event_ids), + keyvalues={"outlier": False}, + desc="have_events_in_timeline", + ) + + defer.returnValue(set(r["event_id"] for r in rows)) + def have_events(self, event_ids): """Given a list of event ids, check if we have already processed them. |