diff options
author | David Robertson <davidr@element.io> | 2022-05-12 15:33:50 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-12 14:33:50 +0000 |
commit | 17e1eb7749adf12d43f534c50115bbe19c809ea6 (patch) | |
tree | b2e3e373a22432dbafe5a9e65abb98900fa3bb5d /synapse/storage/persist_events.py | |
parent | add default_power_level_content_override config option. (#12618) (diff) | |
download | synapse-17e1eb7749adf12d43f534c50115bbe19c809ea6.tar.xz |
Reduce the number of "untyped defs" (#12716)
Diffstat (limited to 'synapse/storage/persist_events.py')
-rw-r--r-- | synapse/storage/persist_events.py | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index a7f6338e05..0fc282866b 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -25,6 +25,7 @@ from typing import ( Collection, Deque, Dict, + Generator, Generic, Iterable, List, @@ -207,7 +208,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]): return res - def _handle_queue(self, room_id): + def _handle_queue(self, room_id: str) -> None: """Attempts to handle the queue for a room if not already being handled. The queue's callback will be invoked with for each item in the queue, @@ -227,7 +228,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]): self._currently_persisting_rooms.add(room_id) - async def handle_queue_loop(): + async def handle_queue_loop() -> None: try: queue = self._get_drainining_queue(room_id) for item in queue: @@ -250,15 +251,17 @@ class _EventPeristenceQueue(Generic[_PersistResult]): with PreserveLoggingContext(): item.deferred.callback(ret) finally: - queue = self._event_persist_queues.pop(room_id, None) - if queue: - self._event_persist_queues[room_id] = queue + remaining_queue = self._event_persist_queues.pop(room_id, None) + if remaining_queue: + self._event_persist_queues[room_id] = remaining_queue self._currently_persisting_rooms.discard(room_id) # set handle_queue_loop off in the background run_as_background_process("persist_events", handle_queue_loop) - def _get_drainining_queue(self, room_id): + def _get_drainining_queue( + self, room_id: str + ) -> Generator[_EventPersistQueueItem, None, None]: queue = self._event_persist_queues.setdefault(room_id, deque()) try: @@ -317,7 +320,9 @@ class EventsPersistenceStorage: for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) - async def enqueue(item): + async def enqueue( + item: Tuple[str, List[Tuple[EventBase, EventContext]]] + ) -> Dict[str, str]: room_id, evs_ctxs = item return await self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled @@ -1102,7 +1107,7 @@ class EventsPersistenceStorage: return False - async def _handle_potentially_left_users(self, user_ids: Set[str]): + async def _handle_potentially_left_users(self, user_ids: Set[str]) -> None: """Given a set of remote users check if the server still shares a room with them. If not then mark those users' device cache as stale. """ |