summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py123
1 files changed, 92 insertions, 31 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 98707d40ee..c4aeb48800 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -207,6 +207,18 @@ class EventsStore(SQLBaseStore):
             where_clause="contains_url = true AND outlier = false",
         )
 
+        # an event_id index on event_search is useful for the purge_history
+        # api. Plus it means we get to enforce some integrity with a UNIQUE
+        # clause
+        self.register_background_index_update(
+            "event_search_event_id_idx",
+            index_name="event_search_event_id_idx",
+            table="event_search",
+            columns=["event_id"],
+            unique=True,
+            psql_only=True,
+        )
+
         self._event_persist_queue = _EventPeristenceQueue()
 
     def persist_events(self, events_and_contexts, backfilled=False):
@@ -387,6 +399,11 @@ class EventsStore(SQLBaseStore):
 
                     event_counter.inc(event.type, origin_type, origin_entity)
 
+                for room_id, (_, _, new_state) in current_state_for_room.iteritems():
+                    self.get_current_state_ids.prefill(
+                        (room_id, ), new_state
+                    )
+
     @defer.inlineCallbacks
     def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
         """Calculates the new forward extremeties for a room given events to
@@ -435,10 +452,10 @@ class EventsStore(SQLBaseStore):
         Assumes that we are only persisting events for one room at a time.
 
         Returns:
-            2-tuple (to_delete, to_insert) where both are state dicts, i.e.
-            (type, state_key) -> event_id. `to_delete` are the entries to
+            3-tuple (to_delete, to_insert, new_state) where both are state dicts,
+            i.e. (type, state_key) -> event_id. `to_delete` are the entries to
             first be deleted from current_state_events, `to_insert` are entries
-            to insert.
+            to insert. `new_state` is the full set of state.
             May return None if there are no changes to be applied.
         """
         # Now we need to work out the different state sets for
@@ -545,7 +562,7 @@ class EventsStore(SQLBaseStore):
             if ev_id in events_to_insert
         }
 
-        defer.returnValue((to_delete, to_insert))
+        defer.returnValue((to_delete, to_insert, current_state))
 
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
@@ -698,7 +715,7 @@ class EventsStore(SQLBaseStore):
 
     def _update_current_state_txn(self, txn, state_delta_by_room):
         for room_id, current_state_tuple in state_delta_by_room.iteritems():
-                to_delete, to_insert = current_state_tuple
+                to_delete, to_insert, _ = current_state_tuple
                 txn.executemany(
                     "DELETE FROM current_state_events WHERE event_id = ?",
                     [(ev_id,) for ev_id in to_delete.itervalues()],
@@ -1343,11 +1360,26 @@ class EventsStore(SQLBaseStore):
     def _invalidate_get_event_cache(self, event_id):
             self._get_event_cache.invalidate((event_id,))
 
-    def _get_events_from_cache(self, events, allow_rejected):
+    def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
+        """Fetch events from the caches
+
+        Args:
+            events (list(str)): list of event_ids to fetch
+            allow_rejected (bool): Whether to teturn events that were rejected
+            update_metrics (bool): Whether to update the cache hit ratio metrics
+
+        Returns:
+            dict of event_id -> _EventCacheEntry for each event_id in cache. If
+            allow_rejected is `False` then there will still be an entry but it
+            will be `None`
+        """
         event_map = {}
 
         for event_id in events:
-            ret = self._get_event_cache.get((event_id,), None)
+            ret = self._get_event_cache.get(
+                (event_id,), None,
+                update_metrics=update_metrics,
+            )
             if not ret:
                 continue
 
@@ -2007,6 +2039,8 @@ class EventsStore(SQLBaseStore):
                 400, "topological_ordering is greater than forward extremeties"
             )
 
+        logger.debug("[purge] looking for events to delete")
+
         txn.execute(
             "SELECT event_id, state_key FROM events"
             " LEFT JOIN state_events USING (room_id, event_id)"
@@ -2015,9 +2049,19 @@ class EventsStore(SQLBaseStore):
         )
         event_rows = txn.fetchall()
 
+        to_delete = [
+            (event_id,) for event_id, state_key in event_rows
+            if state_key is None and not self.hs.is_mine_id(event_id)
+        ]
+        logger.info(
+            "[purge] found %i events before cutoff, of which %i are remote"
+            " non-state events to delete", len(event_rows), len(to_delete))
+
         for event_id, state_key in event_rows:
             txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
 
+        logger.debug("[purge] Finding new backward extremities")
+
         # We calculate the new entries for the backward extremeties by finding
         # all events that point to events that are to be purged
         txn.execute(
@@ -2030,6 +2074,8 @@ class EventsStore(SQLBaseStore):
         )
         new_backwards_extrems = txn.fetchall()
 
+        logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)
+
         txn.execute(
             "DELETE FROM event_backward_extremities WHERE room_id = ?",
             (room_id,)
@@ -2044,6 +2090,8 @@ class EventsStore(SQLBaseStore):
             ]
         )
 
