diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 39b39cd3e2..983c837c66 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -26,18 +26,22 @@ import contextlib
import logging
from bisect import bisect
from contextlib import contextmanager
+from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
+ Awaitable,
Callable,
Collection,
Dict,
FrozenSet,
+ Generator,
Iterable,
List,
Optional,
Set,
Tuple,
+ Type,
Union,
)
@@ -61,6 +65,7 @@ from synapse.replication.http.streams import ReplicationGetStreamUpdates
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.storage.databases.main import DataStore
+from synapse.streams import EventSource
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import _CacheContext, cached
@@ -240,7 +245,7 @@ class BasePresenceHandler(abc.ABC):
"""
@abc.abstractmethod
- async def bump_presence_active_time(self, user: UserID):
+ async def bump_presence_active_time(self, user: UserID) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
@@ -274,7 +279,7 @@ class BasePresenceHandler(abc.ABC):
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
- ):
+ ) -> None:
"""Process streams received over replication."""
await self._federation_queue.process_replication_rows(
stream_name, instance_name, token, rows
@@ -286,7 +291,7 @@ class BasePresenceHandler(abc.ABC):
async def maybe_send_presence_to_interested_destinations(
self, states: List[UserPresenceState]
- ):
+ ) -> None:
"""If this instance is a federation sender, send the states to all
destinations that are interested. Filters out any states for remote
users.
@@ -309,7 +314,7 @@ class BasePresenceHandler(abc.ABC):
for destination, host_states in hosts_to_states.items():
self._federation.send_presence_to_destinations(host_states, [destination])
- async def send_full_presence_to_users(self, user_ids: Collection[str]):
+ async def send_full_presence_to_users(self, user_ids: Collection[str]) -> None:
"""
Adds to the list of users who should receive a full snapshot of presence
upon their next sync. Note that this only works for local users.
@@ -363,7 +368,12 @@ class BasePresenceHandler(abc.ABC):
class _NullContextManager(ContextManager[None]):
"""A context manager which does nothing."""
- def __exit__(self, exc_type, exc_val, exc_tb):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
pass
@@ -374,7 +384,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
self._presence_writer_instance = hs.config.worker.writers.presence[0]
- self._presence_enabled = hs.config.use_presence
+ self._presence_enabled = hs.config.server.use_presence
# Route presence EDUs to the right worker
hs.get_federation_registry().register_instances_for_edu(
@@ -468,7 +478,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
if self._user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)
- def _end():
+ def _end() -> None:
# We check that the user_id is in user_to_num_current_syncs because
# user_to_num_current_syncs may have been cleared if we are
# shutting down.
@@ -480,7 +490,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
self.mark_as_going_offline(user_id)
@contextlib.contextmanager
- def _user_syncing():
+ def _user_syncing() -> Generator[None, None, None]:
try:
yield
finally:
@@ -503,7 +513,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
- ):
+ ) -> None:
await super().process_replication_rows(stream_name, instance_name, token, rows)
if stream_name != PresenceStream.NAME:
@@ -584,7 +594,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
user_id = target_user.to_string()
# If presence is disabled, no-op
- if not self.hs.config.use_presence:
+ if not self.hs.config.server.use_presence:
return
# Proxy request to instance that writes presence
@@ -601,7 +611,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
with the app.
"""
# If presence is disabled, no-op
- if not self.hs.config.use_presence:
+ if not self.hs.config.server.use_presence:
return
# Proxy request to instance that writes presence
@@ -618,7 +628,7 @@ class PresenceHandler(BasePresenceHandler):
self.server_name = hs.hostname
self.wheel_timer: WheelTimer[str] = WheelTimer()
self.notifier = hs.get_notifier()
- self._presence_enabled = hs.config.use_presence
+ self._presence_enabled = hs.config.server.use_presence
federation_registry = hs.get_federation_registry()
@@ -689,7 +699,7 @@ class PresenceHandler(BasePresenceHandler):
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
- def run_timeout_handler():
+ def run_timeout_handler() -> Awaitable[None]:
return run_as_background_process(
"handle_presence_timeouts", self._handle_timeouts
)
@@ -698,7 +708,7 @@ class PresenceHandler(BasePresenceHandler):
30, self.clock.looping_call, run_timeout_handler, 5000
)
- def run_persister():
+ def run_persister() -> Awaitable[None]:
return run_as_background_process(
"persist_presence_changes", self._persist_unpersisted_changes
)
@@ -916,7 +926,7 @@ class PresenceHandler(BasePresenceHandler):
with the app.
"""
# If presence is disabled, no-op
- if not self.hs.config.use_presence:
+ if not self.hs.config.server.use_presence:
return
user_id = user.to_string()
@@ -942,14 +952,14 @@ class PresenceHandler(BasePresenceHandler):
when users disconnect/reconnect.
Args:
- user_id (str)
- affect_presence (bool): If false this function will be a no-op.
+ user_id
+ affect_presence: If false this function will be a no-op.
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
# Override if it should affect the user's presence, if presence is
# disabled.
- if not self.hs.config.use_presence:
+ if not self.hs.config.server.use_presence:
affect_presence = False
if affect_presence:
@@ -978,7 +988,7 @@ class PresenceHandler(BasePresenceHandler):
]
)
- async def _end():
+ async def _end() -> None:
try:
self.user_to_num_current_syncs[user_id] -= 1
@@ -994,7 +1004,7 @@ class PresenceHandler(BasePresenceHandler):
logger.exception("Error updating presence after sync")
@contextmanager
- def _user_syncing():
+ def _user_syncing() -> Generator[None, None, None]:
try:
yield
finally:
@@ -1264,7 +1274,7 @@ class PresenceHandler(BasePresenceHandler):
if self._event_processing:
return
- async def _process_presence():
+ async def _process_presence() -> None:
assert not self._event_processing
self._event_processing = True
@@ -1491,7 +1501,7 @@ def format_user_presence_state(
return content
-class PresenceEventSource:
+class PresenceEventSource(EventSource[int, UserPresenceState]):
def __init__(self, hs: "HomeServer"):
# We can't call get_presence_handler here because there's a cycle:
#
@@ -1510,10 +1520,11 @@ class PresenceEventSource:
self,
user: UserID,
from_key: Optional[int],
+ limit: Optional[int] = None,
room_ids: Optional[List[str]] = None,
- include_offline: bool = True,
+ is_guest: bool = False,
explicit_room_id: Optional[str] = None,
- **kwargs,
+ include_offline: bool = True,
) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are:
# 1. Get the rooms the user is in.
@@ -2074,7 +2085,7 @@ class PresenceFederationQueue:
if self._queue_presence_updates:
self._clock.looping_call(self._clear_queue, self._CLEAR_ITEMS_EVERY_MS)
- def _clear_queue(self):
+ def _clear_queue(self) -> None:
"""Clear out older entries from the queue."""
clear_before = self._clock.time_msec() - self._KEEP_ITEMS_IN_QUEUE_FOR_MS
@@ -2205,7 +2216,7 @@ class PresenceFederationQueue:
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
- ):
+ ) -> None:
if stream_name != PresenceFederationStream.NAME:
return
|