summary refs log tree commit diff
path: root/synapse/storage/event_federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/event_federation.py')
-rw-r--r--synapse/storage/event_federation.py176
1 files changed, 105 insertions, 71 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 74b4e23590..1ba073884b 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -13,10 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
 from ._base import SQLBaseStore, cached
 from syutil.base64util import encode_base64
 
 import logging
+from Queue import PriorityQueue, Empty
 
 
 logger = logging.getLogger(__name__)
@@ -33,16 +36,7 @@ class EventFederationStore(SQLBaseStore):
     """
 
     def get_auth_chain(self, event_ids):
-        return self.runInteraction(
-            "get_auth_chain",
-            self._get_auth_chain_txn,
-            event_ids
-        )
-
-    def _get_auth_chain_txn(self, txn, event_ids):
-        results = self._get_auth_chain_ids_txn(txn, event_ids)
-
-        return self._get_events_txn(txn, results)
+        return self.get_auth_chain_ids(event_ids).addCallback(self._get_events)
 
     def get_auth_chain_ids(self, event_ids):
         return self.runInteraction(
@@ -79,6 +73,28 @@ class EventFederationStore(SQLBaseStore):
             room_id,
         )
 
+    def get_oldest_events_with_depth_in_room(self, room_id):
+        return self.runInteraction(
+            "get_oldest_events_with_depth_in_room",
+            self.get_oldest_events_with_depth_in_room_txn,
+            room_id,
+        )
+
+    def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
+        sql = (
+            "SELECT b.event_id, MAX(e.depth) FROM events as e"
+            " INNER JOIN event_edges as g"
+            " ON g.event_id = e.event_id AND g.room_id = e.room_id"
+            " INNER JOIN event_backward_extremities as b"
+            " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
+            " WHERE b.room_id = ? AND g.is_state is ?"
+            " GROUP BY b.event_id"
+        )
+
+        txn.execute(sql, (room_id, False,))
+
+        return dict(txn.fetchall())
+
     def _get_oldest_events_in_room_txn(self, txn, room_id):
         return self._simple_select_onecol_txn(
             txn,
@@ -247,11 +263,13 @@ class EventFederationStore(SQLBaseStore):
         do_insert = depth < min_depth if min_depth else True
 
         if do_insert:
-            self._simple_insert_txn(
+            self._simple_upsert_txn(
                 txn,
                 table="room_depth",
-                values={
+                keyvalues={
                     "room_id": room_id,
+                },
+                values={
                     "min_depth": depth,
                 },
             )
@@ -306,31 +324,28 @@ class EventFederationStore(SQLBaseStore):
 
                 txn.execute(query, (event_id, room_id))
 
-            # Insert all the prev_events as a backwards thing, they'll get
-            # deleted in a second if they're incorrect anyway.
-            self._simple_insert_many_txn(
-                txn,
-                table="event_backward_extremities",
-                values=[
-                    {
-                        "event_id": e_id,
-                        "room_id": room_id,
-                    }
-                    for e_id, _ in prev_events
-                ],
+            query = (
+                "INSERT INTO event_backward_extremities (event_id, room_id)"
+                " SELECT ?, ? WHERE NOT EXISTS ("
+                " SELECT 1 FROM event_backward_extremities"
+                " WHERE event_id = ? AND room_id = ?"
+                " )"
+                " AND NOT EXISTS ("
+                " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+                " AND outlier = ?"
+                " )"
             )
 
-            # Also delete from the backwards extremities table all ones that
-            # reference events that we have already seen
+            txn.executemany(query, [
+                (e_id, room_id, e_id, room_id, e_id, room_id, False)
+                for e_id, _ in prev_events
+            ])
+
             query = (
-                "DELETE FROM event_backward_extremities WHERE EXISTS ("
-                "SELECT 1 FROM events "
-                "WHERE "
-                "event_backward_extremities.event_id = events.event_id "
-                "AND not events.outlier "
-                ")"
+                "DELETE FROM event_backward_extremities"
+                " WHERE event_id = ? AND room_id = ?"
             )
-            txn.execute(query)
+            txn.execute(query, (event_id, room_id))
 
             txn.call_after(
                 self.get_latest_event_ids_in_room.invalidate, room_id
@@ -349,6 +364,10 @@ class EventFederationStore(SQLBaseStore):
         return self.runInteraction(
             "get_backfill_events",
             self._get_backfill_events, room_id, event_list, limit
+        ).addCallback(
+            self._get_events
+        ).addCallback(
+            lambda l: sorted(l, key=lambda e: -e.depth)
         )
 
     def _get_backfill_events(self, txn, room_id, event_list, limit):
@@ -357,54 +376,75 @@ class EventFederationStore(SQLBaseStore):
             room_id, repr(event_list), limit
         )
 
-        event_results = event_list
+        event_results = set()
 
-        front = event_list
+        # We want to make sure that we do a breadth-first, "depth" ordered
+        # search.
 
         query = (
-            "SELECT prev_event_id FROM event_edges "
-            "WHERE room_id = ? AND event_id = ? "
-            "LIMIT ?"
+            "SELECT depth, prev_event_id FROM event_edges"
+            " INNER JOIN events"
+            " ON prev_event_id = events.event_id"
+            " AND event_edges.room_id = events.room_id"
+            " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+            " AND event_edges.is_state = ?"
+            " LIMIT ?"
         )
 
-        # We iterate through all event_ids in `front` to select their previous
-        # events. These are dumped in `new_front`.
-        # We continue until we reach the limit *or* new_front is empty (i.e.,
-        # we've run out of things to select
-        while front and len(event_results) < limit:
+        queue = PriorityQueue()
 
-            new_front = []
-            for event_id in front:
-                logger.debug(
-                    "_backfill_interaction: id=%s",
-                    event_id
-                )
+        for event_id in event_list:
+            depth = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "event_id": event_id,
+                },
+                retcol="depth"
+            )
 
-                txn.execute(
-                    query,
-                    (room_id, event_id, limit - len(event_results))
-                )
+            queue.put((-depth, event_id))
 
-                for row in txn.fetchall():
-                    logger.debug(
-                        "_backfill_interaction: got id=%s",
-                        *row
-                    )
-                    new_front.append(row[0])
+        while not queue.empty() and len(event_results) < limit:
+            try:
+                _, event_id = queue.get_nowait()
+            except Empty:
+                break
 
-            front = new_front
-            event_results += new_front
+            if event_id in event_results:
+                continue
+
+            event_results.add(event_id)
+
+            txn.execute(
+                query,
+                (room_id, event_id, False, limit - len(event_results))
+            )
+
+            for row in txn.fetchall():
+                if row[1] not in event_results:
+                    queue.put((-row[0], row[1]))
 
-        return self._get_events_txn(txn, event_results)
+        return event_results
 
+    @defer.inlineCallbacks
     def get_missing_events(self, room_id, earliest_events, latest_events,
                            limit, min_depth):
-        return self.runInteraction(
+        ids = yield self.runInteraction(
             "get_missing_events",
             self._get_missing_events,
             room_id, earliest_events, latest_events, limit, min_depth
         )
 
+        events = yield self._get_events(ids)
+
+        events = sorted(
+            [ev for ev in events if ev.depth >= min_depth],
+            key=lambda e: e.depth,
+        )
+
+        defer.returnValue(events[:limit])
+
     def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
                             limit, min_depth):
 
@@ -436,14 +476,7 @@ class EventFederationStore(SQLBaseStore):
             front = new_front
             event_results |= new_front
 
-        events = self._get_events_txn(txn, event_results)
-
-        events = sorted(
-            [ev for ev in events if ev.depth >= min_depth],
-            key=lambda e: e.depth,
-        )
-
-        return events[:limit]
+        return event_results
 
     def clean_room_for_join(self, room_id):
         return self.runInteraction(
@@ -456,3 +489,4 @@ class EventFederationStore(SQLBaseStore):
         query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
 
         txn.execute(query, (room_id,))
+        txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id)