diff --git a/README.rst b/README.rst
index 655a2bf3be..1a5503572e 100644
--- a/README.rst
+++ b/README.rst
@@ -393,7 +393,12 @@ massive excess of outgoing federation requests (see `discussion
indicate that your server is also issuing far more outgoing federation
requests than can be accounted for by your users' activity, this is a
likely cause. The misbehavior can be worked around by setting
-``use_presence: false`` in the Synapse config file.
+the following in the Synapse config file:
+
+.. code-block:: yaml
+
+ presence:
+ enabled: false
People can't accept room invitations from me
--------------------------------------------
diff --git a/changelog.d/9491.feature b/changelog.d/9491.feature
new file mode 100644
index 0000000000..8b56a95a44
--- /dev/null
+++ b/changelog.d/9491.feature
@@ -0,0 +1 @@
+Add a Synapse module for routing presence updates between users.
diff --git a/docs/presence_router_module.md b/docs/presence_router_module.md
new file mode 100644
index 0000000000..d6566d978d
--- /dev/null
+++ b/docs/presence_router_module.md
@@ -0,0 +1,235 @@
+# Presence Router Module
+
+Synapse supports configuring a module that can specify additional users
+(local or remote) to should receive certain presence updates from local
+users.
+
+Note that routing presence via Application Service transactions is not
+currently supported.
+
+The presence routing module is implemented as a Python class, which will
+be imported by the running Synapse.
+
+## Python Presence Router Class
+
+The Python class is instantiated with two objects:
+
+* A configuration object of some type (see below).
+* An instance of `synapse.module_api.ModuleApi`.
+
+It then implements methods related to presence routing.
+
+Note that one method of `ModuleApi` that may be useful is:
+
+```python
+async def ModuleApi.send_local_online_presence_to(users: Iterable[str]) -> None
+```
+
+which can be given a list of local or remote MXIDs to broadcast known, online user
+presence to (for those users that the receiving user is considered interested in).
+It does not include state for users who are currently offline, and it can only be
+called on workers that support sending federation.
+
+### Module structure
+
+Below is a list of possible methods that can be implemented, and whether they are
+required.
+
+#### `parse_config`
+
+```python
+def parse_config(config_dict: dict) -> Any
+```
+
+**Required.** A static method that is passed a dictionary of config options, and
+ should return a validated config object. This method is described further in
+ [Configuration](#configuration).
+
+#### `get_users_for_states`
+
+```python
+async def get_users_for_states(
+ self,
+ state_updates: Iterable[UserPresenceState],
+) -> Dict[str, Set[UserPresenceState]]:
+```
+
+**Required.** An asynchronous method that is passed an iterable of user presence
+state. This method can determine whether a given presence update should be sent to certain
+users. It does this by returning a dictionary with keys representing local or remote
+Matrix User IDs, and values being a python set
+of `synapse.handlers.presence.UserPresenceState` instances.
+
+Synapse will then attempt to send the specified presence updates to each user when
+possible.
+
+#### `get_interested_users`
+
+```python
+async def get_interested_users(self, user_id: str) -> Union[Set[str], str]
+```
+
+**Required.** An asynchronous method that is passed a single Matrix User ID. This
+method is expected to return the users that the passed in user may be interested in the
+presence of. Returned users may be local or remote. The presence routed as a result of
+what this method returns is sent in addition to the updates already sent between users
+that share a room together. Presence updates are deduplicated.
+
+This method should return a python set of Matrix User IDs, or the object
+`synapse.events.presence_router.PresenceRouter.ALL_USERS` to indicate that the passed
+user should receive presence information for *all* known users.
+
+For clarity, if the user `@alice:example.org` is passed to this method, and the Set
+`{"@bob:example.com", "@charlie:somewhere.org"}` is returned, this signifies that Alice
+should receive presence updates sent by Bob and Charlie, regardless of whether these
+users share a room.
+
+### Example
+
+Below is an example implementation of a presence router class.
+
+```python
+from typing import Dict, Iterable, Set, Union
+from synapse.events.presence_router import PresenceRouter
+from synapse.handlers.presence import UserPresenceState
+from synapse.module_api import ModuleApi
+
+class PresenceRouterConfig:
+ def __init__(self):
+ # Config options with their defaults
+ # A list of users to always send all user presence updates to
+ self.always_send_to_users = [] # type: List[str]
+
+ # A list of users to ignore presence updates for. Does not affect
+ # shared-room presence relationships
+ self.blacklisted_users = [] # type: List[str]
+
+class ExamplePresenceRouter:
+ """An example implementation of synapse.presence_router.PresenceRouter.
+ Supports routing all presence to a configured set of users, or a subset
+ of presence from certain users to members of certain rooms.
+
+ Args:
+ config: A configuration object.
+ module_api: An instance of Synapse's ModuleApi.
+ """
+ def __init__(self, config: PresenceRouterConfig, module_api: ModuleApi):
+ self._config = config
+ self._module_api = module_api
+
+ @staticmethod
+ def parse_config(config_dict: dict) -> PresenceRouterConfig:
+ """Parse a configuration dictionary from the homeserver config, do
+ some validation and return a typed PresenceRouterConfig.
+
+ Args:
+ config_dict: The configuration dictionary.
+
+ Returns:
+ A validated config object.
+ """
+ # Initialise a typed config object
+ config = PresenceRouterConfig()
+ always_send_to_users = config_dict.get("always_send_to_users")
+ blacklisted_users = config_dict.get("blacklisted_users")
+
+ # Do some validation of config options... otherwise raise a
+ # synapse.config.ConfigError.
+ config.always_send_to_users = always_send_to_users
+ config.blacklisted_users = blacklisted_users
+
+ return config
+
+ async def get_users_for_states(
+ self,
+ state_updates: Iterable[UserPresenceState],
+ ) -> Dict[str, Set[UserPresenceState]]:
+ """Given an iterable of user presence updates, determine where each one
+ needs to go. Returned results will not affect presence updates that are
+ sent between users who share a room.
+
+ Args:
+ state_updates: An iterable of user presence state updates.
+
+ Returns:
+ A dictionary of user_id -> set of UserPresenceState that the user should
+ receive.
+ """
+ destination_users = {} # type: Dict[str, Set[UserPresenceState]
+
+ # Ignore any updates for blacklisted users
+ desired_updates = set()
+ for update in state_updates:
+ if update.state_key not in self._config.blacklisted_users:
+ desired_updates.add(update)
+
+ # Send all presence updates to specific users
+ for user_id in self._config.always_send_to_users:
+ destination_users[user_id] = desired_updates
+
+ return destination_users
+
+ async def get_interested_users(
+ self,
+ user_id: str,
+ ) -> Union[Set[str], PresenceRouter.ALL_USERS]:
+ """
+ Retrieve a list of users that `user_id` is interested in receiving the
+ presence of. This will be in addition to those they share a room with.
+ Optionally, the object PresenceRouter.ALL_USERS can be returned to indicate
+ that this user should receive all incoming local and remote presence updates.
+
+ Note that this method will only be called for local users.
+
+ Args:
+ user_id: A user requesting presence updates.
+
+ Returns:
+ A set of user IDs to return additional presence updates for, or
+ PresenceRouter.ALL_USERS to return presence updates for all other users.
+ """
+ if user_id in self._config.always_send_to_users:
+ return PresenceRouter.ALL_USERS
+
+ return set()
+```
+
+#### A note on `get_users_for_states` and `get_interested_users`
+
+Both of these methods are effectively two different sides of the same coin. The logic
+regarding which users should receive updates for other users should be the same
+between them.
+
+`get_users_for_states` is called when presence updates come in from either federation
+or local users, and is used to either direct local presence to remote users, or to
+wake up the sync streams of local users to collect remote presence.
+
+In contrast, `get_interested_users` is used to determine the users that presence should
+be fetched for when a local user is syncing. This presence is then retrieved, before
+being fed through `get_users_for_states` once again, with only the syncing user's
+routing information pulled from the resulting dictionary.
+
+Their routing logic should thus line up, else you may run into unintended behaviour.
+
+## Configuration
+
+Once you've crafted your module and installed it into the same Python environment as
+Synapse, amend your homeserver config file with the following.
+
+```yaml
+presence:
+ routing_module:
+ module: my_module.ExamplePresenceRouter
+ config:
+ # Any configuration options for your module. The below is an example.
+ # of setting options for ExamplePresenceRouter.
+ always_send_to_users: ["@presence_gobbler:example.org"]
+ blacklisted_users:
+ - "@alice:example.com"
+ - "@bob:example.com"
+ ...
+```
+
+The contents of `config` will be passed as a Python dictionary to the static
+`parse_config` method of your class. The object returned by this method will
+then be passed to the `__init__` method of your module as `config`.
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index b0bf987740..9182dcd987 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -82,9 +82,28 @@ pid_file: DATADIR/homeserver.pid
#
#soft_file_limit: 0
-# Set to false to disable presence tracking on this homeserver.
+# Presence tracking allows users to see the state (e.g online/offline)
+# of other local and remote users.
#
-#use_presence: false
+presence:
+ # Uncomment to disable presence tracking on this homeserver. This option
+ # replaces the previous top-level 'use_presence' option.
+ #
+ #enabled: false
+
+ # Presence routers are third-party modules that can specify additional logic
+ # to where presence updates from users are routed.
+ #
+ presence_router:
+ # The custom module's class. Uncomment to use a custom presence router module.
+ #
+ #module: "my_custom_router.PresenceRouter"
+
+ # Configuration options of the custom module. Refer to your module's
+ # documentation for available options.
+ #
+ #config:
+ # example_option: 'something'
# Whether to require authentication to retrieve profile data (avatars,
# display names) of other users through the client API. Defaults to
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 3df2aa5c2b..d1c2079233 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -281,6 +281,7 @@ class GenericWorkerPresence(BasePresenceHandler):
self.hs = hs
self.is_mine_id = hs.is_mine_id
+ self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence
# The number of ongoing syncs on this process, by user id.
@@ -395,7 +396,7 @@ class GenericWorkerPresence(BasePresenceHandler):
return _user_syncing()
async def notify_from_replication(self, states, stream_id):
- parties = await get_interested_parties(self.store, states)
+ parties = await get_interested_parties(self.store, self.presence_router, states)
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 5f8910b6e1..8decc9d10d 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -27,6 +27,7 @@ import yaml
from netaddr import AddrFormatError, IPNetwork, IPSet
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.util.module_loader import load_module
from synapse.util.stringutils import parse_and_validate_server_name
from ._base import Config, ConfigError
@@ -238,7 +239,20 @@ class ServerConfig(Config):
self.public_baseurl = config.get("public_baseurl")
# Whether to enable user presence.
- self.use_presence = config.get("use_presence", True)
+ presence_config = config.get("presence") or {}
+ self.use_presence = presence_config.get("enabled")
+ if self.use_presence is None:
+ self.use_presence = config.get("use_presence", True)
+
+ # Custom presence router module
+ self.presence_router_module_class = None
+ self.presence_router_config = None
+ presence_router_config = presence_config.get("presence_router")
+ if presence_router_config:
+ (
+ self.presence_router_module_class,
+ self.presence_router_config,
+ ) = load_module(presence_router_config, ("presence", "presence_router"))
# Whether to update the user directory or not. This should be set to
# false only if we are updating the user directory in a worker
@@ -834,9 +848,28 @@ class ServerConfig(Config):
#
#soft_file_limit: 0
- # Set to false to disable presence tracking on this homeserver.
+ # Presence tracking allows users to see the state (e.g online/offline)
+ # of other local and remote users.
#
- #use_presence: false
+ presence:
+ # Uncomment to disable presence tracking on this homeserver. This option
+ # replaces the previous top-level 'use_presence' option.
+ #
+ #enabled: false
+
+ # Presence routers are third-party modules that can specify additional logic
+ # to where presence updates from users are routed.
+ #
+ presence_router:
+ # The custom module's class. Uncomment to use a custom presence router module.
+ #
+ #module: "my_custom_router.PresenceRouter"
+
+ # Configuration options of the custom module. Refer to your module's
+ # documentation for available options.
+ #
+ #config:
+ # example_option: 'something'
# Whether to require authentication to retrieve profile data (avatars,
# display names) of other users through the client API. Defaults to
diff --git a/synapse/events/presence_router.py b/synapse/events/presence_router.py
new file mode 100644
index 0000000000..24cd389d80
--- /dev/null
+++ b/synapse/events/presence_router.py
@@ -0,0 +1,104 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import TYPE_CHECKING, Dict, Iterable, Set, Union
+
+from synapse.api.presence import UserPresenceState
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
+class PresenceRouter:
+ """
+ A module that the homeserver will call upon to help route user presence updates to
+ additional destinations. If a custom presence router is configured, calls will be
+ passed to that instead.
+ """
+
+ ALL_USERS = "ALL"
+
+ def __init__(self, hs: "HomeServer"):
+ self.custom_presence_router = None
+
+ # Check whether a custom presence router module has been configured
+ if hs.config.presence_router_module_class:
+ # Initialise the module
+ self.custom_presence_router = hs.config.presence_router_module_class(
+ config=hs.config.presence_router_config, module_api=hs.get_module_api()
+ )
+
+ # Ensure the module has implemented the required methods
+ required_methods = ["get_users_for_states", "get_interested_users"]
+ for method_name in required_methods:
+ if not hasattr(self.custom_presence_router, method_name):
+ raise Exception(
+ "PresenceRouter module '%s' must implement all required methods: %s"
+ % (
+ hs.config.presence_router_module_class.__name__,
+ ", ".join(required_methods),
+ )
+ )
+
+ async def get_users_for_states(
+ self,
+ state_updates: Iterable[UserPresenceState],
+ ) -> Dict[str, Set[UserPresenceState]]:
+ """
+ Given an iterable of user presence updates, determine where each one
+ needs to go.
+
+ Args:
+ state_updates: An iterable of user presence state updates.
+
+ Returns:
+ A dictionary of user_id -> set of UserPresenceState, indicating which
+ presence updates each user should receive.
+ """
+ if self.custom_presence_router is not None:
+ # Ask the custom module
+ return await self.custom_presence_router.get_users_for_states(
+ state_updates=state_updates
+ )
+
+ # Don't include any extra destinations for presence updates
+ return {}
+
+ async def get_interested_users(self, user_id: str) -> Union[Set[str], ALL_USERS]:
+ """
+ Retrieve a list of users that `user_id` is interested in receiving the
+ presence of. This will be in addition to those they share a room with.
+ Optionally, the object PresenceRouter.ALL_USERS can be returned to indicate
+ that this user should receive all incoming local and remote presence updates.
+
+ Note that this method will only be called for local users, but can return users
+ that are local or remote.
+
+ Args:
+ user_id: A user requesting presence updates.
+
+ Returns:
+ A set of user IDs to return presence updates for, or ALL_USERS to return all
+ known updates.
+ """
+ if self.custom_presence_router is not None:
+ # Ask the custom module for interested users
+ return await self.custom_presence_router.get_interested_users(
+ user_id=user_id
+ )
+
+ # A custom presence router is not defined.
+ # Don't report any additional interested users
+ return set()
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8babb1ebbe..98bfce22ff 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -44,6 +44,7 @@ from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
if TYPE_CHECKING:
+ from synapse.events.presence_router import PresenceRouter
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -162,6 +163,7 @@ class FederationSender(AbstractFederationSender):
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
+ self._presence_router = None # type: Optional[PresenceRouter]
self._transaction_manager = TransactionManager(hs)
self._instance_name = hs.get_instance_name()
@@ -584,7 +586,22 @@ class FederationSender(AbstractFederationSender):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
- hosts_and_states = await get_interested_remotes(self.store, states, self.state)
+ # We pull the presence router here instead of __init__
+ # to prevent a dependency cycle:
+ #
+ # AuthHandler -> Notifier -> FederationSender
+ # -> PresenceRouter -> ModuleApi -> AuthHandler
+ if self._presence_router is None:
+ self._presence_router = self.hs.get_presence_router()
+
+ assert self._presence_router is not None
+
+ hosts_and_states = await get_interested_remotes(
+ self.store,
+ self._presence_router,
+ states,
+ self.state,
+ )
for destinations, states in hosts_and_states:
for destination in destinations:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index da92feacc9..c817f2952d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,7 +25,17 @@ The methods that define policy are:
import abc
import logging
from contextlib import contextmanager
-from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple
+from typing import (
+ TYPE_CHECKING,
+ Dict,
+ FrozenSet,
+ Iterable,
+ List,
+ Optional,
+ Set,
+ Tuple,
+ Union,
+)
from prometheus_client import Counter
from typing_extensions import ContextManager
@@ -34,6 +44,7 @@ import synapse.metrics
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState
+from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
@@ -42,7 +53,7 @@ from synapse.state import StateHandler
from synapse.storage.databases.main import DataStore
from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -209,6 +220,7 @@ class PresenceHandler(BasePresenceHandler):
self.notifier = hs.get_notifier()
self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler()
+ self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence
federation_registry = hs.get_federation_registry()
@@ -653,7 +665,7 @@ class PresenceHandler(BasePresenceHandler):
"""
stream_id, max_token = await self.store.update_presence(states)
- parties = await get_interested_parties(self.store, states)
+ parties = await get_interested_parties(self.store, self.presence_router, states)
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
@@ -1041,7 +1053,12 @@ class PresenceEventSource:
#
# Presence -> Notifier -> PresenceEventSource -> Presence
#
+ # Same with get_module_api, get_presence_router
+ #
+ # AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
self.get_presence_handler = hs.get_presence_handler
+ self.get_module_api = hs.get_module_api
+ self.get_presence_router = hs.get_presence_router
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
@@ -1055,7 +1072,7 @@ class PresenceEventSource:
include_offline=True,
explicit_room_id=None,
**kwargs
- ):
+ ) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are:
# 1. Get the rooms the user is in.
# 2. Get the list of user in the rooms.
@@ -1068,7 +1085,17 @@ class PresenceEventSource:
# We don't try and limit the presence updates by the current token, as
# sending down the rare duplicate is not a concern.
+ user_id = user.to_string()
+ stream_change_cache = self.store.presence_stream_cache
+
with Measure(self.clock, "presence.get_new_events"):
+ if user_id in self.get_module_api()._send_full_presence_to_local_users:
+ # This user has been specified by a module to receive all current, online
+ # user presence. Removing from_key and setting include_offline to false
+ # will do effectively this.
+ from_key = None
+ include_offline = False
+
if from_key is not None:
from_key = int(from_key)
@@ -1091,59 +1118,209 @@ class PresenceEventSource:
# doesn't return. C.f. #5503.
return [], max_token
- presence = self.get_presence_handler()
- stream_change_cache = self.store.presence_stream_cache
-
+ # Figure out which other users this user should receive updates for
users_interested_in = await self._get_interested_in(user, explicit_room_id)
- user_ids_changed = set() # type: Collection[str]
- changed = None
- if from_key:
- changed = stream_change_cache.get_all_entities_changed(from_key)
+ # We have a set of users that we're interested in the presence of. We want to
+ # cross-reference that with the users that have actually changed their presence.
- if changed is not None and len(changed) < 500:
- assert isinstance(user_ids_changed, set)
+ # Check whether this user should see all user updates
- # For small deltas, its quicker to get all changes and then
- # work out if we share a room or they're in our presence list
- get_updates_counter.labels("stream").inc()
- for other_user_id in changed:
- if other_user_id in users_interested_in:
- user_ids_changed.add(other_user_id)
- else:
- # Too many possible updates. Find all users we can see and check
- # if any of them have changed.
- get_updates_counter.labels("full").inc()
+ if users_interested_in == PresenceRouter.ALL_USERS:
+ # Provide presence state for all users
+ presence_updates = await self._filter_all_presence_updates_for_user(
+ user_id, include_offline, from_key
+ )
- if from_key:
- user_ids_changed = stream_change_cache.get_entities_changed(
- users_interested_in, from_key
+ # Remove the user from the list of users to receive all presence
+ if user_id in self.get_module_api()._send_full_presence_to_local_users:
+ self.get_module_api()._send_full_presence_to_local_users.remove(
+ user_id
)
+
+ return presence_updates, max_token
+
+ # Make mypy happy. users_interested_in should now be a set
+ assert not isinstance(users_interested_in, str)
+
+ # The set of users that we're interested in and that have had a presence update.
+ # We'll actually pull the presence updates for these users at the end.
+ interested_and_updated_users = (
+ set()
+ ) # type: Union[Set[str], FrozenSet[str]]
+
+ if from_key:
+ # First get all users that have had a presence update
+ updated_users = stream_change_cache.get_all_entities_changed(from_key)
+
+ # Cross-reference users we're interested in with those that have had updates.
+ # Use a slightly-optimised method for processing smaller sets of updates.
+ if updated_users is not None and len(updated_users) < 500:
+ # For small deltas, it's quicker to get all changes and then
+ # cross-reference with the users we're interested in
+ get_updates_counter.labels("stream").inc()
+ for other_user_id in updated_users:
+ if other_user_id in users_interested_in:
+ # mypy thinks this variable could be a FrozenSet as it's possibly set
+ # to one in the `get_entities_changed` call below, and `add()` is not
+ # method on a FrozenSet. That doesn't affect us here though, as
+ # `interested_and_updated_users` is clearly a set() above.
+ interested_and_updated_users.add(other_user_id) # type: ignore
else:
- user_ids_changed = users_interested_in
+ # Too many possible updates. Find all users we can see and check
+ # if any of them have changed.
+ get_updates_counter.labels("full").inc()
- updates = await presence.current_state_for_users(user_ids_changed)
+ interested_and_updated_users = (
+ stream_change_cache.get_entities_changed(
+ users_interested_in, from_key
+ )
+ )
+ else:
+ # No from_key has been specified. Return the presence for all users
+ # this user is interested in
+ interested_and_updated_users = users_interested_in
+
+ # Retrieve the current presence state for each user
+ users_to_state = await self.get_presence_handler().current_state_for_users(
+ interested_and_updated_users
+ )
+ presence_updates = list(users_to_state.values())
- if include_offline:
- return (list(updates.values()), max_token)
+ # Remove the user from the list of users to receive all presence
+ if user_id in self.get_module_api()._send_full_presence_to_local_users:
+ self.get_module_api()._send_full_presence_to_local_users.remove(user_id)
+
+ if not include_offline:
+ # Filter out offline presence states
+ presence_updates = self._filter_offline_presence_state(presence_updates)
+
+ return presence_updates, max_token
+
+ async def _filter_all_presence_updates_for_user(
+ self,
+ user_id: str,
+ include_offline: bool,
+ from_key: Optional[int] = None,
+ ) -> List[UserPresenceState]:
+ """
+ Computes the presence updates a user should receive.
+
+ First pulls presence updates from the database. Then consults PresenceRouter
+ for whether any updates should be excluded by user ID.
+
+ Args:
+ user_id: The User ID of the user to compute presence updates for.
+ include_offline: Whether to include offline presence states from the results.
+ from_key: The minimum stream ID of updates to pull from the database
+ before filtering.
+
+ Returns:
+ A list of presence states for the given user to receive.
+ """
+ if from_key:
+ # Only return updates since the last sync
+ updated_users = self.store.presence_stream_cache.get_all_entities_changed(
+ from_key
+ )
+ if not updated_users:
+ updated_users = []
+
+ # Get the actual presence update for each change
+ users_to_state = await self.get_presence_handler().current_state_for_users(
+ updated_users
+ )
+ presence_updates = list(users_to_state.values())
+
+ if not include_offline:
+ # Filter out offline states
+ presence_updates = self._filter_offline_presence_state(presence_updates)
else:
- return (
- [s for s in updates.values() if s.state != PresenceState.OFFLINE],
- max_token,
+ users_to_state = await self.store.get_presence_for_all_users(
+ include_offline=include_offline
)
+ presence_updates = list(users_to_state.values())
+
+ # TODO: This feels wildly inefficient, and it's unfortunate we need to ask the
+ # module for information on a number of users when we then only take the info
+ # for a single user
+
+ # Filter through the presence router
+ users_to_state_set = await self.get_presence_router().get_users_for_states(
+ presence_updates
+ )
+
+ # We only want the mapping for the syncing user
+ presence_updates = list(users_to_state_set[user_id])
+
+ # Return presence information for all users
+ return presence_updates
+
+ def _filter_offline_presence_state(
+ self, presence_updates: Iterable[UserPresenceState]
+ ) -> List[UserPresenceState]:
+ """Given an iterable containing user presence updates, return a list with any offline
+ presence states removed.
+
+ Args:
+ presence_updates: Presence states to filter
+
+ Returns:
+ A new list with any offline presence states removed.
+ """
+ return [
+ update
+ for update in presence_updates
+ if update.state != PresenceState.OFFLINE
+ ]
+
def get_current_key(self):
return self.store.get_current_presence_token()
@cached(num_args=2, cache_context=True)
- async def _get_interested_in(self, user, explicit_room_id, cache_context):
+ async def _get_interested_in(
+ self,
+ user: UserID,
+ explicit_room_id: Optional[str] = None,
+ cache_context: Optional[_CacheContext] = None,
+ ) -> Union[Set[str], str]:
"""Returns the set of users that the given user should see presence
- updates for
+ updates for.
+
+ Args:
+ user: The user to retrieve presence updates for.
+ explicit_room_id: The users that are in the room will be returned.
+
+ Returns:
+ A set of user IDs to return presence updates for, or "ALL" to return all
+ known updates.
"""
user_id = user.to_string()
users_interested_in = set()
users_interested_in.add(user_id) # So that we receive our own presence
+ # cache_context isn't likely to ever be None due to the @cached decorator,
+ # but we can't have a non-optional argument after the optional argument
+ # explicit_room_id either. Assert cache_context is not None so we can use it
+ # without mypy complaining.
+ assert cache_context
+
+ # Check with the presence router whether we should poll additional users for
+ # their presence information
+ additional_users = await self.get_presence_router().get_interested_users(
+ user.to_string()
+ )
+ if additional_users == PresenceRouter.ALL_USERS:
+ # If the module requested that this user see the presence updates of *all*
+ # users, then simply return that instead of calculating what rooms this
+ # user shares
+ return PresenceRouter.ALL_USERS
+
+ # Add the additional users from the router
+ users_interested_in.update(additional_users)
+
+ # Find the users who share a room with this user
users_who_share_room = await self.store.get_users_who_share_room_with_user(
user_id, on_invalidate=cache_context.invalidate
)
@@ -1314,14 +1491,15 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
async def get_interested_parties(
- store: DataStore, states: List[UserPresenceState]
+ store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
"""Given a list of states return which entities (rooms, users)
are interested in the given states.
Args:
- store
- states
+ store: The homeserver's data store.
+ presence_router: A module for augmenting the destinations for presence updates.
+ states: A list of incoming user presence updates.
Returns:
A 2-tuple of `(room_ids_to_states, users_to_states)`,
@@ -1337,11 +1515,22 @@ async def get_interested_parties(
# Always notify self
users_to_states.setdefault(state.user_id, []).append(state)
+ # Ask a presence routing module for any additional parties if one
+ # is loaded.
+ router_users_to_states = await presence_router.get_users_for_states(states)
+
+ # Update the dictionaries with additional destinations and state to send
+ for user_id, user_states in router_users_to_states.items():
+ users_to_states.setdefault(user_id, []).extend(user_states)
+
return room_ids_to_states, users_to_states
async def get_interested_remotes(
- store: DataStore, states: List[UserPresenceState], state_handler: StateHandler
+ store: DataStore,
+ presence_router: PresenceRouter,
+ states: List[UserPresenceState],
+ state_handler: StateHandler,
) -> List[Tuple[Collection[str], List[UserPresenceState]]]:
"""Given a list of presence states figure out which remote servers
should be sent which.
@@ -1349,9 +1538,10 @@ async def get_interested_remotes(
All the presence states should be for local users only.
Args:
- store
- states
- state_handler
+ store: The homeserver's data store.
+ presence_router: A module for augmenting the destinations for presence updates.
+ states: A list of incoming user presence updates.
+ state_handler:
Returns:
A list of 2-tuples of destinations and states, where for
@@ -1363,7 +1553,9 @@ async def get_interested_remotes(
# First we look up the rooms each user is in (as well as any explicit
# subscriptions), then for each distinct room we look up the remote
# hosts in those rooms.
- room_ids_to_states, users_to_states = await get_interested_parties(store, states)
+ room_ids_to_states, users_to_states = await get_interested_parties(
+ store, presence_router, states
+ )
for room_id, states in room_ids_to_states.items():
hosts = await state_handler.get_current_hosts_in_room(room_id)
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 781e02fbbb..3ecd46c038 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -50,11 +50,20 @@ class ModuleApi:
self._auth = hs.get_auth()
self._auth_handler = auth_handler
self._server_name = hs.hostname
+ self._presence_stream = hs.get_event_sources().sources["presence"]
# We expose these as properties below in order to attach a helpful docstring.
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
self._public_room_list_manager = PublicRoomListManager(hs)
+ # The next time these users sync, they will receive the current presence
+ # state of all local users. Users are added by send_local_online_presence_to,
+ # and removed after a successful sync.
+ #
+ # We make this a private variable to deter modules from accessing it directly,
+ # though other classes in Synapse will still do so.
+ self._send_full_presence_to_local_users = set()
+
@property
def http_client(self):
"""Allows making outbound HTTP requests to remote resources.
@@ -385,6 +394,47 @@ class ModuleApi:
return event
+ async def send_local_online_presence_to(self, users: Iterable[str]) -> None:
+ """
+ Forces the equivalent of a presence initial_sync for a set of local or remote
+ users. The users will receive presence for all currently online users that they
+ are considered interested in.
+
+ Updates to remote users will be sent immediately, whereas local users will receive
+ them on their next sync attempt.
+
+ Note that this method can only be run on the main or federation_sender worker
+ processes.
+ """
+ if not self._hs.should_send_federation():
+ raise Exception(
+ "send_local_online_presence_to can only be run "
+ "on processes that send federation",
+ )
+
+ for user in users:
+ if self._hs.is_mine_id(user):
+ # Modify SyncHandler._generate_sync_entry_for_presence to call
+ # presence_source.get_new_events with an empty `from_key` if
+ # that user's ID were in a list modified by ModuleApi somewhere.
+ # That user would then get all presence state on next incremental sync.
+
+ # Force a presence initial_sync for this user next time
+ self._send_full_presence_to_local_users.add(user)
+ else:
+ # Retrieve presence state for currently online users that this user
+ # is considered interested in
+ presence_events, _ = await self._presence_stream.get_new_events(
+ UserID.from_string(user), from_key=None, include_offline=False
+ )
+
+ # Send to remote destinations
+ await make_deferred_yieldable(
+ # We pull the federation sender here as we can only do so on workers
+ # that support sending presence
+ self._hs.get_federation_sender().send_presence(presence_events)
+ )
+
class PublicRoomListManager:
"""Contains methods for adding to, removing from and querying whether a room
diff --git a/synapse/server.py b/synapse/server.py
index e42f7b1a18..cfb55c230d 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -51,6 +51,7 @@ from synapse.crypto import context_factory
from synapse.crypto.context_factory import RegularPolicyForHTTPS
from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory
+from synapse.events.presence_router import PresenceRouter
from synapse.events.spamcheck import SpamChecker
from synapse.events.third_party_rules import ThirdPartyEventRules
from synapse.events.utils import EventClientSerializer
@@ -426,6 +427,10 @@ class HomeServer(metaclass=abc.ABCMeta):
raise Exception("Workers cannot write typing")
@cache_in_self
+ def get_presence_router(self) -> PresenceRouter:
+ return PresenceRouter(self)
+
+ @cache_in_self
def get_typing_handler(self) -> FollowerTypingHandler:
if self.config.worker.writers.typing == self.get_instance_name():
# Use get_typing_writer_handler to ensure that we use the same
diff --git a/tests/events/test_presence_router.py b/tests/events/test_presence_router.py
new file mode 100644
index 0000000000..c6e547f11c
--- /dev/null
+++ b/tests/events/test_presence_router.py
@@ -0,0 +1,386 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
+
+from mock import Mock
+
+import attr
+
+from synapse.api.constants import EduTypes
+from synapse.events.presence_router import PresenceRouter
+from synapse.federation.units import Transaction
+from synapse.handlers.presence import UserPresenceState
+from synapse.module_api import ModuleApi
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, presence, room
+from synapse.types import JsonDict, StreamToken, create_requester
+
+from tests.handlers.test_sync import generate_sync_config
+from tests.unittest import FederatingHomeserverTestCase, TestCase, override_config
+
+
+@attr.s
+class PresenceRouterTestConfig:
+ users_who_should_receive_all_presence = attr.ib(type=List[str], default=[])
+
+
+class PresenceRouterTestModule:
+ def __init__(self, config: PresenceRouterTestConfig, module_api: ModuleApi):
+ self._config = config
+ self._module_api = module_api
+
+ async def get_users_for_states(
+ self, state_updates: Iterable[UserPresenceState]
+ ) -> Dict[str, Set[UserPresenceState]]:
+ users_to_state = {
+ user_id: set(state_updates)
+ for user_id in self._config.users_who_should_receive_all_presence
+ }
+ return users_to_state
+
+ async def get_interested_users(
+ self, user_id: str
+ ) -> Union[Set[str], PresenceRouter.ALL_USERS]:
+ if user_id in self._config.users_who_should_receive_all_presence:
+ return PresenceRouter.ALL_USERS
+
+ return set()
+
+ @staticmethod
+ def parse_config(config_dict: dict) -> PresenceRouterTestConfig:
+ """Parse a configuration dictionary from the homeserver config, do
+ some validation and return a typed PresenceRouterConfig.
+
+ Args:
+ config_dict: The configuration dictionary.
+
+ Returns:
+ A validated config object.
+ """
+ # Initialise a typed config object
+ config = PresenceRouterTestConfig()
+
+ config.users_who_should_receive_all_presence = config_dict.get(
+ "users_who_should_receive_all_presence"
+ )
+
+ return config
+
+
+class PresenceRouterTestCase(FederatingHomeserverTestCase):
+ servlets = [
+ admin.register_servlets,
+ login.register_servlets,
+ room.register_servlets,
+ presence.register_servlets,
+ ]
+
+ def make_homeserver(self, reactor, clock):
+ return self.setup_test_homeserver(
+ federation_transport_client=Mock(spec=["send_transaction"]),
+ )
+
+ def prepare(self, reactor, clock, homeserver):
+ self.sync_handler = self.hs.get_sync_handler()
+ self.module_api = homeserver.get_module_api()
+
+ @override_config(
+ {
+ "presence": {
+ "presence_router": {
+ "module": __name__ + ".PresenceRouterTestModule",
+ "config": {
+ "users_who_should_receive_all_presence": [
+ "@presence_gobbler:test",
+ ]
+ },
+ }
+ },
+ "send_federation": True,
+ }
+ )
+ def test_receiving_all_presence(self):
+ """Test that a user that does not share a room with another other can receive
+ presence for them, due to presence routing.
+ """
+ # Create a user who should receive all presence of others
+ self.presence_receiving_user_id = self.register_user(
+ "presence_gobbler", "monkey"
+ )
+ self.presence_receiving_user_tok = self.login("presence_gobbler", "monkey")
+
+ # And two users who should not have any special routing
+ self.other_user_one_id = self.register_user("other_user_one", "monkey")
+ self.other_user_one_tok = self.login("other_user_one", "monkey")
+ self.other_user_two_id = self.register_user("other_user_two", "monkey")
+ self.other_user_two_tok = self.login("other_user_two", "monkey")
+
+ # Put the other two users in a room with each other
+ room_id = self.helper.create_room_as(
+ self.other_user_one_id, tok=self.other_user_one_tok
+ )
+
+ self.helper.invite(
+ room_id,
+ self.other_user_one_id,
+ self.other_user_two_id,
+ tok=self.other_user_one_tok,
+ )
+ self.helper.join(room_id, self.other_user_two_id, tok=self.other_user_two_tok)
+ # User one sends some presence
+ send_presence_update(
+ self,
+ self.other_user_one_id,
+ self.other_user_one_tok,
+ "online",
+ "boop",
+ )
+
+ # Check that the presence receiving user gets user one's presence when syncing
+ presence_updates, sync_token = sync_presence(
+ self, self.presence_receiving_user_id
+ )
+ self.assertEqual(len(presence_updates), 1)
+
+ presence_update = presence_updates[0] # type: UserPresenceState
+ self.assertEqual(presence_update.user_id, self.other_user_one_id)
+ self.assertEqual(presence_update.state, "online")
+ self.assertEqual(presence_update.status_msg, "boop")
+
+ # Have all three users send presence
+ send_presence_update(
+ self,
+ self.other_user_one_id,
+ self.other_user_one_tok,
+ "online",
+ "user_one",
+ )
+ send_presence_update(
+ self,
+ self.other_user_two_id,
+ self.other_user_two_tok,
+ "online",
+ "user_two",
+ )
+ send_presence_update(
+ self,
+ self.presence_receiving_user_id,
+ self.presence_receiving_user_tok,
+ "online",
+ "presence_gobbler",
+ )
+
+ # Check that the presence receiving user gets everyone's presence
+ presence_updates, _ = sync_presence(
+ self, self.presence_receiving_user_id, sync_token
+ )
+ self.assertEqual(len(presence_updates), 3)
+
+ # But that User One only get itself and User Two's presence
+ presence_updates, _ = sync_presence(self, self.other_user_one_id)
+ self.assertEqual(len(presence_updates), 2)
+
+ found = False
+ for update in presence_updates:
+ if update.user_id == self.other_user_two_id:
+ self.assertEqual(update.state, "online")
+ self.assertEqual(update.status_msg, "user_two")
+ found = True
+
+ self.assertTrue(found)
+
+ @override_config(
+ {
+ "presence": {
+ "presence_router": {
+ "module": __name__ + ".PresenceRouterTestModule",
+ "config": {
+ "users_who_should_receive_all_presence": [
+ "@presence_gobbler1:test",
+ "@presence_gobbler2:test",
+ "@far_away_person:island",
+ ]
+ },
+ }
+ },
+ "send_federation": True,
+ }
+ )
+ def test_send_local_online_presence_to_with_module(self):
+ """Tests that send_local_presence_to_users sends local online presence to a set
+ of specified local and remote users, with a custom PresenceRouter module enabled.
+ """
+ # Create a user who will send presence updates
+ self.other_user_id = self.register_user("other_user", "monkey")
+ self.other_user_tok = self.login("other_user", "monkey")
+
+ # And another two users that will also send out presence updates, as well as receive
+ # theirs and everyone else's
+ self.presence_receiving_user_one_id = self.register_user(
+ "presence_gobbler1", "monkey"
+ )
+ self.presence_receiving_user_one_tok = self.login("presence_gobbler1", "monkey")
+ self.presence_receiving_user_two_id = self.register_user(
+ "presence_gobbler2", "monkey"
+ )
+ self.presence_receiving_user_two_tok = self.login("presence_gobbler2", "monkey")
+
+ # Have all three users send some presence updates
+ send_presence_update(
+ self,
+ self.other_user_id,
+ self.other_user_tok,
+ "online",
+ "I'm online!",
+ )
+ send_presence_update(
+ self,
+ self.presence_receiving_user_one_id,
+ self.presence_receiving_user_one_tok,
+ "online",
+ "I'm also online!",
+ )
+ send_presence_update(
+ self,
+ self.presence_receiving_user_two_id,
+ self.presence_receiving_user_two_tok,
+ "unavailable",
+ "I'm in a meeting!",
+ )
+
+ # Mark each presence-receiving user for receiving all user presence
+ self.get_success(
+ self.module_api.send_local_online_presence_to(
+ [
+ self.presence_receiving_user_one_id,
+ self.presence_receiving_user_two_id,
+ ]
+ )
+ )
+
+ # Perform a sync for each user
+
+ # The other user should only receive their own presence
+ presence_updates, _ = sync_presence(self, self.other_user_id)
+ self.assertEqual(len(presence_updates), 1)
+
+ presence_update = presence_updates[0] # type: UserPresenceState
+ self.assertEqual(presence_update.user_id, self.other_user_id)
+ self.assertEqual(presence_update.state, "online")
+ self.assertEqual(presence_update.status_msg, "I'm online!")
+
+ # Whereas both presence receiving users should receive everyone's presence updates
+ presence_updates, _ = sync_presence(self, self.presence_receiving_user_one_id)
+ self.assertEqual(len(presence_updates), 3)
+ presence_updates, _ = sync_presence(self, self.presence_receiving_user_two_id)
+ self.assertEqual(len(presence_updates), 3)
+
+ # Test that sending to a remote user works
+ remote_user_id = "@far_away_person:island"
+
+ # Note that due to the remote user being in our module's
+ # users_who_should_receive_all_presence config, they would have
+ # received user presence updates already.
+ #
+ # Thus we reset the mock, and try sending all online local user
+ # presence again
+ self.hs.get_federation_transport_client().send_transaction.reset_mock()
+
+ # Broadcast local user online presence
+ self.get_success(
+ self.module_api.send_local_online_presence_to([remote_user_id])
+ )
+
+ # Check that the expected presence updates were sent
+ expected_users = [
+ self.other_user_id,
+ self.presence_receiving_user_one_id,
+ self.presence_receiving_user_two_id,
+ ]
+
+ calls = (
+ self.hs.get_federation_transport_client().send_transaction.call_args_list
+ )
+ for call in calls:
+ federation_transaction = call.args[0] # type: Transaction
+
+ # Get the sent EDUs in this transaction
+ edus = federation_transaction.get_dict()["edus"]
+
+ for edu in edus:
+ # Make sure we're only checking presence-type EDUs
+ if edu["edu_type"] != EduTypes.Presence:
+ continue
+
+ # EDUs can contain multiple presence updates
+ for presence_update in edu["content"]["push"]:
+ # Check for presence updates that contain the user IDs we're after
+ expected_users.remove(presence_update["user_id"])
+
+ # Ensure that no offline states are being sent out
+ self.assertNotEqual(presence_update["presence"], "offline")
+
+ self.assertEqual(len(expected_users), 0)
+
+
+def send_presence_update(
+ testcase: TestCase,
+ user_id: str,
+ access_token: str,
+ presence_state: str,
+ status_message: Optional[str] = None,
+) -> JsonDict:
+ # Build the presence body
+ body = {"presence": presence_state}
+ if status_message:
+ body["status_msg"] = status_message
+
+ # Update the user's presence state
+ channel = testcase.make_request(
+ "PUT", "/presence/%s/status" % (user_id,), body, access_token=access_token
+ )
+ testcase.assertEqual(channel.code, 200)
+
+ return channel.json_body
+
+
+def sync_presence(
+ testcase: TestCase,
+ user_id: str,
+ since_token: Optional[StreamToken] = None,
+) -> Tuple[List[UserPresenceState], StreamToken]:
+ """Perform a sync request for the given user and return the user presence updates
+ they've received, as well as the next_batch token.
+
+ This method assumes testcase.sync_handler points to the homeserver's sync handler.
+
+ Args:
+ testcase: The testcase that is currently being run.
+ user_id: The ID of the user to generate a sync response for.
+ since_token: An optional token to indicate from at what point to sync from.
+
+ Returns:
+ A tuple containing a list of presence updates, and the sync response's
+ next_batch token.
+ """
+ requester = create_requester(user_id)
+ sync_config = generate_sync_config(requester.user.to_string())
+ sync_result = testcase.get_success(
+ testcase.sync_handler.wait_for_sync_for_user(
+ requester, sync_config, since_token
+ )
+ )
+
+ return sync_result.presence, sync_result.next_batch
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index e62586142e..8e950f25c5 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -37,7 +37,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
def test_wait_for_sync_for_user_auth_blocking(self):
user_id1 = "@user1:test"
user_id2 = "@user2:test"
- sync_config = self._generate_sync_config(user_id1)
+ sync_config = generate_sync_config(user_id1)
requester = create_requester(user_id1)
self.reactor.advance(100) # So we get not 0 time
@@ -60,7 +60,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.auth_blocking._hs_disabled = False
- sync_config = self._generate_sync_config(user_id2)
+ sync_config = generate_sync_config(user_id2)
requester = create_requester(user_id2)
e = self.get_failure(
@@ -69,11 +69,12 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
)
self.assertEquals(e.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
- def _generate_sync_config(self, user_id):
- return SyncConfig(
- user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]),
- filter_collection=DEFAULT_FILTER_COLLECTION,
- is_guest=False,
- request_key="request_key",
- device_id="device_id",
- )
+
+def generate_sync_config(user_id: str) -> SyncConfig:
+ return SyncConfig(
+ user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]),
+ filter_collection=DEFAULT_FILTER_COLLECTION,
+ is_guest=False,
+ request_key="request_key",
+ device_id="device_id",
+ )
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index edacd1b566..1d1fceeecf 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -14,25 +14,37 @@
# limitations under the License.
from mock import Mock
+from synapse.api.constants import EduTypes
from synapse.events import EventBase
+from synapse.federation.units import Transaction
+from synapse.handlers.presence import UserPresenceState
from synapse.rest import admin
-from synapse.rest.client.v1 import login, room
+from synapse.rest.client.v1 import login, presence, room
from synapse.types import create_requester
-from tests.unittest import HomeserverTestCase
+from tests.events.test_presence_router import send_presence_update, sync_presence
+from tests.test_utils.event_injection import inject_member_event
+from tests.unittest import FederatingHomeserverTestCase, override_config
-class ModuleApiTestCase(HomeserverTestCase):
+class ModuleApiTestCase(FederatingHomeserverTestCase):
servlets = [
admin.register_servlets,
login.register_servlets,
room.register_servlets,
+ presence.register_servlets,
]
def prepare(self, reactor, clock, homeserver):
self.store = homeserver.get_datastore()
self.module_api = homeserver.get_module_api()
self.event_creation_handler = homeserver.get_event_creation_handler()
+ self.sync_handler = homeserver.get_sync_handler()
+
+ def make_homeserver(self, reactor, clock):
+ return self.setup_test_homeserver(
+ federation_transport_client=Mock(spec=["send_transaction"]),
+ )
def test_can_register_user(self):
"""Tests that an external module can register a user"""
@@ -205,3 +217,160 @@ class ModuleApiTestCase(HomeserverTestCase):
)
)
self.assertFalse(is_in_public_rooms)
+
+ # The ability to send federation is required by send_local_online_presence_to.
+ @override_config({"send_federation": True})
+ def test_send_local_online_presence_to(self):
+ """Tests that send_local_presence_to_users sends local online presence to local users."""
+ # Create a user who will send presence updates
+ self.presence_receiver_id = self.register_user("presence_receiver", "monkey")
+ self.presence_receiver_tok = self.login("presence_receiver", "monkey")
+
+ # And another user that will send presence updates out
+ self.presence_sender_id = self.register_user("presence_sender", "monkey")
+ self.presence_sender_tok = self.login("presence_sender", "monkey")
+
+ # Put them in a room together so they will receive each other's presence updates
+ room_id = self.helper.create_room_as(
+ self.presence_receiver_id,
+ tok=self.presence_receiver_tok,
+ )
+ self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok)
+
+ # Presence sender comes online
+ send_presence_update(
+ self,
+ self.presence_sender_id,
+ self.presence_sender_tok,
+ "online",
+ "I'm online!",
+ )
+
+ # Presence receiver should have received it
+ presence_updates, sync_token = sync_presence(self, self.presence_receiver_id)
+ self.assertEqual(len(presence_updates), 1)
+
+ presence_update = presence_updates[0] # type: UserPresenceState
+ self.assertEqual(presence_update.user_id, self.presence_sender_id)
+ self.assertEqual(presence_update.state, "online")
+
+ # Syncing again should result in no presence updates
+ presence_updates, sync_token = sync_presence(
+ self, self.presence_receiver_id, sync_token
+ )
+ self.assertEqual(len(presence_updates), 0)
+
+ # Trigger sending local online presence
+ self.get_success(
+ self.module_api.send_local_online_presence_to(
+ [
+ self.presence_receiver_id,
+ ]
+ )
+ )
+
+ # Presence receiver should have received online presence again
+ presence_updates, sync_token = sync_presence(
+ self, self.presence_receiver_id, sync_token
+ )
+ self.assertEqual(len(presence_updates), 1)
+
+ presence_update = presence_updates[0] # type: UserPresenceState
+ self.assertEqual(presence_update.user_id, self.presence_sender_id)
+ self.assertEqual(presence_update.state, "online")
+
+ # Presence sender goes offline
+ send_presence_update(
+ self,
+ self.presence_sender_id,
+ self.presence_sender_tok,
+ "offline",
+ "I slink back into the darkness.",
+ )
+
+ # Trigger sending local online presence
+ self.get_success(
+ self.module_api.send_local_online_presence_to(
+ [
+ self.presence_receiver_id,
+ ]
+ )
+ )
+
+ # Presence receiver should *not* have received offline state
+ presence_updates, sync_token = sync_presence(
+ self, self.presence_receiver_id, sync_token
+ )
+ self.assertEqual(len(presence_updates), 0)
+
+ @override_config({"send_federation": True})
+ def test_send_local_online_presence_to_federation(self):
+ """Tests that send_local_presence_to_users sends local online presence to remote users."""
+ # Create a user who will send presence updates
+ self.presence_sender_id = self.register_user("presence_sender", "monkey")
+ self.presence_sender_tok = self.login("presence_sender", "monkey")
+
+ # And a room they're a part of
+ room_id = self.helper.create_room_as(
+ self.presence_sender_id,
+ tok=self.presence_sender_tok,
+ )
+
+ # Mark them as online
+ send_presence_update(
+ self,
+ self.presence_sender_id,
+ self.presence_sender_tok,
+ "online",
+ "I'm online!",
+ )
+
+ # Make up a remote user to send presence to
+ remote_user_id = "@far_away_person:island"
+
+ # Create a join membership event for the remote user into the room.
+ # This allows presence information to flow from one user to the other.
+ self.get_success(
+ inject_member_event(
+ self.hs,
+ room_id,
+ sender=remote_user_id,
+ target=remote_user_id,
+ membership="join",
+ )
+ )
+
+ # The remote user would have received the existing room members' presence
+ # when they joined the room.
+ #
+ # Thus we reset the mock, and try sending online local user
+ # presence again
+ self.hs.get_federation_transport_client().send_transaction.reset_mock()
+
+ # Broadcast local user online presence
+ self.get_success(
+ self.module_api.send_local_online_presence_to([remote_user_id])
+ )
+
+ # Check that a presence update was sent as part of a federation transaction
+ found_update = False
+ calls = (
+ self.hs.get_federation_transport_client().send_transaction.call_args_list
+ )
+ for call in calls:
+ federation_transaction = call.args[0] # type: Transaction
+
+ # Get the sent EDUs in this transaction
+ edus = federation_transaction.get_dict()["edus"]
+
+ for edu in edus:
+ # Make sure we're only checking presence-type EDUs
+ if edu["edu_type"] != EduTypes.Presence:
+ continue
+
+ # EDUs can contain multiple presence updates
+ for presence_update in edu["content"]["push"]:
+ if presence_update["user_id"] == self.presence_sender_id:
+ found_update = True
+
+ self.assertTrue(found_update)
|