summary refs log tree commit diff
diff options
context:
space:
mode:
authorWill Hunt <will@half-shot.uk>2020-10-01 14:14:10 +0100
committerWill Hunt <will@half-shot.uk>2020-10-01 14:14:10 +0100
commit5cb3d237aecf1af5b058a811d183df2f9507efd0 (patch)
treed086ac378135d0402871c37ab1c92be1988677c3
parentAdd basic support for device list updates (diff)
downloadsynapse-5cb3d237aecf1af5b058a811d183df2f9507efd0.tar.xz
Send EDUs over /transaction and drop device stuff
-rw-r--r--synapse/appservice/__init__.py8
-rw-r--r--synapse/appservice/api.py35
-rw-r--r--synapse/appservice/scheduler.py31
-rw-r--r--synapse/handlers/appservice.py71
-rw-r--r--synapse/storage/databases/main/appservice.py6
-rw-r--r--synapse/storage/databases/main/deviceinbox.py35
-rw-r--r--synapse/storage/databases/main/devices.py26
-rw-r--r--synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql25
8 files changed, 52 insertions, 185 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index a93c08ca81..1335175009 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -35,10 +35,11 @@ class ApplicationServiceState:
 class AppServiceTransaction:
     """Represents an application service transaction."""
 
-    def __init__(self, service, id, events):
+    def __init__(self, service, id, events, ephemeral=None):
         self.service = service
         self.id = id
         self.events = events
+        self.ephemeral = ephemeral
 
     async def send(self, as_api: ApplicationServiceApi) -> bool:
         """Sends this transaction using the provided AS API interface.
@@ -49,7 +50,10 @@ class AppServiceTransaction:
             True if the transaction was sent.
         """
         return await as_api.push_bulk(
-            service=self.service, events=self.events, txn_id=self.id
+            service=self.service,
+            events=self.events,
+            ephemeral=self.ephemeral,
+            txn_id=self.id,
         )
 
     async def complete(self, store: "DataStore") -> None:
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 364c1a88f3..361d2b105c 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -201,33 +201,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         key = (service.id, protocol)
         return await self.protocol_meta_cache.wrap(key, _get)
 
-    async def push_ephemeral(self, service, events, to_device=None, device_lists=None):
-        if service.url is None:
-            return True
-        if service.supports_ephemeral is False:
-            return True
-
-        uri = service.url + (
-            "%s/uk.half-shot.appservice/ephemeral" % APP_SERVICE_PREFIX
-        )
-        try:
-            await self.put_json(
-                uri=uri,
-                json_body={
-                    "events": events,
-                    "device_messages": to_device,
-                    "device_lists": device_lists,
-                },
-                args={"access_token": service.hs_token},
-            )
-            return True
-        except CodeMessageException as e:
-            logger.warning("push_ephemeral to %s received %s", uri, e.code)
-        except Exception as ex:
-            logger.warning("push_ephemeral to %s threw exception %s", uri, ex)
-        return False
-
-    async def push_bulk(self, service, events, txn_id=None):
+    async def push_bulk(self, service, events, ephemeral=None, txn_id=None):
         if service.url is None:
             return True
 
@@ -241,11 +215,12 @@ class ApplicationServiceApi(SimpleHttpClient):
         txn_id = str(txn_id)
 
         uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
+        body = {"events": events}
+        if ephemeral:
+            body["uk.half-shot.appservice.ephemeral"] = ephemeral
         try:
             await self.put_json(
-                uri=uri,
-                json_body={"events": events},
-                args={"access_token": service.hs_token},
+                uri=uri, json_body=body, args={"access_token": service.hs_token},
             )
             sent_transactions_counter.labels(service.id).inc()
             sent_events_counter.labels(service.id).inc(len(events))
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 74bd095677..31b664dd36 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -85,9 +85,8 @@ class ApplicationServiceScheduler:
     def submit_event_for_as(self, service, event):
         self.queuer.enqueue(service, event)
 
-    async def submit_ephemeral_events_for_as(self, service, events):
-        if self.txn_ctrl.is_service_up(service):
-            await self.as_api.push_ephemeral(service, events)
+    def submit_ephemeral_events_for_as(self, service, events):
+        self.queuer.enqueue_ephemeral(service, events)
 
 
 class _ServiceQueuer:
@@ -100,6 +99,7 @@ class _ServiceQueuer:
 
     def __init__(self, txn_ctrl, clock):
         self.queued_events = {}  # dict of {service_id: [events]}
+        self.queued_ephemeral = {} # dict of {service_id: [events]}
 
         # the appservices which currently have a transaction in flight
         self.requests_in_flight = set()
@@ -115,10 +115,22 @@ class _ServiceQueuer:
             return
 
         run_as_background_process(
-            "as-sender-%s" % (service.id,), self._send_request, service
+            "as-sender-%s" % (service.id), self._send_request, service
         )
 
-    async def _send_request(self, service):
+    def enqueue_ephemeral(self, service, events):
+        self.queued_ephemeral.setdefault(service.id, []).extend(events)
+
+        # start a sender for this appservice if we don't already have one
+
+        if service.id in self.requests_in_flight:
+            return
+
+        run_as_background_process(
+            "as-sender-%s" % (service.id), self._send_request, service
+        )
+
+    async def _send_request(self, service, ephemeral=None):
         # sanity-check: we shouldn't get here if this service already has a sender
         # running.
         assert service.id not in self.requests_in_flight
@@ -127,10 +139,11 @@ class _ServiceQueuer:
         try:
             while True:
                 events = self.queued_events.pop(service.id, [])
-                if not events:
+                ephemeral = self.queued_ephemeral.pop(service.id, [])
+                if not events and not ephemeral:
                     return
                 try:
-                    await self.txn_ctrl.send(service, events)
+                    await self.txn_ctrl.send(service, events, ephemeral)
                 except Exception:
                     logger.exception("AS request failed")
         finally:
@@ -162,9 +175,9 @@ class _TransactionController:
         # for UTs
         self.RECOVERER_CLASS = _Recoverer
 
-    async def send(self, service, events):
+    async def send(self, service, events, ephemeral=None):
         try:
-            txn = await self.store.create_appservice_txn(service=service, events=events)
+            txn = await self.store.create_appservice_txn(service=service, events=events, ephemeral=ephemeral)
             service_is_up = await self.is_service_up(service)
             if service_is_up:
                 sent = await txn.send(self.as_api)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 6abc2891cf..0f834c93ab 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -179,39 +179,27 @@ class ApplicationServicesHandler:
         logger.info("Checking interested services for %s" % (stream_key))
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
-                events = []
                 if stream_key == "typing_key":
                     events = await self._handle_typing(service, new_token)
+                    if events:
+                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    # We don't persist the token for typing_key for performance reasons
                 elif stream_key == "receipt_key":
                     events = await self._handle_receipts(service)
-                elif stream_key == "presence_key":
-                    events = await self._handle_as_presence(service, users)
-                elif stream_key == "device_list_key":
-                    # Check if the device lists have changed for any of the users we are interested in
-                    events = await self._handle_device_list(service, users, new_token)
-                elif stream_key == "to_device_key":
-                    # Check the inbox for any users the bridge owns
-                    events = await self._handle_to_device(service, users, new_token)
-                if events:
-                    # TODO: Do in background?
-                    await self.scheduler.submit_ephemeral_events_for_as(
-                        service, events, new_token
-                    )
-                    # We don't persist the token for typing_key
-                    if stream_key == "presence_key":
-                        await self.store.set_type_stream_id_for_appservice(
-                            service, "presence", new_token
-                        )
-                    elif stream_key == "receipt_key":
+                    if events:
+                        self.scheduler.submit_ephemeral_events_for_as(service, events)
                         await self.store.set_type_stream_id_for_appservice(
                             service, "read_receipt", new_token
                         )
-                    elif stream_key == "to_device_key":
-                        await self.store.set_type_stream_id_for_appservice(
-                            service, "to_device", new_token
-                        )
+                elif stream_key == "presence_key":
+                    events = await self._handle_as_presence(service, users)
+                    if events:
+                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    await self.store.set_type_stream_id_for_appservice(
+                        service, "presence", new_token
+                    )
 
-    async def _handle_typing(self, service, new_token):
+    async def _handle_typing(self, service: ApplicationService, new_token: int):
         typing_source = self.event_sources.sources["typing"]
         # Get the typing events from just before current
         typing, _key = await typing_source.get_new_events_as(
@@ -223,7 +211,7 @@ class ApplicationServicesHandler:
         )
         return typing
 
-    async def _handle_receipts(self, service, token: int):
+    async def _handle_receipts(self, service: ApplicationService, token: int):
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "read_receipt"
         )
@@ -233,36 +221,7 @@ class ApplicationServicesHandler:
         )
         return receipts
 
