diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 8ff45a3353..1fb7ca4f71 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -88,6 +88,10 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+# Dedicated logger for investigating how often a client's view of room state
+# gets out of sync with the actual current state of the room. Call sites only
+# do the comparison work when this logger is enabled for DEBUG.
+#
+# Background: https://github.com/matrix-org/matrix-spec/issues/1209 and
+# https://github.com/element-hq/synapse/issues/16940
+client_state_desync_logger = logging.getLogger("synapse.client_state_desync_debug")
+
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
# "true" or "false" depending on if the request asked for lazy loaded members or
@@ -1216,6 +1220,12 @@ class SyncHandler:
previous_timeline_end={},
lazy_load_members=lazy_load_members,
)
+
+ if client_state_desync_logger.isEnabledFor(logging.DEBUG):
+ await self._log_client_state_desync(
+ room_id, None, state_ids, timeline_state, lazy_load_members
+ )
+
return state_ids
async def _compute_state_delta_for_incremental_sync(
@@ -1361,6 +1371,15 @@ class SyncHandler:
lazy_load_members=lazy_load_members,
)
+ if client_state_desync_logger.isEnabledFor(logging.DEBUG):
+ await self._log_client_state_desync(
+ room_id,
+ since_token,
+ state_ids,
+ timeline_state,
+ lazy_load_members,
+ )
+
return state_ids
async def _find_missing_partial_state_memberships(
@@ -1477,6 +1496,125 @@ class SyncHandler:
return additional_state_ids
+ async def _log_client_state_desync(
+     self,
+     room_id: str,
+     since_token: Optional[StreamToken],
+     sync_response_state_state: StateMap[str],
+     sync_response_timeline_state: StateMap[str],
+     lazy_load_members: bool,
+ ) -> None:
+     """
+     Logging to see how often the client's state gets out of sync with the
+     actual current state of the room.
+
+     There are a few different potential failure modes here:
+
+     * State resolution can cause changes in the state of the room that don't
+       directly correspond to events with the corresponding (type, state_key).
+       https://github.com/matrix-org/matrix-spec/issues/1209 discusses this in
+       more detail.
+
+     * Even where there is an event that causes a given state change, Synapse
+       may not serve it to the client, since it works on state at specific points
+       in the DAG, rather than "current state".
+       See https://github.com/element-hq/synapse/issues/16940.
+
+     * Lazy-loading adds more complexity, as it means that events that would
+       normally be served via the `state` part of an incremental sync are filtered
+       out.
+
+     To try to get a handle on this, let's put ourselves in the shoes of a client,
+     and compare the state they will calculate against the actual current state.
+
+     This performs additional state lookups, so callers should only invoke it
+     when `client_state_desync_logger` is enabled for DEBUG.
+
+     Args:
+         room_id: The room whose sync response is being checked.
+         since_token: The token the client synced from, or None for an
+             initial sync.
+         sync_response_state_state: The state map being returned in the
+             `state` section of the sync response.
+         sync_response_timeline_state: The state map derived from the
+             `timeline` section of the sync response.
+         lazy_load_members: Whether the request asked for lazy-loaded members.
+     """
+     # We only care about membership events.
+     state_filter = StateFilter.from_types(types=(("m.room.member", None),))
+
+     if since_token is None:
+         if lazy_load_members:
+             # For initial syncs with lazy-loading enabled, there's not too much
+             # concern here. We know the client will do a `/members` query before
+             # doing any encryption, so what sync returns isn't too important.
+             #
+             # (Of course, then `/members` might also return an incomplete list, but
+             # that's a separate problem.)
+             return
+
+         # For regular initial syncs, compare the returned response with the actual
+         # current state.
+         sync_type = "initial"
+         client_calculated_state = {}
+         client_calculated_state.update(sync_response_state_state)
+         client_calculated_state.update(sync_response_timeline_state)
+     else:
+         # For an incremental (gappy or otherwise) sync, let's assume the client has
+         # a complete membership list as of the last sync (or rather, at
+         # `since_token`, which is the closest approximation we have to it
+         # right now), and see what they would calculate as the current state given
+         # this sync update.
+         sync_type = "incremental"
+         client_calculated_state = dict(
+             await self.get_state_at(
+                 room_id,
+                 stream_position=since_token,
+                 state_filter=state_filter,
+                 await_full_state=False,
+             )
+         )
+         client_calculated_state.update(sync_response_state_state)
+         client_calculated_state.update(sync_response_timeline_state)
+
+     current_state = await self._state_storage_controller.get_current_state_ids(
+         room_id, state_filter=state_filter, await_full_state=False
+     )
+     missing_users = await self._calculate_missing_members(
+         current_state, client_calculated_state
+     )
+     if missing_users:
+         # Report which kind of sync was checked: this method runs for both
+         # non-lazy-loaded initial syncs and incremental syncs.
+         client_state_desync_logger.debug(
+             "client state discrepancy in %s sync in room %s: missing users %s",
+             sync_type,
+             room_id,
+             missing_users,
+         )
+
+ async def _calculate_missing_members(
+     self,
+     actual_state: StateMap[str],
+     client_calculated_state: StateMap[str],
+ ) -> List[str]:
+     """Helper for `_log_client_state_desync`: calculates the difference in
+     joined members between two state maps.
+
+     Args:
+         actual_state: Map from (type, state_key) to event ID for the real
+             state of the room.
+         client_calculated_state: Map from (type, state_key) to event ID for
+             the state the client is expected to have computed.
+
+     Returns:
+         A list of user IDs which are joined according to `actual_state`, but
+         which are absent from, or not joined in, `client_calculated_state`.
+     """
+     missing_users = []
+
+     async def event_id_to_membership(event_id: Optional[str]) -> Optional[str]:
+         # None means there was no membership entry at all for this user.
+         if event_id is None:
+             return None
+         event = await self.store.get_event(event_id, allow_none=True)
+         if event is None:
+             # Sentinel that never compares equal to a real membership value.
+             return "MISSING_EVENT"
+         return event.membership
+
+     # Check for joined members in the actual state that are missing or have a
+     # different membership in the client-calculated state.
+     for (event_type, state_key), actual_event_id in actual_state.items():
+         if event_type != EventTypes.Member:
+             continue
+
+         calculated_event_id = client_calculated_state.get((event_type, state_key))
+         if calculated_event_id != actual_event_id:
+             # `event_id_to_membership` is a coroutine function: it must be
+             # awaited, otherwise we would be comparing coroutine objects to
+             # membership strings and never report anything.
+             actual_membership = await event_id_to_membership(actual_event_id)
+             calculated_membership = await event_id_to_membership(
+                 calculated_event_id
+             )
+             if (
+                 actual_membership == Membership.JOIN
+                 and calculated_membership != Membership.JOIN
+             ):
+                 missing_users.append(state_key)
+
+     return missing_users
+
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> RoomNotifCounts:
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 0ca08038f4..ab12951da8 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -23,6 +23,7 @@
import enum
import logging
+import urllib.parse as urlparse
from http import HTTPStatus
from typing import (
TYPE_CHECKING,
@@ -450,6 +451,87 @@ def parse_string(
)
+def parse_json(
+    request: Request,
+    name: str,
+    default: Optional[dict] = None,
+    required: bool = False,
+    encoding: str = "ascii",
+) -> Optional[JsonDict]:
+    """
+    Read a JSON-valued query parameter from a request.
+
+    This is a convenience wrapper which pulls the query args off the request
+    and hands them to `parse_json_from_args`.
+
+    Args:
+        request: the twisted HTTP request.
+        name: the name of the query parameter.
+        default: value to return if the parameter is absent and not required;
+            defaults to None.
+        required: whether a missing parameter should be treated as an error
+            rather than yielding `default`; defaults to False.
+        encoding: the encoding used to decode the raw parameter bytes.
+
+    Returns:
+        The decoded JSON, or `default` if the named query parameter was not
+        found and `required` was False.
+
+    Raises:
+        SynapseError if the parameter is absent and required, or if the
+        value is rejected by `parse_json_from_args` (for example because it
+        is not valid JSON).
+    """
+    query_args: Mapping[bytes, Sequence[bytes]] = request.args  # type: ignore
+    return parse_json_from_args(
+        query_args, name, default, required=required, encoding=encoding
+    )
+
+
+def parse_json_from_args(
+    args: Mapping[bytes, Sequence[bytes]],
+    name: str,
+    default: Optional[dict] = None,
+    required: bool = False,
+    encoding: str = "ascii",
+) -> Optional[JsonDict]:
+    """
+    Parse a JSON object parameter from the request query string.
+
+    The raw parameter value is decoded using `encoding`, URL-unquoted and then
+    parsed as JSON.
+
+    Args:
+        args: a mapping of request args as bytes to a list of bytes (e.g. request.args).
+        name: the name of the query parameter.
+        default: value to use if the parameter is absent,
+            defaults to None.
+        required: whether to raise a 400 SynapseError if the
+            parameter is absent, defaults to False.
+        encoding: the encoding to decode the string content with.
+
+    Returns:
+        The decoded JSON object, or `default` if the named query parameter was
+        not found and `required` was False.
+
+    Raises:
+        SynapseError if the parameter is absent and required, or if the
+        parameter is present and not a JSON object.
+    """
+    name_bytes = name.encode("ascii")
+
+    if name_bytes not in args:
+        if not required:
+            return default
+
+        message = f"Missing required JSON query parameter {name}"
+        raise SynapseError(HTTPStatus.BAD_REQUEST, message, errcode=Codes.MISSING_PARAM)
+
+    json_str = parse_string_from_args(args, name, required=True, encoding=encoding)
+
+    message = f"Query parameter {name} must be a valid JSON object"
+    try:
+        decoded = json_decoder.decode(urlparse.unquote(json_str))
+    except Exception as e:
+        # Deliberately broad: any failure to decode client-supplied input is
+        # the client's fault and should be a 400, not a server error.
+        raise SynapseError(
+            HTTPStatus.BAD_REQUEST, message, errcode=Codes.NOT_JSON
+        ) from e
+
+    # Valid JSON which isn't an object (e.g. a list or number) is rejected too,
+    # so that callers can rely on getting a dict as the return type promises.
+    if not isinstance(decoded, dict):
+        raise SynapseError(HTTPStatus.BAD_REQUEST, message, errcode=Codes.NOT_JSON)
+
+    return decoded
+
+
EnumT = TypeVar("EnumT", bound=enum.Enum)
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 4252f98a6c..0d86a4e15f 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -21,7 +21,6 @@
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, List, Optional, Tuple, cast
-from urllib import parse as urlparse
import attr
@@ -38,6 +37,7 @@ from synapse.http.servlet import (
assert_params_in_dict,
parse_enum,
parse_integer,
+ parse_json,
parse_json_object_from_request,
parse_string,
)
@@ -51,7 +51,6 @@ from synapse.storage.databases.main.room import RoomSortOrder
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, RoomID, ScheduledTask, UserID, create_requester
from synapse.types.state import StateFilter
-from synapse.util import json_decoder
if TYPE_CHECKING:
from synapse.api.auth import Auth
@@ -776,14 +775,8 @@ class RoomEventContextServlet(RestServlet):
limit = parse_integer(request, "limit", default=10)
# picking the API shape for symmetry with /messages
- filter_str = parse_string(request, "filter", encoding="utf-8")
- if filter_str:
- filter_json = urlparse.unquote(filter_str)
- event_filter: Optional[Filter] = Filter(
- self._hs, json_decoder.decode(filter_json)
- )
- else:
- event_filter = None
+ filter_json = parse_json(request, "filter", encoding="utf-8")
+ event_filter = Filter(self._hs, filter_json) if filter_json else None
event_context = await self.room_context_handler.get_event_context(
requester,
@@ -914,21 +907,16 @@ class RoomMessagesRestServlet(RestServlet):
)
# Twisted will have processed the args by now.
assert request.args is not None
+
+ filter_json = parse_json(request, "filter", encoding="utf-8")
+ event_filter = Filter(self._hs, filter_json) if filter_json else None
+
as_client_event = b"raw" not in request.args
- filter_str = parse_string(request, "filter", encoding="utf-8")
- if filter_str:
- filter_json = urlparse.unquote(filter_str)
- event_filter: Optional[Filter] = Filter(
- self._hs, json_decoder.decode(filter_json)
- )
- if (
- event_filter
- and event_filter.filter_json.get("event_format", "client")
- == "federation"
- ):
- as_client_event = False
- else:
- event_filter = None
+ if (
+ event_filter
+ and event_filter.filter_json.get("event_format", "client") == "federation"
+ ):
+ as_client_event = False
msgs = await self._pagination_handler.get_messages(
room_id=room_id,
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 4eeadf8779..e4c7dd1a58 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -52,6 +52,7 @@ from synapse.http.servlet import (
parse_boolean,
parse_enum,
parse_integer,
+ parse_json,
parse_json_object_from_request,
parse_string,
parse_strings_from_args,
@@ -65,7 +66,6 @@ from synapse.rest.client.transactions import HttpTransactionCache
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StreamToken, ThirdPartyInstanceID, UserID
from synapse.types.state import StateFilter
-from synapse.util import json_decoder
from synapse.util.cancellation import cancellable
from synapse.util.stringutils import parse_and_validate_server_name, random_string
@@ -703,21 +703,16 @@ class RoomMessageListRestServlet(RestServlet):
)
# Twisted will have processed the args by now.
assert request.args is not None
+
+ filter_json = parse_json(request, "filter", encoding="utf-8")
+ event_filter = Filter(self._hs, filter_json) if filter_json else None
+
as_client_event = b"raw" not in request.args
- filter_str = parse_string(request, "filter", encoding="utf-8")
- if filter_str:
- filter_json = urlparse.unquote(filter_str)
- event_filter: Optional[Filter] = Filter(
- self._hs, json_decoder.decode(filter_json)
- )
- if (
- event_filter
- and event_filter.filter_json.get("event_format", "client")
- == "federation"
- ):
- as_client_event = False
- else:
- event_filter = None
+ if (
+ event_filter
+ and event_filter.filter_json.get("event_format", "client") == "federation"
+ ):
+ as_client_event = False
msgs = await self.pagination_handler.get_messages(
room_id=room_id,
@@ -898,14 +893,8 @@ class RoomEventContextServlet(RestServlet):
limit = parse_integer(request, "limit", default=10)
# picking the API shape for symmetry with /messages
- filter_str = parse_string(request, "filter", encoding="utf-8")
- if filter_str:
- filter_json = urlparse.unquote(filter_str)
- event_filter: Optional[Filter] = Filter(
- self._hs, json_decoder.decode(filter_json)
- )
- else:
- event_filter = None
+ filter_json = parse_json(request, "filter", encoding="utf-8")
+ event_filter = Filter(self._hs, filter_json) if filter_json else None
event_context = await self.room_context_handler.get_event_context(
requester, room_id, event_id, limit, event_filter
|