summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2021-12-09 18:36:55 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2021-12-09 18:36:55 +0000
commitba5f501b4b5add6044cade35aa2e8a540f3644c4 (patch)
treed3d37e4cc0fbe6872872f94b47efeca303eddc03
parentSquash into "and use everywhere" (diff)
downloadsynapse-ba5f501b4b5add6044cade35aa2e8a540f3644c4.tar.xz
Support sending device lists everywhere; needs cleaning up
-rw-r--r--synapse/appservice/__init__.py10
-rw-r--r--synapse/appservice/api.py21
-rw-r--r--synapse/appservice/scheduler.py64
-rw-r--r--synapse/handlers/appservice.py56
-rw-r--r--synapse/storage/databases/main/appservice.py6
-rw-r--r--synapse/storage/databases/main/devices.py5
6 files changed, 121 insertions, 41 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 1d7ba00c4a..382b174d81 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Iterable, List, Match, Optional
 
 from synapse.api.constants import EventTypes
 from synapse.events import EventBase
-from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id
+from synapse.types import DeviceLists, GroupID, JsonDict, UserID, get_domain_from_id
 from synapse.util.caches.descriptors import _CacheContext, cached
 
 if TYPE_CHECKING:
@@ -305,10 +305,7 @@ class ApplicationService:
         return False
 
     def is_user_in_namespace(self, user_id: str) -> bool:
-        return (
-            bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
-            or user_id == self.sender
-        )
+        return bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
 
     def is_room_alias_in_namespace(self, alias: str) -> bool:
         return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
@@ -377,12 +374,14 @@ class AppServiceTransaction:
         events: List[EventBase],
         ephemeral: List[JsonDict],
         to_device_messages: List[JsonDict],
+        device_list_summary: DeviceLists,
     ):
         self.service = service
         self.id = id
         self.events = events
         self.ephemeral = ephemeral
         self.to_device_messages = to_device_messages
+        self.device_list_summary = device_list_summary
 
     async def send(self, as_api: "ApplicationServiceApi") -> bool:
         """Sends this transaction using the provided AS API interface.
@@ -397,6 +396,7 @@ class AppServiceTransaction:
             events=self.events,
             ephemeral=self.ephemeral,
             to_device_messages=self.to_device_messages,
+            device_list_summary=self.device_list_summary,
             txn_id=self.id,
         )
 
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 63bc07512c..0b504b3fc8 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -1,4 +1,5 @@
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,7 +14,7 @@
 # limitations under the License.
 import logging
 import urllib
-from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
 
 from prometheus_client import Counter
 
@@ -22,7 +23,7 @@ from synapse.api.errors import CodeMessageException
 from synapse.events import EventBase
 from synapse.events.utils import serialize_event
 from synapse.http.client import SimpleHttpClient
-from synapse.types import JsonDict, ThirdPartyInstanceID
+from synapse.types import DeviceLists, JsonDict, ThirdPartyInstanceID
 from synapse.util.caches.response_cache import ResponseCache
 
 if TYPE_CHECKING:
@@ -205,6 +206,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         events: List[EventBase],
         ephemeral: List[JsonDict],
         to_device_messages: List[JsonDict],
+        device_list_summary: DeviceLists,
         txn_id: Optional[int] = None,
     ) -> bool:
         """
@@ -233,7 +235,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
 
         # Never send ephemeral events to appservices that do not support it
-        body: Dict[str, List[JsonDict]] = {"events": serialized_events}
+        body: Dict[str, Union[JsonDict, List[JsonDict]]] = {"events": serialized_events}
         if service.supports_ephemeral:
             body.update(
                 {
@@ -243,6 +245,19 @@ class ApplicationServiceApi(SimpleHttpClient):
                 }
             )
 
