diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 85f6b1e3fd..43bf0f649a 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -15,12 +15,15 @@
# limitations under the License.
import logging
import re
+from typing import List
-from synapse.appservice import AppServiceTransaction
+from synapse.appservice import ApplicationService, AppServiceTransaction
from synapse.config.appservice import load_appservices
+from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.types import JsonDict
from synapse.util import json_encoder
logger = logging.getLogger(__name__)
@@ -172,15 +175,23 @@ class ApplicationServiceTransactionWorkerStore(
"application_services_state", {"as_id": service.id}, {"state": state}
)
- async def create_appservice_txn(self, service, events):
+ async def create_appservice_txn(
+ self,
+ service: ApplicationService,
+ events: List[EventBase],
+ ephemeral: List[JsonDict],
+ ) -> AppServiceTransaction:
"""Atomically creates a new transaction for this application service
- with the given list of events.
+ with the given list of events. Ephemeral events are NOT persisted to the
+ database and are not resent if a transaction is retried.
Args:
- service(ApplicationService): The service who the transaction is for.
- events(list<Event>): A list of events to put in the transaction.
+            service: The service the transaction is for.
+ events: A list of persistent events to put in the transaction.
+ ephemeral: A list of ephemeral events to put in the transaction.
+
Returns:
- AppServiceTransaction: A new transaction.
+ A new transaction.
"""
def _create_appservice_txn(txn):
@@ -207,7 +218,9 @@ class ApplicationServiceTransactionWorkerStore(
"VALUES(?,?,?)",
(service.id, new_txn_id, event_ids),
)
- return AppServiceTransaction(service=service, id=new_txn_id, events=events)
+ return AppServiceTransaction(
+ service=service, id=new_txn_id, events=events, ephemeral=ephemeral
+ )
return await self.db_pool.runInteraction(
"create_appservice_txn", _create_appservice_txn
@@ -296,7 +309,9 @@ class ApplicationServiceTransactionWorkerStore(
events = await self.get_events_as_list(event_ids)
- return AppServiceTransaction(service=service, id=entry["txn_id"], events=events)
+ return AppServiceTransaction(
+ service=service, id=entry["txn_id"], events=events, ephemeral=[]
+ )
def _get_last_txn(self, txn, service_id):
txn.execute(
@@ -320,7 +335,7 @@ class ApplicationServiceTransactionWorkerStore(
)
async def get_new_events_for_appservice(self, current_id, limit):
- """Get all new evnets"""
+ """Get all new events for an appservice"""
def get_new_events_for_appservice_txn(txn):
sql = (
@@ -351,6 +366,39 @@ class ApplicationServiceTransactionWorkerStore(
return upper_bound, events
+    async def get_type_stream_id_for_appservice(
+        self, service: ApplicationService, type: str
+    ) -> int:
+        """Get the stream position up to which the given ephemeral event type
+        has been sent to this application service. Returns 0 if no position
+        has been recorded yet.
+        """
+        if type not in ("read_receipt", "presence"):
+            raise ValueError(
+                "Expected type to be a valid application service stream type, got %s"
+                % (type,)
+            )
+
+        def get_type_stream_id_for_appservice_txn(txn):
+            # A column name cannot be bound as a ? parameter, so it is
+            # interpolated directly; `type` is validated against a fixed
+            # set of values above.
+            stream_id_type = "%s_stream_id" % type
+            txn.execute(
+                "SELECT %s FROM application_services_state WHERE as_id=?"
+                % stream_id_type,
+                (service.id,),
+            )
+            last_stream_id = txn.fetchone()
+            if last_stream_id is None or last_stream_id[0] is None:  # no row exists
+                return 0
+            else:
+                return int(last_stream_id[0])
+
+        return await self.db_pool.runInteraction(
+            "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
+        )
+
+    async def set_type_stream_id_for_appservice(
+        self, service: ApplicationService, type: str, pos: int
+    ) -> None:
+        """Record the stream position up to which the given ephemeral event
+        type has been sent to this application service.
+        """
+        if type not in ("read_receipt", "presence"):
+            raise ValueError(
+                "Expected type to be a valid application service stream type, got %s"
+                % (type,)
+            )
+
+        def set_type_stream_id_for_appservice_txn(txn):
+            # As above, the column name is interpolated rather than bound as
+            # a parameter, which is safe because `type` has been validated.
+            stream_id_type = "%s_stream_id" % type
+            txn.execute(
+                "UPDATE application_services_state SET %s = ? WHERE as_id=?"
+                % stream_id_type,
+                (pos, service.id),
+            )
+
+        await self.db_pool.runInteraction(
+            "set_type_stream_id_for_appservice", set_type_stream_id_for_appservice_txn
+        )
+
class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
# This is currently empty due to there not being any AS storage functions
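For orientation: together with the `read_receipt_stream_id`/`presence_stream_id` columns added by the schema delta further down, the pair of helpers above lets a caller resume pushing ephemeral events from where it left off. A minimal usage sketch, assuming a store with these methods and an `ApplicationService` in hand; the function name and the delivery step are illustrative, not part of this diff:

```python
async def push_new_read_receipts(store, service, max_stream_id: int) -> None:
    # Resume from the last recorded read-receipt position for this
    # service (0 when nothing has been recorded yet).
    from_key = await store.get_type_stream_id_for_appservice(service, "read_receipt")

    # Batch up receipts between the two positions, one m.receipt-style
    # payload per room (see get_linearized_receipts_for_all_rooms below).
    receipts = await store.get_linearized_receipts_for_all_rooms(
        max_stream_id, from_key=from_key
    )

    # ... hand list(receipts.values()) to the transaction queue here ...

    # Advance the stored position so retries/restarts do not resend these.
    await store.set_type_stream_id_for_appservice(
        service, "read_receipt", max_stream_id
    )
```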
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index c30746c886..ec8260e906 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -19,7 +19,7 @@ from typing import Dict, Optional, Tuple
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
-from synapse.util.caches.descriptors import Cache
+from synapse.util.caches.deferred_cache import DeferredCache
logger = logging.getLogger(__name__)
@@ -410,7 +410,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
class ClientIpStore(ClientIpWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
- self.client_ip_last_seen = Cache(
+ self.client_ip_last_seen = DeferredCache(
name="client_ip_last_seen", keylen=4, max_entries=50000
)
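Context for the rename: `DeferredCache` is the cache previously exported from `descriptors` as `Cache`, so the three call sites in this diff change only the import and constructor name. A rough sketch of the access pattern, assuming `DeferredCache` keeps the `Cache` get/prefill interface (which is what makes this a drop-in rename); key values here are invented for illustration:

```python
from synapse.util.caches.deferred_cache import DeferredCache

# The store keys this on a 4-tuple, hence keylen=4; the exact fields
# below are assumptions for the example.
client_ip_last_seen = DeferredCache(
    name="client_ip_last_seen", keylen=4, max_entries=50000
)

key = ("@alice:example.org", "token", "10.0.0.1", "Mozilla/5.0")
try:
    last_seen = client_ip_last_seen.get(key)  # raises KeyError on a miss
except KeyError:
    last_seen = None  # miss: write through to the DB, then prefill

client_ip_last_seen.prefill(key, 1602000000000)  # cache a plain value
```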
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 88fd97e1df..e662a20d24 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -34,7 +34,8 @@ from synapse.storage.database import (
)
from synapse.types import Collection, JsonDict, get_verify_key_from_cross_signing_key
from synapse.util import json_decoder, json_encoder
-from synapse.util.caches.descriptors import Cache, cached, cachedList
+from synapse.util.caches.deferred_cache import DeferredCache
+from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.iterutils import batch_iter
from synapse.util.stringutils import shortstr
@@ -1004,7 +1005,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
# Map of (user_id, device_id) -> bool. If there is an entry that implies
# the device exists.
- self.device_id_exists_cache = Cache(
+ self.device_id_exists_cache = DeferredCache(
name="device_id_exists", keylen=2, max_entries=10000
)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 3ec4d1d9c2..ff150f0be7 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -42,7 +42,8 @@ from synapse.storage.database import DatabasePool
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.types import Collection, get_domain_from_id
-from synapse.util.caches.descriptors import Cache, cached
+from synapse.util.caches.deferred_cache import DeferredCache
+from synapse.util.caches.descriptors import cached
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
@@ -145,7 +146,7 @@ class EventsWorkerStore(SQLBaseStore):
self._cleanup_old_transaction_ids,
)
- self._get_event_cache = Cache(
+ self._get_event_cache = DeferredCache(
"*getEvent*",
keylen=3,
max_entries=hs.config.caches.event_cache_size,
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index ad43bb05ab..f8f4bb9b3f 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -122,9 +122,7 @@ class KeyStore(SQLBaseStore):
# param, which is itself the 2-tuple (server_name, key_id).
invalidations.append((server_name, key_id))
- await self.db_pool.runInteraction(
- "store_server_verify_keys",
- self.db_pool.simple_upsert_many_txn,
+ await self.db_pool.simple_upsert_many(
table="server_signature_keys",
key_names=("server_name", "key_id"),
key_values=key_values,
@@ -135,6 +133,7 @@ class KeyStore(SQLBaseStore):
"verify_key",
),
value_values=value_values,
+ desc="store_server_verify_keys",
)
invalidate = self._get_server_verify_key.invalidate
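The `keys.py` and `user_directory.py` hunks apply the same refactor: call sites that wrapped `simple_upsert_many_txn` in an explicit `runInteraction` now call the `simple_upsert_many` helper, with the interaction name moving into `desc`. A schematic sketch of the resulting call shape, with made-up key/value data:

```python
async def store_example_keys(db_pool) -> None:
    # simple_upsert_many opens the interaction itself; `desc` supplies
    # the name previously passed as the first runInteraction argument.
    await db_pool.simple_upsert_many(
        table="server_signature_keys",
        key_names=("server_name", "key_id"),
        key_values=[("example.org", "ed25519:abc123")],
        value_names=(
            "from_server",
            "ts_added_ms",
            "ts_valid_until_ms",
            "verify_key",
        ),
        value_values=[("example.org", 1602000000000, 1602600000000, b"key")],
        desc="store_server_verify_keys",
    )
```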
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 0acf0617ca..79b01d16f9 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -281,9 +281,14 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
a_day_in_milliseconds = 24 * 60 * 60 * 1000
now = self._clock.time_msec()
+        # A note on user_agent. Technically a given device can have multiple
+        # user agents, so we need to decide which one to pick. We could have
+        # handled this in a number of ways, but given that we don't care
+        # _that_ much we have gone for MAX().
+        # For more details on the other options considered see
+        # https://github.com/matrix-org/synapse/pull/8503#discussion_r502306111
sql = """
- INSERT INTO user_daily_visits (user_id, device_id, timestamp)
- SELECT u.user_id, u.device_id, ?
+ INSERT INTO user_daily_visits (user_id, device_id, timestamp, user_agent)
+ SELECT u.user_id, u.device_id, ?, MAX(u.user_agent)
FROM user_ips AS u
LEFT JOIN (
SELECT user_id, device_id, timestamp FROM user_daily_visits
@@ -294,7 +299,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
WHERE last_seen > ? AND last_seen <= ?
AND udv.timestamp IS NULL AND users.is_guest=0
AND users.appservice_id IS NULL
             GROUP BY u.user_id, u.device_id
"""
# This means that the day has rolled over but there could still
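To illustrate why an aggregate is needed at all (data invented for the example): once rows are grouped by `(user_id, device_id)`, a device seen under two user agents in the window would otherwise make `u.user_agent` ambiguous, and MAX() simply keeps the lexicographically greatest string per group:

```python
rows = [
    ("@alice:example.org", "DEVICE1", "Mozilla/5.0"),
    ("@alice:example.org", "DEVICE1", "Element/1.0"),
]

# Pure-Python equivalent of:
#   SELECT user_id, device_id, MAX(user_agent) ... GROUP BY user_id, device_id
visits = {}
for user_id, device_id, user_agent in rows:
    key = (user_id, device_id)
    visits[key] = max(visits.get(key, user_agent), user_agent)

assert visits == {("@alice:example.org", "DEVICE1"): "Mozilla/5.0"}
```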
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index c79ddff680..5cdf16521c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedList
@@ -274,6 +275,60 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
}
return results
+    @cached(num_args=2)
+ async def get_linearized_receipts_for_all_rooms(
+ self, to_key: int, from_key: Optional[int] = None
+ ) -> Dict[str, JsonDict]:
+ """Get receipts for all rooms between two stream_ids.
+
+ Args:
+            to_key: Max stream id to fetch receipts up to.
+ from_key: Min stream id to fetch receipts from. None fetches
+ from the start.
+
+ Returns:
+            A dictionary of room ID to the receipts in that room, batched
+            into a single m.receipt-format event per room.
+ """
+
+ def f(txn):
+ if from_key:
+ sql = """
+ SELECT * FROM receipts_linearized WHERE
+ stream_id > ? AND stream_id <= ?
+ """
+ txn.execute(sql, [from_key, to_key])
+ else:
+ sql = """
+ SELECT * FROM receipts_linearized WHERE
+ stream_id <= ?
+ """
+
+ txn.execute(sql, [to_key])
+
+ return self.db_pool.cursor_to_dict(txn)
+
+ txn_results = await self.db_pool.runInteraction(
+ "get_linearized_receipts_for_all_rooms", f
+ )
+
+ results = {}
+ for row in txn_results:
+ # We want a single event per room, since we want to batch the
+ # receipts by room, event and type.
+ room_event = results.setdefault(
+ row["room_id"],
+ {"type": "m.receipt", "room_id": row["room_id"], "content": {}},
+ )
+
+ # The content is of the form:
+ # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
+ event_entry = room_event["content"].setdefault(row["event_id"], {})
+ receipt_type = event_entry.setdefault(row["receipt_type"], {})
+
+ receipt_type[row["user_id"]] = db_to_json(row["data"])
+
+ return results
+
async def get_users_sent_receipts_between(
self, last_id: int, current_id: int
) -> List[str]:
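For reference, the per-room payloads assembled by `get_linearized_receipts_for_all_rooms` have the shape sketched below (all identifiers and timestamps invented), matching the content comment in the loop above:

```python
example_result = {
    "!room:example.org": {
        "type": "m.receipt",
        "room_id": "!room:example.org",
        "content": {
            # event_id -> receipt type -> user_id -> receipt data
            "$event:example.org": {
                "m.read": {
                    "@alice:example.org": {"ts": 1602000000000},
                },
            },
        },
    },
}
```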
diff --git a/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql b/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql
new file mode 100644
index 0000000000..b0b5dcddce
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add new column to user_daily_visits to track user agent
+ALTER TABLE user_daily_visits
+ ADD COLUMN user_agent TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql
new file mode 100644
index 0000000000..20f5a95a24
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE application_services_state
+ ADD COLUMN read_receipt_stream_id INT,
+ ADD COLUMN presence_stream_id INT;
\ No newline at end of file
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 7d46090267..59207cadd4 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -208,42 +208,56 @@ class TransactionStore(TransactionWorkerStore):
"""
self._destination_retry_cache.pop(destination, None)
- return await self.db_pool.runInteraction(
- "set_destination_retry_timings",
- self._set_destination_retry_timings,
- destination,
- failure_ts,
- retry_last_ts,
- retry_interval,
- )
+ if self.database_engine.can_native_upsert:
+ return await self.db_pool.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings_native,
+ destination,
+ failure_ts,
+ retry_last_ts,
+ retry_interval,
+                db_autocommit=True,  # Safe as it's a single upsert
+ )
+ else:
+ return await self.db_pool.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings_emulated,
+ destination,
+ failure_ts,
+ retry_last_ts,
+ retry_interval,
+ )
- def _set_destination_retry_timings(
+ def _set_destination_retry_timings_native(
self, txn, destination, failure_ts, retry_last_ts, retry_interval
):
+ assert self.database_engine.can_native_upsert
+
+ # Upsert retry time interval if retry_interval is zero (i.e. we're
+ # resetting it) or greater than the existing retry interval.
+ #
+ # WARNING: This is executed in autocommit, so we shouldn't add any more
+ # SQL calls in here (without being very careful).
+ sql = """
+ INSERT INTO destinations (
+ destination, failure_ts, retry_last_ts, retry_interval
+ )
+ VALUES (?, ?, ?, ?)
+ ON CONFLICT (destination) DO UPDATE SET
+ failure_ts = EXCLUDED.failure_ts,
+ retry_last_ts = EXCLUDED.retry_last_ts,
+ retry_interval = EXCLUDED.retry_interval
+ WHERE
+ EXCLUDED.retry_interval = 0
+ OR destinations.retry_interval IS NULL
+ OR destinations.retry_interval < EXCLUDED.retry_interval
+ """
- if self.database_engine.can_native_upsert:
- # Upsert retry time interval if retry_interval is zero (i.e. we're
- # resetting it) or greater than the existing retry interval.
-
- sql = """
- INSERT INTO destinations (
- destination, failure_ts, retry_last_ts, retry_interval
- )
- VALUES (?, ?, ?, ?)
- ON CONFLICT (destination) DO UPDATE SET
- failure_ts = EXCLUDED.failure_ts,
- retry_last_ts = EXCLUDED.retry_last_ts,
- retry_interval = EXCLUDED.retry_interval
- WHERE
- EXCLUDED.retry_interval = 0
- OR destinations.retry_interval IS NULL
- OR destinations.retry_interval < EXCLUDED.retry_interval
- """
-
- txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
-
- return
+ txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
+ def _set_destination_retry_timings_emulated(
+ self, txn, destination, failure_ts, retry_last_ts, retry_interval
+ ):
self.database_engine.lock_table(txn, "destinations")
# We need to be careful here as the data may have changed from under us
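Splitting the native and emulated paths lets the native upsert run with `db_autocommit=True`: a single self-contained statement needs no BEGIN/COMMIT round trips, while the emulated path still needs a real transaction because it locks the table and issues multiple statements. A hypothetical call resetting a destination's backoff, relying on the `EXCLUDED.retry_interval = 0` branch of the upsert above:

```python
async def reset_backoff(store) -> None:
    # retry_interval=0 always satisfies the upsert's WHERE clause, so
    # this clears any recorded backoff for the destination.
    await store.set_destination_retry_timings(
        "remote.example.org", failure_ts=None, retry_last_ts=0, retry_interval=0
    )
```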
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 5a390ff2f6..d87ceec6da 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -480,21 +480,16 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
user_id_tuples: iterable of 2-tuple of user IDs.
"""
- def _add_users_who_share_room_txn(txn):
- self.db_pool.simple_upsert_many_txn(
- txn,
- table="users_who_share_private_rooms",
- key_names=["user_id", "other_user_id", "room_id"],
- key_values=[
- (user_id, other_user_id, room_id)
- for user_id, other_user_id in user_id_tuples
- ],
- value_names=(),
- value_values=None,
- )
-
- await self.db_pool.runInteraction(
- "add_users_who_share_room", _add_users_who_share_room_txn
+ await self.db_pool.simple_upsert_many(
+ table="users_who_share_private_rooms",
+ key_names=["user_id", "other_user_id", "room_id"],
+ key_values=[
+ (user_id, other_user_id, room_id)
+ for user_id, other_user_id in user_id_tuples
+ ],
+ value_names=(),
+ value_values=None,
+ desc="add_users_who_share_room",
)
async def add_users_in_public_rooms(
@@ -508,19 +503,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
user_ids
"""
- def _add_users_in_public_rooms_txn(txn):
-
- self.db_pool.simple_upsert_many_txn(
- txn,
- table="users_in_public_rooms",
- key_names=["user_id", "room_id"],
- key_values=[(user_id, room_id) for user_id in user_ids],
- value_names=(),
- value_values=None,
- )
-
- await self.db_pool.runInteraction(
- "add_users_in_public_rooms", _add_users_in_public_rooms_txn
+ await self.db_pool.simple_upsert_many(
+ table="users_in_public_rooms",
+ key_names=["user_id", "room_id"],
+ key_values=[(user_id, room_id) for user_id in user_ids],
+ value_names=(),
+ value_values=None,
+ desc="add_users_in_public_rooms",
)
async def delete_all_from_user_dir(self) -> None: