diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 78fbdcdee8..4a164834d9 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -25,7 +25,7 @@ from prometheus_client import Counter, Histogram
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.events import FrozenEvent
+from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -192,12 +192,11 @@ class EventsPersistenceStorage(object):
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
- @defer.inlineCallbacks
- def persist_events(
+ async def persist_events(
self,
- events_and_contexts: List[Tuple[FrozenEvent, EventContext]],
+ events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool = False,
- ):
+ ) -> int:
"""
Write events to the database
Args:
@@ -207,7 +206,7 @@ class EventsPersistenceStorage(object):
which might update the current state etc.
Returns:
- Deferred[int]: the stream ordering of the latest persisted event
+ the stream ordering of the latest persisted event
"""
partitioned = {}
for event, ctx in events_and_contexts:
@@ -223,22 +222,19 @@ class EventsPersistenceStorage(object):
for room_id in partitioned:
self._maybe_start_persisting(room_id)
- yield make_deferred_yieldable(
+ await make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
- max_persisted_id = yield self.main_store.get_current_events_token()
-
- return max_persisted_id
+ return self.main_store.get_current_events_token()
- @defer.inlineCallbacks
- def persist_event(
- self, event: FrozenEvent, context: EventContext, backfilled: bool = False
- ):
+ async def persist_event(
+ self, event: EventBase, context: EventContext, backfilled: bool = False
+ ) -> Tuple[int, int]:
"""
Returns:
- Deferred[Tuple[int, int]]: the stream ordering of ``event``,
- and the stream ordering of the latest persisted event
+ The stream ordering of `event`, and the stream ordering of the
+ latest persisted event
"""
deferred = self._event_persist_queue.add_to_queue(
event.room_id, [(event, context)], backfilled=backfilled
@@ -246,9 +242,9 @@ class EventsPersistenceStorage(object):
self._maybe_start_persisting(event.room_id)
- yield make_deferred_yieldable(deferred)
+ await make_deferred_yieldable(deferred)
- max_persisted_id = yield self.main_store.get_current_events_token()
+ max_persisted_id = self.main_store.get_current_events_token()
return (event.internal_metadata.stream_ordering, max_persisted_id)
def _maybe_start_persisting(self, room_id: str):
@@ -262,7 +258,7 @@ class EventsPersistenceStorage(object):
async def _persist_events(
self,
- events_and_contexts: List[Tuple[FrozenEvent, EventContext]],
+ events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool = False,
):
"""Calculates the change to current state and forward extremities, and
@@ -439,7 +435,7 @@ class EventsPersistenceStorage(object):
async def _calculate_new_extremities(
self,
room_id: str,
- event_contexts: List[Tuple[FrozenEvent, EventContext]],
+ event_contexts: List[Tuple[EventBase, EventContext]],
latest_event_ids: List[str],
):
"""Calculates the new forward extremities for a room given events to
@@ -497,7 +493,7 @@ class EventsPersistenceStorage(object):
async def _get_new_state_after_events(
self,
room_id: str,
- events_context: List[Tuple[FrozenEvent, EventContext]],
+ events_context: List[Tuple[EventBase, EventContext]],
old_latest_event_ids: Iterable[str],
new_latest_event_ids: Iterable[str],
) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]]]:
@@ -683,7 +679,7 @@ class EventsPersistenceStorage(object):
async def _is_server_still_joined(
self,
room_id: str,
- ev_ctx_rm: List[Tuple[FrozenEvent, EventContext]],
+ ev_ctx_rm: List[Tuple[EventBase, EventContext]],
delta: DeltaState,
current_state: Optional[StateMap[str]],
potentially_left_users: Set[str],
|