summary refs log tree commit diff
path: root/synapse/storage/event_federation.py
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-08-12 17:07:22 +0100
committerMark Haines <mark.haines@matrix.org>2015-08-12 17:21:14 +0100
commit998a72d4d9ec6e73000888dcdf51437ec427fbee (patch)
treef8fa9d5deb820b49eb3a216e194ee83e14fb9eda /synapse/storage/event_federation.py
parentBump the version of twisted needed for setup_requires to 15.2.1 (diff)
parentMerge pull request #220 from matrix-org/markjh/generate_keys (diff)
downloadsynapse-998a72d4d9ec6e73000888dcdf51437ec427fbee.tar.xz
Merge branch 'develop' into markjh/twisted-15
Conflicts:
	synapse/http/matrixfederationclient.py
Diffstat (limited to 'synapse/storage/event_federation.py')
-rw-r--r--synapse/storage/event_federation.py132
1 files changed, 74 insertions, 58 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 1ba073884b..910b6598a7 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -49,14 +49,22 @@ class EventFederationStore(SQLBaseStore):
         results = set()
 
         base_sql = (
-            "SELECT auth_id FROM event_auth WHERE event_id = ?"
+            "SELECT auth_id FROM event_auth WHERE event_id IN (%s)"
         )
 
         front = set(event_ids)
         while front:
             new_front = set()
-            for f in front:
-                txn.execute(base_sql, (f,))
+            front_list = list(front)
+            chunks = [
+                front_list[x:x+100]
+                for x in xrange(0, len(front), 100)
+            ]
+            for chunk in chunks:
+                txn.execute(
+                    base_sql % (",".join(["?"] * len(chunk)),),
+                    chunk
+                )
                 new_front.update([r[0] for r in txn.fetchall()])
 
             new_front -= results
@@ -274,8 +282,7 @@ class EventFederationStore(SQLBaseStore):
                 },
             )
 
-    def _handle_prev_events(self, txn, outlier, event_id, prev_events,
-                            room_id):
+    def _handle_mult_prev_events(self, txn, events):
         """
         For the given event, update the event edges table and forward and
         backward extremities tables.
@@ -285,70 +292,77 @@ class EventFederationStore(SQLBaseStore):
             table="event_edges",
             values=[
                 {
-                    "event_id": event_id,
+                    "event_id": ev.event_id,
                     "prev_event_id": e_id,
-                    "room_id": room_id,
+                    "room_id": ev.room_id,
                     "is_state": False,
                 }
-                for e_id, _ in prev_events
+                for ev in events
+                for e_id, _ in ev.prev_events
             ],
         )
 
-        # Update the extremities table if this is not an outlier.
-        if not outlier:
-            for e_id, _ in prev_events:
-                # TODO (erikj): This could be done as a bulk insert
-                self._simple_delete_txn(
-                    txn,
-                    table="event_forward_extremities",
-                    keyvalues={
-                        "event_id": e_id,
-                        "room_id": room_id,
-                    }
-                )
+        events_by_room = {}
+        for ev in events:
+            events_by_room.setdefault(ev.room_id, []).append(ev)
 
-            # We only insert as a forward extremity the new event if there are
-            # no other events that reference it as a prev event
-            query = (
-                "SELECT 1 FROM event_edges WHERE prev_event_id = ?"
-            )
+        for room_id, room_events in events_by_room.items():
+            prevs = [
+                e_id for ev in room_events for e_id, _ in ev.prev_events
+                if not ev.internal_metadata.is_outlier()
+            ]
+            if prevs:
+                txn.execute(
+                    "DELETE FROM event_forward_extremities"
+                    " WHERE room_id = ?"
+                    " AND event_id in (%s)" % (
+                        ",".join(["?"] * len(prevs)),
+                    ),
+                    [room_id] + prevs,
+                )
 
-            txn.execute(query, (event_id,))
+        query = (
+            "INSERT INTO event_forward_extremities (event_id, room_id)"
+            " SELECT ?, ? WHERE NOT EXISTS ("
+            " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+            " )"
+        )
 
-            if not txn.fetchone():
-                query = (
-                    "INSERT INTO event_forward_extremities"
-                    " (event_id, room_id)"
-                    " VALUES (?, ?)"
-                )
+        txn.executemany(
+            query,
+            [(ev.event_id, ev.room_id, ev.event_id) for ev in events]
+        )
 
-                txn.execute(query, (event_id, room_id))
-
-            query = (
-                "INSERT INTO event_backward_extremities (event_id, room_id)"
-                " SELECT ?, ? WHERE NOT EXISTS ("
-                " SELECT 1 FROM event_backward_extremities"
-                " WHERE event_id = ? AND room_id = ?"
-                " )"
-                " AND NOT EXISTS ("
-                " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-                " AND outlier = ?"
-                " )"
-            )
+        query = (
+            "INSERT INTO event_backward_extremities (event_id, room_id)"
+            " SELECT ?, ? WHERE NOT EXISTS ("
+            " SELECT 1 FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+            " )"
+            " AND NOT EXISTS ("
+            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            " AND outlier = ?"
+            " )"
+        )
 
-            txn.executemany(query, [
-                (e_id, room_id, e_id, room_id, e_id, room_id, False)
-                for e_id, _ in prev_events
-            ])
+        txn.executemany(query, [
+            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+            for ev in events for e_id, _ in ev.prev_events
+            if not ev.internal_metadata.is_outlier()
+        ])
 
-            query = (
-                "DELETE FROM event_backward_extremities"
-                " WHERE event_id = ? AND room_id = ?"
-            )
-            txn.execute(query, (event_id, room_id))
+        query = (
+            "DELETE FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+        )
+        txn.executemany(
+            query,
+            [(ev.event_id, ev.room_id) for ev in events]
+        )
 
+        for room_id in events_by_room:
             txn.call_after(
-                self.get_latest_event_ids_in_room.invalidate, room_id
+                self.get_latest_event_ids_in_room.invalidate, (room_id,)
             )
 
     def get_backfill_events(self, room_id, event_list, limit):
@@ -400,10 +414,12 @@ class EventFederationStore(SQLBaseStore):
                 keyvalues={
                     "event_id": event_id,
                 },
-                retcol="depth"
+                retcol="depth",
+                allow_none=True,
             )
 
-            queue.put((-depth, event_id))
+            if depth:
+                queue.put((-depth, event_id))
 
         while not queue.empty() and len(event_results) < limit:
             try:
@@ -489,4 +505,4 @@ class EventFederationStore(SQLBaseStore):
         query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
 
         txn.execute(query, (room_id,))
-        txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id)
+        txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))