summary refs log tree commit diff
path: root/synapse/appservice
diff options
context:
space:
mode:
authorAndrew Morgan <1342360+anoadragon453@users.noreply.github.com>2022-03-30 14:39:27 +0100
committerGitHub <noreply@github.com>2022-03-30 14:39:27 +0100
commitd8d0271977938d89585613e9a77537c33c4dc4a9 (patch)
treec5ccf846430cef6b45e5dcf1de938a9c92dcf628 /synapse/appservice
parentFlesh out documentation for running SyTest against Synapse, including use of ... (diff)
downloadsynapse-d8d0271977938d89585613e9a77537c33c4dc4a9.tar.xz
Send device list updates to application services (MSC3202) - part 1 (#11881)
Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
Diffstat (limited to 'synapse/appservice')
-rw-r--r--synapse/appservice/__init__.py12
-rw-r--r--synapse/appservice/api.py10
-rw-r--r--synapse/appservice/scheduler.py53
3 files changed, 70 insertions, 5 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 07ec95f1d6..d23d9221bc 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -1,4 +1,5 @@
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2022 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -22,7 +23,13 @@ from netaddr import IPSet
 
 from synapse.api.constants import EventTypes
 from synapse.events import EventBase
-from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id
+from synapse.types import (
+    DeviceListUpdates,
+    GroupID,
+    JsonDict,
+    UserID,
+    get_domain_from_id,
+)
 from synapse.util.caches.descriptors import _CacheContext, cached
 
 if TYPE_CHECKING:
@@ -400,6 +407,7 @@ class AppServiceTransaction:
         to_device_messages: List[JsonDict],
         one_time_key_counts: TransactionOneTimeKeyCounts,
         unused_fallback_keys: TransactionUnusedFallbackKeys,
+        device_list_summary: DeviceListUpdates,
     ):
         self.service = service
         self.id = id
@@ -408,6 +416,7 @@ class AppServiceTransaction:
         self.to_device_messages = to_device_messages
         self.one_time_key_counts = one_time_key_counts
         self.unused_fallback_keys = unused_fallback_keys
+        self.device_list_summary = device_list_summary
 
     async def send(self, as_api: "ApplicationServiceApi") -> bool:
         """Sends this transaction using the provided AS API interface.
@@ -424,6 +433,7 @@ class AppServiceTransaction:
             to_device_messages=self.to_device_messages,
             one_time_key_counts=self.one_time_key_counts,
             unused_fallback_keys=self.unused_fallback_keys,
+            device_list_summary=self.device_list_summary,
             txn_id=self.id,
         )
 
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 98fe354014..0cdbb04bfb 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -1,4 +1,5 @@
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2022 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -27,7 +28,7 @@ from synapse.appservice import (
 from synapse.events import EventBase
 from synapse.events.utils import SerializeEventConfig, serialize_event
 from synapse.http.client import SimpleHttpClient
-from synapse.types import JsonDict, ThirdPartyInstanceID
+from synapse.types import DeviceListUpdates, JsonDict, ThirdPartyInstanceID
 from synapse.util.caches.response_cache import ResponseCache
 
 if TYPE_CHECKING:
@@ -225,6 +226,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         to_device_messages: List[JsonDict],
         one_time_key_counts: TransactionOneTimeKeyCounts,
         unused_fallback_keys: TransactionUnusedFallbackKeys,
+        device_list_summary: DeviceListUpdates,
         txn_id: Optional[int] = None,
     ) -> bool:
         """
@@ -268,6 +270,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                 }
             )
 
