summary refs log tree commit diff
diff options
context:
space:
mode:
authorWill Hunt <will@half-shot.uk>2020-09-22 11:31:59 +0100
committerWill Hunt <will@half-shot.uk>2020-09-22 11:31:59 +0100
commit316ad09a64b455efc18823f92619fd9c7d3a6c63 (patch)
tree436cfef82bfded78850b4a4806f0cf0cc3aed547
parentLast little bits (diff)
downloadsynapse-316ad09a64b455efc18823f92619fd9c7d3a6c63.tar.xz
Add support for device messages, start support for device lists
-rw-r--r--synapse/appservice/api.py4
-rw-r--r--synapse/handlers/appservice.py76
-rw-r--r--synapse/storage/databases/main/appservice.py44
-rw-r--r--synapse/storage/databases/main/deviceinbox.py32
-rw-r--r--synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql25
5 files changed, 153 insertions, 28 deletions
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 48982d2ad3..9523c3a5d9 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -201,7 +201,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         key = (service.id, protocol)
         return await self.protocol_meta_cache.wrap(key, _get)
 
-    async def push_ephemeral(self, service, events):
+    async def push_ephemeral(self, service, events, to_device=None, device_lists=None):
         if service.url is None:
             return True
         if service.supports_ephemeral is False:
@@ -213,7 +213,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         try:
             await self.put_json(
                 uri=uri,
-                json_body={"events": events},
+                json_body={"events": events, "device_messages": to_device, "device_lists": device_lists},
                 args={"access_token": service.hs_token},
             )
             return True
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 9813447903..dbbde3db18 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -201,35 +201,61 @@ class ApplicationServicesHandler:
                     )
                     events = receipts
                 elif stream_key == "presence_key":
-                    events = []
-                    presence_source = self.event_sources.sources["presence"]
-                    for user in users:
-                        interested = await service.is_interested_in_presence(user, self.store)
-                        if not interested:
-                            continue
-                        presence_events, _key = await presence_source.get_new_events(
-                            user=user,
-                            service=service,
-                            from_key=None, # I don't think this is required
-                        )
-                        time_now = self.clock.time_msec()
-                        presence_events = [
-                            {
-                                "type": "m.presence",
-                                "sender": event.user_id,
-                                "content": format_user_presence_state(
-                                    event, time_now, include_user_id=False
-                                ),
-                            }
-                            for event in presence_events
-                        ]
-                        events = events + presence_events
+                    events = await self._handle_as_presence(service, users)
+                elif stream_key == "device_list_key":
+                    # Check if the device lists have changed for any of the users we are interested in
+                    print("device_list_key", users)
                 elif stream_key == "to_device_key":
-                    print("to_device_key", users)
+                    # Check the inbox for any users the bridge owns 
+                    events, to_device_token = await self._handle_to_device(service, users, new_token)
+                    if events:
+                        # TODO: Do in background?
+                        await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token)
+                        if stream_key == "to_device_key":
+                            # Update database with new token
+                            await self.store.set_device_messages_token_for_appservice(service, to_device_token)
+                        return
                 if events:
                     # TODO: Do in background?
-                    await self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token)
+
+    async def _handle_device_list(self, service, users, token):
+        if not any([True for u in users if service.is_interested_in_user(u)]):
+            return False
+
+    async def _handle_to_device(self, service, users, token):
+        if not any([True for u in users if service.is_interested_in_user(u)]):
+            return False
+        
+        since_token = await self.store.get_device_messages_token_for_appservice(service)
         
