diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 07ec95f1d6..dac570a107 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -1,4 +1,5 @@
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -22,7 +23,7 @@ from netaddr import IPSet
from synapse.api.constants import EventTypes
from synapse.events import EventBase
-from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id
+from synapse.types import DeviceLists, GroupID, JsonDict, UserID, get_domain_from_id
from synapse.util.caches.descriptors import _CacheContext, cached
if TYPE_CHECKING:
@@ -400,6 +401,7 @@ class AppServiceTransaction:
to_device_messages: List[JsonDict],
one_time_key_counts: TransactionOneTimeKeyCounts,
unused_fallback_keys: TransactionUnusedFallbackKeys,
+ device_list_summary: DeviceLists,
):
self.service = service
self.id = id
@@ -408,6 +410,7 @@ class AppServiceTransaction:
self.to_device_messages = to_device_messages
self.one_time_key_counts = one_time_key_counts
self.unused_fallback_keys = unused_fallback_keys
+ self.device_list_summary = device_list_summary
async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.
@@ -424,6 +427,7 @@ class AppServiceTransaction:
to_device_messages=self.to_device_messages,
one_time_key_counts=self.one_time_key_counts,
unused_fallback_keys=self.unused_fallback_keys,
+ device_list_summary=self.device_list_summary,
txn_id=self.id,
)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 98fe354014..3f2c99812f 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -1,4 +1,5 @@
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -27,7 +28,7 @@ from synapse.appservice import (
from synapse.events import EventBase
from synapse.events.utils import SerializeEventConfig, serialize_event
from synapse.http.client import SimpleHttpClient
-from synapse.types import JsonDict, ThirdPartyInstanceID
+from synapse.types import DeviceLists, JsonDict, ThirdPartyInstanceID
from synapse.util.caches.response_cache import ResponseCache
if TYPE_CHECKING:
@@ -225,6 +226,7 @@ class ApplicationServiceApi(SimpleHttpClient):
to_device_messages: List[JsonDict],
one_time_key_counts: TransactionOneTimeKeyCounts,
unused_fallback_keys: TransactionUnusedFallbackKeys,
+ device_list_summary: DeviceLists,
txn_id: Optional[int] = None,
) -> bool:
"""
@@ -268,6 +270,7 @@ class ApplicationServiceApi(SimpleHttpClient):
}
)
+ # TODO: Update to stable prefixes once MSC3202 completes FCP merge
if service.msc3202_transaction_extensions:
if one_time_key_counts:
body[
@@ -277,6 +280,11 @@ class ApplicationServiceApi(SimpleHttpClient):
body[
"org.matrix.msc3202.device_unused_fallback_keys"
] = unused_fallback_keys
+ if device_list_summary:
+ body["org.matrix.msc3202.device_lists"] = {
+ "changed": list(device_list_summary.changed),
+ "left": list(device_list_summary.left),
+ }
try:
await self.put_json(
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 72417151ba..a9f9fa67ac 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -72,7 +72,7 @@ from synapse.events import EventBase
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main import DataStore
-from synapse.types import JsonDict
+from synapse.types import DeviceLists, JsonDict
from synapse.util import Clock
if TYPE_CHECKING:
@@ -122,6 +122,7 @@ class ApplicationServiceScheduler:
events: Optional[Collection[EventBase]] = None,
ephemeral: Optional[Collection[JsonDict]] = None,
to_device_messages: Optional[Collection[JsonDict]] = None,
+ device_list_summary: Optional[DeviceLists] = None,
) -> None:
"""
Enqueue some data to be sent off to an application service.
@@ -133,10 +134,18 @@ class ApplicationServiceScheduler:
to_device_messages: The to-device messages to send. These differ from normal
to-device messages sent to clients, as they have 'to_device_id' and
'to_user_id' fields.
+ device_list_summary: A summary of users that the application service either needs
+ to refresh the device lists of, or those that the application service need no
+ longer track the device lists of.
"""
# We purposefully allow this method to run with empty events/ephemeral
# collections, so that callers do not need to check iterable size themselves.
- if not events and not ephemeral and not to_device_messages:
+ if (
+ not events
+ and not ephemeral
+ and not to_device_messages
+ and not device_list_summary
+ ):
return
if events:
@@ -147,6 +156,10 @@ class ApplicationServiceScheduler:
self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
to_device_messages
)
+ if device_list_summary:
+ self.queuer.queued_device_list_summaries.setdefault(
+ appservice.id, []
+ ).append(device_list_summary)
# Kick off a new application service transaction
self.queuer.start_background_request(appservice)
@@ -169,6 +182,8 @@ class _ServiceQueuer:
self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
# dict of {service_id: [to_device_message_json]}
self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
+ # dict of {service_id: [device_list_summary]}
+ self.queued_device_list_summaries: Dict[str, List[DeviceLists]] = {}
# the appservices which currently have a transaction in flight
self.requests_in_flight: Set[str] = set()
@@ -212,7 +227,40 @@ class _ServiceQueuer:
]
del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]
- if not events and not ephemeral and not to_device_messages_to_send:
+ # Consolidate any pending device list summaries into a single, up-to-date
+ # summary.
+ # Note: this code assumes that in a single DeviceLists, a user will
+ # never be in both "changed" and "left" sets.
+ device_list_summary = DeviceLists()
+ while self.queued_device_list_summaries.get(service.id, []):
+ # Pop a summary off the front of the queue
+ summary = self.queued_device_list_summaries[service.id].pop(0)
+
+ # For every user in the incoming "changed" set:
+ # * Remove them from the existing "left" set if necessary
+ # (as we need to start tracking them again)
+ # * Add them to the existing "changed" set if necessary.
+ for user_id in summary.changed:
+ if user_id in device_list_summary.left:
+ device_list_summary.left.remove(user_id)
+ device_list_summary.changed.add(user_id)
+
+ # For every user in the incoming "left" set:
+ # * Remove them from the existing "changed" set if necessary
+ # (we no longer need to track them)
+ # * Add them to the existing "left" set if necessary.
+ for user_id in summary.left:
+ if user_id in device_list_summary.changed:
+ device_list_summary.changed.remove(user_id)
+ device_list_summary.left.add(user_id)
+
+ if (
+ not events
+ and not ephemeral
+ and not to_device_messages_to_send
+ # Note that DeviceLists implements __bool__
+ and not device_list_summary
+ ):
return
one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None
@@ -240,6 +288,7 @@ class _ServiceQueuer:
to_device_messages_to_send,
one_time_key_counts,
unused_fallback_keys,
+ device_list_summary,
)
except Exception:
logger.exception("AS request failed")
@@ -322,6 +371,7 @@ class _TransactionController:
to_device_messages: Optional[List[JsonDict]] = None,
one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None,
unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None,
+ device_list_summary: Optional[DeviceLists] = None,
) -> None:
"""
Create a transaction with the given data and send to the provided
@@ -336,6 +386,7 @@ class _TransactionController:
appservice devices in the transaction.
unused_fallback_keys: Lists of unused fallback keys for relevant
appservice devices in the transaction.
+ device_list_summary: The device list summary to include in the transaction.
"""
try:
txn = await self.store.create_appservice_txn(
@@ -345,6 +396,7 @@ class _TransactionController:
to_device_messages=to_device_messages or [],
one_time_key_counts=one_time_key_counts or {},
unused_fallback_keys=unused_fallback_keys or {},
+ device_list_summary=device_list_summary or DeviceLists(),
)
service_is_up = await self._is_service_up(service)
if service_is_up:
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index 439bfe1526..ada165f238 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -170,6 +170,7 @@ def _load_appservice(
# When enabled, appservice transactions contain the following information:
# - device One-Time Key counts
# - device unused fallback key usage states
+ # - device list changes
msc3202_transaction_extensions = as_info.get("org.matrix.msc3202", False)
if not isinstance(msc3202_transaction_extensions, bool):
raise ValueError(
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index c1716e70ff..1615f5d530 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -29,7 +29,7 @@ from synapse.storage._base import db_to_json
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
-from synapse.types import JsonDict
+from synapse.types import DeviceLists, JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import _CacheContext, cached
@@ -217,6 +217,7 @@ class ApplicationServiceTransactionWorkerStore(
to_device_messages: List[JsonDict],
one_time_key_counts: TransactionOneTimeKeyCounts,
unused_fallback_keys: TransactionUnusedFallbackKeys,
+ device_list_summary: DeviceLists,
) -> AppServiceTransaction:
"""Atomically creates a new transaction for this application service
with the given list of events. Ephemeral events are NOT persisted to the
@@ -231,6 +232,7 @@ class ApplicationServiceTransactionWorkerStore(
appservice devices in the transaction.
unused_fallback_keys: Lists of unused fallback keys for relevant
appservice devices in the transaction.
+ device_list_summary: The device list summary to include in the transaction.
Returns:
A new transaction.
@@ -268,6 +270,7 @@ class ApplicationServiceTransactionWorkerStore(
to_device_messages=to_device_messages,
one_time_key_counts=one_time_key_counts,
unused_fallback_keys=unused_fallback_keys,
+ device_list_summary=device_list_summary,
)
return await self.db_pool.runInteraction(
@@ -359,8 +362,8 @@ class ApplicationServiceTransactionWorkerStore(
events = await self.get_events_as_list(event_ids)
- # TODO: to-device messages, one-time key counts and unused fallback keys
- # are not yet populated for catch-up transactions.
+ # TODO: to-device messages, one-time key counts, device list summaries and unused
+ # fallback keys are not yet populated for catch-up transactions.
# We likely want to populate those for reliability.
return AppServiceTransaction(
service=service,
@@ -370,6 +373,7 @@ class ApplicationServiceTransactionWorkerStore(
to_device_messages=[],
one_time_key_counts={},
unused_fallback_keys={},
+ device_list_summary=DeviceLists(),
)
def _get_last_txn(self, txn, service_id: Optional[str]) -> int:
@@ -430,7 +434,7 @@ class ApplicationServiceTransactionWorkerStore(
async def get_type_stream_id_for_appservice(
self, service: ApplicationService, type: str
) -> int:
- if type not in ("read_receipt", "presence", "to_device"):
+ if type not in ("read_receipt", "presence", "to_device", "device_list"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
% (type,)
@@ -457,7 +461,7 @@ class ApplicationServiceTransactionWorkerStore(
async def set_appservice_stream_type_pos(
self, service: ApplicationService, stream_type: str, pos: Optional[int]
) -> None:
- if stream_type not in ("read_receipt", "presence", "to_device"):
+ if stream_type not in ("read_receipt", "presence", "to_device", "device_list"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
% (stream_type,)
|