summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2021-08-31 17:16:11 +0100
committerBrendan Abolivier <babolivier@matrix.org>2021-08-31 17:16:11 +0100
commit200ee12326bc8b8e73556f81272eecdcbc8f856f (patch)
tree6250a311d2e812297c03243f77140051abacb0e3 /synapse/storage/databases/main
parentMerge tag 'v1.34.0' into babolivier/dinsic_1.41.0 (diff)
parentMerge v1.35.0rc3 into v1.35.0 due to incorrect tagging (diff)
downloadsynapse-200ee12326bc8b8e73556f81272eecdcbc8f856f.tar.xz
Merge tag 'v1.35.0' into babolivier/dinsic_1.41.0
Synapse 1.35.0 (2021-06-01)
===========================

Note that [the tag](https://github.com/matrix-org/synapse/releases/tag/v1.35.0rc3) and [docker images](https://hub.docker.com/layers/matrixdotorg/synapse/v1.35.0rc3/images/sha256-34ccc87bd99a17e2cbc0902e678b5937d16bdc1991ead097eee6096481ecf2c4?context=explore) for `v1.35.0rc3` were incorrectly built. If you are experiencing issues with either, it is recommended to upgrade to the equivalent tag or docker image for the `v1.35.0` release.

Deprecations and Removals
-------------------------

- The core Synapse development team plan to drop support for the [unstable API of MSC2858](https://github.com/matrix-org/matrix-doc/blob/master/proposals/2858-Multiple-SSO-Identity-Providers.md#unstable-prefix), including the undocumented `experimental.msc2858_enabled` config option, in August 2021. Client authors should ensure that their clients are updated to use the stable API (which has been supported since Synapse 1.30) well before that time, to give their users time to upgrade. ([\#10101](https://github.com/matrix-org/synapse/issues/10101))

Bugfixes
--------

- Fixed a bug causing replication requests to fail when receiving a lot of events via federation. Introduced in v1.33.0. ([\#10082](https://github.com/matrix-org/synapse/issues/10082))
- Fix HTTP response size limit to allow joining very large rooms over federation. Introduced in v1.33.0. ([\#10093](https://github.com/matrix-org/synapse/issues/10093))

Internal Changes
----------------

- Log method and path when dropping request due to size limit. ([\#10091](https://github.com/matrix-org/synapse/issues/10091))

Synapse 1.35.0rc2 (2021-05-27)
==============================

Bugfixes
--------

- Fix a bug introduced in v1.35.0rc1 when calling the spaces summary API via a GET request. ([\#10079](https://github.com/matrix-org/synapse/issues/10079))

Synapse 1.35.0rc1 (2021-05-25)
==============================

Features
--------

- Add experimental support to allow a user who could join a restricted room to view it in the spaces summary. ([\#9922](https://github.com/matrix-org/synapse/issues/9922), [\#10007](https://github.com/matrix-org/synapse/issues/10007), [\#10038](https://github.com/matrix-org/synapse/issues/10038))
- Reduce memory usage when joining very large rooms over federation. ([\#9958](https://github.com/matrix-org/synapse/issues/9958))
- Add a configuration option which allows enabling opentracing by user id. ([\#9978](https://github.com/matrix-org/synapse/issues/9978))
- Enable experimental support for [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946) (spaces summary API) and [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083) (restricted join rules) by default. ([\#10011](https://github.com/matrix-org/synapse/issues/10011))

Bugfixes
--------

- Fix a bug introduced in v1.26.0 which meant that `synapse_port_db` would not correctly initialise some postgres sequences, requiring manual updates afterwards. ([\#9991](https://github.com/matrix-org/synapse/issues/9991))
- Fix `synctl`'s `--no-daemonize` parameter to work correctly with worker processes. ([\#9995](https://github.com/matrix-org/synapse/issues/9995))
- Fix a validation bug introduced in v1.34.0 in the ordering of spaces in the space summary API. ([\#10002](https://github.com/matrix-org/synapse/issues/10002))
- Fixed deletion of new presence stream states from database. ([\#10014](https://github.com/matrix-org/synapse/issues/10014), [\#10033](https://github.com/matrix-org/synapse/issues/10033))
- Fixed a bug with very high resolution image uploads throwing internal server errors. ([\#10029](https://github.com/matrix-org/synapse/issues/10029))

Updates to the Docker image
---------------------------

- Fix bug introduced in Synapse 1.33.0 which caused a `Permission denied: '/homeserver.log'` error when starting Synapse with the generated log configuration. Contributed by Sergio Miguéns Iglesias. ([\#10045](https://github.com/matrix-org/synapse/issues/10045))

Improved Documentation
----------------------

- Add hardened systemd files as proposed in [#9760](https://github.com/matrix-org/synapse/issues/9760) and added them to `contrib/`. Change the docs to reflect the presence of these files. ([\#9803](https://github.com/matrix-org/synapse/issues/9803))
- Clarify documentation around SSO mapping providers generating unique IDs and localparts. ([\#9980](https://github.com/matrix-org/synapse/issues/9980))
- Updates to the PostgreSQL documentation (`postgres.md`). ([\#9988](https://github.com/matrix-org/synapse/issues/9988), [\#9989](https://github.com/matrix-org/synapse/issues/9989))
- Fix broken link in user directory documentation. Contributed by @junquera. ([\#10016](https://github.com/matrix-org/synapse/issues/10016))
- Add missing room state entry to the table of contents of room admin API. ([\#10043](https://github.com/matrix-org/synapse/issues/10043))

Deprecations and Removals
-------------------------

- Removed support for the deprecated `tls_fingerprints` configuration setting. Contributed by Jerin J Titus. ([\#9280](https://github.com/matrix-org/synapse/issues/9280))

Internal Changes
----------------

- Allow sending full presence to users via workers other than the one that called `ModuleApi.send_local_online_presence_to`. ([\#9823](https://github.com/matrix-org/synapse/issues/9823))
- Update comments in the space summary handler. ([\#9974](https://github.com/matrix-org/synapse/issues/9974))
- Minor enhancements to the `@cachedList` descriptor. ([\#9975](https://github.com/matrix-org/synapse/issues/9975))
- Split multipart email sending into a dedicated handler. ([\#9977](https://github.com/matrix-org/synapse/issues/9977))
- Run `black` on files in the `scripts` directory. ([\#9981](https://github.com/matrix-org/synapse/issues/9981))
- Add missing type hints to `synapse.util` module. ([\#9982](https://github.com/matrix-org/synapse/issues/9982))
- Simplify a few helper functions. ([\#9984](https://github.com/matrix-org/synapse/issues/9984), [\#9985](https://github.com/matrix-org/synapse/issues/9985), [\#9986](https://github.com/matrix-org/synapse/issues/9986))
- Remove unnecessary property from SQLBaseStore. ([\#9987](https://github.com/matrix-org/synapse/issues/9987))
- Remove `keylen` param on `LruCache`. ([\#9993](https://github.com/matrix-org/synapse/issues/9993))
- Update the Grafana dashboard in `contrib/`. ([\#10001](https://github.com/matrix-org/synapse/issues/10001))
- Add a batching queue implementation. ([\#10017](https://github.com/matrix-org/synapse/issues/10017))
- Reduce memory usage when verifying signatures on large numbers of events at once. ([\#10018](https://github.com/matrix-org/synapse/issues/10018))
- Properly invalidate caches for destination retry timings every (instead of expiring entries every 5 minutes). ([\#10036](https://github.com/matrix-org/synapse/issues/10036))
- Fix running complement tests with Synapse workers. ([\#10039](https://github.com/matrix-org/synapse/issues/10039))
- Fix typo in `get_state_ids_for_event` docstring where the return type was incorrect. ([\#10050](https://github.com/matrix-org/synapse/issues/10050))
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/__init__.py4
-rw-r--r--synapse/storage/databases/main/client_ips.py2
-rw-r--r--synapse/storage/databases/main/devices.py4
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py4
-rw-r--r--synapse/storage/databases/main/events_worker.py1
-rw-r--r--synapse/storage/databases/main/keys.py2
-rw-r--r--synapse/storage/databases/main/presence.py76
-rw-r--r--synapse/storage/databases/main/registration.py3
-rw-r--r--synapse/storage/databases/main/transactions.py66
-rw-r--r--synapse/storage/databases/main/user_erasure_store.py13
10 files changed, 118 insertions, 57 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py

index 49c7606d51..9cce62ae6c 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py
@@ -67,7 +67,7 @@ from .state import StateStore from .stats import StatsStore from .stream import StreamStore from .tags import TagsStore -from .transactions import TransactionStore +from .transactions import TransactionWorkerStore from .ui_auth import UIAuthStore from .user_directory import UserDirectoryStore from .user_erasure_store import UserErasureStore @@ -83,7 +83,7 @@ class DataStore( StreamStore, ProfileStore, PresenceStore, - TransactionStore, + TransactionWorkerStore, DirectoryStore, KeyStore, StateStore, diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index d60010e942..074b077bef 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py
@@ -436,7 +436,7 @@ class ClientIpStore(ClientIpWorkerStore): def __init__(self, database: DatabasePool, db_conn, hs): self.client_ip_last_seen = LruCache( - cache_name="client_ip_last_seen", keylen=4, max_size=50000 + cache_name="client_ip_last_seen", max_size=50000 ) super().__init__(database, db_conn, hs) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index c9346de316..fd87ba71ab 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -665,7 +665,7 @@ class DeviceWorkerStore(SQLBaseStore): cached_method_name="get_device_list_last_stream_id_for_remote", list_name="user_ids", ) - async def get_device_list_last_stream_id_for_remotes(self, user_ids: str): + async def get_device_list_last_stream_id_for_remotes(self, user_ids: Iterable[str]): rows = await self.db_pool.simple_select_many_batch( table="device_lists_remote_extremeties", column="user_id", @@ -1053,7 +1053,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): # Map of (user_id, device_id) -> bool. If there is an entry that implies # the device exists. self.device_id_exists_cache = LruCache( - cache_name="device_id_exists", keylen=2, max_size=10000 + cache_name="device_id_exists", max_size=10000 ) async def store_device( diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 398d6b6acb..9ba5778a88 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -473,7 +473,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore): num_args=1, ) async def _get_bare_e2e_cross_signing_keys_bulk( - self, user_ids: List[str] + self, user_ids: Iterable[str] ) -> Dict[str, Dict[str, dict]]: """Returns the cross-signing keys for a set of users. The output of this function should be passed to _get_e2e_cross_signing_signatures_txn if @@ -497,7 +497,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore): def _get_bare_e2e_cross_signing_keys_bulk_txn( self, txn: Connection, - user_ids: List[str], + user_ids: Iterable[str], ) -> Dict[str, Dict[str, dict]]: """Returns the cross-signing keys for a set of users. The output of this function should be passed to _get_e2e_cross_signing_signatures_txn if diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 2c823e09cf..6963bbf7f4 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -157,7 +157,6 @@ class EventsWorkerStore(SQLBaseStore): self._get_event_cache = LruCache( cache_name="*getEvent*", - keylen=3, max_size=hs.config.caches.event_cache_size, ) diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index 0e86807834..6990f3ed1d 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py
@@ -55,7 +55,7 @@ class KeyStore(SQLBaseStore): """ keys = {} - def _get_keys(txn: Cursor, batch: Tuple[Tuple[str, str]]) -> None: + def _get_keys(txn: Cursor, batch: Tuple[Tuple[str, str], ...]) -> None: """Processes a batch of keys to fetch, and adds the result to `keys`.""" # batch_iter always returns tuples so it's safe to do len(batch) diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index db22fab23e..6a2baa7841 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py
@@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Dict, List, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple from synapse.api.presence import PresenceState, UserPresenceState from synapse.replication.tcp.streams import PresenceStream @@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore): db_conn, "presence_stream", "stream_id" ) + self.hs = hs self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( @@ -96,6 +97,15 @@ class PresenceStore(SQLBaseStore): ) txn.call_after(self._get_presence_for_user.invalidate, (state.user_id,)) + # Delete old rows to stop database from getting really big + sql = "DELETE FROM presence_stream WHERE stream_id < ? AND " + + for states in batch_iter(presence_states, 50): + clause, args = make_in_list_sql_clause( + self.database_engine, "user_id", [s.user_id for s in states] + ) + txn.execute(sql + clause, [stream_id] + list(args)) + # Actually insert new rows self.db_pool.simple_insert_many_txn( txn, @@ -116,15 +126,6 @@ class PresenceStore(SQLBaseStore): ], ) - # Delete old rows to stop database from getting really big - sql = "DELETE FROM presence_stream WHERE stream_id < ? AND " - - for states in batch_iter(presence_states, 50): - clause, args = make_in_list_sql_clause( - self.database_engine, "user_id", [s.user_id for s in states] - ) - txn.execute(sql + clause, [stream_id] + list(args)) - async def get_all_presence_updates( self, instance_name: str, last_id: int, current_id: int, limit: int ) -> Tuple[List[Tuple[int, list]], int, bool]: @@ -210,6 +211,61 @@ class PresenceStore(SQLBaseStore): return {row["user_id"]: UserPresenceState(**row) for row in rows} + async def should_user_receive_full_presence_with_token( + self, + user_id: str, + from_token: int, + ) -> bool: + """Check whether the given user should receive full presence using the stream token + they're updating from. + + Args: + user_id: The ID of the user to check. + from_token: The stream token included in their /sync token. + + Returns: + True if the user should have full presence sent to them, False otherwise. + """ + + def _should_user_receive_full_presence_with_token_txn(txn): + sql = """ + SELECT 1 FROM users_to_send_full_presence_to + WHERE user_id = ? + AND presence_stream_id >= ? + """ + txn.execute(sql, (user_id, from_token)) + return bool(txn.fetchone()) + + return await self.db_pool.runInteraction( + "should_user_receive_full_presence_with_token", + _should_user_receive_full_presence_with_token_txn, + ) + + async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): + """Adds to the list of users who should receive a full snapshot of presence + upon their next sync. + + Args: + user_ids: An iterable of user IDs. + """ + # Add user entries to the table, updating the presence_stream_id column if the user already + # exists in the table. + await self.db_pool.simple_upsert_many( + table="users_to_send_full_presence_to", + key_names=("user_id",), + key_values=[(user_id,) for user_id in user_ids], + value_names=("presence_stream_id",), + # We save the current presence stream ID token along with the user ID entry so + # that when a user /sync's, even if they syncing multiple times across separate + # devices at different times, each device will receive full presence once - when + # the presence stream ID in their sync token is less than the one in the table + # for their user ID. + value_values=( + (self._presence_id_gen.get_current_token(),) for _ in user_ids + ), + desc="add_users_to_send_full_presence_to", + ) + async def get_presence_for_all_users( self, include_offline: bool = True, diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index d36b18a0e9..77e2eb27db 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random import re from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union @@ -1077,7 +1078,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): expiration_ts = now_ms + self._account_validity_period if use_delta: - expiration_ts = self.rand.randrange( + expiration_ts = random.randrange( expiration_ts - self._account_validity_startup_job_max_delta, expiration_ts, ) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 82335e7a9d..d211c423b2 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py
@@ -16,13 +16,15 @@ import logging from collections import namedtuple from typing import Iterable, List, Optional, Tuple +import attr from canonicaljson import encode_canonical_json from synapse.metrics.background_process_metrics import wrap_as_background_process -from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage._base import db_to_json from synapse.storage.database import DatabasePool, LoggingTransaction +from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.types import JsonDict -from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.caches.descriptors import cached db_binary_type = memoryview @@ -38,10 +40,23 @@ _UpdateTransactionRow = namedtuple( "_TransactionRow", ("response_code", "response_json") ) -SENTINEL = object() +@attr.s(slots=True, frozen=True, auto_attribs=True) +class DestinationRetryTimings: + """The current destination retry timing info for a remote server.""" -class TransactionWorkerStore(SQLBaseStore): + # The first time we tried and failed to reach the remote server, in ms. + failure_ts: int + + # The last time we tried and failed to reach the remote server, in ms. + retry_last_ts: int + + # How long since the last time we tried to reach the remote server before + # trying again, in ms. + retry_interval: int + + +class TransactionWorkerStore(CacheInvalidationWorkerStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) @@ -60,19 +75,6 @@ class TransactionWorkerStore(SQLBaseStore): "_cleanup_transactions", _cleanup_transactions_txn ) - -class TransactionStore(TransactionWorkerStore): - """A collection of queries for handling PDUs.""" - - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - - self._destination_retry_cache = ExpiringCache( - cache_name="get_destination_retry_timings", - clock=self._clock, - expiry_ms=5 * 60 * 1000, - ) - async def get_received_txn_response( self, transaction_id: str, origin: str ) -> Optional[Tuple[int, JsonDict]]: @@ -145,7 +147,11 @@ class TransactionStore(TransactionWorkerStore): desc="set_received_txn_response", ) - async def get_destination_retry_timings(self, destination): + @cached(max_entries=10000) + async def get_destination_retry_timings( + self, + destination: str, + ) -> Optional[DestinationRetryTimings]: """Gets the current retry timings (if any) for a given destination. Args: @@ -156,34 +162,29 @@ class TransactionStore(TransactionWorkerStore): Otherwise a dict for the retry scheme """ - result = self._destination_retry_cache.get(destination, SENTINEL) - if result is not SENTINEL: - return result - result = await self.db_pool.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination, ) - # We don't hugely care about race conditions between getting and - # invalidating the cache, since we time out fairly quickly anyway. - self._destination_retry_cache[destination] = result return result - def _get_destination_retry_timings(self, txn, destination): + def _get_destination_retry_timings( + self, txn, destination: str + ) -> Optional[DestinationRetryTimings]: result = self.db_pool.simple_select_one_txn( txn, table="destinations", keyvalues={"destination": destination}, - retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"), + retcols=("failure_ts", "retry_last_ts", "retry_interval"), allow_none=True, ) # check we have a row and retry_last_ts is not null or zero # (retry_last_ts can't be negative) if result and result["retry_last_ts"]: - return result + return DestinationRetryTimings(**result) else: return None @@ -204,7 +205,6 @@ class TransactionStore(TransactionWorkerStore): retry_interval: how long until next retry in ms """ - self._destination_retry_cache.pop(destination, None) if self.database_engine.can_native_upsert: return await self.db_pool.runInteraction( "set_destination_retry_timings", @@ -252,6 +252,10 @@ class TransactionStore(TransactionWorkerStore): txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval)) + self._invalidate_cache_and_stream( + txn, self.get_destination_retry_timings, (destination,) + ) + def _set_destination_retry_timings_emulated( self, txn, destination, failure_ts, retry_last_ts, retry_interval ): @@ -295,6 +299,10 @@ class TransactionStore(TransactionWorkerStore): }, ) + self._invalidate_cache_and_stream( + txn, self.get_destination_retry_timings, (destination,) + ) + async def store_destination_rooms_entries( self, destinations: Iterable[str], diff --git a/synapse/storage/databases/main/user_erasure_store.py b/synapse/storage/databases/main/user_erasure_store.py
index acf6b2fb64..1ecdd40c38 100644 --- a/synapse/storage/databases/main/user_erasure_store.py +++ b/synapse/storage/databases/main/user_erasure_store.py
@@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Dict, Iterable + from synapse.storage._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList @@ -37,21 +39,16 @@ class UserErasureWorkerStore(SQLBaseStore): return bool(result) @cachedList(cached_method_name="is_user_erased", list_name="user_ids") - async def are_users_erased(self, user_ids): + async def are_users_erased(self, user_ids: Iterable[str]) -> Dict[str, bool]: """ Checks which users in a list have requested erasure Args: - user_ids (iterable[str]): full user id to check + user_ids: full user ids to check Returns: - dict[str, bool]: - for each user, whether the user has requested erasure. + for each user, whether the user has requested erasure. """ - # this serves the dual purpose of (a) making sure we can do len and - # iterate it multiple times, and (b) avoiding duplicates. - user_ids = tuple(set(user_ids)) - rows = await self.db_pool.simple_select_many_batch( table="erased_users", column="user_id",