+        messages, new_token = await self.store.get_new_messages_for_as(service, since_token, token)
+        return messages, new_token
+
+    async def _handle_as_presence(self, service, users):
+        events = []
+        presence_source = self.event_sources.sources["presence"]
+        for user in users:
+            interested = await service.is_interested_in_presence(user, self.store)
+            if not interested:
+                continue
+            presence_events, _key = await presence_source.get_new_events(
+                user=user,
+                service=service,
+                from_key=None, # TODO: I don't think this is required?
+            )
+            time_now = self.clock.time_msec()
+            presence_events = [
+                {
+                    "type": "m.presence",
+                    "sender": event.user_id,
+                    "content": format_user_presence_state(
+                        event, time_now, include_user_id=False
+                    ),
+                }
+                for event in presence_events
+            ]
+            events = events + presence_events
 
     async def query_user_exists(self, user_id):
         """Check if any application service knows this user_id exists.
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 85f6b1e3fd..1ebe4504fd 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -320,7 +320,7 @@ class ApplicationServiceTransactionWorkerStore(
         )
 
     async def get_new_events_for_appservice(self, current_id, limit):
-        """Get all new evnets"""
+        """Get all new events for an appservice"""
 
         def get_new_events_for_appservice_txn(txn):
             sql = (
@@ -350,6 +350,48 @@ class ApplicationServiceTransactionWorkerStore(
         events = await self.get_events_as_list(event_ids)
 
         return upper_bound, events
+    
+    async def get_device_messages_token_for_appservice(self, service):
+        txn.execute(
+            "SELECT device_message_stream_id FROM application_services_state WHERE as_id=?",
+            (service.id,),
+        )
+        last_txn_id = txn.fetchone()
+        if last_txn_id is None or last_txn_id[0] is None:  # no row exists
+            return 0
+        else:
+            return int(last_txn_id[0])  # select 'last_txn' col
+
+    async def set_device_messages_token_for_appservice(self, service, pos) -> None:
+        def set_appservice_last_pos_txn(txn):
+            txn.execute(
+                "UPDATE application_services_state SET device_message_stream_id = ? WHERE as_id=?", (pos, service.id)
+            )
+
+        await self.db_pool.runInteraction(
+            "set_device_messages_token_for_appservice", set_appservice_last_pos_txn
+        )
+    
+    async def get_device_list_token_for_appservice(self, service):
+        txn.execute(
+            "SELECT device_list_stream_id FROM application_services_state WHERE as_id=?",
+            (service.id,),
+        )
+        last_txn_id = txn.fetchone()
+        if last_txn_id is None or last_txn_id[0] is None:  # no row exists
+            return 0
+        else:
+            return int(last_txn_id[0])  # select 'last_txn' col
+
+    async def set_device_list_token_for_appservice(self, service, pos) -> None:
+        def set_appservice_last_pos_txn(txn):
+            txn.execute(
+                "UPDATE application_services_state SET device_list_stream_id = ?", (pos, service.id)
+            )
+
+        await self.db_pool.runInteraction(
+            "set_device_list_token_for_appservice", set_appservice_last_pos_txn
+        )
 
 
 class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index e71217a41f..e4fd979a33 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -21,6 +21,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla
 from synapse.storage.database import DatabasePool
 from synapse.util import json_encoder
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.appservice import ApplicationService
 
 logger = logging.getLogger(__name__)
 
@@ -29,6 +30,37 @@ class DeviceInboxWorkerStore(SQLBaseStore):
     def get_to_device_stream_token(self):
         return self._device_inbox_id_gen.get_current_token()
 
+    async def get_new_messages_for_as(
+        self,
+        service: ApplicationService,
+        last_stream_id: int,
+        current_stream_id: int,
+        limit: int = 100,
+    ) -> Tuple[List[dict], int]:
+        def get_new_messages_for_device_txn(txn):
+            sql = (
+                "SELECT stream_id, message_json, device_id, user_id FROM device_inbox"
+                " WHERE ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC"
+                " LIMIT ?"
+            )
+            txn.execute(
+                sql, (last_stream_id, current_stream_id, limit)
+            )
+            messages = []
+
+            for row in txn:
+                stream_pos = row[0]
+                if service.is_interested_in_user(row.user_id):
+                    messages.append(db_to_json(row[1]))
+            if len(messages) < limit:
+                stream_pos = current_stream_id
+            return messages, stream_pos
+
+        return await self.db_pool.runInteraction(
+            "get_new_messages_for_device", get_new_messages_for_device_txn
+        )
+
     async def get_new_messages_for_device(
         self,
         user_id: str,
diff --git a/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql b/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql
new file mode 100644
index 0000000000..d4abfb6183
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql
@@ -0,0 +1,25 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /* for some reason, we have accumulated duplicate entries in
+  * device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less
+  * efficient.
+  */
+
+ALTER TABLE application_services_state
+    ADD COLUMN device_list_stream_id INT;
+
+ALTER TABLE application_services_state
+    ADD COLUMN device_message_stream_id INT;
\ No newline at end of file