-    async def _handle_device_list(
-        self, service: ApplicationService, users: List[str], new_token: int
-    ):
-        # TODO: Determine if any user have left and report those
-        from_token = await self.store.get_type_stream_id_for_appservice(
-            service, "device_list"
-        )
-        changed_user_ids = await self.store.get_device_changes_for_as(
-            service, from_token, new_token
-        )
-        # Return the
-        return {
-            "type": "m.device_list_update",
-            "content": {"changed": changed_user_ids,}, 
-        }
-
-    async def _handle_to_device(self, service, users, token):
-        if not any([True for u in users if service.is_interested_in_user(u)]):
-            return False
-
-        since_token = await self.store.get_type_stream_id_for_appservice(
-            service, "to_device"
-        )
-        messages, _ = await self.store.get_new_messages_for_as(
-            service, since_token, token
-        )
-        # This returns user_id -> device_id -> message
-        return messages
-
-    async def _handle_as_presence(self, service, users):
+    async def _handle_as_presence(self, service: ApplicationService, users: List[str]):
         events = []
         presence_source = self.event_sources.sources["presence"]
         from_key = await self.store.get_type_stream_id_for_appservice(
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 91c0b52b34..b29ce1d1be 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -172,7 +172,7 @@ class ApplicationServiceTransactionWorkerStore(
             "application_services_state", {"as_id": service.id}, {"state": state}
         )
 
-    async def create_appservice_txn(self, service, events):
+    async def create_appservice_txn(self, service, events, ephemeral=None):
         """Atomically creates a new transaction for this application service
         with the given list of events.
 
@@ -207,7 +207,9 @@ class ApplicationServiceTransactionWorkerStore(
                 "VALUES(?,?,?)",
                 (service.id, new_txn_id, event_ids),
             )
-            return AppServiceTransaction(service=service, id=new_txn_id, events=events)
+            return AppServiceTransaction(
+                service=service, id=new_txn_id, events=events, ephemeral=ephemeral
+            )
 
         return await self.db_pool.runInteraction(
             "create_appservice_txn", _create_appservice_txn
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 8897e27b1f..d42faa3f1f 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -16,7 +16,6 @@
 import logging
 from typing import List, Tuple
 
-from synapse.appservice import ApplicationService
 from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
@@ -30,40 +29,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):
     def get_to_device_stream_token(self):
         return self._device_inbox_id_gen.get_current_token()
 
-    async def get_new_messages_for_as(
-        self,
-        service: ApplicationService,
-        last_stream_id: int,
-        current_stream_id: int,
-        limit: int = 100,
-    ) -> Tuple[List[dict], int]:
-        def get_new_messages_for_device_txn(txn):
-            sql = (
-                "SELECT stream_id, message_json, device_id, user_id FROM device_inbox"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
-                " LIMIT ?"
-            )
-            txn.execute(sql, (last_stream_id, current_stream_id, limit))
-            messages = []
-
-            for row in txn:
-                stream_pos = row[0]
-                if service.is_interested_in_user(row.user_id):
-                    msg = db_to_json(row[1])
-                    msg.recipient = {
-                        "device_id": row.device_id,
-                        "user_id": row.user_id,
-                    }
-                    messages.append(msg)
-            if len(messages) < limit:
-                stream_pos = current_stream_id
-            return messages, stream_pos
-
-        return await self.db_pool.runInteraction(
-            "get_new_messages_for_device", get_new_messages_for_device_txn
-        )
-
     async def get_new_messages_for_device(
         self,
         user_id: str,
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index bf32cc6c06..fdf394c612 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -19,7 +19,6 @@ import logging
 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
 
 from synapse.api.errors import Codes, StoreError
-from synapse.appservice import ApplicationService
 from synapse.logging.opentracing import (
     get_active_span_text_map,
     set_tag,
@@ -526,31 +525,6 @@ class DeviceWorkerStore(SQLBaseStore):
             "get_users_whose_devices_changed", _get_users_whose_devices_changed_txn
         )
 
-    async def get_device_changes_for_as(
-        self,
-        service: ApplicationService,
-        last_stream_id: int,
-        current_stream_id: int,
-        limit: int = 100,
-    ) -> Tuple[List[dict], int]:
-        def get_device_changes_for_as_txn(txn):
-            sql = (
-                "SELECT DISTINCT user_ids FROM device_lists_stream"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
-                " LIMIT ?"
-            )
-            txn.execute(sql, (last_stream_id, current_stream_id, limit))
-            rows = txn.fetchall()
-            users = []
-            for user in db_to_json(rows[0]):
-                if await service.is_interested_in_presence(user):
-                    users.append(user)
-
-        return await self.db_pool.runInteraction(
-            "get_device_changes_for_as", get_device_changes_for_as_txn
-        )
-
     async def get_users_whose_signatures_changed(
         self, user_id: str, from_key: int
     ) -> Set[str]:
diff --git a/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql b/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql
deleted file mode 100644
index d4abfb6183..0000000000
--- a/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql
+++ /dev/null
@@ -1,25 +0,0 @@
-/* Copyright 2020 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- /* for some reason, we have accumulated duplicate entries in
-  * device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less
-  * efficient.
-  */
-
-ALTER TABLE application_services_state
-    ADD COLUMN device_list_stream_id INT;
-
-ALTER TABLE application_services_state
-    ADD COLUMN device_message_stream_id INT;
\ No newline at end of file