+        # TODO: Update to stable prefixes once MSC3202 completes FCP merge
         if service.msc3202_transaction_extensions:
             if one_time_key_counts:
                 body[
@@ -277,6 +280,11 @@ class ApplicationServiceApi(SimpleHttpClient):
                 body[
                     "org.matrix.msc3202.device_unused_fallback_keys"
                 ] = unused_fallback_keys
+            if device_list_summary:
+                body["org.matrix.msc3202.device_lists"] = {
+                    "changed": list(device_list_summary.changed),
+                    "left": list(device_list_summary.left),
+                }
 
         try:
             await self.put_json(
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index a6084b9c35..3b49e60716 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -72,7 +72,7 @@ from synapse.events import EventBase
 from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.databases.main import DataStore
-from synapse.types import JsonDict
+from synapse.types import DeviceListUpdates, JsonDict
 from synapse.util import Clock
 
 if TYPE_CHECKING:
@@ -122,6 +122,7 @@ class ApplicationServiceScheduler:
         events: Optional[Collection[EventBase]] = None,
         ephemeral: Optional[Collection[JsonDict]] = None,
         to_device_messages: Optional[Collection[JsonDict]] = None,
+        device_list_summary: Optional[DeviceListUpdates] = None,
     ) -> None:
         """
         Enqueue some data to be sent off to an application service.
@@ -133,10 +134,18 @@ class ApplicationServiceScheduler:
             to_device_messages: The to-device messages to send. These differ from normal
                 to-device messages sent to clients, as they have 'to_device_id' and
                 'to_user_id' fields.
+            device_list_summary: A summary of users that the application service either needs
+                to refresh the device lists of, or those that the application service need no
+                longer track the device lists of.
         """
         # We purposefully allow this method to run with empty events/ephemeral
         # collections, so that callers do not need to check iterable size themselves.
-        if not events and not ephemeral and not to_device_messages:
+        if (
+            not events
+            and not ephemeral
+            and not to_device_messages
+            and not device_list_summary
+        ):
             return
 
         if events:
@@ -147,6 +156,10 @@ class ApplicationServiceScheduler:
             self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
                 to_device_messages
             )
+        if device_list_summary:
+            self.queuer.queued_device_list_summaries.setdefault(
+                appservice.id, []
+            ).append(device_list_summary)
 
         # Kick off a new application service transaction
         self.queuer.start_background_request(appservice)
@@ -169,6 +182,8 @@ class _ServiceQueuer:
         self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
         # dict of {service_id: [to_device_message_json]}
         self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
+        # dict of {service_id: [device_list_summary]}
+        self.queued_device_list_summaries: Dict[str, List[DeviceListUpdates]] = {}
 
         # the appservices which currently have a transaction in flight
         self.requests_in_flight: Set[str] = set()
@@ -212,7 +227,35 @@ class _ServiceQueuer:
                 ]
                 del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]
 
-                if not events and not ephemeral and not to_device_messages_to_send:
+                # Consolidate any pending device list summaries into a single, up-to-date
+                # summary.
+                # Note: this code assumes that in a single DeviceListUpdates, a user will
+                # never be in both "changed" and "left" sets.
+                device_list_summary = DeviceListUpdates()
+                for summary in self.queued_device_list_summaries.get(service.id, []):
+                    # For every user in the incoming "changed" set:
+                    #   * Remove them from the existing "left" set if necessary
+                    #     (as we need to start tracking them again)
+                    #   * Add them to the existing "changed" set if necessary.
+                    device_list_summary.left.difference_update(summary.changed)
+                    device_list_summary.changed.update(summary.changed)
+
+                    # For every user in the incoming "left" set:
+                    #   * Remove them from the existing "changed" set if necessary
+                    #     (we no longer need to track them)
+                    #   * Add them to the existing "left" set if necessary.
+                    device_list_summary.changed.difference_update(summary.left)
+                    device_list_summary.left.update(summary.left)
+                self.queued_device_list_summaries.clear()
+
+                if (
+                    not events
+                    and not ephemeral
+                    and not to_device_messages_to_send
+                    # DeviceListUpdates is True if either the 'changed' or 'left' sets have
+                    # at least one entry, otherwise False
+                    and not device_list_summary
+                ):
                     return
 
                 one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None
@@ -240,6 +283,7 @@ class _ServiceQueuer:
                         to_device_messages_to_send,
                         one_time_key_counts,
                         unused_fallback_keys,
+                        device_list_summary,
                     )
                 except Exception:
                     logger.exception("AS request failed")
@@ -322,6 +366,7 @@ class _TransactionController:
         to_device_messages: Optional[List[JsonDict]] = None,
         one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None,
         unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None,
+        device_list_summary: Optional[DeviceListUpdates] = None,
     ) -> None:
         """
         Create a transaction with the given data and send to the provided
@@ -336,6 +381,7 @@ class _TransactionController:
                 appservice devices in the transaction.
             unused_fallback_keys: Lists of unused fallback keys for relevant
                 appservice devices in the transaction.
+            device_list_summary: The device list summary to include in the transaction.
         """
         try:
             txn = await self.store.create_appservice_txn(
@@ -345,6 +391,7 @@ class _TransactionController:
                 to_device_messages=to_device_messages or [],
                 one_time_key_counts=one_time_key_counts or {},
                 unused_fallback_keys=unused_fallback_keys or {},
+                device_list_summary=device_list_summary or DeviceListUpdates(),
             )
             service_is_up = await self._is_service_up(service)
             if service_is_up: