1 files changed, 50 insertions, 0 deletions
| diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 781e02fbbb..3ecd46c038 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -50,11 +50,20 @@ class ModuleApi:
         self._auth = hs.get_auth()
         self._auth_handler = auth_handler
         self._server_name = hs.hostname
+        self._presence_stream = hs.get_event_sources().sources["presence"]
 
         # We expose these as properties below in order to attach a helpful docstring.
         self._http_client = hs.get_simple_http_client()  # type: SimpleHttpClient
         self._public_room_list_manager = PublicRoomListManager(hs)
 
+        # The next time these users sync, they will receive the current presence
+        # state of all local users. Users are added by send_local_online_presence_to,
+        # and removed after a successful sync.
+        #
+        # We make this a private variable to deter modules from accessing it directly,
+        # though other classes in Synapse will still do so.
+        self._send_full_presence_to_local_users = set()
+
     @property
     def http_client(self):
         """Allows making outbound HTTP requests to remote resources.
@@ -385,6 +394,47 @@ class ModuleApi:
 
         return event
 
+    async def send_local_online_presence_to(self, users: Iterable[str]) -> None:
+        """
+        Forces the equivalent of a presence initial_sync for a set of local or remote
+        users. The users will receive presence for all currently online users that they
+        are considered interested in.
+
+        Updates to remote users will be sent immediately, whereas local users will receive
+        them on their next sync attempt.
+
+        Note that this method can only be run on the main or federation_sender worker
+        processes.
+        """
+        if not self._hs.should_send_federation():
+            raise Exception(
+                "send_local_online_presence_to can only be run "
+                "on processes that send federation",
+            )
+
+        for user in users:
+            if self._hs.is_mine_id(user):
+                # Modify SyncHandler._generate_sync_entry_for_presence to call
+                # presence_source.get_new_events with an empty `from_key` if
+                # that user's ID were in a list modified by ModuleApi somewhere.
+                # That user would then get all presence state on next incremental sync.
+
+                # Force a presence initial_sync for this user next time
+                self._send_full_presence_to_local_users.add(user)
+            else:
+                # Retrieve presence state for currently online users that this user
+                # is considered interested in
+                presence_events, _ = await self._presence_stream.get_new_events(
+                    UserID.from_string(user), from_key=None, include_offline=False
+                )
+
+                # Send to remote destinations
+                await make_deferred_yieldable(
+                    # We pull the federation sender here as we can only do so on workers
+                    # that support sending presence
+                    self._hs.get_federation_sender().send_presence(presence_events)
+                )
+
 
 class PublicRoomListManager:
     """Contains methods for adding to, removing from and querying whether a room
 |