+        # Send device list summaries if needed
+        if device_list_summary:
+            logger.info("Sending device list summary: %s", device_list_summary)
+            body.update(
+                {
+                    # TODO: Update to stable prefix once MSC3202 completes FCP merge
+                    "org.matrix.msc3202.device_lists": {
+                        "changed": list(device_list_summary.changed),
+                        "left": list(device_list_summary.left),
+                    }
+                }
+            )
+
         try:
             await self.put_json(
                 uri=uri,
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index dae952dc13..d49636d926 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -54,7 +54,7 @@ from synapse.appservice import ApplicationService, ApplicationServiceState
 from synapse.events import EventBase
 from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import JsonDict
+from synapse.types import DeviceLists, JsonDict
 
 logger = logging.getLogger(__name__)
 
@@ -100,9 +100,11 @@ class ApplicationServiceScheduler:
         events: Optional[Iterable[EventBase]] = None,
         ephemeral: Optional[Iterable[JsonDict]] = None,
         to_device_messages: Optional[Iterable[JsonDict]] = None,
+        device_list_summary: Optional[DeviceLists] = None,
     ) -> None:
         """
         Enqueue some data to be sent off to an application service.
+
         Args:
             appservice: The application service to create and send a transaction to.
             events: The persistent room events to send.
@@ -110,10 +112,18 @@ class ApplicationServiceScheduler:
             to_device_messages: The to-device messages to send. These differ from normal
                 to-device messages sent to clients, as they have 'to_device_id' and
                 'to_user_id' fields.
+            device_list_summary: A summary of users that the application service either needs
+                to refresh the device lists of, or those that the application service need no
+                longer track the device lists of.
         """
         # We purposefully allow this method to run with empty events/ephemeral
         # iterables, so that callers do not need to check iterable size themselves.
-        if not events and not ephemeral and not to_device_messages:
+        if (
+            not events
+            and not ephemeral
+            and not to_device_messages
+            and not device_list_summary
+        ):
             return
 
         if events:
@@ -124,6 +134,10 @@ class ApplicationServiceScheduler:
             self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
                 to_device_messages
             )
+        if device_list_summary:
+            self.queuer.queued_device_list_summaries.setdefault(
+                appservice.id, []
+            ).append(device_list_summary)
 
         # Kick off a new application service transaction
         self.queuer.start_background_request(appservice)
@@ -144,6 +158,8 @@ class _ServiceQueuer:
         self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
         # dict of {service_id: [to_device_message_json]}
         self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
+        # dict of {service_id: [device_list_summary]}
+        self.queued_device_list_summaries: Dict[str, List[DeviceLists]] = {}
 
         # the appservices which currently have a transaction in flight
         self.requests_in_flight = set()
@@ -183,12 +199,49 @@ class _ServiceQueuer:
                 ]
                 del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]
 
-                if not events and not ephemeral and not to_device_messages_to_send:
+                # Consolidate any pending device list summaries into a single, up-to-date
+                # summary.
+                # Note: this code assumes that in a single DeviceLists, a user will
+                # never be in both "changed" and "left" sets.
+                device_list_summary = DeviceLists()
+                while self.queued_device_list_summaries.get(service.id, []):
+                    # Pop a summary off the front of the queue
+                    summary = self.queued_device_list_summaries[service.id].pop(0)
+
+                    # For every user in the incoming "changed" set:
+                    #   * Remove them from the existing "left" set if necessary
+                    #     (as we need to start tracking them again)
+                    #   * Add them to the existing "changed" set if necessary.
+                    for user_id in summary.changed:
+                        if user_id in device_list_summary.left:
+                            device_list_summary.left.remove(user_id)
+                        device_list_summary.changed.add(user_id)
+
+                    # For every user in the incoming "left" set:
+                    #   * Remove them from the existing "changed" set if necessary
+                    #     (we no longer need to track them)
+                    #   * Add them to the existing "left" set if necessary.
+                    for user_id in summary.left:
+                        if user_id in device_list_summary.changed:
+                            device_list_summary.changed.remove(user_id)
+                        device_list_summary.left.add(user_id)
+
+                if (
+                    not events
+                    and not ephemeral
+                    and not to_device_messages_to_send
+                    # Note that DeviceLists implements __bool__
+                    and not device_list_summary
+                ):
                     return
 
                 try:
                     await self.txn_ctrl.send(
-                        service, events, ephemeral, to_device_messages_to_send
+                        service,
+                        events,
+                        ephemeral,
+                        to_device_messages_to_send,
+                        device_list_summary,
                     )
                 except Exception:
                     logger.exception("AS request failed")
