summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2018-07-25 17:27:49 +0100
committerMatthew Hodgson <matthew@matrix.org>2018-07-25 17:27:49 +0100
commit2565804030b14eab5f1455e8a860c1fbd71d45fc (patch)
tree7cdac37c0d7a0611b54f32312fdf5028c0c91b44 /synapse/storage/events.py
parentflake8 (diff)
parentMerge pull request #3603 from matrix-org/erikj/handle_outliers (diff)
downloadsynapse-2565804030b14eab5f1455e8a860c1fbd71d45fc.tar.xz
Merge branch 'develop' into matthew/filter_members
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py196
1 files changed, 131 insertions, 65 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 19c05dc8d6..200f5ec95f 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,7 @@ import logging
 from collections import OrderedDict, deque, namedtuple
 from functools import wraps
 
-from six import iteritems, itervalues
+from six import iteritems
 from six.moves import range
 
 from canonicaljson import json
@@ -142,15 +142,14 @@ class _EventPeristenceQueue(object):
             try:
                 queue = self._get_drainining_queue(room_id)
                 for item in queue:
-                    # handle_queue_loop runs in the sentinel logcontext, so
-                    # there is no need to preserve_fn when running the
-                    # callbacks on the deferred.
                     try:
                         ret = yield per_item_callback(item)
+                    except Exception:
+                        with PreserveLoggingContext():
+                            item.deferred.errback()
+                    else:
                         with PreserveLoggingContext():
                             item.deferred.callback(ret)
-                    except Exception:
-                        item.deferred.errback()
             finally:
                 queue = self._event_persist_queues.pop(room_id, None)
                 if queue:
@@ -344,11 +343,14 @@ class EventsStore(EventsWorkerStore):
                 new_forward_extremeties = {}
 
                 # map room_id->(type,state_key)->event_id tracking the full
-                # state in each room after adding these events
+                # state in each room after adding these events.
+                # This is simply used to prefill the get_current_state_ids
+                # cache
                 current_state_for_room = {}
 
-                # map room_id->(to_delete, to_insert) where each entry is
-                # a map (type,key)->event_id giving the state delta in each
+                # map room_id->(to_delete, to_insert) where to_delete is a list
+                # of type/state keys to remove from current state, and to_insert
+                # is a map (type,key)->event_id giving the state delta in each
                 # room
                 state_delta_for_room = {}
 
@@ -418,28 +420,40 @@ class EventsStore(EventsWorkerStore):
                             logger.info(
                                 "Calculating state delta for room %s", room_id,
                             )
-
                             with Measure(
-                                    self._clock,
-                                    "persist_events.get_new_state_after_events",
+                                self._clock,
+                                "persist_events.get_new_state_after_events",
                             ):
