summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/event_federation.py74
-rw-r--r--synapse/storage/push_rule.py18
2 files changed, 62 insertions, 30 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 114ccece65..1ba073884b 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -19,6 +19,7 @@ from ._base import SQLBaseStore, cached
 from syutil.base64util import encode_base64
 
 import logging
+from Queue import PriorityQueue, Empty
 
 
 logger = logging.getLogger(__name__)
@@ -363,7 +364,11 @@ class EventFederationStore(SQLBaseStore):
         return self.runInteraction(
             "get_backfill_events",
             self._get_backfill_events, room_id, event_list, limit
-        ).addCallback(self._get_events)
+        ).addCallback(
+            self._get_events
+        ).addCallback(
+            lambda l: sorted(l, key=lambda e: -e.depth)
+        )
 
     def _get_backfill_events(self, txn, room_id, event_list, limit):
         logger.debug(
@@ -371,43 +376,54 @@ class EventFederationStore(SQLBaseStore):
             room_id, repr(event_list), limit
         )
 
-        event_results = event_list
+        event_results = set()
 
-        front = event_list
+        # We want to make sure that we do a breadth-first, "depth" ordered
+        # search.
 
         query = (
-            "SELECT prev_event_id FROM event_edges "
-            "WHERE room_id = ? AND event_id = ? "
-            "LIMIT ?"
+            "SELECT depth, prev_event_id FROM event_edges"
+            " INNER JOIN events"
+            " ON prev_event_id = events.event_id"
+            " AND event_edges.room_id = events.room_id"
+            " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+            " AND event_edges.is_state = ?"
+            " LIMIT ?"
         )
 
-        # We iterate through all event_ids in `front` to select their previous
-        # events. These are dumped in `new_front`.
-        # We continue until we reach the limit *or* new_front is empty (i.e.,
-        # we've run out of things to select
-        while front and len(event_results) < limit:
+        queue = PriorityQueue()
 
-            new_front = []
-            for event_id in front:
-                logger.debug(
-                    "_backfill_interaction: id=%s",
-                    event_id
-                )
+        for event_id in event_list:
+            depth = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "event_id": event_id,
+                },
+                retcol="depth"
+            )
 
-                txn.execute(
-                    query,
-                    (room_id, event_id, limit - len(event_results))
-                )
+            queue.put((-depth, event_id))
 
-                for row in txn.fetchall():
-                    logger.debug(
-                        "_backfill_interaction: got id=%s",
-                        *row
-                    )
-                    new_front.append(row[0])
+        while not queue.empty() and len(event_results) < limit:
+            try:
+                _, event_id = queue.get_nowait()
+            except Empty:
+                break
 
-            front = new_front
-            event_results += new_front
+            if event_id in event_results:
+                continue
+
+            event_results.add(event_id)
+
+            txn.execute(
+                query,
+                (room_id, event_id, False, limit - len(event_results))
+            )
+
+            for row in txn.fetchall():
+                if row[1] not in event_results:
+                    queue.put((-row[0], row[1]))
 
         return event_results
 
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 80d0ac4ea3..4cac118d17 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -23,6 +23,7 @@ logger = logging.getLogger(__name__)
 
 
 class PushRuleStore(SQLBaseStore):
+    @cached()
     @defer.inlineCallbacks
     def get_push_rules_for_user(self, user_name):
         rows = yield self._simple_select_list(
@@ -31,6 +32,7 @@ class PushRuleStore(SQLBaseStore):
                 "user_name": user_name,
             },
             retcols=PushRuleTable.fields,
+            desc="get_push_rules_enabled_for_user",
         )
 
         rows.sort(
@@ -151,6 +153,10 @@ class PushRuleStore(SQLBaseStore):
             txn.execute(sql, (user_name, priority_class, new_rule_priority))
 
         txn.call_after(
+            self.get_push_rules_for_user.invalidate, user_name
+        )
+
+        txn.call_after(
             self.get_push_rules_enabled_for_user.invalidate, user_name
         )
 
@@ -183,6 +189,9 @@ class PushRuleStore(SQLBaseStore):
         new_rule['priority'] = new_prio
 
         txn.call_after(
+            self.get_push_rules_for_user.invalidate, user_name
+        )
+        txn.call_after(
             self.get_push_rules_enabled_for_user.invalidate, user_name
         )
 
@@ -208,6 +217,8 @@ class PushRuleStore(SQLBaseStore):
             {'user_name': user_name, 'rule_id': rule_id},
             desc="delete_push_rule",
         )
+
+        self.get_push_rules_for_user.invalidate(user_name)
         self.get_push_rules_enabled_for_user.invalidate(user_name)
 
     @defer.inlineCallbacks
@@ -228,7 +239,12 @@ class PushRuleStore(SQLBaseStore):
             {'enabled': 1 if enabled else 0},
             {'id': new_id},
         )
-        self.get_push_rules_enabled_for_user.invalidate(user_name)
+        txn.call_after(
+            self.get_push_rules_for_user.invalidate, user_name
+        )
+        txn.call_after(
+            self.get_push_rules_enabled_for_user.invalidate, user_name
+        )
 
 
 class RuleNotFoundException(Exception):