summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py4
-rw-r--r--synapse/api/constants.py2
-rw-r--r--synapse/api/errors.py18
-rw-r--r--synapse/api/filtering.py12
-rw-r--r--synapse/config/experimental.py8
-rw-r--r--synapse/config/server.py8
-rw-r--r--synapse/events/utils.py27
-rw-r--r--synapse/federation/federation_server.py28
-rw-r--r--synapse/federation/transport/client.py10
-rw-r--r--synapse/federation/transport/server/_base.py39
-rw-r--r--synapse/federation/transport/server/federation.py2
-rw-r--r--synapse/handlers/device.py38
-rw-r--r--synapse/handlers/events.py21
-rw-r--r--synapse/handlers/federation.py75
-rw-r--r--synapse/handlers/federation_event.py39
-rw-r--r--synapse/handlers/message.py19
-rw-r--r--synapse/handlers/presence.py56
-rw-r--r--synapse/handlers/sync.py15
-rw-r--r--synapse/http/matrixfederationclient.py12
-rw-r--r--synapse/python_dependencies.py151
-rw-r--r--synapse/res/templates/notif.html2
-rw-r--r--synapse/rest/client/login.py9
-rw-r--r--synapse/rest/client/room.py50
-rw-r--r--synapse/rest/client/sync.py9
-rw-r--r--synapse/rest/client/versions.py5
-rw-r--r--synapse/server.py2
-rw-r--r--synapse/storage/databases/main/devices.py65
-rw-r--r--synapse/storage/databases/main/events.py20
-rw-r--r--synapse/storage/databases/main/events_forward_extremities.py8
-rw-r--r--synapse/storage/databases/main/events_worker.py42
-rw-r--r--synapse/storage/databases/main/relations.py78
-rw-r--r--synapse/storage/databases/main/room.py31
-rw-r--r--synapse/storage/databases/main/state.py50
-rw-r--r--synapse/storage/databases/main/stream.py26
-rw-r--r--synapse/storage/persist_events.py56
-rw-r--r--synapse/storage/schema/__init__.py6
36 files changed, 607 insertions, 436 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py

