diff options
-rw-r--r-- | synapse/appservice/__init__.py | 10 | ||||
-rw-r--r-- | synapse/appservice/api.py | 21 | ||||
-rw-r--r-- | synapse/appservice/scheduler.py | 64 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 56 | ||||
-rw-r--r-- | synapse/storage/databases/main/appservice.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/devices.py | 5 |
6 files changed, 121 insertions, 41 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 1d7ba00c4a..382b174d81 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Iterable, List, Match, Optional from synapse.api.constants import EventTypes from synapse.events import EventBase -from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id +from synapse.types import DeviceLists, GroupID, JsonDict, UserID, get_domain_from_id from synapse.util.caches.descriptors import _CacheContext, cached if TYPE_CHECKING: @@ -305,10 +305,7 @@ class ApplicationService: return False def is_user_in_namespace(self, user_id: str) -> bool: - return ( - bool(self._matches_regex(user_id, ApplicationService.NS_USERS)) - or user_id == self.sender - ) + return bool(self._matches_regex(user_id, ApplicationService.NS_USERS)) def is_room_alias_in_namespace(self, alias: str) -> bool: return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES)) @@ -377,12 +374,14 @@ class AppServiceTransaction: events: List[EventBase], ephemeral: List[JsonDict], to_device_messages: List[JsonDict], + device_list_summary: DeviceLists, ): self.service = service self.id = id self.events = events self.ephemeral = ephemeral self.to_device_messages = to_device_messages + self.device_list_summary = device_list_summary async def send(self, as_api: "ApplicationServiceApi") -> bool: """Sends this transaction using the provided AS API interface. @@ -397,6 +396,7 @@ class AppServiceTransaction: events=self.events, ephemeral=self.ephemeral, to_device_messages=self.to_device_messages, + device_list_summary=self.device_list_summary, txn_id=self.id, ) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 63bc07512c..0b504b3fc8 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -1,4 +1,5 @@ # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +14,7 @@ # limitations under the License. import logging import urllib -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union from prometheus_client import Counter @@ -22,7 +23,7 @@ from synapse.api.errors import CodeMessageException from synapse.events import EventBase from synapse.events.utils import serialize_event from synapse.http.client import SimpleHttpClient -from synapse.types import JsonDict, ThirdPartyInstanceID +from synapse.types import DeviceLists, JsonDict, ThirdPartyInstanceID from synapse.util.caches.response_cache import ResponseCache if TYPE_CHECKING: @@ -205,6 +206,7 @@ class ApplicationServiceApi(SimpleHttpClient): events: List[EventBase], ephemeral: List[JsonDict], to_device_messages: List[JsonDict], + device_list_summary: DeviceLists, txn_id: Optional[int] = None, ) -> bool: """ @@ -233,7 +235,7 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id))) # Never send ephemeral events to appservices that do not support it - body: Dict[str, List[JsonDict]] = {"events": serialized_events} + body: Dict[str, Union[JsonDict, List[JsonDict]]] = {"events": serialized_events} if service.supports_ephemeral: body.update( { @@ -243,6 +245,19 @@ class ApplicationServiceApi(SimpleHttpClient): } ) + # Send device list summaries if needed + if device_list_summary: + logger.info("Sending device list summary: %s", device_list_summary) + body.update( + { + # TODO: Update to stable prefix once MSC3202 completes FCP merge + "org.matrix.msc3202.device_lists": { + "changed": list(device_list_summary.changed), + "left": list(device_list_summary.left), + } + } + ) + try: await self.put_json( uri=uri, diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index dae952dc13..d49636d926 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -54,7 +54,7 @@ from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.events import EventBase from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import JsonDict +from synapse.types import DeviceLists, JsonDict logger = logging.getLogger(__name__) @@ -100,9 +100,11 @@ class ApplicationServiceScheduler: events: Optional[Iterable[EventBase]] = None, ephemeral: Optional[Iterable[JsonDict]] = None, to_device_messages: Optional[Iterable[JsonDict]] = None, + device_list_summary: Optional[DeviceLists] = None, ) -> None: """ Enqueue some data to be sent off to an application service. + Args: appservice: The application service to create and send a transaction to. events: The persistent room events to send. @@ -110,10 +112,18 @@ class ApplicationServiceScheduler: to_device_messages: The to-device messages to send. These differ from normal to-device messages sent to clients, as they have 'to_device_id' and 'to_user_id' fields. + device_list_summary: A summary of users that the application service either needs + to refresh the device lists of, or those that the application service need no + longer track the device lists of. """ # We purposefully allow this method to run with empty events/ephemeral # iterables, so that callers do not need to check iterable size themselves. - if not events and not ephemeral and not to_device_messages: + if ( + not events + and not ephemeral + and not to_device_messages + and not device_list_summary + ): return if events: @@ -124,6 +134,10 @@ class ApplicationServiceScheduler: self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend( to_device_messages ) + if device_list_summary: + self.queuer.queued_device_list_summaries.setdefault( + appservice.id, [] + ).append(device_list_summary) # Kick off a new application service transaction self.queuer.start_background_request(appservice) @@ -144,6 +158,8 @@ class _ServiceQueuer: self.queued_ephemeral: Dict[str, List[JsonDict]] = {} # dict of {service_id: [to_device_message_json]} self.queued_to_device_messages: Dict[str, List[JsonDict]] = {} + # dict of {service_id: [device_list_summary]} + self.queued_device_list_summaries: Dict[str, List[DeviceLists]] = {} # the appservices which currently have a transaction in flight self.requests_in_flight = set() @@ -183,12 +199,49 @@ class _ServiceQueuer: ] del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION] - if not events and not ephemeral and not to_device_messages_to_send: + # Consolidate any pending device list summaries into a single, up-to-date + # summary. + # Note: this code assumes that in a single DeviceLists, a user will + # never be in both "changed" and "left" sets. + device_list_summary = DeviceLists() + while self.queued_device_list_summaries.get(service.id, []): + # Pop a summary off the front of the queue + summary = self.queued_device_list_summaries[service.id].pop(0) + + # For every user in the incoming "changed" set: + # * Remove them from the existing "left" set if necessary + # (as we need to start tracking them again) + # * Add them to the existing "changed" set if necessary. + for user_id in summary.changed: + if user_id in device_list_summary.left: + device_list_summary.left.remove(user_id) + device_list_summary.changed.add(user_id) + + # For every user in the incoming "left" set: + # * Remove them from the existing "changed" set if necessary + # (we no longer need to track them) + # * Add them to the existing "left" set if necessary. + for user_id in summary.left: + if user_id in device_list_summary.changed: + device_list_summary.changed.remove(user_id) + device_list_summary.left.add(user_id) + + if ( + not events + and not ephemeral + and not to_device_messages_to_send + # Note that DeviceLists implements __bool__ + and not device_list_summary + ): return try: await self.txn_ctrl.send( - service, events, ephemeral, to_device_messages_to_send + service, + events, + ephemeral, + to_device_messages_to_send, + device_list_summary, ) except Exception: logger.exception("AS request failed") @@ -227,6 +280,7 @@ class _TransactionController: events: List[EventBase], ephemeral: Optional[List[JsonDict]] = None, to_device_messages: Optional[List[JsonDict]] = None, + device_list_summary: Optional[DeviceLists] = None, ) -> None: """ Create a transaction with the given data and send to the provided @@ -237,6 +291,7 @@ class _TransactionController: events: The persistent events to include in the transaction. ephemeral: The ephemeral events to include in the transaction. to_device_messages: The to-device messages to include in the transaction. + device_list_summary: The device list summary to include in the transaction. """ try: txn = await self.store.create_appservice_txn( @@ -244,6 +299,7 @@ class _TransactionController: events=events, ephemeral=ephemeral or [], to_device_messages=to_device_messages or [], + device_list_summary=device_list_summary or DeviceLists(), ) service_is_up = await self._is_service_up(service) if service_is_up: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 39cabaef40..91de1f6652 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -33,9 +33,8 @@ from synapse.metrics.background_process_metrics import ( wrap_as_background_process, ) from synapse.storage.databases.main.directory import RoomAliasMapping -from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID +from synapse.types import DeviceLists, JsonDict, RoomAlias, RoomStreamToken, UserID from synapse.util.async_helpers import Linearizer -from synapse.util.caches.descriptors import _CacheContext, cached from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -344,22 +343,16 @@ class ApplicationServicesHandler: ) elif stream_key == "device_list_key": - users_whose_device_lists_changed = await self._get_device_list_changes( + device_list_summary = await self._get_device_list_summary( service, new_token ) - if users_whose_device_lists_changed: - # TODO: Have a way of including things in an outgoing appservice - # transaction that's not "events" or "ephemeral" - payload = [{ - "changed": users_whose_device_lists_changed, - "left": [], - }] - self.scheduler.submit_ephemeral_events_for_as( - service, payload + if device_list_summary: + self.scheduler.enqueue_for_appservice( + service, device_list_summary=device_list_summary ) # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( + await self.store.set_appservice_stream_type_pos( service, "device_list", new_token ) @@ -568,11 +561,11 @@ class ApplicationServicesHandler: return message_payload - async def _get_device_list_changes( + async def _get_device_list_summary( self, appservice: ApplicationService, new_key: int, - ) -> List[str]: + ) -> DeviceLists: """ Retrieve a list of users who have changed their device lists. @@ -581,8 +574,9 @@ class ApplicationServicesHandler: new_key: The stream key of the device list change that triggered this method call. Returns: - A list of users whose device lists have changed and need to be resynced by the - appservice. + A set of device list updates, comprised of users that the appservices needs to: + * resync the device list of, and + * stop tracking the device list of. """ # Fetch the last successfully processed device list update stream ID # for this appservice. @@ -591,21 +585,31 @@ class ApplicationServicesHandler: ) # Fetch the users who have modified their device list since then. - users_with_changed_device_lists = await self.store.get_users_whose_devices_changed( - from_key, filter_user_ids=None, to_key=new_key + users_with_changed_device_lists = ( + await self.store.get_users_whose_devices_changed( + from_key, filter_user_ids=None, to_key=new_key + ) ) # Filter out any users the application service is not interested in # # For each user who changed their device list, we want to check whether this - # appservice would be interested in the change - filtered_users_with_changed_device_lists = [ + # appservice would be interested in the change. + filtered_users_with_changed_device_lists = { user_id for user_id in users_with_changed_device_lists - if self._is_appservice_interested_in_device_lists_of_user(appservice, user_id) - ] + if self._is_appservice_interested_in_device_lists_of_user( + appservice, user_id + ) + } + + # Create a summary of "changed" and "left" users. + # TODO: Calculate "left" users. + device_list_summary = DeviceLists( + changed=filtered_users_with_changed_device_lists + ) - return filtered_users_with_changed_device_lists + return device_list_summary async def _is_appservice_interested_in_device_lists_of_user( self, @@ -641,9 +645,7 @@ class ApplicationServicesHandler: for room_id in room_ids: # This method covers checking room members for appservice interest as well as # room ID and alias checks. - if await appservice.is_interested_in_room( - room_id, self.store - ): + if await appservice.is_interested_in_room(room_id, self.store): return True return False diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index e0ca460c13..0ac2005bee 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -27,7 +27,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.types import Connection -from synapse.types import JsonDict +from synapse.types import DeviceLists, JsonDict from synapse.util import json_encoder if TYPE_CHECKING: @@ -195,6 +195,7 @@ class ApplicationServiceTransactionWorkerStore( events: List[EventBase], ephemeral: List[JsonDict], to_device_messages: List[JsonDict], + device_list_summary: DeviceLists, ) -> AppServiceTransaction: """Atomically creates a new transaction for this application service with the given list of events. Ephemeral events are NOT persisted to the @@ -205,6 +206,7 @@ class ApplicationServiceTransactionWorkerStore( events: A list of persistent events to put in the transaction. ephemeral: A list of ephemeral events to put in the transaction. to_device_messages: A list of to-device messages to put in the transaction. + device_list_summary: The device list summary to include in the transaction. Returns: A new transaction. @@ -240,6 +242,7 @@ class ApplicationServiceTransactionWorkerStore( events=events, ephemeral=ephemeral, to_device_messages=to_device_messages, + device_list_summary=device_list_summary, ) return await self.db_pool.runInteraction( @@ -337,6 +340,7 @@ class ApplicationServiceTransactionWorkerStore( events=events, ephemeral=[], to_device_messages=[], + device_list_summary=DeviceLists(), ) def _get_last_txn(self, txn, service_id: Optional[str]) -> int: diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 0d5d9830a5..f62bc03915 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -555,7 +555,10 @@ class DeviceWorkerStore(SQLBaseStore): } async def get_users_whose_devices_changed( - self, from_key: int, filter_user_ids: Optional[Iterable[str]] = None, to_key: Optional[int] = None + self, + from_key: int, + filter_user_ids: Optional[Iterable[str]] = None, + to_key: Optional[int] = None, ) -> Set[str]: """Get set of users whose devices have changed since `from_key` that are in the given list of user_ids. |