diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 4ab962a84b..841c8815b0 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -26,18 +26,22 @@ import contextlib
import logging
from bisect import bisect
from contextlib import contextmanager
+from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
+ Awaitable,
Callable,
Collection,
Dict,
FrozenSet,
+ Generator,
Iterable,
List,
Optional,
Set,
Tuple,
+ Type,
Union,
)
@@ -240,7 +244,7 @@ class BasePresenceHandler(abc.ABC):
"""
@abc.abstractmethod
- async def bump_presence_active_time(self, user: UserID):
+ async def bump_presence_active_time(self, user: UserID) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
@@ -274,7 +278,7 @@ class BasePresenceHandler(abc.ABC):
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
- ):
+ ) -> None:
"""Process streams received over replication."""
await self._federation_queue.process_replication_rows(
stream_name, instance_name, token, rows
@@ -286,7 +290,7 @@ class BasePresenceHandler(abc.ABC):
async def maybe_send_presence_to_interested_destinations(
self, states: List[UserPresenceState]
- ):
+ ) -> None:
"""If this instance is a federation sender, send the states to all
destinations that are interested. Filters out any states for remote
users.
@@ -309,7 +313,7 @@ class BasePresenceHandler(abc.ABC):
for destination, host_states in hosts_to_states.items():
self._federation.send_presence_to_destinations(host_states, [destination])
- async def send_full_presence_to_users(self, user_ids: Collection[str]):
+ async def send_full_presence_to_users(self, user_ids: Collection[str]) -> None:
"""
Adds to the list of users who should receive a full snapshot of presence
upon their next sync. Note that this only works for local users.
@@ -363,7 +367,12 @@ class BasePresenceHandler(abc.ABC):
class _NullContextManager(ContextManager[None]):
"""A context manager which does nothing."""
- def __exit__(self, exc_type, exc_val, exc_tb):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
pass
@@ -468,7 +477,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
if self._user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)
- def _end():
+ def _end() -> None:
# We check that the user_id is in user_to_num_current_syncs because
# user_to_num_current_syncs may have been cleared if we are
# shutting down.
@@ -480,7 +489,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
self.mark_as_going_offline(user_id)
@contextlib.contextmanager
- def _user_syncing():
+ def _user_syncing() -> Generator[None, None, None]:
try:
yield
finally:
@@ -503,7 +512,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
- ):
+ ) -> None:
await super().process_replication_rows(stream_name, instance_name, token, rows)
if stream_name != PresenceStream.NAME:
@@ -689,7 +698,7 @@ class PresenceHandler(BasePresenceHandler):
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
- def run_timeout_handler():
+ def run_timeout_handler() -> Awaitable[None]:
return run_as_background_process(
"handle_presence_timeouts", self._handle_timeouts
)
@@ -698,7 +707,7 @@ class PresenceHandler(BasePresenceHandler):
30, self.clock.looping_call, run_timeout_handler, 5000
)
- def run_persister():
+ def run_persister() -> Awaitable[None]:
return run_as_background_process(
"persist_presence_changes", self._persist_unpersisted_changes
)
@@ -942,8 +951,8 @@ class PresenceHandler(BasePresenceHandler):
when users disconnect/reconnect.
Args:
- user_id (str)
- affect_presence (bool): If false this function will be a no-op.
+ user_id
+ affect_presence: If false this function will be a no-op.
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
@@ -978,7 +987,7 @@ class PresenceHandler(BasePresenceHandler):
]
)
- async def _end():
+ async def _end() -> None:
try:
self.user_to_num_current_syncs[user_id] -= 1
@@ -994,7 +1003,7 @@ class PresenceHandler(BasePresenceHandler):
logger.exception("Error updating presence after sync")
@contextmanager
- def _user_syncing():
+ def _user_syncing() -> Generator[None, None, None]:
try:
yield
finally:
@@ -1264,7 +1273,7 @@ class PresenceHandler(BasePresenceHandler):
if self._event_processing:
return
- async def _process_presence():
+ async def _process_presence() -> None:
assert not self._event_processing
self._event_processing = True
@@ -1513,7 +1522,7 @@ class PresenceEventSource:
room_ids: Optional[List[str]] = None,
include_offline: bool = True,
explicit_room_id: Optional[str] = None,
- **kwargs,
+ **kwargs: Any,
) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are:
# 1. Get the rooms the user is in.
@@ -2074,7 +2083,7 @@ class PresenceFederationQueue:
if self._queue_presence_updates:
self._clock.looping_call(self._clear_queue, self._CLEAR_ITEMS_EVERY_MS)
- def _clear_queue(self):
+ def _clear_queue(self) -> None:
"""Clear out older entries from the queue."""
clear_before = self._clock.time_msec() - self._KEEP_ITEMS_IN_QUEUE_FOR_MS
@@ -2205,7 +2214,7 @@ class PresenceFederationQueue:
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
- ):
+ ) -> None:
if stream_name != PresenceFederationStream.NAME:
return
|