From ba00e20234eadae66f105f8bda64e39beed9a92d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 21 Oct 2021 14:39:16 -0400 Subject: Add a thread relation type per MSC3440. (#11088) Adds experimental support for MSC3440's `io.element.thread` relation type (and the aggregation for it). --- synapse/storage/databases/main/events.py | 4 ++ synapse/storage/databases/main/relations.py | 59 ++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 1 deletion(-) (limited to 'synapse/storage/databases') diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 37439f8562..8d9086ecf0 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1710,6 +1710,7 @@ class PersistEventsStore: RelationTypes.ANNOTATION, RelationTypes.REFERENCE, RelationTypes.REPLACE, + RelationTypes.THREAD, ): # Unknown relation type return @@ -1740,6 +1741,9 @@ class PersistEventsStore: if rel_type == RelationTypes.REPLACE: txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,)) + if rel_type == RelationTypes.THREAD: + txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,)) + def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase): """Handles keeping track of insertion events and edges/connections. Part of MSC2716. diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 2bbf6d6a95..40760fbd1b 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import Optional +from typing import Optional, Tuple import attr @@ -269,6 +269,63 @@ class RelationsWorkerStore(SQLBaseStore): return await self.get_event(edit_id, allow_none=True) + @cached() + async def get_thread_summary( + self, event_id: str + ) -> Tuple[int, Optional[EventBase]]: + """Get the number of threaded replies, the senders of those replies, and + the latest reply (if any) for the given event. + + Args: + event_id: The original event ID + + Returns: + The number of items in the thread and the most recent response, if any. + """ + + def _get_thread_summary_txn(txn) -> Tuple[int, Optional[str]]: + # Fetch the count of threaded events and the latest event ID. + # TODO Should this only allow m.room.message events. + sql = """ + SELECT event_id + FROM event_relations + INNER JOIN events USING (event_id) + WHERE + relates_to_id = ? + AND relation_type = ? + ORDER BY topological_ordering DESC, stream_ordering DESC + LIMIT 1 + """ + + txn.execute(sql, (event_id, RelationTypes.THREAD)) + row = txn.fetchone() + if row is None: + return 0, None + + latest_event_id = row[0] + + sql = """ + SELECT COALESCE(COUNT(event_id), 0) + FROM event_relations + WHERE + relates_to_id = ? + AND relation_type = ? + """ + txn.execute(sql, (event_id, RelationTypes.THREAD)) + count = txn.fetchone()[0] + + return count, latest_event_id + + count, latest_event_id = await self.db_pool.runInteraction( + "get_thread_summary", _get_thread_summary_txn + ) + + latest_event = None + if latest_event_id: + latest_event = await self.get_event(latest_event_id, allow_none=True) + + return count, latest_event + async def has_user_annotated_event( self, parent_id: str, event_type: str, aggregation_key: str, sender: str ) -> bool: -- cgit 1.5.1 From 2b82ec425fccb0ef626242779f7ccd4d77a0685c Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Fri, 22 Oct 2021 18:15:41 +0100 Subject: Add type hints for most `HomeServer` parameters (#11095) --- changelog.d/11095.misc | 1 + synapse/app/_base.py | 8 +++---- synapse/app/admin_cmd.py | 4 ++-- synapse/app/generic_worker.py | 4 ++-- synapse/app/homeserver.py | 2 +- synapse/app/phone_stats_home.py | 8 +++++-- synapse/appservice/api.py | 3 ++- synapse/config/logger.py | 9 ++++++- synapse/federation/federation_base.py | 7 +++++- synapse/federation/federation_server.py | 9 +++---- synapse/http/matrixfederationclient.py | 8 +++++-- synapse/http/server.py | 19 +++++++++------ synapse/replication/http/__init__.py | 9 +++++-- synapse/replication/http/_base.py | 8 ++++--- synapse/replication/http/account_data.py | 14 +++++++---- synapse/replication/http/devices.py | 8 +++++-- synapse/replication/http/federation.py | 16 ++++++++----- synapse/replication/http/login.py | 8 +++++-- synapse/replication/http/membership.py | 6 ++--- synapse/replication/http/presence.py | 2 +- synapse/replication/http/push.py | 2 +- synapse/replication/http/register.py | 10 +++++--- synapse/replication/http/send_event.py | 8 +++++-- synapse/replication/http/streams.py | 8 +++++-- synapse/replication/slave/storage/_base.py | 7 ++++-- synapse/replication/slave/storage/client_ips.py | 7 +++++- synapse/replication/slave/storage/devices.py | 7 +++++- synapse/replication/slave/storage/events.py | 6 ++++- synapse/replication/slave/storage/filtering.py | 7 +++++- synapse/replication/slave/storage/groups.py | 7 +++++- synapse/replication/tcp/external_cache.py | 9 ++++++- synapse/replication/tcp/handler.py | 6 ++++- synapse/replication/tcp/resource.py | 8 +++++-- synapse/replication/tcp/streams/_base.py | 20 ++++++++-------- synapse/rest/admin/devices.py | 2 +- synapse/server.py | 11 ++++++--- synapse/storage/database.py | 6 ++++- synapse/storage/databases/__init__.py | 28 +++++++++++++++++----- synapse/storage/databases/main/__init__.py | 7 ++++-- synapse/storage/databases/main/account_data.py | 7 ++++-- synapse/storage/databases/main/cache.py | 7 ++++-- synapse/storage/databases/main/deviceinbox.py | 9 ++++--- synapse/storage/databases/main/devices.py | 21 ++++++++++++---- synapse/storage/databases/main/event_federation.py | 9 ++++--- .../storage/databases/main/event_push_actions.py | 9 ++++--- .../storage/databases/main/events_bg_updates.py | 7 ++++-- synapse/storage/databases/main/media_repository.py | 9 ++++--- synapse/storage/databases/main/metrics.py | 7 ++++-- .../storage/databases/main/monthly_active_users.py | 9 ++++--- synapse/storage/databases/main/push_rule.py | 7 ++++-- synapse/storage/databases/main/receipts.py | 7 ++++-- synapse/storage/databases/main/room.py | 11 +++++---- synapse/storage/databases/main/roommember.py | 7 +++--- synapse/storage/databases/main/search.py | 9 ++++--- synapse/storage/databases/main/state.py | 11 +++++---- synapse/storage/databases/main/stats.py | 7 ++++-- synapse/storage/databases/main/transactions.py | 7 ++++-- synapse/storage/persist_events.py | 6 ++++- 58 files changed, 342 insertions(+), 143 deletions(-) create mode 100644 changelog.d/11095.misc (limited to 'synapse/storage/databases') diff --git a/changelog.d/11095.misc b/changelog.d/11095.misc new file mode 100644 index 0000000000..786e90b595 --- /dev/null +++ b/changelog.d/11095.misc @@ -0,0 +1 @@ +Add type hints to most `HomeServer` parameters. diff --git a/synapse/app/_base.py b/synapse/app/_base.py index bb4d53d778..2ca2e051e4 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -294,7 +294,7 @@ def listen_ssl( return r -def refresh_certificate(hs): +def refresh_certificate(hs: "HomeServer"): """ Refresh the TLS certificates that Synapse is using by re-reading them from disk and updating the TLS context factories to use them. @@ -419,11 +419,11 @@ async def start(hs: "HomeServer"): atexit.register(gc.freeze) -def setup_sentry(hs): +def setup_sentry(hs: "HomeServer"): """Enable sentry integration, if enabled in configuration Args: - hs (synapse.server.HomeServer) + hs """ if not hs.config.metrics.sentry_enabled: @@ -449,7 +449,7 @@ def setup_sentry(hs): scope.set_tag("worker_name", name) -def setup_sdnotify(hs): +def setup_sdnotify(hs: "HomeServer"): """Adds process state hooks to tell systemd what we are up to.""" # Tell systemd our state, if we're using it. This will silently fail if diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index b156b93bf3..2fc848596d 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -68,11 +68,11 @@ class AdminCmdServer(HomeServer): DATASTORE_CLASS = AdminCmdSlavedStore -async def export_data_command(hs, args): +async def export_data_command(hs: HomeServer, args): """Export data for a user. Args: - hs (HomeServer) + hs args (argparse.Namespace) """ diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 7489f31d9a..51eadf122d 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -131,10 +131,10 @@ class KeyUploadServlet(RestServlet): PATTERNS = client_patterns("/keys/upload(/(?P[^/]+))?$") - def __init__(self, hs): + def __init__(self, hs: HomeServer): """ Args: - hs (synapse.server.HomeServer): server + hs: server """ super().__init__() self.auth = hs.get_auth() diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 422f03cc04..93e2299266 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -412,7 +412,7 @@ def format_config_error(e: ConfigError) -> Iterator[str]: e = e.__cause__ -def run(hs): +def run(hs: HomeServer): PROFILE_SYNAPSE = False if PROFILE_SYNAPSE: diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index fcd01e833c..126450e17a 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -15,11 +15,15 @@ import logging import math import resource import sys +from typing import TYPE_CHECKING from prometheus_client import Gauge from synapse.metrics.background_process_metrics import wrap_as_background_process +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger("synapse.app.homeserver") # Contains the list of processes we will be monitoring @@ -41,7 +45,7 @@ registered_reserved_users_mau_gauge = Gauge( @wrap_as_background_process("phone_stats_home") -async def phone_stats_home(hs, stats, stats_process=_stats_process): +async def phone_stats_home(hs: "HomeServer", stats, stats_process=_stats_process): logger.info("Gathering stats for reporting") now = int(hs.get_clock().time()) uptime = int(now - hs.start_time) @@ -142,7 +146,7 @@ async def phone_stats_home(hs, stats, stats_process=_stats_process): logger.warning("Error reporting stats: %s", e) -def start_phone_stats_home(hs): +def start_phone_stats_home(hs: "HomeServer"): """ Start the background tasks which report phone home stats. """ diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 935f24263c..d08f6bbd7f 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -27,6 +27,7 @@ from synapse.util.caches.response_cache import ResponseCache if TYPE_CHECKING: from synapse.appservice import ApplicationService + from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -84,7 +85,7 @@ class ApplicationServiceApi(SimpleHttpClient): pushing. """ - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.clock = hs.get_clock() diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 0a08231e5a..5252e61a99 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -18,6 +18,7 @@ import os import sys import threading from string import Template +from typing import TYPE_CHECKING import yaml from zope.interface import implementer @@ -38,6 +39,9 @@ from synapse.util.versionstring import get_version_string from ._base import Config, ConfigError +if TYPE_CHECKING: + from synapse.server import HomeServer + DEFAULT_LOG_CONFIG = Template( """\ # Log configuration for Synapse. @@ -306,7 +310,10 @@ def _reload_logging_config(log_config_path): def setup_logging( - hs, config, use_worker_options=False, logBeginner: LogBeginner = globalLogBeginner + hs: "HomeServer", + config, + use_worker_options=False, + logBeginner: LogBeginner = globalLogBeginner, ) -> None: """ Set up the logging subsystem. diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 0cd424e12a..f56344a3b9 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -14,6 +14,7 @@ # limitations under the License. import logging from collections import namedtuple +from typing import TYPE_CHECKING from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership from synapse.api.errors import Codes, SynapseError @@ -25,11 +26,15 @@ from synapse.events.utils import prune_event, validate_canonicaljson from synapse.http.servlet import assert_params_in_dict from synapse.types import JsonDict, get_domain_from_id +if TYPE_CHECKING: + from synapse.server import HomeServer + + logger = logging.getLogger(__name__) class FederationBase: - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.hs = hs self.server_name = hs.hostname diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d8c0b86f23..0d66034f44 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -467,7 +467,7 @@ class FederationServer(FederationBase): async def on_room_state_request( self, origin: str, room_id: str, event_id: Optional[str] - ) -> Tuple[int, Dict[str, Any]]: + ) -> Tuple[int, JsonDict]: origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) @@ -481,7 +481,7 @@ class FederationServer(FederationBase): # - but that's non-trivial to get right, and anyway somewhat defeats # the point of the linearizer. with (await self._server_linearizer.queue((origin, room_id))): - resp = dict( + resp: JsonDict = dict( await self._state_resp_cache.wrap( (room_id, event_id), self._on_context_state_request_compute, @@ -1061,11 +1061,12 @@ class FederationServer(FederationBase): origin, event = next - lock = await self.store.try_acquire_lock( + new_lock = await self.store.try_acquire_lock( _INBOUND_EVENT_HANDLING_LOCK_NAME, room_id ) - if not lock: + if not new_lock: return + lock = new_lock def __str__(self) -> str: return "" % self.server_name diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 4f59224686..203d723d41 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -21,6 +21,7 @@ import typing import urllib.parse from io import BytesIO, StringIO from typing import ( + TYPE_CHECKING, Callable, Dict, Generic, @@ -73,6 +74,9 @@ from synapse.util import json_decoder from synapse.util.async_helpers import timeout_deferred from synapse.util.metrics import Measure +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) outgoing_requests_counter = Counter( @@ -319,7 +323,7 @@ class MatrixFederationHttpClient: requests. """ - def __init__(self, hs, tls_client_options_factory): + def __init__(self, hs: "HomeServer", tls_client_options_factory): self.hs = hs self.signing_key = hs.signing_key self.server_name = hs.hostname @@ -711,7 +715,7 @@ class MatrixFederationHttpClient: Returns: A list of headers to be added as "Authorization:" headers """ - request = { + request: JsonDict = { "method": method.decode("ascii"), "uri": url_bytes.decode("ascii"), "origin": self.server_name, diff --git a/synapse/http/server.py b/synapse/http/server.py index 897ba5e453..1af0d9a31d 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -22,6 +22,7 @@ import urllib from http import HTTPStatus from inspect import isawaitable from typing import ( + TYPE_CHECKING, Any, Awaitable, Callable, @@ -61,6 +62,9 @@ from synapse.util import json_encoder from synapse.util.caches import intern_dict from synapse.util.iterutils import chunk_seq +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) HTML_ERROR_TEMPLATE = """ @@ -343,6 +347,11 @@ class DirectServeJsonResource(_AsyncResource): return_json_error(f, request) +_PathEntry = collections.namedtuple( + "_PathEntry", ["pattern", "callback", "servlet_classname"] +) + + class JsonResource(DirectServeJsonResource): """This implements the HttpServer interface and provides JSON support for Resources. @@ -359,14 +368,10 @@ class JsonResource(DirectServeJsonResource): isLeaf = True - _PathEntry = collections.namedtuple( - "_PathEntry", ["pattern", "callback", "servlet_classname"] - ) - - def __init__(self, hs, canonical_json=True, extract_context=False): + def __init__(self, hs: "HomeServer", canonical_json=True, extract_context=False): super().__init__(canonical_json, extract_context) self.clock = hs.get_clock() - self.path_regexs = {} + self.path_regexs: Dict[bytes, List[_PathEntry]] = {} self.hs = hs def register_paths(self, method, path_patterns, callback, servlet_classname): @@ -391,7 +396,7 @@ class JsonResource(DirectServeJsonResource): for path_pattern in path_patterns: logger.debug("Registering for %s %s", method, path_pattern.pattern) self.path_regexs.setdefault(method, []).append( - self._PathEntry(path_pattern, callback, servlet_classname) + _PathEntry(path_pattern, callback, servlet_classname) ) def _get_handler_for_request( diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index ba8114ac9e..1457d9d59b 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING + from synapse.http.server import JsonResource from synapse.replication.http import ( account_data, @@ -26,16 +28,19 @@ from synapse.replication.http import ( streams, ) +if TYPE_CHECKING: + from synapse.server import HomeServer + REPLICATION_PREFIX = "/_synapse/replication" class ReplicationRestResource(JsonResource): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): # We enable extracting jaeger contexts here as these are internal APIs. super().__init__(hs, canonical_json=False, extract_context=True) self.register_servlets(hs) - def register_servlets(self, hs): + def register_servlets(self, hs: "HomeServer"): send_event.register_servlets(hs, self) federation.register_servlets(hs, self) presence.register_servlets(hs, self) diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index e047ec74d8..585332b244 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -17,7 +17,7 @@ import logging import re import urllib from inspect import signature -from typing import TYPE_CHECKING, Dict, List, Tuple +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple from prometheus_client import Counter, Gauge @@ -156,7 +156,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): pass @classmethod - def make_client(cls, hs): + def make_client(cls, hs: "HomeServer"): """Create a client that makes requests. Returns a callable that accepts the same parameters as @@ -208,7 +208,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): url_args.append(txn_id) if cls.METHOD == "POST": - request_func = client.post_json_get_json + request_func: Callable[ + ..., Awaitable[Any] + ] = client.post_json_get_json elif cls.METHOD == "PUT": request_func = client.put_json elif cls.METHOD == "GET": diff --git a/synapse/replication/http/account_data.py b/synapse/replication/http/account_data.py index 70e951af63..5f0f225aa9 100644 --- a/synapse/replication/http/account_data.py +++ b/synapse/replication/http/account_data.py @@ -13,10 +13,14 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -37,7 +41,7 @@ class ReplicationUserAccountDataRestServlet(ReplicationEndpoint): PATH_ARGS = ("user_id", "account_data_type") CACHE = False - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.handler = hs.get_account_data_handler() @@ -78,7 +82,7 @@ class ReplicationRoomAccountDataRestServlet(ReplicationEndpoint): PATH_ARGS = ("user_id", "room_id", "account_data_type") CACHE = False - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.handler = hs.get_account_data_handler() @@ -119,7 +123,7 @@ class ReplicationAddTagRestServlet(ReplicationEndpoint): PATH_ARGS = ("user_id", "room_id", "tag") CACHE = False - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.handler = hs.get_account_data_handler() @@ -162,7 +166,7 @@ class ReplicationRemoveTagRestServlet(ReplicationEndpoint): ) CACHE = False - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.handler = hs.get_account_data_handler() @@ -183,7 +187,7 @@ class ReplicationRemoveTagRestServlet(ReplicationEndpoint): return 200, {"max_stream_id": max_stream_id} -def register_servlets(hs, http_server): +def register_servlets(hs: "HomeServer", http_server): ReplicationUserAccountDataRestServlet(hs).register(http_server) ReplicationRoomAccountDataRestServlet(hs).register(http_server) ReplicationAddTagRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index 5a5818ef61..42dffb39cb 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -13,9 +13,13 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING from synapse.replication.http._base import ReplicationEndpoint +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -51,7 +55,7 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): PATH_ARGS = ("user_id",) CACHE = False - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.device_list_updater = hs.get_device_handler().device_list_updater @@ -68,5 +72,5 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): return 200, user_devices -def register_servlets(hs, http_server): +def register_servlets(hs: "HomeServer", http_server): ReplicationUserDevicesResyncRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index a0b3145f4e..5ed535c90d 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import make_event_from_dict @@ -21,6 +22,9 @@ from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint from synapse.util.metrics import Measure +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -56,7 +60,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): NAME = "fed_send_events" PATH_ARGS = () - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.store = hs.get_datastore() @@ -151,7 +155,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): NAME = "fed_send_edu" PATH_ARGS = ("edu_type",) - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.store = hs.get_datastore() @@ -194,7 +198,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint): # This is a query, so let's not bother caching CACHE = False - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.store = hs.get_datastore() @@ -238,7 +242,7 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint): NAME = "fed_cleanup_room" PATH_ARGS = ("room_id",) - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.store = hs.get_datastore() @@ -273,7 +277,7 @@ class ReplicationStoreRoomOnOutlierMembershipRestServlet(ReplicationEndpoint): NAME = "store_room_on_outlier_membership" PATH_ARGS = ("room_id",) - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.store = hs.get_datastore() @@ -289,7 +293,7 @@ class ReplicationStoreRoomOnOutlierMembershipRestServlet(ReplicationEndpoint): return 200, {} -def register_servlets(hs, http_server): +def register_servlets(hs: "HomeServer", http_server): ReplicationFederationSendEventsRestServlet(hs).register(http_server) ReplicationFederationSendEduRestServlet(hs).register(http_server) ReplicationGetQueryRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py index 550bd5c95f..0db419ea57 100644 --- a/synapse/replication/http/login.py +++ b/synapse/replication/http/login.py @@ -13,10 +13,14 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -30,7 +34,7 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint): NAME = "device_check_registered" PATH_ARGS = ("user_id",) - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.registration_handler = hs.get_registration_handler() @@ -82,5 +86,5 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint): return 200, res -def register_servlets(hs, http_server): +def register_servlets(hs: "HomeServer", http_server): RegisterDeviceReplicationServlet(hs).register(http_server) diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 34206c5060..7371c240b2 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -45,7 +45,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): NAME = "remote_join" PATH_ARGS = ("room_id", "user_id") - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.federation_handler = hs.get_federation_handler() @@ -320,7 +320,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): PATH_ARGS = ("room_id", "user_id", "change") CACHE = False # No point caching as should return instantly. - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.registeration_handler = hs.get_registration_handler() @@ -360,7 +360,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): return 200, {} -def register_servlets(hs, http_server): +def register_servlets(hs: "HomeServer", http_server): ReplicationRemoteJoinRestServlet(hs).register(http_server) ReplicationRemoteRejectInviteRestServlet(hs).register(http_server) ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py index bb00247953..63143085d5 100644 --- a/synapse/replication/http/presence.py +++ b/synapse/replication/http/presence.py @@ -117,6 +117,6 @@ class ReplicationPresenceSetState(ReplicationEndpoint): ) -def register_servlets(hs, http_server): +def register_servlets(hs: "HomeServer", http_server): ReplicationBumpPresenceActiveTime(hs).register(http_server) ReplicationPresenceSetState(hs).register(http_server) diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py index 139427cb1f..6c8db3061e 100644 --- a/synapse/replication/http/push.py +++ b/synapse/replication/http/push.py @@ -67,5 +67,5 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint): return 200, {} -def register_servlets(hs, http_server): +def register_servlets(hs: "HomeServer", http_server): ReplicationRemovePusherRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py index d6dd7242eb..7adfbb666f 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py @@ -13,10 +13,14 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -26,7 +30,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint): NAME = "register_user" PATH_ARGS = ("user_id",) - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.store = hs.get_datastore() self.registration_handler = hs.get_registration_handler() @@ -100,7 +104,7 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint): NAME = "post_register" PATH_ARGS = ("user_id",) - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.store = hs.get_datastore() self.registration_handler = hs.get_registration_handler() @@ -130,6 +134,6 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint): return 200, {} -def register_servlets(hs, http_server): +def register_servlets(hs: "HomeServer", http_server): ReplicationRegisterServlet(hs).register(http_server) ReplicationPostRegisterActionsServlet(hs).register(http_server) diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index fae5ffa451..9f6851d059 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import make_event_from_dict @@ -22,6 +23,9 @@ from synapse.replication.http._base import ReplicationEndpoint from synapse.types import Requester, UserID from synapse.util.metrics import Measure +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -57,7 +61,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): NAME = "send_event" PATH_ARGS = ("event_id",) - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.event_creation_handler = hs.get_event_creation_handler() @@ -135,5 +139,5 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): ) -def register_servlets(hs, http_server): +def register_servlets(hs: "HomeServer", http_server): ReplicationSendEventRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py index 9afa147d00..3223bc2432 100644 --- a/synapse/replication/http/streams.py +++ b/synapse/replication/http/streams.py @@ -13,11 +13,15 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING from synapse.api.errors import SynapseError from synapse.http.servlet import parse_integer from synapse.replication.http._base import ReplicationEndpoint +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -46,7 +50,7 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint): PATH_ARGS = ("stream_name",) METHOD = "GET" - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self._instance_name = hs.get_instance_name() @@ -74,5 +78,5 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint): ) -def register_servlets(hs, http_server): +def register_servlets(hs: "HomeServer", http_server): ReplicationGetStreamUpdates(hs).register(http_server) diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index e460dd85cd..7ecb446e7c 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -13,18 +13,21 @@ # limitations under the License. import logging -from typing import Optional +from typing import TYPE_CHECKING, Optional from synapse.storage.database import DatabasePool from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) class BaseSlavedStore(CacheInvalidationWorkerStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen: Optional[ diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py index 436d39c320..61cd7e5228 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py @@ -12,15 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING + from synapse.storage.database import DatabasePool from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY from synapse.util.caches.lrucache import LruCache from ._base import BaseSlavedStore +if TYPE_CHECKING: + from synapse.server import HomeServer + class SlavedClientIpStore(BaseSlavedStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.client_ip_last_seen: LruCache[tuple, int] = LruCache( diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 26bdead565..0a58296089 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING + from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream @@ -20,9 +22,12 @@ from synapse.storage.databases.main.devices import DeviceWorkerStore from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore from synapse.util.caches.stream_change_cache import StreamChangeCache +if TYPE_CHECKING: + from synapse.server import HomeServer + class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.hs = hs diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index d4d3f8c448..63ed50caa5 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import TYPE_CHECKING from synapse.storage.database import DatabasePool from synapse.storage.databases.main.event_federation import EventFederationWorkerStore @@ -30,6 +31,9 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache from ._base import BaseSlavedStore +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -54,7 +58,7 @@ class SlavedEventStore( RelationsWorkerStore, BaseSlavedStore, ): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) events_max = self._stream_id_gen.get_current_token() diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py index 37875bc973..90284c202d 100644 --- a/synapse/replication/slave/storage/filtering.py +++ b/synapse/replication/slave/storage/filtering.py @@ -12,14 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING + from synapse.storage.database import DatabasePool from synapse.storage.databases.main.filtering import FilteringStore from ._base import BaseSlavedStore +if TYPE_CHECKING: + from synapse.server import HomeServer + class SlavedFilteringStore(BaseSlavedStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) # Filters are immutable so this cache doesn't need to be expired diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py index e9bdc38470..497e16c69e 100644 --- a/synapse/replication/slave/storage/groups.py +++ b/synapse/replication/slave/storage/groups.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING + from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.replication.tcp.streams import GroupServerStream @@ -19,9 +21,12 @@ from synapse.storage.database import DatabasePool from synapse.storage.databases.main.group_server import GroupServerWorkerStore from synapse.util.caches.stream_change_cache import StreamChangeCache +if TYPE_CHECKING: + from synapse.server import HomeServer + class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.hs = hs diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py index b402f82810..aaf91e5e02 100644 --- a/synapse/replication/tcp/external_cache.py +++ b/synapse/replication/tcp/external_cache.py @@ -21,6 +21,8 @@ from synapse.logging.context import make_deferred_yieldable from synapse.util import json_decoder, json_encoder if TYPE_CHECKING: + from txredisapi import RedisProtocol + from synapse.server import HomeServer set_counter = Counter( @@ -59,7 +61,12 @@ class ExternalCache: """ def __init__(self, hs: "HomeServer"): - self._redis_connection = hs.get_outbound_redis_connection() + if hs.config.redis.redis_enabled: + self._redis_connection: Optional[ + "RedisProtocol" + ] = hs.get_outbound_redis_connection() + else: + self._redis_connection = None def _get_redis_key(self, cache_name: str, key: str) -> str: return "cache_v1:%s:%s" % (cache_name, key) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 6aa9318027..06fd06fdf3 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -294,7 +294,7 @@ class ReplicationCommandHandler: # This shouldn't be possible raise Exception("Unrecognised command %s in stream queue", cmd.NAME) - def start_replication(self, hs): + def start_replication(self, hs: "HomeServer"): """Helper method to start a replication connection to the remote server using TCP. """ @@ -321,6 +321,8 @@ class ReplicationCommandHandler: hs.config.redis.redis_host, # type: ignore[arg-type] hs.config.redis.redis_port, self._factory, + timeout=30, + bindAddress=None, ) else: client_name = hs.get_instance_name() @@ -331,6 +333,8 @@ class ReplicationCommandHandler: host, # type: ignore[arg-type] port, self._factory, + timeout=30, + bindAddress=None, ) def get_streams(self) -> Dict[str, Stream]: diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 80f9b23bfd..55326877fd 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -16,6 +16,7 @@ import logging import random +from typing import TYPE_CHECKING from prometheus_client import Counter @@ -27,6 +28,9 @@ from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol from synapse.replication.tcp.streams import EventsStream from synapse.util.metrics import Measure +if TYPE_CHECKING: + from synapse.server import HomeServer + stream_updates_counter = Counter( "synapse_replication_tcp_resource_stream_updates", "", ["stream_name"] ) @@ -37,7 +41,7 @@ logger = logging.getLogger(__name__) class ReplicationStreamProtocolFactory(Factory): """Factory for new replication connections.""" - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.command_handler = hs.get_tcp_replication() self.clock = hs.get_clock() self.server_name = hs.config.server.server_name @@ -65,7 +69,7 @@ class ReplicationStreamer: data is available it will propagate to all connected clients. """ - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() self.clock = hs.get_clock() self.notifier = hs.get_notifier() diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 9b905aba9d..c8b188ae4e 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -241,7 +241,7 @@ class BackfillStream(Stream): NAME = "backfill" ROW_TYPE = BackfillStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() super().__init__( hs.get_instance_name(), @@ -363,7 +363,7 @@ class ReceiptsStream(Stream): NAME = "receipts" ROW_TYPE = ReceiptsStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): store = hs.get_datastore() super().__init__( hs.get_instance_name(), @@ -380,7 +380,7 @@ class PushRulesStream(Stream): NAME = "push_rules" ROW_TYPE = PushRulesStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() super().__init__( @@ -405,7 +405,7 @@ class PushersStream(Stream): NAME = "pushers" ROW_TYPE = PushersStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): store = hs.get_datastore() super().__init__( @@ -438,7 +438,7 @@ class CachesStream(Stream): NAME = "caches" ROW_TYPE = CachesStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): store = hs.get_datastore() super().__init__( hs.get_instance_name(), @@ -459,7 +459,7 @@ class DeviceListsStream(Stream): NAME = "device_lists" ROW_TYPE = DeviceListsStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): store = hs.get_datastore() super().__init__( hs.get_instance_name(), @@ -476,7 +476,7 @@ class ToDeviceStream(Stream): NAME = "to_device" ROW_TYPE = ToDeviceStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): store = hs.get_datastore() super().__init__( hs.get_instance_name(), @@ -495,7 +495,7 @@ class TagAccountDataStream(Stream): NAME = "tag_account_data" ROW_TYPE = TagAccountDataStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): store = hs.get_datastore() super().__init__( hs.get_instance_name(), @@ -582,7 +582,7 @@ class GroupServerStream(Stream): NAME = "groups" ROW_TYPE = GroupsStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): store = hs.get_datastore() super().__init__( hs.get_instance_name(), @@ -599,7 +599,7 @@ class UserSignatureStream(Stream): NAME = "user_signature" ROW_TYPE = UserSignatureStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): store = hs.get_datastore() super().__init__( hs.get_instance_name(), diff --git a/synapse/rest/admin/devices.py b/synapse/rest/admin/devices.py index a6fa03c90f..80fbf32f17 100644 --- a/synapse/rest/admin/devices.py +++ b/synapse/rest/admin/devices.py @@ -110,7 +110,7 @@ class DevicesRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): """ Args: - hs (synapse.server.HomeServer): server + hs: server """ self.hs = hs self.auth = hs.get_auth() diff --git a/synapse/server.py b/synapse/server.py index a64c846d1c..0fbf36ba99 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -800,9 +800,14 @@ class HomeServer(metaclass=abc.ABCMeta): return ExternalCache(self) @cache_in_self - def get_outbound_redis_connection(self) -> Optional["RedisProtocol"]: - if not self.config.redis.redis_enabled: - return None + def get_outbound_redis_connection(self) -> "RedisProtocol": + """ + The Redis connection used for replication. + + Raises: + AssertionError: if Redis is not enabled in the homeserver config. + """ + assert self.config.redis.redis_enabled # We only want to import redis module if we're using it, as we have # `txredisapi` as an optional dependency. diff --git a/synapse/storage/database.py b/synapse/storage/database.py index f5a8f90a0f..fa4e89d35c 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -19,6 +19,7 @@ from collections import defaultdict from sys import intern from time import monotonic as monotonic_time from typing import ( + TYPE_CHECKING, Any, Callable, Collection, @@ -52,6 +53,9 @@ from synapse.storage.background_updates import BackgroundUpdater from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.types import Connection, Cursor +if TYPE_CHECKING: + from synapse.server import HomeServer + # python 3 does not have a maximum int value MAX_TXN_ID = 2 ** 63 - 1 @@ -392,7 +396,7 @@ class DatabasePool: def __init__( self, - hs, + hs: "HomeServer", database_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine, ): diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index 20b755056b..cfe887b7f7 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -13,33 +13,49 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar +from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool, make_conn from synapse.storage.databases.main.events import PersistEventsStore from synapse.storage.databases.state import StateGroupDataStore from synapse.storage.engines import create_engine from synapse.storage.prepare_database import prepare_database +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) -class Databases: +DataStoreT = TypeVar("DataStoreT", bound=SQLBaseStore, covariant=True) + + +class Databases(Generic[DataStoreT]): """The various databases. These are low level interfaces to physical databases. Attributes: - main (DataStore) + databases + main + state + persist_events """ - def __init__(self, main_store_class, hs): + databases: List[DatabasePool] + main: DataStoreT + state: StateGroupDataStore + persist_events: Optional[PersistEventsStore] + + def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"): # Note we pass in the main store class here as workers use a different main # store. self.databases = [] - main = None - state = None - persist_events = None + main: Optional[DataStoreT] = None + state: Optional[StateGroupDataStore] = None + persist_events: Optional[PersistEventsStore] = None for database_config in hs.config.database.databases: db_name = database_config.name diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 5c21402dea..259cae5b37 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -15,7 +15,7 @@ # limitations under the License. import logging -from typing import List, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional, Tuple from synapse.config.homeserver import HomeServerConfig from synapse.storage.database import DatabasePool @@ -75,6 +75,9 @@ from .ui_auth import UIAuthStore from .user_directory import UserDirectoryStore from .user_erasure_store import UserErasureStore +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -126,7 +129,7 @@ class DataStore( LockStore, SessionStore, ): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): self.hs = hs self._clock = hs.get_clock() self.database_engine = database.engine diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 70ca3e09f7..f8bec266ac 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import Dict, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple from synapse.api.constants import AccountDataTypes from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker @@ -28,6 +28,9 @@ from synapse.util import json_encoder from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -36,7 +39,7 @@ class AccountDataWorkerStore(SQLBaseStore): `get_max_account_data_stream_id` which can be called in the initializer. """ - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): self._instance_name = hs.get_instance_name() if isinstance(database.engine, PostgresEngine): diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index c57ae5ef15..36e8422fc6 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -15,7 +15,7 @@ import itertools import logging -from typing import Any, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple from synapse.api.constants import EventTypes from synapse.replication.tcp.streams import BackfillStream, CachesStream @@ -29,6 +29,9 @@ from synapse.storage.database import DatabasePool from synapse.storage.engines import PostgresEngine from synapse.util.iterutils import batch_iter +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -38,7 +41,7 @@ CURRENT_STATE_CACHE_NAME = "cs_cache_fake" class CacheInvalidationWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self._instance_name = hs.get_instance_name() diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 3154906d45..8143168107 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import List, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional, Tuple from synapse.logging import issue9533_logger from synapse.logging.opentracing import log_kv, set_tag, trace @@ -26,11 +26,14 @@ from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.stream_change_cache import StreamChangeCache +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) class DeviceInboxWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self._instance_name = hs.get_instance_name() @@ -553,7 +556,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.db_pool.updates.register_background_index_update( diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 6464520386..a01bf2c5b7 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -15,7 +15,17 @@ # limitations under the License. import abc import logging -from typing import Any, Collection, Dict, Iterable, List, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + Any, + Collection, + Dict, + Iterable, + List, + Optional, + Set, + Tuple, +) from synapse.api.errors import Codes, StoreError from synapse.logging.opentracing import ( @@ -38,6 +48,9 @@ from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter from synapse.util.stringutils import shortstr +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( @@ -48,7 +61,7 @@ BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes" class DeviceWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) if hs.config.worker.run_background_tasks: @@ -915,7 +928,7 @@ class DeviceWorkerStore(SQLBaseStore): class DeviceBackgroundUpdateStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.db_pool.updates.register_background_index_update( @@ -1047,7 +1060,7 @@ class DeviceBackgroundUpdateStore(SQLBaseStore): class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) # Map of (user_id, device_id) -> bool. If there is an entry that implies diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ba9f71a230..ef5d1ef01e 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -14,7 +14,7 @@ import itertools import logging from queue import Empty, PriorityQueue -from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple from prometheus_client import Counter, Gauge @@ -34,6 +34,9 @@ from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter +if TYPE_CHECKING: + from synapse.server import HomeServer + oldest_pdu_in_federation_staging = Gauge( "synapse_federation_server_oldest_inbound_pdu_in_staging", "The age in seconds since we received the oldest pdu in the federation staging area", @@ -59,7 +62,7 @@ class _NoChainCoverIndex(Exception): class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) if hs.config.worker.run_background_tasks: @@ -1511,7 +1514,7 @@ class EventFederationStore(EventFederationWorkerStore): EVENT_AUTH_STATE_ONLY = "event_auth_state_only" - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.db_pool.updates.register_background_update_handler( diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 97b3e92d3f..d957e770dc 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Dict, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union import attr @@ -23,6 +23,9 @@ from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.util import json_encoder from synapse.util.caches.descriptors import cached +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -64,7 +67,7 @@ def _deserialize_action(actions, is_highlight): class EventPushActionsWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) # These get correctly set by _find_stream_orderings_for_times_txn @@ -892,7 +895,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): class EventPushActionsStore(EventPushActionsWorkerStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.db_pool.updates.register_background_index_update( diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 1afc59fafb..fc49112063 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple import attr @@ -26,6 +26,9 @@ from synapse.storage.databases.main.events import PersistEventsStore from synapse.storage.types import Cursor from synapse.types import JsonDict +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -76,7 +79,7 @@ class _CalculateChainCover: class EventsBackgroundUpdatesStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.db_pool.updates.register_background_update_handler( diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index 2fa945d171..717487be28 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -13,11 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. from enum import Enum -from typing import Any, Dict, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool +if TYPE_CHECKING: + from synapse.server import HomeServer + BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD = ( "media_repository_drop_index_wo_method" ) @@ -43,7 +46,7 @@ class MediaSortOrder(Enum): class MediaRepositoryBackgroundUpdateStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.db_pool.updates.register_background_index_update( @@ -123,7 +126,7 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore): class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): """Persistence for attachments and avatars""" - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.server_name = hs.hostname diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index dac3d14da8..d901933ae4 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -14,7 +14,7 @@ import calendar import logging import time -from typing import Dict +from typing import TYPE_CHECKING, Dict from synapse.metrics import GaugeBucketCollector from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -24,6 +24,9 @@ from synapse.storage.databases.main.event_push_actions import ( EventPushActionsWorkerStore, ) +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) # Collect metrics on the number of forward extremities that exist. @@ -52,7 +55,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): stats and prometheus metrics. """ - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) # Read the extrems every 60 minutes diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index a14ac03d4b..b5284e4f67 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -12,13 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Dict, List, Optional +from typing import TYPE_CHECKING, Dict, List, Optional from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool, make_in_list_sql_clause from synapse.util.caches.descriptors import cached +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) # Number of msec of granularity to store the monthly_active_user timestamp @@ -27,7 +30,7 @@ LAST_SEEN_GRANULARITY = 60 * 60 * 1000 class MonthlyActiveUsersWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self._clock = hs.get_clock() self.hs = hs @@ -209,7 +212,7 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore): class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self._mau_stats_only = hs.config.server.mau_stats_only diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index fc720f5947..fa782023d4 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -14,7 +14,7 @@ # limitations under the License. import abc import logging -from typing import Dict, List, Tuple, Union +from typing import TYPE_CHECKING, Dict, List, Tuple, Union from synapse.api.errors import NotFoundError, StoreError from synapse.push.baserules import list_with_base_rules @@ -33,6 +33,9 @@ from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -75,7 +78,7 @@ class PushRulesWorkerStore( `get_max_push_rules_stream_id` which can be called in the initializer. """ - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) if hs.config.worker.worker_app is None: diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 01a4281301..c99f8aebdb 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import Any, Dict, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple from twisted.internet import defer @@ -29,11 +29,14 @@ from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) class ReceiptsWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): self._instance_name = hs.get_instance_name() if isinstance(database.engine, PostgresEngine): diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 835d7889cb..f879bbe7c7 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -17,7 +17,7 @@ import collections import logging from abc import abstractmethod from enum import Enum -from typing import Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from synapse.api.constants import EventContentFields, EventTypes, JoinRules from synapse.api.errors import StoreError @@ -32,6 +32,9 @@ from synapse.util import json_encoder from synapse.util.caches.descriptors import cached from synapse.util.stringutils import MXC_REGEX +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -69,7 +72,7 @@ class RoomSortOrder(Enum): class RoomWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.config = hs.config @@ -1026,7 +1029,7 @@ _REPLACE_ROOM_DEPTH_SQL_COMMANDS = ( class RoomBackgroundUpdateStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.config = hs.config @@ -1411,7 +1414,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore): class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.config = hs.config diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index ddb162a4fc..4b288bb2e7 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -53,6 +53,7 @@ from synapse.util.caches.descriptors import _CacheContext, cached, cachedList from synapse.util.metrics import Measure if TYPE_CHECKING: + from synapse.server import HomeServer from synapse.state import _StateCacheEntry logger = logging.getLogger(__name__) @@ -63,7 +64,7 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership" class RoomMemberWorkerStore(EventsWorkerStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) # Used by `_get_joined_hosts` to ensure only one thing mutates the cache @@ -982,7 +983,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): class RoomMemberBackgroundUpdateStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.db_pool.updates.register_background_update_handler( _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile @@ -1132,7 +1133,7 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore): class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) async def forget(self, user_id: str, room_id: str) -> None: diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index c85383c975..7fe233767f 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -15,7 +15,7 @@ import logging import re from collections import namedtuple -from typing import Collection, Iterable, List, Optional, Set +from typing import TYPE_CHECKING, Collection, Iterable, List, Optional, Set from synapse.api.errors import SynapseError from synapse.events import EventBase @@ -24,6 +24,9 @@ from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.engines import PostgresEngine, Sqlite3Engine +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) SearchEntry = namedtuple( @@ -102,7 +105,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist" EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin" - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) if not hs.config.server.enable_search: @@ -355,7 +358,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): class SearchStore(SearchBackgroundUpdateStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) async def search_msgs(self, room_ids, search_term, keys): diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index a8e8dd4577..fa2c3b1feb 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -15,7 +15,7 @@ import collections.abc import logging from collections import namedtuple -from typing import Iterable, Optional, Set +from typing import TYPE_CHECKING, Iterable, Optional, Set from synapse.api.constants import EventTypes, Membership from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError @@ -30,6 +30,9 @@ from synapse.types import StateMap from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedList +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -53,7 +56,7 @@ class _GetStateGroupDelta( class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): """The parts of StateGroupStore that can be called from workers.""" - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) async def get_room_version(self, room_id: str) -> RoomVersion: @@ -346,7 +349,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index" DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events" - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.server_name = hs.hostname @@ -533,5 +536,5 @@ class StateStore(StateGroupWorkerStore, MainStateBackgroundUpdateStore): * `state_groups_state`: Maps state group to state events. """ - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index e20033bb28..5d7b59d861 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -16,7 +16,7 @@ import logging from enum import Enum from itertools import chain -from typing import Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from typing_extensions import Counter @@ -29,6 +29,9 @@ from synapse.storage.databases.main.state_deltas import StateDeltasStore from synapse.types import JsonDict from synapse.util.caches.descriptors import cached +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) @@ -93,7 +96,7 @@ class UserSortOrder(Enum): class StatsStore(StateDeltasStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.server_name = hs.hostname diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 860146cd1b..d7dc1f73ac 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -14,7 +14,7 @@ import logging from collections import namedtuple -from typing import Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple import attr from canonicaljson import encode_canonical_json @@ -26,6 +26,9 @@ from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.types import JsonDict from synapse.util.caches.descriptors import cached +if TYPE_CHECKING: + from synapse.server import HomeServer + db_binary_type = memoryview logger = logging.getLogger(__name__) @@ -57,7 +60,7 @@ class DestinationRetryTimings: class TransactionWorkerStore(CacheInvalidationWorkerStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) if hs.config.worker.run_background_tasks: diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 0e8270746d..402f134d89 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -18,6 +18,7 @@ import itertools import logging from collections import deque from typing import ( + TYPE_CHECKING, Any, Awaitable, Callable, @@ -56,6 +57,9 @@ from synapse.types import ( from synapse.util.async_helpers import ObservableDeferred, yieldable_gather_results from synapse.util.metrics import Measure +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) # The number of times we are recalculating the current state @@ -272,7 +276,7 @@ class EventsPersistenceStorage: current state and forward extremity changes. """ - def __init__(self, hs, stores: Databases): + def __init__(self, hs: "HomeServer", stores: Databases): # We ultimately want to split out the state store from the main store, # so we use separate variables here even though they point to the same # store for now. -- cgit 1.5.1 From 85a09f8b8ba7c8023c0d28a526d32111fc704197 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Mon, 25 Oct 2021 13:01:04 +0100 Subject: Fix module API's `get_user_ip_and_agents` function when run on workers (#11112) --- changelog.d/11112.bugfix | 1 + synapse/module_api/__init__.py | 6 +- synapse/storage/databases/main/client_ips.py | 124 ++++++++++++++++++--------- 3 files changed, 91 insertions(+), 40 deletions(-) create mode 100644 changelog.d/11112.bugfix (limited to 'synapse/storage/databases') diff --git a/changelog.d/11112.bugfix b/changelog.d/11112.bugfix new file mode 100644 index 0000000000..c8e22da8cf --- /dev/null +++ b/changelog.d/11112.bugfix @@ -0,0 +1 @@ +Fix a bug which caused the module API's `get_user_ip_and_agents` function to always fail on workers. `get_user_ip_and_agents` was introduced in 1.44.0 and did not function correctly on worker processes at the time. diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index ab7ef8f950..d37252b6b3 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -46,6 +46,7 @@ from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.client.login import LoginResponse +from synapse.storage import DataStore from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.roommember import ProfileInfo from synapse.storage.state import StateFilter @@ -61,6 +62,7 @@ from synapse.util import Clock from synapse.util.caches.descriptors import cached if TYPE_CHECKING: + from synapse.app.generic_worker import GenericWorkerSlavedStore from synapse.server import HomeServer """ @@ -111,7 +113,9 @@ class ModuleApi: def __init__(self, hs: "HomeServer", auth_handler): self._hs = hs - self._store = hs.get_datastore() + # TODO: Fix this type hint once the types for the data stores have been ironed + # out. + self._store: Union[DataStore, "GenericWorkerSlavedStore"] = hs.get_datastore() self._auth = hs.get_auth() self._auth_handler = auth_handler self._server_name = hs.hostname diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index b81d9218ce..1dc7f0ebe3 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -478,6 +478,58 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore): return {(d["user_id"], d["device_id"]): d for d in res} + async def get_user_ip_and_agents( + self, user: UserID, since_ts: int = 0 + ) -> List[LastConnectionInfo]: + """Fetch the IPs and user agents for a user since the given timestamp. + + The result might be slightly out of date as client IPs are inserted in batches. + + Args: + user: The user for which to fetch IP addresses and user agents. + since_ts: The timestamp after which to fetch IP addresses and user agents, + in milliseconds. + + Returns: + A list of dictionaries, each containing: + * `access_token`: The access token used. + * `ip`: The IP address used. + * `user_agent`: The last user agent seen for this access token and IP + address combination. + * `last_seen`: The timestamp at which this access token and IP address + combination was last seen, in milliseconds. + + Only the latest user agent for each access token and IP address combination + is available. + """ + user_id = user.to_string() + + def get_recent(txn: LoggingTransaction) -> List[Tuple[str, str, str, int]]: + txn.execute( + """ + SELECT access_token, ip, user_agent, last_seen FROM user_ips + WHERE last_seen >= ? AND user_id = ? + ORDER BY last_seen + DESC + """, + (since_ts, user_id), + ) + return cast(List[Tuple[str, str, str, int]], txn.fetchall()) + + rows = await self.db_pool.runInteraction( + desc="get_user_ip_and_agents", func=get_recent + ) + + return [ + { + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + "last_seen": last_seen, + } + for access_token, ip, user_agent, last_seen in rows + ] + class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersStore): def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"): @@ -622,49 +674,43 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersStore): async def get_user_ip_and_agents( self, user: UserID, since_ts: int = 0 ) -> List[LastConnectionInfo]: + """Fetch the IPs and user agents for a user since the given timestamp. + + Args: + user: The user for which to fetch IP addresses and user agents. + since_ts: The timestamp after which to fetch IP addresses and user agents, + in milliseconds. + + Returns: + A list of dictionaries, each containing: + * `access_token`: The access token used. + * `ip`: The IP address used. + * `user_agent`: The last user agent seen for this access token and IP + address combination. + * `last_seen`: The timestamp at which this access token and IP address + combination was last seen, in milliseconds. + + Only the latest user agent for each access token and IP address combination + is available. """ - Fetch IP/User Agent connection since a given timestamp. - """ - user_id = user.to_string() - results: Dict[Tuple[str, str], Tuple[str, int]] = {} + results: Dict[Tuple[str, str], LastConnectionInfo] = { + (connection["access_token"], connection["ip"]): connection + for connection in await super().get_user_ip_and_agents(user, since_ts) + } + # Overlay data that is pending insertion on top of the results from the + # database. + user_id = user.to_string() for key in self._batch_row_update: - ( - uid, - access_token, - ip, - ) = key + uid, access_token, ip = key if uid == user_id: user_agent, _, last_seen = self._batch_row_update[key] if last_seen >= since_ts: - results[(access_token, ip)] = (user_agent, last_seen) - - def get_recent(txn: LoggingTransaction) -> List[Tuple[str, str, str, int]]: - txn.execute( - """ - SELECT access_token, ip, user_agent, last_seen FROM user_ips - WHERE last_seen >= ? AND user_id = ? - ORDER BY last_seen - DESC - """, - (since_ts, user_id), - ) - return cast(List[Tuple[str, str, str, int]], txn.fetchall()) - - rows = await self.db_pool.runInteraction( - desc="get_user_ip_and_agents", func=get_recent - ) + results[(access_token, ip)] = { + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + "last_seen": last_seen, + } - results.update( - ((access_token, ip), (user_agent, last_seen)) - for access_token, ip, user_agent, last_seen in rows - ) - return [ - { - "access_token": access_token, - "ip": ip, - "user_agent": user_agent, - "last_seen": last_seen, - } - for (access_token, ip), (user_agent, last_seen) in results.items() - ] + return list(results.values()) -- cgit 1.5.1 From 63cbdd8af081839f245915a18ed57f1a44f1a5f4 Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Tue, 26 Oct 2021 12:01:06 +0300 Subject: Enable changing user type via users admin API (#11174) Users admin API can now also modify user type in addition to allowing it to be set on user creation. Signed-off-by: Jason Robinson Co-authored-by: Brendan Abolivier --- changelog.d/11174.feature | 1 + docs/admin_api/user_admin_api.md | 9 ++++- synapse/rest/admin/users.py | 3 ++ synapse/storage/databases/main/registration.py | 18 +++++++++ tests/rest/admin/test_user.py | 51 ++++++++++++++++++++++++++ 5 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 changelog.d/11174.feature (limited to 'synapse/storage/databases') diff --git a/changelog.d/11174.feature b/changelog.d/11174.feature new file mode 100644 index 0000000000..8eecd92681 --- /dev/null +++ b/changelog.d/11174.feature @@ -0,0 +1 @@ +Users admin API can now also modify user type in addition to allowing it to be set on user creation. diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md index 534f8400ba..f03539c9f0 100644 --- a/docs/admin_api/user_admin_api.md +++ b/docs/admin_api/user_admin_api.md @@ -50,7 +50,8 @@ It returns a JSON body like the following: "auth_provider": "", "external_id": "" } - ] + ], + "user_type": null } ``` @@ -97,7 +98,8 @@ with a body of: ], "avatar_url": "", "admin": false, - "deactivated": false + "deactivated": false, + "user_type": null } ``` @@ -135,6 +137,9 @@ Body parameters: unchanged on existing accounts and set to `false` for new accounts. A user cannot be erased by deactivating with this API. For details on deactivating users see [Deactivate Account](#deactivate-account). +- `user_type` - string or null, optional. If provided, the user type will be + adjusted. If `null` given, the user type will be cleared. Other + allowed options are: `bot` and `support`. If the user already exists then optional parameters default to the current value. diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index c0bebc3cf0..d14fafbbc9 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -326,6 +326,9 @@ class UserRestServletV2(RestServlet): target_user.to_string() ) + if "user_type" in body: + await self.store.set_user_type(target_user, user_type) + user = await self.admin_handler.get_user(target_user) assert user is not None diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 37d47aa823..6c7d6ba508 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -499,6 +499,24 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn) + async def set_user_type(self, user: UserID, user_type: Optional[UserTypes]) -> None: + """Sets the user type. + + Args: + user: user ID of the user. + user_type: type of the user or None for a user without a type. + """ + + def set_user_type_txn(txn): + self.db_pool.simple_update_one_txn( + txn, "users", {"name": user.to_string()}, {"user_type": user_type} + ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user.to_string(),) + ) + + await self.db_pool.runInteraction("set_user_type", set_user_type_txn) + def _query_for_auth(self, txn, token: str) -> Optional[TokenLookupResult]: sql = """ SELECT users.name as user_id, diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 839442ddba..25e8d6cf27 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -2270,6 +2270,57 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual("@user:test", channel.json_body["name"]) self.assertTrue(channel.json_body["admin"]) + def test_set_user_type(self): + """ + Test changing user type. + """ + + # Set to support type + channel = self.make_request( + "PUT", + self.url_other_user, + access_token=self.admin_user_tok, + content={"user_type": UserTypes.SUPPORT}, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual(UserTypes.SUPPORT, channel.json_body["user_type"]) + + # Get user + channel = self.make_request( + "GET", + self.url_other_user, + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual(UserTypes.SUPPORT, channel.json_body["user_type"]) + + # Change back to a regular user + channel = self.make_request( + "PUT", + self.url_other_user, + access_token=self.admin_user_tok, + content={"user_type": None}, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertIsNone(channel.json_body["user_type"]) + + # Get user + channel = self.make_request( + "GET", + self.url_other_user, + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertIsNone(channel.json_body["user_type"]) + def test_accidental_deactivation_prevention(self): """ Ensure an account can't accidentally be deactivated by using a str value -- cgit 1.5.1 From d52c58dfa3f548b489dae0b1945cf733d4a6538c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 26 Oct 2021 07:38:45 -0400 Subject: Add a background update for updating MSC3440 relation threads. (#11181) --- changelog.d/11181.feature | 1 + .../storage/databases/main/events_bg_updates.py | 85 +++++++++++++++++++++- .../schema/main/delta/65/02_thread_relations.sql | 18 +++++ 3 files changed, 102 insertions(+), 2 deletions(-) create mode 100644 changelog.d/11181.feature create mode 100644 synapse/storage/schema/main/delta/65/02_thread_relations.sql (limited to 'synapse/storage/databases') diff --git a/changelog.d/11181.feature b/changelog.d/11181.feature new file mode 100644 index 0000000000..76b0d28084 --- /dev/null +++ b/changelog.d/11181.feature @@ -0,0 +1 @@ +Experimental support for the thread relation defined in [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440). diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index fc49112063..f92d824876 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -17,11 +17,15 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple import attr -from synapse.api.constants import EventContentFields +from synapse.api.constants import EventContentFields, RelationTypes from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import make_event_from_dict from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause -from synapse.storage.database import DatabasePool, make_tuple_comparison_clause +from synapse.storage.database import ( + DatabasePool, + LoggingTransaction, + make_tuple_comparison_clause, +) from synapse.storage.databases.main.events import PersistEventsStore from synapse.storage.types import Cursor from synapse.types import JsonDict @@ -167,6 +171,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._purged_chain_cover_index, ) + self.db_pool.updates.register_background_update_handler( + "event_thread_relation", self._event_thread_relation + ) + ################################################################################ # bg updates for replacing stream_ordering with a BIGINT @@ -1091,6 +1099,79 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): return result + async def _event_thread_relation(self, progress: JsonDict, batch_size: int) -> int: + """Background update handler which will store thread relations for existing events.""" + last_event_id = progress.get("last_event_id", "") + + def _event_thread_relation_txn(txn: LoggingTransaction) -> int: + txn.execute( + """ + SELECT event_id, json FROM event_json + LEFT JOIN event_relations USING (event_id) + WHERE event_id > ? AND relates_to_id IS NULL + ORDER BY event_id LIMIT ? + """, + (last_event_id, batch_size), + ) + + results = list(txn) + missing_thread_relations = [] + for (event_id, event_json_raw) in results: + try: + event_json = db_to_json(event_json_raw) + except Exception as e: + logger.warning( + "Unable to load event %s (no relations will be updated): %s", + event_id, + e, + ) + continue + + # If there's no relation (or it is not a thread), skip! + relates_to = event_json["content"].get("m.relates_to") + if not relates_to or not isinstance(relates_to, dict): + continue + if relates_to.get("rel_type") != RelationTypes.THREAD: + continue + + # Get the parent ID. + parent_id = relates_to.get("event_id") + if not isinstance(parent_id, str): + continue + + missing_thread_relations.append((event_id, parent_id)) + + # Insert the missing data. + self.db_pool.simple_insert_many_txn( + txn=txn, + table="event_relations", + values=[ + { + "event_id": event_id, + "relates_to_Id": parent_id, + "relation_type": RelationTypes.THREAD, + } + for event_id, parent_id in missing_thread_relations + ], + ) + + if results: + latest_event_id = results[-1][0] + self.db_pool.updates._background_update_progress_txn( + txn, "event_thread_relation", {"last_event_id": latest_event_id} + ) + + return len(results) + + num_rows = await self.db_pool.runInteraction( + desc="event_thread_relation", func=_event_thread_relation_txn + ) + + if not num_rows: + await self.db_pool.updates._end_background_update("event_thread_relation") + + return num_rows + async def _background_populate_stream_ordering2( self, progress: JsonDict, batch_size: int ) -> int: diff --git a/synapse/storage/schema/main/delta/65/02_thread_relations.sql b/synapse/storage/schema/main/delta/65/02_thread_relations.sql new file mode 100644 index 0000000000..d60517f7b4 --- /dev/null +++ b/synapse/storage/schema/main/delta/65/02_thread_relations.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Check old events for thread relations. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6502, 'event_thread_relation', '{}'); -- cgit 1.5.1 From 72626b78ef4aa9ab0bd11e332495f34bd43bbc26 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Oct 2021 12:33:21 +0100 Subject: Fix thread BG update to not seq scan event_json (#11192) For some reason the query optimiser decided to seq scan both tables, rather than index scanning `event_json`. --- changelog.d/11192.feature | 1 + synapse/storage/databases/main/events_bg_updates.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/11192.feature (limited to 'synapse/storage/databases') diff --git a/changelog.d/11192.feature b/changelog.d/11192.feature new file mode 100644 index 0000000000..76b0d28084 --- /dev/null +++ b/changelog.d/11192.feature @@ -0,0 +1 @@ +Experimental support for the thread relation defined in [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440). diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index f92d824876..ae3a8a63e4 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1108,7 +1108,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): """ SELECT event_id, json FROM event_json LEFT JOIN event_relations USING (event_id) - WHERE event_id > ? AND relates_to_id IS NULL + WHERE event_id > ? AND event_relations.event_id IS NULL ORDER BY event_id LIMIT ? """, (last_event_id, batch_size), -- cgit 1.5.1 From 8d46fac98e07ac319c7ae21dfc24502993de3f1d Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Wed, 27 Oct 2021 17:01:18 +0200 Subject: Delete messages from `device_inbox` table when deleting device (#10969) Fixes: #9346 --- changelog.d/10969.bugfix | 1 + synapse/storage/databases/main/deviceinbox.py | 92 +++++++++++++++++++++- synapse/storage/databases/main/devices.py | 35 ++++---- .../02remove_deleted_devices_from_device_inbox.sql | 22 ++++++ tests/handlers/test_device.py | 31 ++++++++ tests/storage/databases/main/test_deviceinbox.py | 90 +++++++++++++++++++++ 6 files changed, 256 insertions(+), 15 deletions(-) create mode 100644 changelog.d/10969.bugfix create mode 100644 synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql create mode 100644 tests/storage/databases/main/test_deviceinbox.py (limited to 'synapse/storage/databases') diff --git a/changelog.d/10969.bugfix b/changelog.d/10969.bugfix new file mode 100644 index 0000000000..89c299b8e8 --- /dev/null +++ b/changelog.d/10969.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where messages in the `device_inbox` table for deleted devices would persist indefinitely. Contributed by @dklimpel and @JohannesKleine. diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 8143168107..b0ccab0c9b 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -19,9 +19,10 @@ from synapse.logging import issue9533_logger from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.replication.tcp.streams import ToDeviceStream from synapse.storage._base import SQLBaseStore, db_to_json -from synapse.storage.database import DatabasePool +from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator +from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -555,6 +556,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" + REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox" def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) @@ -570,6 +572,11 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox ) + self.db_pool.updates.register_background_update_handler( + self.REMOVE_DELETED_DEVICES, + self._remove_deleted_devices_from_device_inbox, + ) + async def _background_drop_index_device_inbox(self, progress, batch_size): def reindex_txn(conn): txn = conn.cursor() @@ -582,6 +589,89 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): return 1 + async def _remove_deleted_devices_from_device_inbox( + self, progress: JsonDict, batch_size: int + ) -> int: + """A background update that deletes all device_inboxes for deleted devices. + + This should only need to be run once (when users upgrade to v1.46.0) + + Args: + progress: JsonDict used to store progress of this background update + batch_size: the maximum number of rows to retrieve in a single select query + + Returns: + The number of deleted rows + """ + + def _remove_deleted_devices_from_device_inbox_txn( + txn: LoggingTransaction, + ) -> int: + """stream_id is not unique + we need to use an inclusive `stream_id >= ?` clause, + since we might not have deleted all dead device messages for the stream_id + returned from the previous query + + Then delete only rows matching the `(user_id, device_id, stream_id)` tuple, + to avoid problems of deleting a large number of rows all at once + due to a single device having lots of device messages. + """ + + last_stream_id = progress.get("stream_id", 0) + + sql = """ + SELECT device_id, user_id, stream_id + FROM device_inbox + WHERE + stream_id >= ? + AND (device_id, user_id) NOT IN ( + SELECT device_id, user_id FROM devices + ) + ORDER BY stream_id + LIMIT ? + """ + + txn.execute(sql, (last_stream_id, batch_size)) + rows = txn.fetchall() + + num_deleted = 0 + for row in rows: + num_deleted += self.db_pool.simple_delete_txn( + txn, + "device_inbox", + {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, + ) + + if rows: + # send more than stream_id to progress + # otherwise it can happen in large deployments that + # no change of status is visible in the log file + # it may be that the stream_id does not change in several runs + self.db_pool.updates._background_update_progress_txn( + txn, + self.REMOVE_DELETED_DEVICES, + { + "device_id": rows[-1][0], + "user_id": rows[-1][1], + "stream_id": rows[-1][2], + }, + ) + + return num_deleted + + number_deleted = await self.db_pool.runInteraction( + "_remove_deleted_devices_from_device_inbox", + _remove_deleted_devices_from_device_inbox_txn, + ) + + # The task is finished when no more lines are deleted. + if not number_deleted: + await self.db_pool.updates._end_background_update( + self.REMOVE_DELETED_DEVICES + ) + + return number_deleted + class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): pass diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index a01bf2c5b7..b15cd030e0 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1134,19 +1134,14 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): raise StoreError(500, "Problem storing device.") async def delete_device(self, user_id: str, device_id: str) -> None: - """Delete a device. + """Delete a device and its device_inbox. Args: user_id: The ID of the user which owns the device device_id: The ID of the device to delete """ - await self.db_pool.simple_delete_one( - table="devices", - keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False}, - desc="delete_device", - ) - self.device_id_exists_cache.invalidate((user_id, device_id)) + await self.delete_devices(user_id, [device_id]) async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: """Deletes several devices. @@ -1155,13 +1150,25 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): user_id: The ID of the user which owns the devices device_ids: The IDs of the devices to delete """ - await self.db_pool.simple_delete_many( - table="devices", - column="device_id", - iterable=device_ids, - keyvalues={"user_id": user_id, "hidden": False}, - desc="delete_devices", - ) + + def _delete_devices_txn(txn: LoggingTransaction) -> None: + self.db_pool.simple_delete_many_txn( + txn, + table="devices", + column="device_id", + values=device_ids, + keyvalues={"user_id": user_id, "hidden": False}, + ) + + self.db_pool.simple_delete_many_txn( + txn, + table="device_inbox", + column="device_id", + values=device_ids, + keyvalues={"user_id": user_id}, + ) + + await self.db_pool.runInteraction("delete_devices", _delete_devices_txn) for device_id in device_ids: self.device_id_exists_cache.invalidate((user_id, device_id)) diff --git a/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql new file mode 100644 index 0000000000..efe702f621 --- /dev/null +++ b/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql @@ -0,0 +1,22 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- Remove messages from the device_inbox table which were orphaned +-- when a device was deleted using Synapse earlier than 1.46.0. +-- This runs as background task, but may take a bit to finish. + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6402, 'remove_deleted_devices_from_device_inbox', '{}'); diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 3ac48e5e95..43031e07ea 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -160,6 +160,37 @@ class DeviceTestCase(unittest.HomeserverTestCase): # we'd like to check the access token was invalidated, but that's a # bit of a PITA. + def test_delete_device_and_device_inbox(self): + self._record_users() + + # add an device_inbox + self.get_success( + self.store.db_pool.simple_insert( + "device_inbox", + { + "user_id": user1, + "device_id": "abc", + "stream_id": 1, + "message_json": "{}", + }, + ) + ) + + # delete the device + self.get_success(self.handler.delete_device(user1, "abc")) + + # check that the device_inbox was deleted + res = self.get_success( + self.store.db_pool.simple_select_one( + table="device_inbox", + keyvalues={"user_id": user1, "device_id": "abc"}, + retcols=("user_id", "device_id"), + allow_none=True, + desc="get_device_id_from_device_inbox", + ) + ) + self.assertIsNone(res) + def test_update_device(self): self._record_users() diff --git a/tests/storage/databases/main/test_deviceinbox.py b/tests/storage/databases/main/test_deviceinbox.py new file mode 100644 index 0000000000..4cfd2677f7 --- /dev/null +++ b/tests/storage/databases/main/test_deviceinbox.py @@ -0,0 +1,90 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.rest import admin +from synapse.rest.client import devices + +from tests.unittest import HomeserverTestCase + + +class DeviceInboxBackgroundUpdateStoreTestCase(HomeserverTestCase): + + servlets = [ + admin.register_servlets, + devices.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + self.store = hs.get_datastore() + self.user_id = self.register_user("foo", "pass") + + def test_background_remove_deleted_devices_from_device_inbox(self): + """Test that the background task to delete old device_inboxes works properly.""" + + # create a valid device + self.get_success( + self.store.store_device(self.user_id, "cur_device", "display_name") + ) + + # Add device_inbox to devices + self.get_success( + self.store.db_pool.simple_insert( + "device_inbox", + { + "user_id": self.user_id, + "device_id": "cur_device", + "stream_id": 1, + "message_json": "{}", + }, + ) + ) + self.get_success( + self.store.db_pool.simple_insert( + "device_inbox", + { + "user_id": self.user_id, + "device_id": "old_device", + "stream_id": 2, + "message_json": "{}", + }, + ) + ) + + # Insert and run the background update. + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": "remove_deleted_devices_from_device_inbox", + "progress_json": "{}", + }, + ) + ) + + # ... and tell the DataStore that it hasn't finished all updates yet + self.store.db_pool.updates._all_done = False + + self.wait_for_background_updates() + + # Make sure the background task deleted old device_inbox + res = self.get_success( + self.store.db_pool.simple_select_onecol( + table="device_inbox", + keyvalues={}, + retcol="device_id", + desc="get_device_id_from_device_inbox", + ) + ) + self.assertEqual(1, len(res)) + self.assertEqual(res[0], "cur_device") -- cgit 1.5.1 From 75ca0a6168f92dab3255839cf85fb0df3a0076c3 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Wed, 27 Oct 2021 17:27:23 +0100 Subject: Annotate `log_function` decorator (#10943) Co-authored-by: Patrick Cloke --- changelog.d/10943.misc | 1 + synapse/federation/federation_client.py | 17 +++++++++++++++-- synapse/federation/federation_server.py | 10 ++++++---- synapse/federation/sender/transaction_manager.py | 1 - synapse/federation/transport/client.py | 22 ++++++++++++++++++---- synapse/handlers/directory.py | 2 +- synapse/handlers/federation_event.py | 2 +- synapse/handlers/presence.py | 2 ++ synapse/handlers/profile.py | 4 ++++ synapse/logging/utils.py | 8 ++++++-- synapse/state/__init__.py | 5 +++-- synapse/storage/databases/main/profile.py | 2 +- 12 files changed, 58 insertions(+), 18 deletions(-) create mode 100644 changelog.d/10943.misc (limited to 'synapse/storage/databases') diff --git a/changelog.d/10943.misc b/changelog.d/10943.misc new file mode 100644 index 0000000000..3ce28d1a67 --- /dev/null +++ b/changelog.d/10943.misc @@ -0,0 +1 @@ +Add type annotations for the `log_function` decorator. diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 2ab4dec88f..670186f548 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -227,7 +227,7 @@ class FederationClient(FederationBase): ) async def backfill( - self, dest: str, room_id: str, limit: int, extremities: Iterable[str] + self, dest: str, room_id: str, limit: int, extremities: Collection[str] ) -> Optional[List[EventBase]]: """Requests some more historic PDUs for the given room from the given destination server. @@ -237,6 +237,8 @@ class FederationClient(FederationBase): room_id: The room_id to backfill. limit: The maximum number of events to return. extremities: our current backwards extremities, to backfill from + Must be a Collection that is falsy when empty. + (Iterable is not enough here!) """ logger.debug("backfill extrem=%s", extremities) @@ -250,11 +252,22 @@ class FederationClient(FederationBase): logger.debug("backfill transaction_data=%r", transaction_data) + if not isinstance(transaction_data, dict): + # TODO we probably want an exception type specific to federation + # client validation. + raise TypeError("Backfill transaction_data is not a dict.") + + transaction_data_pdus = transaction_data.get("pdus") + if not isinstance(transaction_data_pdus, list): + # TODO we probably want an exception type specific to federation + # client validation. + raise TypeError("transaction_data.pdus is not a list.") + room_version = await self.store.get_room_version(room_id) pdus = [ event_from_pdu_json(p, room_version, outlier=False) - for p in transaction_data["pdus"] + for p in transaction_data_pdus ] # Check signatures and hash of pdus, removing any from the list that fail checks diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 0d66034f44..32a75993d9 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -295,14 +295,16 @@ class FederationServer(FederationBase): Returns: HTTP response code and body """ - response = await self.transaction_actions.have_responded(origin, transaction) + existing_response = await self.transaction_actions.have_responded( + origin, transaction + ) - if response: + if existing_response: logger.debug( "[%s] We've already responded to this request", transaction.transaction_id, ) - return response + return existing_response logger.debug("[%s] Transaction is new", transaction.transaction_id) @@ -632,7 +634,7 @@ class FederationServer(FederationBase): async def on_make_knock_request( self, origin: str, room_id: str, user_id: str, supported_versions: List[str] - ) -> Dict[str, Union[EventBase, str]]: + ) -> JsonDict: """We've received a /make_knock/ request, so we create a partial knock event for the room and hand that back, along with the room version, to the knocking homeserver. We do *not* persist or process this event until the other server has diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index dc555cca0b..ab935e5a7e 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -149,7 +149,6 @@ class TransactionManager: ) except HttpResponseException as e: code = e.code - response = e.response set_tag(tags.ERROR, True) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 8b247fe206..d963178838 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -15,7 +15,19 @@ import logging import urllib -from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Union +from typing import ( + Any, + Awaitable, + Callable, + Collection, + Dict, + Iterable, + List, + Mapping, + Optional, + Tuple, + Union, +) import attr import ijson @@ -100,7 +112,7 @@ class TransportLayerClient: @log_function async def backfill( - self, destination: str, room_id: str, event_tuples: Iterable[str], limit: int + self, destination: str, room_id: str, event_tuples: Collection[str], limit: int ) -> Optional[JsonDict]: """Requests `limit` previous PDUs in a given context before list of PDUs. @@ -108,7 +120,9 @@ class TransportLayerClient: Args: destination room_id - event_tuples + event_tuples: + Must be a Collection that is falsy when empty. + (Iterable is not enough here!) limit Returns: @@ -786,7 +800,7 @@ class TransportLayerClient: @log_function def join_group( self, destination: str, group_id: str, user_id: str, content: JsonDict - ) -> JsonDict: + ) -> Awaitable[JsonDict]: """Attempts to join a group""" path = _create_v1_path("/groups/%s/users/%s/join", group_id, user_id) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 8567cb0e00..8ca5f60b1c 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -245,7 +245,7 @@ class DirectoryHandler: servers = result.servers else: try: - fed_result = await self.federation.make_query( + fed_result: Optional[JsonDict] = await self.federation.make_query( destination=room_alias.domain, query_type="directory", args={"room_alias": room_alias.to_string()}, diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index bd1fa08cef..e617db4c0d 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -477,7 +477,7 @@ class FederationEventHandler: @log_function async def backfill( - self, dest: str, room_id: str, limit: int, extremities: Iterable[str] + self, dest: str, room_id: str, limit: int, extremities: Collection[str] ) -> None: """Trigger a backfill request to `dest` for the given `room_id` diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index fdab50da37..3df872c578 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -52,6 +52,7 @@ import synapse.metrics from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError from synapse.api.presence import UserPresenceState +from synapse.appservice import ApplicationService from synapse.events.presence_router import PresenceRouter from synapse.logging.context import run_in_background from synapse.logging.utils import log_function @@ -1551,6 +1552,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): is_guest: bool = False, explicit_room_id: Optional[str] = None, include_offline: bool = True, + service: Optional[ApplicationService] = None, ) -> Tuple[List[UserPresenceState], int]: # The process for getting presence events are: # 1. Get the rooms the user is in. diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index e6c3cf585b..6b5a6ded8b 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -456,7 +456,11 @@ class ProfileHandler: continue new_name = profile.get("displayname") + if not isinstance(new_name, str): + new_name = None new_avatar = profile.get("avatar_url") + if not isinstance(new_avatar, str): + new_avatar = None # We always hit update to update the last_check timestamp await self.store.update_remote_profile_cache(user_id, new_name, new_avatar) diff --git a/synapse/logging/utils.py b/synapse/logging/utils.py index 08895e72ee..4a01b902c2 100644 --- a/synapse/logging/utils.py +++ b/synapse/logging/utils.py @@ -16,6 +16,7 @@ import logging from functools import wraps from inspect import getcallargs +from typing import Callable, TypeVar, cast _TIME_FUNC_ID = 0 @@ -41,7 +42,10 @@ def _log_debug_as_f(f, msg, msg_args): logger.handle(record) -def log_function(f): +F = TypeVar("F", bound=Callable) + + +def log_function(f: F) -> F: """Function decorator that logs every call to that function.""" func_name = f.__name__ @@ -69,4 +73,4 @@ def log_function(f): return f(*args, **kwargs) wrapped.__name__ = func_name - return wrapped + return cast(F, wrapped) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 5cf2e12575..98a0239759 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -26,6 +26,7 @@ from typing import ( FrozenSet, Iterable, List, + Mapping, Optional, Sequence, Set, @@ -519,7 +520,7 @@ class StateResolutionHandler: self, room_id: str, room_version: str, - state_groups_ids: Dict[int, StateMap[str]], + state_groups_ids: Mapping[int, StateMap[str]], event_map: Optional[Dict[str, EventBase]], state_res_store: "StateResolutionStore", ) -> _StateCacheEntry: @@ -703,7 +704,7 @@ class StateResolutionHandler: def _make_state_cache_entry( - new_state: StateMap[str], state_groups_ids: Dict[int, StateMap[str]] + new_state: StateMap[str], state_groups_ids: Mapping[int, StateMap[str]] ) -> _StateCacheEntry: """Given a resolved state, and a set of input state groups, pick one to base a new state group on (if any), and return an appropriately-constructed diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index ba7075caa5..dd8e27e226 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -91,7 +91,7 @@ class ProfileWorkerStore(SQLBaseStore): ) async def update_remote_profile_cache( - self, user_id: str, displayname: str, avatar_url: str + self, user_id: str, displayname: Optional[str], avatar_url: Optional[str] ) -> int: return await self.db_pool.simple_update( table="remote_profile_cache", -- cgit 1.5.1 From 56e281bf6c4f58929d56e3901856f6d0fa4b1816 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 28 Oct 2021 14:35:12 -0400 Subject: Additional type hints for relations database class. (#11205) --- changelog.d/11205.misc | 1 + mypy.ini | 1 + synapse/storage/databases/main/relations.py | 38 +++++++++++++++++------------ 3 files changed, 25 insertions(+), 15 deletions(-) create mode 100644 changelog.d/11205.misc (limited to 'synapse/storage/databases') diff --git a/changelog.d/11205.misc b/changelog.d/11205.misc new file mode 100644 index 0000000000..62395c9432 --- /dev/null +++ b/changelog.d/11205.misc @@ -0,0 +1 @@ +Improve type hints for the relations datastore. diff --git a/mypy.ini b/mypy.ini index 8f5386c179..119a7d8c91 100644 --- a/mypy.ini +++ b/mypy.ini @@ -53,6 +53,7 @@ files = synapse/storage/databases/main/keys.py, synapse/storage/databases/main/pusher.py, synapse/storage/databases/main/registration.py, + synapse/storage/databases/main/relations.py, synapse/storage/databases/main/session.py, synapse/storage/databases/main/stream.py, synapse/storage/databases/main/ui_auth.py, diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 40760fbd1b..53576ad52f 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -13,13 +13,14 @@ # limitations under the License. import logging -from typing import Optional, Tuple +from typing import List, Optional, Tuple, Union import attr from synapse.api.constants import RelationTypes from synapse.events import EventBase from synapse.storage._base import SQLBaseStore +from synapse.storage.database import LoggingTransaction from synapse.storage.databases.main.stream import generate_pagination_where_clause from synapse.storage.relations import ( AggregationPaginationToken, @@ -63,7 +64,7 @@ class RelationsWorkerStore(SQLBaseStore): """ where_clause = ["relates_to_id = ?"] - where_args = [event_id] + where_args: List[Union[str, int]] = [event_id] if relation_type is not None: where_clause.append("relation_type = ?") @@ -80,8 +81,8 @@ class RelationsWorkerStore(SQLBaseStore): pagination_clause = generate_pagination_where_clause( direction=direction, column_names=("topological_ordering", "stream_ordering"), - from_token=attr.astuple(from_token) if from_token else None, - to_token=attr.astuple(to_token) if to_token else None, + from_token=attr.astuple(from_token) if from_token else None, # type: ignore[arg-type] + to_token=attr.astuple(to_token) if to_token else None, # type: ignore[arg-type] engine=self.database_engine, ) @@ -106,7 +107,9 @@ class RelationsWorkerStore(SQLBaseStore): order, ) - def _get_recent_references_for_event_txn(txn): + def _get_recent_references_for_event_txn( + txn: LoggingTransaction, + ) -> PaginationChunk: txn.execute(sql, where_args + [limit + 1]) last_topo_id = None @@ -160,7 +163,7 @@ class RelationsWorkerStore(SQLBaseStore): """ where_clause = ["relates_to_id = ?", "relation_type = ?"] - where_args = [event_id, RelationTypes.ANNOTATION] + where_args: List[Union[str, int]] = [event_id, RelationTypes.ANNOTATION] if event_type: where_clause.append("type = ?") @@ -169,8 +172,8 @@ class RelationsWorkerStore(SQLBaseStore): having_clause = generate_pagination_where_clause( direction=direction, column_names=("COUNT(*)", "MAX(stream_ordering)"), - from_token=attr.astuple(from_token) if from_token else None, - to_token=attr.astuple(to_token) if to_token else None, + from_token=attr.astuple(from_token) if from_token else None, # type: ignore[arg-type] + to_token=attr.astuple(to_token) if to_token else None, # type: ignore[arg-type] engine=self.database_engine, ) @@ -199,7 +202,9 @@ class RelationsWorkerStore(SQLBaseStore): having_clause=having_clause, ) - def _get_aggregation_groups_for_event_txn(txn): + def _get_aggregation_groups_for_event_txn( + txn: LoggingTransaction, + ) -> PaginationChunk: txn.execute(sql, where_args + [limit + 1]) next_batch = None @@ -254,11 +259,12 @@ class RelationsWorkerStore(SQLBaseStore): LIMIT 1 """ - def _get_applicable_edit_txn(txn): + def _get_applicable_edit_txn(txn: LoggingTransaction) -> Optional[str]: txn.execute(sql, (event_id, RelationTypes.REPLACE)) row = txn.fetchone() if row: return row[0] + return None edit_id = await self.db_pool.runInteraction( "get_applicable_edit", _get_applicable_edit_txn @@ -267,7 +273,7 @@ class RelationsWorkerStore(SQLBaseStore): if not edit_id: return None - return await self.get_event(edit_id, allow_none=True) + return await self.get_event(edit_id, allow_none=True) # type: ignore[attr-defined] @cached() async def get_thread_summary( @@ -283,7 +289,9 @@ class RelationsWorkerStore(SQLBaseStore): The number of items in the thread and the most recent response, if any. """ - def _get_thread_summary_txn(txn) -> Tuple[int, Optional[str]]: + def _get_thread_summary_txn( + txn: LoggingTransaction, + ) -> Tuple[int, Optional[str]]: # Fetch the count of threaded events and the latest event ID. # TODO Should this only allow m.room.message events. sql = """ @@ -312,7 +320,7 @@ class RelationsWorkerStore(SQLBaseStore): AND relation_type = ? """ txn.execute(sql, (event_id, RelationTypes.THREAD)) - count = txn.fetchone()[0] + count = txn.fetchone()[0] # type: ignore[index] return count, latest_event_id @@ -322,7 +330,7 @@ class RelationsWorkerStore(SQLBaseStore): latest_event = None if latest_event_id: - latest_event = await self.get_event(latest_event_id, allow_none=True) + latest_event = await self.get_event(latest_event_id, allow_none=True) # type: ignore[attr-defined] return count, latest_event @@ -354,7 +362,7 @@ class RelationsWorkerStore(SQLBaseStore): LIMIT 1; """ - def _get_if_user_has_annotated_event(txn): + def _get_if_user_has_annotated_event(txn: LoggingTransaction) -> bool: txn.execute( sql, ( -- cgit 1.5.1 From bfd7a9b65c5e092c6a7ccdd46e59a278b1cbbd57 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Fri, 29 Oct 2021 19:43:51 +0200 Subject: Fix comments referencing v1.46.0 from PR #10969. (#11212) #10969 was merged after 1.46.0rc1 was cut and will be included in v1.47.0rc1 instead. --- changelog.d/11212.bugfix | 1 + synapse/storage/databases/main/deviceinbox.py | 2 +- .../schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 changelog.d/11212.bugfix (limited to 'synapse/storage/databases') diff --git a/changelog.d/11212.bugfix b/changelog.d/11212.bugfix new file mode 100644 index 0000000000..ba6efab25b --- /dev/null +++ b/changelog.d/11212.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where messages in the `device_inbox` table for deleted devices would persist indefinitely. Contributed by @dklimpel and @JohannesKleine. \ No newline at end of file diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index b0ccab0c9b..d03b5e5a7d 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -594,7 +594,7 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): ) -> int: """A background update that deletes all device_inboxes for deleted devices. - This should only need to be run once (when users upgrade to v1.46.0) + This should only need to be run once (when users upgrade to v1.47.0) Args: progress: JsonDict used to store progress of this background update diff --git a/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql index efe702f621..fca7290741 100644 --- a/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql +++ b/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql @@ -15,7 +15,7 @@ -- Remove messages from the device_inbox table which were orphaned --- when a device was deleted using Synapse earlier than 1.46.0. +-- when a device was deleted using Synapse earlier than 1.47.0. -- This runs as background task, but may take a bit to finish. INSERT INTO background_updates (ordering, update_name, progress_json) VALUES -- cgit 1.5.1 From 29ffd680bf0d0bf50383ad23404b348bf9cf90aa Mon Sep 17 00:00:00 2001 From: JohannesKleine Date: Mon, 1 Nov 2021 11:40:41 +0100 Subject: Stop synapse from saving messages in device_inbox for hidden devices. (#10097) Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- changelog.d/10097.bugfix | 1 + synapse/storage/databases/main/deviceinbox.py | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 changelog.d/10097.bugfix (limited to 'synapse/storage/databases') diff --git a/changelog.d/10097.bugfix b/changelog.d/10097.bugfix new file mode 100644 index 0000000000..5d3d9587c2 --- /dev/null +++ b/changelog.d/10097.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug which allowed hidden devices to receive to-device messages, resulting in unnecessary database bloat. diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index d03b5e5a7d..25e9c1efe1 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -489,10 +489,12 @@ class DeviceInboxWorkerStore(SQLBaseStore): devices = list(messages_by_device.keys()) if len(devices) == 1 and devices[0] == "*": # Handle wildcard device_ids. + # We exclude hidden devices (such as cross-signing keys) here as they are + # not expected to receive to-device messages. devices = self.db_pool.simple_select_onecol_txn( txn, table="devices", - keyvalues={"user_id": user_id}, + keyvalues={"user_id": user_id, "hidden": False}, retcol="device_id", ) @@ -505,10 +507,12 @@ class DeviceInboxWorkerStore(SQLBaseStore): if not devices: continue + # We exclude hidden devices (such as cross-signing keys) here as they are + # not expected to receive to-device messages. rows = self.db_pool.simple_select_many_txn( txn, table="devices", - keyvalues={"user_id": user_id}, + keyvalues={"user_id": user_id, "hidden": False}, column="device_id", iterable=devices, retcols=("device_id",), -- cgit 1.5.1 From 753720184042e01bf56478d15bd8c8db11da4b69 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 2 Nov 2021 11:01:13 +0100 Subject: Add search by room ID and room alias to List Room admin API (#11099) Fixes: #10874 Signed-off-by: Dirk Klimpel dirk@klimpel.org --- changelog.d/11099.feature | 1 + docs/admin_api/rooms.md | 11 +++-- synapse/storage/databases/main/room.py | 29 ++++++----- tests/rest/admin/test_room.py | 88 +++++++++++++++++++--------------- 4 files changed, 76 insertions(+), 53 deletions(-) create mode 100644 changelog.d/11099.feature (limited to 'synapse/storage/databases') diff --git a/changelog.d/11099.feature b/changelog.d/11099.feature new file mode 100644 index 0000000000..c9126d4a9d --- /dev/null +++ b/changelog.d/11099.feature @@ -0,0 +1 @@ +Add search by room ID and room alias to List Room admin API. \ No newline at end of file diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index 62eeff9e1a..1fc3cc3c42 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -38,9 +38,14 @@ The following query parameters are available: - `history_visibility` - Rooms are ordered alphabetically by visibility of history of the room. - `state_events` - Rooms are ordered by number of state events. Largest to smallest. * `dir` - Direction of room order. Either `f` for forwards or `b` for backwards. Setting - this value to `b` will reverse the above sort order. Defaults to `f`. -* `search_term` - Filter rooms by their room name. Search term can be contained in any - part of the room name. Defaults to no filtering. + this value to `b` will reverse the above sort order. Defaults to `f`. +* `search_term` - Filter rooms by their room name, canonical alias and room id. + Specifically, rooms are selected if the search term is contained in + - the room's name, + - the local part of the room's canonical alias, or + - the complete (local and server part) room's id (case sensitive). + + Defaults to no filtering. **Response** diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index f879bbe7c7..cefc77fa0f 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -412,22 +412,33 @@ class RoomWorkerStore(SQLBaseStore): limit: maximum amount of rooms to retrieve order_by: the sort order of the returned list reverse_order: whether to reverse the room list - search_term: a string to filter room names by + search_term: a string to filter room names, + canonical alias and room ids by. + Room ID must match exactly. Canonical alias must match a substring of the local part. Returns: A list of room dicts and an integer representing the total number of rooms that exist given this query """ # Filter room names by a string where_statement = "" + search_pattern = [] if search_term: - where_statement = "WHERE LOWER(state.name) LIKE ?" + where_statement = """ + WHERE LOWER(state.name) LIKE ? + OR LOWER(state.canonical_alias) LIKE ? + OR state.room_id = ? + """ # Our postgres db driver converts ? -> %s in SQL strings as that's the # placeholder for postgres. # HOWEVER, if you put a % into your SQL then everything goes wibbly. # To get around this, we're going to surround search_term with %'s # before giving it to the database in python instead - search_term = "%" + search_term.lower() + "%" + search_pattern = [ + "%" + search_term.lower() + "%", + "#%" + search_term.lower() + "%:%", + search_term, + ] # Set ordering if RoomSortOrder(order_by) == RoomSortOrder.SIZE: @@ -519,12 +530,9 @@ class RoomWorkerStore(SQLBaseStore): ) def _get_rooms_paginate_txn(txn): - # Execute the data query - sql_values = (limit, start) - if search_term: - # Add the search term into the WHERE clause - sql_values = (search_term,) + sql_values - txn.execute(info_sql, sql_values) + # Add the search term into the WHERE clause + # and execute the data query + txn.execute(info_sql, search_pattern + [limit, start]) # Refactor room query data into a structured dictionary rooms = [] @@ -551,8 +559,7 @@ class RoomWorkerStore(SQLBaseStore): # Execute the count query # Add the search term into the WHERE clause if present - sql_values = (search_term,) if search_term else () - txn.execute(count_sql, sql_values) + txn.execute(count_sql, search_pattern) room_count = txn.fetchone() return rooms, room_count[0] diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index b62a7248e8..46116644ce 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -680,36 +680,6 @@ class RoomTestCase(unittest.HomeserverTestCase): reversing the order, etc. """ - def _set_canonical_alias(room_id: str, test_alias: str, admin_user_tok: str): - # Create a new alias to this room - url = "/_matrix/client/r0/directory/room/%s" % ( - urllib.parse.quote(test_alias), - ) - channel = self.make_request( - "PUT", - url.encode("ascii"), - {"room_id": room_id}, - access_token=admin_user_tok, - ) - self.assertEqual( - 200, int(channel.result["code"]), msg=channel.result["body"] - ) - - # Set this new alias as the canonical alias for this room - self.helper.send_state( - room_id, - "m.room.aliases", - {"aliases": [test_alias]}, - tok=admin_user_tok, - state_key="test", - ) - self.helper.send_state( - room_id, - "m.room.canonical_alias", - {"alias": test_alias}, - tok=admin_user_tok, - ) - def _order_test( order_type: str, expected_room_list: List[str], @@ -781,9 +751,9 @@ class RoomTestCase(unittest.HomeserverTestCase): ) # Set room canonical room aliases - _set_canonical_alias(room_id_1, "#A_alias:test", self.admin_user_tok) - _set_canonical_alias(room_id_2, "#B_alias:test", self.admin_user_tok) - _set_canonical_alias(room_id_3, "#C_alias:test", self.admin_user_tok) + self._set_canonical_alias(room_id_1, "#A_alias:test", self.admin_user_tok) + self._set_canonical_alias(room_id_2, "#B_alias:test", self.admin_user_tok) + self._set_canonical_alias(room_id_3, "#C_alias:test", self.admin_user_tok) # Set room member size in the reverse order. room 1 -> 1 member, 2 -> 2, 3 -> 3 user_1 = self.register_user("bob1", "pass") @@ -850,7 +820,7 @@ class RoomTestCase(unittest.HomeserverTestCase): room_id_2 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok) room_name_1 = "something" - room_name_2 = "else" + room_name_2 = "LoremIpsum" # Set the name for each room self.helper.send_state( @@ -866,6 +836,8 @@ class RoomTestCase(unittest.HomeserverTestCase): tok=self.admin_user_tok, ) + self._set_canonical_alias(room_id_1, "#Room_Alias1:test", self.admin_user_tok) + def _search_test( expected_room_id: Optional[str], search_term: str, @@ -914,24 +886,36 @@ class RoomTestCase(unittest.HomeserverTestCase): r = rooms[0] self.assertEqual(expected_room_id, r["room_id"]) - # Perform search tests + # Test searching by room name _search_test(room_id_1, "something") _search_test(room_id_1, "thing") - _search_test(room_id_2, "else") - _search_test(room_id_2, "se") + _search_test(room_id_2, "LoremIpsum") + _search_test(room_id_2, "lorem") # Test case insensitive _search_test(room_id_1, "SOMETHING") _search_test(room_id_1, "THING") - _search_test(room_id_2, "ELSE") - _search_test(room_id_2, "SE") + _search_test(room_id_2, "LOREMIPSUM") + _search_test(room_id_2, "LOREM") _search_test(None, "foo") _search_test(None, "bar") _search_test(None, "", expected_http_code=400) + # Test that the whole room id returns the room + _search_test(room_id_1, room_id_1) + # Test that the search by room_id is case sensitive + _search_test(None, room_id_1.lower()) + # Test search part of local part of room id do not match + _search_test(None, room_id_1[1:10]) + + # Test that whole room alias return no result, because of domain + _search_test(None, "#Room_Alias1:test") + # Test search local part of alias + _search_test(room_id_1, "alias1") + def test_search_term_non_ascii(self): """Test that searching for a room with non-ASCII characters works correctly""" @@ -1114,6 +1098,32 @@ class RoomTestCase(unittest.HomeserverTestCase): # the create_room already does the right thing, so no need to verify that we got # the state events it created. + def _set_canonical_alias(self, room_id: str, test_alias: str, admin_user_tok: str): + # Create a new alias to this room + url = "/_matrix/client/r0/directory/room/%s" % (urllib.parse.quote(test_alias),) + channel = self.make_request( + "PUT", + url.encode("ascii"), + {"room_id": room_id}, + access_token=admin_user_tok, + ) + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + + # Set this new alias as the canonical alias for this room + self.helper.send_state( + room_id, + "m.room.aliases", + {"aliases": [test_alias]}, + tok=admin_user_tok, + state_key="test", + ) + self.helper.send_state( + room_id, + "m.room.canonical_alias", + {"alias": test_alias}, + tok=admin_user_tok, + ) + class JoinAliasRoomTestCase(unittest.HomeserverTestCase): -- cgit 1.5.1 From c9c3aea9b189cb606d7ec2905dad2c87acc039ef Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 2 Nov 2021 10:39:02 +0000 Subject: Fix providing a `RoomStreamToken` instance to `_notify_app_services_ephemeral` (#11137) Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- changelog.d/11137.misc | 1 + synapse/handlers/appservice.py | 22 +++++++++++++---- synapse/notifier.py | 38 +++++++----------------------- synapse/storage/databases/main/devices.py | 4 ++-- synapse/storage/databases/main/presence.py | 2 +- 5 files changed, 30 insertions(+), 37 deletions(-) create mode 100644 changelog.d/11137.misc (limited to 'synapse/storage/databases') diff --git a/changelog.d/11137.misc b/changelog.d/11137.misc new file mode 100644 index 0000000000..f0d6476f48 --- /dev/null +++ b/changelog.d/11137.misc @@ -0,0 +1 @@ +Remove and document unnecessary `RoomStreamToken` checks in application service ephemeral event code. \ No newline at end of file diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 36c206dae6..67f8ffcaff 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -182,7 +182,7 @@ class ApplicationServicesHandler: def notify_interested_services_ephemeral( self, stream_key: str, - new_token: Optional[int], + new_token: Union[int, RoomStreamToken], users: Optional[Collection[Union[str, UserID]]] = None, ) -> None: """ @@ -203,7 +203,7 @@ class ApplicationServicesHandler: Appservices will only receive ephemeral events that fall within their registered user and room namespaces. - new_token: The latest stream token. + new_token: The stream token of the event. users: The users that should be informed of the new event, if any. """ if not self.notify_appservices: @@ -212,6 +212,19 @@ class ApplicationServicesHandler: if stream_key not in ("typing_key", "receipt_key", "presence_key"): return + # Assert that new_token is an integer (and not a RoomStreamToken). + # All of the supported streams that this function handles use an + # integer to track progress (rather than a RoomStreamToken - a + # vector clock implementation) as they don't support multiple + # stream writers. + # + # As a result, we simply assert that new_token is an integer. + # If we do end up needing to pass a RoomStreamToken down here + # in the future, using RoomStreamToken.stream (the minimum stream + # position) to convert to an ascending integer value should work. + # Additional context: https://github.com/matrix-org/synapse/pull/11137 + assert isinstance(new_token, int) + services = [ service for service in self.store.get_app_services() @@ -231,14 +244,13 @@ class ApplicationServicesHandler: self, services: List[ApplicationService], stream_key: str, - new_token: Optional[int], + new_token: int, users: Collection[Union[str, UserID]], ) -> None: logger.debug("Checking interested services for %s" % (stream_key)) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: - # Only handle typing if we have the latest token - if stream_key == "typing_key" and new_token is not None: + if stream_key == "typing_key": # Note that we don't persist the token (via set_type_stream_id_for_appservice) # for typing_key due to performance reasons and due to their highly # ephemeral nature. diff --git a/synapse/notifier.py b/synapse/notifier.py index 1882fffd2a..60e5409895 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -383,29 +383,6 @@ class Notifier: except Exception: logger.exception("Error notifying application services of event") - def _notify_app_services_ephemeral( - self, - stream_key: str, - new_token: Union[int, RoomStreamToken], - users: Optional[Collection[Union[str, UserID]]] = None, - ) -> None: - """Notify application services of ephemeral event activity. - - Args: - stream_key: The stream the event came from. - new_token: The value of the new stream token. - users: The users that should be informed of the new event, if any. - """ - try: - stream_token = None - if isinstance(new_token, int): - stream_token = new_token - self.appservice_handler.notify_interested_services_ephemeral( - stream_key, stream_token, users or [] - ) - except Exception: - logger.exception("Error notifying application services of event") - def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: self._pusher_pool.on_new_notifications(max_room_stream_token) @@ -467,12 +444,15 @@ class Notifier: self.notify_replication() - # Notify appservices - self._notify_app_services_ephemeral( - stream_key, - new_token, - users, - ) + # Notify appservices. + try: + self.appservice_handler.notify_interested_services_ephemeral( + stream_key, + new_token, + users, + ) + except Exception: + logger.exception("Error notifying application services of event") def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happened diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index b15cd030e0..9ccc66e589 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -427,7 +427,7 @@ class DeviceWorkerStore(SQLBaseStore): user_ids: the users who were signed Returns: - THe new stream ID. + The new stream ID. """ async with self._device_list_id_gen.get_next() as stream_id: @@ -1322,7 +1322,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): async def add_device_change_to_streams( self, user_id: str, device_ids: Collection[str], hosts: List[str] - ): + ) -> int: """Persist that a user's devices have been updated, and which hosts (if any) should be poked. """ diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 12cf6995eb..cc0eebdb46 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -92,7 +92,7 @@ class PresenceStore(PresenceBackgroundUpdateStore): prefilled_cache=presence_cache_prefill, ) - async def update_presence(self, presence_states): + async def update_presence(self, presence_states) -> Tuple[int, int]: assert self._can_persist_presence stream_ordering_manager = self._presence_id_gen.get_next_mult( -- cgit 1.5.1 From 4535532526581834ab798996ffe73f6d19c25123 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 2 Nov 2021 14:18:30 +0100 Subject: Delete messages for hidden devices from `device_inbox` (#11199) --- changelog.d/11199.bugfix | 1 + synapse/storage/databases/main/deviceinbox.py | 89 ++++++++++++++++++++++ .../03remove_hidden_devices_from_device_inbox.sql | 22 ++++++ tests/storage/databases/main/test_deviceinbox.py | 74 ++++++++++++++++++ 4 files changed, 186 insertions(+) create mode 100644 changelog.d/11199.bugfix create mode 100644 synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql (limited to 'synapse/storage/databases') diff --git a/changelog.d/11199.bugfix b/changelog.d/11199.bugfix new file mode 100644 index 0000000000..dc3ea8d515 --- /dev/null +++ b/changelog.d/11199.bugfix @@ -0,0 +1 @@ +Delete `to_device` messages for hidden devices that will never be read, reducing database size. \ No newline at end of file diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 25e9c1efe1..264e625bd7 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -561,6 +561,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox" + REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox" def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) @@ -581,6 +582,11 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): self._remove_deleted_devices_from_device_inbox, ) + self.db_pool.updates.register_background_update_handler( + self.REMOVE_HIDDEN_DEVICES, + self._remove_hidden_devices_from_device_inbox, + ) + async def _background_drop_index_device_inbox(self, progress, batch_size): def reindex_txn(conn): txn = conn.cursor() @@ -676,6 +682,89 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): return number_deleted + async def _remove_hidden_devices_from_device_inbox( + self, progress: JsonDict, batch_size: int + ) -> int: + """A background update that deletes all device_inboxes for hidden devices. + + This should only need to be run once (when users upgrade to v1.47.0) + + Args: + progress: JsonDict used to store progress of this background update + batch_size: the maximum number of rows to retrieve in a single select query + + Returns: + The number of deleted rows + """ + + def _remove_hidden_devices_from_device_inbox_txn( + txn: LoggingTransaction, + ) -> int: + """stream_id is not unique + we need to use an inclusive `stream_id >= ?` clause, + since we might not have deleted all hidden device messages for the stream_id + returned from the previous query + + Then delete only rows matching the `(user_id, device_id, stream_id)` tuple, + to avoid problems of deleting a large number of rows all at once + due to a single device having lots of device messages. + """ + + last_stream_id = progress.get("stream_id", 0) + + sql = """ + SELECT device_id, user_id, stream_id + FROM device_inbox + WHERE + stream_id >= ? + AND (device_id, user_id) IN ( + SELECT device_id, user_id FROM devices WHERE hidden = ? + ) + ORDER BY stream_id + LIMIT ? + """ + + txn.execute(sql, (last_stream_id, True, batch_size)) + rows = txn.fetchall() + + num_deleted = 0 + for row in rows: + num_deleted += self.db_pool.simple_delete_txn( + txn, + "device_inbox", + {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, + ) + + if rows: + # We don't just save the `stream_id` in progress as + # otherwise it can happen in large deployments that + # no change of status is visible in the log file, as + # it may be that the stream_id does not change in several runs + self.db_pool.updates._background_update_progress_txn( + txn, + self.REMOVE_HIDDEN_DEVICES, + { + "device_id": rows[-1][0], + "user_id": rows[-1][1], + "stream_id": rows[-1][2], + }, + ) + + return num_deleted + + number_deleted = await self.db_pool.runInteraction( + "_remove_hidden_devices_from_device_inbox", + _remove_hidden_devices_from_device_inbox_txn, + ) + + # The task is finished when no more lines are deleted. + if not number_deleted: + await self.db_pool.updates._end_background_update( + self.REMOVE_HIDDEN_DEVICES + ) + + return number_deleted + class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): pass diff --git a/synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql new file mode 100644 index 0000000000..7b3592dcf0 --- /dev/null +++ b/synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql @@ -0,0 +1,22 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- Remove messages from the device_inbox table which were orphaned +-- because a device was hidden using Synapse earlier than 1.47.0. +-- This runs as background task, but may take a bit to finish. + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6503, 'remove_hidden_devices_from_device_inbox', '{}'); diff --git a/tests/storage/databases/main/test_deviceinbox.py b/tests/storage/databases/main/test_deviceinbox.py index 4cfd2677f7..4b67bd15b7 100644 --- a/tests/storage/databases/main/test_deviceinbox.py +++ b/tests/storage/databases/main/test_deviceinbox.py @@ -88,3 +88,77 @@ class DeviceInboxBackgroundUpdateStoreTestCase(HomeserverTestCase): ) self.assertEqual(1, len(res)) self.assertEqual(res[0], "cur_device") + + def test_background_remove_hidden_devices_from_device_inbox(self): + """Test that the background task to delete hidden devices + from device_inboxes works properly.""" + + # create a valid device + self.get_success( + self.store.store_device(self.user_id, "cur_device", "display_name") + ) + + # create a hidden device + self.get_success( + self.store.db_pool.simple_insert( + "devices", + values={ + "user_id": self.user_id, + "device_id": "hidden_device", + "display_name": "hidden_display_name", + "hidden": True, + }, + ) + ) + + # Add device_inbox to devices + self.get_success( + self.store.db_pool.simple_insert( + "device_inbox", + { + "user_id": self.user_id, + "device_id": "cur_device", + "stream_id": 1, + "message_json": "{}", + }, + ) + ) + self.get_success( + self.store.db_pool.simple_insert( + "device_inbox", + { + "user_id": self.user_id, + "device_id": "hidden_device", + "stream_id": 2, + "message_json": "{}", + }, + ) + ) + + # Insert and run the background update. + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": "remove_hidden_devices_from_device_inbox", + "progress_json": "{}", + }, + ) + ) + + # ... and tell the DataStore that it hasn't finished all updates yet + self.store.db_pool.updates._all_done = False + + self.wait_for_background_updates() + + # Make sure the background task deleted hidden devices from device_inbox + res = self.get_success( + self.store.db_pool.simple_select_onecol( + table="device_inbox", + keyvalues={}, + retcol="device_id", + desc="get_device_id_from_device_inbox", + ) + ) + self.assertEqual(1, len(res)) + self.assertEqual(res[0], "cur_device") -- cgit 1.5.1 From c01bc5f43d1c7d0a25f397b542ced57894395519 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 2 Nov 2021 09:55:52 -0400 Subject: Add remaining type hints to `synapse.events`. (#11098) --- changelog.d/11098.misc | 1 + mypy.ini | 8 +- synapse/events/__init__.py | 227 +++++++++++++++++---------- synapse/events/validator.py | 2 +- synapse/handlers/federation_event.py | 2 +- synapse/handlers/message.py | 14 +- synapse/handlers/room.py | 2 +- synapse/handlers/room_batch.py | 2 +- synapse/handlers/room_member.py | 4 +- synapse/push/bulk_push_rule_evaluator.py | 4 +- synapse/push/push_rule_evaluator.py | 10 +- synapse/rest/client/room_batch.py | 2 +- synapse/state/__init__.py | 2 +- synapse/storage/databases/main/events.py | 7 +- synapse/storage/databases/main/roommember.py | 8 +- 15 files changed, 185 insertions(+), 110 deletions(-) create mode 100644 changelog.d/11098.misc (limited to 'synapse/storage/databases') diff --git a/changelog.d/11098.misc b/changelog.d/11098.misc new file mode 100644 index 0000000000..1e337bee54 --- /dev/null +++ b/changelog.d/11098.misc @@ -0,0 +1 @@ +Add type hints to `synapse.events`. diff --git a/mypy.ini b/mypy.ini index 119a7d8c91..600402a5d3 100644 --- a/mypy.ini +++ b/mypy.ini @@ -22,13 +22,7 @@ files = synapse/config, synapse/crypto, synapse/event_auth.py, - synapse/events/builder.py, - synapse/events/presence_router.py, - synapse/events/snapshot.py, - synapse/events/spamcheck.py, - synapse/events/third_party_rules.py, - synapse/events/utils.py, - synapse/events/validator.py, + synapse/events, synapse/federation, synapse/groups, synapse/handlers, diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 157669ea88..38f3cf4d33 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -16,8 +16,23 @@ import abc import os -from typing import Dict, Optional, Tuple, Type - +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Generic, + Iterable, + List, + Optional, + Sequence, + Tuple, + Type, + TypeVar, + Union, + overload, +) + +from typing_extensions import Literal from unpaddedbase64 import encode_base64 from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions @@ -26,6 +41,9 @@ from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze from synapse.util.stringutils import strtobool +if TYPE_CHECKING: + from synapse.events.builder import EventBuilder + # Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents # bugs where we accidentally share e.g. signature dicts. However, converting a # dict to frozen_dicts is expensive. @@ -37,7 +55,23 @@ from synapse.util.stringutils import strtobool USE_FROZEN_DICTS = strtobool(os.environ.get("SYNAPSE_USE_FROZEN_DICTS", "0")) -class DictProperty: +T = TypeVar("T") + + +# DictProperty (and DefaultDictProperty) require the classes they're used with to +# have a _dict property to pull properties from. +# +# TODO _DictPropertyInstance should not include EventBuilder but due to +# https://github.com/python/mypy/issues/5570 it thinks the DictProperty and +# DefaultDictProperty get applied to EventBuilder when it is in a Union with +# EventBase. This is the least invasive hack to get mypy to comply. +# +# Note that DictProperty/DefaultDictProperty cannot actually be used with +# EventBuilder as it lacks a _dict property. +_DictPropertyInstance = Union["_EventInternalMetadata", "EventBase", "EventBuilder"] + + +class DictProperty(Generic[T]): """An object property which delegates to the `_dict` within its parent object.""" __slots__ = ["key"] @@ -45,12 +79,33 @@ class DictProperty: def __init__(self, key: str): self.key = key - def __get__(self, instance, owner=None): + @overload + def __get__( + self, + instance: Literal[None], + owner: Optional[Type[_DictPropertyInstance]] = None, + ) -> "DictProperty": + ... + + @overload + def __get__( + self, + instance: _DictPropertyInstance, + owner: Optional[Type[_DictPropertyInstance]] = None, + ) -> T: + ... + + def __get__( + self, + instance: Optional[_DictPropertyInstance], + owner: Optional[Type[_DictPropertyInstance]] = None, + ) -> Union[T, "DictProperty"]: # if the property is accessed as a class property rather than an instance # property, return the property itself rather than the value if instance is None: return self try: + assert isinstance(instance, (EventBase, _EventInternalMetadata)) return instance._dict[self.key] except KeyError as e1: # We want this to look like a regular attribute error (mostly so that @@ -65,10 +120,12 @@ class DictProperty: "'%s' has no '%s' property" % (type(instance), self.key) ) from e1.__context__ - def __set__(self, instance, v): + def __set__(self, instance: _DictPropertyInstance, v: T) -> None: + assert isinstance(instance, (EventBase, _EventInternalMetadata)) instance._dict[self.key] = v - def __delete__(self, instance): + def __delete__(self, instance: _DictPropertyInstance) -> None: + assert isinstance(instance, (EventBase, _EventInternalMetadata)) try: del instance._dict[self.key] except KeyError as e1: @@ -77,7 +134,7 @@ class DictProperty: ) from e1.__context__ -class DefaultDictProperty(DictProperty): +class DefaultDictProperty(DictProperty, Generic[T]): """An extension of DictProperty which provides a default if the property is not present in the parent's _dict. @@ -86,13 +143,34 @@ class DefaultDictProperty(DictProperty): __slots__ = ["default"] - def __init__(self, key, default): + def __init__(self, key: str, default: T): super().__init__(key) self.default = default - def __get__(self, instance, owner=None): + @overload + def __get__( + self, + instance: Literal[None], + owner: Optional[Type[_DictPropertyInstance]] = None, + ) -> "DefaultDictProperty": + ... + + @overload + def __get__( + self, + instance: _DictPropertyInstance, + owner: Optional[Type[_DictPropertyInstance]] = None, + ) -> T: + ... + + def __get__( + self, + instance: Optional[_DictPropertyInstance], + owner: Optional[Type[_DictPropertyInstance]] = None, + ) -> Union[T, "DefaultDictProperty"]: if instance is None: return self + assert isinstance(instance, (EventBase, _EventInternalMetadata)) return instance._dict.get(self.key, self.default) @@ -111,22 +189,22 @@ class _EventInternalMetadata: # in the DAG) self.outlier = False - out_of_band_membership: bool = DictProperty("out_of_band_membership") - send_on_behalf_of: str = DictProperty("send_on_behalf_of") - recheck_redaction: bool = DictProperty("recheck_redaction") - soft_failed: bool = DictProperty("soft_failed") - proactively_send: bool = DictProperty("proactively_send") - redacted: bool = DictProperty("redacted") - txn_id: str = DictProperty("txn_id") - token_id: int = DictProperty("token_id") - historical: bool = DictProperty("historical") + out_of_band_membership: DictProperty[bool] = DictProperty("out_of_band_membership") + send_on_behalf_of: DictProperty[str] = DictProperty("send_on_behalf_of") + recheck_redaction: DictProperty[bool] = DictProperty("recheck_redaction") + soft_failed: DictProperty[bool] = DictProperty("soft_failed") + proactively_send: DictProperty[bool] = DictProperty("proactively_send") + redacted: DictProperty[bool] = DictProperty("redacted") + txn_id: DictProperty[str] = DictProperty("txn_id") + token_id: DictProperty[int] = DictProperty("token_id") + historical: DictProperty[bool] = DictProperty("historical") # XXX: These are set by StreamWorkerStore._set_before_and_after. # I'm pretty sure that these are never persisted to the database, so shouldn't # be here - before: RoomStreamToken = DictProperty("before") - after: RoomStreamToken = DictProperty("after") - order: Tuple[int, int] = DictProperty("order") + before: DictProperty[RoomStreamToken] = DictProperty("before") + after: DictProperty[RoomStreamToken] = DictProperty("after") + order: DictProperty[Tuple[int, int]] = DictProperty("order") def get_dict(self) -> JsonDict: return dict(self._dict) @@ -162,9 +240,6 @@ class _EventInternalMetadata: If the sender of the redaction event is allowed to redact any event due to auth rules, then this will always return false. - - Returns: - bool """ return self._dict.get("recheck_redaction", False) @@ -176,32 +251,23 @@ class _EventInternalMetadata: sent to clients. 2. They should not be added to the forward extremities (and therefore not to current state). - - Returns: - bool """ return self._dict.get("soft_failed", False) - def should_proactively_send(self): + def should_proactively_send(self) -> bool: """Whether the event, if ours, should be sent to other clients and servers. This is used for sending dummy events internally. Servers and clients can still explicitly fetch the event. - - Returns: - bool """ return self._dict.get("proactively_send", True) - def is_redacted(self): + def is_redacted(self) -> bool: """Whether the event has been redacted. This is used for efficiently checking whether an event has been marked as redacted without needing to make another database call. - - Returns: - bool """ return self._dict.get("redacted", False) @@ -241,29 +307,31 @@ class EventBase(metaclass=abc.ABCMeta): self.internal_metadata = _EventInternalMetadata(internal_metadata_dict) - auth_events = DictProperty("auth_events") - depth = DictProperty("depth") - content = DictProperty("content") - hashes = DictProperty("hashes") - origin = DictProperty("origin") - origin_server_ts = DictProperty("origin_server_ts") - prev_events = DictProperty("prev_events") - redacts = DefaultDictProperty("redacts", None) - room_id = DictProperty("room_id") - sender = DictProperty("sender") - state_key = DictProperty("state_key") - type = DictProperty("type") - user_id = DictProperty("sender") + depth: DictProperty[int] = DictProperty("depth") + content: DictProperty[JsonDict] = DictProperty("content") + hashes: DictProperty[Dict[str, str]] = DictProperty("hashes") + origin: DictProperty[str] = DictProperty("origin") + origin_server_ts: DictProperty[int] = DictProperty("origin_server_ts") + redacts: DefaultDictProperty[Optional[str]] = DefaultDictProperty("redacts", None) + room_id: DictProperty[str] = DictProperty("room_id") + sender: DictProperty[str] = DictProperty("sender") + # TODO state_key should be Optional[str], this is generally asserted in Synapse + # by calling is_state() first (which ensures this), but it is hard (not possible?) + # to properly annotate that calling is_state() asserts that state_key exists + # and is non-None. + state_key: DictProperty[str] = DictProperty("state_key") + type: DictProperty[str] = DictProperty("type") + user_id: DictProperty[str] = DictProperty("sender") @property def event_id(self) -> str: raise NotImplementedError() @property - def membership(self): + def membership(self) -> str: return self.content["membership"] - def is_state(self): + def is_state(self) -> bool: return hasattr(self, "state_key") and self.state_key is not None def get_dict(self) -> JsonDict: @@ -272,13 +340,13 @@ class EventBase(metaclass=abc.ABCMeta): return d - def get(self, key, default=None): + def get(self, key: str, default: Optional[Any] = None) -> Any: return self._dict.get(key, default) - def get_internal_metadata_dict(self): + def get_internal_metadata_dict(self) -> JsonDict: return self.internal_metadata.get_dict() - def get_pdu_json(self, time_now=None) -> JsonDict: + def get_pdu_json(self, time_now: Optional[int] = None) -> JsonDict: pdu_json = self.get_dict() if time_now is not None and "age_ts" in pdu_json["unsigned"]: @@ -305,49 +373,46 @@ class EventBase(metaclass=abc.ABCMeta): return template_json - def __set__(self, instance, value): - raise AttributeError("Unrecognized attribute %s" % (instance,)) - - def __getitem__(self, field): + def __getitem__(self, field: str) -> Optional[Any]: return self._dict[field] - def __contains__(self, field): + def __contains__(self, field: str) -> bool: return field in self._dict - def items(self): + def items(self) -> List[Tuple[str, Optional[Any]]]: return list(self._dict.items()) - def keys(self): + def keys(self) -> Iterable[str]: return self._dict.keys() - def prev_event_ids(self): + def prev_event_ids(self) -> Sequence[str]: """Returns the list of prev event IDs. The order matches the order specified in the event, though there is no meaning to it. Returns: - list[str]: The list of event IDs of this event's prev_events + The list of event IDs of this event's prev_events """ - return [e for e, _ in self.prev_events] + return [e for e, _ in self._dict["prev_events"]] - def auth_event_ids(self): + def auth_event_ids(self) -> Sequence[str]: """Returns the list of auth event IDs. The order matches the order specified in the event, though there is no meaning to it. Returns: - list[str]: The list of event IDs of this event's auth_events + The list of event IDs of this event's auth_events """ - return [e for e, _ in self.auth_events] + return [e for e, _ in self._dict["auth_events"]] - def freeze(self): + def freeze(self) -> None: """'Freeze' the event dict, so it cannot be modified by accident""" # this will be a no-op if the event dict is already frozen. self._dict = freeze(self._dict) - def __str__(self): + def __str__(self) -> str: return self.__repr__() - def __repr__(self): + def __repr__(self) -> str: rejection = f"REJECTED={self.rejected_reason}, " if self.rejected_reason else "" return ( @@ -443,7 +508,7 @@ class FrozenEventV2(EventBase): else: frozen_dict = event_dict - self._event_id = None + self._event_id: Optional[str] = None super().__init__( frozen_dict, @@ -455,7 +520,7 @@ class FrozenEventV2(EventBase): ) @property - def event_id(self): + def event_id(self) -> str: # We have to import this here as otherwise we get an import loop which # is hard to break. from synapse.crypto.event_signing import compute_event_reference_hash @@ -465,23 +530,23 @@ class FrozenEventV2(EventBase): self._event_id = "$" + encode_base64(compute_event_reference_hash(self)[1]) return self._event_id - def prev_event_ids(self): + def prev_event_ids(self) -> Sequence[str]: """Returns the list of prev event IDs. The order matches the order specified in the event, though there is no meaning to it. Returns: - list[str]: The list of event IDs of this event's prev_events + The list of event IDs of this event's prev_events """ - return self.prev_events + return self._dict["prev_events"] - def auth_event_ids(self): + def auth_event_ids(self) -> Sequence[str]: """Returns the list of auth event IDs. The order matches the order specified in the event, though there is no meaning to it. Returns: - list[str]: The list of event IDs of this event's auth_events + The list of event IDs of this event's auth_events """ - return self.auth_events + return self._dict["auth_events"] class FrozenEventV3(FrozenEventV2): @@ -490,7 +555,7 @@ class FrozenEventV3(FrozenEventV2): format_version = EventFormatVersions.V3 # All events of this type are V3 @property - def event_id(self): + def event_id(self) -> str: # We have to import this here as otherwise we get an import loop which # is hard to break. from synapse.crypto.event_signing import compute_event_reference_hash @@ -503,12 +568,14 @@ class FrozenEventV3(FrozenEventV2): return self._event_id -def _event_type_from_format_version(format_version: int) -> Type[EventBase]: +def _event_type_from_format_version( + format_version: int, +) -> Type[Union[FrozenEvent, FrozenEventV2, FrozenEventV3]]: """Returns the python type to use to construct an Event object for the given event format version. Args: - format_version (int): The event format version + format_version: The event format version Returns: type: A type that can be initialized as per the initializer of diff --git a/synapse/events/validator.py b/synapse/events/validator.py index 4d459c17f1..cf86934968 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py @@ -55,7 +55,7 @@ class EventValidator: ] for k in required: - if not hasattr(event, k): + if k not in event: raise SynapseError(400, "Event does not have key %s" % (k,)) # Check that the following keys have string values diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index e617db4c0d..1a1cd93b1a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1643,7 +1643,7 @@ class FederationEventHandler: event: the event whose auth_events we want Returns: - all of the events in `event.auth_events`, after deduplication + all of the events listed in `event.auth_events_ids`, after deduplication Raises: AuthError if we were unable to fetch the auth_events for any reason. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4a0fccfcc6..b7bc187169 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1318,6 +1318,8 @@ class EventCreationHandler: # user is actually admin or not). is_admin_redaction = False if event.type == EventTypes.Redaction: + assert event.redacts is not None + original_event = await self.store.get_event( event.redacts, redact_behaviour=EventRedactBehaviour.AS_IS, @@ -1413,6 +1415,8 @@ class EventCreationHandler: ) if event.type == EventTypes.Redaction: + assert event.redacts is not None + original_event = await self.store.get_event( event.redacts, redact_behaviour=EventRedactBehaviour.AS_IS, @@ -1500,11 +1504,13 @@ class EventCreationHandler: next_batch_id = event.content.get( EventContentFields.MSC2716_NEXT_BATCH_ID ) - conflicting_insertion_event_id = ( - await self.store.get_insertion_event_by_batch_id( - event.room_id, next_batch_id + conflicting_insertion_event_id = None + if next_batch_id: + conflicting_insertion_event_id = ( + await self.store.get_insertion_event_by_batch_id( + event.room_id, next_batch_id + ) ) - ) if conflicting_insertion_event_id is not None: # The current insertion event that we're processing is invalid # because an insertion event already exists in the room with the diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 99e9b37344..969eb3b9b0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -525,7 +525,7 @@ class RoomCreationHandler: ): await self.room_member_handler.update_membership( requester, - UserID.from_string(old_event["state_key"]), + UserID.from_string(old_event.state_key), new_room_id, "ban", ratelimit=False, diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index 2f5a3e4d19..0723286383 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -355,7 +355,7 @@ class RoomBatchHandler: for (event, context) in reversed(events_to_persist): await self.event_creation_handler.handle_new_client_event( await self.create_requester_for_user_id_from_app_service( - event["sender"], app_service_requester.app_service + event.sender, app_service_requester.app_service ), event=event, context=context, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 74e6c7eca6..08244b690d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -1669,7 +1669,9 @@ class RoomMemberMasterHandler(RoomMemberHandler): # # the prev_events consist solely of the previous membership event. prev_event_ids = [previous_membership_event.event_id] - auth_event_ids = previous_membership_event.auth_event_ids() + prev_event_ids + auth_event_ids = ( + list(previous_membership_event.auth_event_ids()) + prev_event_ids + ) event, context = await self.event_creation_handler.create_event( requester, diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 0622a37ae8..009d8e77b0 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -232,6 +232,8 @@ class BulkPushRuleEvaluator: # that user, as they might not be already joined. if event.type == EventTypes.Member and event.state_key == uid: display_name = event.content.get("displayname", None) + if not isinstance(display_name, str): + display_name = None if count_as_unread: # Add an element for the current user if the event needs to be marked as @@ -268,7 +270,7 @@ def _condition_checker( evaluator: PushRuleEvaluatorForEvent, conditions: List[dict], uid: str, - display_name: str, + display_name: Optional[str], cache: Dict[str, bool], ) -> bool: for cond in conditions: diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 7a8dc63976..7f68092ec5 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -18,7 +18,7 @@ import re from typing import Any, Dict, List, Optional, Pattern, Tuple, Union from synapse.events import EventBase -from synapse.types import UserID +from synapse.types import JsonDict, UserID from synapse.util import glob_to_regex, re_word_boundary from synapse.util.caches.lrucache import LruCache @@ -129,7 +129,7 @@ class PushRuleEvaluatorForEvent: self._value_cache = _flatten_dict(event) def matches( - self, condition: Dict[str, Any], user_id: str, display_name: str + self, condition: Dict[str, Any], user_id: str, display_name: Optional[str] ) -> bool: if condition["kind"] == "event_match": return self._event_match(condition, user_id) @@ -172,7 +172,7 @@ class PushRuleEvaluatorForEvent: return _glob_matches(pattern, haystack) - def _contains_display_name(self, display_name: str) -> bool: + def _contains_display_name(self, display_name: Optional[str]) -> bool: if not display_name: return False @@ -222,7 +222,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool: def _flatten_dict( - d: Union[EventBase, dict], + d: Union[EventBase, JsonDict], prefix: Optional[List[str]] = None, result: Optional[Dict[str, str]] = None, ) -> Dict[str, str]: @@ -233,7 +233,7 @@ def _flatten_dict( for key, value in d.items(): if isinstance(value, str): result[".".join(prefix + [key])] = value.lower() - elif hasattr(value, "items"): + elif isinstance(value, dict): _flatten_dict(value, prefix=(prefix + [key]), result=result) return result diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index 99f8156ad0..ab9a743bba 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -191,7 +191,7 @@ class RoomBatchSendEventRestServlet(RestServlet): depth=inherited_depth, ) - batch_id_to_connect_to = base_insertion_event["content"][ + batch_id_to_connect_to = base_insertion_event.content[ EventContentFields.MSC2716_NEXT_BATCH_ID ] diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 98a0239759..1605411b00 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -247,7 +247,7 @@ class StateHandler: return await self.get_hosts_in_room_at_events(room_id, event_ids) async def get_hosts_in_room_at_events( - self, room_id: str, event_ids: List[str] + self, room_id: str, event_ids: Iterable[str] ) -> Set[str]: """Get the hosts that were in a room at the given event ids diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 8d9086ecf0..596275c23c 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -24,6 +24,7 @@ from typing import ( Iterable, List, Optional, + Sequence, Set, Tuple, ) @@ -494,7 +495,7 @@ class PersistEventsStore: event_chain_id_gen: SequenceGenerator, event_to_room_id: Dict[str, str], event_to_types: Dict[str, Tuple[str, str]], - event_to_auth_chain: Dict[str, List[str]], + event_to_auth_chain: Dict[str, Sequence[str]], ) -> None: """Calculate the chain cover index for the given events. @@ -786,7 +787,7 @@ class PersistEventsStore: event_chain_id_gen: SequenceGenerator, event_to_room_id: Dict[str, str], event_to_types: Dict[str, Tuple[str, str]], - event_to_auth_chain: Dict[str, List[str]], + event_to_auth_chain: Dict[str, Sequence[str]], events_to_calc_chain_id_for: Set[str], chain_map: Dict[str, Tuple[int, int]], ) -> Dict[str, Tuple[int, int]]: @@ -1794,7 +1795,7 @@ class PersistEventsStore: ) # Insert an edge for every prev_event connection - for prev_event_id in event.prev_events: + for prev_event_id in event.prev_event_ids(): self.db_pool.simple_insert_txn( txn, table="insertion_event_edges", diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 4b288bb2e7..033a9831d6 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -570,7 +570,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): async def get_joined_users_from_context( self, event: EventBase, context: EventContext - ): + ) -> Dict[str, ProfileInfo]: state_group = context.state_group if not state_group: # If state_group is None it means it has yet to be assigned a @@ -584,7 +584,9 @@ class RoomMemberWorkerStore(EventsWorkerStore): event.room_id, state_group, current_state_ids, event=event, context=context ) - async def get_joined_users_from_state(self, room_id, state_entry): + async def get_joined_users_from_state( + self, room_id, state_entry + ) -> Dict[str, ProfileInfo]: state_group = state_entry.state_group if not state_group: # If state_group is None it means it has yet to be assigned a @@ -607,7 +609,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): cache_context, event=None, context=None, - ): + ) -> Dict[str, ProfileInfo]: # We don't use `state_group`, it's there so that we can cache based # on it. However, it's important that it's never None, since two current_states # with a state_group of None are likely to be different. -- cgit 1.5.1 From 6250b95efe88385bb3ec2842d5eb76f42ef762ef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Nov 2021 15:46:48 +0000 Subject: Add index to `local_group_updates.stream_id` (#11231) This should speed up startup times and generally increase performance of groups. --- changelog.d/11231.misc | 1 + scripts/synapse_port_db | 2 ++ synapse/storage/databases/main/group_server.py | 17 ++++++++++++++++- .../schema/main/delta/65/04_local_group_updates.sql | 18 ++++++++++++++++++ 4 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 changelog.d/11231.misc create mode 100644 synapse/storage/schema/main/delta/65/04_local_group_updates.sql (limited to 'synapse/storage/databases') diff --git a/changelog.d/11231.misc b/changelog.d/11231.misc new file mode 100644 index 0000000000..c7fca7071e --- /dev/null +++ b/changelog.d/11231.misc @@ -0,0 +1 @@ +Minor speed up to start up times and getting updates for groups by adding missing index to `local_group_updates.stream_id`. diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 349866eb9a..640ff15277 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -43,6 +43,7 @@ from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyBackground from synapse.storage.databases.main.events_bg_updates import ( EventsBackgroundUpdatesStore, ) +from synapse.storage.databases.main.group_server import GroupServerWorkerStore from synapse.storage.databases.main.media_repository import ( MediaRepositoryBackgroundUpdateStore, ) @@ -181,6 +182,7 @@ class Store( StatsStore, PusherWorkerStore, PresenceBackgroundUpdateStore, + GroupServerWorkerStore, ): def execute(self, f, *args, **kwargs): return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs) diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py index e70d3649ff..bb621df0dd 100644 --- a/synapse/storage/databases/main/group_server.py +++ b/synapse/storage/databases/main/group_server.py @@ -13,15 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from typing_extensions import TypedDict from synapse.api.errors import SynapseError from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage.database import DatabasePool +from synapse.storage.types import Connection from synapse.types import JsonDict from synapse.util import json_encoder +if TYPE_CHECKING: + from synapse.server import HomeServer + # The category ID for the "default" category. We don't store as null in the # database to avoid the fun of null != null _DEFAULT_CATEGORY_ID = "" @@ -35,6 +40,16 @@ class _RoomInGroup(TypedDict): class GroupServerWorkerStore(SQLBaseStore): + def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"): + database.updates.register_background_index_update( + update_name="local_group_updates_index", + index_name="local_group_updates_stream_id_index", + table="local_group_updates", + columns=("stream_id",), + unique=True, + ) + super().__init__(database, db_conn, hs) + async def get_group(self, group_id: str) -> Optional[Dict[str, Any]]: return await self.db_pool.simple_select_one( table="groups", diff --git a/synapse/storage/schema/main/delta/65/04_local_group_updates.sql b/synapse/storage/schema/main/delta/65/04_local_group_updates.sql new file mode 100644 index 0000000000..a178abfe12 --- /dev/null +++ b/synapse/storage/schema/main/delta/65/04_local_group_updates.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Check index on `local_group_updates.stream_id`. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6504, 'local_group_updates_index', '{}'); -- cgit 1.5.1