index b62eed66e2..1613941759 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py
@@ -20,6 +20,8 @@ import json import os import sys +from matrix_common.versionstring import get_distribution_version_string + # Check that we're not running on an unsupported Python version. if sys.version_info < (3, 7): print("Synapse requires Python 3.7 or above.") @@ -68,7 +70,7 @@ try: except ImportError: pass -__version__ = "1.57.1" +__version__ = get_distribution_version_string("matrix-synapse") if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 92907415e6..0172eb60b8 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py
@@ -179,8 +179,6 @@ class RelationTypes: REPLACE: Final = "m.replace" REFERENCE: Final = "m.reference" THREAD: Final = "m.thread" - # TODO Remove this in Synapse >= v1.57.0. - UNSTABLE_THREAD: Final = "io.element.thread" class LimitBlockingTypes: diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index e92db29f6d..cb3b7323d5 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py
@@ -79,6 +79,8 @@ class Codes: UNABLE_AUTHORISE_JOIN = "M_UNABLE_TO_AUTHORISE_JOIN" UNABLE_TO_GRANT_JOIN = "M_UNABLE_TO_GRANT_JOIN" + UNREDACTED_CONTENT_DELETED = "FI.MAU.MSC2815_UNREDACTED_CONTENT_DELETED" + class CodeMessageException(RuntimeError): """An exception with integer code and message string attributes. @@ -483,6 +485,22 @@ class RequestSendFailed(RuntimeError): self.can_retry = can_retry +class UnredactedContentDeletedError(SynapseError): + def __init__(self, content_keep_ms: Optional[int] = None): + super().__init__( + 404, + "The content for that event has already been erased from the database", + errcode=Codes.UNREDACTED_CONTENT_DELETED, + ) + self.content_keep_ms = content_keep_ms + + def error_dict(self) -> "JsonDict": + extra = {} + if self.content_keep_ms is not None: + extra = {"fi.mau.msc2815.content_keep_ms": self.content_keep_ms} + return cs_error(self.msg, self.errcode, **extra) + + def cs_error(msg: str, code: str = Codes.UNKNOWN, **kwargs: Any) -> "JsonDict": """Utility method for constructing an error response for client-server interactions. diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 27e97d6f37..4a808e33fe 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py
@@ -89,9 +89,7 @@ ROOM_EVENT_FILTER_SCHEMA = { "org.matrix.not_labels": {"type": "array", "items": {"type": "string"}}, # MSC3440, filtering by event relations. "related_by_senders": {"type": "array", "items": {"type": "string"}}, - "io.element.relation_senders": {"type": "array", "items": {"type": "string"}}, "related_by_rel_types": {"type": "array", "items": {"type": "string"}}, - "io.element.relation_types": {"type": "array", "items": {"type": "string"}}, }, } @@ -323,16 +321,6 @@ class Filter: self.related_by_senders = self.filter_json.get("related_by_senders", None) self.related_by_rel_types = self.filter_json.get("related_by_rel_types", None) - # Fallback to the unstable prefix if the stable version is not given. - if hs.config.experimental.msc3440_enabled: - self.related_by_senders = self.related_by_senders or self.filter_json.get( - "io.element.relation_senders", None - ) - self.related_by_rel_types = ( - self.related_by_rel_types - or self.filter_json.get("io.element.relation_types", None) - ) - def filters_all_types(self) -> bool: return "*" in self.not_types diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 447476fbfa..421ed7481b 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py
@@ -26,9 +26,6 @@ class ExperimentalConfig(Config): def read_config(self, config: JsonDict, **kwargs: Any) -> None: experimental = config.get("experimental_features") or {} - # MSC3440 (thread relation) - self.msc3440_enabled: bool = experimental.get("msc3440_enabled", False) - # MSC3026 (busy presence state) self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False) @@ -77,7 +74,10 @@ class ExperimentalConfig(Config): self.msc3720_enabled: bool = experimental.get("msc3720_enabled", False) # The deprecated groups feature. - self.groups_enabled: bool = experimental.get("groups_enabled", True) + self.groups_enabled: bool = experimental.get("groups_enabled", False) # MSC2654: Unread counts self.msc2654_enabled: bool = experimental.get("msc2654_enabled", False) + + # MSC2815 (allow room moderators to view redacted event content) + self.msc2815_enabled: bool = experimental.get("msc2815_enabled", False) diff --git a/synapse/config/server.py b/synapse/config/server.py
index 415279d269..d771045b52 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py
@@ -680,14 +680,6 @@ class ServerConfig(Config): config.get("use_account_validity_in_account_status") or False ) - # This is a temporary option that enables fully using the new - # `device_lists_changes_in_room` without the backwards compat code. This - # is primarily for testing. If enabled the server should *not* be - # downgraded, as it may lead to missing device list updates. - self.use_new_device_lists_changes_in_room = ( - config.get("use_new_device_lists_changes_in_room") or False - ) - self.rooms_to_exclude_from_sync: List[str] = ( config.get("exclude_rooms_from_sync") or [] ) diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 918e87ed9c..2174b4a094 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py
@@ -39,7 +39,6 @@ from . import EventBase if TYPE_CHECKING: from synapse.handlers.relations import BundledAggregations - from synapse.server import HomeServer # Split strings on "." but not "\." This uses a negative lookbehind assertion for '\' @@ -396,9 +395,6 @@ class EventClientSerializer: clients. """ - def __init__(self, hs: "HomeServer"): - self._msc3440_enabled = hs.config.experimental.msc3440_enabled - def serialize_event( self, event: Union[JsonDict, EventBase], @@ -406,6 +402,7 @@ class EventClientSerializer: *, config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG, bundle_aggregations: Optional[Dict[str, "BundledAggregations"]] = None, + apply_edits: bool = True, ) -> JsonDict: """Serializes a single event. @@ -413,10 +410,10 @@ class EventClientSerializer: event: The event being serialized. time_now: The current time in milliseconds config: Event serialization config - bundle_aggregations: Whether to include the bundled aggregations for this - event. Only applies to non-state events. (State events never include - bundled aggregations.) - + bundle_aggregations: A map from event_id to the aggregations to be bundled + into the event. + apply_edits: Whether the content of the event should be modified to reflect + any replacement in `bundle_aggregations[<event_id>].replace`. Returns: The serialized event """ @@ -434,8 +431,9 @@ class EventClientSerializer: event, time_now, config, - bundle_aggregations[event.event_id], + event_aggregations, serialized_event, + apply_edits=apply_edits, ) return serialized_event @@ -474,6 +472,7 @@ class EventClientSerializer: config: SerializeEventConfig, aggregations: "BundledAggregations", serialized_event: JsonDict, + apply_edits: bool, ) -> None: """Potentially injects bundled aggregations into the unsigned portion of the serialized event. @@ -483,7 +482,8 @@ class EventClientSerializer: aggregations: The bundled aggregation to serialize. serialized_event: The serialized event which may be modified. config: Event serialization config - + apply_edits: Whether the content of the event should be modified to reflect + any replacement in `aggregations.replace`. """ serialized_aggregations = {} @@ -494,9 +494,10 @@ class EventClientSerializer: serialized_aggregations[RelationTypes.REFERENCE] = aggregations.references if aggregations.replace: - # If there is an edit, apply it to the event. + # If there is an edit, optionally apply it to the event. edit = aggregations.replace - self._apply_edit(event, serialized_event, edit) + if apply_edits: + self._apply_edit(event, serialized_event, edit) # Include information about it in the relations dict. serialized_aggregations[RelationTypes.REPLACE] = { @@ -525,8 +526,6 @@ class EventClientSerializer: "current_user_participated": thread.current_user_participated, } serialized_aggregations[RelationTypes.THREAD] = thread_summary - if self._msc3440_enabled: - serialized_aggregations[RelationTypes.UNSTABLE_THREAD] = thread_summary # Include the bundled aggregations in the event. if serialized_aggregations: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 69d833585f..beab1227b8 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py
@@ -515,7 +515,7 @@ class FederationServer(FederationBase): ) async def on_room_state_request( - self, origin: str, room_id: str, event_id: Optional[str] + self, origin: str, room_id: str, event_id: str ) -> Tuple[int, JsonDict]: origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) @@ -530,18 +530,13 @@ class FederationServer(FederationBase): # - but that's non-trivial to get right, and anyway somewhat defeats # the point of the linearizer. async with self._server_linearizer.queue((origin, room_id)): - resp: JsonDict = dict( - await self._state_resp_cache.wrap( - (room_id, event_id), - self._on_context_state_request_compute, - room_id, - event_id, - ) + resp = await self._state_resp_cache.wrap( + (room_id, event_id), + self._on_context_state_request_compute, + room_id, + event_id, ) - room_version = await self.store.get_room_version_id(room_id) - resp["room_version"] = room_version - return 200, resp async def on_state_ids_request( @@ -574,14 +569,11 @@ class FederationServer(FederationBase): return {"pdu_ids": state_ids, "auth_chain_ids": list(auth_chain_ids)} async def _on_context_state_request_compute( - self, room_id: str, event_id: Optional[str] + self, room_id: str, event_id: str ) -> Dict[str, list]: pdus: Collection[EventBase] - if event_id: - event_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id) - pdus = await self.store.get_events_as_list(event_ids) - else: - pdus = (await self.state.get_current_state(room_id)).values() + event_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id) + pdus = await self.store.get_events_as_list(event_ids) auth_chain = await self.store.get_auth_chain( room_id, [pdu.event_id for pdu in pdus] @@ -687,8 +679,6 @@ class FederationServer(FederationBase): time_now = self._clock.time_msec() event_json = event.get_pdu_json(time_now) resp = { - # TODO Remove the unstable prefix when servers have updated. - "org.matrix.msc3083.v2.event": event_json, "event": event_json, "state": [p.get_pdu_json(time_now) for p in state_events], "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events], diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 01dc5ca94f..1421050b9a 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py
@@ -1380,16 +1380,6 @@ class SendJoinParser(ByteParser[SendJoinResponse]): prefix + "auth_chain.item", use_float=True, ), - # TODO Remove the unstable prefix when servers have updated. - # - # By re-using the same event dictionary this will cause the parsing of - # org.matrix.msc3083.v2.event and event to stomp over each other. - # Generally this should be fine. - ijson.kvitems_coro( - _event_parser(self._response.event_dict), - prefix + "org.matrix.msc3083.v2.event", - use_float=True, - ), ijson.kvitems_coro( _event_parser(self._response.event_dict), prefix + "event", diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index 2529dee613..d629a3ecb5 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py
@@ -16,7 +16,8 @@ import functools import logging import re import time -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Tuple, cast +from http import HTTPStatus +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Tuple, cast from synapse.api.errors import Codes, FederationDeniedError, SynapseError from synapse.api.urls import FEDERATION_V1_PREFIX @@ -86,15 +87,24 @@ class Authenticator: if not auth_headers: raise NoAuthenticationError( - 401, "Missing Authorization headers", Codes.UNAUTHORIZED + HTTPStatus.UNAUTHORIZED, + "Missing Authorization headers", + Codes.UNAUTHORIZED, ) for auth in auth_headers: if auth.startswith(b"X-Matrix"): - (origin, key, sig) = _parse_auth_header(auth) + (origin, key, sig, destination) = _parse_auth_header(auth) json_request["origin"] = origin json_request["signatures"].setdefault(origin, {})[key] = sig + # if the origin_server sent a destination along it needs to match our own server_name + if destination is not None and destination != self.server_name: + raise AuthenticationError( + HTTPStatus.UNAUTHORIZED, + "Destination mismatch in auth header", + Codes.UNAUTHORIZED, + ) if ( self.federation_domain_whitelist is not None and origin not in self.federation_domain_whitelist @@ -103,7 +113,9 @@ class Authenticator: if origin is None or not json_request["signatures"]: raise NoAuthenticationError( - 401, "Missing Authorization headers", Codes.UNAUTHORIZED + HTTPStatus.UNAUTHORIZED, + "Missing Authorization headers", + Codes.UNAUTHORIZED, ) await self.keyring.verify_json_for_server( @@ -142,13 +154,14 @@ class Authenticator: logger.exception("Error resetting retry timings on %s", origin) -def _parse_auth_header(header_bytes: bytes) -> Tuple[str, str, str]: +def _parse_auth_header(header_bytes: bytes) -> Tuple[str, str, str, Optional[str]]: """Parse an X-Matrix auth header Args: header_bytes: header value Returns: + origin, key id, signature, destination. origin, key id, signature. Raises: @@ -157,7 +170,9 @@ def _parse_auth_header(header_bytes: bytes) -> Tuple[str, str, str]: try: header_str = header_bytes.decode("utf-8") params = header_str.split(" ")[1].split(",") - param_dict = {k: v for k, v in (kv.split("=", maxsplit=1) for kv in params)} + param_dict: Dict[str, str] = { + k: v for k, v in [param.split("=", maxsplit=1) for param in params] + } def strip_quotes(value: str) -> str: if value.startswith('"'): @@ -172,7 +187,15 @@ def _parse_auth_header(header_bytes: bytes) -> Tuple[str, str, str]: key = strip_quotes(param_dict["key"]) sig = strip_quotes(param_dict["sig"]) - return origin, key, sig + + # get the destination server_name from the auth header if it exists + destination = param_dict.get("destination") + if destination is not None: + destination = strip_quotes(destination) + else: + destination = None + + return origin, key, sig, destination except Exception as e: logger.warning( "Error parsing auth header '%s': %s", @@ -180,7 +203,7 @@ def _parse_auth_header(header_bytes: bytes) -> Tuple[str, str, str]: e, ) raise AuthenticationError( - 400, "Malformed Authorization header", Codes.UNAUTHORIZED + HTTPStatus.BAD_REQUEST, "Malformed Authorization header", Codes.UNAUTHORIZED ) diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index aed3d5069c..6fbc7b5f15 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py
@@ -160,7 +160,7 @@ class FederationStateV1Servlet(BaseFederationServerServlet): return await self.handler.on_room_state_request( origin, room_id, - parse_string_from_args(query, "event_id", None, required=False), + parse_string_from_args(query, "event_id", None, required=True), ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index ffa28b2a30..3c0fc756d4 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py
@@ -291,12 +291,6 @@ class DeviceHandler(DeviceWorkerHandler): # On start up check if there are any updates pending. hs.get_reactor().callWhenRunning(self._handle_new_device_update_async) - # Used to decide if we calculate outbound pokes up front or not. By - # default we do to allow safely downgrading Synapse. - self.use_new_device_lists_changes_in_room = ( - hs.config.server.use_new_device_lists_changes_in_room - ) - def _check_device_name_length(self, name: Optional[str]) -> None: """ Checks whether a device name is longer than the maximum allowed length. @@ -490,23 +484,9 @@ class DeviceHandler(DeviceWorkerHandler): room_ids = await self.store.get_rooms_for_user(user_id) - hosts: Optional[Set[str]] = None - if not self.use_new_device_lists_changes_in_room: - hosts = set() - - if self.hs.is_mine_id(user_id): - for room_id in room_ids: - joined_users = await self.store.get_users_in_room(room_id) - hosts.update(get_domain_from_id(u) for u in joined_users) - - set_tag("target_hosts", hosts) - - hosts.discard(self.server_name) - position = await self.store.add_device_change_to_streams( user_id, device_ids, - hosts=hosts, room_ids=room_ids, ) @@ -528,14 +508,6 @@ class DeviceHandler(DeviceWorkerHandler): # We may need to do some processing asynchronously. self._handle_new_device_update_async() - if hosts: - logger.info( - "Sending device list update notif for %r to: %r", user_id, hosts - ) - for host in hosts: - self.federation_sender.send_device_messages(host, immediate=False) - log_kv({"message": "sent device update to host", "host": host}) - async def notify_user_signature_update( self, from_user_id: str, user_ids: List[str] ) -> None: @@ -677,9 +649,13 @@ class DeviceHandler(DeviceWorkerHandler): return for user_id, device_id, room_id, stream_id, opentracing_context in rows: - joined_user_ids = await self.store.get_users_in_room(room_id) - hosts = {get_domain_from_id(u) for u in joined_user_ids} - hosts.discard(self.server_name) + hosts = set() + + # Ignore any users that aren't ours + if self.hs.is_mine_id(user_id): + joined_user_ids = await self.store.get_users_in_room(room_id) + hosts = {get_domain_from_id(u) for u in joined_user_ids} + hosts.discard(self.server_name) # Check if we've already sent this update to some hosts if current_stream_id == stream_id: diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d2ccb5c5d3..5b94b00bc3 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py
@@ -16,11 +16,12 @@ import logging import random from typing import TYPE_CHECKING, Iterable, List, Optional -from synapse.api.constants import EduTypes, EventTypes, Membership +from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState from synapse.api.errors import AuthError, SynapseError from synapse.events import EventBase from synapse.events.utils import SerializeEventConfig from synapse.handlers.presence import format_user_presence_state +from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, UserID from synapse.visibility import filter_events_for_client @@ -67,7 +68,9 @@ class EventStreamHandler: presence_handler = self.hs.get_presence_handler() context = await presence_handler.user_syncing( - auth_user_id, affect_presence=affect_presence + auth_user_id, + affect_presence=affect_presence, + presence_state=PresenceState.ONLINE, ) with context: if timeout: @@ -139,7 +142,11 @@ class EventHandler: self.storage = hs.get_storage() async def get_event( - self, user: UserID, room_id: Optional[str], event_id: str + self, + user: UserID, + room_id: Optional[str], + event_id: str, + show_redacted: bool = False, ) -> Optional[EventBase]: """Retrieve a single specified event. @@ -148,6 +155,7 @@ class EventHandler: room_id: The expected room id. We'll return None if the event's room does not match. event_id: The event ID to obtain. + show_redacted: Should the full content of redacted events be returned? Returns: An event, or None if there is no event matching this ID. Raises: @@ -155,7 +163,12 @@ class EventHandler: AuthError if the user does not have the rights to inspect this event. """ - event = await self.store.get_event(event_id, check_room_id=room_id) + redact_behaviour = ( + EventRedactBehaviour.AS_IS if show_redacted else EventRedactBehaviour.REDACT + ) + event = await self.store.get_event( + event_id, check_room_id=room_id, redact_behaviour=redact_behaviour + ) if not event: return None diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 78d149905f..1434e99056 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -466,6 +466,8 @@ class FederationHandler: ) if ret.partial_state: + # TODO(faster_joins): roll this back if we don't manage to start the + # background resync (eg process_remote_join fails) await self.store.store_partial_state_room(room_id, ret.servers_in_room) max_stream_id = await self._federation_event_handler.process_remote_join( @@ -478,6 +480,18 @@ class FederationHandler: partial_state=ret.partial_state, ) + if ret.partial_state: + # Kick off the process of asynchronously fetching the state for this + # room. + # + # TODO(faster_joins): pick this up again on restart + run_as_background_process( + desc="sync_partial_state_room", + func=self._sync_partial_state_room, + destination=origin, + room_id=room_id, + ) + # We wait here until this instance has seen the events come down # replication (if we're using replication) as the below uses caches. await self._replication.wait_for_stream_position( @@ -1370,3 +1384,64 @@ class FederationHandler: # We fell off the bottom, couldn't get the complexity from anyone. Oh # well. return None + + async def _sync_partial_state_room( + self, + destination: str, + room_id: str, + ) -> None: + """Background process to resync the state of a partial-state room + + Args: + destination: homeserver to pull the state from + room_id: room to be resynced + """ + + # TODO(faster_joins): do we need to lock to avoid races? What happens if other + # worker processes kick off a resync in parallel? Perhaps we should just elect + # a single worker to do the resync. + # + # TODO(faster_joins): what happens if we leave the room during a resync? if we + # really leave, that might mean we have difficulty getting the room state over + # federation. + # + # TODO(faster_joins): try other destinations if the one we have fails + + logger.info("Syncing state for room %s via %s", room_id, destination) + + # we work through the queue in order of increasing stream ordering. + while True: + batch = await self.store.get_partial_state_events_batch(room_id) + if not batch: + # all the events are updated, so we can update current state and + # clear the lazy-loading flag. + logger.info("Updating current state for %s", room_id) + assert ( + self.storage.persistence is not None + ), "TODO(faster_joins): support for workers" + await self.storage.persistence.update_current_state(room_id) + + logger.info("Clearing partial-state flag for %s", room_id) + success = await self.store.clear_partial_state_room(room_id) + if success: + logger.info("State resync complete for %s", room_id) + + # TODO(faster_joins) update room stats and user directory? + return + + # we raced against more events arriving with partial state. Go round + # the loop again. We've already logged a warning, so no need for more. + # TODO(faster_joins): there is still a race here, whereby incoming events which raced + # with us will fail to be persisted after the call to `clear_partial_state_room` due to + # having partial state. + continue + + events = await self.store.get_events_as_list( + batch, + redact_behaviour=EventRedactBehaviour.AS_IS, + allow_rejected=True, + ) + for event in events: + await self._federation_event_handler.update_state_for_partial_state_event( + destination, event + ) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 03c1197c99..32bf02818c 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py
@@ -477,6 +477,45 @@ class FederationEventHandler: return await self.persist_events_and_notify(room_id, [(event, context)]) + async def update_state_for_partial_state_event( + self, destination: str, event: EventBase + ) -> None: + """Recalculate the state at an event as part of a de-partial-stating process + + Args: + destination: server to request full state from + event: partial-state event to be de-partial-stated + """ + logger.info("Updating state for %s", event.event_id) + with nested_logging_context(suffix=event.event_id): + # if we have all the event's prev_events, then we can work out the + # state based on their states. Otherwise, we request it from the destination + # server. + # + # This is the same operation as we do when we receive a regular event + # over federation. + state = await self._resolve_state_at_missing_prevs(destination, event) + + # build a new state group for it if need be + context = await self._state_handler.compute_event_context( + event, + old_state=state, + ) + if context.partial_state: + # this can happen if some or all of the event's prev_events still have + # partial state - ie, an event has an earlier stream_ordering than one + # or more of its prev_events, so we de-partial-state it before its + # prev_events. + # + # TODO(faster_joins): we probably need to be more intelligent, and + # exclude partial-state prev_events from consideration + logger.warning( + "%s still has partial state: can't de-partial-state it yet", + event.event_id, + ) + return + await self._store.update_state_for_partial_state_event(event, context) + async def backfill( self, dest: str, room_id: str, limit: int, extremities: Collection[str] ) -> None: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7db6905c61..1b092e900e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -175,17 +175,13 @@ class MessageHandler: state_filter = state_filter or StateFilter.all() if at_token: - # FIXME this claims to get the state at a stream position, but - # get_recent_events_for_room operates by topo ordering. This therefore - # does not reliably give you the state at the given stream position. - # (https://github.com/matrix-org/synapse/issues/3305) - last_events, _ = await self.store.get_recent_events_for_room( - room_id, end_token=at_token.room_key, limit=1 + last_event = await self.store.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=at_token.room_key, ) - if not last_events: + if not last_event: raise NotFoundError("Can't find event for token %s" % (at_token,)) - last_event = last_events[0] # check whether the user is in the room at that time to determine # whether they should be treated as peeking. @@ -204,7 +200,7 @@ class MessageHandler: visible_events = await filter_events_for_client( self.storage, user_id, - last_events, + [last_event], filter_send_to_client=False, is_peeking=is_peeking, ) @@ -1102,10 +1098,7 @@ class EventCreationHandler: raise SynapseError(400, "Can't send same reaction twice") # Don't attempt to start a thread if the parent event is a relation. - elif ( - relation_type == RelationTypes.THREAD - or relation_type == RelationTypes.UNSTABLE_THREAD - ): + elif relation_type == RelationTypes.THREAD: if await self.store.event_includes_relation(relates_to): raise SynapseError( 400, "Cannot start threads from an event with a relation" diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 209a4b0e52..d078162c29 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py
@@ -151,7 +151,7 @@ class BasePresenceHandler(abc.ABC): @abc.abstractmethod async def user_syncing( - self, user_id: str, affect_presence: bool + self, user_id: str, affect_presence: bool, presence_state: str ) -> ContextManager[None]: """Returns a context manager that should surround any stream requests from the user. @@ -165,6 +165,7 @@ class BasePresenceHandler(abc.ABC): affect_presence: If false this function will be a no-op. Useful for streams that are not associated with an actual client that is being used by a user. + presence_state: The presence state indicated in the sync request """ @abc.abstractmethod @@ -228,6 +229,11 @@ class BasePresenceHandler(abc.ABC): return states + async def current_state_for_user(self, user_id: str) -> UserPresenceState: + """Get the current presence state for a user.""" + res = await self.current_state_for_users([user_id]) + return res[user_id] + @abc.abstractmethod async def set_state( self, @@ -461,7 +467,7 @@ class WorkerPresenceHandler(BasePresenceHandler): self.send_user_sync(user_id, False, last_sync_ms) async def user_syncing( - self, user_id: str, affect_presence: bool + self, user_id: str, affect_presence: bool, presence_state: str ) -> ContextManager[None]: """Record that a user is syncing. @@ -471,6 +477,17 @@ class WorkerPresenceHandler(BasePresenceHandler): if not affect_presence or not self._presence_enabled: return _NullContextManager() + prev_state = await self.current_state_for_user(user_id) + if prev_state != PresenceState.BUSY: + # We set state here but pass ignore_status_msg = True as we don't want to + # cause the status message to be cleared. + # Note that this causes last_active_ts to be incremented which is not + # what the spec wants: see comment in the BasePresenceHandler version + # of this function. + await self.set_state( + UserID.from_string(user_id), {"presence": presence_state}, True + ) + curr_sync = self._user_to_num_current_syncs.get(user_id, 0) self._user_to_num_current_syncs[user_id] = curr_sync + 1 @@ -942,7 +959,10 @@ class PresenceHandler(BasePresenceHandler): await self._update_states([prev_state.copy_and_replace(**new_fields)]) async def user_syncing( - self, user_id: str, affect_presence: bool = True + self, + user_id: str, + affect_presence: bool = True, + presence_state: str = PresenceState.ONLINE, ) -> ContextManager[None]: """Returns a context manager that should surround any stream requests from the user. @@ -956,6 +976,7 @@ class PresenceHandler(BasePresenceHandler): affect_presence: If false this function will be a no-op. Useful for streams that are not associated with an actual client that is being used by a user. + presence_state: The presence state indicated in the sync request """ # Override if it should affect the user's presence, if presence is # disabled. @@ -967,9 +988,25 @@ class PresenceHandler(BasePresenceHandler): self.user_to_num_current_syncs[user_id] = curr_sync + 1 prev_state = await self.current_state_for_user(user_id) + + # If they're busy then they don't stop being busy just by syncing, + # so just update the last sync time. + if prev_state.state != PresenceState.BUSY: + # XXX: We set_state separately here and just update the last_active_ts above + # This keeps the logic as similar as possible between the worker and single + # process modes. Using set_state will actually cause last_active_ts to be + # updated always, which is not what the spec calls for, but synapse has done + # this for... forever, I think. + await self.set_state( + UserID.from_string(user_id), {"presence": presence_state}, True + ) + # Retrieve the new state for the logic below. This should come from the + # in-memory cache. + prev_state = await self.current_state_for_user(user_id) + + # To keep the single process behaviour consistent with worker mode, run the + # same logic as `update_external_syncs_row`, even though it looks weird. if prev_state.state == PresenceState.OFFLINE: - # If they're currently offline then bring them online, otherwise - # just update the last sync times. await self._update_states( [ prev_state.copy_and_replace( @@ -979,6 +1016,10 @@ class PresenceHandler(BasePresenceHandler): ) ] ) + # otherwise, set the new presence state & update the last sync time, + # but don't update last_active_ts as this isn't an indication that + # they've been active (even though it's probably been updated by + # set_state above) else: await self._update_states( [ @@ -1086,11 +1127,6 @@ class PresenceHandler(BasePresenceHandler): ) self.external_process_last_updated_ms.pop(process_id, None) - async def current_state_for_user(self, user_id: str) -> UserPresenceState: - """Get the current presence state for a user.""" - res = await self.current_state_for_users([user_id]) - return res[user_id] - async def _persist_and_notify(self, states: List[UserPresenceState]) -> None: """Persist states in the database, poke the notifier and send to interested remote servers diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6c8b17c420..5125126a80 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -661,16 +661,15 @@ class SyncHandler: stream_position: point at which to get state state_filter: The state filter used to fetch state from the database. """ - # FIXME this claims to get the state at a stream position, but - # get_recent_events_for_room operates by topo ordering. This therefore - # does not reliably give you the state at the given stream position. - # (https://github.com/matrix-org/synapse/issues/3305) - last_events, _ = await self.store.get_recent_events_for_room( - room_id, end_token=stream_position.room_key, limit=1 + # FIXME: This gets the state at the latest event before the stream ordering, + # which might not be the same as the "current state" of the room at the time + # of the stream token if there were multiple forward extremities at the time. + last_event = await self.store.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=stream_position.room_key, ) - if last_events: - last_event = last_events[-1] + if last_event: state = await self.get_state_after_event( last_event, state_filter=state_filter or StateFilter.all() ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 5097b3ca57..e686445955 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py
@@ -704,6 +704,9 @@ class MatrixFederationHttpClient: Returns: A list of headers to be added as "Authorization:" headers """ + if destination is None and destination_is is None: + raise ValueError("destination and destination_is cannot both be None!") + request: JsonDict = { "method": method.decode("ascii"), "uri": url_bytes.decode("ascii"), @@ -726,8 +729,13 @@ class MatrixFederationHttpClient: for key, sig in request["signatures"][self.server_name].items(): auth_headers.append( ( - 'X-Matrix origin=%s,key="%s",sig="%s"' - % (self.server_name, key, sig) + 'X-Matrix origin=%s,key="%s",sig="%s",destination="%s"' + % ( + self.server_name, + key, + sig, + request.get("destination") or request["destination_is"], + ) ).encode("ascii") ) return auth_headers diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py deleted file mode 100644
index ec199a161d..0000000000 --- a/synapse/python_dependencies.py +++ /dev/null
@@ -1,151 +0,0 @@ -# Copyright 2015, 2016 OpenMarket Ltd -# Copyright 2017 Vector Creations Ltd -# Copyright 2018 New Vector Ltd -# Copyright 2020 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import itertools -import logging -from typing import Set - -logger = logging.getLogger(__name__) - - -# REQUIREMENTS is a simple list of requirement specifiers[1], and must be -# installed. It is passed to setup() as install_requires in setup.py. -# -# CONDITIONAL_REQUIREMENTS is the optional dependencies, represented as a dict -# of lists. The dict key is the optional dependency name and can be passed to -# pip when installing. The list is a series of requirement specifiers[1] to be -# installed when that optional dependency requirement is specified. It is passed -# to setup() as extras_require in setup.py -# -# Note that these both represent runtime dependencies (and the versions -# installed are checked at runtime). -# -# Also note that we replicate these constraints in the Synapse Dockerfile while -# pre-installing dependencies. If these constraints are updated here, the same -# change should be made in the Dockerfile. -# -# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers. - -REQUIREMENTS = [ - # we use the TYPE_CHECKER.redefine method added in jsonschema 3.0.0 - "jsonschema>=3.0.0", - # frozendict 2.1.2 is broken on Debian 10: https://github.com/Marco-Sulla/python-frozendict/issues/41 - "frozendict>=1,!=2.1.2", - "unpaddedbase64>=1.1.0", - "canonicaljson>=1.4.0", - # we use the type definitions added in signedjson 1.1. - "signedjson>=1.1.0", - "pynacl>=1.2.1", - # validating SSL certs for IP addresses requires service_identity 18.1. - "service_identity>=18.1.0", - # Twisted 18.9 introduces some logger improvements that the structured - # logger utilises - "Twisted[tls]>=18.9.0", - "treq>=15.1", - # Twisted has required pyopenssl 16.0 since about Twisted 16.6. - "pyopenssl>=16.0.0", - "pyyaml>=3.11", - "pyasn1>=0.1.9", - "pyasn1-modules>=0.0.7", - "bcrypt>=3.1.0", - "pillow>=5.4.0", - "sortedcontainers>=1.4.4", - "pymacaroons>=0.13.0", - "msgpack>=0.5.2", - "phonenumbers>=8.2.0", - # we use GaugeHistogramMetric, which was added in prom-client 0.4.0. - "prometheus_client>=0.4.0", - # we use `order`, which arrived in attrs 19.2.0. - # Note: 21.1.0 broke `/sync`, see #9936 - "attrs>=19.2.0,!=21.1.0", - "netaddr>=0.7.18", - # Jinja 2.x is incompatible with MarkupSafe>=2.1. To ensure that admins do not - # end up with a broken installation, with recent MarkupSafe but old Jinja, we - # add a lower bound to the Jinja2 dependency. - "Jinja2>=3.0", - "bleach>=1.4.3", - # We use `ParamSpec`, which was added in `typing-extensions` 3.10.0.0. - "typing-extensions>=3.10.0", - # We enforce that we have a `cryptography` version that bundles an `openssl` - # with the latest security patches. - "cryptography>=3.4.7", - # ijson 3.1.4 fixes a bug with "." in property names - "ijson>=3.1.4", - "matrix-common~=1.1.0", - # We need packaging.requirements.Requirement, added in 16.1. - "packaging>=16.1", - # At the time of writing, we only use functions from the version `importlib.metadata` - # which shipped in Python 3.8. This corresponds to version 1.4 of the backport. - "importlib_metadata>=1.4 ; python_version < '3.8'", -] - -CONDITIONAL_REQUIREMENTS = { - "matrix-synapse-ldap3": ["matrix-synapse-ldap3>=0.1"], - "postgres": [ - # we use execute_values with the fetch param, which arrived in psycopg 2.8. - "psycopg2>=2.8 ; platform_python_implementation != 'PyPy'", - "psycopg2cffi>=2.8 ; platform_python_implementation == 'PyPy'", - "psycopg2cffi-compat==1.1 ; platform_python_implementation == 'PyPy'", - ], - "saml2": [ - "pysaml2>=4.5.0", - ], - "oidc": ["authlib>=0.14.0"], - # systemd-python is necessary for logging to the systemd journal via - # `systemd.journal.JournalHandler`, as is documented in - # `contrib/systemd/log_config.yaml`. - "systemd": ["systemd-python>=231"], - "url_preview": ["lxml>=4.2.0"], - "sentry": ["sentry-sdk>=0.7.2"], - "opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"], - "jwt": ["pyjwt>=1.6.4"], - # hiredis is not a *strict* dependency, but it makes things much faster. - # (if it is not installed, we fall back to slow code.) - "redis": ["txredisapi>=1.4.7", "hiredis"], - # Required to use experimental `caches.track_memory_usage` config option. - "cache_memory": ["pympler"], -} - -ALL_OPTIONAL_REQUIREMENTS: Set[str] = set() - -for name, optional_deps in CONDITIONAL_REQUIREMENTS.items(): - # Exclude systemd as it's a system-based requirement. - # Exclude lint as it's a dev-based requirement. - if name not in ["systemd"]: - ALL_OPTIONAL_REQUIREMENTS = set(optional_deps) | ALL_OPTIONAL_REQUIREMENTS - - -# ensure there are no double-quote characters in any of the deps (otherwise the -# 'pip install' incantation in DependencyException will break) -for dep in itertools.chain( - REQUIREMENTS, - *CONDITIONAL_REQUIREMENTS.values(), -): - if '"' in dep: - raise Exception( - "Dependency `%s` contains double-quote; use single-quotes instead" % (dep,) - ) - - -def list_requirements(): - return list(set(REQUIREMENTS) | ALL_OPTIONAL_REQUIREMENTS) - - -if __name__ == "__main__": - import sys - - sys.stdout.writelines(req + "\n" for req in list_requirements()) diff --git a/synapse/res/templates/notif.html b/synapse/res/templates/notif.html
index 0aaef97df8..7d86681fed 100644 --- a/synapse/res/templates/notif.html +++ b/synapse/res/templates/notif.html
@@ -30,7 +30,7 @@ {%- elif message.msgtype == "m.notice" %} {{ message.body_text_html }} {%- elif message.msgtype == "m.image" and message.image_url %} - <img src="{{ message.image_url|mxc_to_http(640, 480, scale) }}" /> + <img src="{{ message.image_url|mxc_to_http(640, 480, 'scale') }}" /> {%- elif message.msgtype == "m.file" %} <span class="filename">{{ message.body_text_plain }}</span> {%- else %} diff --git a/synapse/rest/client/login.py b/synapse/rest/client/login.py
index c9d44c5964..4a4dbe75de 100644 --- a/synapse/rest/client/login.py +++ b/synapse/rest/client/login.py
@@ -342,6 +342,15 @@ class LoginRestServlet(RestServlet): user_id = canonical_uid device_id = login_submission.get("device_id") + + # If device_id is present, check that device_id is not longer than a reasonable 512 characters + if device_id and len(device_id) > 512: + raise LoginError( + 400, + "device_id cannot be longer than 512 characters.", + errcode=Codes.INVALID_PARAM, + ) + initial_display_name = login_submission.get("initial_device_display_name") ( device_id, diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 47e152c8cc..906fe09e97 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py
@@ -21,6 +21,7 @@ from urllib import parse as urlparse from twisted.web.server import Request +from synapse import event_auth from synapse.api.constants import EventTypes, Membership from synapse.api.errors import ( AuthError, @@ -29,6 +30,7 @@ from synapse.api.errors import ( MissingClientTokenError, ShadowBanError, SynapseError, + UnredactedContentDeletedError, ) from synapse.api.filtering import Filter from synapse.events.utils import format_event_for_client_v2 @@ -643,18 +645,55 @@ class RoomEventServlet(RestServlet): super().__init__() self.clock = hs.get_clock() self._store = hs.get_datastores().main + self._state = hs.get_state_handler() self.event_handler = hs.get_event_handler() self._event_serializer = hs.get_event_client_serializer() self._relations_handler = hs.get_relations_handler() self.auth = hs.get_auth() + self.content_keep_ms = hs.config.server.redaction_retention_period + self.msc2815_enabled = hs.config.experimental.msc2815_enabled async def on_GET( self, request: SynapseRequest, room_id: str, event_id: str ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) + + include_unredacted_content = self.msc2815_enabled and ( + parse_string( + request, + "fi.mau.msc2815.include_unredacted_content", + allowed_values=("true", "false"), + ) + == "true" + ) + if include_unredacted_content and not await self.auth.is_server_admin( + requester.user + ): + power_level_event = await self._state.get_current_state( + room_id, EventTypes.PowerLevels, "" + ) + + auth_events = {} + if power_level_event: + auth_events[(EventTypes.PowerLevels, "")] = power_level_event + + redact_level = event_auth.get_named_level(auth_events, "redact", 50) + user_level = event_auth.get_user_power_level( + requester.user.to_string(), auth_events + ) + if user_level < redact_level: + raise SynapseError( + 403, + "You don't have permission to view redacted events in this room.", + errcode=Codes.FORBIDDEN, + ) + try: event = await self.event_handler.get_event( - requester.user, room_id, event_id + requester.user, + room_id, + event_id, + show_redacted=include_unredacted_content, ) except AuthError: # This endpoint is supposed to return a 404 when the requester does @@ -663,14 +702,21 @@ class RoomEventServlet(RestServlet): raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND) if event: + if include_unredacted_content and await self._store.have_censored_event( + event_id + ): + raise UnredactedContentDeletedError(self.content_keep_ms) + # Ensure there are bundled aggregations available. aggregations = await self._relations_handler.get_bundled_aggregations( [event], requester.user.to_string() ) time_now = self.clock.time_msec() + # per MSC2676, /rooms/{roomId}/event/{eventId}, should return the + # *original* event, rather than the edited version event_dict = self._event_serializer.serialize_event( - event, time_now, bundle_aggregations=aggregations + event, time_now, bundle_aggregations=aggregations, apply_edits=False ) return 200, event_dict diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 2e25e8638b..e8772f86e7 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py
@@ -180,13 +180,10 @@ class SyncRestServlet(RestServlet): affect_presence = set_presence != PresenceState.OFFLINE - if affect_presence: - await self.presence_handler.set_state( - user, {"presence": set_presence}, True - ) - context = await self.presence_handler.user_syncing( - user.to_string(), affect_presence=affect_presence + user.to_string(), + affect_presence=affect_presence, + presence_state=set_presence, ) with context: sync_result = await self.sync_handler.wait_for_sync_for_user( diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 9a65aa4843..bfc1d4ee08 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py
@@ -86,7 +86,7 @@ class VersionsRestServlet(RestServlet): # Implements additional endpoints as described in MSC2432 "org.matrix.msc2432": True, # Implements additional endpoints as described in MSC2666 - "uk.half-shot.msc2666": True, + "uk.half-shot.msc2666.mutual_rooms": True, # Whether new rooms will be set to encrypted or not (based on presets). "io.element.e2ee_forced.public": self.e2ee_forced_public, "io.element.e2ee_forced.private": self.e2ee_forced_private, @@ -100,8 +100,9 @@ class VersionsRestServlet(RestServlet): # Adds support for jump to date endpoints (/timestamp_to_event) as per MSC3030 "org.matrix.msc3030": self.config.experimental.msc3030_enabled, # Adds support for thread relations, per MSC3440. - "org.matrix.msc3440": self.config.experimental.msc3440_enabled, "org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above + # Allows moderators to fetch redacted event content as described in MSC2815 + "fi.mau.msc2815": self.config.experimental.msc2815_enabled, }, }, ) diff --git a/synapse/server.py b/synapse/server.py
index 380369db92..37c72bd83a 100644 --- a/synapse/server.py +++ b/synapse/server.py
@@ -758,7 +758,7 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_event_client_serializer(self) -> EventClientSerializer: - return EventClientSerializer(self) + return EventClientSerializer() @cache_in_self def get_password_policy_handler(self) -> PasswordPolicyHandler: diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index dc8009b23d..318e4df376 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -1582,7 +1582,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self, user_id: str, device_ids: Collection[str], - hosts: Optional[Collection[str]], room_ids: Collection[str], ) -> Optional[int]: """Persist that a user's devices have been updated, and which hosts @@ -1592,9 +1591,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): user_id: The ID of the user whose device changed. device_ids: The IDs of any changed devices. If empty, this function will return None. - hosts: The remote destinations that should be notified of the change. If - None then the set of hosts have *not* been calculated, and will be - calculated later by a background task. room_ids: The rooms that the user is in Returns: @@ -1606,14 +1602,12 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): context = get_active_span_text_map() - def add_device_changes_txn( - txn, stream_ids_for_device_change, stream_ids_for_outbound_pokes - ): + def add_device_changes_txn(txn, stream_ids): self._add_device_change_to_stream_txn( txn, user_id, device_ids, - stream_ids_for_device_change, + stream_ids, ) self._add_device_outbound_room_poke_txn( @@ -1621,43 +1615,17 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): user_id, device_ids, room_ids, - stream_ids_for_device_change, - context, - hosts_have_been_calculated=hosts is not None, - ) - - # If the set of hosts to send to has not been calculated yet (and so - # `hosts` is None) or there are no `hosts` to send to, then skip - # trying to persist them to the DB. - if not hosts: - return - - self._add_device_outbound_poke_to_stream_txn( - txn, - user_id, - device_ids, - hosts, - stream_ids_for_outbound_pokes, + stream_ids, context, ) - # `device_lists_stream` wants a stream ID per device update. - num_stream_ids = len(device_ids) - - if hosts: - # `device_lists_outbound_pokes` wants a different stream ID for - # each row, which is a row per host per device update. - num_stream_ids += len(hosts) * len(device_ids) - - async with self._device_list_id_gen.get_next_mult(num_stream_ids) as stream_ids: - stream_ids_for_device_change = stream_ids[: len(device_ids)] - stream_ids_for_outbound_pokes = stream_ids[len(device_ids) :] - + async with self._device_list_id_gen.get_next_mult( + len(device_ids) + ) as stream_ids: await self.db_pool.runInteraction( "add_device_change_to_stream", add_device_changes_txn, - stream_ids_for_device_change, - stream_ids_for_outbound_pokes, + stream_ids, ) return stream_ids[-1] @@ -1735,7 +1703,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): next(stream_id_iterator), user_id, device_id, - False, + not self.hs.is_mine_id( + user_id + ), # We only need to send out update for *our* users now, encoded_context if whitelisted_homeserver(destination) else "{}", ) @@ -1752,19 +1722,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): room_ids: Collection[str], stream_ids: List[str], context: Dict[str, str], - hosts_have_been_calculated: bool, ) -> None: - """Record the user in the room has updated their device. - - Args: - hosts_have_been_calculated: True if `device_lists_outbound_pokes` - has been updated already with the updates. - """ - - # We only need to convert to outbound pokes if they are our user. - converted_to_destinations = ( - hosts_have_been_calculated or not self.hs.is_mine_id(user_id) - ) + """Record the user in the room has updated their device.""" encoded_context = json_encoder.encode(context) @@ -1789,7 +1748,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): device_id, room_id, stream_id, - converted_to_destinations, + False, encoded_context, ) for room_id in room_ids diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 3fcd5f5b99..2a1e567ce0 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -963,6 +963,21 @@ class PersistEventsStore: values=to_insert, ) + async def update_current_state( + self, + room_id: str, + state_delta: DeltaState, + stream_id: int, + ) -> None: + """Update the current state stored in the datatabase for the given room""" + + await self.db_pool.runInteraction( + "update_current_state", + self._update_current_state_txn, + state_delta_by_room={room_id: state_delta}, + stream_id=stream_id, + ) + def _update_current_state_txn( self, txn: LoggingTransaction, @@ -1819,10 +1834,7 @@ class PersistEventsStore: if rel_type == RelationTypes.REPLACE: txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,)) - if ( - rel_type == RelationTypes.THREAD - or rel_type == RelationTypes.UNSTABLE_THREAD - ): + if rel_type == RelationTypes.THREAD: txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,)) # It should be safe to only invalidate the cache if the user has not # previously participated in the thread, but that's difficult (and diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py
index 68901b4335..f851bff604 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py
@@ -66,13 +66,15 @@ class EventForwardExtremitiesStore( """ txn.execute(sql, (event_id, room_id)) + + deleted_count = txn.rowcount logger.info( "Deleted %s extra forward extremities for room %s", - txn.rowcount, + deleted_count, room_id, ) - if txn.rowcount > 0: + if deleted_count > 0: # Invalidate the cache self._invalidate_cache_and_stream( txn, @@ -80,7 +82,7 @@ class EventForwardExtremitiesStore( (room_id,), ) - return txn.rowcount + return deleted_count return await self.db_pool.runInteraction( "delete_forward_extremities_for_room", diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a60e3f4fdd..60876204bd 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -303,6 +303,24 @@ class EventsWorkerStore(SQLBaseStore): desc="get_received_ts", ) + async def have_censored_event(self, event_id: str) -> bool: + """Check if an event has been censored, i.e. if the content of the event has been erased + from the database due to a redaction. + + Args: + event_id: The event ID that was redacted. + + Returns: + True if the event has been censored, False otherwise. + """ + censored_redactions_list = await self.db_pool.simple_select_onecol( + table="redactions", + keyvalues={"redacts": event_id}, + retcol="have_censored", + desc="get_have_censored", + ) + return any(censored_redactions_list) + # Inform mypy that if allow_none is False (the default) then get_event # always returns an EventBase. @overload @@ -1979,3 +1997,27 @@ class EventsWorkerStore(SQLBaseStore): desc="is_partial_state_event", ) return result is not None + + async def get_partial_state_events_batch(self, room_id: str) -> List[str]: + """Get a list of events in the given room that have partial state""" + return await self.db_pool.runInteraction( + "get_partial_state_events_batch", + self._get_partial_state_events_batch_txn, + room_id, + ) + + @staticmethod + def _get_partial_state_events_batch_txn( + txn: LoggingTransaction, room_id: str + ) -> List[str]: + txn.execute( + """ + SELECT event_id FROM partial_state_events AS pse + JOIN events USING (event_id) + WHERE pse.room_id = ? + ORDER BY events.stream_ordering + LIMIT 100 + """, + (room_id,), + ) + return [row[0] for row in txn] diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 407158ceee..a5c31f6787 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py
@@ -14,7 +14,6 @@ import logging from typing import ( - TYPE_CHECKING, Collection, Dict, FrozenSet, @@ -32,20 +31,12 @@ import attr from synapse.api.constants import RelationTypes from synapse.events import EventBase from synapse.storage._base import SQLBaseStore -from synapse.storage.database import ( - DatabasePool, - LoggingDatabaseConnection, - LoggingTransaction, - make_in_list_sql_clause, -) +from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause from synapse.storage.databases.main.stream import generate_pagination_where_clause from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict, RoomStreamToken, StreamToken from synapse.util.caches.descriptors import cached, cachedList -if TYPE_CHECKING: - from synapse.server import HomeServer - logger = logging.getLogger(__name__) @@ -63,16 +54,6 @@ class _RelatedEvent: class RelationsWorkerStore(SQLBaseStore): - def __init__( - self, - database: DatabasePool, - db_conn: LoggingDatabaseConnection, - hs: "HomeServer", - ): - super().__init__(database, db_conn, hs) - - self._msc3440_enabled = hs.config.experimental.msc3440_enabled - @cached(uncached_args=("event",), tree=True) async def get_relations_for_event( self, @@ -497,7 +478,7 @@ class RelationsWorkerStore(SQLBaseStore): AND parent.room_id = child.room_id WHERE %s - AND %s + AND relation_type = ? ORDER BY parent.event_id, child.topological_ordering DESC, child.stream_ordering DESC """ else: @@ -512,22 +493,16 @@ class RelationsWorkerStore(SQLBaseStore): AND parent.room_id = child.room_id WHERE %s - AND %s + AND relation_type = ? ORDER BY child.topological_ordering DESC, child.stream_ordering DESC """ clause, args = make_in_list_sql_clause( txn.database_engine, "relates_to_id", event_ids ) + args.append(RelationTypes.THREAD) - if self._msc3440_enabled: - relations_clause = "(relation_type = ? OR relation_type = ?)" - args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD)) - else: - relations_clause = "relation_type = ?" - args.append(RelationTypes.THREAD) - - txn.execute(sql % (clause, relations_clause), args) + txn.execute(sql % (clause,), args) latest_event_ids = {} for parent_event_id, child_event_id in txn: # Only consider the latest threaded reply (by topological ordering). @@ -547,7 +522,7 @@ class RelationsWorkerStore(SQLBaseStore): AND parent.room_id = child.room_id WHERE %s - AND %s + AND relation_type = ? GROUP BY parent.event_id """ @@ -556,15 +531,9 @@ class RelationsWorkerStore(SQLBaseStore): clause, args = make_in_list_sql_clause( txn.database_engine, "relates_to_id", latest_event_ids.keys() ) + args.append(RelationTypes.THREAD) - if self._msc3440_enabled: - relations_clause = "(relation_type = ? OR relation_type = ?)" - args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD)) - else: - relations_clause = "relation_type = ?" - args.append(RelationTypes.THREAD) - - txn.execute(sql % (clause, relations_clause), args) + txn.execute(sql % (clause,), args) counts = dict(cast(List[Tuple[str, int]], txn.fetchall())) return counts, latest_event_ids @@ -622,7 +591,7 @@ class RelationsWorkerStore(SQLBaseStore): parent.event_id = relates_to_id AND parent.room_id = child.room_id WHERE - %s + relation_type = ? AND %s AND %s GROUP BY parent.event_id, child.sender @@ -638,16 +607,9 @@ class RelationsWorkerStore(SQLBaseStore): txn.database_engine, "relates_to_id", event_ids ) - if self._msc3440_enabled: - relations_clause = "(relation_type = ? OR relation_type = ?)" - relations_args = [RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD] - else: - relations_clause = "relation_type = ?" - relations_args = [RelationTypes.THREAD] - txn.execute( - sql % (users_sql, events_clause, relations_clause), - users_args + events_args + relations_args, + sql % (users_sql, events_clause), + [RelationTypes.THREAD] + users_args + events_args, ) return {(row[0], row[1]): row[2] for row in txn} @@ -677,7 +639,7 @@ class RelationsWorkerStore(SQLBaseStore): user participated in that event's thread, otherwise false. """ - def _get_thread_summary_txn(txn: LoggingTransaction) -> Set[str]: + def _get_threads_participated_txn(txn: LoggingTransaction) -> Set[str]: # Fetch whether the requester has participated or not. sql = """ SELECT DISTINCT relates_to_id @@ -688,28 +650,20 @@ class RelationsWorkerStore(SQLBaseStore): AND parent.room_id = child.room_id WHERE %s - AND %s + AND relation_type = ? AND child.sender = ? """ clause, args = make_in_list_sql_clause( txn.database_engine, "relates_to_id", event_ids ) + args.extend([RelationTypes.THREAD, user_id]) - if self._msc3440_enabled: - relations_clause = "(relation_type = ? OR relation_type = ?)" - args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD)) - else: - relations_clause = "relation_type = ?" - args.append(RelationTypes.THREAD) - - args.append(user_id) - - txn.execute(sql % (clause, relations_clause), args) + txn.execute(sql % (clause,), args) return {row[0] for row in txn.fetchall()} participated_threads = await self.db_pool.runInteraction( - "get_thread_summary", _get_thread_summary_txn + "get_threads_participated", _get_threads_participated_txn ) return {event_id: event_id in participated_threads for event_id in event_ids} diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 18b1acd9e1..87e9482c60 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py
@@ -1077,6 +1077,37 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): get_rooms_for_retention_period_in_range_txn, ) + async def clear_partial_state_room(self, room_id: str) -> bool: + # this can race with incoming events, so we watch out for FK errors. + # TODO(faster_joins): this still doesn't completely fix the race, since the persist process + # is not atomic. I fear we need an application-level lock. + try: + await self.db_pool.runInteraction( + "clear_partial_state_room", self._clear_partial_state_room_txn, room_id + ) + return True + except self.db_pool.engine.module.DatabaseError as e: + # TODO(faster_joins): how do we distinguish between FK errors and other errors? + logger.warning( + "Exception while clearing lazy partial-state-room %s, retrying: %s", + room_id, + e, + ) + return False + + @staticmethod + def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None: + DatabasePool.simple_delete_txn( + txn, + table="partial_state_rooms_servers", + keyvalues={"room_id": room_id}, + ) + DatabasePool.simple_delete_one_txn( + txn, + table="partial_state_rooms", + keyvalues={"room_id": room_id}, + ) + class _BackgroundUpdates: REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index ecdc1fdc4c..7a1b013fa3 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py
@@ -21,6 +21,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.events import EventBase +from synapse.events.snapshot import EventContext from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -129,7 +130,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): ) if room_version is None: - raise NotFoundError("Could not room_version for %s" % (room_id,)) + raise NotFoundError("Could not find room_version for %s" % (room_id,)) return room_version @@ -354,6 +355,53 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return {row["state_group"] for row in rows} + async def update_state_for_partial_state_event( + self, + event: EventBase, + context: EventContext, + ) -> None: + """Update the state group for a partial state event""" + await self.db_pool.runInteraction( + "update_state_for_partial_state_event", + self._update_state_for_partial_state_event_txn, + event, + context, + ) + + def _update_state_for_partial_state_event_txn( + self, + txn, + event: EventBase, + context: EventContext, + ): + # we shouldn't have any outliers here + assert not event.internal_metadata.is_outlier() + + # anything that was rejected should have the same state as its + # predecessor. + if context.rejected: + assert context.state_group == context.state_group_before_event + + self.db_pool.simple_update_txn( + txn, + table="event_to_state_groups", + keyvalues={"event_id": event.event_id}, + updatevalues={"state_group": context.state_group}, + ) + + self.db_pool.simple_delete_one_txn( + txn, + table="partial_state_events", + keyvalues={"event_id": event.event_id}, + ) + + # TODO(faster_joins): need to do something about workers here + txn.call_after( + self._get_state_group_for_event.prefill, + (event.event_id,), + context.state_group, + ) + class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 6d45a8a9f6..793e906630 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -758,6 +758,32 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "get_room_event_before_stream_ordering", _f ) + async def get_last_event_in_room_before_stream_ordering( + self, + room_id: str, + end_token: RoomStreamToken, + ) -> Optional[EventBase]: + """Returns the last event in a room at or before a stream ordering + + Args: + room_id + end_token: The token used to stream from + + Returns: + The most recent event. + """ + + last_row = await self.get_room_event_before_stream_ordering( + room_id=room_id, + stream_ordering=end_token.stream, + ) + if last_row: + _, _, event_id = last_row + event = await self.get_event(event_id, get_prev_content=True) + return event + + return None + async def get_current_room_stream_token_for_room_id( self, room_id: Optional[str] = None ) -> RoomStreamToken: diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index b402922817..e496ba7bed 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py
@@ -376,6 +376,62 @@ class EventsPersistenceStorage: pos = PersistedEventPosition(self._instance_name, event_stream_id) return event, pos, self.main_store.get_room_max_token() + async def update_current_state(self, room_id: str) -> None: + """Recalculate the current state for a room, and persist it""" + state = await self._calculate_current_state(room_id) + delta = await self._calculate_state_delta(room_id, state) + + # TODO(faster_joins): get a real stream ordering, to make this work correctly + # across workers. + # + # TODO(faster_joins): this can race against event persistence, in which case we + # will end up with incorrect state. Perhaps we should make this a job we + # farm out to the event persister, somehow. + stream_id = self.main_store.get_room_max_stream_ordering() + await self.persist_events_store.update_current_state(room_id, delta, stream_id) + + async def _calculate_current_state(self, room_id: str) -> StateMap[str]: + """Calculate the current state of a room, based on the forward extremities + + Args: + room_id: room for which to calculate current state + + Returns: + map from (type, state_key) to event id for the current state in the room + """ + latest_event_ids = await self.main_store.get_latest_event_ids_in_room(room_id) + state_groups = set( + ( + await self.main_store._get_state_group_for_events(latest_event_ids) + ).values() + ) + + state_maps_by_state_group = await self.state_store._get_state_for_groups( + state_groups + ) + + if len(state_groups) == 1: + # If there is only one state group, then we know what the current + # state is. + return state_maps_by_state_group[state_groups.pop()] + + # Ok, we need to defer to the state handler to resolve our state sets. + logger.debug("calling resolve_state_groups from preserve_events") + + # Avoid a circular import. + from synapse.state import StateResolutionStore + + room_version = await self.main_store.get_room_version_id(room_id) + res = await self._state_resolution_handler.resolve_state_groups( + room_id, + room_version, + state_maps_by_state_group, + event_map=None, + state_res_store=StateResolutionStore(self.main_store), + ) + + return res.state + async def _persist_event_batch( self, events_and_contexts: List[Tuple[EventBase, EventContext]], diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 151f2aa9bb..871d4ace12 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py
@@ -66,9 +66,9 @@ Changes in SCHEMA_VERSION = 69: SCHEMA_COMPAT_VERSION = ( - # we now have `state_key` columns in both `events` and `state_events`, so - # now incompatible with synapses wth SCHEMA_VERSION < 66. - 66 + # We now assume that `device_lists_changes_in_room` has been filled out for + # recent device_list_updates. + 69 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat