summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py91
1 files changed, 55 insertions, 36 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 19c05dc8d6..87d0b78a2d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,7 @@ import logging
 from collections import OrderedDict, deque, namedtuple
 from functools import wraps
 
-from six import iteritems, itervalues
+from six import iteritems
 from six.moves import range
 
 from canonicaljson import json
@@ -347,8 +347,9 @@ class EventsStore(EventsWorkerStore):
                 # state in each room after adding these events
                 current_state_for_room = {}
 
-                # map room_id->(to_delete, to_insert) where each entry is
-                # a map (type,key)->event_id giving the state delta in each
+                # map room_id->(to_delete, to_insert) where to_delete is a list
+                # of type/state keys to remove from current state, and to_insert
+                # is a map (type,key)->event_id giving the state delta in each
                 # room
                 state_delta_for_room = {}
 
@@ -647,17 +648,16 @@ class EventsStore(EventsWorkerStore):
         Assumes that we are only persisting events for one room at a time.
 
         Returns:
-            2-tuple (to_delete, to_insert) where both are state dicts,
-            i.e. (type, state_key) -> event_id. `to_delete` are the entries to
-            first be deleted from current_state_events, `to_insert` are entries
-            to insert.
+            tuple[list, dict] (to_delete, to_insert): where to_delete are the
+            type/state_keys to remove from current_state_events and `to_insert`
+            are the updates to current_state_events.
         """
         existing_state = yield self.get_current_state_ids(room_id)
 
-        to_delete = {
-            key: ev_id for key, ev_id in iteritems(existing_state)
-            if ev_id != current_state.get(key)
-        }
+        to_delete = [
+            key for key, ev_id in iteritems(existing_state)
+            if key not in current_state
+        ]
 
         to_insert = {
             key: ev_id for key, ev_id in iteritems(current_state)
@@ -684,10 +684,10 @@ class EventsStore(EventsWorkerStore):
             delete_existing (bool): True to purge existing table rows for the
                 events from the database. This is useful when retrying due to
                 IntegrityError.
-            state_delta_for_room (dict[str, (list[str], list[str])]):
+            state_delta_for_room (dict[str, (list, dict)]):
                 The current-state delta for each room. For each room, a tuple
-                (to_delete, to_insert), being a list of event ids to be removed
-                from the current state, and a list of event ids to be added to
+                (to_delete, to_insert), being a list of type/state keys to be
+                removed from the current state, and a state set to be added to
                 the current state.
             new_forward_extremeties (dict[str, list[str]]):
                 The new forward extremities for each room. For each room, a
@@ -765,9 +765,46 @@ class EventsStore(EventsWorkerStore):
     def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
         for room_id, current_state_tuple in iteritems(state_delta_by_room):
                 to_delete, to_insert = current_state_tuple
+
+                # First we add entries to the current_state_delta_stream. We
+                # do this before updating the current_state_events table so
+                # that we can use it to calculate the `prev_event_id`. (This
+                # allows us to not have to pull out the existing state
+                # unnecessarily).
+                sql = """
+                    INSERT INTO current_state_delta_stream
+                    (stream_id, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, ?, ?, ?, ?, (
+                        SELECT event_id FROM current_state_events
+                        WHERE room_id = ? AND type = ? AND state_key = ?
+                    )
+                """
+                txn.executemany(sql, (
+                    (
+                        max_stream_order, room_id, etype, state_key, None,
+                        room_id, etype, state_key,
+                    )
+                    for etype, state_key in to_delete
+                    # We sanity check that we're deleting rather than updating
+                    if (etype, state_key) not in to_insert
+                ))
+                txn.executemany(sql, (
+                    (
+                        max_stream_order, room_id, etype, state_key, ev_id,
+                        room_id, etype, state_key,
+                    )
+                    for (etype, state_key), ev_id in iteritems(to_insert)
+                ))
+
+                # Now we actually update the current_state_events table
+
                 txn.executemany(
-                    "DELETE FROM current_state_events WHERE event_id = ?",
-                    [(ev_id,) for ev_id in itervalues(to_delete)],
+                    "DELETE FROM current_state_events"
+                    " WHERE room_id = ? AND type = ? AND state_key = ?",
+                    (
+                        (room_id, etype, state_key)
+                        for etype, state_key in itertools.chain(to_delete, to_insert)
+                    ),
                 )
 
                 self._simple_insert_many_txn(
@@ -784,25 +821,6 @@ class EventsStore(EventsWorkerStore):
                     ],
                 )
 
-                state_deltas = {key: None for key in to_delete}
-                state_deltas.update(to_insert)
-
-                self._simple_insert_many_txn(
-                    txn,
-                    table="current_state_delta_stream",
-                    values=[
-                        {
-                            "stream_id": max_stream_order,
-                            "room_id": room_id,
-                            "type": key[0],
-                            "state_key": key[1],
-                            "event_id": ev_id,
-                            "prev_event_id": to_delete.get(key, None),
-                        }
-                        for key, ev_id in iteritems(state_deltas)
-                    ]
-                )
-
                 txn.call_after(
                     self._curr_state_delta_stream_cache.entity_has_changed,
                     room_id, max_stream_order,
@@ -816,7 +834,8 @@ class EventsStore(EventsWorkerStore):
                 # and which we have added, then we invlidate the caches for all
                 # those users.
                 members_changed = set(
-                    state_key for ev_type, state_key in state_deltas
+                    state_key
+                    for ev_type, state_key in itertools.chain(to_delete, to_insert)
                     if ev_type == EventTypes.Member
                 )