diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/experimental.py | 3 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 91 | ||||
-rw-r--r-- | synapse/federation/transport/server/federation.py | 20 | ||||
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 12 |
4 files changed, 109 insertions, 17 deletions
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index f05a803a71..09d692d9a1 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -61,3 +61,6 @@ class ExperimentalConfig(Config): self.msc2409_to_device_messages_enabled: bool = experimental.get( "msc2409_to_device_messages_enabled", False ) + + # MSC3706 (server-side support for partial state in /send_join responses) + self.msc3706_enabled: bool = experimental.get("msc3706_enabled", False) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index af9cb98f67..482bbdd867 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -20,6 +20,7 @@ from typing import ( Any, Awaitable, Callable, + Collection, Dict, Iterable, List, @@ -64,7 +65,7 @@ from synapse.replication.http.federation import ( ReplicationGetQueryRestServlet, ) from synapse.storage.databases.main.lock import Lock -from synapse.types import JsonDict, get_domain_from_id +from synapse.types import JsonDict, StateMap, get_domain_from_id from synapse.util import json_decoder, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results from synapse.util.caches.response_cache import ResponseCache @@ -571,7 +572,7 @@ class FederationServer(FederationBase): ) -> JsonDict: state_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id) auth_chain_ids = await self.store.get_auth_chain_ids(room_id, state_ids) - return {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids} + return {"pdu_ids": state_ids, "auth_chain_ids": list(auth_chain_ids)} async def _on_context_state_request_compute( self, room_id: str, event_id: Optional[str] @@ -645,27 +646,61 @@ class FederationServer(FederationBase): return {"event": ret_pdu.get_pdu_json(time_now)} async def on_send_join_request( - self, origin: str, content: JsonDict, room_id: str + self, + origin: str, + content: JsonDict, + room_id: str, + caller_supports_partial_state: bool = False, ) -> Dict[str, Any]: event, context = await self._on_send_membership_event( origin, content, Membership.JOIN, room_id ) prev_state_ids = await context.get_prev_state_ids() - state_ids = list(prev_state_ids.values()) - auth_chain = await self.store.get_auth_chain(room_id, state_ids) - state = await self.store.get_events(state_ids) + state_event_ids: Collection[str] + servers_in_room: Optional[Collection[str]] + if caller_supports_partial_state: + state_event_ids = _get_event_ids_for_partial_state_join( + event, prev_state_ids + ) + servers_in_room = await self.state.get_hosts_in_room_at_events( + room_id, event_ids=event.prev_event_ids() + ) + else: + state_event_ids = prev_state_ids.values() + servers_in_room = None + + auth_chain_event_ids = await self.store.get_auth_chain_ids( + room_id, state_event_ids + ) + + # if the caller has opted in, we can omit any auth_chain events which are + # already in state_event_ids + if caller_supports_partial_state: + auth_chain_event_ids.difference_update(state_event_ids) + + auth_chain_events = await self.store.get_events_as_list(auth_chain_event_ids) + state_events = await self.store.get_events_as_list(state_event_ids) + + # we try to do all the async stuff before this point, so that time_now is as + # accurate as possible. time_now = self._clock.time_msec() - event_json = event.get_pdu_json() - return { + event_json = event.get_pdu_json(time_now) + resp = { # TODO Remove the unstable prefix when servers have updated. "org.matrix.msc3083.v2.event": event_json, "event": event_json, - "state": [p.get_pdu_json(time_now) for p in state.values()], - "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain], + "state": [p.get_pdu_json(time_now) for p in state_events], + "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events], + "org.matrix.msc3706.partial_state": caller_supports_partial_state, } + if servers_in_room is not None: + resp["org.matrix.msc3706.servers_in_room"] = list(servers_in_room) + + return resp + async def on_make_leave_request( self, origin: str, room_id: str, user_id: str ) -> Dict[str, Any]: @@ -1339,3 +1374,39 @@ class FederationHandlerRegistry: # error. logger.warning("No handler registered for query type %s", query_type) raise NotFoundError("No handler for Query type '%s'" % (query_type,)) + + +def _get_event_ids_for_partial_state_join( + join_event: EventBase, + prev_state_ids: StateMap[str], +) -> Collection[str]: + """Calculate state to be retuned in a partial_state send_join + + Args: + join_event: the join event being send_joined + prev_state_ids: the event ids of the state before the join + + Returns: + the event ids to be returned + """ + + # return all non-member events + state_event_ids = { + event_id + for (event_type, state_key), event_id in prev_state_ids.items() + if event_type != EventTypes.Member + } + + # we also need the current state of the current user (it's going to + # be an auth event for the new join, so we may as well return it) + current_membership_event_id = prev_state_ids.get( + (EventTypes.Member, join_event.state_key) + ) + if current_membership_event_id is not None: + state_event_ids.add(current_membership_event_id) + + # TODO: return a few more members: + # - those with invites + # - those that are kicked? / banned + + return state_event_ids diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index d86dfede4e..310733097d 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -412,6 +412,16 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet): PREFIX = FEDERATION_V2_PREFIX + def __init__( + self, + hs: "HomeServer", + authenticator: Authenticator, + ratelimiter: FederationRateLimiter, + server_name: str, + ): + super().__init__(hs, authenticator, ratelimiter, server_name) + self._msc3706_enabled = hs.config.experimental.msc3706_enabled + async def on_PUT( self, origin: str, @@ -422,7 +432,15 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet): ) -> Tuple[int, JsonDict]: # TODO(paul): assert that event_id parsed from path actually # match those given in content - result = await self.handler.on_send_join_request(origin, content, room_id) + + partial_state = False + if self._msc3706_enabled: + partial_state = parse_boolean_from_args( + query, "org.matrix.msc3706.partial_state", default=False + ) + result = await self.handler.on_send_join_request( + origin, content, room_id, caller_supports_partial_state=partial_state + ) return 200, result diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 22f6474127..277e6422eb 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -121,7 +121,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas room_id: str, event_ids: Collection[str], include_given: bool = False, - ) -> List[str]: + ) -> Set[str]: """Get auth events for given event_ids. The events *must* be state events. Args: @@ -130,7 +130,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas include_given: include the given events in result Returns: - list of event_ids + set of event_ids """ # Check if we have indexed the room so we can use the chain cover @@ -159,7 +159,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas def _get_auth_chain_ids_using_cover_index_txn( self, txn: Cursor, room_id: str, event_ids: Collection[str], include_given: bool - ) -> List[str]: + ) -> Set[str]: """Calculates the auth chain IDs using the chain index.""" # First we look up the chain ID/sequence numbers for the given events. @@ -272,11 +272,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas txn.execute(sql, (chain_id, max_no)) results.update(r for r, in txn) - return list(results) + return results def _get_auth_chain_ids_txn( self, txn: LoggingTransaction, event_ids: Collection[str], include_given: bool - ) -> List[str]: + ) -> Set[str]: """Calculates the auth chain IDs. This is used when we don't have a cover index for the room. @@ -331,7 +331,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas front = new_front results.update(front) - return list(results) + return results async def get_auth_chain_difference( self, room_id: str, state_sets: List[Set[str]] |