@@ -227,6 +280,7 @@ class _TransactionController:
         events: List[EventBase],
         ephemeral: Optional[List[JsonDict]] = None,
         to_device_messages: Optional[List[JsonDict]] = None,
+        device_list_summary: Optional[DeviceLists] = None,
     ) -> None:
         """
         Create a transaction with the given data and send to the provided
@@ -237,6 +291,7 @@ class _TransactionController:
             events: The persistent events to include in the transaction.
             ephemeral: The ephemeral events to include in the transaction.
             to_device_messages: The to-device messages to include in the transaction.
+            device_list_summary: The device list summary to include in the transaction.
         """
         try:
             txn = await self.store.create_appservice_txn(
@@ -244,6 +299,7 @@ class _TransactionController:
                 events=events,
                 ephemeral=ephemeral or [],
                 to_device_messages=to_device_messages or [],
+                device_list_summary=device_list_summary or DeviceLists(),
             )
             service_is_up = await self._is_service_up(service)
             if service_is_up:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 39cabaef40..91de1f6652 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -33,9 +33,8 @@ from synapse.metrics.background_process_metrics import (
     wrap_as_background_process,
 )
 from synapse.storage.databases.main.directory import RoomAliasMapping
-from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
+from synapse.types import DeviceLists, JsonDict, RoomAlias, RoomStreamToken, UserID
 from synapse.util.async_helpers import Linearizer
-from synapse.util.caches.descriptors import _CacheContext, cached
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -344,22 +343,16 @@ class ApplicationServicesHandler:
                         )
 
                     elif stream_key == "device_list_key":
