diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 1ba073884b..dda3027b61 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -15,7 +15,8 @@
from twisted.internet import defer
-from ._base import SQLBaseStore, cached
+from ._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached
from syutil.base64util import encode_base64
import logging
@@ -49,14 +50,22 @@ class EventFederationStore(SQLBaseStore):
results = set()
base_sql = (
- "SELECT auth_id FROM event_auth WHERE event_id = ?"
+ "SELECT auth_id FROM event_auth WHERE event_id IN (%s)"
)
front = set(event_ids)
while front:
new_front = set()
- for f in front:
- txn.execute(base_sql, (f,))
+ front_list = list(front)
+ chunks = [
+ front_list[x:x+100]
+ for x in xrange(0, len(front), 100)
+ ]
+ for chunk in chunks:
+ txn.execute(
+ base_sql % (",".join(["?"] * len(chunk)),),
+ chunk
+ )
new_front.update([r[0] for r in txn.fetchall()])
new_front -= results
@@ -274,8 +283,7 @@ class EventFederationStore(SQLBaseStore):
},
)
- def _handle_prev_events(self, txn, outlier, event_id, prev_events,
- room_id):
+ def _handle_mult_prev_events(self, txn, events):
"""
For the given event, update the event edges table and forward and
backward extremities tables.
@@ -285,70 +293,83 @@ class EventFederationStore(SQLBaseStore):
table="event_edges",
values=[
{
- "event_id": event_id,
+ "event_id": ev.event_id,
"prev_event_id": e_id,
- "room_id": room_id,
+ "room_id": ev.room_id,
"is_state": False,
}
- for e_id, _ in prev_events
+ for ev in events
+ for e_id, _ in ev.prev_events
],
)
- # Update the extremities table if this is not an outlier.
- if not outlier:
- for e_id, _ in prev_events:
- # TODO (erikj): This could be done as a bulk insert
- self._simple_delete_txn(
- txn,
- table="event_forward_extremities",
- keyvalues={
- "event_id": e_id,
- "room_id": room_id,
- }
- )
+ events_by_room = {}
+ for ev in events:
+ events_by_room.setdefault(ev.room_id, []).append(ev)
- # We only insert as a forward extremity the new event if there are
- # no other events that reference it as a prev event
- query = (
- "SELECT 1 FROM event_edges WHERE prev_event_id = ?"
- )
+ for room_id, room_events in events_by_room.items():
+ prevs = [
+ e_id for ev in room_events for e_id, _ in ev.prev_events
+ if not ev.internal_metadata.is_outlier()
+ ]
+ if prevs:
+ txn.execute(
+ "DELETE FROM event_forward_extremities"
+ " WHERE room_id = ?"
+ " AND event_id in (%s)" % (
+ ",".join(["?"] * len(prevs)),
+ ),
+ [room_id] + prevs,
+ )
- txn.execute(query, (event_id,))
+ query = (
+ "INSERT INTO event_forward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+ " )"
+ )
- if not txn.fetchone():
- query = (
- "INSERT INTO event_forward_extremities"
- " (event_id, room_id)"
- " VALUES (?, ?)"
- )
+ txn.executemany(
+ query,
+ [
+ (ev.event_id, ev.room_id, ev.event_id) for ev in events
+ if not ev.internal_metadata.is_outlier()
+ ]
+ )
- txn.execute(query, (event_id, room_id))
-
- query = (
- "INSERT INTO event_backward_extremities (event_id, room_id)"
- " SELECT ?, ? WHERE NOT EXISTS ("
- " SELECT 1 FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- " )"
- " AND NOT EXISTS ("
- " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
- " AND outlier = ?"
- " )"
- )
+ query = (
+ "INSERT INTO event_backward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ " )"
+ " AND NOT EXISTS ("
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+ " AND outlier = ?"
+ " )"
+ )
- txn.executemany(query, [
- (e_id, room_id, e_id, room_id, e_id, room_id, False)
- for e_id, _ in prev_events
- ])
+ txn.executemany(query, [
+ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+ for ev in events for e_id, _ in ev.prev_events
+ if not ev.internal_metadata.is_outlier()
+ ])
- query = (
- "DELETE FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- )
- txn.execute(query, (event_id, room_id))
+ query = (
+ "DELETE FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ )
+ txn.executemany(
+ query,
+ [
+ (ev.event_id, ev.room_id) for ev in events
+ if not ev.internal_metadata.is_outlier()
+ ]
+ )
+ for room_id in events_by_room:
txn.call_after(
- self.get_latest_event_ids_in_room.invalidate, room_id
+ self.get_latest_event_ids_in_room.invalidate, (room_id,)
)
def get_backfill_events(self, room_id, event_list, limit):
@@ -400,10 +421,12 @@ class EventFederationStore(SQLBaseStore):
keyvalues={
"event_id": event_id,
},
- retcol="depth"
+ retcol="depth",
+ allow_none=True,
)
- queue.put((-depth, event_id))
+ if depth:
+ queue.put((-depth, event_id))
while not queue.empty() and len(event_results) < limit:
try:
@@ -489,4 +512,4 @@ class EventFederationStore(SQLBaseStore):
query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
txn.execute(query, (room_id,))
- txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id)
+ txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))
|