diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index dda3027b61..ca1f3977c9 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -303,75 +303,82 @@ class EventFederationStore(SQLBaseStore):
],
)
+ @defer.inlineCallbacks
+ def _update_extremeties(self, events):
events_by_room = {}
for ev in events:
events_by_room.setdefault(ev.room_id, []).append(ev)
- for room_id, room_events in events_by_room.items():
- prevs = [
- e_id for ev in room_events for e_id, _ in ev.prev_events
- if not ev.internal_metadata.is_outlier()
- ]
- if prevs:
- txn.execute(
- "DELETE FROM event_forward_extremities"
- " WHERE room_id = ?"
- " AND event_id in (%s)" % (
- ",".join(["?"] * len(prevs)),
- ),
- [room_id] + prevs,
- )
-
- query = (
- "INSERT INTO event_forward_extremities (event_id, room_id)"
- " SELECT ?, ? WHERE NOT EXISTS ("
- " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
- " )"
- )
-
- txn.executemany(
- query,
- [
- (ev.event_id, ev.room_id, ev.event_id) for ev in events
- if not ev.internal_metadata.is_outlier()
- ]
- )
+ def _update_forwards_txn(txn):
+ for room_id, room_events in events_by_room.items():
+ prevs = [
+ e_id for ev in room_events for e_id, _ in ev.prev_events
+ if not ev.internal_metadata.is_outlier()
+ ]
+ if prevs:
+ txn.execute(
+ "DELETE FROM event_forward_extremities"
+ " WHERE room_id = ?"
+ " AND event_id in (%s)" % (
+ ",".join(["?"] * len(prevs)),
+ ),
+ [room_id] + prevs,
+ )
+
+ query = (
+ "INSERT INTO event_forward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+ " )"
+ )
- query = (
- "INSERT INTO event_backward_extremities (event_id, room_id)"
- " SELECT ?, ? WHERE NOT EXISTS ("
- " SELECT 1 FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- " )"
- " AND NOT EXISTS ("
- " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
- " AND outlier = ?"
- " )"
- )
+ txn.executemany(
+ query,
+ [
+ (ev.event_id, ev.room_id, ev.event_id) for ev in events
+ if not ev.internal_metadata.is_outlier()
+ ]
+ )
- txn.executemany(query, [
- (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
- for ev in events for e_id, _ in ev.prev_events
- if not ev.internal_metadata.is_outlier()
- ])
+ def _update_backwards_txn(txn):
+ query = (
+ "INSERT INTO event_backward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ " )"
+ " AND NOT EXISTS ("
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+ " AND outlier = ?"
+ " )"
+ )
- query = (
- "DELETE FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- )
- txn.executemany(
- query,
- [
- (ev.event_id, ev.room_id) for ev in events
+ txn.executemany(query, [
+ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+ for ev in events for e_id, _ in ev.prev_events
if not ev.internal_metadata.is_outlier()
- ]
- )
+ ])
- for room_id in events_by_room:
- txn.call_after(
- self.get_latest_event_ids_in_room.invalidate, (room_id,)
+ query = (
+ "DELETE FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ )
+ txn.executemany(
+ query,
+ [
+ (ev.event_id, ev.room_id) for ev in events
+ if not ev.internal_metadata.is_outlier()
+ ]
)
+ for room_id in events_by_room:
+ txn.call_after(
+ self.get_latest_event_ids_in_room.invalidate, (room_id,)
+ )
+
+ yield self.runInteraction("_update_forwards_txn", _update_forwards_txn)
+ yield self.runInteraction("_update_backwards_txn", _update_backwards_txn)
+
def get_backfill_events(self, room_id, event_list, limit):
"""Get a list of Events for a given topic that occurred before (and
including) the events in event_list. Return a list of max size `limit`
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e3eabab13d..0501eeb6e0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -86,6 +86,10 @@ class EventsStore(SQLBaseStore):
is_new_state=is_new_state,
)
+ yield self._update_extremeties([
+ ev for ev, _ in chunk
+ ])
+
@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, backfilled=False,
@@ -120,6 +124,8 @@ class EventsStore(SQLBaseStore):
except _RollbackButIsFineException:
pass
+ yield self._update_extremeties([event])
+
max_persisted_id = yield self._stream_id_gen.get_max_token(self)
defer.returnValue((stream_ordering, max_persisted_id))
|