summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/background_updates.py4
-rw-r--r--synapse/storage/database.py17
-rw-r--r--synapse/storage/databases/main/__init__.py6
-rw-r--r--synapse/storage/databases/main/events.py41
-rw-r--r--synapse/storage/databases/main/events_worker.py41
-rw-r--r--synapse/storage/databases/main/lock.py36
-rw-r--r--synapse/storage/databases/main/push_rule.py1
-rw-r--r--synapse/storage/databases/main/registration.py7
-rw-r--r--synapse/storage/databases/main/stats.py1
-rw-r--r--synapse/storage/databases/main/transactions.py26
-rw-r--r--synapse/storage/schema/__init__.py16
11 files changed, 99 insertions, 97 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py

index ddca0af1da..7619f405fa 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py
@@ -405,14 +405,14 @@ class BackgroundUpdater: try: result = await self.do_next_background_update(sleep) back_to_back_failures = 0 - except Exception: + except Exception as e: + logger.exception("Error doing update: %s", e) back_to_back_failures += 1 if back_to_back_failures >= 5: self._aborted = True raise RuntimeError( "5 back-to-back background update failures; aborting." ) - logger.exception("Error doing update") else: if result: logger.info( diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index a1c8fb0f46..55ac313f33 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py
@@ -31,6 +31,7 @@ from typing import ( Iterator, List, Optional, + Sequence, Tuple, Type, TypeVar, @@ -358,7 +359,21 @@ class LoggingTransaction: return self.txn.rowcount @property - def description(self) -> Any: + def description( + self, + ) -> Optional[ + Sequence[ + Tuple[ + str, + Optional[Any], + Optional[int], + Optional[int], + Optional[int], + Optional[int], + Optional[int], + ] + ] + ]: return self.txn.description def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None: diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index a85633efcd..0836e247ef 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py
@@ -277,6 +277,10 @@ class DataStore( FROM users as u LEFT JOIN profiles AS p ON u.name = p.full_user_id LEFT JOIN erased_users AS eu ON u.name = eu.user_id + LEFT JOIN ( + SELECT user_id, MAX(last_seen) AS last_seen_ts + FROM user_ips GROUP BY user_id + ) ls ON u.name = ls.user_id {where_clause} """ sql = "SELECT COUNT(*) as total_users " + sql_base @@ -286,7 +290,7 @@ class DataStore( sql = f""" SELECT name, user_type, is_guest, admin, deactivated, shadow_banned, displayname, avatar_url, creation_ts * 1000 as creation_ts, approved, - eu.user_id is not null as erased + eu.user_id is not null as erased, last_seen_ts {sql_base} ORDER BY {order_by_column} {order}, u.name ASC LIMIT ? OFFSET ? diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index c1353b18c1..0c1ed75240 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -978,26 +978,12 @@ class PersistEventsStore: """Persist the mapping from transaction IDs to event IDs (if defined).""" inserted_ts = self._clock.time_msec() - to_insert_token_id: List[Tuple[str, str, str, int, str, int]] = [] to_insert_device_id: List[Tuple[str, str, str, str, str, int]] = [] for event, _ in events_and_contexts: txn_id = getattr(event.internal_metadata, "txn_id", None) - token_id = getattr(event.internal_metadata, "token_id", None) device_id = getattr(event.internal_metadata, "device_id", None) if txn_id is not None: - if token_id is not None: - to_insert_token_id.append( - ( - event.event_id, - event.room_id, - event.sender, - token_id, - txn_id, - inserted_ts, - ) - ) - if device_id is not None: to_insert_device_id.append( ( @@ -1010,26 +996,7 @@ class PersistEventsStore: ) ) - # Synapse usually relies on the device_id to scope transactions for events, - # except for users without device IDs (appservice, guests, and access - # tokens minted with the admin API) which use the access token ID instead. - # - # TODO https://github.com/matrix-org/synapse/issues/16042 - if to_insert_token_id: - self.db_pool.simple_insert_many_txn( - txn, - table="event_txn_id", - keys=( - "event_id", - "room_id", - "user_id", - "token_id", - "txn_id", - "inserted_ts", - ), - values=to_insert_token_id, - ) - + # Synapse relies on the device_id to scope transactions for events.. if to_insert_device_id: self.db_pool.simple_insert_many_txn( txn, @@ -1671,7 +1638,7 @@ class PersistEventsStore: if self._ephemeral_messages_enabled: # If there's an expiry timestamp on the event, store it. expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) - if type(expiry_ts) is int and not event.is_state(): + if type(expiry_ts) is int and not event.is_state(): # noqa: E721 self._insert_event_expiry_txn(txn, event.event_id, expiry_ts) # Insert into the room_memberships table. @@ -2039,10 +2006,10 @@ class PersistEventsStore: ): if ( "min_lifetime" in event.content - and type(event.content["min_lifetime"]) is not int + and type(event.content["min_lifetime"]) is not int # noqa: E721 ) or ( "max_lifetime" in event.content - and type(event.content["max_lifetime"]) is not int + and type(event.content["max_lifetime"]) is not int # noqa: E721 ): # Ignore the event if one of the value isn't an integer. return diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index d0dd455aec..943666ed4f 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -2022,25 +2022,6 @@ class EventsWorkerStore(SQLBaseStore): desc="get_next_event_to_expire", func=get_next_event_to_expire_txn ) - async def get_event_id_from_transaction_id_and_token_id( - self, room_id: str, user_id: str, token_id: int, txn_id: str - ) -> Optional[str]: - """Look up if we have already persisted an event for the transaction ID, - returning the event ID if so. - """ - return await self.db_pool.simple_select_one_onecol( - table="event_txn_id", - keyvalues={ - "room_id": room_id, - "user_id": user_id, - "token_id": token_id, - "txn_id": txn_id, - }, - retcol="event_id", - allow_none=True, - desc="get_event_id_from_transaction_id_and_token_id", - ) - async def get_event_id_from_transaction_id_and_device_id( self, room_id: str, user_id: str, device_id: str, txn_id: str ) -> Optional[str]: @@ -2072,29 +2053,35 @@ class EventsWorkerStore(SQLBaseStore): """ mapping = {} - txn_id_to_event: Dict[Tuple[str, int, str], str] = {} + txn_id_to_event: Dict[Tuple[str, str, str, str], str] = {} for event in events: - token_id = getattr(event.internal_metadata, "token_id", None) + device_id = getattr(event.internal_metadata, "device_id", None) txn_id = getattr(event.internal_metadata, "txn_id", None) - if token_id and txn_id: + if device_id and txn_id: # Check if this is a duplicate of an event in the given events. - existing = txn_id_to_event.get((event.room_id, token_id, txn_id)) + existing = txn_id_to_event.get( + (event.room_id, event.sender, device_id, txn_id) + ) if existing: mapping[event.event_id] = existing continue # Check if this is a duplicate of an event we've already # persisted. - existing = await self.get_event_id_from_transaction_id_and_token_id( - event.room_id, event.sender, token_id, txn_id + existing = await self.get_event_id_from_transaction_id_and_device_id( + event.room_id, event.sender, device_id, txn_id ) if existing: mapping[event.event_id] = existing - txn_id_to_event[(event.room_id, token_id, txn_id)] = existing + txn_id_to_event[ + (event.room_id, event.sender, device_id, txn_id) + ] = existing else: - txn_id_to_event[(event.room_id, token_id, txn_id)] = event.event_id + txn_id_to_event[ + (event.room_id, event.sender, device_id, txn_id) + ] = event.event_id return mapping diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 54d40e7a3a..5a01ec2137 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py
@@ -17,7 +17,7 @@ from types import TracebackType from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type from weakref import WeakValueDictionary -from twisted.internet.interfaces import IReactorCore +from twisted.internet.task import LoopingCall from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore @@ -26,6 +26,7 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.types import ISynapseReactor from synapse.util import Clock from synapse.util.stringutils import random_string @@ -358,7 +359,7 @@ class Lock: def __init__( self, - reactor: IReactorCore, + reactor: ISynapseReactor, clock: Clock, store: LockStore, read_write: bool, @@ -377,19 +378,25 @@ class Lock: self._table = "worker_read_write_locks" if read_write else "worker_locks" - self._looping_call = clock.looping_call( + # We might be called from a non-main thread, so we defer setting up the + # looping call. + self._looping_call: Optional[LoopingCall] = None + reactor.callFromThread(self._setup_looping_call) + + self._dropped = False + + def _setup_looping_call(self) -> None: + self._looping_call = self._clock.looping_call( self._renew, _RENEWAL_INTERVAL_MS, - store, - clock, - read_write, - lock_name, - lock_key, - token, + self._store, + self._clock, + self._read_write, + self._lock_name, + self._lock_key, + self._token, ) - self._dropped = False - @staticmethod @wrap_as_background_process("Lock._renew") async def _renew( @@ -459,7 +466,7 @@ class Lock: if self._dropped: return - if self._looping_call.running: + if self._looping_call and self._looping_call.running: self._looping_call.stop() await self._store.db_pool.simple_delete( @@ -486,8 +493,9 @@ class Lock: # We should not be dropped without the lock being released (unless # we're shutting down), but if we are then let's at least stop # renewing the lock. - if self._looping_call.running: - self._looping_call.stop() + if self._looping_call and self._looping_call.running: + # We might be called from a non-main thread. + self._reactor.callFromThread(self._looping_call.stop) if self._reactor.running: logger.error( diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index c13c0bc7d7..bec0dc2afe 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py
@@ -88,7 +88,6 @@ def _load_rules( msc1767_enabled=experimental_config.msc1767_enabled, msc3664_enabled=experimental_config.msc3664_enabled, msc3381_polls_enabled=experimental_config.msc3381_polls_enabled, - msc3958_suppress_edits_enabled=experimental_config.msc3958_supress_edit_notifs, ) return filtered_rules diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index d3a01d526f..7e85b73e8e 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -206,8 +206,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): consent_server_notice_sent, appservice_id, creation_ts, user_type, deactivated, COALESCE(shadow_banned, FALSE) AS shadow_banned, COALESCE(approved, TRUE) AS approved, - COALESCE(locked, FALSE) AS locked + COALESCE(locked, FALSE) AS locked, last_seen_ts FROM users + LEFT JOIN ( + SELECT user_id, MAX(last_seen) AS last_seen_ts + FROM user_ips GROUP BY user_id + ) ls ON users.name = ls.user_id WHERE name = ? """, (user_id,), @@ -268,6 +272,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): is_shadow_banned=bool(user_data["shadow_banned"]), user_id=UserID.from_string(user_data["name"]), user_type=user_data["user_type"], + last_seen_ts=user_data["last_seen_ts"], ) async def is_trial_user(self, user_id: str) -> bool: diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 6298f0984d..3a2966b9e4 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py
@@ -107,6 +107,7 @@ class UserSortOrder(Enum): AVATAR_URL = "avatar_url" SHADOW_BANNED = "shadow_banned" CREATION_TS = "creation_ts" + LAST_SEEN_TS = "last_seen_ts" class StatsStore(StateDeltasStore): diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 860bbf7c0f..efd21b5bfc 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py
@@ -14,7 +14,7 @@ import logging from enum import Enum -from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, cast import attr from canonicaljson import encode_canonical_json @@ -28,8 +28,8 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore -from synapse.types import JsonDict -from synapse.util.caches.descriptors import cached +from synapse.types import JsonDict, StrCollection +from synapse.util.caches.descriptors import cached, cachedList if TYPE_CHECKING: from synapse.server import HomeServer @@ -205,6 +205,26 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): else: return None + @cachedList( + cached_method_name="get_destination_retry_timings", list_name="destinations" + ) + async def get_destination_retry_timings_batch( + self, destinations: StrCollection + ) -> Dict[str, Optional[DestinationRetryTimings]]: + rows = await self.db_pool.simple_select_many_batch( + table="destinations", + iterable=destinations, + column="destination", + retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"), + desc="get_destination_retry_timings_batch", + ) + + return { + row.pop("destination"): DestinationRetryTimings(**row) + for row in rows + if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"] + } + async def set_destination_retry_timings( self, destination: str, diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 649d3c8e9f..422f11f59e 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 80 # remember to update the list below when updating +SCHEMA_VERSION = 81 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -114,19 +114,15 @@ Changes in SCHEMA_VERSION = 79 Changes in SCHEMA_VERSION = 80 - The event_txn_id_device_id is always written to for new events. - Add tables for the task scheduler. + +Changes in SCHEMA_VERSION = 81 + - The event_txn_id is no longer written to for new events. """ SCHEMA_COMPAT_VERSION = ( - # Queries against `event_stream_ordering` columns in membership tables must - # be disambiguated. - # - # The threads_id column must written to with non-null values for the - # event_push_actions, event_push_actions_staging, and event_push_summary tables. - # - # insertions to the column `full_user_id` of tables profiles and user_filters can no - # longer be null - 76 + # The `event_txn_id_device_id` must be written to for new events. + 80 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat