summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py160
1 files changed, 106 insertions, 54 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f39c8c8461..03cedf3a75 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,7 @@ import logging
 from collections import OrderedDict, deque, namedtuple
 from functools import wraps
 
-from six import iteritems
+from six import iteritems, text_type
 from six.moves import range
 
 from canonicaljson import json
@@ -38,6 +38,7 @@ from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken, get_domain_from_id
+from synapse.util import batch_iter
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.frozenutils import frozendict_json_encoder
@@ -386,12 +387,10 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                             )
 
                         for room_id, ev_ctx_rm in iteritems(events_by_room):
-                            # Work out new extremities by recursively adding and removing
-                            # the new events.
                             latest_event_ids = yield self.get_latest_event_ids_in_room(
                                 room_id
                             )
-                            new_latest_event_ids = yield self._calculate_new_extremeties(
+                            new_latest_event_ids = yield self._calculate_new_extremities(
                                 room_id, ev_ctx_rm, latest_event_ids
                             )
 
@@ -400,6 +399,12 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                                 # No change in extremities, so no change in state
                                 continue
 
+                            # there should always be at least one forward extremity.
+                            # (except during the initial persistence of the send_join
+                            # results, in which case there will be no existing
+                            # extremities, so we'll `continue` above and skip this bit.)
+                            assert new_latest_event_ids, "No forward extremities left!"
+
                             new_forward_extremeties[room_id] = new_latest_event_ids
 
                             len_1 = (
@@ -517,44 +522,79 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                     )
 
     @defer.inlineCallbacks
-    def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
-        """Calculates the new forward extremeties for a room given events to
+    def _calculate_new_extremities(self, room_id, event_contexts, latest_event_ids):
+        """Calculates the new forward extremities for a room given events to
         persist.
 
         Assumes that we are only persisting events for one room at a time.
         """
-        new_latest_event_ids = set(latest_event_ids)
-        # First, add all the new events to the list
-        new_latest_event_ids.update(
-            event.event_id for event, ctx in event_contexts
+
+        # we're only interested in new events which aren't outliers and which aren't
+        # being rejected.
+        new_events = [
+            event for event, ctx in event_contexts
             if not event.internal_metadata.is_outlier() and not ctx.rejected
+        ]
+
+        # start with the existing forward extremities
+        result = set(latest_event_ids)
+
+        # add all the new events to the list
+        result.update(
+            event.event_id for event in new_events
         )
-        # Now remove all events that are referenced by the to-be-added events
-        new_latest_event_ids.difference_update(
+
+        # Now remove all events which are prev_events of any of the new events
+        result.difference_update(
             e_id
-            for event, ctx in event_contexts
+            for event in new_events
             for e_id, _ in event.prev_events
-            if not event.internal_metadata.is_outlier() and not ctx.rejected
         )
 
-        # And finally remove any events that are referenced by previously added
-        # events.
-        rows = yield self._simple_select_many_batch(
-            table="event_edges",
-            column="prev_event_id",
-            iterable=list(new_latest_event_ids),
-            retcols=["prev_event_id"],
-            keyvalues={
-                "is_state": False,
-            },
-            desc="_calculate_new_extremeties",
-        )
+        # Finally, remove any events which are prev_events of any existing events.
+        existing_prevs = yield self._get_events_which_are_prevs(result)
+        result.difference_update(existing_prevs)
 
-        new_latest_event_ids.difference_update(
-            row["prev_event_id"] for row in rows
-        )
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def _get_events_which_are_prevs(self, event_ids):
+        """Filter the supplied list of event_ids to get those which are prev_events of
+        existing (non-outlier/rejected) events.
+
+        Args:
+            event_ids (Iterable[str]): event ids to filter
+
+        Returns:
+            Deferred[List[str]]: filtered event ids
+        """
+        results = []
+
+        def _get_events(txn, batch):
+            sql = """
+            SELECT prev_event_id
+            FROM event_edges
+                INNER JOIN events USING (event_id)
+                LEFT JOIN rejections USING (event_id)
+            WHERE
+                prev_event_id IN (%s)
+                AND NOT events.outlier
+                AND rejections.event_id IS NULL
+            """ % (
+                ",".join("?" for _ in batch),
+            )
+
+            txn.execute(sql, batch)
+            results.extend(r[0] for r in txn)
+
+        for chunk in batch_iter(event_ids, 100):
+            yield self.runInteraction(
+                "_get_events_which_are_prevs",
+                _get_events,
+                chunk,
+            )
 
-        defer.returnValue(new_latest_event_ids)
+        defer.returnValue(results)
 
     @defer.inlineCallbacks
     def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
@@ -586,10 +626,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
             the new current state is only returned if we've already calculated
             it.
         """
-
-        if not new_latest_event_ids:
-            return
-
         # map from state_group to ((type, key) -> event_id) state map
         state_groups_map = {}
 
@@ -930,6 +966,10 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                 )
 
                 self._invalidate_cache_and_stream(
+                    txn, self.get_room_summary, (room_id,)
+                )
+
+                self._invalidate_cache_and_stream(
                     txn, self.get_current_state_ids, (room_id,)
                 )
 
