summary refs log tree commit diff
path: root/synapse/storage/event_federation.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-07-14 10:49:24 +0100
committerErik Johnston <erik@matrix.org>2015-07-14 10:49:24 +0100
commitbaa55fb69e88bc95f9ea603ca9d1cd758cba5187 (patch)
tree58040d05bde26aa28b128bc8784b1006426f2aea /synapse/storage/event_federation.py
parentClose, but no cigar. (diff)
parentRemove commented out code (diff)
downloadsynapse-baa55fb69e88bc95f9ea603ca9d1cd758cba5187.tar.xz
Merge pull request #193 from matrix-org/erikj/bulk_persist_event
Add bulk insert events API
Diffstat (limited to 'synapse/storage/event_federation.py')
-rw-r--r--synapse/storage/event_federation.py108
1 files changed, 57 insertions, 51 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index c71019d93b..45b86c94e8 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -282,8 +282,7 @@ class EventFederationStore(SQLBaseStore):
                 },
             )
 
-    def _handle_prev_events(self, txn, outlier, event_id, prev_events,
-                            room_id):
+    def _handle_mult_prev_events(self, txn, events):
         """
         For the given event, update the event edges table and forward and
         backward extremities tables.
@@ -293,68 +292,75 @@ class EventFederationStore(SQLBaseStore):
             table="event_edges",
             values=[
                 {
-                    "event_id": event_id,
+                    "event_id": ev.event_id,
                     "prev_event_id": e_id,
-                    "room_id": room_id,
+                    "room_id": ev.room_id,
                     "is_state": False,
                 }
-                for e_id, _ in prev_events
+                for ev in events
+                for e_id, _ in ev.prev_events
             ],
         )
 
-        # Update the extremities table if this is not an outlier.
-        if not outlier:
-            for e_id, _ in prev_events:
-                # TODO (erikj): This could be done as a bulk insert
-                self._simple_delete_txn(
-                    txn,
-                    table="event_forward_extremities",
-                    keyvalues={
-                        "event_id": e_id,
-                        "room_id": room_id,
-                    }
-                )
+        events_by_room = {}
+        for ev in events:
+            events_by_room.setdefault(ev.room_id, []).append(ev)
 
-            # We only insert as a forward extremity the new event if there are
-            # no other events that reference it as a prev event
-            query = (
-                "SELECT 1 FROM event_edges WHERE prev_event_id = ?"
-            )
+        for room_id, room_events in events_by_room.items():
+            prevs = [
+                e_id for ev in room_events for e_id, _ in ev.prev_events
+                if not ev.internal_metadata.is_outlier()
+            ]
+            if prevs:
+                txn.execute(
+                    "DELETE FROM event_forward_extremities"
+                    " WHERE room_id = ?"
+                    " AND event_id in (%s)" % (
+                        ",".join(["?"] * len(prevs)),
+                    ),
+                    [room_id] + prevs,
+                )
 
-            txn.execute(query, (event_id,))
+        query = (
+            "INSERT INTO event_forward_extremities (event_id, room_id)"
+            " SELECT ?, ? WHERE NOT EXISTS ("
+            " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+            " )"
+        )
 
-            if not txn.fetchone():
-                query = (
-                    "INSERT INTO event_forward_extremities"
-                    " (event_id, room_id)"
-                    " VALUES (?, ?)"
-                )
+        txn.executemany(
+            query,
+            [(ev.event_id, ev.room_id, ev.event_id) for ev in events]
+        )
 
-                txn.execute(query, (event_id, room_id))
-
-            query = (
-                "INSERT INTO event_backward_extremities (event_id, room_id)"
-                " SELECT ?, ? WHERE NOT EXISTS ("
-                " SELECT 1 FROM event_backward_extremities"
-                " WHERE event_id = ? AND room_id = ?"
-                " )"
-                " AND NOT EXISTS ("
-                " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-                " AND outlier = ?"
-                " )"
-            )
+        query = (
+            "INSERT INTO event_backward_extremities (event_id, room_id)"
+            " SELECT ?, ? WHERE NOT EXISTS ("
+            " SELECT 1 FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+            " )"
+            " AND NOT EXISTS ("
+            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            " AND outlier = ?"
+            " )"
+        )
 
-            txn.executemany(query, [
-                (e_id, room_id, e_id, room_id, e_id, room_id, False)
-                for e_id, _ in prev_events
-            ])
+        txn.executemany(query, [
+            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+            for ev in events for e_id, _ in ev.prev_events
+            if not ev.internal_metadata.is_outlier()
+        ])
 
-            query = (
-                "DELETE FROM event_backward_extremities"
-                " WHERE event_id = ? AND room_id = ?"
-            )
-            txn.execute(query, (event_id, room_id))
+        query = (
+            "DELETE FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+        )
+        txn.executemany(
+            query,
+            [(ev.event_id, ev.room_id) for ev in events]
+        )
 
+        for room_id in events_by_room:
             txn.call_after(
                 self.get_latest_event_ids_in_room.invalidate, room_id
             )