summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/event_federation.py123
-rw-r--r--synapse/storage/events.py6
2 files changed, 71 insertions, 58 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index dda3027b61..ca1f3977c9 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -303,75 +303,82 @@ class EventFederationStore(SQLBaseStore):
             ],
         )
 
+    @defer.inlineCallbacks
+    def _update_extremeties(self, events):
         events_by_room = {}
         for ev in events:
             events_by_room.setdefault(ev.room_id, []).append(ev)
 
-        for room_id, room_events in events_by_room.items():
-            prevs = [
-                e_id for ev in room_events for e_id, _ in ev.prev_events
-                if not ev.internal_metadata.is_outlier()
-            ]
-            if prevs:
-                txn.execute(
-                    "DELETE FROM event_forward_extremities"
-                    " WHERE room_id = ?"
-                    " AND event_id in (%s)" % (
-                        ",".join(["?"] * len(prevs)),
-                    ),
-                    [room_id] + prevs,
-                )
-
-        query = (
-            "INSERT INTO event_forward_extremities (event_id, room_id)"
-            " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
-            " )"
-        )
-
-        txn.executemany(
-            query,
-            [
-                (ev.event_id, ev.room_id, ev.event_id) for ev in events
-                if not ev.internal_metadata.is_outlier()
-            ]
-        )
+        def _update_forwards_txn(txn):
+            for room_id, room_events in events_by_room.items():
+                prevs = [
+                    e_id for ev in room_events for e_id, _ in ev.prev_events
+                    if not ev.internal_metadata.is_outlier()
+                ]
+                if prevs:
+                    txn.execute(
+                        "DELETE FROM event_forward_extremities"
+                        " WHERE room_id = ?"
+                        " AND event_id in (%s)" % (
+                            ",".join(["?"] * len(prevs)),
+                        ),
+                        [room_id] + prevs,
+                    )
+
+            query = (
+                "INSERT INTO event_forward_extremities (event_id, room_id)"
+                " SELECT ?, ? WHERE NOT EXISTS ("
+                " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+                " )"
+            )
 
-        query = (
-            "INSERT INTO event_backward_extremities (event_id, room_id)"
-            " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
-            " )"
-            " AND NOT EXISTS ("
-            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            " AND outlier = ?"
-            " )"
-        )
+            txn.executemany(
+                query,
+                [
+                    (ev.event_id, ev.room_id, ev.event_id) for ev in events
+                    if not ev.internal_metadata.is_outlier()
+                ]
+            )
 
-        txn.executemany(query, [
-            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
-            for ev in events for e_id, _ in ev.prev_events
-            if not ev.internal_metadata.is_outlier()
-        ])
+        def _update_backwards_txn(txn):
+            query = (
+                "INSERT INTO event_backward_extremities (event_id, room_id)"
+                " SELECT ?, ? WHERE NOT EXISTS ("
+                " SELECT 1 FROM event_backward_extremities"
+                " WHERE event_id = ? AND room_id = ?"
+                " )"
+                " AND NOT EXISTS ("
+                " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+                " AND outlier = ?"
+                " )"
+            )
 
-        query = (
-            "DELETE FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
-        )
-        txn.executemany(
-            query,
-            [
-                (ev.event_id, ev.room_id) for ev in events
+            txn.executemany(query, [
+                (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+                for ev in events for e_id, _ in ev.prev_events
                 if not ev.internal_metadata.is_outlier()
-            ]
-        )
+            ])
 
-        for room_id in events_by_room:
-            txn.call_after(
-                self.get_latest_event_ids_in_room.invalidate, (room_id,)
+            query = (
+                "DELETE FROM event_backward_extremities"
+                " WHERE event_id = ? AND room_id = ?"
+            )
+            txn.executemany(
+                query,
+                [
+                    (ev.event_id, ev.room_id) for ev in events
+                    if not ev.internal_metadata.is_outlier()
+                ]
             )
 
+            for room_id in events_by_room:
+                txn.call_after(
+                    self.get_latest_event_ids_in_room.invalidate, (room_id,)
+                )
+
+        yield self.runInteraction("_update_forwards_txn", _update_forwards_txn)
+        yield self.runInteraction("_update_backwards_txn", _update_backwards_txn)
+
     def get_backfill_events(self, room_id, event_list, limit):
         """Get a list of Events for a given topic that occurred before (and
         including) the events in event_list. Return a list of max size `limit`
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e3eabab13d..0501eeb6e0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -86,6 +86,10 @@ class EventsStore(SQLBaseStore):
                     is_new_state=is_new_state,
                 )
 
+                yield self._update_extremeties([
+                    ev for ev, _ in chunk
+                ])
+
     @defer.inlineCallbacks
     @log_function
     def persist_event(self, event, context, backfilled=False,
@@ -120,6 +124,8 @@ class EventsStore(SQLBaseStore):
         except _RollbackButIsFineException:
             pass
 
+        yield self._update_extremeties([event])
+
         max_persisted_id = yield self._stream_id_gen.get_max_token(self)
         defer.returnValue((stream_ordering, max_persisted_id))