summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--README.rst7
-rw-r--r--changelog.d/9491.feature1
-rw-r--r--docs/presence_router_module.md235
-rw-r--r--docs/sample_config.yaml23
-rw-r--r--synapse/app/generic_worker.py3
-rw-r--r--synapse/config/server.py39
-rw-r--r--synapse/events/presence_router.py104
-rw-r--r--synapse/federation/sender/__init__.py19
-rw-r--r--synapse/handlers/presence.py278
-rw-r--r--synapse/module_api/__init__.py50
-rw-r--r--synapse/server.py5
-rw-r--r--tests/events/test_presence_router.py386
-rw-r--r--tests/handlers/test_sync.py21
-rw-r--r--tests/module_api/test_api.py175
14 files changed, 1282 insertions, 64 deletions
diff --git a/README.rst b/README.rst
index 655a2bf3be..1a5503572e 100644
--- a/README.rst
+++ b/README.rst
@@ -393,7 +393,12 @@ massive excess of outgoing federation requests (see `discussion
 indicate that your server is also issuing far more outgoing federation
 requests than can be accounted for by your users' activity, this is a
 likely cause. The misbehavior can be worked around by setting
-``use_presence: false`` in the Synapse config file.
+the following in the Synapse config file:
+
+.. code-block:: yaml
+
+   presence:
+       enabled: false
 
 People can't accept room invitations from me
 --------------------------------------------
diff --git a/changelog.d/9491.feature b/changelog.d/9491.feature
new file mode 100644
index 0000000000..8b56a95a44
--- /dev/null
+++ b/changelog.d/9491.feature
@@ -0,0 +1 @@
+Add a Synapse module for routing presence updates between users.
diff --git a/docs/presence_router_module.md b/docs/presence_router_module.md
new file mode 100644
index 0000000000..d6566d978d
--- /dev/null
+++ b/docs/presence_router_module.md
@@ -0,0 +1,235 @@
+# Presence Router Module
+
+Synapse supports configuring a module that can specify additional users
+(local or remote) to should receive certain presence updates from local
+users.
+
+Note that routing presence via Application Service transactions is not
+currently supported.
+
+The presence routing module is implemented as a Python class, which will
+be imported by the running Synapse.
+
+## Python Presence Router Class
+
+The Python class is instantiated with two objects:
+
+* A configuration object of some type (see below).
+* An instance of `synapse.module_api.ModuleApi`.
+
+It then implements methods related to presence routing.
+
+Note that one method of `ModuleApi` that may be useful is:
+
+```python
+async def ModuleApi.send_local_online_presence_to(users: Iterable[str]) -> None
+```
+
+which can be given a list of local or remote MXIDs to broadcast known, online user
+presence to (for those users that the receiving user is considered interested in). 
+It does not include state for users who are currently offline, and it can only be
+called on workers that support sending federation.
+
+### Module structure
+
+Below is a list of possible methods that can be implemented, and whether they are
+required.
+
+#### `parse_config`
+
+```python
+def parse_config(config_dict: dict) -> Any
+```
+
+**Required.** A static method that is passed a dictionary of config options, and
+  should return a validated config object. This method is described further in
+  [Configuration](#configuration).
+
+#### `get_users_for_states`
+
+```python
+async def get_users_for_states(
+    self,
+    state_updates: Iterable[UserPresenceState],
+) -> Dict[str, Set[UserPresenceState]]:
+```
+
+**Required.** An asynchronous method that is passed an iterable of user presence
+state. This method can determine whether a given presence update should be sent to certain
+users. It does this by returning a dictionary with keys representing local or remote
+Matrix User IDs, and values being a python set
+of `synapse.handlers.presence.UserPresenceState` instances.
+
+Synapse will then attempt to send the specified presence updates to each user when
+possible.
+
+#### `get_interested_users`
+
+```python
+async def get_interested_users(self, user_id: str) -> Union[Set[str], str]
+```
+
+**Required.** An asynchronous method that is passed a single Matrix User ID. This
+method is expected to return the users that the passed in user may be interested in the
+presence of. Returned users may be local or remote. The presence routed as a result of
+what this method returns is sent in addition to the updates already sent between users
+that share a room together. Presence updates are deduplicated.
+
+This method should return a python set of Matrix User IDs, or the object
+`synapse.events.presence_router.PresenceRouter.ALL_USERS` to indicate that the passed
+user should receive presence information for *all* known users.
+
+For clarity, if the user `@alice:example.org` is passed to this method, and the Set
+`{"@bob:example.com", "@charlie:somewhere.org"}` is returned, this signifies that Alice
+should receive presence updates sent by Bob and Charlie, regardless of whether these
+users share a room.
+
+### Example
+
+Below is an example implementation of a presence router class.
+
+```python
+from typing import Dict, Iterable, Set, Union
+from synapse.events.presence_router import PresenceRouter
+from synapse.handlers.presence import UserPresenceState
+from synapse.module_api import ModuleApi
+
+class PresenceRouterConfig:
+    def __init__(self):
+        # Config options with their defaults
+        # A list of users to always send all user presence updates to
+        self.always_send_to_users = []  # type: List[str]
+        
+        # A list of users to ignore presence updates for. Does not affect
+        # shared-room presence relationships
+        self.blacklisted_users = []  # type: List[str]
+
+class ExamplePresenceRouter:
+    """An example implementation of synapse.presence_router.PresenceRouter.
+    Supports routing all presence to a configured set of users, or a subset
+    of presence from certain users to members of certain rooms.
+
+    Args:
+        config: A configuration object.
+        module_api: An instance of Synapse's ModuleApi.
+    """
+    def __init__(self, config: PresenceRouterConfig, module_api: ModuleApi):
+        self._config = config
+        self._module_api = module_api
+
+    @staticmethod
+    def parse_config(config_dict: dict) -> PresenceRouterConfig:
+        """Parse a configuration dictionary from the homeserver config, do
+        some validation and return a typed PresenceRouterConfig.
+
+        Args:
+            config_dict: The configuration dictionary.
+
+        Returns:
+            A validated config object.
+        """
+        # Initialise a typed config object
+        config = PresenceRouterConfig()
+        always_send_to_users = config_dict.get("always_send_to_users")
+        blacklisted_users = config_dict.get("blacklisted_users")
+
+        # Do some validation of config options... otherwise raise a
+        # synapse.config.ConfigError.
+        config.always_send_to_users = always_send_to_users
+        config.blacklisted_users = blacklisted_users
+
+        return config
+
+    async def get_users_for_states(
+        self,
+        state_updates: Iterable[UserPresenceState],
+    ) -> Dict[str, Set[UserPresenceState]]:
+        """Given an iterable of user presence updates, determine where each one
+        needs to go. Returned results will not affect presence updates that are
+        sent between users who share a room.
+
+        Args:
+            state_updates: An iterable of user presence state updates.
+
+        Returns:
+          A dictionary of user_id -> set of UserPresenceState that the user should 
+          receive.
+        """
+        destination_users = {}  # type: Dict[str, Set[UserPresenceState]
+
+        # Ignore any updates for blacklisted users
+        desired_updates = set()
+        for update in state_updates:
+            if update.state_key not in self._config.blacklisted_users:
+                desired_updates.add(update)
+
+        # Send all presence updates to specific users
+        for user_id in self._config.always_send_to_users:
+            destination_users[user_id] = desired_updates
+
+        return destination_users
+
+    async def get_interested_users(
+        self,
+        user_id: str,
+    ) -> Union[Set[str], PresenceRouter.ALL_USERS]:
+        """
+        Retrieve a list of users that `user_id` is interested in receiving the
+        presence of. This will be in addition to those they share a room with.
+        Optionally, the object PresenceRouter.ALL_USERS can be returned to indicate
+        that this user should receive all incoming local and remote presence updates.
+
+        Note that this method will only be called for local users.
+
+        Args:
+          user_id: A user requesting presence updates.
+
+        Returns:
+          A set of user IDs to return additional presence updates for, or
+          PresenceRouter.ALL_USERS to return presence updates for all other users.
+        """
+        if user_id in self._config.always_send_to_users:
+            return PresenceRouter.ALL_USERS
+
+        return set()
+```
+
+#### A note on `get_users_for_states` and `get_interested_users`
+
+Both of these methods are effectively two different sides of the same coin. The logic
+regarding which users should receive updates for other users should be the same 
+between them.
+
+`get_users_for_states` is called when presence updates come in from either federation 
+or local users, and is used to either direct local presence to remote users, or to
+wake up the sync streams of local users to collect remote presence.
+
+In contrast, `get_interested_users` is used to determine the users that presence should
+be fetched for when a local user is syncing. This presence is then retrieved, before
+being fed through `get_users_for_states` once again, with only the syncing user's
+routing information pulled from the resulting dictionary.
+
+Their routing logic should thus line up, else you may run into unintended behaviour.
+
+## Configuration
+
+Once you've crafted your module and installed it into the same Python environment as
+Synapse, amend your homeserver config file with the following.
+
+```yaml
+presence:
+  routing_module:
+    module: my_module.ExamplePresenceRouter
+    config:
+      # Any configuration options for your module. The below is an example.
+      # of setting options for ExamplePresenceRouter.
+      always_send_to_users: ["@presence_gobbler:example.org"]
+      blacklisted_users:
+        - "@alice:example.com"
+        - "@bob:example.com"
+      ...
+```
+
+The contents of `config` will be passed as a Python dictionary to the static
+`parse_config` method of your class. The object returned by this method will
+then be passed to the `__init__` method of your module as `config`.
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index b0bf987740..9182dcd987 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -82,9 +82,28 @@ pid_file: DATADIR/homeserver.pid
 #
 #soft_file_limit: 0
 
-# Set to false to disable presence tracking on this homeserver.
+# Presence tracking allows users to see the state (e.g online/offline)
+# of other local and remote users.
 #
-#use_presence: false
+presence:
+  # Uncomment to disable presence tracking on this homeserver. This option
+  # replaces the previous top-level 'use_presence' option.
+  #
+  #enabled: false
+
+  # Presence routers are third-party modules that can specify additional logic
+  # to where presence updates from users are routed.
+  #
+  presence_router:
+    # The custom module's class. Uncomment to use a custom presence router module.
+    #
+    #module: "my_custom_router.PresenceRouter"
+
+    # Configuration options of the custom module. Refer to your module's
+    # documentation for available options.
+    #
+    #config:
+    #  example_option: 'something'
 
 # Whether to require authentication to retrieve profile data (avatars,
 # display names) of other users through the client API. Defaults to
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 3df2aa5c2b..d1c2079233 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -281,6 +281,7 @@ class GenericWorkerPresence(BasePresenceHandler):
         self.hs = hs
         self.is_mine_id = hs.is_mine_id
 
+        self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence
 
         # The number of ongoing syncs on this process, by user id.
@@ -395,7 +396,7 @@ class GenericWorkerPresence(BasePresenceHandler):
         return _user_syncing()
 
     async def notify_from_replication(self, states, stream_id):
-        parties = await get_interested_parties(self.store, states)
+        parties = await get_interested_parties(self.store, self.presence_router, states)
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 5f8910b6e1..8decc9d10d 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -27,6 +27,7 @@ import yaml
 from netaddr import AddrFormatError, IPNetwork, IPSet
 
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.util.module_loader import load_module
 from synapse.util.stringutils import parse_and_validate_server_name
 
 from ._base import Config, ConfigError
@@ -238,7 +239,20 @@ class ServerConfig(Config):
         self.public_baseurl = config.get("public_baseurl")
 
         # Whether to enable user presence.
-        self.use_presence = config.get("use_presence", True)
+        presence_config = config.get("presence") or {}
+        self.use_presence = presence_config.get("enabled")
+        if self.use_presence is None:
+            self.use_presence = config.get("use_presence", True)
+
+        # Custom presence router module
+        self.presence_router_module_class = None
+        self.presence_router_config = None
+        presence_router_config = presence_config.get("presence_router")
+        if presence_router_config:
+            (
+                self.presence_router_module_class,
+                self.presence_router_config,
+            ) = load_module(presence_router_config, ("presence", "presence_router"))
 
         # Whether to update the user directory or not. This should be set to
         # false only if we are updating the user directory in a worker
@@ -834,9 +848,28 @@ class ServerConfig(Config):
         #
         #soft_file_limit: 0
 
-        # Set to false to disable presence tracking on this homeserver.
+        # Presence tracking allows users to see the state (e.g online/offline)
+        # of other local and remote users.
         #
-        #use_presence: false
+        presence:
+          # Uncomment to disable presence tracking on this homeserver. This option
+          # replaces the previous top-level 'use_presence' option.
+          #
+          #enabled: false
+
+          # Presence routers are third-party modules that can specify additional logic
+          # to where presence updates from users are routed.
+          #
+          presence_router:
+            # The custom module's class. Uncomment to use a custom presence router module.
+            #
+            #module: "my_custom_router.PresenceRouter"
+
+            # Configuration options of the custom module. Refer to your module's
+            # documentation for available options.
+            #
+            #config:
+            #  example_option: 'something'
 
         # Whether to require authentication to retrieve profile data (avatars,
         # display names) of other users through the client API. Defaults to
diff --git a/synapse/events/presence_router.py b/synapse/events/presence_router.py
new file mode 100644
index 0000000000..24cd389d80
--- /dev/null
+++ b/synapse/events/presence_router.py
@@ -0,0 +1,104 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import TYPE_CHECKING, Dict, Iterable, Set, Union
+
+from synapse.api.presence import UserPresenceState
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+
+class PresenceRouter:
+    """
+    A module that the homeserver will call upon to help route user presence updates to
+    additional destinations. If a custom presence router is configured, calls will be
+    passed to that instead.
+    """
+
+    ALL_USERS = "ALL"
+
+    def __init__(self, hs: "HomeServer"):
+        self.custom_presence_router = None
+
+        # Check whether a custom presence router module has been configured
+        if hs.config.presence_router_module_class:
+            # Initialise the module
+            self.custom_presence_router = hs.config.presence_router_module_class(
+                config=hs.config.presence_router_config, module_api=hs.get_module_api()
+            )
+
+            # Ensure the module has implemented the required methods
+            required_methods = ["get_users_for_states", "get_interested_users"]
+            for method_name in required_methods:
+                if not hasattr(self.custom_presence_router, method_name):
+                    raise Exception(
+                        "PresenceRouter module '%s' must implement all required methods: %s"
+                        % (
+                            hs.config.presence_router_module_class.__name__,
+                            ", ".join(required_methods),
+                        )
+                    )
+
+    async def get_users_for_states(
+        self,
+        state_updates: Iterable[UserPresenceState],
+    ) -> Dict[str, Set[UserPresenceState]]:
+        """
+        Given an iterable of user presence updates, determine where each one
+        needs to go.
+
+        Args:
+            state_updates: An iterable of user presence state updates.
+
+        Returns:
+          A dictionary of user_id -> set of UserPresenceState, indicating which
+          presence updates each user should receive.
+        """
+        if self.custom_presence_router is not None:
+            # Ask the custom module
+            return await self.custom_presence_router.get_users_for_states(
+                state_updates=state_updates
+            )
+
+        # Don't include any extra destinations for presence updates
+        return {}
+
+    async def get_interested_users(self, user_id: str) -> Union[Set[str], ALL_USERS]:
+        """
+        Retrieve a list of users that `user_id` is interested in receiving the
+        presence of. This will be in addition to those they share a room with.
+        Optionally, the object PresenceRouter.ALL_USERS can be returned to indicate
+        that this user should receive all incoming local and remote presence updates.
+
+        Note that this method will only be called for local users, but can return users
+        that are local or remote.
+
+        Args:
+            user_id: A user requesting presence updates.
+
+        Returns:
+            A set of user IDs to return presence updates for, or ALL_USERS to return all
+            known updates.
+        """
+        if self.custom_presence_router is not None:
+            # Ask the custom module for interested users
+            return await self.custom_presence_router.get_interested_users(
+                user_id=user_id
+            )
+
+        # A custom presence router is not defined.
+        # Don't report any additional interested users
+        return set()
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8babb1ebbe..98bfce22ff 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -44,6 +44,7 @@ from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
 from synapse.util.metrics import Measure, measure_func
 
 if TYPE_CHECKING:
+    from synapse.events.presence_router import PresenceRouter
     from synapse.server import HomeServer
 
 logger = logging.getLogger(__name__)
@@ -162,6 +163,7 @@ class FederationSender(AbstractFederationSender):
         self.clock = hs.get_clock()
         self.is_mine_id = hs.is_mine_id
 
+        self._presence_router = None  # type: Optional[PresenceRouter]
         self._transaction_manager = TransactionManager(hs)
 
         self._instance_name = hs.get_instance_name()
@@ -584,7 +586,22 @@ class FederationSender(AbstractFederationSender):
         """Given a list of states populate self.pending_presence_by_dest and
         poke to send a new transaction to each destination
         """
-        hosts_and_states = await get_interested_remotes(self.store, states, self.state)
+        # We pull the presence router here instead of __init__
+        # to prevent a dependency cycle:
+        #
+        # AuthHandler -> Notifier -> FederationSender
+        # -> PresenceRouter -> ModuleApi -> AuthHandler
+        if self._presence_router is None:
+            self._presence_router = self.hs.get_presence_router()
+
+        assert self._presence_router is not None
+
+        hosts_and_states = await get_interested_remotes(
+            self.store,
+            self._presence_router,
+            states,
+            self.state,
+        )
 
         for destinations, states in hosts_and_states:
             for destination in destinations:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index da92feacc9..c817f2952d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,7 +25,17 @@ The methods that define policy are:
 import abc
 import logging
 from contextlib import contextmanager
-from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Dict,
+    FrozenSet,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+)
 
 from prometheus_client import Counter
 from typing_extensions import ContextManager
@@ -34,6 +44,7 @@ import synapse.metrics
 from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.api.presence import UserPresenceState
+from synapse.events.presence_router import PresenceRouter
 from synapse.logging.context import run_in_background
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
@@ -42,7 +53,7 @@ from synapse.state import StateHandler
 from synapse.storage.databases.main import DataStore
 from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import _CacheContext, cached
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
 
@@ -209,6 +220,7 @@ class PresenceHandler(BasePresenceHandler):
         self.notifier = hs.get_notifier()
         self.federation = hs.get_federation_sender()
         self.state = hs.get_state_handler()
+        self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence
 
         federation_registry = hs.get_federation_registry()
@@ -653,7 +665,7 @@ class PresenceHandler(BasePresenceHandler):
         """
         stream_id, max_token = await self.store.update_presence(states)
 
-        parties = await get_interested_parties(self.store, states)
+        parties = await get_interested_parties(self.store, self.presence_router, states)
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
@@ -1041,7 +1053,12 @@ class PresenceEventSource:
         #
         #   Presence -> Notifier -> PresenceEventSource -> Presence
         #
+        # Same with get_module_api, get_presence_router
+        #
+        #   AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
         self.get_presence_handler = hs.get_presence_handler
+        self.get_module_api = hs.get_module_api
+        self.get_presence_router = hs.get_presence_router
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
@@ -1055,7 +1072,7 @@ class PresenceEventSource:
         include_offline=True,
         explicit_room_id=None,
         **kwargs
-    ):
+    ) -> Tuple[List[UserPresenceState], int]:
         # The process for getting presence events are:
         #  1. Get the rooms the user is in.
         #  2. Get the list of user in the rooms.
@@ -1068,7 +1085,17 @@ class PresenceEventSource:
         # We don't try and limit the presence updates by the current token, as
         # sending down the rare duplicate is not a concern.
 
+        user_id = user.to_string()
+        stream_change_cache = self.store.presence_stream_cache
+
         with Measure(self.clock, "presence.get_new_events"):
+            if user_id in self.get_module_api()._send_full_presence_to_local_users:
+                # This user has been specified by a module to receive all current, online
+                # user presence. Removing from_key and setting include_offline to false
+                # will do effectively this.
+                from_key = None
+                include_offline = False
+
             if from_key is not None:
                 from_key = int(from_key)
 
@@ -1091,59 +1118,209 @@ class PresenceEventSource:
                 # doesn't return. C.f. #5503.
                 return [], max_token
 
-            presence = self.get_presence_handler()
-            stream_change_cache = self.store.presence_stream_cache
-
+            # Figure out which other users this user should receive updates for
             users_interested_in = await self._get_interested_in(user, explicit_room_id)
 
-            user_ids_changed = set()  # type: Collection[str]
-            changed = None
-            if from_key:
-                changed = stream_change_cache.get_all_entities_changed(from_key)
+            # We have a set of users that we're interested in the presence of. We want to
+            # cross-reference that with the users that have actually changed their presence.
 
-            if changed is not None and len(changed) < 500:
-                assert isinstance(user_ids_changed, set)
+            # Check whether this user should see all user updates
 
-                # For small deltas, its quicker to get all changes and then
-                # work out if we share a room or they're in our presence list
-                get_updates_counter.labels("stream").inc()
-                for other_user_id in changed:
-                    if other_user_id in users_interested_in:
-                        user_ids_changed.add(other_user_id)
-            else:
-                # Too many possible updates. Find all users we can see and check
-                # if any of them have changed.
-                get_updates_counter.labels("full").inc()
+            if users_interested_in == PresenceRouter.ALL_USERS:
+                # Provide presence state for all users
+                presence_updates = await self._filter_all_presence_updates_for_user(
+                    user_id, include_offline, from_key
+                )
 
-                if from_key:
-                    user_ids_changed = stream_change_cache.get_entities_changed(
-                        users_interested_in, from_key
+                # Remove the user from the list of users to receive all presence
+                if user_id in self.get_module_api()._send_full_presence_to_local_users:
+                    self.get_module_api()._send_full_presence_to_local_users.remove(
+                        user_id
                     )
+
+                return presence_updates, max_token
+
+            # Make mypy happy. users_interested_in should now be a set
+            assert not isinstance(users_interested_in, str)
+
+            # The set of users that we're interested in and that have had a presence update.
+            # We'll actually pull the presence updates for these users at the end.
+            interested_and_updated_users = (
+                set()
+            )  # type: Union[Set[str], FrozenSet[str]]
+
+            if from_key:
+                # First get all users that have had a presence update
+                updated_users = stream_change_cache.get_all_entities_changed(from_key)
+
+                # Cross-reference users we're interested in with those that have had updates.
+                # Use a slightly-optimised method for processing smaller sets of updates.
+                if updated_users is not None and len(updated_users) < 500:
+                    # For small deltas, it's quicker to get all changes and then
+                    # cross-reference with the users we're interested in
+                    get_updates_counter.labels("stream").inc()
+                    for other_user_id in updated_users:
+                        if other_user_id in users_interested_in:
+                            # mypy thinks this variable could be a FrozenSet as it's possibly set
+                            # to one in the `get_entities_changed` call below, and `add()` is not
+                            # method on a FrozenSet. That doesn't affect us here though, as
+                            # `interested_and_updated_users` is clearly a set() above.
+                            interested_and_updated_users.add(other_user_id)  # type: ignore
                 else:
-                    user_ids_changed = users_interested_in
+                    # Too many possible updates. Find all users we can see and check
+                    # if any of them have changed.
+                    get_updates_counter.labels("full").inc()
 
-            updates = await presence.current_state_for_users(user_ids_changed)
+                    interested_and_updated_users = (
+                        stream_change_cache.get_entities_changed(
+                            users_interested_in, from_key
+                        )
+                    )
+            else:
+                # No from_key has been specified. Return the presence for all users
+                # this user is interested in
+                interested_and_updated_users = users_interested_in
+
+            # Retrieve the current presence state for each user
+            users_to_state = await self.get_presence_handler().current_state_for_users(
+                interested_and_updated_users
+            )
+            presence_updates = list(users_to_state.values())
 
-        if include_offline:
-            return (list(updates.values()), max_token)
+        # Remove the user from the list of users to receive all presence
+        if user_id in self.get_module_api()._send_full_presence_to_local_users:
+            self.get_module_api()._send_full_presence_to_local_users.remove(user_id)
+
+        if not include_offline:
+            # Filter out offline presence states
+            presence_updates = self._filter_offline_presence_state(presence_updates)
+
+        return presence_updates, max_token
+
+    async def _filter_all_presence_updates_for_user(
+        self,
+        user_id: str,
+        include_offline: bool,
+        from_key: Optional[int] = None,
+    ) -> List[UserPresenceState]:
+        """
+        Computes the presence updates a user should receive.
+
+        First pulls presence updates from the database. Then consults PresenceRouter
+        for whether any updates should be excluded by user ID.
+
+        Args:
+            user_id: The User ID of the user to compute presence updates for.
+            include_offline: Whether to include offline presence states from the results.
+            from_key: The minimum stream ID of updates to pull from the database
+                before filtering.
+
+        Returns:
+            A list of presence states for the given user to receive.
+        """
+        if from_key:
+            # Only return updates since the last sync
+            updated_users = self.store.presence_stream_cache.get_all_entities_changed(
+                from_key
+            )
+            if not updated_users:
+                updated_users = []
+
+            # Get the actual presence update for each change
+            users_to_state = await self.get_presence_handler().current_state_for_users(
+                updated_users
+            )
+            presence_updates = list(users_to_state.values())
+
+            if not include_offline:
+                # Filter out offline states
+                presence_updates = self._filter_offline_presence_state(presence_updates)
         else:
-            return (
-                [s for s in updates.values() if s.state != PresenceState.OFFLINE],
-                max_token,
+            users_to_state = await self.store.get_presence_for_all_users(
+                include_offline=include_offline
             )
 
+            presence_updates = list(users_to_state.values())
+
+        # TODO: This feels wildly inefficient, and it's unfortunate we need to ask the
+        # module for information on a number of users when we then only take the info
+        # for a single user
+
+        # Filter through the presence router
+        users_to_state_set = await self.get_presence_router().get_users_for_states(
+            presence_updates
+        )
+
+        # We only want the mapping for the syncing user
+        presence_updates = list(users_to_state_set[user_id])
+
+        # Return presence information for all users
+        return presence_updates
+
+    def _filter_offline_presence_state(
+        self, presence_updates: Iterable[UserPresenceState]
+    ) -> List[UserPresenceState]:
+        """Given an iterable containing user presence updates, return a list with any offline
+        presence states removed.
+
+        Args:
+            presence_updates: Presence states to filter
+
+        Returns:
+            A new list with any offline presence states removed.
+        """
+        return [
+            update
+            for update in presence_updates
+            if update.state != PresenceState.OFFLINE
+        ]
+
     def get_current_key(self):
         return self.store.get_current_presence_token()
 
     @cached(num_args=2, cache_context=True)
-    async def _get_interested_in(self, user, explicit_room_id, cache_context):
+    async def _get_interested_in(
+        self,
+        user: UserID,
+        explicit_room_id: Optional[str] = None,
+        cache_context: Optional[_CacheContext] = None,
+    ) -> Union[Set[str], str]:
         """Returns the set of users that the given user should see presence
-        updates for
+        updates for.
+
+        Args:
+            user: The user to retrieve presence updates for.
+            explicit_room_id: The users that are in the room will be returned.
+
+        Returns:
+            A set of user IDs to return presence updates for, or "ALL" to return all
+            known updates.
         """
         user_id = user.to_string()
         users_interested_in = set()
         users_interested_in.add(user_id)  # So that we receive our own presence
 
+        # cache_context isn't likely to ever be None due to the @cached decorator,
+        # but we can't have a non-optional argument after the optional argument
+        # explicit_room_id either. Assert cache_context is not None so we can use it
+        # without mypy complaining.
+        assert cache_context
+
+        # Check with the presence router whether we should poll additional users for
+        # their presence information
+        additional_users = await self.get_presence_router().get_interested_users(
+            user.to_string()
+        )
+        if additional_users == PresenceRouter.ALL_USERS:
+            # If the module requested that this user see the presence updates of *all*
+            # users, then simply return that instead of calculating what rooms this
+            # user shares
+            return PresenceRouter.ALL_USERS
+
+        # Add the additional users from the router
+        users_interested_in.update(additional_users)
+
+        # Find the users who share a room with this user
         users_who_share_room = await self.store.get_users_who_share_room_with_user(
             user_id, on_invalidate=cache_context.invalidate
         )
@@ -1314,14 +1491,15 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
 
 
 async def get_interested_parties(
-    store: DataStore, states: List[UserPresenceState]
+    store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
 ) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
     """Given a list of states return which entities (rooms, users)
     are interested in the given states.
 
     Args:
-        store
-        states
+        store: The homeserver's data store.
+        presence_router: A module for augmenting the destinations for presence updates.
+        states: A list of incoming user presence updates.
 
     Returns:
         A 2-tuple of `(room_ids_to_states, users_to_states)`,
@@ -1337,11 +1515,22 @@ async def get_interested_parties(
         # Always notify self
         users_to_states.setdefault(state.user_id, []).append(state)
 
+    # Ask a presence routing module for any additional parties if one
+    # is loaded.
+    router_users_to_states = await presence_router.get_users_for_states(states)
+
+    # Update the dictionaries with additional destinations and state to send
+    for user_id, user_states in router_users_to_states.items():
+        users_to_states.setdefault(user_id, []).extend(user_states)
+
     return room_ids_to_states, users_to_states
 
 
 async def get_interested_remotes(
-    store: DataStore, states: List[UserPresenceState], state_handler: StateHandler
+    store: DataStore,
+    presence_router: PresenceRouter,
+    states: List[UserPresenceState],
+    state_handler: StateHandler,
 ) -> List[Tuple[Collection[str], List[UserPresenceState]]]:
     """Given a list of presence states figure out which remote servers
     should be sent which.
@@ -1349,9 +1538,10 @@ async def get_interested_remotes(
     All the presence states should be for local users only.
 
     Args:
-        store
-        states
-        state_handler
+        store: The homeserver's data store.
+        presence_router: A module for augmenting the destinations for presence updates.
+        states: A list of incoming user presence updates.
+        state_handler:
 
     Returns:
         A list of 2-tuples of destinations and states, where for
@@ -1363,7 +1553,9 @@ async def get_interested_remotes(
     # First we look up the rooms each user is in (as well as any explicit
     # subscriptions), then for each distinct room we look up the remote
     # hosts in those rooms.
-    room_ids_to_states, users_to_states = await get_interested_parties(store, states)
+    room_ids_to_states, users_to_states = await get_interested_parties(
+        store, presence_router, states
+    )
 
     for room_id, states in room_ids_to_states.items():
         hosts = await state_handler.get_current_hosts_in_room(room_id)
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 781e02fbbb..3ecd46c038 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -50,11 +50,20 @@ class ModuleApi:
         self._auth = hs.get_auth()
         self._auth_handler = auth_handler
         self._server_name = hs.hostname
+        self._presence_stream = hs.get_event_sources().sources["presence"]
 
         # We expose these as properties below in order to attach a helpful docstring.
         self._http_client = hs.get_simple_http_client()  # type: SimpleHttpClient
         self._public_room_list_manager = PublicRoomListManager(hs)
 
+        # The next time these users sync, they will receive the current presence
+        # state of all local users. Users are added by send_local_online_presence_to,
+        # and removed after a successful sync.
+        #
+        # We make this a private variable to deter modules from accessing it directly,
+        # though other classes in Synapse will still do so.
+        self._send_full_presence_to_local_users = set()
+
     @property
     def http_client(self):
         """Allows making outbound HTTP requests to remote resources.
@@ -385,6 +394,47 @@ class ModuleApi:
 
         return event
 
+    async def send_local_online_presence_to(self, users: Iterable[str]) -> None:
+        """
+        Forces the equivalent of a presence initial_sync for a set of local or remote
+        users. The users will receive presence for all currently online users that they
+        are considered interested in.
+
+        Updates to remote users will be sent immediately, whereas local users will receive
+        them on their next sync attempt.
+
+        Note that this method can only be run on the main or federation_sender worker
+        processes.
+        """
+        if not self._hs.should_send_federation():
+            raise Exception(
+                "send_local_online_presence_to can only be run "
+                "on processes that send federation",
+            )
+
+        for user in users:
+            if self._hs.is_mine_id(user):
+                # Modify SyncHandler._generate_sync_entry_for_presence to call
+                # presence_source.get_new_events with an empty `from_key` if
+                # that user's ID were in a list modified by ModuleApi somewhere.
+                # That user would then get all presence state on next incremental sync.
+
+                # Force a presence initial_sync for this user next time
+                self._send_full_presence_to_local_users.add(user)
+            else:
+                # Retrieve presence state for currently online users that this user
+                # is considered interested in
+                presence_events, _ = await self._presence_stream.get_new_events(
+                    UserID.from_string(user), from_key=None, include_offline=False
+                )
+
+                # Send to remote destinations
+                await make_deferred_yieldable(
+                    # We pull the federation sender here as we can only do so on workers
+                    # that support sending presence
+                    self._hs.get_federation_sender().send_presence(presence_events)
+                )
+
 
 class PublicRoomListManager:
     """Contains methods for adding to, removing from and querying whether a room
diff --git a/synapse/server.py b/synapse/server.py
index e42f7b1a18..cfb55c230d 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -51,6 +51,7 @@ from synapse.crypto import context_factory
 from synapse.crypto.context_factory import RegularPolicyForHTTPS
 from synapse.crypto.keyring import Keyring
 from synapse.events.builder import EventBuilderFactory
+from synapse.events.presence_router import PresenceRouter
 from synapse.events.spamcheck import SpamChecker
 from synapse.events.third_party_rules import ThirdPartyEventRules
 from synapse.events.utils import EventClientSerializer
@@ -426,6 +427,10 @@ class HomeServer(metaclass=abc.ABCMeta):
             raise Exception("Workers cannot write typing")
 
     @cache_in_self
+    def get_presence_router(self) -> PresenceRouter:
+        return PresenceRouter(self)
+
+    @cache_in_self
     def get_typing_handler(self) -> FollowerTypingHandler:
         if self.config.worker.writers.typing == self.get_instance_name():
             # Use get_typing_writer_handler to ensure that we use the same
diff --git a/tests/events/test_presence_router.py b/tests/events/test_presence_router.py
new file mode 100644
index 0000000000..c6e547f11c
--- /dev/null
+++ b/tests/events/test_presence_router.py
@@ -0,0 +1,386 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
+
+from mock import Mock
+
+import attr
+
+from synapse.api.constants import EduTypes
+from synapse.events.presence_router import PresenceRouter
+from synapse.federation.units import Transaction
+from synapse.handlers.presence import UserPresenceState
+from synapse.module_api import ModuleApi
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, presence, room
+from synapse.types import JsonDict, StreamToken, create_requester
+
+from tests.handlers.test_sync import generate_sync_config
+from tests.unittest import FederatingHomeserverTestCase, TestCase, override_config
+
+
+@attr.s
+class PresenceRouterTestConfig:
+    users_who_should_receive_all_presence = attr.ib(type=List[str], default=[])
+
+
+class PresenceRouterTestModule:
+    def __init__(self, config: PresenceRouterTestConfig, module_api: ModuleApi):
+        self._config = config
+        self._module_api = module_api
+
+    async def get_users_for_states(
+        self, state_updates: Iterable[UserPresenceState]
+    ) -> Dict[str, Set[UserPresenceState]]:
+        users_to_state = {
+            user_id: set(state_updates)
+            for user_id in self._config.users_who_should_receive_all_presence
+        }
+        return users_to_state
+
+    async def get_interested_users(
+        self, user_id: str
+    ) -> Union[Set[str], PresenceRouter.ALL_USERS]:
+        if user_id in self._config.users_who_should_receive_all_presence:
+            return PresenceRouter.ALL_USERS
+
+        return set()
+
+    @staticmethod
+    def parse_config(config_dict: dict) -> PresenceRouterTestConfig:
+        """Parse a configuration dictionary from the homeserver config, do
+        some validation and return a typed PresenceRouterConfig.
+
+        Args:
+            config_dict: The configuration dictionary.
+
+        Returns:
+            A validated config object.
+        """
+        # Initialise a typed config object
+        config = PresenceRouterTestConfig()
+
+        config.users_who_should_receive_all_presence = config_dict.get(
+            "users_who_should_receive_all_presence"
+        )
+
+        return config
+
+
+class PresenceRouterTestCase(FederatingHomeserverTestCase):
+    servlets = [
+        admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+        presence.register_servlets,
+    ]
+
+    def make_homeserver(self, reactor, clock):
+        return self.setup_test_homeserver(
+            federation_transport_client=Mock(spec=["send_transaction"]),
+        )
+
+    def prepare(self, reactor, clock, homeserver):
+        self.sync_handler = self.hs.get_sync_handler()
+        self.module_api = homeserver.get_module_api()
+
+    @override_config(
+        {
+            "presence": {
+                "presence_router": {
+                    "module": __name__ + ".PresenceRouterTestModule",
+                    "config": {
+                        "users_who_should_receive_all_presence": [
+                            "@presence_gobbler:test",
+                        ]
+                    },
+                }
+            },
+            "send_federation": True,
+        }
+    )
+    def test_receiving_all_presence(self):
+        """Test that a user that does not share a room with another other can receive
+        presence for them, due to presence routing.
+        """
+        # Create a user who should receive all presence of others
+        self.presence_receiving_user_id = self.register_user(
+            "presence_gobbler", "monkey"
+        )
+        self.presence_receiving_user_tok = self.login("presence_gobbler", "monkey")
+
+        # And two users who should not have any special routing
+        self.other_user_one_id = self.register_user("other_user_one", "monkey")
+        self.other_user_one_tok = self.login("other_user_one", "monkey")
+        self.other_user_two_id = self.register_user("other_user_two", "monkey")
+        self.other_user_two_tok = self.login("other_user_two", "monkey")
+
+        # Put the other two users in a room with each other
+        room_id = self.helper.create_room_as(
+            self.other_user_one_id, tok=self.other_user_one_tok
+        )
+
+        self.helper.invite(
+            room_id,
+            self.other_user_one_id,
+            self.other_user_two_id,
+            tok=self.other_user_one_tok,
+        )
+        self.helper.join(room_id, self.other_user_two_id, tok=self.other_user_two_tok)
+        # User one sends some presence
+        send_presence_update(
+            self,
+            self.other_user_one_id,
+            self.other_user_one_tok,
+            "online",
+            "boop",
+        )
+
+        # Check that the presence receiving user gets user one's presence when syncing
+        presence_updates, sync_token = sync_presence(
+            self, self.presence_receiving_user_id
+        )
+        self.assertEqual(len(presence_updates), 1)
+
+        presence_update = presence_updates[0]  # type: UserPresenceState
+        self.assertEqual(presence_update.user_id, self.other_user_one_id)
+        self.assertEqual(presence_update.state, "online")
+        self.assertEqual(presence_update.status_msg, "boop")
+
+        # Have all three users send presence
+        send_presence_update(
+            self,
+            self.other_user_one_id,
+            self.other_user_one_tok,
+            "online",
+            "user_one",
+        )
+        send_presence_update(
+            self,
+            self.other_user_two_id,
+            self.other_user_two_tok,
+            "online",
+            "user_two",
+        )
+        send_presence_update(
+            self,
+            self.presence_receiving_user_id,
+            self.presence_receiving_user_tok,
+            "online",
+            "presence_gobbler",
+        )
+
+        # Check that the presence receiving user gets everyone's presence
+        presence_updates, _ = sync_presence(
+            self, self.presence_receiving_user_id, sync_token
+        )
+        self.assertEqual(len(presence_updates), 3)
+
+        # But that User One only get itself and User Two's presence
+        presence_updates, _ = sync_presence(self, self.other_user_one_id)
+        self.assertEqual(len(presence_updates), 2)
+
+        found = False
+        for update in presence_updates:
+            if update.user_id == self.other_user_two_id:
+                self.assertEqual(update.state, "online")
+                self.assertEqual(update.status_msg, "user_two")
+                found = True
+
+        self.assertTrue(found)
+
+    @override_config(
+        {
+            "presence": {
+                "presence_router": {
+                    "module": __name__ + ".PresenceRouterTestModule",
+                    "config": {
+                        "users_who_should_receive_all_presence": [
+                            "@presence_gobbler1:test",
+                            "@presence_gobbler2:test",
+                            "@far_away_person:island",
+                        ]
+                    },
+                }
+            },
+            "send_federation": True,
+        }
+    )
+    def test_send_local_online_presence_to_with_module(self):
+        """Tests that send_local_presence_to_users sends local online presence to a set
+        of specified local and remote users, with a custom PresenceRouter module enabled.
+        """
+        # Create a user who will send presence updates
+        self.other_user_id = self.register_user("other_user", "monkey")
+        self.other_user_tok = self.login("other_user", "monkey")
+
+        # And another two users that will also send out presence updates, as well as receive
+        # theirs and everyone else's
+        self.presence_receiving_user_one_id = self.register_user(
+            "presence_gobbler1", "monkey"
+        )
+        self.presence_receiving_user_one_tok = self.login("presence_gobbler1", "monkey")
+        self.presence_receiving_user_two_id = self.register_user(
+            "presence_gobbler2", "monkey"
+        )
+        self.presence_receiving_user_two_tok = self.login("presence_gobbler2", "monkey")
+
+        # Have all three users send some presence updates
+        send_presence_update(
+            self,
+            self.other_user_id,
+            self.other_user_tok,
+            "online",
+            "I'm online!",
+        )
+        send_presence_update(
+            self,
+            self.presence_receiving_user_one_id,
+            self.presence_receiving_user_one_tok,
+            "online",
+            "I'm also online!",
+        )
+        send_presence_update(
+            self,
+            self.presence_receiving_user_two_id,
+            self.presence_receiving_user_two_tok,
+            "unavailable",
+            "I'm in a meeting!",
+        )
+
+        # Mark each presence-receiving user for receiving all user presence
+        self.get_success(
+            self.module_api.send_local_online_presence_to(
+                [
+                    self.presence_receiving_user_one_id,
+                    self.presence_receiving_user_two_id,
+                ]
+            )
+        )
+
+        # Perform a sync for each user
+
+        # The other user should only receive their own presence
+        presence_updates, _ = sync_presence(self, self.other_user_id)
+        self.assertEqual(len(presence_updates), 1)
+
+        presence_update = presence_updates[0]  # type: UserPresenceState
+        self.assertEqual(presence_update.user_id, self.other_user_id)
+        self.assertEqual(presence_update.state, "online")
+        self.assertEqual(presence_update.status_msg, "I'm online!")
+
+        # Whereas both presence receiving users should receive everyone's presence updates
+        presence_updates, _ = sync_presence(self, self.presence_receiving_user_one_id)
+        self.assertEqual(len(presence_updates), 3)
+        presence_updates, _ = sync_presence(self, self.presence_receiving_user_two_id)
+        self.assertEqual(len(presence_updates), 3)
+
+        # Test that sending to a remote user works
+        remote_user_id = "@far_away_person:island"
+
+        # Note that due to the remote user being in our module's
+        # users_who_should_receive_all_presence config, they would have
+        # received user presence updates already.
+        #
+        # Thus we reset the mock, and try sending all online local user
+        # presence again
+        self.hs.get_federation_transport_client().send_transaction.reset_mock()
+
+        # Broadcast local user online presence
+        self.get_success(
+            self.module_api.send_local_online_presence_to([remote_user_id])
+        )
+
+        # Check that the expected presence updates were sent
+        expected_users = [
+            self.other_user_id,
+            self.presence_receiving_user_one_id,
+            self.presence_receiving_user_two_id,
+        ]
+
+        calls = (
+            self.hs.get_federation_transport_client().send_transaction.call_args_list
+        )
+        for call in calls:
+            federation_transaction = call.args[0]  # type: Transaction
+
+            # Get the sent EDUs in this transaction
+            edus = federation_transaction.get_dict()["edus"]
+
+            for edu in edus:
+                # Make sure we're only checking presence-type EDUs
+                if edu["edu_type"] != EduTypes.Presence:
+                    continue
+
+                # EDUs can contain multiple presence updates
+                for presence_update in edu["content"]["push"]:
+                    # Check for presence updates that contain the user IDs we're after
+                    expected_users.remove(presence_update["user_id"])
+
+                    # Ensure that no offline states are being sent out
+                    self.assertNotEqual(presence_update["presence"], "offline")
+
+        self.assertEqual(len(expected_users), 0)
+
+
+def send_presence_update(
+    testcase: TestCase,
+    user_id: str,
+    access_token: str,
+    presence_state: str,
+    status_message: Optional[str] = None,
+) -> JsonDict:
+    # Build the presence body
+    body = {"presence": presence_state}
+    if status_message:
+        body["status_msg"] = status_message
+
+    # Update the user's presence state
+    channel = testcase.make_request(
+        "PUT", "/presence/%s/status" % (user_id,), body, access_token=access_token
+    )
+    testcase.assertEqual(channel.code, 200)
+
+    return channel.json_body
+
+
+def sync_presence(
+    testcase: TestCase,
+    user_id: str,
+    since_token: Optional[StreamToken] = None,
+) -> Tuple[List[UserPresenceState], StreamToken]:
+    """Perform a sync request for the given user and return the user presence updates
+    they've received, as well as the next_batch token.
+
+    This method assumes testcase.sync_handler points to the homeserver's sync handler.
+
+    Args:
+        testcase: The testcase that is currently being run.
+        user_id: The ID of the user to generate a sync response for.
+        since_token: An optional token to indicate from at what point to sync from.
+
+    Returns:
+        A tuple containing a list of presence updates, and the sync response's
+        next_batch token.
+    """
+    requester = create_requester(user_id)
+    sync_config = generate_sync_config(requester.user.to_string())
+    sync_result = testcase.get_success(
+        testcase.sync_handler.wait_for_sync_for_user(
+            requester, sync_config, since_token
+        )
+    )
+
+    return sync_result.presence, sync_result.next_batch
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index e62586142e..8e950f25c5 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -37,7 +37,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
     def test_wait_for_sync_for_user_auth_blocking(self):
         user_id1 = "@user1:test"
         user_id2 = "@user2:test"
-        sync_config = self._generate_sync_config(user_id1)
+        sync_config = generate_sync_config(user_id1)
         requester = create_requester(user_id1)
 
         self.reactor.advance(100)  # So we get not 0 time
@@ -60,7 +60,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
 
         self.auth_blocking._hs_disabled = False
 
-        sync_config = self._generate_sync_config(user_id2)
+        sync_config = generate_sync_config(user_id2)
         requester = create_requester(user_id2)
 
         e = self.get_failure(
@@ -69,11 +69,12 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
         )
         self.assertEquals(e.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
 
-    def _generate_sync_config(self, user_id):
-        return SyncConfig(
-            user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]),
-            filter_collection=DEFAULT_FILTER_COLLECTION,
-            is_guest=False,
-            request_key="request_key",
-            device_id="device_id",
-        )
+
+def generate_sync_config(user_id: str) -> SyncConfig:
+    return SyncConfig(
+        user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]),
+        filter_collection=DEFAULT_FILTER_COLLECTION,
+        is_guest=False,
+        request_key="request_key",
+        device_id="device_id",
+    )
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index edacd1b566..1d1fceeecf 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -14,25 +14,37 @@
 # limitations under the License.
 from mock import Mock
 
+from synapse.api.constants import EduTypes
 from synapse.events import EventBase
+from synapse.federation.units import Transaction
+from synapse.handlers.presence import UserPresenceState
 from synapse.rest import admin
-from synapse.rest.client.v1 import login, room
+from synapse.rest.client.v1 import login, presence, room
 from synapse.types import create_requester
 
-from tests.unittest import HomeserverTestCase
+from tests.events.test_presence_router import send_presence_update, sync_presence
+from tests.test_utils.event_injection import inject_member_event
+from tests.unittest import FederatingHomeserverTestCase, override_config
 
 
-class ModuleApiTestCase(HomeserverTestCase):
+class ModuleApiTestCase(FederatingHomeserverTestCase):
     servlets = [
         admin.register_servlets,
         login.register_servlets,
         room.register_servlets,
+        presence.register_servlets,
     ]
 
     def prepare(self, reactor, clock, homeserver):
         self.store = homeserver.get_datastore()
         self.module_api = homeserver.get_module_api()
         self.event_creation_handler = homeserver.get_event_creation_handler()
+        self.sync_handler = homeserver.get_sync_handler()
+
+    def make_homeserver(self, reactor, clock):
+        return self.setup_test_homeserver(
+            federation_transport_client=Mock(spec=["send_transaction"]),
+        )
 
     def test_can_register_user(self):
         """Tests that an external module can register a user"""
@@ -205,3 +217,160 @@ class ModuleApiTestCase(HomeserverTestCase):
             )
         )
         self.assertFalse(is_in_public_rooms)
+
+    # The ability to send federation is required by send_local_online_presence_to.
+    @override_config({"send_federation": True})
+    def test_send_local_online_presence_to(self):
+        """Tests that send_local_presence_to_users sends local online presence to local users."""
+        # Create a user who will send presence updates
+        self.presence_receiver_id = self.register_user("presence_receiver", "monkey")
+        self.presence_receiver_tok = self.login("presence_receiver", "monkey")
+
+        # And another user that will send presence updates out
+        self.presence_sender_id = self.register_user("presence_sender", "monkey")
+        self.presence_sender_tok = self.login("presence_sender", "monkey")
+
+        # Put them in a room together so they will receive each other's presence updates
+        room_id = self.helper.create_room_as(
+            self.presence_receiver_id,
+            tok=self.presence_receiver_tok,
+        )
+        self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok)
+
+        # Presence sender comes online
+        send_presence_update(
+            self,
+            self.presence_sender_id,
+            self.presence_sender_tok,
+            "online",
+            "I'm online!",
+        )
+
+        # Presence receiver should have received it
+        presence_updates, sync_token = sync_presence(self, self.presence_receiver_id)
+        self.assertEqual(len(presence_updates), 1)
+
+        presence_update = presence_updates[0]  # type: UserPresenceState
+        self.assertEqual(presence_update.user_id, self.presence_sender_id)
+        self.assertEqual(presence_update.state, "online")
+
+        # Syncing again should result in no presence updates
+        presence_updates, sync_token = sync_presence(
+            self, self.presence_receiver_id, sync_token
+        )
+        self.assertEqual(len(presence_updates), 0)
+
+        # Trigger sending local online presence
+        self.get_success(
+            self.module_api.send_local_online_presence_to(
+                [
+                    self.presence_receiver_id,
+                ]
+            )
+        )
+
+        # Presence receiver should have received online presence again
+        presence_updates, sync_token = sync_presence(
+            self, self.presence_receiver_id, sync_token
+        )
+        self.assertEqual(len(presence_updates), 1)
+
+        presence_update = presence_updates[0]  # type: UserPresenceState
+        self.assertEqual(presence_update.user_id, self.presence_sender_id)
+        self.assertEqual(presence_update.state, "online")
+
+        # Presence sender goes offline
+        send_presence_update(
+            self,
+            self.presence_sender_id,
+            self.presence_sender_tok,
+            "offline",
+            "I slink back into the darkness.",
+        )
+
+        # Trigger sending local online presence
+        self.get_success(
+            self.module_api.send_local_online_presence_to(
+                [
+                    self.presence_receiver_id,
+                ]
+            )
+        )
+
+        # Presence receiver should *not* have received offline state
+        presence_updates, sync_token = sync_presence(
+            self, self.presence_receiver_id, sync_token
+        )
+        self.assertEqual(len(presence_updates), 0)
+
+    @override_config({"send_federation": True})
+    def test_send_local_online_presence_to_federation(self):
+        """Tests that send_local_presence_to_users sends local online presence to remote users."""
+        # Create a user who will send presence updates
+        self.presence_sender_id = self.register_user("presence_sender", "monkey")
+        self.presence_sender_tok = self.login("presence_sender", "monkey")
+
+        # And a room they're a part of
+        room_id = self.helper.create_room_as(
+            self.presence_sender_id,
+            tok=self.presence_sender_tok,
+        )
+
+        # Mark them as online
+        send_presence_update(
+            self,
+            self.presence_sender_id,
+            self.presence_sender_tok,
+            "online",
+            "I'm online!",
+        )
+
+        # Make up a remote user to send presence to
+        remote_user_id = "@far_away_person:island"
+
+        # Create a join membership event for the remote user into the room.
+        # This allows presence information to flow from one user to the other.
+        self.get_success(
+            inject_member_event(
+                self.hs,
+                room_id,
+                sender=remote_user_id,
+                target=remote_user_id,
+                membership="join",
+            )
+        )
+
+        # The remote user would have received the existing room members' presence
+        # when they joined the room.
+        #
+        # Thus we reset the mock, and try sending online local user
+        # presence again
+        self.hs.get_federation_transport_client().send_transaction.reset_mock()
+
+        # Broadcast local user online presence
+        self.get_success(
+            self.module_api.send_local_online_presence_to([remote_user_id])
+        )
+
+        # Check that a presence update was sent as part of a federation transaction
+        found_update = False
+        calls = (
+            self.hs.get_federation_transport_client().send_transaction.call_args_list
+        )
+        for call in calls:
+            federation_transaction = call.args[0]  # type: Transaction
+
+            # Get the sent EDUs in this transaction
+            edus = federation_transaction.get_dict()["edus"]
+
+            for edu in edus:
+                # Make sure we're only checking presence-type EDUs
+                if edu["edu_type"] != EduTypes.Presence:
+                    continue
+
+                # EDUs can contain multiple presence updates
+                for presence_update in edu["content"]["push"]:
+                    if presence_update["user_id"] == self.presence_sender_id:
+                        found_update = True
+
+        self.assertTrue(found_update)