summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/events.py42
1 files changed, 31 insertions, 11 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8659f605a5..b5e4b6f4b0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -301,15 +301,24 @@ class EventsStore(SQLBaseStore):
                             latest_event_ids = yield self.get_latest_event_ids_in_room(
                                 room_id
                             )
+                            latest_event_ids = frozenset(latest_event_ids)
                             new_latest_event_ids = yield self._calculate_new_extremeties(
-                                room_id, [ev for ev, _ in ev_ctx_rm]
+                                room_id, [ev for ev, _ in ev_ctx_rm], latest_event_ids
                             )
 
-                            if new_latest_event_ids == set(latest_event_ids):
+                            if new_latest_event_ids == latest_event_ids:
                                 # No change in extremities, so no change in state
                                 continue
 
-                            new_forward_extremeties[room_id] = new_latest_event_ids
+                            to_add = new_latest_event_ids - latest_event_ids
+                            to_remove = latest_event_ids - new_latest_event_ids
+
+                            new_forward_extremeties[room_id] = {
+                                "full_list": new_latest_event_ids,
+                                "to_add": to_add,
+                                "to_remove": to_remove,
+                                "prev_latest": latest_event_ids,
+                            }
 
                             state = yield self._calculate_state_delta(
                                 room_id, ev_ctx_rm, new_latest_event_ids
@@ -329,15 +338,12 @@ class EventsStore(SQLBaseStore):
                 persist_event_counter.inc_by(len(chunk))
 
     @defer.inlineCallbacks
-    def _calculate_new_extremeties(self, room_id, events):
+    def _calculate_new_extremeties(self, room_id, events, latest_event_ids):
         """Calculates the new forward extremeties for a room given events to
         persist.
 
         Assumes that we are only persisting events for one room at a time.
         """
-        latest_event_ids = yield self.get_latest_event_ids_in_room(
-            room_id
-        )
         new_latest_event_ids = set(latest_event_ids)
         # First, add all the new events to the list
         new_latest_event_ids.update(
@@ -573,12 +579,26 @@ class EventsStore(SQLBaseStore):
                     txn, self.get_users_in_room, (room_id,)
                 )
 
-        for room_id, new_extrem in new_forward_extremeties.items():
-            self._simple_delete_txn(
+        for room_id, new_extrem_dict in new_forward_extremeties.items():
+            current_latest = self._simple_select_onecol_txn(
                 txn,
                 table="event_forward_extremities",
                 keyvalues={"room_id": room_id},
+                retcol="event_id"
             )
+
+            if set(current_latest) != new_extrem_dict["prev_latest"]:
+                raise RuntimeError(
+                    "event_forward_extremities don't match that when we"
+                    " calculated new extrems"
+                )
+
+            txn.executemany(
+                "DELETE FROM event_forward_extremities"
+                " WHERE room_id = ? AND event_id = ?",
+                ((room_id, event_id) for event_id in new_extrem_dict["to_remove"])
+            )
+
             txn.call_after(
                 self.get_latest_event_ids_in_room.invalidate, (room_id,)
             )
@@ -592,7 +612,7 @@ class EventsStore(SQLBaseStore):
                     "room_id": room_id,
                 }
                 for room_id, new_extrem in new_forward_extremeties.items()
-                for ev_id in new_extrem
+                for ev_id in new_extrem["to_add"]
             ],
         )
         # We now insert into stream_ordering_to_exterm a mapping from room_id,
@@ -609,7 +629,7 @@ class EventsStore(SQLBaseStore):
                     "stream_ordering": max_stream_order,
                 }
                 for room_id, new_extrem in new_forward_extremeties.items()
-                for event_id in new_extrem
+                for event_id in new_extrem["full_list"]
             ]
         )