-                                current_state = yield self._get_new_state_after_events(
+                                res = yield self._get_new_state_after_events(
                                     room_id,
                                     ev_ctx_rm,
                                     latest_event_ids,
                                     new_latest_event_ids,
                                 )
-
-                            if current_state is not None:
-                                current_state_for_room[room_id] = current_state
+                                current_state, delta_ids = res
+
+                            # If either are not None then there has been a change,
+                            # and we need to work out the delta (or use that
+                            # given)
+                            if delta_ids is not None:
+                                # If there is a delta we know that we've
+                                # only added or replaced state, never
+                                # removed keys entirely.
+                                state_delta_for_room[room_id] = ([], delta_ids)
+                            elif current_state is not None:
                                 with Measure(
-                                        self._clock,
-                                        "persist_events.calculate_state_delta",
+                                    self._clock,
+                                    "persist_events.calculate_state_delta",
                                 ):
                                     delta = yield self._calculate_state_delta(
                                         room_id, current_state,
                                     )
-                                    state_delta_for_room[room_id] = delta
+                                state_delta_for_room[room_id] = delta
+
+                            # If we have the current_state then lets prefill
+                            # the cache with it.
+                            if current_state is not None:
+                                current_state_for_room[room_id] = current_state
 
                 yield self.runInteraction(
                     "persist_events",
@@ -538,9 +552,15 @@ class EventsStore(EventsWorkerStore):
                 the new forward extremities for the room.
 
         Returns:
-            Deferred[dict[(str,str), str]|None]:
-                None if there are no changes to the room state, or
-                a dict of (type, state_key) -> event_id].
+            Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
+            Returns a tuple of two state maps, the first being the full new current
+            state and the second being the delta to the existing current state.
+            If both are None then there has been no change.
+
+            If there has been a change then we only return the delta if its
+            already been calculated. Conversely if we do know the delta then
+            the new current state is only returned if we've already calculated
+            it.
         """
 
         if not new_latest_event_ids:
@@ -548,13 +568,19 @@ class EventsStore(EventsWorkerStore):
 
         # map from state_group to ((type, key) -> event_id) state map
         state_groups_map = {}
+
+        # Map from (prev state group, new state group) -> delta state dict
+        state_group_deltas = {}
+
         for ev, ctx in events_context:
             if ctx.state_group is None:
-                # I don't think this can happen, but let's double-check
-                raise Exception(
-                    "Context for new extremity event %s has no state "
-                    "group" % (ev.event_id, ),
-                )
+                # This should only happen for outlier events.
+                if not ev.internal_metadata.is_outlier():
+                    raise Exception(
+                        "Context for new event %s has no state "
+                        "group" % (ev.event_id, ),
+                    )
+                continue
 
             if ctx.state_group in state_groups_map:
                 continue
@@ -566,6 +592,9 @@ class EventsStore(EventsWorkerStore):
             if current_state_ids is not None:
                 state_groups_map[ctx.state_group] = current_state_ids
 
+            if ctx.prev_group:
+                state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
+
         # We need to map the event_ids to their state groups. First, let's
         # check if the event is one we're persisting, in which case we can
         # pull the state group from its context.
@@ -579,7 +608,7 @@ class EventsStore(EventsWorkerStore):
         for event_id in new_latest_event_ids:
             # First search in the list of new events we're adding.
             for ev, ctx in events_context:
-                if event_id == ev.event_id:
+                if event_id == ev.event_id and ctx.state_group is not None:
                     event_id_to_state_group[event_id] = ctx.state_group
                     break
             else:
@@ -607,7 +636,26 @@ class EventsStore(EventsWorkerStore):
         # If they old and new groups are the same then we don't need to do
         # anything.
         if old_state_groups == new_state_groups:
-            return
+            defer.returnValue((None, None))
+
+        if len(new_state_groups) == 1 and len(old_state_groups) == 1:
+            # If we're going from one state group to another, lets check if
+            # we have a delta for that transition. If we do then we can just
+            # return that.
+
+            new_state_group = next(iter(new_state_groups))
+            old_state_group = next(iter(old_state_groups))
+
+            delta_ids = state_group_deltas.get(
+                (old_state_group, new_state_group,), None
+            )
+            if delta_ids is not None:
+                # We have a delta from the existing to new current state,
+                # so lets just return that. If we happen to already have
+                # the current state in memory then lets also return that,
+                # but it doesn't matter if we don't.
+                new_state = state_groups_map.get(new_state_group)
+                defer.returnValue((new_state, delta_ids))
 
         # Now that we have calculated new_state_groups we need to get
         # their state IDs so we can resolve to a single state set.
@@ -619,7 +667,7 @@ class EventsStore(EventsWorkerStore):
         if len(new_state_groups) == 1:
             # If there is only one state group, then we know what the current
             # state is.
-            defer.returnValue(state_groups_map[new_state_groups.pop()])
+            defer.returnValue((state_groups_map[new_state_groups.pop()], None))
 
         # Ok, we need to defer to the state handler to resolve our state sets.
 
@@ -638,7 +686,7 @@ class EventsStore(EventsWorkerStore):
             room_id, state_groups, events_map, get_events
         )
 
-        defer.returnValue(res.state)
+        defer.returnValue((res.state, None))
 
     @defer.inlineCallbacks
     def _calculate_state_delta(self, room_id, current_state):
@@ -647,17 +695,16 @@ class EventsStore(EventsWorkerStore):
         Assumes that we are only persisting events for one room at a time.
 
         Returns:
-            2-tuple (to_delete, to_insert) where both are state dicts,
-            i.e. (type, state_key) -> event_id. `to_delete` are the entries to
-            first be deleted from current_state_events, `to_insert` are entries
-            to insert.
+            tuple[list, dict] (to_delete, to_insert): where to_delete are the
+            type/state_keys to remove from current_state_events and `to_insert`
+            are the updates to current_state_events.
         """
         existing_state = yield self.get_current_state_ids(room_id)
 
-        to_delete = {
-            key: ev_id for key, ev_id in iteritems(existing_state)
-            if ev_id != current_state.get(key)
-        }
+        to_delete = [
+            key for key in existing_state
+            if key not in current_state
+        ]
 
         to_insert = {
             key: ev_id for key, ev_id in iteritems(current_state)
@@ -684,10 +731,10 @@ class EventsStore(EventsWorkerStore):
             delete_existing (bool): True to purge existing table rows for the
                 events from the database. This is useful when retrying due to
                 IntegrityError.
-            state_delta_for_room (dict[str, (list[str], list[str])]):
+            state_delta_for_room (dict[str, (list, dict)]):
                 The current-state delta for each room. For each room, a tuple
-                (to_delete, to_insert), being a list of event ids to be removed
-                from the current state, and a list of event ids to be added to
+                (to_delete, to_insert), being a list of type/state keys to be
+                removed from the current state, and a state set to be added to
                 the current state.
             new_forward_extremeties (dict[str, list[str]]):
                 The new forward extremities for each room. For each room, a
@@ -765,9 +812,46 @@ class EventsStore(EventsWorkerStore):
     def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
         for room_id, current_state_tuple in iteritems(state_delta_by_room):
                 to_delete, to_insert = current_state_tuple
+
+                # First we add entries to the current_state_delta_stream. We
+                # do this before updating the current_state_events table so
+                # that we can use it to calculate the `prev_event_id`. (This
+                # allows us to not have to pull out the existing state
+                # unnecessarily).
+                sql = """
+                    INSERT INTO current_state_delta_stream
+                    (stream_id, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, ?, ?, ?, ?, (
+                        SELECT event_id FROM current_state_events
+                        WHERE room_id = ? AND type = ? AND state_key = ?
+                    )
+                """
+                txn.executemany(sql, (
+                    (
+                        max_stream_order, room_id, etype, state_key, None,
+                        room_id, etype, state_key,
+                    )
+                    for etype, state_key in to_delete
+                    # We sanity check that we're deleting rather than updating
+                    if (etype, state_key) not in to_insert
+                ))
+                txn.executemany(sql, (
+                    (
+                        max_stream_order, room_id, etype, state_key, ev_id,
+                        room_id, etype, state_key,
+                    )
+                    for (etype, state_key), ev_id in iteritems(to_insert)
+                ))
+
+                # Now we actually update the current_state_events table
+
                 txn.executemany(
-                    "DELETE FROM current_state_events WHERE event_id = ?",
-                    [(ev_id,) for ev_id in itervalues(to_delete)],
+                    "DELETE FROM current_state_events"
+                    " WHERE room_id = ? AND type = ? AND state_key = ?",
+                    (
+                        (room_id, etype, state_key)
+                        for etype, state_key in itertools.chain(to_delete, to_insert)
+                    ),
                 )
 
                 self._simple_insert_many_txn(
@@ -784,25 +868,6 @@ class EventsStore(EventsWorkerStore):
                     ],
                 )
 
-                state_deltas = {key: None for key in to_delete}
-                state_deltas.update(to_insert)
-
-                self._simple_insert_many_txn(
-                    txn,
-                    table="current_state_delta_stream",
-                    values=[
-                        {
-                            "stream_id": max_stream_order,
-                            "room_id": room_id,
-                            "type": key[0],
-                            "state_key": key[1],
-                            "event_id": ev_id,
-                            "prev_event_id": to_delete.get(key, None),
-                        }
-                        for key, ev_id in iteritems(state_deltas)
-                    ]
-                )
-
                 txn.call_after(
                     self._curr_state_delta_stream_cache.entity_has_changed,
                     room_id, max_stream_order,
@@ -816,7 +881,8 @@ class EventsStore(EventsWorkerStore):
                 # and which we have added, then we invlidate the caches for all
                 # those users.
                 members_changed = set(
-                    state_key for ev_type, state_key in state_deltas
+                    state_key
+                    for ev_type, state_key in itertools.chain(to_delete, to_insert)
                     if ev_type == EventTypes.Member
                 )
 
@@ -1072,7 +1138,7 @@ class EventsStore(EventsWorkerStore):
         ):
             txn.executemany(
                 "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
-                [(ev.event_id,) for ev, _ in events_and_contexts]
+                [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts]
             )
 
     def _store_event_txn(self, txn, events_and_contexts):