diff options
author | Will Hunt <will@half-shot.uk> | 2020-10-01 14:14:10 +0100 |
---|---|---|
committer | Will Hunt <will@half-shot.uk> | 2020-10-01 14:14:10 +0100 |
commit | 5cb3d237aecf1af5b058a811d183df2f9507efd0 (patch) | |
tree | d086ac378135d0402871c37ab1c92be1988677c3 | |
parent | Add basic support for device list updates (diff) | |
download | synapse-5cb3d237aecf1af5b058a811d183df2f9507efd0.tar.xz |
Send EDUs over /transaction and drop device stuff
-rw-r--r-- | synapse/appservice/__init__.py | 8 | ||||
-rw-r--r-- | synapse/appservice/api.py | 35 | ||||
-rw-r--r-- | synapse/appservice/scheduler.py | 31 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 71 | ||||
-rw-r--r-- | synapse/storage/databases/main/appservice.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/deviceinbox.py | 35 | ||||
-rw-r--r-- | synapse/storage/databases/main/devices.py | 26 | ||||
-rw-r--r-- | synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql | 25 |
8 files changed, 52 insertions, 185 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index a93c08ca81..1335175009 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -35,10 +35,11 @@ class ApplicationServiceState: class AppServiceTransaction: """Represents an application service transaction.""" - def __init__(self, service, id, events): + def __init__(self, service, id, events, ephemeral=None): self.service = service self.id = id self.events = events + self.ephemeral = ephemeral async def send(self, as_api: ApplicationServiceApi) -> bool: """Sends this transaction using the provided AS API interface. @@ -49,7 +50,10 @@ class AppServiceTransaction: True if the transaction was sent. """ return await as_api.push_bulk( - service=self.service, events=self.events, txn_id=self.id + service=self.service, + events=self.events, + ephemeral=self.ephemeral, + txn_id=self.id, ) async def complete(self, store: "DataStore") -> None: diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 364c1a88f3..361d2b105c 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -201,33 +201,7 @@ class ApplicationServiceApi(SimpleHttpClient): key = (service.id, protocol) return await self.protocol_meta_cache.wrap(key, _get) - async def push_ephemeral(self, service, events, to_device=None, device_lists=None): - if service.url is None: - return True - if service.supports_ephemeral is False: - return True - - uri = service.url + ( - "%s/uk.half-shot.appservice/ephemeral" % APP_SERVICE_PREFIX - ) - try: - await self.put_json( - uri=uri, - json_body={ - "events": events, - "device_messages": to_device, - "device_lists": device_lists, - }, - args={"access_token": service.hs_token}, - ) - return True - except CodeMessageException as e: - logger.warning("push_ephemeral to %s received %s", uri, e.code) - except Exception as ex: - logger.warning("push_ephemeral to %s threw exception %s", uri, ex) - return False - - async def push_bulk(self, service, events, txn_id=None): + async def push_bulk(self, service, events, ephemeral=None, txn_id=None): if service.url is None: return True @@ -241,11 +215,12 @@ class ApplicationServiceApi(SimpleHttpClient): txn_id = str(txn_id) uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id)) + body = {"events": events} + if ephemeral: + body["uk.half-shot.appservice.ephemeral"] = ephemeral try: await self.put_json( - uri=uri, - json_body={"events": events}, - args={"access_token": service.hs_token}, + uri=uri, json_body=body, args={"access_token": service.hs_token}, ) sent_transactions_counter.labels(service.id).inc() sent_events_counter.labels(service.id).inc(len(events)) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 74bd095677..31b664dd36 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -85,9 +85,8 @@ class ApplicationServiceScheduler: def submit_event_for_as(self, service, event): self.queuer.enqueue(service, event) - async def submit_ephemeral_events_for_as(self, service, events): - if self.txn_ctrl.is_service_up(service): - await self.as_api.push_ephemeral(service, events) + def submit_ephemeral_events_for_as(self, service, events): + self.queuer.enqueue_ephemeral(service, events) class _ServiceQueuer: @@ -100,6 +99,7 @@ class _ServiceQueuer: def __init__(self, txn_ctrl, clock): self.queued_events = {} # dict of {service_id: [events]} + self.queued_ephemeral = {} # dict of {service_id: [events]} # the appservices which currently have a transaction in flight self.requests_in_flight = set() @@ -115,10 +115,22 @@ class _ServiceQueuer: return run_as_background_process( - "as-sender-%s" % (service.id,), self._send_request, service + "as-sender-%s" % (service.id), self._send_request, service ) - async def _send_request(self, service): + def enqueue_ephemeral(self, service, events): + self.queued_ephemeral.setdefault(service.id, []).extend(events) + + # start a sender for this appservice if we don't already have one + + if service.id in self.requests_in_flight: + return + + run_as_background_process( + "as-sender-%s" % (service.id), self._send_request, service + ) + + async def _send_request(self, service, ephemeral=None): # sanity-check: we shouldn't get here if this service already has a sender # running. assert service.id not in self.requests_in_flight @@ -127,10 +139,11 @@ class _ServiceQueuer: try: while True: events = self.queued_events.pop(service.id, []) - if not events: + ephemeral = self.queued_ephemeral.pop(service.id, []) + if not events and not ephemeral: return try: - await self.txn_ctrl.send(service, events) + await self.txn_ctrl.send(service, events, ephemeral) except Exception: logger.exception("AS request failed") finally: @@ -162,9 +175,9 @@ class _TransactionController: # for UTs self.RECOVERER_CLASS = _Recoverer - async def send(self, service, events): + async def send(self, service, events, ephemeral=None): try: - txn = await self.store.create_appservice_txn(service=service, events=events) + txn = await self.store.create_appservice_txn(service=service, events=events, ephemeral=ephemeral) service_is_up = await self.is_service_up(service) if service_is_up: sent = await txn.send(self.as_api) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 6abc2891cf..0f834c93ab 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -179,39 +179,27 @@ class ApplicationServicesHandler: logger.info("Checking interested services for %s" % (stream_key)) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: - events = [] if stream_key == "typing_key": events = await self._handle_typing(service, new_token) + if events: + self.scheduler.submit_ephemeral_events_for_as(service, events) + # We don't persist the token for typing_key for performance reasons elif stream_key == "receipt_key": events = await self._handle_receipts(service) - elif stream_key == "presence_key": - events = await self._handle_as_presence(service, users) - elif stream_key == "device_list_key": - # Check if the device lists have changed for any of the users we are interested in - events = await self._handle_device_list(service, users, new_token) - elif stream_key == "to_device_key": - # Check the inbox for any users the bridge owns - events = await self._handle_to_device(service, users, new_token) - if events: - # TODO: Do in background? - await self.scheduler.submit_ephemeral_events_for_as( - service, events, new_token - ) - # We don't persist the token for typing_key - if stream_key == "presence_key": - await self.store.set_type_stream_id_for_appservice( - service, "presence", new_token - ) - elif stream_key == "receipt_key": + if events: + self.scheduler.submit_ephemeral_events_for_as(service, events) await self.store.set_type_stream_id_for_appservice( service, "read_receipt", new_token ) - elif stream_key == "to_device_key": - await self.store.set_type_stream_id_for_appservice( - service, "to_device", new_token - ) + elif stream_key == "presence_key": + events = await self._handle_as_presence(service, users) + if events: + self.scheduler.submit_ephemeral_events_for_as(service, events) + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) - async def _handle_typing(self, service, new_token): + async def _handle_typing(self, service: ApplicationService, new_token: int): typing_source = self.event_sources.sources["typing"] # Get the typing events from just before current typing, _key = await typing_source.get_new_events_as( @@ -223,7 +211,7 @@ class ApplicationServicesHandler: ) return typing - async def _handle_receipts(self, service, token: int): + async def _handle_receipts(self, service: ApplicationService, token: int): from_key = await self.store.get_type_stream_id_for_appservice( service, "read_receipt" ) @@ -233,36 +221,7 @@ class ApplicationServicesHandler: ) return receipts - async def _handle_device_list( - self, service: ApplicationService, users: List[str], new_token: int - ): - # TODO: Determine if any user have left and report those - from_token = await self.store.get_type_stream_id_for_appservice( - service, "device_list" - ) - changed_user_ids = await self.store.get_device_changes_for_as( - service, from_token, new_token - ) - # Return the - return { - "type": "m.device_list_update", - "content": {"changed": changed_user_ids,}, - } - - async def _handle_to_device(self, service, users, token): - if not any([True for u in users if service.is_interested_in_user(u)]): - return False - - since_token = await self.store.get_type_stream_id_for_appservice( - service, "to_device" - ) - messages, _ = await self.store.get_new_messages_for_as( - service, since_token, token - ) - # This returns user_id -> device_id -> message - return messages - - async def _handle_as_presence(self, service, users): + async def _handle_as_presence(self, service: ApplicationService, users: List[str]): events = [] presence_source = self.event_sources.sources["presence"] from_key = await self.store.get_type_stream_id_for_appservice( diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 91c0b52b34..b29ce1d1be 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -172,7 +172,7 @@ class ApplicationServiceTransactionWorkerStore( "application_services_state", {"as_id": service.id}, {"state": state} ) - async def create_appservice_txn(self, service, events): + async def create_appservice_txn(self, service, events, ephemeral=None): """Atomically creates a new transaction for this application service with the given list of events. @@ -207,7 +207,9 @@ class ApplicationServiceTransactionWorkerStore( "VALUES(?,?,?)", (service.id, new_txn_id, event_ids), ) - return AppServiceTransaction(service=service, id=new_txn_id, events=events) + return AppServiceTransaction( + service=service, id=new_txn_id, events=events, ephemeral=ephemeral + ) return await self.db_pool.runInteraction( "create_appservice_txn", _create_appservice_txn diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 8897e27b1f..d42faa3f1f 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -16,7 +16,6 @@ import logging from typing import List, Tuple -from synapse.appservice import ApplicationService from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool @@ -30,40 +29,6 @@ class DeviceInboxWorkerStore(SQLBaseStore): def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() - async def get_new_messages_for_as( - self, - service: ApplicationService, - last_stream_id: int, - current_stream_id: int, - limit: int = 100, - ) -> Tuple[List[dict], int]: - def get_new_messages_for_device_txn(txn): - sql = ( - "SELECT stream_id, message_json, device_id, user_id FROM device_inbox" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC" - " LIMIT ?" - ) - txn.execute(sql, (last_stream_id, current_stream_id, limit)) - messages = [] - - for row in txn: - stream_pos = row[0] - if service.is_interested_in_user(row.user_id): - msg = db_to_json(row[1]) - msg.recipient = { - "device_id": row.device_id, - "user_id": row.user_id, - } - messages.append(msg) - if len(messages) < limit: - stream_pos = current_stream_id - return messages, stream_pos - - return await self.db_pool.runInteraction( - "get_new_messages_for_device", get_new_messages_for_device_txn - ) - async def get_new_messages_for_device( self, user_id: str, diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index bf32cc6c06..fdf394c612 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -19,7 +19,6 @@ import logging from typing import Any, Dict, Iterable, List, Optional, Set, Tuple from synapse.api.errors import Codes, StoreError -from synapse.appservice import ApplicationService from synapse.logging.opentracing import ( get_active_span_text_map, set_tag, @@ -526,31 +525,6 @@ class DeviceWorkerStore(SQLBaseStore): "get_users_whose_devices_changed", _get_users_whose_devices_changed_txn ) - async def get_device_changes_for_as( - self, - service: ApplicationService, - last_stream_id: int, - current_stream_id: int, - limit: int = 100, - ) -> Tuple[List[dict], int]: - def get_device_changes_for_as_txn(txn): - sql = ( - "SELECT DISTINCT user_ids FROM device_lists_stream" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC" - " LIMIT ?" - ) - txn.execute(sql, (last_stream_id, current_stream_id, limit)) - rows = txn.fetchall() - users = [] - for user in db_to_json(rows[0]): - if await service.is_interested_in_presence(user): - users.append(user) - - return await self.db_pool.runInteraction( - "get_device_changes_for_as", get_device_changes_for_as_txn - ) - async def get_users_whose_signatures_changed( self, user_id: str, from_key: int ) -> Set[str]: diff --git a/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql b/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql deleted file mode 100644 index d4abfb6183..0000000000 --- a/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql +++ /dev/null @@ -1,25 +0,0 @@ -/* Copyright 2020 The Matrix.org Foundation C.I.C - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - /* for some reason, we have accumulated duplicate entries in - * device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less - * efficient. - */ - -ALTER TABLE application_services_state - ADD COLUMN device_list_stream_id INT; - -ALTER TABLE application_services_state - ADD COLUMN device_message_stream_id INT; \ No newline at end of file |