From 04819239bae2b39ee42bfdb6f9b83c6d9fe34169 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 6 Apr 2021 14:38:30 +0100 Subject: Add a Synapse Module for configuring presence update routing (#9491) At the moment, if you'd like to share presence between local or remote users, those users must be sharing a room together. This isn't always the most convenient or useful situation though. This PR adds a module to Synapse that will allow deployments to set up extra logic on where presence updates should be routed. The module must implement two methods, `get_users_for_states` and `get_interested_users`. These methods are given presence updates or user IDs and must return information that Synapse will use to grant passing presence updates around. A method is additionally added to `ModuleApi` which allows triggering a set of users to receive the current, online presence information for all users they are considered interested in. This is the equivalent of that user receiving presence information during an initial sync. The goal of this module is to be fairly generic and useful for a variety of applications, with hard requirements being: * Sending state for a specific set or all known users to a defined set of local and remote users. * The ability to trigger an initial sync for specific users, so they receive all current state. --- synapse/app/generic_worker.py | 3 +- synapse/config/server.py | 39 ++++- synapse/events/presence_router.py | 104 +++++++++++++ synapse/federation/sender/__init__.py | 19 ++- synapse/handlers/presence.py | 278 ++++++++++++++++++++++++++++------ synapse/module_api/__init__.py | 50 ++++++ synapse/server.py | 5 + 7 files changed, 450 insertions(+), 48 deletions(-) create mode 100644 synapse/events/presence_router.py (limited to 'synapse') diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 3df2aa5c2b..d1c2079233 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -281,6 +281,7 @@ class GenericWorkerPresence(BasePresenceHandler): self.hs = hs self.is_mine_id = hs.is_mine_id + self.presence_router = hs.get_presence_router() self._presence_enabled = hs.config.use_presence # The number of ongoing syncs on this process, by user id. @@ -395,7 +396,7 @@ class GenericWorkerPresence(BasePresenceHandler): return _user_syncing() async def notify_from_replication(self, states, stream_id): - parties = await get_interested_parties(self.store, states) + parties = await get_interested_parties(self.store, self.presence_router, states) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( diff --git a/synapse/config/server.py b/synapse/config/server.py index 5f8910b6e1..8decc9d10d 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -27,6 +27,7 @@ import yaml from netaddr import AddrFormatError, IPNetwork, IPSet from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.util.module_loader import load_module from synapse.util.stringutils import parse_and_validate_server_name from ._base import Config, ConfigError @@ -238,7 +239,20 @@ class ServerConfig(Config): self.public_baseurl = config.get("public_baseurl") # Whether to enable user presence. - self.use_presence = config.get("use_presence", True) + presence_config = config.get("presence") or {} + self.use_presence = presence_config.get("enabled") + if self.use_presence is None: + self.use_presence = config.get("use_presence", True) + + # Custom presence router module + self.presence_router_module_class = None + self.presence_router_config = None + presence_router_config = presence_config.get("presence_router") + if presence_router_config: + ( + self.presence_router_module_class, + self.presence_router_config, + ) = load_module(presence_router_config, ("presence", "presence_router")) # Whether to update the user directory or not. This should be set to # false only if we are updating the user directory in a worker @@ -834,9 +848,28 @@ class ServerConfig(Config): # #soft_file_limit: 0 - # Set to false to disable presence tracking on this homeserver. + # Presence tracking allows users to see the state (e.g online/offline) + # of other local and remote users. # - #use_presence: false + presence: + # Uncomment to disable presence tracking on this homeserver. This option + # replaces the previous top-level 'use_presence' option. + # + #enabled: false + + # Presence routers are third-party modules that can specify additional logic + # to where presence updates from users are routed. + # + presence_router: + # The custom module's class. Uncomment to use a custom presence router module. + # + #module: "my_custom_router.PresenceRouter" + + # Configuration options of the custom module. Refer to your module's + # documentation for available options. + # + #config: + # example_option: 'something' # Whether to require authentication to retrieve profile data (avatars, # display names) of other users through the client API. Defaults to diff --git a/synapse/events/presence_router.py b/synapse/events/presence_router.py new file mode 100644 index 0000000000..24cd389d80 --- /dev/null +++ b/synapse/events/presence_router.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import TYPE_CHECKING, Dict, Iterable, Set, Union + +from synapse.api.presence import UserPresenceState + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +class PresenceRouter: + """ + A module that the homeserver will call upon to help route user presence updates to + additional destinations. If a custom presence router is configured, calls will be + passed to that instead. + """ + + ALL_USERS = "ALL" + + def __init__(self, hs: "HomeServer"): + self.custom_presence_router = None + + # Check whether a custom presence router module has been configured + if hs.config.presence_router_module_class: + # Initialise the module + self.custom_presence_router = hs.config.presence_router_module_class( + config=hs.config.presence_router_config, module_api=hs.get_module_api() + ) + + # Ensure the module has implemented the required methods + required_methods = ["get_users_for_states", "get_interested_users"] + for method_name in required_methods: + if not hasattr(self.custom_presence_router, method_name): + raise Exception( + "PresenceRouter module '%s' must implement all required methods: %s" + % ( + hs.config.presence_router_module_class.__name__, + ", ".join(required_methods), + ) + ) + + async def get_users_for_states( + self, + state_updates: Iterable[UserPresenceState], + ) -> Dict[str, Set[UserPresenceState]]: + """ + Given an iterable of user presence updates, determine where each one + needs to go. + + Args: + state_updates: An iterable of user presence state updates. + + Returns: + A dictionary of user_id -> set of UserPresenceState, indicating which + presence updates each user should receive. + """ + if self.custom_presence_router is not None: + # Ask the custom module + return await self.custom_presence_router.get_users_for_states( + state_updates=state_updates + ) + + # Don't include any extra destinations for presence updates + return {} + + async def get_interested_users(self, user_id: str) -> Union[Set[str], ALL_USERS]: + """ + Retrieve a list of users that `user_id` is interested in receiving the + presence of. This will be in addition to those they share a room with. + Optionally, the object PresenceRouter.ALL_USERS can be returned to indicate + that this user should receive all incoming local and remote presence updates. + + Note that this method will only be called for local users, but can return users + that are local or remote. + + Args: + user_id: A user requesting presence updates. + + Returns: + A set of user IDs to return presence updates for, or ALL_USERS to return all + known updates. + """ + if self.custom_presence_router is not None: + # Ask the custom module for interested users + return await self.custom_presence_router.get_interested_users( + user_id=user_id + ) + + # A custom presence router is not defined. + # Don't report any additional interested users + return set() diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 8babb1ebbe..98bfce22ff 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -44,6 +44,7 @@ from synapse.types import JsonDict, ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure, measure_func if TYPE_CHECKING: + from synapse.events.presence_router import PresenceRouter from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -162,6 +163,7 @@ class FederationSender(AbstractFederationSender): self.clock = hs.get_clock() self.is_mine_id = hs.is_mine_id + self._presence_router = None # type: Optional[PresenceRouter] self._transaction_manager = TransactionManager(hs) self._instance_name = hs.get_instance_name() @@ -584,7 +586,22 @@ class FederationSender(AbstractFederationSender): """Given a list of states populate self.pending_presence_by_dest and poke to send a new transaction to each destination """ - hosts_and_states = await get_interested_remotes(self.store, states, self.state) + # We pull the presence router here instead of __init__ + # to prevent a dependency cycle: + # + # AuthHandler -> Notifier -> FederationSender + # -> PresenceRouter -> ModuleApi -> AuthHandler + if self._presence_router is None: + self._presence_router = self.hs.get_presence_router() + + assert self._presence_router is not None + + hosts_and_states = await get_interested_remotes( + self.store, + self._presence_router, + states, + self.state, + ) for destinations, states in hosts_and_states: for destination in destinations: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index da92feacc9..c817f2952d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -25,7 +25,17 @@ The methods that define policy are: import abc import logging from contextlib import contextmanager -from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple +from typing import ( + TYPE_CHECKING, + Dict, + FrozenSet, + Iterable, + List, + Optional, + Set, + Tuple, + Union, +) from prometheus_client import Counter from typing_extensions import ContextManager @@ -34,6 +44,7 @@ import synapse.metrics from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError from synapse.api.presence import UserPresenceState +from synapse.events.presence_router import PresenceRouter from synapse.logging.context import run_in_background from synapse.logging.utils import log_function from synapse.metrics import LaterGauge @@ -42,7 +53,7 @@ from synapse.state import StateHandler from synapse.storage.databases.main import DataStore from synapse.types import Collection, JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import _CacheContext, cached from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -209,6 +220,7 @@ class PresenceHandler(BasePresenceHandler): self.notifier = hs.get_notifier() self.federation = hs.get_federation_sender() self.state = hs.get_state_handler() + self.presence_router = hs.get_presence_router() self._presence_enabled = hs.config.use_presence federation_registry = hs.get_federation_registry() @@ -653,7 +665,7 @@ class PresenceHandler(BasePresenceHandler): """ stream_id, max_token = await self.store.update_presence(states) - parties = await get_interested_parties(self.store, states) + parties = await get_interested_parties(self.store, self.presence_router, states) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( @@ -1041,7 +1053,12 @@ class PresenceEventSource: # # Presence -> Notifier -> PresenceEventSource -> Presence # + # Same with get_module_api, get_presence_router + # + # AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler self.get_presence_handler = hs.get_presence_handler + self.get_module_api = hs.get_module_api + self.get_presence_router = hs.get_presence_router self.clock = hs.get_clock() self.store = hs.get_datastore() self.state = hs.get_state_handler() @@ -1055,7 +1072,7 @@ class PresenceEventSource: include_offline=True, explicit_room_id=None, **kwargs - ): + ) -> Tuple[List[UserPresenceState], int]: # The process for getting presence events are: # 1. Get the rooms the user is in. # 2. Get the list of user in the rooms. @@ -1068,7 +1085,17 @@ class PresenceEventSource: # We don't try and limit the presence updates by the current token, as # sending down the rare duplicate is not a concern. + user_id = user.to_string() + stream_change_cache = self.store.presence_stream_cache + with Measure(self.clock, "presence.get_new_events"): + if user_id in self.get_module_api()._send_full_presence_to_local_users: + # This user has been specified by a module to receive all current, online + # user presence. Removing from_key and setting include_offline to false + # will do effectively this. + from_key = None + include_offline = False + if from_key is not None: from_key = int(from_key) @@ -1091,59 +1118,209 @@ class PresenceEventSource: # doesn't return. C.f. #5503. return [], max_token - presence = self.get_presence_handler() - stream_change_cache = self.store.presence_stream_cache - + # Figure out which other users this user should receive updates for users_interested_in = await self._get_interested_in(user, explicit_room_id) - user_ids_changed = set() # type: Collection[str] - changed = None - if from_key: - changed = stream_change_cache.get_all_entities_changed(from_key) + # We have a set of users that we're interested in the presence of. We want to + # cross-reference that with the users that have actually changed their presence. - if changed is not None and len(changed) < 500: - assert isinstance(user_ids_changed, set) + # Check whether this user should see all user updates - # For small deltas, its quicker to get all changes and then - # work out if we share a room or they're in our presence list - get_updates_counter.labels("stream").inc() - for other_user_id in changed: - if other_user_id in users_interested_in: - user_ids_changed.add(other_user_id) - else: - # Too many possible updates. Find all users we can see and check - # if any of them have changed. - get_updates_counter.labels("full").inc() + if users_interested_in == PresenceRouter.ALL_USERS: + # Provide presence state for all users + presence_updates = await self._filter_all_presence_updates_for_user( + user_id, include_offline, from_key + ) - if from_key: - user_ids_changed = stream_change_cache.get_entities_changed( - users_interested_in, from_key + # Remove the user from the list of users to receive all presence + if user_id in self.get_module_api()._send_full_presence_to_local_users: + self.get_module_api()._send_full_presence_to_local_users.remove( + user_id ) + + return presence_updates, max_token + + # Make mypy happy. users_interested_in should now be a set + assert not isinstance(users_interested_in, str) + + # The set of users that we're interested in and that have had a presence update. + # We'll actually pull the presence updates for these users at the end. + interested_and_updated_users = ( + set() + ) # type: Union[Set[str], FrozenSet[str]] + + if from_key: + # First get all users that have had a presence update + updated_users = stream_change_cache.get_all_entities_changed(from_key) + + # Cross-reference users we're interested in with those that have had updates. + # Use a slightly-optimised method for processing smaller sets of updates. + if updated_users is not None and len(updated_users) < 500: + # For small deltas, it's quicker to get all changes and then + # cross-reference with the users we're interested in + get_updates_counter.labels("stream").inc() + for other_user_id in updated_users: + if other_user_id in users_interested_in: + # mypy thinks this variable could be a FrozenSet as it's possibly set + # to one in the `get_entities_changed` call below, and `add()` is not + # method on a FrozenSet. That doesn't affect us here though, as + # `interested_and_updated_users` is clearly a set() above. + interested_and_updated_users.add(other_user_id) # type: ignore else: - user_ids_changed = users_interested_in + # Too many possible updates. Find all users we can see and check + # if any of them have changed. + get_updates_counter.labels("full").inc() - updates = await presence.current_state_for_users(user_ids_changed) + interested_and_updated_users = ( + stream_change_cache.get_entities_changed( + users_interested_in, from_key + ) + ) + else: + # No from_key has been specified. Return the presence for all users + # this user is interested in + interested_and_updated_users = users_interested_in + + # Retrieve the current presence state for each user + users_to_state = await self.get_presence_handler().current_state_for_users( + interested_and_updated_users + ) + presence_updates = list(users_to_state.values()) - if include_offline: - return (list(updates.values()), max_token) + # Remove the user from the list of users to receive all presence + if user_id in self.get_module_api()._send_full_presence_to_local_users: + self.get_module_api()._send_full_presence_to_local_users.remove(user_id) + + if not include_offline: + # Filter out offline presence states + presence_updates = self._filter_offline_presence_state(presence_updates) + + return presence_updates, max_token + + async def _filter_all_presence_updates_for_user( + self, + user_id: str, + include_offline: bool, + from_key: Optional[int] = None, + ) -> List[UserPresenceState]: + """ + Computes the presence updates a user should receive. + + First pulls presence updates from the database. Then consults PresenceRouter + for whether any updates should be excluded by user ID. + + Args: + user_id: The User ID of the user to compute presence updates for. + include_offline: Whether to include offline presence states from the results. + from_key: The minimum stream ID of updates to pull from the database + before filtering. + + Returns: + A list of presence states for the given user to receive. + """ + if from_key: + # Only return updates since the last sync + updated_users = self.store.presence_stream_cache.get_all_entities_changed( + from_key + ) + if not updated_users: + updated_users = [] + + # Get the actual presence update for each change + users_to_state = await self.get_presence_handler().current_state_for_users( + updated_users + ) + presence_updates = list(users_to_state.values()) + + if not include_offline: + # Filter out offline states + presence_updates = self._filter_offline_presence_state(presence_updates) else: - return ( - [s for s in updates.values() if s.state != PresenceState.OFFLINE], - max_token, + users_to_state = await self.store.get_presence_for_all_users( + include_offline=include_offline ) + presence_updates = list(users_to_state.values()) + + # TODO: This feels wildly inefficient, and it's unfortunate we need to ask the + # module for information on a number of users when we then only take the info + # for a single user + + # Filter through the presence router + users_to_state_set = await self.get_presence_router().get_users_for_states( + presence_updates + ) + + # We only want the mapping for the syncing user + presence_updates = list(users_to_state_set[user_id]) + + # Return presence information for all users + return presence_updates + + def _filter_offline_presence_state( + self, presence_updates: Iterable[UserPresenceState] + ) -> List[UserPresenceState]: + """Given an iterable containing user presence updates, return a list with any offline + presence states removed. + + Args: + presence_updates: Presence states to filter + + Returns: + A new list with any offline presence states removed. + """ + return [ + update + for update in presence_updates + if update.state != PresenceState.OFFLINE + ] + def get_current_key(self): return self.store.get_current_presence_token() @cached(num_args=2, cache_context=True) - async def _get_interested_in(self, user, explicit_room_id, cache_context): + async def _get_interested_in( + self, + user: UserID, + explicit_room_id: Optional[str] = None, + cache_context: Optional[_CacheContext] = None, + ) -> Union[Set[str], str]: """Returns the set of users that the given user should see presence - updates for + updates for. + + Args: + user: The user to retrieve presence updates for. + explicit_room_id: The users that are in the room will be returned. + + Returns: + A set of user IDs to return presence updates for, or "ALL" to return all + known updates. """ user_id = user.to_string() users_interested_in = set() users_interested_in.add(user_id) # So that we receive our own presence + # cache_context isn't likely to ever be None due to the @cached decorator, + # but we can't have a non-optional argument after the optional argument + # explicit_room_id either. Assert cache_context is not None so we can use it + # without mypy complaining. + assert cache_context + + # Check with the presence router whether we should poll additional users for + # their presence information + additional_users = await self.get_presence_router().get_interested_users( + user.to_string() + ) + if additional_users == PresenceRouter.ALL_USERS: + # If the module requested that this user see the presence updates of *all* + # users, then simply return that instead of calculating what rooms this + # user shares + return PresenceRouter.ALL_USERS + + # Add the additional users from the router + users_interested_in.update(additional_users) + + # Find the users who share a room with this user users_who_share_room = await self.store.get_users_who_share_room_with_user( user_id, on_invalidate=cache_context.invalidate ) @@ -1314,14 +1491,15 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): async def get_interested_parties( - store: DataStore, states: List[UserPresenceState] + store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState] ) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]: """Given a list of states return which entities (rooms, users) are interested in the given states. Args: - store - states + store: The homeserver's data store. + presence_router: A module for augmenting the destinations for presence updates. + states: A list of incoming user presence updates. Returns: A 2-tuple of `(room_ids_to_states, users_to_states)`, @@ -1337,11 +1515,22 @@ async def get_interested_parties( # Always notify self users_to_states.setdefault(state.user_id, []).append(state) + # Ask a presence routing module for any additional parties if one + # is loaded. + router_users_to_states = await presence_router.get_users_for_states(states) + + # Update the dictionaries with additional destinations and state to send + for user_id, user_states in router_users_to_states.items(): + users_to_states.setdefault(user_id, []).extend(user_states) + return room_ids_to_states, users_to_states async def get_interested_remotes( - store: DataStore, states: List[UserPresenceState], state_handler: StateHandler + store: DataStore, + presence_router: PresenceRouter, + states: List[UserPresenceState], + state_handler: StateHandler, ) -> List[Tuple[Collection[str], List[UserPresenceState]]]: """Given a list of presence states figure out which remote servers should be sent which. @@ -1349,9 +1538,10 @@ async def get_interested_remotes( All the presence states should be for local users only. Args: - store - states - state_handler + store: The homeserver's data store. + presence_router: A module for augmenting the destinations for presence updates. + states: A list of incoming user presence updates. + state_handler: Returns: A list of 2-tuples of destinations and states, where for @@ -1363,7 +1553,9 @@ async def get_interested_remotes( # First we look up the rooms each user is in (as well as any explicit # subscriptions), then for each distinct room we look up the remote # hosts in those rooms. - room_ids_to_states, users_to_states = await get_interested_parties(store, states) + room_ids_to_states, users_to_states = await get_interested_parties( + store, presence_router, states + ) for room_id, states in room_ids_to_states.items(): hosts = await state_handler.get_current_hosts_in_room(room_id) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 781e02fbbb..3ecd46c038 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -50,11 +50,20 @@ class ModuleApi: self._auth = hs.get_auth() self._auth_handler = auth_handler self._server_name = hs.hostname + self._presence_stream = hs.get_event_sources().sources["presence"] # We expose these as properties below in order to attach a helpful docstring. self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient self._public_room_list_manager = PublicRoomListManager(hs) + # The next time these users sync, they will receive the current presence + # state of all local users. Users are added by send_local_online_presence_to, + # and removed after a successful sync. + # + # We make this a private variable to deter modules from accessing it directly, + # though other classes in Synapse will still do so. + self._send_full_presence_to_local_users = set() + @property def http_client(self): """Allows making outbound HTTP requests to remote resources. @@ -385,6 +394,47 @@ class ModuleApi: return event + async def send_local_online_presence_to(self, users: Iterable[str]) -> None: + """ + Forces the equivalent of a presence initial_sync for a set of local or remote + users. The users will receive presence for all currently online users that they + are considered interested in. + + Updates to remote users will be sent immediately, whereas local users will receive + them on their next sync attempt. + + Note that this method can only be run on the main or federation_sender worker + processes. + """ + if not self._hs.should_send_federation(): + raise Exception( + "send_local_online_presence_to can only be run " + "on processes that send federation", + ) + + for user in users: + if self._hs.is_mine_id(user): + # Modify SyncHandler._generate_sync_entry_for_presence to call + # presence_source.get_new_events with an empty `from_key` if + # that user's ID were in a list modified by ModuleApi somewhere. + # That user would then get all presence state on next incremental sync. + + # Force a presence initial_sync for this user next time + self._send_full_presence_to_local_users.add(user) + else: + # Retrieve presence state for currently online users that this user + # is considered interested in + presence_events, _ = await self._presence_stream.get_new_events( + UserID.from_string(user), from_key=None, include_offline=False + ) + + # Send to remote destinations + await make_deferred_yieldable( + # We pull the federation sender here as we can only do so on workers + # that support sending presence + self._hs.get_federation_sender().send_presence(presence_events) + ) + class PublicRoomListManager: """Contains methods for adding to, removing from and querying whether a room diff --git a/synapse/server.py b/synapse/server.py index e42f7b1a18..cfb55c230d 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -51,6 +51,7 @@ from synapse.crypto import context_factory from synapse.crypto.context_factory import RegularPolicyForHTTPS from synapse.crypto.keyring import Keyring from synapse.events.builder import EventBuilderFactory +from synapse.events.presence_router import PresenceRouter from synapse.events.spamcheck import SpamChecker from synapse.events.third_party_rules import ThirdPartyEventRules from synapse.events.utils import EventClientSerializer @@ -425,6 +426,10 @@ class HomeServer(metaclass=abc.ABCMeta): else: raise Exception("Workers cannot write typing") + @cache_in_self + def get_presence_router(self) -> PresenceRouter: + return PresenceRouter(self) + @cache_in_self def get_typing_handler(self) -> FollowerTypingHandler: if self.config.worker.writers.typing == self.get_instance_name(): -- cgit 1.4.1