diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/event_federation.py | 74 | ||||
-rw-r--r-- | synapse/storage/push_rule.py | 18 |
2 files changed, 62 insertions, 30 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 114ccece65..1ba073884b 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -19,6 +19,7 @@ from ._base import SQLBaseStore, cached from syutil.base64util import encode_base64 import logging +from Queue import PriorityQueue, Empty logger = logging.getLogger(__name__) @@ -363,7 +364,11 @@ class EventFederationStore(SQLBaseStore): return self.runInteraction( "get_backfill_events", self._get_backfill_events, room_id, event_list, limit - ).addCallback(self._get_events) + ).addCallback( + self._get_events + ).addCallback( + lambda l: sorted(l, key=lambda e: -e.depth) + ) def _get_backfill_events(self, txn, room_id, event_list, limit): logger.debug( @@ -371,43 +376,54 @@ class EventFederationStore(SQLBaseStore): room_id, repr(event_list), limit ) - event_results = event_list + event_results = set() - front = event_list + # We want to make sure that we do a breadth-first, "depth" ordered + # search. query = ( - "SELECT prev_event_id FROM event_edges " - "WHERE room_id = ? AND event_id = ? " - "LIMIT ?" + "SELECT depth, prev_event_id FROM event_edges" + " INNER JOIN events" + " ON prev_event_id = events.event_id" + " AND event_edges.room_id = events.room_id" + " WHERE event_edges.room_id = ? AND event_edges.event_id = ?" + " AND event_edges.is_state = ?" + " LIMIT ?" ) - # We iterate through all event_ids in `front` to select their previous - # events. These are dumped in `new_front`. - # We continue until we reach the limit *or* new_front is empty (i.e., - # we've run out of things to select - while front and len(event_results) < limit: + queue = PriorityQueue() - new_front = [] - for event_id in front: - logger.debug( - "_backfill_interaction: id=%s", - event_id - ) + for event_id in event_list: + depth = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="depth" + ) - txn.execute( - query, - (room_id, event_id, limit - len(event_results)) - ) + queue.put((-depth, event_id)) - for row in txn.fetchall(): - logger.debug( - "_backfill_interaction: got id=%s", - *row - ) - new_front.append(row[0]) + while not queue.empty() and len(event_results) < limit: + try: + _, event_id = queue.get_nowait() + except Empty: + break - front = new_front - event_results += new_front + if event_id in event_results: + continue + + event_results.add(event_id) + + txn.execute( + query, + (room_id, event_id, False, limit - len(event_results)) + ) + + for row in txn.fetchall(): + if row[1] not in event_results: + queue.put((-row[0], row[1])) return event_results diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 80d0ac4ea3..4cac118d17 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -23,6 +23,7 @@ logger = logging.getLogger(__name__) class PushRuleStore(SQLBaseStore): + @cached() @defer.inlineCallbacks def get_push_rules_for_user(self, user_name): rows = yield self._simple_select_list( @@ -31,6 +32,7 @@ class PushRuleStore(SQLBaseStore): "user_name": user_name, }, retcols=PushRuleTable.fields, + desc="get_push_rules_enabled_for_user", ) rows.sort( @@ -151,6 +153,10 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_name, priority_class, new_rule_priority)) txn.call_after( + self.get_push_rules_for_user.invalidate, user_name + ) + + txn.call_after( self.get_push_rules_enabled_for_user.invalidate, user_name ) @@ -183,6 +189,9 @@ class PushRuleStore(SQLBaseStore): new_rule['priority'] = new_prio txn.call_after( + self.get_push_rules_for_user.invalidate, user_name + ) + txn.call_after( self.get_push_rules_enabled_for_user.invalidate, user_name ) @@ -208,6 +217,8 @@ class PushRuleStore(SQLBaseStore): {'user_name': user_name, 'rule_id': rule_id}, desc="delete_push_rule", ) + + self.get_push_rules_for_user.invalidate(user_name) self.get_push_rules_enabled_for_user.invalidate(user_name) @defer.inlineCallbacks @@ -228,7 +239,12 @@ class PushRuleStore(SQLBaseStore): {'enabled': 1 if enabled else 0}, {'id': new_id}, ) - self.get_push_rules_enabled_for_user.invalidate(user_name) + txn.call_after( + self.get_push_rules_for_user.invalidate, user_name + ) + txn.call_after( + self.get_push_rules_enabled_for_user.invalidate, user_name + ) class RuleNotFoundException(Exception): |