summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorWill Hunt <will@half-shot.uk>2020-10-15 17:33:28 +0100
committerGitHub <noreply@github.com>2020-10-15 12:33:28 -0400
commitc276bd996916adce899410b9c4c891892f51b992 (patch)
tree4e6e2737c82d963e8f00945f4e1b0fafcf835423 /synapse/storage/databases/main
parentAdd option to scripts-dev/lint.sh to only lint files changed since the last g... (diff)
downloadsynapse-c276bd996916adce899410b9c4c891892f51b992.tar.xz
Send some ephemeral events to appservices (#8437)
Optionally sends typing, presence, and read receipt information to appservices.
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/appservice.py66
-rw-r--r--synapse/storage/databases/main/receipts.py55
-rw-r--r--synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql18
3 files changed, 130 insertions, 9 deletions
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 85f6b1e3fd..43bf0f649a 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -15,12 +15,15 @@
 # limitations under the License.
 import logging
 import re
+from typing import List
 
-from synapse.appservice import AppServiceTransaction
+from synapse.appservice import ApplicationService, AppServiceTransaction
 from synapse.config.appservice import load_appservices
+from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.types import JsonDict
 from synapse.util import json_encoder
 
 logger = logging.getLogger(__name__)
@@ -172,15 +175,23 @@ class ApplicationServiceTransactionWorkerStore(
             "application_services_state", {"as_id": service.id}, {"state": state}
         )
 
-    async def create_appservice_txn(self, service, events):
+    async def create_appservice_txn(
+        self,
+        service: ApplicationService,
+        events: List[EventBase],
+        ephemeral: List[JsonDict],
+    ) -> AppServiceTransaction:
         """Atomically creates a new transaction for this application service
-        with the given list of events.
+        with the given list of events. Ephemeral events are NOT persisted to the
+        database and are not resent if a transaction is retried.
 
         Args:
-            service(ApplicationService): The service who the transaction is for.
-            events(list<Event>): A list of events to put in the transaction.
+            service: The service who the transaction is for.
+            events: A list of persistent events to put in the transaction.
+            ephemeral: A list of ephemeral events to put in the transaction.
+
         Returns:
-            AppServiceTransaction: A new transaction.
+            A new transaction.
         """
 
         def _create_appservice_txn(txn):
@@ -207,7 +218,9 @@ class ApplicationServiceTransactionWorkerStore(
                 "VALUES(?,?,?)",
                 (service.id, new_txn_id, event_ids),
             )
-            return AppServiceTransaction(service=service, id=new_txn_id, events=events)
+            return AppServiceTransaction(
+                service=service, id=new_txn_id, events=events, ephemeral=ephemeral
+            )
 
         return await self.db_pool.runInteraction(
             "create_appservice_txn", _create_appservice_txn
@@ -296,7 +309,9 @@ class ApplicationServiceTransactionWorkerStore(
 
         events = await self.get_events_as_list(event_ids)
 
-        return AppServiceTransaction(service=service, id=entry["txn_id"], events=events)
+        return AppServiceTransaction(
+            service=service, id=entry["txn_id"], events=events, ephemeral=[]
+        )
 
     def _get_last_txn(self, txn, service_id):
         txn.execute(
@@ -320,7 +335,7 @@ class ApplicationServiceTransactionWorkerStore(
         )
 
     async def get_new_events_for_appservice(self, current_id, limit):
-        """Get all new evnets"""
+        """Get all new events for an appservice"""
 
         def get_new_events_for_appservice_txn(txn):
             sql = (
@@ -351,6 +366,39 @@ class ApplicationServiceTransactionWorkerStore(
 
         return upper_bound, events
 
+    async def get_type_stream_id_for_appservice(
+        self, service: ApplicationService, type: str
+    ) -> int:
+        def get_type_stream_id_for_appservice_txn(txn):
+            stream_id_type = "%s_stream_id" % type
+            txn.execute(
+                "SELECT ? FROM application_services_state WHERE as_id=?",
+                (stream_id_type, service.id,),
+            )
+            last_txn_id = txn.fetchone()
+            if last_txn_id is None or last_txn_id[0] is None:  # no row exists
+                return 0
+            else:
+                return int(last_txn_id[0])
+
+        return await self.db_pool.runInteraction(
+            "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
+        )
+
+    async def set_type_stream_id_for_appservice(
+        self, service: ApplicationService, type: str, pos: int
+    ) -> None:
+        def set_type_stream_id_for_appservice_txn(txn):
+            stream_id_type = "%s_stream_id" % type
+            txn.execute(
+                "UPDATE ? SET device_list_stream_id = ? WHERE as_id=?",
+                (stream_id_type, pos, service.id),
+            )
+
+        await self.db_pool.runInteraction(
+            "set_type_stream_id_for_appservice", set_type_stream_id_for_appservice_txn
+        )
+
 
 class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
     # This is currently empty due to there not being any AS storage functions
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index c79ddff680..5cdf16521c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
 from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedList
@@ -274,6 +275,60 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
         }
         return results
 
+    @cached(num_args=2,)
+    async def get_linearized_receipts_for_all_rooms(
+        self, to_key: int, from_key: Optional[int] = None
+    ) -> Dict[str, JsonDict]:
+        """Get receipts for all rooms between two stream_ids.
+
+        Args:
+            to_key: Max stream id to fetch receipts upto.
+            from_key: Min stream id to fetch receipts from. None fetches
+                from the start.
+
+        Returns:
+            A dictionary of roomids to a list of receipts.
+        """
+
+        def f(txn):
+            if from_key:
+                sql = """
+                    SELECT * FROM receipts_linearized WHERE
+                    stream_id > ? AND stream_id <= ?
+                """
+                txn.execute(sql, [from_key, to_key])
+            else:
+                sql = """
+                    SELECT * FROM receipts_linearized WHERE
+                    stream_id <= ?
+                """
+
+                txn.execute(sql, [to_key])
+
+            return self.db_pool.cursor_to_dict(txn)
+
+        txn_results = await self.db_pool.runInteraction(
+            "get_linearized_receipts_for_all_rooms", f
+        )
+
+        results = {}
+        for row in txn_results:
+            # We want a single event per room, since we want to batch the
+            # receipts by room, event and type.
+            room_event = results.setdefault(
+                row["room_id"],
+                {"type": "m.receipt", "room_id": row["room_id"], "content": {}},
+            )
+
+            # The content is of the form:
+            # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
+            event_entry = room_event["content"].setdefault(row["event_id"], {})
+            receipt_type = event_entry.setdefault(row["receipt_type"], {})
+
+            receipt_type[row["user_id"]] = db_to_json(row["data"])
+
+        return results
+
     async def get_users_sent_receipts_between(
         self, last_id: int, current_id: int
     ) -> List[str]:
diff --git a/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql
new file mode 100644
index 0000000000..20f5a95a24
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE application_services_state
+    ADD COLUMN read_receipt_stream_id INT,
+    ADD COLUMN presence_stream_id INT;
\ No newline at end of file