@@ -1220,7 +1260,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                     "sender": event.sender,
                     "contains_url": (
                         "url" in event.content
-                        and isinstance(event.content["url"], basestring)
+                        and isinstance(event.content["url"], text_type)
                     ),
                 }
                 for event, _ in events_and_contexts
@@ -1529,7 +1569,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
 
                     contains_url = "url" in content
                     if contains_url:
-                        contains_url &= isinstance(content["url"], basestring)
+                        contains_url &= isinstance(content["url"], text_type)
                 except (KeyError, AttributeError):
                     # If the event is missing a necessary field then
                     # skip over it.
@@ -1886,20 +1926,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
             ")"
         )
 
-        # create an index on should_delete because later we'll be looking for
-        # the should_delete / shouldn't_delete subsets
-        txn.execute(
-            "CREATE INDEX events_to_purge_should_delete"
-            " ON events_to_purge(should_delete)",
-        )
-
-        # We do joins against events_to_purge for e.g. calculating state
-        # groups to purge, etc., so lets make an index.
-        txn.execute(
-            "CREATE INDEX events_to_purge_id"
-            " ON events_to_purge(event_id)",
-        )
-
         # First ensure that we're not about to delete all the forward extremeties
         txn.execute(
             "SELECT e.event_id, e.depth FROM events as e "
@@ -1910,9 +1936,9 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
             (room_id,)
         )
         rows = txn.fetchall()
-        max_depth = max(row[0] for row in rows)
+        max_depth = max(row[1] for row in rows)
 
-        if max_depth <= token.topological:
+        if max_depth < token.topological:
             # We need to ensure we don't delete all the events from the database
             # otherwise we wouldn't be able to send any events (due to not
             # having any backwards extremeties)
@@ -1926,19 +1952,45 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         should_delete_params = ()
         if not delete_local_events:
             should_delete_expr += " AND event_id NOT LIKE ?"
-            should_delete_params += ("%:" + self.hs.hostname, )
+
+            # We include the parameter twice since we use the expression twice
+            should_delete_params += (
+                "%:" + self.hs.hostname,
+                "%:" + self.hs.hostname,
+            )
 
         should_delete_params += (room_id, token.topological)
 
+        # Note that we insert events that are outliers and aren't going to be
+        # deleted, as nothing will happen to them.
         txn.execute(
             "INSERT INTO events_to_purge"
             " SELECT event_id, %s"
             " FROM events AS e LEFT JOIN state_events USING (event_id)"
-            " WHERE e.room_id = ? AND topological_ordering < ?" % (
+            " WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?"
+            % (
+                should_delete_expr,
                 should_delete_expr,
             ),
             should_delete_params,
         )
+
+        # We create the indices *after* insertion as that's a lot faster.
+
+        # create an index on should_delete because later we'll be looking for
+        # the should_delete / shouldn't_delete subsets
+        txn.execute(
+            "CREATE INDEX events_to_purge_should_delete"
+            " ON events_to_purge(should_delete)",
+        )
+
+        # We do joins against events_to_purge for e.g. calculating state
+        # groups to purge, etc., so lets make an index.
+        txn.execute(
+            "CREATE INDEX events_to_purge_id"
+            " ON events_to_purge(event_id)",
+        )
+
         txn.execute(
             "SELECT event_id, should_delete FROM events_to_purge"
         )