-                        users_whose_device_lists_changed = await self._get_device_list_changes(
+                        device_list_summary = await self._get_device_list_summary(
                             service, new_token
                         )
-                        if users_whose_device_lists_changed:
-                            # TODO: Have a way of including things in an outgoing appservice
-                            #   transaction that's not "events" or "ephemeral"
-                            payload = [{
-                                "changed": users_whose_device_lists_changed,
-                                "left": [],
-                            }]
-                            self.scheduler.submit_ephemeral_events_for_as(
-                                service, payload
+                        if device_list_summary:
+                            self.scheduler.enqueue_for_appservice(
+                                service, device_list_summary=device_list_summary
                             )
 
                         # Persist the latest handled stream token for this appservice
-                        await self.store.set_type_stream_id_for_appservice(
+                        await self.store.set_appservice_stream_type_pos(
                             service, "device_list", new_token
                         )
 
@@ -568,11 +561,11 @@ class ApplicationServicesHandler:
 
         return message_payload
 
-    async def _get_device_list_changes(
+    async def _get_device_list_summary(
         self,
         appservice: ApplicationService,
         new_key: int,
-    ) -> List[str]:
+    ) -> DeviceLists:
         """
         Retrieve a list of users who have changed their device lists.
 
@@ -581,8 +574,9 @@ class ApplicationServicesHandler:
             new_key: The stream key of the device list change that triggered this method call.
 
         Returns:
-            A list of users whose device lists have changed and need to be resynced by the
-            appservice.
+            A set of device list updates, comprised of users that the appservices needs to:
+                * resync the device list of, and
+                * stop tracking the device list of.
         """
         # Fetch the last successfully processed device list update stream ID
         # for this appservice.
@@ -591,21 +585,31 @@ class ApplicationServicesHandler:
         )
 
         # Fetch the users who have modified their device list since then.
-        users_with_changed_device_lists = await self.store.get_users_whose_devices_changed(
-            from_key, filter_user_ids=None, to_key=new_key
+        users_with_changed_device_lists = (
+            await self.store.get_users_whose_devices_changed(
+                from_key, filter_user_ids=None, to_key=new_key
+            )
         )
 
         # Filter out any users the application service is not interested in
         #
         # For each user who changed their device list, we want to check whether this
-        # appservice would be interested in the change
-        filtered_users_with_changed_device_lists = [
+        # appservice would be interested in the change.
+        filtered_users_with_changed_device_lists = {
             user_id
             for user_id in users_with_changed_device_lists
-            if self._is_appservice_interested_in_device_lists_of_user(appservice, user_id)
-        ]
+            if self._is_appservice_interested_in_device_lists_of_user(
+                appservice, user_id
+            )
+        }
+
+        # Create a summary of "changed" and "left" users.
+        # TODO: Calculate "left" users.
+        device_list_summary = DeviceLists(
+            changed=filtered_users_with_changed_device_lists
+        )
 
-        return filtered_users_with_changed_device_lists
+        return device_list_summary
 
     async def _is_appservice_interested_in_device_lists_of_user(
         self,
@@ -641,9 +645,7 @@ class ApplicationServicesHandler:
         for room_id in room_ids:
             # This method covers checking room members for appservice interest as well as
             # room ID and alias checks.
-            if await appservice.is_interested_in_room(
-                room_id, self.store
-            ):
+            if await appservice.is_interested_in_room(room_id, self.store):
                 return True
 
         return False
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index e0ca460c13..0ac2005bee 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -27,7 +27,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.types import Connection
-from synapse.types import JsonDict
+from synapse.types import DeviceLists, JsonDict
 from synapse.util import json_encoder
 
 if TYPE_CHECKING:
@@ -195,6 +195,7 @@ class ApplicationServiceTransactionWorkerStore(
         events: List[EventBase],
         ephemeral: List[JsonDict],
         to_device_messages: List[JsonDict],
+        device_list_summary: DeviceLists,
     ) -> AppServiceTransaction:
         """Atomically creates a new transaction for this application service
         with the given list of events. Ephemeral events are NOT persisted to the
@@ -205,6 +206,7 @@ class ApplicationServiceTransactionWorkerStore(
             events: A list of persistent events to put in the transaction.
             ephemeral: A list of ephemeral events to put in the transaction.
             to_device_messages: A list of to-device messages to put in the transaction.
+            device_list_summary: The device list summary to include in the transaction.
 
         Returns:
             A new transaction.
@@ -240,6 +242,7 @@ class ApplicationServiceTransactionWorkerStore(
                 events=events,
                 ephemeral=ephemeral,
                 to_device_messages=to_device_messages,
+                device_list_summary=device_list_summary,
             )
 
         return await self.db_pool.runInteraction(
@@ -337,6 +340,7 @@ class ApplicationServiceTransactionWorkerStore(
             events=events,
             ephemeral=[],
             to_device_messages=[],
+            device_list_summary=DeviceLists(),
         )
 
     def _get_last_txn(self, txn, service_id: Optional[str]) -> int:
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 0d5d9830a5..f62bc03915 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -555,7 +555,10 @@ class DeviceWorkerStore(SQLBaseStore):
         }
 
     async def get_users_whose_devices_changed(
-        self, from_key: int, filter_user_ids: Optional[Iterable[str]] = None, to_key: Optional[int] = None
+        self,
+        from_key: int,
+        filter_user_ids: Optional[Iterable[str]] = None,
+        to_key: Optional[int] = None,
     ) -> Set[str]:
         """Get set of users whose devices have changed since `from_key` that
         are in the given list of user_ids.