diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 85f6b1e3fd..43bf0f649a 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -15,12 +15,15 @@
# limitations under the License.
import logging
import re
+from typing import List
-from synapse.appservice import AppServiceTransaction
+from synapse.appservice import ApplicationService, AppServiceTransaction
from synapse.config.appservice import load_appservices
+from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.types import JsonDict
from synapse.util import json_encoder
logger = logging.getLogger(__name__)
@@ -172,15 +175,23 @@ class ApplicationServiceTransactionWorkerStore(
"application_services_state", {"as_id": service.id}, {"state": state}
)
- async def create_appservice_txn(self, service, events):
+ async def create_appservice_txn(
+ self,
+ service: ApplicationService,
+ events: List[EventBase],
+ ephemeral: List[JsonDict],
+ ) -> AppServiceTransaction:
"""Atomically creates a new transaction for this application service
- with the given list of events.
+ with the given list of events. Ephemeral events are NOT persisted to the
+ database and are not resent if a transaction is retried.
Args:
- service(ApplicationService): The service who the transaction is for.
- events(list<Event>): A list of events to put in the transaction.
+ service: The service who the transaction is for.
+ events: A list of persistent events to put in the transaction.
+ ephemeral: A list of ephemeral events to put in the transaction.
+
Returns:
- AppServiceTransaction: A new transaction.
+ A new transaction.
"""
def _create_appservice_txn(txn):
@@ -207,7 +218,9 @@ class ApplicationServiceTransactionWorkerStore(
"VALUES(?,?,?)",
(service.id, new_txn_id, event_ids),
)
- return AppServiceTransaction(service=service, id=new_txn_id, events=events)
+ return AppServiceTransaction(
+ service=service, id=new_txn_id, events=events, ephemeral=ephemeral
+ )
return await self.db_pool.runInteraction(
"create_appservice_txn", _create_appservice_txn
@@ -296,7 +309,9 @@ class ApplicationServiceTransactionWorkerStore(
events = await self.get_events_as_list(event_ids)
- return AppServiceTransaction(service=service, id=entry["txn_id"], events=events)
+ return AppServiceTransaction(
+ service=service, id=entry["txn_id"], events=events, ephemeral=[]
+ )
def _get_last_txn(self, txn, service_id):
txn.execute(
@@ -320,7 +335,7 @@ class ApplicationServiceTransactionWorkerStore(
)
async def get_new_events_for_appservice(self, current_id, limit):
- """Get all new evnets"""
+ """Get all new events for an appservice"""
def get_new_events_for_appservice_txn(txn):
sql = (
@@ -351,6 +366,39 @@ class ApplicationServiceTransactionWorkerStore(
return upper_bound, events
+ async def get_type_stream_id_for_appservice(
+ self, service: ApplicationService, type: str
+ ) -> int:
+ def get_type_stream_id_for_appservice_txn(txn):
+ stream_id_type = "%s_stream_id" % type
+ txn.execute(
+ "SELECT ? FROM application_services_state WHERE as_id=?",
+ (stream_id_type, service.id,),
+ )
+ last_txn_id = txn.fetchone()
+ if last_txn_id is None or last_txn_id[0] is None: # no row exists
+ return 0
+ else:
+ return int(last_txn_id[0])
+
+ return await self.db_pool.runInteraction(
+ "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
+ )
+
+ async def set_type_stream_id_for_appservice(
+ self, service: ApplicationService, type: str, pos: int
+ ) -> None:
+ def set_type_stream_id_for_appservice_txn(txn):
+ stream_id_type = "%s_stream_id" % type
+ txn.execute(
+ "UPDATE ? SET device_list_stream_id = ? WHERE as_id=?",
+ (stream_id_type, pos, service.id),
+ )
+
+ await self.db_pool.runInteraction(
+ "set_type_stream_id_for_appservice", set_type_stream_id_for_appservice_txn
+ )
+
class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
# This is currently empty due to there not being any AS storage functions
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index c79ddff680..5cdf16521c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedList
@@ -274,6 +275,60 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
}
return results
+ @cached(num_args=2,)
+ async def get_linearized_receipts_for_all_rooms(
+ self, to_key: int, from_key: Optional[int] = None
+ ) -> Dict[str, JsonDict]:
+ """Get receipts for all rooms between two stream_ids.
+
+ Args:
+ to_key: Max stream id to fetch receipts upto.
+ from_key: Min stream id to fetch receipts from. None fetches
+ from the start.
+
+ Returns:
+ A dictionary of roomids to a list of receipts.
+ """
+
+ def f(txn):
+ if from_key:
+ sql = """
+ SELECT * FROM receipts_linearized WHERE
+ stream_id > ? AND stream_id <= ?
+ """
+ txn.execute(sql, [from_key, to_key])
+ else:
+ sql = """
+ SELECT * FROM receipts_linearized WHERE
+ stream_id <= ?
+ """
+
+ txn.execute(sql, [to_key])
+
+ return self.db_pool.cursor_to_dict(txn)
+
+ txn_results = await self.db_pool.runInteraction(
+ "get_linearized_receipts_for_all_rooms", f
+ )
+
+ results = {}
+ for row in txn_results:
+ # We want a single event per room, since we want to batch the
+ # receipts by room, event and type.
+ room_event = results.setdefault(
+ row["room_id"],
+ {"type": "m.receipt", "room_id": row["room_id"], "content": {}},
+ )
+
+ # The content is of the form:
+ # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
+ event_entry = room_event["content"].setdefault(row["event_id"], {})
+ receipt_type = event_entry.setdefault(row["receipt_type"], {})
+
+ receipt_type[row["user_id"]] = db_to_json(row["data"])
+
+ return results
+
async def get_users_sent_receipts_between(
self, last_id: int, current_id: int
) -> List[str]:
diff --git a/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql
new file mode 100644
index 0000000000..20f5a95a24
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE application_services_state
+ ADD COLUMN read_receipt_stream_id INT,
+ ADD COLUMN presence_stream_id INT;
\ No newline at end of file
|