summary refs log tree commit diff
path: root/synapse/storage/data_stores
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-01-20 18:07:20 +0000
committerGitHub <noreply@github.com>2020-01-20 18:07:20 +0000
commit0e68760078c0aac57bfaeb681d534231e191315a (patch)
treeff54f8d4f4bcb2623cc62ca77b1eb372457eaa87 /synapse/storage/data_stores
parentFixup synapse.rest to pass mypy (#6732) (diff)
downloadsynapse-0e68760078c0aac57bfaeb681d534231e191315a.tar.xz
Add a DeltaState to track changes to be made to current state (#6716)
Diffstat (limited to 'synapse/storage/data_stores')
-rw-r--r--synapse/storage/data_stores/main/events.py87
1 files changed, 43 insertions, 44 deletions
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index bb69c20448..596daf8909 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -19,6 +19,7 @@ import itertools
 import logging
 from collections import Counter as c_counter, OrderedDict, namedtuple
 from functools import wraps
+from typing import Dict, List, Tuple
 
 from six import iteritems, text_type
 from six.moves import range
@@ -41,8 +42,9 @@ from synapse.storage._base import make_in_list_sql_clause
 from synapse.storage.data_stores.main.event_federation import EventFederationStore
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
 from synapse.storage.data_stores.main.state import StateGroupWorkerStore
-from synapse.storage.database import Database
-from synapse.types import RoomStreamToken, get_domain_from_id
+from synapse.storage.database import Database, LoggingTransaction
+from synapse.storage.persist_events import DeltaState
+from synapse.types import RoomStreamToken, StateMap, get_domain_from_id
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.iterutils import batch_iter
@@ -148,30 +150,26 @@ class EventsStore(
     @defer.inlineCallbacks
     def _persist_events_and_state_updates(
         self,
-        events_and_contexts,
-        current_state_for_room,
-        state_delta_for_room,
-        new_forward_extremeties,
-        backfilled=False,
-        delete_existing=False,
+        events_and_contexts: List[Tuple[EventBase, EventContext]],
+        current_state_for_room: Dict[str, StateMap[str]],
+        state_delta_for_room: Dict[str, DeltaState],
+        new_forward_extremeties: Dict[str, List[str]],
+        backfilled: bool = False,
+        delete_existing: bool = False,
     ):
         """Persist a set of events alongside updates to the current state and
         forward extremities tables.
 
         Args:
-            events_and_contexts (list[(EventBase, EventContext)]):
-            current_state_for_room (dict[str, dict]): Map from room_id to the
-                current state of the room based on forward extremities
-            state_delta_for_room (dict[str, tuple]): Map from room_id to tuple
-                of `(to_delete, to_insert)` where to_delete is a list
-                of type/state keys to remove from current state, and to_insert
-                is a map (type,key)->event_id giving the state delta in each
-                room.
-            new_forward_extremities (dict[str, list[str]]): Map from room_id
-                to list of event IDs that are the new forward extremities of
-                the room.
-            backfilled (bool)
-            delete_existing (bool):
+            events_and_contexts:
+            current_state_for_room: Map from room_id to the current state of
+                the room based on forward extremities
+            state_delta_for_room: Map from room_id to the delta to apply to
+                room state
+            new_forward_extremities: Map from room_id to list of event IDs
+                that are the new forward extremities of the room.
+            backfilled
+            delete_existing
 
         Returns:
             Deferred: resolves when the events have been persisted
@@ -352,12 +350,12 @@ class EventsStore(
     @log_function
     def _persist_events_txn(
         self,
-        txn,
-        events_and_contexts,
-        backfilled,
-        delete_existing=False,
-        state_delta_for_room={},
-        new_forward_extremeties={},
+        txn: LoggingTransaction,
+        events_and_contexts: List[Tuple[EventBase, EventContext]],
+        backfilled: bool,
+        delete_existing: bool = False,
+        state_delta_for_room: Dict[str, DeltaState] = {},
+        new_forward_extremeties: Dict[str, List[str]] = {},
     ):
         """Insert some number of room events into the necessary database tables.
 
@@ -366,21 +364,16 @@ class EventsStore(
         whether the event was rejected.
 
         Args:
-            txn (twisted.enterprise.adbapi.Connection): db connection
-            events_and_contexts (list[(EventBase, EventContext)]):
-                events to persist
-            backfilled (bool): True if the events were backfilled
-            delete_existing (bool): True to purge existing table rows for the
-                events from the database. This is useful when retrying due to
+            txn
+            events_and_contexts: events to persist
+            backfilled: True if the events were backfilled
+            delete_existing True to purge existing table rows for the events
+                from the database. This is useful when retrying due to
                 IntegrityError.
-            state_delta_for_room (dict[str, (list, dict)]):
-                The current-state delta for each room. For each room, a tuple
-                (to_delete, to_insert), being a list of type/state keys to be
-                removed from the current state, and a state set to be added to
-                the current state.
-            new_forward_extremeties (dict[str, list[str]]):
-                The new forward extremities for each room. For each room, a
-                list of the event ids which are the forward extremities.
+            state_delta_for_room: The current-state delta for each room.
+            new_forward_extremetie: The new forward extremities for each room.
+                For each room, a list of the event ids which are the forward
+                extremities.
 
         """
         all_events_and_contexts = events_and_contexts
@@ -465,9 +458,15 @@ class EventsStore(
         # room_memberships, where applicable.
         self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
 
-    def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
-        for room_id, current_state_tuple in iteritems(state_delta_by_room):
-            to_delete, to_insert = current_state_tuple
+    def _update_current_state_txn(
+        self,
+        txn: LoggingTransaction,
+        state_delta_by_room: Dict[str, DeltaState],
+        stream_id: int,
+    ):
+        for room_id, delta_state in iteritems(state_delta_by_room):
+            to_delete = delta_state.to_delete
+            to_insert = delta_state.to_insert
 
             # First we add entries to the current_state_delta_stream. We
             # do this before updating the current_state_events table so