summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/sync.py138
-rw-r--r--synapse/http/servlet.py82
-rw-r--r--synapse/rest/admin/rooms.py36
-rw-r--r--synapse/rest/client/room.py35
4 files changed, 244 insertions, 47 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py

index 8ff45a3353..1fb7ca4f71 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -88,6 +88,10 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# Logging for https://github.com/matrix-org/matrix-spec/issues/1209 and +# https://github.com/element-hq/synapse/issues/16940 +client_state_desync_logger = logging.getLogger("synapse.client_state_desync_debug") + # Counts the number of times we returned a non-empty sync. `type` is one of # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is # "true" or "false" depending on if the request asked for lazy loaded members or @@ -1216,6 +1220,12 @@ class SyncHandler: previous_timeline_end={}, lazy_load_members=lazy_load_members, ) + + if client_state_desync_logger.isEnabledFor(logging.DEBUG): + await self._log_client_state_desync( + room_id, None, state_ids, timeline_state, lazy_load_members + ) + return state_ids async def _compute_state_delta_for_incremental_sync( @@ -1361,6 +1371,15 @@ class SyncHandler: lazy_load_members=lazy_load_members, ) + if client_state_desync_logger.isEnabledFor(logging.DEBUG): + await self._log_client_state_desync( + room_id, + since_token, + state_ids, + timeline_state, + lazy_load_members, + ) + return state_ids async def _find_missing_partial_state_memberships( @@ -1477,6 +1496,125 @@ class SyncHandler: return additional_state_ids + async def _log_client_state_desync( + self, + room_id: str, + since_token: Optional[StreamToken], + sync_response_state_state: StateMap[str], + sync_response_timeline_state: StateMap[str], + lazy_load_members: bool, + ) -> None: + """ + Logging to see how often the client's state gets out of sync with the + actual current state of the room. + + There are few different potential failure modes here: + + * State resolution can cause changes in the state of the room that don't + directly correspond to events with the corresponding (type, state_key). + https://github.com/matrix-org/matrix-spec/issues/1209 discusses this in + more detail. + + * Even where there is an event that causes a given state change, Synapse + may not serve it to the client, since it works on state at specific points + in the DAG, rather than "current state". + See https://github.com/element-hq/synapse/issues/16940. + + * Lazy-loading adds more complexity, as it means that events that would + normally be served via the `state` part of an incremental sync are filtered + out. + + To try to get a handle on this, let's put ourselves in the shoes of a client, + and compare the state they will calculate against the actual current state. + """ + # We only care about membership events. + state_filter = StateFilter.from_types(types=(("m.room.member", None),)) + + if since_token is None: + if lazy_load_members: + # For initial syncs with lazy-loading enabled, there's not too much + # concern here. We know the client will do a `/members` query before + # doing any encryption, so what sync returns isn't too important. + # + # (Of course, then `/members` might also return an incomplete list, but + # that's a separate problem.) + return + + # For regular initial syncs, compare the returned response with the actual + # current state. + client_calculated_state = {} + client_calculated_state.update(sync_response_state_state) + client_calculated_state.update(sync_response_timeline_state) + else: + # For an incremental (gappy or otherwise) sync, let's assume the client has + # a complete membership list as of the last sync (or rather, at + # `since_token`, which is the closest approximation we have to it + # right now), and see what they would calculate as the current state given + # this sync update. + + client_calculated_state = dict( + await self.get_state_at( + room_id, + stream_position=since_token, + state_filter=state_filter, + await_full_state=False, + ) + ) + client_calculated_state.update(sync_response_state_state) + client_calculated_state.update(sync_response_timeline_state) + + current_state = await self._state_storage_controller.get_current_state_ids( + room_id, state_filter=state_filter, await_full_state=False + ) + missing_users = await self._calculate_missing_members( + current_state, client_calculated_state + ) + if missing_users: + client_state_desync_logger.debug( + "client state discrepancy in incremental sync in room %s: missing users %s", + room_id, + missing_users, + ) + + async def _calculate_missing_members( + self, + actual_state: StateMap[str], + client_calculated_state: StateMap[str], + ) -> List[str]: + """Helper for `_log_client_state_desync`: calculates the difference in + joined members between two state maps. + + Returns: + A list of user IDs + """ + missing_users = [] + + async def event_id_to_membership(event_id: Optional[str]) -> Optional[str]: + if event_id is None: + return None + event = await self.store.get_event(event_id, allow_none=True) + if event is None: + return "MISSING_EVENT" + return event.membership + + # Check for joined members in the actual state that are missing or have a + # different membership in the actual state. + for (event_type, state_key), actual_event_id in actual_state.items(): + if event_type != EventTypes.Member: + continue + + calculated_event_id = client_calculated_state.get((event_type, state_key)) + if calculated_event_id != actual_event_id: + actual_membership = event_id_to_membership(actual_event_id) + calculated_membership = event_id_to_membership(calculated_event_id) + if ( + actual_membership == Membership.JOIN + and calculated_membership != Membership.JOIN + ): + missing_users.append(state_key) + + return missing_users + async def unread_notifs_for_room_id( self, room_id: str, sync_config: SyncConfig ) -> RoomNotifCounts: diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 0ca08038f4..ab12951da8 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py
@@ -23,6 +23,7 @@ import enum import logging +import urllib.parse as urlparse from http import HTTPStatus from typing import ( TYPE_CHECKING, @@ -450,6 +451,87 @@ def parse_string( ) +def parse_json( + request: Request, + name: str, + default: Optional[dict] = None, + required: bool = False, + encoding: str = "ascii", +) -> Optional[JsonDict]: + """ + Parse a JSON parameter from the request query string. + + Args: + request: the twisted HTTP request. + name: the name of the query parameter. + default: value to use if the parameter is absent, + defaults to None. + required: whether to raise a 400 SynapseError if the + parameter is absent, defaults to False. + encoding: The encoding to decode the string content with. + + Returns: + A JSON value, or `default` if the named query parameter was not found + and `required` was False. + + Raises: + SynapseError if the parameter is absent and required, or if the + parameter is present and not a JSON object. + """ + args: Mapping[bytes, Sequence[bytes]] = request.args # type: ignore + return parse_json_from_args( + args, + name, + default, + required=required, + encoding=encoding, + ) + + +def parse_json_from_args( + args: Mapping[bytes, Sequence[bytes]], + name: str, + default: Optional[dict] = None, + required: bool = False, + encoding: str = "ascii", +) -> Optional[JsonDict]: + """ + Parse a JSON parameter from the request query string. + + Args: + args: a mapping of request args as bytes to a list of bytes (e.g. request.args). + name: the name of the query parameter. + default: value to use if the parameter is absent, + defaults to None. + required: whether to raise a 400 SynapseError if the + parameter is absent, defaults to False. + encoding: the encoding to decode the string content with. + + A JSON value, or `default` if the named query parameter was not found + and `required` was False. + + Raises: + SynapseError if the parameter is absent and required, or if the + parameter is present and not a JSON object. + """ + name_bytes = name.encode("ascii") + + if name_bytes not in args: + if not required: + return default + + message = f"Missing required integer query parameter {name}" + raise SynapseError(HTTPStatus.BAD_REQUEST, message, errcode=Codes.MISSING_PARAM) + + json_str = parse_string_from_args(args, name, required=True, encoding=encoding) + + try: + return json_decoder.decode(urlparse.unquote(json_str)) + except Exception: + message = f"Query parameter {name} must be a valid JSON object" + raise SynapseError(HTTPStatus.BAD_REQUEST, message, errcode=Codes.NOT_JSON) + + EnumT = TypeVar("EnumT", bound=enum.Enum) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 4252f98a6c..0d86a4e15f 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py
@@ -21,7 +21,6 @@ import logging from http import HTTPStatus from typing import TYPE_CHECKING, List, Optional, Tuple, cast -from urllib import parse as urlparse import attr @@ -38,6 +37,7 @@ from synapse.http.servlet import ( assert_params_in_dict, parse_enum, parse_integer, + parse_json, parse_json_object_from_request, parse_string, ) @@ -51,7 +51,6 @@ from synapse.storage.databases.main.room import RoomSortOrder from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, RoomID, ScheduledTask, UserID, create_requester from synapse.types.state import StateFilter -from synapse.util import json_decoder if TYPE_CHECKING: from synapse.api.auth import Auth @@ -776,14 +775,8 @@ class RoomEventContextServlet(RestServlet): limit = parse_integer(request, "limit", default=10) # picking the API shape for symmetry with /messages - filter_str = parse_string(request, "filter", encoding="utf-8") - if filter_str: - filter_json = urlparse.unquote(filter_str) - event_filter: Optional[Filter] = Filter( - self._hs, json_decoder.decode(filter_json) - ) - else: - event_filter = None + filter_json = parse_json(request, "filter", encoding="utf-8") + event_filter = Filter(self._hs, filter_json) if filter_json else None event_context = await self.room_context_handler.get_event_context( requester, @@ -914,21 +907,16 @@ class RoomMessagesRestServlet(RestServlet): ) # Twisted will have processed the args by now. assert request.args is not None + + filter_json = parse_json(request, "filter", encoding="utf-8") + event_filter = Filter(self._hs, filter_json) if filter_json else None + as_client_event = b"raw" not in request.args - filter_str = parse_string(request, "filter", encoding="utf-8") - if filter_str: - filter_json = urlparse.unquote(filter_str) - event_filter: Optional[Filter] = Filter( - self._hs, json_decoder.decode(filter_json) - ) - if ( - event_filter - and event_filter.filter_json.get("event_format", "client") - == "federation" - ): - as_client_event = False - else: - event_filter = None + if ( + event_filter + and event_filter.filter_json.get("event_format", "client") == "federation" + ): + as_client_event = False msgs = await self._pagination_handler.get_messages( room_id=room_id, diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 4eeadf8779..e4c7dd1a58 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py
@@ -52,6 +52,7 @@ from synapse.http.servlet import ( parse_boolean, parse_enum, parse_integer, + parse_json, parse_json_object_from_request, parse_string, parse_strings_from_args, @@ -65,7 +66,6 @@ from synapse.rest.client.transactions import HttpTransactionCache from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, Requester, StreamToken, ThirdPartyInstanceID, UserID from synapse.types.state import StateFilter -from synapse.util import json_decoder from synapse.util.cancellation import cancellable from synapse.util.stringutils import parse_and_validate_server_name, random_string @@ -703,21 +703,16 @@ class RoomMessageListRestServlet(RestServlet): ) # Twisted will have processed the args by now. assert request.args is not None + + filter_json = parse_json(request, "filter", encoding="utf-8") + event_filter = Filter(self._hs, filter_json) if filter_json else None + as_client_event = b"raw" not in request.args - filter_str = parse_string(request, "filter", encoding="utf-8") - if filter_str: - filter_json = urlparse.unquote(filter_str) - event_filter: Optional[Filter] = Filter( - self._hs, json_decoder.decode(filter_json) - ) - if ( - event_filter - and event_filter.filter_json.get("event_format", "client") - == "federation" - ): - as_client_event = False - else: - event_filter = None + if ( + event_filter + and event_filter.filter_json.get("event_format", "client") == "federation" + ): + as_client_event = False msgs = await self.pagination_handler.get_messages( room_id=room_id, @@ -898,14 +893,8 @@ class RoomEventContextServlet(RestServlet): limit = parse_integer(request, "limit", default=10) # picking the API shape for symmetry with /messages - filter_str = parse_string(request, "filter", encoding="utf-8") - if filter_str: - filter_json = urlparse.unquote(filter_str) - event_filter: Optional[Filter] = Filter( - self._hs, json_decoder.decode(filter_json) - ) - else: - event_filter = None + filter_json = parse_json(request, "filter", encoding="utf-8") + event_filter = Filter(self._hs, filter_json) if filter_json else None event_context = await self.room_context_handler.get_event_context( requester, room_id, event_id, limit, event_filter