summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/event_federation.py76
-rw-r--r--synapse/storage/events.py142
2 files changed, 92 insertions, 126 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 53feaa1960..f0aa2193fb 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -235,80 +235,21 @@ class EventFederationStore(SQLBaseStore):
             ],
         )
 
-        self._update_extremeties(txn, events)
+        self._update_backward_extremeties(txn, events)
 
-    def _update_extremeties(self, txn, events):
-        """Updates the event_*_extremities tables based on the new/updated
+    def _update_backward_extremeties(self, txn, events):
+        """Updates the event_backward_extremities tables based on the new/updated
         events being persisted.
 
         This is called for new events *and* for events that were outliers, but
-        are are now being persisted as non-outliers.
+        are now being persisted as non-outliers.
+
+        Forward extremities are handled when we first start persisting the events.
         """
         events_by_room = {}
         for ev in events:
             events_by_room.setdefault(ev.room_id, []).append(ev)
 
-        for room_id, room_events in events_by_room.items():
-            prevs = [
-                e_id for ev in room_events for e_id, _ in ev.prev_events
-                if not ev.internal_metadata.is_outlier()
-            ]
-            if prevs:
-                txn.execute(
-                    "DELETE FROM event_forward_extremities"
-                    " WHERE room_id = ?"
-                    " AND event_id in (%s)" % (
-                        ",".join(["?"] * len(prevs)),
-                    ),
-                    [room_id] + prevs,
-                )
-
-        query = (
-            "INSERT INTO event_forward_extremities (event_id, room_id)"
-            " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
-            " )"
-        )
-
-        txn.executemany(
-            query,
-            [
-                (ev.event_id, ev.room_id, ev.event_id) for ev in events
-                if not ev.internal_metadata.is_outlier()
-            ]
-        )
-
-        # We now insert into stream_ordering_to_exterm a mapping from room_id,
-        # new stream_ordering to new forward extremeties in the room.
-        # This allows us to later efficiently look up the forward extremeties
-        # for a room before a given stream_ordering
-        max_stream_ord = max(
-            ev.internal_metadata.stream_ordering for ev in events
-        )
-        new_extrem = {}
-        for room_id in events_by_room:
-            event_ids = self._simple_select_onecol_txn(
-                txn,
-                table="event_forward_extremities",
-                keyvalues={"room_id": room_id},
-                retcol="event_id",
-            )
-            new_extrem[room_id] = event_ids
-
-        self._simple_insert_many_txn(
-            txn,
-            table="stream_ordering_to_exterm",
-            values=[
-                {
-                    "room_id": room_id,
-                    "event_id": event_id,
-                    "stream_ordering": max_stream_ord,
-                }
-                for room_id, extrem_evs in new_extrem.items()
-                for event_id in extrem_evs
-            ]
-        )
-
         query = (
             "INSERT INTO event_backward_extremities (event_id, room_id)"
             " SELECT ?, ? WHERE NOT EXISTS ("
@@ -339,11 +280,6 @@ class EventFederationStore(SQLBaseStore):
             ]
         )
 
-        for room_id in events_by_room:
-            txn.call_after(
-                self.get_latest_event_ids_in_room.invalidate, (room_id,)
-            )
-
     def get_forward_extremeties_for_room(self, room_id, stream_ordering):
         # We want to make the cache more effective, so we clamp to the last
         # change before the given ordering.
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0d6519f30d..295f2522b5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -279,6 +279,7 @@ class EventsStore(SQLBaseStore):
                 # We can't easily parallelize these since different chunks
                 # might contain the same event. :(
 
+                new_forward_extremeties = {}
                 current_state_for_room = {}
                 if not backfilled:
                     # Work out the new "current state" for each room.
@@ -296,20 +297,16 @@ class EventsStore(SQLBaseStore):
                         latest_event_ids = yield self.get_latest_event_ids_in_room(
                             room_id
                         )
-                        new_latest_event_ids = set(latest_event_ids)
-                        for event, ctx in ev_ctx_rm:
-                            if event.internal_metadata.is_outlier():
-                                continue
-
-                            new_latest_event_ids.difference_update(
-                                e_id for e_id, _ in event.prev_events
-                            )
-                            new_latest_event_ids.add(event.event_id)
+                        new_latest_event_ids = yield self._calculate_new_extremeties(
+                            room_id, [ev for ev, _ in ev_ctx_rm]
+                        )
 
                         if new_latest_event_ids == set(latest_event_ids):
                             # No change in extremities, so no change in state
                             continue
 
+                        new_forward_extremeties[room_id] = new_latest_event_ids
+
                         # Now we need to work out the different state sets for
                         # each state extremities
                         state_sets = []
@@ -358,10 +355,46 @@ class EventsStore(SQLBaseStore):
                     backfilled=backfilled,
                     delete_existing=delete_existing,
                     current_state_for_room=current_state_for_room,
+                    new_forward_extremeties=new_forward_extremeties,
                 )
                 persist_event_counter.inc_by(len(chunk))
 
     @defer.inlineCallbacks
+    def _calculate_new_extremeties(self, room_id, events):
+        latest_event_ids = yield self.get_latest_event_ids_in_room(
+            room_id
+        )
+        new_latest_event_ids = set(latest_event_ids)
+        new_latest_event_ids.update(
+            event.event_id for event in events
+            if not event.internal_metadata.is_outlier()
+        )
+        new_latest_event_ids.difference_update(
+            e_id
+            for event in events
+            for e_id, _ in event.prev_events
+            if not event.internal_metadata.is_outlier()
+        )
+
+        rows = yield self._simple_select_many_batch(
+            table="event_edges",
+            column="prev_event_id",
+            iterable=list(new_latest_event_ids),
+            retcols=["prev_event_id"],
+            keyvalues={
+                "room_id": room_id,
+                "is_state": False,
+            },
+            desc="_calculate_new_extremeties",
+        )
+
+        new_latest_event_ids.difference_update(
+            row["prev_event_id"] for row in rows
+        )
+
+        defer.returnValue(new_latest_event_ids)
+
+    @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
                   get_prev_content=False, allow_rejected=False,
                   allow_none=False):
@@ -418,52 +451,9 @@ class EventsStore(SQLBaseStore):
         defer.returnValue({e.event_id: e for e in events})
 
     @log_function
-    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
-                           delete_existing=False):
-        # We purposefully do this first since if we include a `current_state`
-        # key, we *want* to update the `current_state_events` table
-        if current_state:
-            txn.call_after(self._get_current_state_for_key.invalidate_all)
-            txn.call_after(self.get_rooms_for_user.invalidate_all)
-            txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
-
-            # Add an entry to the current_state_resets table to record the point
-            # where we clobbered the current state
-            stream_order = event.internal_metadata.stream_ordering
-            self._simple_insert_txn(
-                txn,
-                table="current_state_resets",
-                values={"event_stream_ordering": stream_order}
-            )
-
-            self._simple_delete_txn(
-                txn,
-                table="current_state_events",
-                keyvalues={"room_id": event.room_id},
-            )
-
-            for s in current_state:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": s.event_id,
-                        "room_id": s.room_id,
-                        "type": s.type,
-                        "state_key": s.state_key,
-                    }
-                )
-
-        return self._persist_events_txn(
-            txn,
-            [(event, context)],
-            backfilled=backfilled,
-            delete_existing=delete_existing,
-        )
-
-    @log_function
     def _persist_events_txn(self, txn, events_and_contexts, backfilled,
-                            delete_existing=False, current_state_for_room={}):
+                            delete_existing=False, current_state_for_room={},
+                            new_forward_extremeties={}):
         """Insert some number of room events into the necessary database tables.
 
         Rejected events are only inserted into the events table, the events_json table,
@@ -473,6 +463,7 @@ class EventsStore(SQLBaseStore):
         If delete_existing is True then existing events will be purged from the
         database before insertion. This is useful when retrying due to IntegrityError.
         """
+        max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
         for room_id, current_state in current_state_for_room.iteritems():
             txn.call_after(self._get_current_state_for_key.invalidate_all)
             txn.call_after(self.get_rooms_for_user.invalidate_all)
@@ -480,11 +471,10 @@ class EventsStore(SQLBaseStore):
 
             # Add an entry to the current_state_resets table to record the point
             # where we clobbered the current state
-            stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
             self._simple_insert_txn(
                 txn,
                 table="current_state_resets",
-                values={"event_stream_ordering": stream_order}
+                values={"event_stream_ordering": max_stream_order}
             )
 
             self._simple_delete_txn(
@@ -507,6 +497,46 @@ class EventsStore(SQLBaseStore):
                 ],
             )
 
+        for room_id, new_extrem in new_forward_extremeties.items():
+            self._simple_delete_txn(
+                txn,
+                table="event_forward_extremities",
+                keyvalues={"room_id": room_id},
+            )
+            txn.call_after(
+                self.get_latest_event_ids_in_room.invalidate, (room_id,)
+            )
+
+        self._simple_insert_many_txn(
+            txn,
+            table="event_forward_extremities",
+            values=[
+                {
+                    "event_id": ev_id,
+                    "room_id": room_id,
+                }
+                for room_id, new_extrem in new_forward_extremeties.items()
+                for ev_id in new_extrem
+            ],
+        )
+        # We now insert into stream_ordering_to_exterm a mapping from room_id,
+        # new stream_ordering to new forward extremeties in the room.
+        # This allows us to later efficiently look up the forward extremeties
+        # for a room before a given stream_ordering
+        self._simple_insert_many_txn(
+            txn,
+            table="stream_ordering_to_exterm",
+            values=[
+                {
+                    "room_id": room_id,
+                    "event_id": event_id,
+                    "stream_ordering": max_stream_order,
+                }
+                for room_id, new_extrem in new_forward_extremeties.items()
+                for event_id in new_extrem
+            ]
+        )
+
         # Ensure that we don't have the same event twice.
         # Pick the earliest non-outlier if there is one, else the earliest one.
         new_events_and_contexts = OrderedDict()