diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index a7f6338e05..0fc282866b 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -25,6 +25,7 @@ from typing import (
Collection,
Deque,
Dict,
+ Generator,
Generic,
Iterable,
List,
@@ -207,7 +208,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
return res
- def _handle_queue(self, room_id):
+ def _handle_queue(self, room_id: str) -> None:
"""Attempts to handle the queue for a room if not already being handled.
The queue's callback will be invoked with for each item in the queue,
@@ -227,7 +228,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
self._currently_persisting_rooms.add(room_id)
- async def handle_queue_loop():
+ async def handle_queue_loop() -> None:
try:
queue = self._get_drainining_queue(room_id)
for item in queue:
@@ -250,15 +251,17 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
with PreserveLoggingContext():
item.deferred.callback(ret)
finally:
- queue = self._event_persist_queues.pop(room_id, None)
- if queue:
- self._event_persist_queues[room_id] = queue
+ remaining_queue = self._event_persist_queues.pop(room_id, None)
+ if remaining_queue:
+ self._event_persist_queues[room_id] = remaining_queue
self._currently_persisting_rooms.discard(room_id)
# set handle_queue_loop off in the background
run_as_background_process("persist_events", handle_queue_loop)
- def _get_drainining_queue(self, room_id):
+ def _get_drainining_queue(
+ self, room_id: str
+ ) -> Generator[_EventPersistQueueItem, None, None]:
queue = self._event_persist_queues.setdefault(room_id, deque())
try:
@@ -317,7 +320,9 @@ class EventsPersistenceStorage:
for event, ctx in events_and_contexts:
partitioned.setdefault(event.room_id, []).append((event, ctx))
- async def enqueue(item):
+ async def enqueue(
+ item: Tuple[str, List[Tuple[EventBase, EventContext]]]
+ ) -> Dict[str, str]:
room_id, evs_ctxs = item
return await self._event_persist_queue.add_to_queue(
room_id, evs_ctxs, backfilled=backfilled
@@ -1102,7 +1107,7 @@ class EventsPersistenceStorage:
return False
- async def _handle_potentially_left_users(self, user_ids: Set[str]):
+ async def _handle_potentially_left_users(self, user_ids: Set[str]) -> None:
"""Given a set of remote users check if the server still shares a room with
them. If not then mark those users' device cache as stale.
"""
|