+        logger.debug("[purge] finding redundant state groups")
+
         # Get all state groups that are only referenced by events that are
         # to be deleted.
         txn.execute(
@@ -2059,15 +2107,20 @@ class EventsStore(SQLBaseStore):
         )
 
         state_rows = txn.fetchall()
-        state_groups_to_delete = [sg for sg, in state_rows]
+        logger.debug("[purge] found %i redundant state groups", len(state_rows))
+
+        # make a set of the redundant state groups, so that we can look them up
+        # efficiently
+        state_groups_to_delete = set([sg for sg, in state_rows])
 
         # Now we get all the state groups that rely on these state groups
-        new_state_edges = []
-        chunks = [
-            state_groups_to_delete[i:i + 100]
-            for i in xrange(0, len(state_groups_to_delete), 100)
-        ]
-        for chunk in chunks:
+        logger.debug("[purge] finding state groups which depend on redundant"
+                     " state groups")
+        remaining_state_groups = []
+        for i in xrange(0, len(state_rows), 100):
+            chunk = [sg for sg, in state_rows[i:i + 100]]
+            # look for state groups whose prev_state_group is one we are about
+            # to delete
             rows = self._simple_select_many_txn(
                 txn,
                 table="state_group_edges",
@@ -2076,21 +2129,28 @@ class EventsStore(SQLBaseStore):
                 retcols=["state_group"],
                 keyvalues={},
             )
-            new_state_edges.extend(row["state_group"] for row in rows)
+            remaining_state_groups.extend(
+                row["state_group"] for row in rows
+
+                # exclude state groups we are about to delete: no point in
+                # updating them
+                if row["state_group"] not in state_groups_to_delete
+            )
 
-        # Now we turn the state groups that reference to-be-deleted state groups
-        # to non delta versions.
-        for new_state_edge in new_state_edges:
+        # Now we turn the state groups that reference to-be-deleted state
+        # groups to non delta versions.
+        for sg in remaining_state_groups:
+            logger.debug("[purge] de-delta-ing remaining state group %s", sg)
             curr_state = self._get_state_groups_from_groups_txn(
-                txn, [new_state_edge], types=None
+                txn, [sg], types=None
             )
-            curr_state = curr_state[new_state_edge]
+            curr_state = curr_state[sg]
 
             self._simple_delete_txn(
                 txn,
                 table="state_groups_state",
                 keyvalues={
-                    "state_group": new_state_edge,
+                    "state_group": sg,
                 }
             )
 
@@ -2098,7 +2158,7 @@ class EventsStore(SQLBaseStore):
                 txn,
                 table="state_group_edges",
                 keyvalues={
-                    "state_group": new_state_edge,
+                    "state_group": sg,
                 }
             )
 
@@ -2107,7 +2167,7 @@ class EventsStore(SQLBaseStore):
                 table="state_groups_state",
                 values=[
                     {
-                        "state_group": new_state_edge,
+                        "state_group": sg,
                         "room_id": room_id,
                         "type": key[0],
                         "state_key": key[1],
@@ -2117,6 +2177,7 @@ class EventsStore(SQLBaseStore):
                 ],
             )
 
+        logger.debug("[purge] removing redundant state groups")
         txn.executemany(
             "DELETE FROM state_groups_state WHERE state_group = ?",
             state_rows
@@ -2125,22 +2186,21 @@ class EventsStore(SQLBaseStore):
             "DELETE FROM state_groups WHERE id = ?",
             state_rows
         )
+
         # Delete all non-state
+        logger.debug("[purge] removing events from event_to_state_groups")
         txn.executemany(
             "DELETE FROM event_to_state_groups WHERE event_id = ?",
             [(event_id,) for event_id, _ in event_rows]
         )
 
+        logger.debug("[purge] updating room_depth")
         txn.execute(
             "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
             (topological_ordering, room_id,)
         )
 
         # Delete all remote non-state events
-        to_delete = [
-            (event_id,) for event_id, state_key in event_rows
-            if state_key is None and not self.hs.is_mine_id(event_id)
-        ]
         for table in (
             "events",
             "event_json",
@@ -2156,16 +2216,15 @@ class EventsStore(SQLBaseStore):
             "event_signatures",
             "rejections",
         ):
+            logger.debug("[purge] removing remote non-state events from %s", table)
+
             txn.executemany(
                 "DELETE FROM %s WHERE event_id = ?" % (table,),
                 to_delete
             )
 
-        txn.executemany(
-            "DELETE FROM events WHERE event_id = ?",
-            to_delete
-        )
         # Mark all state and own events as outliers
+        logger.debug("[purge] marking remaining events as outliers")
         txn.executemany(
             "UPDATE events SET outlier = ?"
             " WHERE event_id = ?",
@@ -2175,6 +2234,8 @@ class EventsStore(SQLBaseStore):
             ]
         )
 
+        logger.info("[purge] done")
+
     @defer.inlineCallbacks
     def is_event_after(self, event_id1, event_id2):
         """Returns True if event_id1 is after event_id2 in the stream