summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorAndrew Morgan <1342360+anoadragon453@users.noreply.github.com>2021-04-06 14:38:30 +0100
committerGitHub <noreply@github.com>2021-04-06 14:38:30 +0100
commit04819239bae2b39ee42bfdb6f9b83c6d9fe34169 (patch)
tree3dc49ad3938fd456c3df1321f515a76e3a380ae6 /synapse
parentAdd type hints to expiring cache. (#9730) (diff)
downloadsynapse-04819239bae2b39ee42bfdb6f9b83c6d9fe34169.tar.xz
Add a Synapse Module for configuring presence update routing (#9491)
At the moment, if you'd like to share presence between local or remote users, those users must be sharing a room together. This isn't always the most convenient or useful situation though.

This PR adds a module to Synapse that will allow deployments to set up extra logic on where presence updates should be routed. The module must implement two methods, `get_users_for_states` and `get_interested_users`. These methods are given presence updates or user IDs and must return information that Synapse will use to grant passing presence updates around.

A method is additionally added to `ModuleApi` which allows triggering a set of users to receive the current, online presence information for all users they are considered interested in. This is the equivalent of that user receiving presence information during an initial sync. 

The goal of this module is to be fairly generic and useful for a variety of applications, with hard requirements being:

* Sending state for a specific set or all known users to a defined set of local and remote users.
* The ability to trigger an initial sync for specific users, so they receive all current state.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/generic_worker.py3
-rw-r--r--synapse/config/server.py39
-rw-r--r--synapse/events/presence_router.py104
-rw-r--r--synapse/federation/sender/__init__.py19
-rw-r--r--synapse/handlers/presence.py278
-rw-r--r--synapse/module_api/__init__.py50
-rw-r--r--synapse/server.py5
7 files changed, 450 insertions, 48 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 3df2aa5c2b..d1c2079233 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -281,6 +281,7 @@ class GenericWorkerPresence(BasePresenceHandler):
         self.hs = hs
         self.is_mine_id = hs.is_mine_id
 
+        self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence
 
         # The number of ongoing syncs on this process, by user id.
@@ -395,7 +396,7 @@ class GenericWorkerPresence(BasePresenceHandler):
         return _user_syncing()
 
     async def notify_from_replication(self, states, stream_id):
-        parties = await get_interested_parties(self.store, states)
+        parties = await get_interested_parties(self.store, self.presence_router, states)
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 5f8910b6e1..8decc9d10d 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -27,6 +27,7 @@ import yaml
 from netaddr import AddrFormatError, IPNetwork, IPSet
 
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.util.module_loader import load_module
 from synapse.util.stringutils import parse_and_validate_server_name
 
 from ._base import Config, ConfigError
@@ -238,7 +239,20 @@ class ServerConfig(Config):
         self.public_baseurl = config.get("public_baseurl")
 
         # Whether to enable user presence.
-        self.use_presence = config.get("use_presence", True)
+        presence_config = config.get("presence") or {}
+        self.use_presence = presence_config.get("enabled")
+        if self.use_presence is None:
+            self.use_presence = config.get("use_presence", True)
+
+        # Custom presence router module
+        self.presence_router_module_class = None
+        self.presence_router_config = None
+        presence_router_config = presence_config.get("presence_router")
+        if presence_router_config:
+            (
+                self.presence_router_module_class,
+                self.presence_router_config,
+            ) = load_module(presence_router_config, ("presence", "presence_router"))
 
         # Whether to update the user directory or not. This should be set to
         # false only if we are updating the user directory in a worker
@@ -834,9 +848,28 @@ class ServerConfig(Config):
         #
         #soft_file_limit: 0
 
-        # Set to false to disable presence tracking on this homeserver.
+        # Presence tracking allows users to see the state (e.g online/offline)
+        # of other local and remote users.
         #
-        #use_presence: false
+        presence:
+          # Uncomment to disable presence tracking on this homeserver. This option
+          # replaces the previous top-level 'use_presence' option.
+          #
+          #enabled: false
+
+          # Presence routers are third-party modules that can specify additional logic
+          # to where presence updates from users are routed.
+          #
+          presence_router:
+            # The custom module's class. Uncomment to use a custom presence router module.
+            #
+            #module: "my_custom_router.PresenceRouter"
+
+            # Configuration options of the custom module. Refer to your module's
+            # documentation for available options.
+            #
+            #config:
+            #  example_option: 'something'
 
         # Whether to require authentication to retrieve profile data (avatars,
         # display names) of other users through the client API. Defaults to
diff --git a/synapse/events/presence_router.py b/synapse/events/presence_router.py
new file mode 100644
index 0000000000..24cd389d80
--- /dev/null
+++ b/synapse/events/presence_router.py
@@ -0,0 +1,104 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import TYPE_CHECKING, Dict, Iterable, Set, Union
+
+from synapse.api.presence import UserPresenceState
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+
+class PresenceRouter:
+    """
+    A module that the homeserver will call upon to help route user presence updates to
+    additional destinations. If a custom presence router is configured, calls will be
+    passed to that instead.
+    """
+
+    ALL_USERS = "ALL"
+
+    def __init__(self, hs: "HomeServer"):
+        self.custom_presence_router = None
+
+        # Check whether a custom presence router module has been configured
+        if hs.config.presence_router_module_class:
+            # Initialise the module
+            self.custom_presence_router = hs.config.presence_router_module_class(
+                config=hs.config.presence_router_config, module_api=hs.get_module_api()
+            )
+
+            # Ensure the module has implemented the required methods
+            required_methods = ["get_users_for_states", "get_interested_users"]
+            for method_name in required_methods:
+                if not hasattr(self.custom_presence_router, method_name):
+                    raise Exception(
+                        "PresenceRouter module '%s' must implement all required methods: %s"
+                        % (
+                            hs.config.presence_router_module_class.__name__,
+                            ", ".join(required_methods),
+                        )
+                    )
+
+    async def get_users_for_states(
+        self,
+        state_updates: Iterable[UserPresenceState],
+    ) -> Dict[str, Set[UserPresenceState]]:
+        """
+        Given an iterable of user presence updates, determine where each one
+        needs to go.
+
+        Args:
+            state_updates: An iterable of user presence state updates.
+
+        Returns:
+          A dictionary of user_id -> set of UserPresenceState, indicating which
+          presence updates each user should receive.
+        """
+        if self.custom_presence_router is not None:
+            # Ask the custom module
+            return await self.custom_presence_router.get_users_for_states(
+                state_updates=state_updates
+            )
+
+        # Don't include any extra destinations for presence updates
+        return {}
+
+    async def get_interested_users(self, user_id: str) -> Union[Set[str], ALL_USERS]:
+        """
+        Retrieve a list of users that `user_id` is interested in receiving the
+        presence of. This will be in addition to those they share a room with.
+        Optionally, the object PresenceRouter.ALL_USERS can be returned to indicate
+        that this user should receive all incoming local and remote presence updates.
+
+        Note that this method will only be called for local users, but can return users
+        that are local or remote.
+
+        Args:
+            user_id: A user requesting presence updates.
+
+        Returns:
+            A set of user IDs to return presence updates for, or ALL_USERS to return all
+            known updates.
+        """
+        if self.custom_presence_router is not None:
+            # Ask the custom module for interested users
+            return await self.custom_presence_router.get_interested_users(
+                user_id=user_id
+            )
+
+        # A custom presence router is not defined.
+        # Don't report any additional interested users
+        return set()
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8babb1ebbe..98bfce22ff 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -44,6 +44,7 @@ from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
 from synapse.util.metrics import Measure, measure_func
 
 if TYPE_CHECKING:
+    from synapse.events.presence_router import PresenceRouter
     from synapse.server import HomeServer
 
 logger = logging.getLogger(__name__)
@@ -162,6 +163,7 @@ class FederationSender(AbstractFederationSender):
         self.clock = hs.get_clock()
         self.is_mine_id = hs.is_mine_id
 
+        self._presence_router = None  # type: Optional[PresenceRouter]
         self._transaction_manager = TransactionManager(hs)
 
         self._instance_name = hs.get_instance_name()
@@ -584,7 +586,22 @@ class FederationSender(AbstractFederationSender):
         """Given a list of states populate self.pending_presence_by_dest and
         poke to send a new transaction to each destination
         """
-        hosts_and_states = await get_interested_remotes(self.store, states, self.state)
+        # We pull the presence router here instead of __init__
+        # to prevent a dependency cycle:
+        #
+        # AuthHandler -> Notifier -> FederationSender
+        # -> PresenceRouter -> ModuleApi -> AuthHandler
+        if self._presence_router is None:
+            self._presence_router = self.hs.get_presence_router()
+
+        assert self._presence_router is not None
+
+        hosts_and_states = await get_interested_remotes(
+            self.store,
+            self._presence_router,
+            states,
+            self.state,
+        )
 
         for destinations, states in hosts_and_states:
             for destination in destinations:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index da92feacc9..c817f2952d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,7 +25,17 @@ The methods that define policy are:
 import abc
 import logging
 from contextlib import contextmanager
-from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Dict,
+    FrozenSet,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+)
 
 from prometheus_client import Counter
 from typing_extensions import ContextManager
@@ -34,6 +44,7 @@ import synapse.metrics
 from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.api.presence import UserPresenceState
+from synapse.events.presence_router import PresenceRouter
 from synapse.logging.context import run_in_background
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
@@ -42,7 +53,7 @@ from synapse.state import StateHandler
 from synapse.storage.databases.main import DataStore
 from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import _CacheContext, cached
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
 
@@ -209,6 +220,7 @@ class PresenceHandler(BasePresenceHandler):
         self.notifier = hs.get_notifier()
         self.federation = hs.get_federation_sender()
         self.state = hs.get_state_handler()
+        self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence
 
         federation_registry = hs.get_federation_registry()
@@ -653,7 +665,7 @@ class PresenceHandler(BasePresenceHandler):
         """
         stream_id, max_token = await self.store.update_presence(states)
 
-        parties = await get_interested_parties(self.store, states)
+        parties = await get_interested_parties(self.store, self.presence_router, states)
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
@@ -1041,7 +1053,12 @@ class PresenceEventSource:
         #
         #   Presence -> Notifier -> PresenceEventSource -> Presence
         #
+        # Same with get_module_api, get_presence_router
+        #
+        #   AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
         self.get_presence_handler = hs.get_presence_handler
+        self.get_module_api = hs.get_module_api
+        self.get_presence_router = hs.get_presence_router
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
@@ -1055,7 +1072,7 @@ class PresenceEventSource:
         include_offline=True,
         explicit_room_id=None,
         **kwargs
-    ):
+    ) -> Tuple[List[UserPresenceState], int]:
         # The process for getting presence events are:
         #  1. Get the rooms the user is in.
         #  2. Get the list of user in the rooms.
@@ -1068,7 +1085,17 @@ class PresenceEventSource:
         # We don't try and limit the presence updates by the current token, as
         # sending down the rare duplicate is not a concern.
 
+        user_id = user.to_string()
+        stream_change_cache = self.store.presence_stream_cache
+
         with Measure(self.clock, "presence.get_new_events"):
+            if user_id in self.get_module_api()._send_full_presence_to_local_users:
+                # This user has been specified by a module to receive all current, online
+                # user presence. Removing from_key and setting include_offline to false
+                # will do effectively this.
+                from_key = None
+                include_offline = False
+
             if from_key is not None:
                 from_key = int(from_key)
 
@@ -1091,59 +1118,209 @@ class PresenceEventSource:
                 # doesn't return. C.f. #5503.
                 return [], max_token
 
-            presence = self.get_presence_handler()
-            stream_change_cache = self.store.presence_stream_cache
-
+            # Figure out which other users this user should receive updates for
             users_interested_in = await self._get_interested_in(user, explicit_room_id)
 
-            user_ids_changed = set()  # type: Collection[str]
-            changed = None
-            if from_key:
-                changed = stream_change_cache.get_all_entities_changed(from_key)
+            # We have a set of users that we're interested in the presence of. We want to
+            # cross-reference that with the users that have actually changed their presence.
 
-            if changed is not None and len(changed) < 500:
-                assert isinstance(user_ids_changed, set)
+            # Check whether this user should see all user updates
 
-                # For small deltas, its quicker to get all changes and then
-                # work out if we share a room or they're in our presence list
-                get_updates_counter.labels("stream").inc()
-                for other_user_id in changed:
-                    if other_user_id in users_interested_in:
-                        user_ids_changed.add(other_user_id)
-            else:
-                # Too many possible updates. Find all users we can see and check
-                # if any of them have changed.
-                get_updates_counter.labels("full").inc()
+            if users_interested_in == PresenceRouter.ALL_USERS:
+                # Provide presence state for all users
+                presence_updates = await self._filter_all_presence_updates_for_user(
+                    user_id, include_offline, from_key
+                )
 
-                if from_key:
-                    user_ids_changed = stream_change_cache.get_entities_changed(
-                        users_interested_in, from_key
+                # Remove the user from the list of users to receive all presence
+                if user_id in self.get_module_api()._send_full_presence_to_local_users:
+                    self.get_module_api()._send_full_presence_to_local_users.remove(
+                        user_id
                     )
+
+                return presence_updates, max_token
+
+            # Make mypy happy. users_interested_in should now be a set
+            assert not isinstance(users_interested_in, str)
+
+            # The set of users that we're interested in and that have had a presence update.
+            # We'll actually pull the presence updates for these users at the end.
+            interested_and_updated_users = (
+                set()
+            )  # type: Union[Set[str], FrozenSet[str]]
+
+            if from_key:
+                # First get all users that have had a presence update
+                updated_users = stream_change_cache.get_all_entities_changed(from_key)
+
+                # Cross-reference users we're interested in with those that have had updates.
+                # Use a slightly-optimised method for processing smaller sets of updates.
+                if updated_users is not None and len(updated_users) < 500:
+                    # For small deltas, it's quicker to get all changes and then
+                    # cross-reference with the users we're interested in
+                    get_updates_counter.labels("stream").inc()
+                    for other_user_id in updated_users:
+                        if other_user_id in users_interested_in:
+                            # mypy thinks this variable could be a FrozenSet as it's possibly set
+                            # to one in the `get_entities_changed` call below, and `add()` is not
+                            # method on a FrozenSet. That doesn't affect us here though, as
+                            # `interested_and_updated_users` is clearly a set() above.
+                            interested_and_updated_users.add(other_user_id)  # type: ignore
                 else:
-                    user_ids_changed = users_interested_in
+                    # Too many possible updates. Find all users we can see and check
+                    # if any of them have changed.
+                    get_updates_counter.labels("full").inc()
 
-            updates = await presence.current_state_for_users(user_ids_changed)
+                    interested_and_updated_users = (
+                        stream_change_cache.get_entities_changed(
+                            users_interested_in, from_key
+                        )
+                    )
+            else:
+                # No from_key has been specified. Return the presence for all users
+                # this user is interested in
+                interested_and_updated_users = users_interested_in
+
+            # Retrieve the current presence state for each user
+            users_to_state = await self.get_presence_handler().current_state_for_users(
+                interested_and_updated_users
+            )
+            presence_updates = list(users_to_state.values())
 
-        if include_offline:
-            return (list(updates.values()), max_token)
+        # Remove the user from the list of users to receive all presence
+        if user_id in self.get_module_api()._send_full_presence_to_local_users:
+            self.get_module_api()._send_full_presence_to_local_users.remove(user_id)
+
+        if not include_offline:
+            # Filter out offline presence states
+            presence_updates = self._filter_offline_presence_state(presence_updates)
+
+        return presence_updates, max_token
+
+    async def _filter_all_presence_updates_for_user(
+        self,
+        user_id: str,
+        include_offline: bool,
+        from_key: Optional[int] = None,
+    ) -> List[UserPresenceState]:
+        """
+        Computes the presence updates a user should receive.
+
+        First pulls presence updates from the database. Then consults PresenceRouter
+        for whether any updates should be excluded by user ID.
+
+        Args:
+            user_id: The User ID of the user to compute presence updates for.
+            include_offline: Whether to include offline presence states from the results.
+            from_key: The minimum stream ID of updates to pull from the database
+                before filtering.
+
+        Returns:
+            A list of presence states for the given user to receive.
+        """
+        if from_key:
+            # Only return updates since the last sync
+            updated_users = self.store.presence_stream_cache.get_all_entities_changed(
+                from_key
+            )
+            if not updated_users:
+                updated_users = []
+
+            # Get the actual presence update for each change
+            users_to_state = await self.get_presence_handler().current_state_for_users(
+                updated_users
+            )
+            presence_updates = list(users_to_state.values())
+
+            if not include_offline:
+                # Filter out offline states
+                presence_updates = self._filter_offline_presence_state(presence_updates)
         else:
-            return (
-                [s for s in updates.values() if s.state != PresenceState.OFFLINE],
-                max_token,
+            users_to_state = await self.store.get_presence_for_all_users(
+                include_offline=include_offline
             )
 
+            presence_updates = list(users_to_state.values())
+
+        # TODO: This feels wildly inefficient, and it's unfortunate we need to ask the
+        # module for information on a number of users when we then only take the info
+        # for a single user
+
+        # Filter through the presence router
+        users_to_state_set = await self.get_presence_router().get_users_for_states(
+            presence_updates
+        )
+
+        # We only want the mapping for the syncing user
+        presence_updates = list(users_to_state_set[user_id])
+
+        # Return presence information for all users
+        return presence_updates
+
+    def _filter_offline_presence_state(
+        self, presence_updates: Iterable[UserPresenceState]
+    ) -> List[UserPresenceState]:
+        """Given an iterable containing user presence updates, return a list with any offline
+        presence states removed.
+
+        Args:
+            presence_updates: Presence states to filter
+
+        Returns:
+            A new list with any offline presence states removed.
+        """
+        return [
+            update
+            for update in presence_updates
+            if update.state != PresenceState.OFFLINE
+        ]
+
     def get_current_key(self):
         return self.store.get_current_presence_token()
 
     @cached(num_args=2, cache_context=True)
-    async def _get_interested_in(self, user, explicit_room_id, cache_context):
+    async def _get_interested_in(
+        self,
+        user: UserID,
+        explicit_room_id: Optional[str] = None,
+        cache_context: Optional[_CacheContext] = None,
+    ) -> Union[Set[str], str]:
         """Returns the set of users that the given user should see presence
-        updates for
+        updates for.
+
+        Args:
+            user: The user to retrieve presence updates for.
+            explicit_room_id: The users that are in the room will be returned.
+
+        Returns:
+            A set of user IDs to return presence updates for, or "ALL" to return all
+            known updates.
         """
         user_id = user.to_string()
         users_interested_in = set()
         users_interested_in.add(user_id)  # So that we receive our own presence
 
+        # cache_context isn't likely to ever be None due to the @cached decorator,
+        # but we can't have a non-optional argument after the optional argument
+        # explicit_room_id either. Assert cache_context is not None so we can use it
+        # without mypy complaining.
+        assert cache_context
+
+        # Check with the presence router whether we should poll additional users for
+        # their presence information
+        additional_users = await self.get_presence_router().get_interested_users(
+            user.to_string()
+        )
+        if additional_users == PresenceRouter.ALL_USERS:
+            # If the module requested that this user see the presence updates of *all*
+            # users, then simply return that instead of calculating what rooms this
+            # user shares
+            return PresenceRouter.ALL_USERS
+
+        # Add the additional users from the router
+        users_interested_in.update(additional_users)
+
+        # Find the users who share a room with this user
         users_who_share_room = await self.store.get_users_who_share_room_with_user(
             user_id, on_invalidate=cache_context.invalidate
         )
@@ -1314,14 +1491,15 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
 
 
 async def get_interested_parties(
-    store: DataStore, states: List[UserPresenceState]
+    store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
 ) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
     """Given a list of states return which entities (rooms, users)
     are interested in the given states.
 
     Args:
-        store
-        states
+        store: The homeserver's data store.
+        presence_router: A module for augmenting the destinations for presence updates.
+        states: A list of incoming user presence updates.
 
     Returns:
         A 2-tuple of `(room_ids_to_states, users_to_states)`,
@@ -1337,11 +1515,22 @@ async def get_interested_parties(
         # Always notify self
         users_to_states.setdefault(state.user_id, []).append(state)
 
+    # Ask a presence routing module for any additional parties if one
+    # is loaded.
+    router_users_to_states = await presence_router.get_users_for_states(states)
+
+    # Update the dictionaries with additional destinations and state to send
+    for user_id, user_states in router_users_to_states.items():
+        users_to_states.setdefault(user_id, []).extend(user_states)
+
     return room_ids_to_states, users_to_states
 
 
 async def get_interested_remotes(
-    store: DataStore, states: List[UserPresenceState], state_handler: StateHandler
+    store: DataStore,
+    presence_router: PresenceRouter,
+    states: List[UserPresenceState],
+    state_handler: StateHandler,
 ) -> List[Tuple[Collection[str], List[UserPresenceState]]]:
     """Given a list of presence states figure out which remote servers
     should be sent which.
@@ -1349,9 +1538,10 @@ async def get_interested_remotes(
     All the presence states should be for local users only.
 
     Args:
-        store
-        states
-        state_handler
+        store: The homeserver's data store.
+        presence_router: A module for augmenting the destinations for presence updates.
+        states: A list of incoming user presence updates.
+        state_handler:
 
     Returns:
         A list of 2-tuples of destinations and states, where for
@@ -1363,7 +1553,9 @@ async def get_interested_remotes(
     # First we look up the rooms each user is in (as well as any explicit
     # subscriptions), then for each distinct room we look up the remote
     # hosts in those rooms.
-    room_ids_to_states, users_to_states = await get_interested_parties(store, states)
+    room_ids_to_states, users_to_states = await get_interested_parties(
+        store, presence_router, states
+    )
 
     for room_id, states in room_ids_to_states.items():
         hosts = await state_handler.get_current_hosts_in_room(room_id)
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 781e02fbbb..3ecd46c038 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -50,11 +50,20 @@ class ModuleApi:
         self._auth = hs.get_auth()
         self._auth_handler = auth_handler
         self._server_name = hs.hostname
+        self._presence_stream = hs.get_event_sources().sources["presence"]
 
         # We expose these as properties below in order to attach a helpful docstring.
         self._http_client = hs.get_simple_http_client()  # type: SimpleHttpClient
         self._public_room_list_manager = PublicRoomListManager(hs)
 
+        # The next time these users sync, they will receive the current presence
+        # state of all local users. Users are added by send_local_online_presence_to,
+        # and removed after a successful sync.
+        #
+        # We make this a private variable to deter modules from accessing it directly,
+        # though other classes in Synapse will still do so.
+        self._send_full_presence_to_local_users = set()
+
     @property
     def http_client(self):
         """Allows making outbound HTTP requests to remote resources.
@@ -385,6 +394,47 @@ class ModuleApi:
 
         return event
 
+    async def send_local_online_presence_to(self, users: Iterable[str]) -> None:
+        """
+        Forces the equivalent of a presence initial_sync for a set of local or remote
+        users. The users will receive presence for all currently online users that they
+        are considered interested in.
+
+        Updates to remote users will be sent immediately, whereas local users will receive
+        them on their next sync attempt.
+
+        Note that this method can only be run on the main or federation_sender worker
+        processes.
+        """
+        if not self._hs.should_send_federation():
+            raise Exception(
+                "send_local_online_presence_to can only be run "
+                "on processes that send federation",
+            )
+
+        for user in users:
+            if self._hs.is_mine_id(user):
+                # Modify SyncHandler._generate_sync_entry_for_presence to call
+                # presence_source.get_new_events with an empty `from_key` if
+                # that user's ID were in a list modified by ModuleApi somewhere.
+                # That user would then get all presence state on next incremental sync.
+
+                # Force a presence initial_sync for this user next time
+                self._send_full_presence_to_local_users.add(user)
+            else:
+                # Retrieve presence state for currently online users that this user
+                # is considered interested in
+                presence_events, _ = await self._presence_stream.get_new_events(
+                    UserID.from_string(user), from_key=None, include_offline=False
+                )
+
+                # Send to remote destinations
+                await make_deferred_yieldable(
+                    # We pull the federation sender here as we can only do so on workers
+                    # that support sending presence
+                    self._hs.get_federation_sender().send_presence(presence_events)
+                )
+
 
 class PublicRoomListManager:
     """Contains methods for adding to, removing from and querying whether a room
diff --git a/synapse/server.py b/synapse/server.py
index e42f7b1a18..cfb55c230d 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -51,6 +51,7 @@ from synapse.crypto import context_factory
 from synapse.crypto.context_factory import RegularPolicyForHTTPS
 from synapse.crypto.keyring import Keyring
 from synapse.events.builder import EventBuilderFactory
+from synapse.events.presence_router import PresenceRouter
 from synapse.events.spamcheck import SpamChecker
 from synapse.events.third_party_rules import ThirdPartyEventRules
 from synapse.events.utils import EventClientSerializer
@@ -426,6 +427,10 @@ class HomeServer(metaclass=abc.ABCMeta):
             raise Exception("Workers cannot write typing")
 
     @cache_in_self
+    def get_presence_router(self) -> PresenceRouter:
+        return PresenceRouter(self)
+
+    @cache_in_self
     def get_typing_handler(self) -> FollowerTypingHandler:
         if self.config.worker.writers.typing == self.get_instance_name():
             # Use get_typing_writer_handler to ensure that we use the same