diff options
Diffstat (limited to 'synapse/storage/persist_events.py')
-rw-r--r-- | synapse/storage/persist_events.py | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index a7f6338e05..0fc282866b 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -25,6 +25,7 @@ from typing import ( Collection, Deque, Dict, + Generator, Generic, Iterable, List, @@ -207,7 +208,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]): return res - def _handle_queue(self, room_id): + def _handle_queue(self, room_id: str) -> None: """Attempts to handle the queue for a room if not already being handled. The queue's callback will be invoked with for each item in the queue, @@ -227,7 +228,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]): self._currently_persisting_rooms.add(room_id) - async def handle_queue_loop(): + async def handle_queue_loop() -> None: try: queue = self._get_drainining_queue(room_id) for item in queue: @@ -250,15 +251,17 @@ class _EventPeristenceQueue(Generic[_PersistResult]): with PreserveLoggingContext(): item.deferred.callback(ret) finally: - queue = self._event_persist_queues.pop(room_id, None) - if queue: - self._event_persist_queues[room_id] = queue + remaining_queue = self._event_persist_queues.pop(room_id, None) + if remaining_queue: + self._event_persist_queues[room_id] = remaining_queue self._currently_persisting_rooms.discard(room_id) # set handle_queue_loop off in the background run_as_background_process("persist_events", handle_queue_loop) - def _get_drainining_queue(self, room_id): + def _get_drainining_queue( + self, room_id: str + ) -> Generator[_EventPersistQueueItem, None, None]: queue = self._event_persist_queues.setdefault(room_id, deque()) try: @@ -317,7 +320,9 @@ class EventsPersistenceStorage: for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) - async def enqueue(item): + async def enqueue( + item: Tuple[str, List[Tuple[EventBase, EventContext]]] + ) -> Dict[str, str]: room_id, evs_ctxs = item return await self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled @@ -1102,7 +1107,7 @@ class EventsPersistenceStorage: return False - async def _handle_potentially_left_users(self, user_ids: Set[str]): + async def _handle_potentially_left_users(self, user_ids: Set[str]) -> None: """Given a set of remote users check if the server still shares a room with them. If not then mark those users' device cache as stale. """ |