diff options
Diffstat (limited to 'synapse')
37 files changed, 197 insertions, 142 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 1613941759..b1369aca8f 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -20,8 +20,6 @@ import json import os import sys -from matrix_common.versionstring import get_distribution_version_string - # Check that we're not running on an unsupported Python version. if sys.version_info < (3, 7): print("Synapse requires Python 3.7 or above.") @@ -70,7 +68,9 @@ try: except ImportError: pass -__version__ = get_distribution_version_string("matrix-synapse") +import synapse.util + +__version__ = synapse.util.SYNAPSE_VERSION if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 361b51d2fa..c753dfa7cb 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -40,7 +40,6 @@ from typing import ( ) import yaml -from matrix_common.versionstring import get_distribution_version_string from typing_extensions import TypedDict from twisted.internet import defer, reactor as reactor_ @@ -84,7 +83,7 @@ from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStor from synapse.storage.engines import create_engine from synapse.storage.prepare_database import prepare_database from synapse.types import ISynapseReactor -from synapse.util import Clock +from synapse.util import SYNAPSE_VERSION, Clock # Cast safety: Twisted does some naughty magic which replaces the # twisted.internet.reactor module with a Reactor instance at runtime. @@ -258,9 +257,7 @@ class MockHomeserver: self.clock = Clock(reactor) self.config = config self.hostname = config.server.server_name - self.version_string = "Synapse/" + get_distribution_version_string( - "matrix-synapse" - ) + self.version_string = SYNAPSE_VERSION def get_clock(self) -> Clock: return self.clock diff --git a/synapse/_scripts/update_synapse_database.py b/synapse/_scripts/update_synapse_database.py index c443522c05..b4aeae6dd5 100755 --- a/synapse/_scripts/update_synapse_database.py +++ b/synapse/_scripts/update_synapse_database.py @@ -19,7 +19,6 @@ import sys from typing import cast import yaml -from matrix_common.versionstring import get_distribution_version_string from twisted.internet import defer, reactor as reactor_ @@ -28,6 +27,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.server import HomeServer from synapse.storage import DataStore from synapse.types import ISynapseReactor +from synapse.util import SYNAPSE_VERSION # Cast safety: Twisted does some naughty magic which replaces the # twisted.internet.reactor module with a Reactor instance at runtime. @@ -43,8 +43,7 @@ class MockHomeserver(HomeServer): hostname=config.server.server_name, config=config, reactor=reactor, - version_string="Synapse/" - + get_distribution_version_string("matrix-synapse"), + version_string=f"Synapse/{SYNAPSE_VERSION}", ) diff --git a/synapse/app/_base.py b/synapse/app/_base.py index a3446ac6e8..84e389a6cd 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -37,7 +37,6 @@ from typing import ( ) from cryptography.utils import CryptographyDeprecationWarning -from matrix_common.versionstring import get_distribution_version_string from typing_extensions import ParamSpec import twisted @@ -68,6 +67,7 @@ from synapse.metrics import install_gc_manager, register_threadpool from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.metrics.jemalloc import setup_jemalloc_stats from synapse.types import ISynapseReactor +from synapse.util import SYNAPSE_VERSION from synapse.util.caches.lrucache import setup_expire_lru_cache_entries from synapse.util.daemonize import daemonize_process from synapse.util.gai_resolver import GAIResolver @@ -540,7 +540,7 @@ def setup_sentry(hs: "HomeServer") -> None: sentry_sdk.init( dsn=hs.config.metrics.sentry_dsn, - release=get_distribution_version_string("matrix-synapse"), + release=SYNAPSE_VERSION, ) # We set some default tags that give some context to this instance diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 6fedf681f8..561621a285 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -19,8 +19,6 @@ import sys import tempfile from typing import List, Optional -from matrix_common.versionstring import get_distribution_version_string - from twisted.internet import defer, task import synapse @@ -43,6 +41,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.server import HomeServer from synapse.storage.databases.main.room import RoomWorkerStore from synapse.types import StateMap +from synapse.util import SYNAPSE_VERSION from synapse.util.logcontext import LoggingContext logger = logging.getLogger("synapse.app.admin_cmd") @@ -220,7 +219,7 @@ def start(config_options: List[str]) -> None: ss = AdminCmdServer( config.server.server_name, config=config, - version_string="Synapse/" + get_distribution_version_string("matrix-synapse"), + version_string=f"Synapse/{SYNAPSE_VERSION}", ) setup_logging(ss, config, use_worker_options=True) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 89f8998f0e..4a987fb759 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -16,8 +16,6 @@ import logging import sys from typing import Dict, List, Optional, Tuple -from matrix_common.versionstring import get_distribution_version_string - from twisted.internet import address from twisted.web.resource import Resource @@ -121,6 +119,7 @@ from synapse.storage.databases.main.transactions import TransactionWorkerStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore from synapse.storage.databases.main.user_directory import UserDirectoryStore from synapse.types import JsonDict +from synapse.util import SYNAPSE_VERSION from synapse.util.httpresourcetree import create_resource_tree logger = logging.getLogger("synapse.app.generic_worker") @@ -447,7 +446,7 @@ def start(config_options: List[str]) -> None: hs = GenericWorkerServer( config.server.server_name, config=config, - version_string="Synapse/" + get_distribution_version_string("matrix-synapse"), + version_string=f"Synapse/{SYNAPSE_VERSION}", ) setup_logging(hs, config, use_worker_options=True) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 4c6c0658ab..745e704141 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -18,8 +18,6 @@ import os import sys from typing import Dict, Iterable, List -from matrix_common.versionstring import get_distribution_version_string - from twisted.internet.tcp import Port from twisted.web.resource import EncodingResourceWrapper, Resource from twisted.web.server import GzipEncoderFactory @@ -69,7 +67,7 @@ from synapse.rest.synapse.client import build_synapse_client_resource_tree from synapse.rest.well_known import well_known_resource from synapse.server import HomeServer from synapse.storage import DataStore -from synapse.util.check_dependencies import check_requirements +from synapse.util.check_dependencies import VERSION, check_requirements from synapse.util.httpresourcetree import create_resource_tree from synapse.util.module_loader import load_module @@ -371,7 +369,7 @@ def setup(config_options: List[str]) -> SynapseHomeServer: hs = SynapseHomeServer( config.server.server_name, config=config, - version_string="Synapse/" + get_distribution_version_string("matrix-synapse"), + version_string=f"Synapse/{VERSION}", ) synapse.config.logger.setup_logging(hs, config, use_worker_options=False) diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index f2dfd49b07..0a285dba31 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -84,3 +84,6 @@ class ExperimentalConfig(Config): # MSC3772: A push rule for mutual relations. self.msc3772_enabled: bool = experimental.get("msc3772_enabled", False) + + # MSC3715: dir param on /relations. + self.msc3715_enabled: bool = experimental.get("msc3715_enabled", False) diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 470b8b4492..82a5b5fa12 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -22,7 +22,6 @@ from string import Template from typing import TYPE_CHECKING, Any, Dict, Optional import yaml -from matrix_common.versionstring import get_distribution_version_string from zope.interface import implementer from twisted.logger import ( @@ -37,6 +36,7 @@ from synapse.logging.context import LoggingContextFilter from synapse.logging.filter import MetadataFilter from synapse.types import JsonDict +from ..util import SYNAPSE_VERSION from ._base import Config, ConfigError if TYPE_CHECKING: @@ -349,7 +349,7 @@ def setup_logging( logging.warning( "Server %s version %s", sys.argv[0], - get_distribution_version_string("matrix-synapse"), + SYNAPSE_VERSION, ) logging.info("Server hostname: %s", config.server.server_name) logging.info("Instance name: %s", hs.get_instance_name()) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ad475a913b..66e6305562 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1642,10 +1642,6 @@ def _validate_hierarchy_event(d: JsonDict) -> None: if not isinstance(event_type, str): raise ValueError("Invalid event: 'event_type' must be a str") - room_id = d.get("room_id") - if not isinstance(room_id, str): - raise ValueError("Invalid event: 'room_id' must be a str") - state_key = d.get("state_key") if not isinstance(state_key, str): raise ValueError("Invalid event: 'state_key' must be a str") diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 333ca9a97f..41d8b937af 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -37,6 +37,7 @@ from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import ReadReceipt from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter +from synapse.visibility import filter_events_for_server if TYPE_CHECKING: import synapse.server @@ -77,6 +78,7 @@ class PerDestinationQueue: ): self._server_name = hs.hostname self._clock = hs.get_clock() + self._storage_controllers = hs.get_storage_controllers() self._store = hs.get_datastores().main self._transaction_manager = transaction_manager self._instance_name = hs.get_instance_name() @@ -442,6 +444,12 @@ class PerDestinationQueue: "This should not happen." % event_ids ) + logger.info( + "Catching up destination %s with %d PDUs", + self._destination, + len(catchup_pdus), + ) + # We send transactions with events from one room only, as its likely # that the remote will have to do additional processing, which may # take some time. It's better to give it small amounts of work @@ -487,19 +495,20 @@ class PerDestinationQueue: ): continue - # Filter out events where the server is not in the room, - # e.g. it may have left/been kicked. *Ideally* we'd pull - # out the kick and send that, but it's a rare edge case - # so we don't bother for now (the server that sent the - # kick should send it out if its online). - hosts = await self._state.get_hosts_in_room_at_events( - p.room_id, [p.event_id] - ) - if self._destination not in hosts: - continue - new_pdus.append(p) + # Filter out events where the server is not in the room, + # e.g. it may have left/been kicked. *Ideally* we'd pull + # out the kick and send that, but it's a rare edge case + # so we don't bother for now (the server that sent the + # kick should send it out if its online). + new_pdus = await filter_events_for_server( + self._storage_controllers, + self._destination, + new_pdus, + redact=False, + ) + # If we've filtered out all the extremities, fall back to # sending the original event. This should ensure that the # server gets at least some of missed events (especially if diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 7dfb890661..f7884bfbe0 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -24,7 +24,6 @@ from typing import ( Union, ) -from matrix_common.versionstring import get_distribution_version_string from typing_extensions import Literal from synapse.api.constants import EduTypes @@ -42,6 +41,7 @@ from synapse.http.servlet import ( parse_strings_from_args, ) from synapse.types import JsonDict +from synapse.util import SYNAPSE_VERSION from synapse.util.ratelimitutils import FederationRateLimiter if TYPE_CHECKING: @@ -622,7 +622,7 @@ class FederationVersionServlet(BaseFederationServlet): { "server": { "name": "Synapse", - "version": get_distribution_version_string("matrix-synapse"), + "version": SYNAPSE_VERSION, } }, ) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index fbafbbee6b..6e15028b0a 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -81,6 +81,8 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +INVALID_USERNAME_OR_PASSWORD = "Invalid username or password" + def convert_client_dict_legacy_fields_to_identifier( submission: JsonDict, @@ -1215,7 +1217,9 @@ class AuthHandler: await self._failed_login_attempts_ratelimiter.can_do_action( None, (medium, address) ) - raise LoginError(403, "", errcode=Codes.FORBIDDEN) + raise LoginError( + 403, msg=INVALID_USERNAME_OR_PASSWORD, errcode=Codes.FORBIDDEN + ) identifier_dict = {"type": "m.id.user", "user": user_id} @@ -1341,7 +1345,7 @@ class AuthHandler: # We raise a 403 here, but note that if we're doing user-interactive # login, it turns all LoginErrors into a 401 anyway. - raise LoginError(403, "Invalid password", errcode=Codes.FORBIDDEN) + raise LoginError(403, msg=INVALID_USERNAME_OR_PASSWORD, errcode=Codes.FORBIDDEN) async def check_password_provider_3pid( self, medium: str, address: str, password: str diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index a0cbeedc30..b79c551703 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -398,35 +398,6 @@ class DeviceHandler(DeviceWorkerHandler): await self.delete_devices(user_id, user_devices) @trace - async def delete_device(self, user_id: str, device_id: str) -> None: - """Delete the given device - - Args: - user_id: The user to delete the device from. - device_id: The device to delete. - """ - - try: - await self.store.delete_device(user_id, device_id) - except errors.StoreError as e: - if e.code == 404: - # no match - set_tag("error", True) - log_kv( - {"reason": "User doesn't have device id.", "device_id": device_id} - ) - else: - raise - - await self._auth_handler.delete_access_tokens_for_user( - user_id, device_id=device_id - ) - - await self.store.delete_e2e_keys_by_device(user_id=user_id, device_id=device_id) - - await self.notify_device_update(user_id, [device_id]) - - @trace async def delete_all_devices_for_user( self, user_id: str, except_device_id: Optional[str] = None ) -> None: @@ -591,7 +562,7 @@ class DeviceHandler(DeviceWorkerHandler): user_id, device_id, device_data ) if old_device_id is not None: - await self.delete_device(user_id, old_device_id) + await self.delete_devices(user_id, [old_device_id]) return device_id async def get_dehydrated_device( @@ -638,7 +609,7 @@ class DeviceHandler(DeviceWorkerHandler): await self.store.update_device(user_id, device_id, old_device["display_name"]) # can't call self.delete_device because that will clobber the # access token so call the storage layer directly - await self.store.delete_device(user_id, old_device_id) + await self.store.delete_devices(user_id, [old_device_id]) await self.store.delete_e2e_keys_by_device( user_id=user_id, device_id=old_device_id ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6310f0ef27..1e5694244a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -545,6 +545,7 @@ class FederationHandler: if ret.partial_state: # TODO(faster_joins): roll this back if we don't manage to start the # background resync (eg process_remote_join fails) + # https://github.com/matrix-org/synapse/issues/12998 await self.store.store_partial_state_room(room_id, ret.servers_in_room) max_stream_id = await self._federation_event_handler.process_remote_join( @@ -1498,14 +1499,17 @@ class FederationHandler: # TODO(faster_joins): do we need to lock to avoid races? What happens if other # worker processes kick off a resync in parallel? Perhaps we should just elect # a single worker to do the resync. + # https://github.com/matrix-org/synapse/issues/12994 # # TODO(faster_joins): what happens if we leave the room during a resync? if we # really leave, that might mean we have difficulty getting the room state over # federation. + # https://github.com/matrix-org/synapse/issues/12802 # # TODO(faster_joins): we need some way of prioritising which homeservers in # `other_destinations` to try first, otherwise we'll spend ages trying dead # homeservers for large rooms. + # https://github.com/matrix-org/synapse/issues/12999 if initial_destination is None and len(other_destinations) == 0: raise ValueError( @@ -1535,9 +1539,11 @@ class FederationHandler: # all the events are updated, so we can update current state and # clear the lazy-loading flag. logger.info("Updating current state for %s", room_id) + # TODO(faster_joins): support workers + # https://github.com/matrix-org/synapse/issues/12994 assert ( self._storage_controllers.persistence is not None - ), "TODO(faster_joins): support for workers" + ), "worker-mode deployments not currently supported here" await self._storage_controllers.persistence.update_current_state( room_id ) @@ -1551,6 +1557,8 @@ class FederationHandler: ) # TODO(faster_joins) update room stats and user directory? + # https://github.com/matrix-org/synapse/issues/12814 + # https://github.com/matrix-org/synapse/issues/12815 return # we raced against more events arriving with partial state. Go round @@ -1558,6 +1566,8 @@ class FederationHandler: # TODO(faster_joins): there is still a race here, whereby incoming events which raced # with us will fail to be persisted after the call to `clear_partial_state_room` due to # having partial state. + # https://github.com/matrix-org/synapse/issues/12988 + # continue events = await self.store.get_events_as_list( @@ -1580,6 +1590,7 @@ class FederationHandler: # indefinitely is also not the right thing to do if we can # reach all homeservers and they all claim they don't have # the state we want. + # https://github.com/matrix-org/synapse/issues/13000 logger.error( "Failed to get state for %s at %s from %s because %s, " "giving up!", diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9488fef297..6c9e6a00b5 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -532,6 +532,7 @@ class FederationEventHandler: # # TODO(faster_joins): we probably need to be more intelligent, and # exclude partial-state prev_events from consideration + # https://github.com/matrix-org/synapse/issues/13001 logger.warning( "%s still has partial state: can't de-partial-state it yet", event.event_id, @@ -777,6 +778,7 @@ class FederationEventHandler: state_ids = await self._resolve_state_at_missing_prevs(origin, event) # TODO(faster_joins): make sure that _resolve_state_at_missing_prevs does # not return partial state + # https://github.com/matrix-org/synapse/issues/13002 await self._process_received_pdu( origin, event, state_ids=state_ids, backfilled=backfilled diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c8bbcfd8c2..9b17939163 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1102,6 +1102,7 @@ class EventCreationHandler: # # TODO(faster_joins): figure out how this works, and make sure that the # old state is complete. + # https://github.com/matrix-org/synapse/issues/13003 metadata = await self.store.get_metadata_for_events(state_event_ids) state_map_for_event: MutableStateMap[str] = {} diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index fffd83546c..496fce2ecc 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -35,7 +35,6 @@ from typing import ( ) import attr -from matrix_common.versionstring import get_distribution_version_string from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric from prometheus_client.core import ( REGISTRY, @@ -54,6 +53,7 @@ from synapse.metrics._exposition import ( ) from synapse.metrics._gc import MIN_TIME_BETWEEN_GCS, install_gc_manager from synapse.metrics._types import Collector +from synapse.util import SYNAPSE_VERSION logger = logging.getLogger(__name__) @@ -419,7 +419,7 @@ build_info = Gauge( ) build_info.labels( " ".join([platform.python_implementation(), platform.python_version()]), - get_distribution_version_string("matrix-synapse"), + SYNAPSE_VERSION, " ".join([platform.system(), platform.release()]), ).set(1) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index a8ad575fcd..30b2aeffdd 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -799,7 +799,7 @@ class ModuleApi: if device_id: # delete the device, which will also delete its access tokens yield defer.ensureDeferred( - self._hs.get_device_handler().delete_device(user_id, device_id) + self._hs.get_device_handler().delete_devices(user_id, [device_id]) ) else: # no associated device. Just delete the access token. diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 1aa08f8d95..fa3266720b 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -20,8 +20,6 @@ import platform from http import HTTPStatus from typing import TYPE_CHECKING, Optional, Tuple -from matrix_common.versionstring import get_distribution_version_string - from synapse.api.errors import Codes, NotFoundError, SynapseError from synapse.http.server import HttpServer, JsonResource from synapse.http.servlet import RestServlet, parse_json_object_from_request @@ -88,6 +86,7 @@ from synapse.rest.admin.users import ( WhoisRestServlet, ) from synapse.types import JsonDict, RoomStreamToken +from synapse.util import SYNAPSE_VERSION if TYPE_CHECKING: from synapse.server import HomeServer @@ -100,7 +99,7 @@ class VersionServlet(RestServlet): def __init__(self, hs: "HomeServer"): self.res = { - "server_version": get_distribution_version_string("matrix-synapse"), + "server_version": SYNAPSE_VERSION, "python_version": platform.python_version(), } diff --git a/synapse/rest/admin/devices.py b/synapse/rest/admin/devices.py index cef46ba0dd..d934880102 100644 --- a/synapse/rest/admin/devices.py +++ b/synapse/rest/admin/devices.py @@ -80,7 +80,7 @@ class DeviceRestServlet(RestServlet): if u is None: raise NotFoundError("Unknown user") - await self.device_handler.delete_device(target_user.to_string(), device_id) + await self.device_handler.delete_devices(target_user.to_string(), [device_id]) return HTTPStatus.OK, {} async def on_PUT( diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py index ad6fd6492b..6fab102437 100644 --- a/synapse/rest/client/devices.py +++ b/synapse/rest/client/devices.py @@ -147,7 +147,9 @@ class DeviceRestServlet(RestServlet): can_skip_ui_auth=True, ) - await self.device_handler.delete_device(requester.user.to_string(), device_id) + await self.device_handler.delete_devices( + requester.user.to_string(), [device_id] + ) return 200, {} async def on_PUT( diff --git a/synapse/rest/client/logout.py b/synapse/rest/client/logout.py index 193a6951b9..23dfa4518f 100644 --- a/synapse/rest/client/logout.py +++ b/synapse/rest/client/logout.py @@ -45,8 +45,8 @@ class LogoutRestServlet(RestServlet): access_token = self.auth.get_access_token_from_request(request) await self._auth_handler.delete_access_token(access_token) else: - await self._device_handler.delete_device( - requester.user.to_string(), requester.device_id + await self._device_handler.delete_devices( + requester.user.to_string(), [requester.device_id] ) return 200, {} diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index 3cae6d2b55..ce97080013 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -43,6 +43,7 @@ class RelationPaginationServlet(RestServlet): self.auth = hs.get_auth() self.store = hs.get_datastores().main self._relations_handler = hs.get_relations_handler() + self._msc3715_enabled = hs.config.experimental.msc3715_enabled async def on_GET( self, @@ -55,9 +56,15 @@ class RelationPaginationServlet(RestServlet): requester = await self.auth.get_user_by_req(request, allow_guest=True) limit = parse_integer(request, "limit", default=5) - direction = parse_string( - request, "org.matrix.msc3715.dir", default="b", allowed_values=["f", "b"] - ) + if self._msc3715_enabled: + direction = parse_string( + request, + "org.matrix.msc3715.dir", + default="b", + allowed_values=["f", "b"], + ) + else: + direction = "b" from_token_str = parse_string(request, "from") to_token_str = parse_string(request, "to") diff --git a/synapse/state/v2.py b/synapse/state/v2.py index 041ccac59e..6a16f38a15 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -17,12 +17,14 @@ import itertools import logging from typing import ( Any, + Awaitable, Callable, Collection, Dict, Generator, Iterable, List, + Mapping, Optional, Sequence, Set, @@ -30,33 +32,58 @@ from typing import ( overload, ) -from typing_extensions import Literal +from typing_extensions import Literal, Protocol -import synapse.state from synapse import event_auth from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.api.room_versions import RoomVersion from synapse.events import EventBase from synapse.types import MutableStateMap, StateMap -from synapse.util import Clock logger = logging.getLogger(__name__) +class Clock(Protocol): + # This is usually synapse.util.Clock, but it's replaced with a FakeClock in tests. + # We only ever sleep(0) though, so that other async functions can make forward + # progress without waiting for stateres to complete. + def sleep(self, duration_ms: float) -> Awaitable[None]: + ... + + +class StateResolutionStore(Protocol): + # This is usually synapse.state.StateResolutionStore, but it's replaced with a + # TestStateResolutionStore in tests. + def get_events( + self, event_ids: Collection[str], allow_rejected: bool = False + ) -> Awaitable[Dict[str, EventBase]]: + ... + + def get_auth_chain_difference( + self, room_id: str, state_sets: List[Set[str]] + ) -> Awaitable[Set[str]]: + ... + + # We want to await to the reactor occasionally during state res when dealing # with large data sets, so that we don't exhaust the reactor. This is done by # awaiting to reactor during loops every N iterations. _AWAIT_AFTER_ITERATIONS = 100 +__all__ = [ + "resolve_events_with_store", +] + + async def resolve_events_with_store( clock: Clock, room_id: str, room_version: RoomVersion, state_sets: Sequence[StateMap[str]], event_map: Optional[Dict[str, EventBase]], - state_res_store: "synapse.state.StateResolutionStore", + state_res_store: StateResolutionStore, ) -> StateMap[str]: """Resolves the state using the v2 state resolution algorithm @@ -194,7 +221,7 @@ async def _get_power_level_for_sender( room_id: str, event_id: str, event_map: Dict[str, EventBase], - state_res_store: "synapse.state.StateResolutionStore", + state_res_store: StateResolutionStore, ) -> int: """Return the power level of the sender of the given event according to their auth events. @@ -243,9 +270,9 @@ async def _get_power_level_for_sender( async def _get_auth_chain_difference( room_id: str, - state_sets: Sequence[StateMap[str]], + state_sets: Sequence[Mapping[Any, str]], event_map: Dict[str, EventBase], - state_res_store: "synapse.state.StateResolutionStore", + state_res_store: StateResolutionStore, ) -> Set[str]: """Compare the auth chains of each state set and return the set of events that only appear in some but not all of the auth chains. @@ -406,7 +433,7 @@ async def _add_event_and_auth_chain_to_graph( room_id: str, event_id: str, event_map: Dict[str, EventBase], - state_res_store: "synapse.state.StateResolutionStore", + state_res_store: StateResolutionStore, auth_diff: Set[str], ) -> None: """Helper function for _reverse_topological_power_sort that add the event @@ -440,7 +467,7 @@ async def _reverse_topological_power_sort( room_id: str, event_ids: Iterable[str], event_map: Dict[str, EventBase], - state_res_store: "synapse.state.StateResolutionStore", + state_res_store: StateResolutionStore, auth_diff: Set[str], ) -> List[str]: """Returns a list of the event_ids sorted by reverse topological ordering, @@ -501,7 +528,7 @@ async def _iterative_auth_checks( event_ids: List[str], base_state: StateMap[str], event_map: Dict[str, EventBase], - state_res_store: "synapse.state.StateResolutionStore", + state_res_store: StateResolutionStore, ) -> MutableStateMap[str]: """Sequentially apply auth checks to each event in given list, updating the state as it goes along. @@ -569,7 +596,7 @@ async def _mainline_sort( event_ids: List[str], resolved_power_event_id: Optional[str], event_map: Dict[str, EventBase], - state_res_store: "synapse.state.StateResolutionStore", + state_res_store: StateResolutionStore, ) -> List[str]: """Returns a sorted list of event_ids sorted by mainline ordering based on the given event resolved_power_event_id @@ -638,7 +665,7 @@ async def _get_mainline_depth_for_event( event: EventBase, mainline_map: Dict[str, int], event_map: Dict[str, EventBase], - state_res_store: "synapse.state.StateResolutionStore", + state_res_store: StateResolutionStore, ) -> int: """Get the mainline depths for the given event based on the mainline map @@ -682,7 +709,7 @@ async def _get_event( room_id: str, event_id: str, event_map: Dict[str, EventBase], - state_res_store: "synapse.state.StateResolutionStore", + state_res_store: StateResolutionStore, allow_none: Literal[False] = False, ) -> EventBase: ... @@ -693,7 +720,7 @@ async def _get_event( room_id: str, event_id: str, event_map: Dict[str, EventBase], - state_res_store: "synapse.state.StateResolutionStore", + state_res_store: StateResolutionStore, allow_none: Literal[True], ) -> Optional[EventBase]: ... @@ -703,7 +730,7 @@ async def _get_event( room_id: str, event_id: str, event_map: Dict[str, EventBase], - state_res_store: "synapse.state.StateResolutionStore", + state_res_store: StateResolutionStore, allow_none: bool = False, ) -> Optional[EventBase]: """Helper function to look up event in event_map, falling back to looking diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 4caaa81808..4bcb99d06e 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -388,10 +388,13 @@ class EventsPersistenceStorageController: # TODO(faster_joins): get a real stream ordering, to make this work correctly # across workers. + # https://github.com/matrix-org/synapse/issues/12994 # # TODO(faster_joins): this can race against event persistence, in which case we # will end up with incorrect state. Perhaps we should make this a job we - # farm out to the event persister, somehow. + # farm out to the event persister thread, somehow. + # https://github.com/matrix-org/synapse/issues/13007 + # stream_id = self.main_store.get_room_max_stream_ordering() await self.persist_events_store.update_current_state(room_id, delta, stream_id) diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 3b4cdb67eb..d3a44bc876 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -452,6 +452,9 @@ class StateStorageController: up to date. """ # FIXME(faster_joins): what do we do here? + # https://github.com/matrix-org/synapse/issues/12814 + # https://github.com/matrix-org/synapse/issues/12815 + # https://github.com/matrix-org/synapse/issues/13008 return await self.stores.main.get_partial_current_state_deltas( prev_stream_id, max_stream_id diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index d900064c07..71e7863dd8 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1433,16 +1433,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): ) raise StoreError(500, "Problem storing device.") - async def delete_device(self, user_id: str, device_id: str) -> None: - """Delete a device and its device_inbox. - - Args: - user_id: The ID of the user which owns the device - device_id: The ID of the device to delete - """ - - await self.delete_devices(user_id, [device_id]) - async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: """Deletes several devices. diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 17e35cf63e..a8773374be 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -46,7 +46,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.events_worker import EventCacheEntry from synapse.storage.databases.main.search import SearchEntry -from synapse.storage.engines.postgres import PostgresEngine +from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import AbstractStreamIdGenerator from synapse.storage.util.sequence import SequenceGenerator from synapse.types import JsonDict, StateMap, get_domain_from_id diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 21e954ccc1..b6106affa6 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -36,6 +36,7 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.engines import PostgresEngine +from synapse.storage.engines._base import IsolationLevel from synapse.storage.util.id_generators import ( AbstractStreamIdTracker, MultiWriterIdGenerator, @@ -764,6 +765,10 @@ class ReceiptsWorkerStore(SQLBaseStore): linearized_event_id, data, stream_id=stream_id, + # Read committed is actually beneficial here because we check for a receipt with + # greater stream order, and checking the very latest data at select time is better + # than the data at transaction start time. + isolation_level=IsolationLevel.READ_COMMITTED, ) # If the receipt was older than the currently persisted one, nothing to do. diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 68d4fc2e64..5760d3428e 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1112,6 +1112,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): # this can race with incoming events, so we watch out for FK errors. # TODO(faster_joins): this still doesn't completely fix the race, since the persist process # is not atomic. I fear we need an application-level lock. + # https://github.com/matrix-org/synapse/issues/12988 try: await self.db_pool.runInteraction( "clear_partial_state_room", self._clear_partial_state_room_txn, room_id @@ -1119,6 +1120,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): return True except self.db_pool.engine.module.DatabaseError as e: # TODO(faster_joins): how do we distinguish between FK errors and other errors? + # https://github.com/matrix-org/synapse/issues/12988 logger.warning( "Exception while clearing lazy partial-state-room %s, retrying: %s", room_id, diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index bdd00273cd..9674c4a757 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -127,13 +127,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): NotFoundError: if the room is unknown """ - # First we try looking up room version from the database, but for old - # rooms we might not have added the room version to it yet so we fall - # back to previous behaviour and look in current state events. - # # We really should have an entry in the rooms table for every room we - # care about, but let's be a bit paranoid (at least while the background - # update is happening) to avoid breaking existing rooms. + # care about, but let's be a bit paranoid. room_version = self.db_pool.simple_select_one_onecol_txn( txn, table="rooms", @@ -440,6 +435,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): ) # TODO(faster_joins): need to do something about workers here + # https://github.com/matrix-org/synapse/issues/12994 txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,)) txn.call_after( self._get_state_group_for_event.prefill, diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index f51b3d228e..a182e8a098 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -11,11 +11,35 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, Mapping +from typing import Any, Mapping, NoReturn from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup -from .postgres import PostgresEngine -from .sqlite import Sqlite3Engine + +# The classes `PostgresEngine` and `Sqlite3Engine` must always be importable, because +# we use `isinstance(engine, PostgresEngine)` to write different queries for postgres +# and sqlite. But the database driver modules are both optional: they may not be +# installed. To account for this, create dummy classes on import failure so we can +# still run `isinstance()` checks. +try: + from .postgres import PostgresEngine +except ImportError: + + class PostgresEngine(BaseDatabaseEngine): # type: ignore[no-redef] + def __new__(cls, *args: object, **kwargs: object) -> NoReturn: # type: ignore[misc] + raise RuntimeError( + f"Cannot create {cls.__name__} -- psycopg2 module is not installed" + ) + + +try: + from .sqlite import Sqlite3Engine +except ImportError: + + class Sqlite3Engine(BaseDatabaseEngine): # type: ignore[no-redef] + def __new__(cls, *args: object, **kwargs: object) -> NoReturn: # type: ignore[misc] + raise RuntimeError( + f"Cannot create {cls.__name__} -- sqlite3 module is not installed" + ) def create_engine(database_config: Mapping[str, Any]) -> BaseDatabaseEngine: @@ -30,4 +54,10 @@ def create_engine(database_config: Mapping[str, Any]) -> BaseDatabaseEngine: raise RuntimeError("Unsupported database engine '%s'" % (name,)) -__all__ = ["create_engine", "BaseDatabaseEngine", "IncorrectDatabaseSetup"] +__all__ = [ + "create_engine", + "BaseDatabaseEngine", + "PostgresEngine", + "Sqlite3Engine", + "IncorrectDatabaseSetup", +] diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 391f8ed24a..517f9d5f98 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -15,6 +15,8 @@ import logging from typing import TYPE_CHECKING, Any, Mapping, NoReturn, Optional, Tuple, cast +import psycopg2.extensions + from synapse.storage.engines._base import ( BaseDatabaseEngine, IncorrectDatabaseSetup, @@ -23,18 +25,14 @@ from synapse.storage.engines._base import ( from synapse.storage.types import Cursor if TYPE_CHECKING: - import psycopg2 # noqa: F401 - from synapse.storage.database import LoggingDatabaseConnection logger = logging.getLogger(__name__) -class PostgresEngine(BaseDatabaseEngine["psycopg2.connection"]): +class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]): def __init__(self, database_config: Mapping[str, Any]): - import psycopg2.extensions - super().__init__(psycopg2, database_config) psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) @@ -69,7 +67,9 @@ class PostgresEngine(BaseDatabaseEngine["psycopg2.connection"]): return collation, ctype def check_database( - self, db_conn: "psycopg2.connection", allow_outdated_version: bool = False + self, + db_conn: psycopg2.extensions.connection, + allow_outdated_version: bool = False, ) -> None: # Get the version of PostgreSQL that we're using. As per the psycopg2 # docs: The number is formed by converting the major, minor, and @@ -176,8 +176,6 @@ class PostgresEngine(BaseDatabaseEngine["psycopg2.connection"]): return True def is_deadlock(self, error: Exception) -> bool: - import psycopg2.extensions - if isinstance(error, psycopg2.DatabaseError): # https://www.postgresql.org/docs/current/static/errcodes-appendix.html # "40001" serialization_failure @@ -185,7 +183,7 @@ class PostgresEngine(BaseDatabaseEngine["psycopg2.connection"]): return error.pgcode in ["40001", "40P01"] return False - def is_connection_closed(self, conn: "psycopg2.connection") -> bool: + def is_connection_closed(self, conn: psycopg2.extensions.connection) -> bool: return bool(conn.closed) def lock_table(self, txn: Cursor, table: str) -> None: @@ -205,18 +203,16 @@ class PostgresEngine(BaseDatabaseEngine["psycopg2.connection"]): else: return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100) - def in_transaction(self, conn: "psycopg2.connection") -> bool: - import psycopg2.extensions - + def in_transaction(self, conn: psycopg2.extensions.connection) -> bool: return conn.status != psycopg2.extensions.STATUS_READY def attempt_to_set_autocommit( - self, conn: "psycopg2.connection", autocommit: bool + self, conn: psycopg2.extensions.connection, autocommit: bool ) -> None: return conn.set_session(autocommit=autocommit) def attempt_to_set_isolation_level( - self, conn: "psycopg2.connection", isolation_level: Optional[int] + self, conn: psycopg2.extensions.connection, isolation_level: Optional[int] ) -> None: if isolation_level is None: isolation_level = self.default_isolation_level diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index c33df42084..09a2b58f4c 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -23,8 +23,7 @@ from typing_extensions import Counter as CounterType from synapse.config.homeserver import HomeServerConfig from synapse.storage.database import LoggingDatabaseConnection -from synapse.storage.engines import BaseDatabaseEngine -from synapse.storage.engines.postgres import PostgresEngine +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION from synapse.storage.types import Cursor diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 96aaffb53c..af3bab2c15 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -546,6 +546,7 @@ class StateFilter: # the sender of a piece of state wasn't actually in the room, then clearly that # state shouldn't have been returned. # We should at least add some tests around this to see what happens. + # https://github.com/matrix-org/synapse/issues/13006 # if we haven't requested membership events, then it depends on the value of # 'include_others' diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index d8046b7553..6323d452e7 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -19,6 +19,7 @@ from typing import Any, Callable, Dict, Generator, Optional import attr from frozendict import frozendict +from matrix_common.versionstring import get_distribution_version_string from twisted.internet import defer, task from twisted.internet.defer import Deferred @@ -183,3 +184,8 @@ def log_failure( if not consumeErrors: return failure return None + + +# Version string with git info. Computed here once so that we don't invoke git multiple +# times. +SYNAPSE_VERSION = get_distribution_version_string("matrix-synapse", __file__) |