summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/storage/events.py142
1 files changed, 86 insertions, 56 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0d6519f30d..295f2522b5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -279,6 +279,7 @@ class EventsStore(SQLBaseStore):
                 # We can't easily parallelize these since different chunks
                 # might contain the same event. :(
 
+                new_forward_extremeties = {}
                 current_state_for_room = {}
                 if not backfilled:
                     # Work out the new "current state" for each room.
@@ -296,20 +297,16 @@ class EventsStore(SQLBaseStore):
                         latest_event_ids = yield self.get_latest_event_ids_in_room(
                             room_id
                         )
-                        new_latest_event_ids = set(latest_event_ids)
-                        for event, ctx in ev_ctx_rm:
-                            if event.internal_metadata.is_outlier():
-                                continue
-
-                            new_latest_event_ids.difference_update(
-                                e_id for e_id, _ in event.prev_events
-                            )
-                            new_latest_event_ids.add(event.event_id)
+                        new_latest_event_ids = yield self._calculate_new_extremeties(
+                            room_id, [ev for ev, _ in ev_ctx_rm]
+                        )
 
                         if new_latest_event_ids == set(latest_event_ids):
                             # No change in extremities, so no change in state
                             continue
 
+                        new_forward_extremeties[room_id] = new_latest_event_ids
+
                         # Now we need to work out the different state sets for
                         # each state extremities
                         state_sets = []
@@ -358,10 +355,46 @@ class EventsStore(SQLBaseStore):
                     backfilled=backfilled,
                     delete_existing=delete_existing,
                     current_state_for_room=current_state_for_room,
+                    new_forward_extremeties=new_forward_extremeties,
                 )
                 persist_event_counter.inc_by(len(chunk))
 
     @defer.inlineCallbacks
+    def _calculate_new_extremeties(self, room_id, events):
+        latest_event_ids = yield self.get_latest_event_ids_in_room(
+            room_id
+        )
+        new_latest_event_ids = set(latest_event_ids)
+        new_latest_event_ids.update(
+            event.event_id for event in events
+            if not event.internal_metadata.is_outlier()
+        )
+        new_latest_event_ids.difference_update(
+            e_id
+            for event in events
+            for e_id, _ in event.prev_events
+            if not event.internal_metadata.is_outlier()
+        )
+
+        rows = yield self._simple_select_many_batch(
+            table="event_edges",
+            column="prev_event_id",
+            iterable=list(new_latest_event_ids),
+            retcols=["prev_event_id"],
+            keyvalues={
+                "room_id": room_id,
+                "is_state": False,
+            },
+            desc="_calculate_new_extremeties",
+        )
+
+        new_latest_event_ids.difference_update(
+            row["prev_event_id"] for row in rows
+        )
+
+        defer.returnValue(new_latest_event_ids)
+
+    @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
                   get_prev_content=False, allow_rejected=False,
                   allow_none=False):
@@ -418,52 +451,9 @@ class EventsStore(SQLBaseStore):
         defer.returnValue({e.event_id: e for e in events})
 
     @log_function
-    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
-                           delete_existing=False):
-        # We purposefully do this first since if we include a `current_state`
-        # key, we *want* to update the `current_state_events` table
-        if current_state:
-            txn.call_after(self._get_current_state_for_key.invalidate_all)
-            txn.call_after(self.get_rooms_for_user.invalidate_all)
-            txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
-
-            # Add an entry to the current_state_resets table to record the point
-            # where we clobbered the current state
-            stream_order = event.internal_metadata.stream_ordering
-            self._simple_insert_txn(
-                txn,
-                table="current_state_resets",
-                values={"event_stream_ordering": stream_order}
-            )
-
-            self._simple_delete_txn(
-                txn,
-                table="current_state_events",
-                keyvalues={"room_id": event.room_id},
-            )
-
-            for s in current_state:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": s.event_id,
-                        "room_id": s.room_id,
-                        "type": s.type,
-                        "state_key": s.state_key,
-                    }
-                )
-
-        return self._persist_events_txn(
-            txn,
-            [(event, context)],
-            backfilled=backfilled,
-            delete_existing=delete_existing,
-        )
-
-    @log_function
     def _persist_events_txn(self, txn, events_and_contexts, backfilled,
-                            delete_existing=False, current_state_for_room={}):
+                            delete_existing=False, current_state_for_room={},
+                            new_forward_extremeties={}):
         """Insert some number of room events into the necessary database tables.
 
         Rejected events are only inserted into the events table, the events_json table,
@@ -473,6 +463,7 @@ class EventsStore(SQLBaseStore):
         If delete_existing is True then existing events will be purged from the
         database before insertion. This is useful when retrying due to IntegrityError.
         """
+        max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
         for room_id, current_state in current_state_for_room.iteritems():
             txn.call_after(self._get_current_state_for_key.invalidate_all)
             txn.call_after(self.get_rooms_for_user.invalidate_all)
@@ -480,11 +471,10 @@ class EventsStore(SQLBaseStore):
 
             # Add an entry to the current_state_resets table to record the point
             # where we clobbered the current state
-            stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
             self._simple_insert_txn(
                 txn,
                 table="current_state_resets",
-                values={"event_stream_ordering": stream_order}
+                values={"event_stream_ordering": max_stream_order}
             )
 
             self._simple_delete_txn(
@@ -507,6 +497,46 @@ class EventsStore(SQLBaseStore):
                 ],
             )
 
+        for room_id, new_extrem in new_forward_extremeties.items():
+            self._simple_delete_txn(
+                txn,
+                table="event_forward_extremities",
+                keyvalues={"room_id": room_id},
+            )
+            txn.call_after(
+                self.get_latest_event_ids_in_room.invalidate, (room_id,)
+            )
+
+        self._simple_insert_many_txn(
+            txn,
+            table="event_forward_extremities",
+            values=[
+                {
+                    "event_id": ev_id,
+                    "room_id": room_id,
+                }
+                for room_id, new_extrem in new_forward_extremeties.items()
+                for ev_id in new_extrem
+            ],
+        )
+        # We now insert into stream_ordering_to_exterm a mapping from room_id,
+        # new stream_ordering to new forward extremeties in the room.
+        # This allows us to later efficiently look up the forward extremeties
+        # for a room before a given stream_ordering
+        self._simple_insert_many_txn(
+            txn,
+            table="stream_ordering_to_exterm",
+            values=[
+                {
+                    "room_id": room_id,
+                    "event_id": event_id,
+                    "stream_ordering": max_stream_order,
+                }
+                for room_id, new_extrem in new_forward_extremeties.items()
+                for event_id in new_extrem
+            ]
+        )
+
         # Ensure that we don't have the same event twice.
         # Pick the earliest non-outlier if there is one, else the earliest one.
         new_events_and_contexts = OrderedDict()