diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index bb69c20448..596daf8909 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -19,6 +19,7 @@ import itertools
import logging
from collections import Counter as c_counter, OrderedDict, namedtuple
from functools import wraps
+from typing import Dict, List, Tuple
from six import iteritems, text_type
from six.moves import range
@@ -41,8 +42,9 @@ from synapse.storage._base import make_in_list_sql_clause
from synapse.storage.data_stores.main.event_federation import EventFederationStore
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
from synapse.storage.data_stores.main.state import StateGroupWorkerStore
-from synapse.storage.database import Database
-from synapse.types import RoomStreamToken, get_domain_from_id
+from synapse.storage.database import Database, LoggingTransaction
+from synapse.storage.persist_events import DeltaState
+from synapse.types import RoomStreamToken, StateMap, get_domain_from_id
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.iterutils import batch_iter
@@ -148,30 +150,26 @@ class EventsStore(
@defer.inlineCallbacks
def _persist_events_and_state_updates(
self,
- events_and_contexts,
- current_state_for_room,
- state_delta_for_room,
- new_forward_extremeties,
- backfilled=False,
- delete_existing=False,
+ events_and_contexts: List[Tuple[EventBase, EventContext]],
+ current_state_for_room: Dict[str, StateMap[str]],
+ state_delta_for_room: Dict[str, DeltaState],
+ new_forward_extremeties: Dict[str, List[str]],
+ backfilled: bool = False,
+ delete_existing: bool = False,
):
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
Args:
- events_and_contexts (list[(EventBase, EventContext)]):
- current_state_for_room (dict[str, dict]): Map from room_id to the
- current state of the room based on forward extremities
- state_delta_for_room (dict[str, tuple]): Map from room_id to tuple
- of `(to_delete, to_insert)` where to_delete is a list
- of type/state keys to remove from current state, and to_insert
- is a map (type,key)->event_id giving the state delta in each
- room.
- new_forward_extremities (dict[str, list[str]]): Map from room_id
- to list of event IDs that are the new forward extremities of
- the room.
- backfilled (bool)
- delete_existing (bool):
+ events_and_contexts:
+ current_state_for_room: Map from room_id to the current state of
+ the room based on forward extremities
+ state_delta_for_room: Map from room_id to the delta to apply to
+ room state
+ new_forward_extremities: Map from room_id to list of event IDs
+ that are the new forward extremities of the room.
+ backfilled
+ delete_existing
Returns:
Deferred: resolves when the events have been persisted
@@ -352,12 +350,12 @@ class EventsStore(
@log_function
def _persist_events_txn(
self,
- txn,
- events_and_contexts,
- backfilled,
- delete_existing=False,
- state_delta_for_room={},
- new_forward_extremeties={},
+ txn: LoggingTransaction,
+ events_and_contexts: List[Tuple[EventBase, EventContext]],
+ backfilled: bool,
+ delete_existing: bool = False,
+ state_delta_for_room: Dict[str, DeltaState] = {},
+ new_forward_extremeties: Dict[str, List[str]] = {},
):
"""Insert some number of room events into the necessary database tables.
@@ -366,21 +364,16 @@ class EventsStore(
whether the event was rejected.
Args:
- txn (twisted.enterprise.adbapi.Connection): db connection
- events_and_contexts (list[(EventBase, EventContext)]):
- events to persist
- backfilled (bool): True if the events were backfilled
- delete_existing (bool): True to purge existing table rows for the
- events from the database. This is useful when retrying due to
+ txn
+ events_and_contexts: events to persist
+ backfilled: True if the events were backfilled
+ delete_existing True to purge existing table rows for the events
+ from the database. This is useful when retrying due to
IntegrityError.
- state_delta_for_room (dict[str, (list, dict)]):
- The current-state delta for each room. For each room, a tuple
- (to_delete, to_insert), being a list of type/state keys to be
- removed from the current state, and a state set to be added to
- the current state.
- new_forward_extremeties (dict[str, list[str]]):
- The new forward extremities for each room. For each room, a
- list of the event ids which are the forward extremities.
+ state_delta_for_room: The current-state delta for each room.
+ new_forward_extremetie: The new forward extremities for each room.
+ For each room, a list of the event ids which are the forward
+ extremities.
"""
all_events_and_contexts = events_and_contexts
@@ -465,9 +458,15 @@ class EventsStore(
# room_memberships, where applicable.
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
- def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
- for room_id, current_state_tuple in iteritems(state_delta_by_room):
- to_delete, to_insert = current_state_tuple
+ def _update_current_state_txn(
+ self,
+ txn: LoggingTransaction,
+ state_delta_by_room: Dict[str, DeltaState],
+ stream_id: int,
+ ):
+ for room_id, delta_state in iteritems(state_delta_by_room):
+ to_delete = delta_state.to_delete
+ to_insert = delta_state.to_insert
# First we add entries to the current_state_delta_stream. We
# do this before updating the current_state_events table so
|