diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/federation/federation_client.py | 9 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 164 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 74 | ||||
-rw-r--r-- | synapse/storage/push_rule.py | 18 |
4 files changed, 170 insertions, 95 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 3249060bcf..d3b46b24c1 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -165,10 +165,11 @@ class FederationClient(FederationBase): for p in transaction_data["pdus"] ] - for i, pdu in enumerate(pdus): - pdus[i] = yield self._check_sigs_and_hash(pdu) - - # FIXME: We should handle signature failures more gracefully. + # FIXME: We should handle signature failures more gracefully. + pdus[:] = yield defer.gatherResults( + [self._check_sigs_and_hash(pdu) for pdu in pdus], + consumeErrors=True, + ).addErrback(unwrapFirstError) defer.returnValue(pdus) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d85b1cf5de..46ce3699d7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -230,27 +230,65 @@ class FederationHandler(BaseHandler): if not extremities: extremities = yield self.store.get_oldest_events_in_room(room_id) - pdus = yield self.replication_layer.backfill( + events = yield self.replication_layer.backfill( dest, room_id, - limit, + limit=limit, extremities=extremities, ) - events = [] + event_map = {e.event_id: e for e in events} - for pdu in pdus: - event = pdu + event_ids = set(e.event_id for e in events) - # FIXME (erikj): Not sure this actually works :/ - context = yield self.state_handler.compute_event_context(event) + edges = [ + ev.event_id + for ev in events + if set(e_id for e_id, _ in ev.prev_events) - event_ids + ] - events.append((event, context)) + # For each edge get the current state. - yield self.store.persist_event( - event, - context=context, - backfilled=True + auth_events = {} + events_to_state = {} + for e_id in edges: + state, auth = yield self.replication_layer.get_state_for_room( + destination=dest, + room_id=room_id, + event_id=e_id + ) + auth_events.update({a.event_id: a for a in auth}) + events_to_state[e_id] = state + + yield defer.gatherResults( + [ + self._handle_new_event(dest, a) + for a in auth_events.values() + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + + yield defer.gatherResults( + [ + self._handle_new_event( + dest, event_map[e_id], + state=events_to_state[e_id], + backfilled=True, + ) + for e_id in events_to_state + ], + consumeErrors=True + ).addErrback(unwrapFirstError) + + events.sort(key=lambda e: e.depth) + + for event in events: + if event in events_to_state: + continue + + yield self._handle_new_event( + dest, event, + backfilled=True, ) defer.returnValue(events) @@ -347,7 +385,7 @@ class FederationHandler(BaseHandler): logger.info(e.message) continue except Exception as e: - logger.warn( + logger.exception( "Failed to backfill from %s because %s", dom, e, ) @@ -517,54 +555,9 @@ class FederationHandler(BaseHandler): # FIXME pass - auth_ids_to_deferred = {} - - def process_auth_ev(ev): - auth_ids = [e_id for e_id, _ in ev.auth_events] - - prev_ds = [ - auth_ids_to_deferred[i] - for i in auth_ids - if i in auth_ids_to_deferred - ] - - d = defer.Deferred() - - auth_ids_to_deferred[ev.event_id] = d - - @defer.inlineCallbacks - def f(*_): - ev.internal_metadata.outlier = True - - try: - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - - yield self._handle_new_event( - origin, ev, auth_events=auth - ) - except: - logger.exception( - "Failed to handle auth event %s", - ev.event_id, - ) - - d.callback(None) - - if prev_ds: - dx = defer.DeferredList(prev_ds) - dx.addBoth(f) - else: - f() - - for e in auth_chain: - if e.event_id == event.event_id: - return - process_auth_ev(e) - - yield defer.DeferredList(auth_ids_to_deferred.values()) + yield self._handle_auth_events( + origin, [e for e in auth_chain if e.event_id != event.event_id] + ) @defer.inlineCallbacks def handle_state(e): @@ -1348,3 +1341,52 @@ class FederationHandler(BaseHandler): }, "missing": [e.event_id for e in missing_locals], }) + + @defer.inlineCallbacks + def _handle_auth_events(self, origin, auth_events): + auth_ids_to_deferred = {} + + def process_auth_ev(ev): + auth_ids = [e_id for e_id, _ in ev.auth_events] + + prev_ds = [ + auth_ids_to_deferred[i] + for i in auth_ids + if i in auth_ids_to_deferred + ] + + d = defer.Deferred() + + auth_ids_to_deferred[ev.event_id] = d + + @defer.inlineCallbacks + def f(*_): + ev.internal_metadata.outlier = True + + try: + auth = { + (e.type, e.state_key): e for e in auth_events + if e.event_id in auth_ids + } + + yield self._handle_new_event( + origin, ev, auth_events=auth + ) + except: + logger.exception( + "Failed to handle auth event %s", + ev.event_id, + ) + + d.callback(None) + + if prev_ds: + dx = defer.DeferredList(prev_ds) + dx.addBoth(f) + else: + f() + + for e in auth_events: + process_auth_ev(e) + + yield defer.DeferredList(auth_ids_to_deferred.values()) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 114ccece65..1ba073884b 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -19,6 +19,7 @@ from ._base import SQLBaseStore, cached from syutil.base64util import encode_base64 import logging +from Queue import PriorityQueue, Empty logger = logging.getLogger(__name__) @@ -363,7 +364,11 @@ class EventFederationStore(SQLBaseStore): return self.runInteraction( "get_backfill_events", self._get_backfill_events, room_id, event_list, limit - ).addCallback(self._get_events) + ).addCallback( + self._get_events + ).addCallback( + lambda l: sorted(l, key=lambda e: -e.depth) + ) def _get_backfill_events(self, txn, room_id, event_list, limit): logger.debug( @@ -371,43 +376,54 @@ class EventFederationStore(SQLBaseStore): room_id, repr(event_list), limit ) - event_results = event_list + event_results = set() - front = event_list + # We want to make sure that we do a breadth-first, "depth" ordered + # search. query = ( - "SELECT prev_event_id FROM event_edges " - "WHERE room_id = ? AND event_id = ? " - "LIMIT ?" + "SELECT depth, prev_event_id FROM event_edges" + " INNER JOIN events" + " ON prev_event_id = events.event_id" + " AND event_edges.room_id = events.room_id" + " WHERE event_edges.room_id = ? AND event_edges.event_id = ?" + " AND event_edges.is_state = ?" + " LIMIT ?" ) - # We iterate through all event_ids in `front` to select their previous - # events. These are dumped in `new_front`. - # We continue until we reach the limit *or* new_front is empty (i.e., - # we've run out of things to select - while front and len(event_results) < limit: + queue = PriorityQueue() - new_front = [] - for event_id in front: - logger.debug( - "_backfill_interaction: id=%s", - event_id - ) + for event_id in event_list: + depth = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="depth" + ) - txn.execute( - query, - (room_id, event_id, limit - len(event_results)) - ) + queue.put((-depth, event_id)) - for row in txn.fetchall(): - logger.debug( - "_backfill_interaction: got id=%s", - *row - ) - new_front.append(row[0]) + while not queue.empty() and len(event_results) < limit: + try: + _, event_id = queue.get_nowait() + except Empty: + break - front = new_front - event_results += new_front + if event_id in event_results: + continue + + event_results.add(event_id) + + txn.execute( + query, + (room_id, event_id, False, limit - len(event_results)) + ) + + for row in txn.fetchall(): + if row[1] not in event_results: + queue.put((-row[0], row[1])) return event_results diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 80d0ac4ea3..4cac118d17 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -23,6 +23,7 @@ logger = logging.getLogger(__name__) class PushRuleStore(SQLBaseStore): + @cached() @defer.inlineCallbacks def get_push_rules_for_user(self, user_name): rows = yield self._simple_select_list( @@ -31,6 +32,7 @@ class PushRuleStore(SQLBaseStore): "user_name": user_name, }, retcols=PushRuleTable.fields, + desc="get_push_rules_enabled_for_user", ) rows.sort( @@ -151,6 +153,10 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_name, priority_class, new_rule_priority)) txn.call_after( + self.get_push_rules_for_user.invalidate, user_name + ) + + txn.call_after( self.get_push_rules_enabled_for_user.invalidate, user_name ) @@ -183,6 +189,9 @@ class PushRuleStore(SQLBaseStore): new_rule['priority'] = new_prio txn.call_after( + self.get_push_rules_for_user.invalidate, user_name + ) + txn.call_after( self.get_push_rules_enabled_for_user.invalidate, user_name ) @@ -208,6 +217,8 @@ class PushRuleStore(SQLBaseStore): {'user_name': user_name, 'rule_id': rule_id}, desc="delete_push_rule", ) + + self.get_push_rules_for_user.invalidate(user_name) self.get_push_rules_enabled_for_user.invalidate(user_name) @defer.inlineCallbacks @@ -228,7 +239,12 @@ class PushRuleStore(SQLBaseStore): {'enabled': 1 if enabled else 0}, {'id': new_id}, ) - self.get_push_rules_enabled_for_user.invalidate(user_name) + txn.call_after( + self.get_push_rules_for_user.invalidate, user_name + ) + txn.call_after( + self.get_push_rules_enabled_for_user.invalidate, user_name + ) class RuleNotFoundException(Exception): |