diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 8c5db2a513..bac02122d0 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -21,12 +21,13 @@
import itertools
import logging
from collections import defaultdict
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union
from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
+from synapse.api.ratelimiting import Ratelimiter
from synapse.events.utils import (
SerializeEventConfig,
format_event_for_client_v2_without_room_id,
@@ -126,6 +127,13 @@ class SyncRestServlet(RestServlet):
cache_name="sync_valid_filter",
)
+ # Ratelimiter for presence updates, keyed by requester.
+ self._presence_per_user_limiter = Ratelimiter(
+ store=self.store,
+ clock=self.clock,
+ cfg=hs.config.ratelimiting.rc_presence_per_user,
+ )
+
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
# This will always be set by the time Twisted calls us.
assert request.args is not None
@@ -152,6 +160,14 @@ class SyncRestServlet(RestServlet):
filter_id = parse_string(request, "filter")
full_state = parse_boolean(request, "full_state", default=False)
+ use_state_after = False
+ if await self.store.is_feature_enabled(
+ user.to_string(), ExperimentalFeature.MSC4222
+ ):
+ use_state_after = parse_boolean(
+ request, "org.matrix.msc4222.use_state_after", default=False
+ )
+
logger.debug(
"/sync: user=%r, timeout=%r, since=%r, "
"set_presence=%r, filter_id=%r, device_id=%r",
@@ -184,6 +200,7 @@ class SyncRestServlet(RestServlet):
full_state,
device_id,
last_ignore_accdata_streampos,
+ use_state_after,
)
if filter_id is None:
@@ -220,6 +237,7 @@ class SyncRestServlet(RestServlet):
filter_collection=filter_collection,
is_guest=requester.is_guest,
device_id=device_id,
+ use_state_after=use_state_after,
)
since_token = None
@@ -229,7 +247,13 @@ class SyncRestServlet(RestServlet):
# send any outstanding server notices to the user.
await self._server_notices_sender.on_user_syncing(user.to_string())
- affect_presence = set_presence != PresenceState.OFFLINE
+ # ignore the presence update if the ratelimit is exceeded but do not pause the request
+ allowed, _ = await self._presence_per_user_limiter.can_do_action(requester)
+ if not allowed:
+ affect_presence = False
+ logger.debug("User set_presence ratelimit exceeded; ignoring it.")
+ else:
+ affect_presence = set_presence != PresenceState.OFFLINE
context = await self.presence_handler.user_syncing(
user.to_string(),
@@ -258,7 +282,7 @@ class SyncRestServlet(RestServlet):
# We know that the the requester has an access token since appservices
# cannot use sync.
response_content = await self.encode_response(
- time_now, sync_result, requester, filter_collection
+ time_now, sync_config, sync_result, requester, filter_collection
)
logger.debug("Event formatting complete")
@@ -268,6 +292,7 @@ class SyncRestServlet(RestServlet):
async def encode_response(
self,
time_now: int,
+ sync_config: SyncConfig,
sync_result: SyncResult,
requester: Requester,
filter: FilterCollection,
@@ -292,7 +317,7 @@ class SyncRestServlet(RestServlet):
)
joined = await self.encode_joined(
- sync_result.joined, time_now, serialize_options
+ sync_config, sync_result.joined, time_now, serialize_options
)
invited = await self.encode_invited(
@@ -304,7 +329,7 @@ class SyncRestServlet(RestServlet):
)
archived = await self.encode_archived(
- sync_result.archived, time_now, serialize_options
+ sync_config, sync_result.archived, time_now, serialize_options
)
logger.debug("building sync response dict")
@@ -372,6 +397,7 @@ class SyncRestServlet(RestServlet):
@trace_with_opname("sync.encode_joined")
async def encode_joined(
self,
+ sync_config: SyncConfig,
rooms: List[JoinedSyncResult],
time_now: int,
serialize_options: SerializeEventConfig,
@@ -380,6 +406,7 @@ class SyncRestServlet(RestServlet):
Encode the joined rooms in a sync result
Args:
+ sync_config
rooms: list of sync results for rooms this user is joined to
time_now: current time - used as a baseline for age calculations
serialize_options: Event serializer options
@@ -389,7 +416,11 @@ class SyncRestServlet(RestServlet):
joined = {}
for room in rooms:
joined[room.room_id] = await self.encode_room(
- room, time_now, joined=True, serialize_options=serialize_options
+ sync_config,
+ room,
+ time_now,
+ joined=True,
+ serialize_options=serialize_options,
)
return joined
@@ -419,7 +450,12 @@ class SyncRestServlet(RestServlet):
)
unsigned = dict(invite.get("unsigned", {}))
invite["unsigned"] = unsigned
- invited_state = list(unsigned.pop("invite_room_state", []))
+
+ invited_state = unsigned.pop("invite_room_state", [])
+ if not isinstance(invited_state, list):
+ invited_state = []
+
+ invited_state = list(invited_state)
invited_state.append(invite)
invited[room.room_id] = {"invite_state": {"events": invited_state}}
@@ -459,7 +495,10 @@ class SyncRestServlet(RestServlet):
# Extract the stripped room state from the unsigned dict
# This is for clients to get a little bit of information about
# the room they've knocked on, without revealing any sensitive information
- knocked_state = list(unsigned.pop("knock_room_state", []))
+ knocked_state = unsigned.pop("knock_room_state", [])
+ if not isinstance(knocked_state, list):
+ knocked_state = []
+ knocked_state = list(knocked_state)
# Append the actual knock membership event itself as well. This provides
# the client with:
@@ -477,6 +516,7 @@ class SyncRestServlet(RestServlet):
@trace_with_opname("sync.encode_archived")
async def encode_archived(
self,
+ sync_config: SyncConfig,
rooms: List[ArchivedSyncResult],
time_now: int,
serialize_options: SerializeEventConfig,
@@ -485,6 +525,7 @@ class SyncRestServlet(RestServlet):
Encode the archived rooms in a sync result
Args:
+ sync_config
rooms: list of sync results for rooms this user is joined to
time_now: current time - used as a baseline for age calculations
serialize_options: Event serializer options
@@ -494,13 +535,18 @@ class SyncRestServlet(RestServlet):
joined = {}
for room in rooms:
joined[room.room_id] = await self.encode_room(
- room, time_now, joined=False, serialize_options=serialize_options
+ sync_config,
+ room,
+ time_now,
+ joined=False,
+ serialize_options=serialize_options,
)
return joined
async def encode_room(
self,
+ sync_config: SyncConfig,
room: Union[JoinedSyncResult, ArchivedSyncResult],
time_now: int,
joined: bool,
@@ -508,6 +554,7 @@ class SyncRestServlet(RestServlet):
) -> JsonDict:
"""
Args:
+ sync_config
room: sync result for a single room
time_now: current time - used as a baseline for age calculations
token_id: ID of the user's auth token - used for namespacing
@@ -548,13 +595,20 @@ class SyncRestServlet(RestServlet):
account_data = room.account_data
+ # We either include a `state` or `state_after` field depending on
+ # whether the client has opted in to the newer `state_after` behavior.
+ if sync_config.use_state_after:
+ state_key_name = "org.matrix.msc4222.state_after"
+ else:
+ state_key_name = "state"
+
result: JsonDict = {
"timeline": {
"events": serialized_timeline,
"prev_batch": await room.timeline.prev_batch.to_string(self.store),
"limited": room.timeline.limited,
},
- "state": {"events": serialized_state},
+ state_key_name: {"events": serialized_state},
"account_data": {"events": account_data},
}
@@ -688,6 +742,7 @@ class SlidingSyncE2eeRestServlet(RestServlet):
filter_collection=self.only_member_events_filter_collection,
is_guest=requester.is_guest,
device_id=device_id,
+ use_state_after=False, # We don't return any rooms so this flag is a no-op
)
since_token = None
@@ -975,7 +1030,7 @@ class SlidingSyncRestServlet(RestServlet):
return response
def encode_lists(
- self, lists: Dict[str, SlidingSyncResult.SlidingWindowList]
+ self, lists: Mapping[str, SlidingSyncResult.SlidingWindowList]
) -> JsonDict:
def encode_operation(
operation: SlidingSyncResult.SlidingWindowList.Operation,
@@ -1010,13 +1065,19 @@ class SlidingSyncRestServlet(RestServlet):
serialized_rooms: Dict[str, JsonDict] = {}
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
- "bump_stamp": room_result.bump_stamp,
- "joined_count": room_result.joined_count,
- "invited_count": room_result.invited_count,
"notification_count": room_result.notification_count,
"highlight_count": room_result.highlight_count,
}
+ if room_result.bump_stamp is not None:
+ serialized_rooms[room_id]["bump_stamp"] = room_result.bump_stamp
+
+ if room_result.joined_count is not None:
+ serialized_rooms[room_id]["joined_count"] = room_result.joined_count
+
+ if room_result.invited_count is not None:
+ serialized_rooms[room_id]["invited_count"] = room_result.invited_count
+
if room_result.name:
serialized_rooms[room_id]["name"] = room_result.name
@@ -1040,10 +1101,15 @@ class SlidingSyncRestServlet(RestServlet):
serialized_rooms[room_id]["heroes"] = serialized_heroes
# We should only include the `initial` key if it's `True` to save bandwidth.
- # The absense of this flag means `False`.
+ # The absence of this flag means `False`.
if room_result.initial:
serialized_rooms[room_id]["initial"] = room_result.initial
+ if room_result.unstable_expanded_timeline:
+ serialized_rooms[room_id]["unstable_expanded_timeline"] = (
+ room_result.unstable_expanded_timeline
+ )
+
# This will be omitted for invite/knock rooms with `stripped_state`
if (
room_result.required_state is not None
@@ -1077,9 +1143,9 @@ class SlidingSyncRestServlet(RestServlet):
# This will be omitted for invite/knock rooms with `stripped_state`
if room_result.prev_batch is not None:
- serialized_rooms[room_id]["prev_batch"] = (
- await room_result.prev_batch.to_string(self.store)
- )
+ serialized_rooms[room_id][
+ "prev_batch"
+ ] = await room_result.prev_batch.to_string(self.store)
# This will be omitted for invite/knock rooms with `stripped_state`
if room_result.num_live is not None:
|