From 7bc110a19e6de0572b0c9513726d13298b45ced2 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Wed, 31 Aug 2022 11:16:05 +0000 Subject: Generalise the `@cancellable` annotation so it can be used on functions other than just servlet methods. (#13662) --- synapse/replication/http/_base.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/replication/http') diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 561ad5bf04..acb0bd18f7 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -26,12 +26,13 @@ from twisted.web.server import Request from synapse.api.errors import HttpResponseException, SynapseError from synapse.http import RequestTimedOutError -from synapse.http.server import HttpServer, is_method_cancellable +from synapse.http.server import HttpServer from synapse.http.site import SynapseRequest from synapse.logging import opentracing from synapse.logging.opentracing import trace_with_opname from synapse.types import JsonDict from synapse.util.caches.response_cache import ResponseCache +from synapse.util.cancellation import is_function_cancellable from synapse.util.stringutils import random_string if TYPE_CHECKING: @@ -311,7 +312,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): url_args = list(self.PATH_ARGS) method = self.METHOD - if self.CACHE and is_method_cancellable(self._handle_request): + if self.CACHE and is_function_cancellable(self._handle_request): raise Exception( f"{self.__class__.__name__} has been marked as cancellable, but CACHE " "is set. The cancellable flag would have no effect." @@ -359,6 +360,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): # The `@cancellable` decorator may be applied to `_handle_request`. But we # told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`, # so we have to set up the cancellable flag ourselves. - request.is_render_cancellable = is_method_cancellable(self._handle_request) + request.is_render_cancellable = is_function_cancellable(self._handle_request) return await self._handle_request(request, **kwargs) -- cgit 1.5.1 From 8ab16a92edd675453c78cfd9974081e374b0f998 Mon Sep 17 00:00:00 2001 From: Shay Date: Wed, 28 Sep 2022 03:11:48 -0700 Subject: Persist CreateRoom events to DB in a batch (#13800) --- changelog.d/13800.misc | 1 + synapse/handlers/message.py | 663 +++++++++++++++++--------------- synapse/handlers/room.py | 21 +- synapse/handlers/room_batch.py | 3 +- synapse/handlers/room_member.py | 11 +- synapse/replication/http/__init__.py | 2 + synapse/replication/http/send_event.py | 4 +- synapse/replication/http/send_events.py | 171 ++++++++ tests/handlers/test_message.py | 10 +- tests/handlers/test_register.py | 4 +- tests/storage/test_event_chain.py | 8 +- tests/unittest.py | 4 +- 12 files changed, 563 insertions(+), 339 deletions(-) create mode 100644 changelog.d/13800.misc create mode 100644 synapse/replication/http/send_events.py (limited to 'synapse/replication/http') diff --git a/changelog.d/13800.misc b/changelog.d/13800.misc new file mode 100644 index 0000000000..761adc8b05 --- /dev/null +++ b/changelog.d/13800.misc @@ -0,0 +1 @@ +Speed up creation of DM rooms. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 062f93bc67..00e7645ba5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -56,11 +56,13 @@ from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.send_event import ReplicationSendEventRestServlet +from synapse.replication.http.send_events import ReplicationSendEventsRestServlet from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import ( MutableStateMap, + PersistedEventPosition, Requester, RoomAlias, StateMap, @@ -493,6 +495,7 @@ class EventCreationHandler: self.membership_types_to_include_profile_data_in.add(Membership.INVITE) self.send_event = ReplicationSendEventRestServlet.make_client(hs) + self.send_events = ReplicationSendEventsRestServlet.make_client(hs) self.request_ratelimiter = hs.get_request_ratelimiter() @@ -1016,8 +1019,7 @@ class EventCreationHandler: ev = await self.handle_new_client_event( requester=requester, - event=event, - context=context, + events_and_context=[(event, context)], ratelimit=ratelimit, ignore_shadow_ban=ignore_shadow_ban, ) @@ -1293,13 +1295,13 @@ class EventCreationHandler: async def handle_new_client_event( self, requester: Requester, - event: EventBase, - context: EventContext, + events_and_context: List[Tuple[EventBase, EventContext]], ratelimit: bool = True, extra_users: Optional[List[UserID]] = None, ignore_shadow_ban: bool = False, ) -> EventBase: - """Processes a new event. + """Processes new events. Please note that if batch persisting events, an error in + handling any one of these events will result in all of the events being dropped. This includes deduplicating, checking auth, persisting, notifying users, sending to remote servers, etc. @@ -1309,8 +1311,7 @@ class EventCreationHandler: Args: requester - event - context + events_and_context: A list of one or more tuples of event, context to be persisted ratelimit extra_users: Any extra users to notify about event @@ -1328,62 +1329,63 @@ class EventCreationHandler: """ extra_users = extra_users or [] - # we don't apply shadow-banning to membership events here. Invites are blocked - # higher up the stack, and we allow shadow-banned users to send join and leave - # events as normal. - if ( - event.type != EventTypes.Member - and not ignore_shadow_ban - and requester.shadow_banned - ): - # We randomly sleep a bit just to annoy the requester. - await self.clock.sleep(random.randint(1, 10)) - raise ShadowBanError() + for event, context in events_and_context: + # we don't apply shadow-banning to membership events here. Invites are blocked + # higher up the stack, and we allow shadow-banned users to send join and leave + # events as normal. + if ( + event.type != EventTypes.Member + and not ignore_shadow_ban + and requester.shadow_banned + ): + # We randomly sleep a bit just to annoy the requester. + await self.clock.sleep(random.randint(1, 10)) + raise ShadowBanError() - if event.is_state(): - prev_event = await self.deduplicate_state_event(event, context) - if prev_event is not None: - logger.info( - "Not bothering to persist state event %s duplicated by %s", - event.event_id, - prev_event.event_id, - ) - return prev_event + if event.is_state(): + prev_event = await self.deduplicate_state_event(event, context) + if prev_event is not None: + logger.info( + "Not bothering to persist state event %s duplicated by %s", + event.event_id, + prev_event.event_id, + ) + return prev_event - if event.internal_metadata.is_out_of_band_membership(): - # the only sort of out-of-band-membership events we expect to see here are - # invite rejections and rescinded knocks that we have generated ourselves. - assert event.type == EventTypes.Member - assert event.content["membership"] == Membership.LEAVE - else: - try: - validate_event_for_room_version(event) - await self._event_auth_handler.check_auth_rules_from_context( - event, context - ) - except AuthError as err: - logger.warning("Denying new event %r because %s", event, err) - raise err + if event.internal_metadata.is_out_of_band_membership(): + # the only sort of out-of-band-membership events we expect to see here are + # invite rejections and rescinded knocks that we have generated ourselves. + assert event.type == EventTypes.Member + assert event.content["membership"] == Membership.LEAVE + else: + try: + validate_event_for_room_version(event) + await self._event_auth_handler.check_auth_rules_from_context( + event, context + ) + except AuthError as err: + logger.warning("Denying new event %r because %s", event, err) + raise err - # Ensure that we can round trip before trying to persist in db - try: - dump = json_encoder.encode(event.content) - json_decoder.decode(dump) - except Exception: - logger.exception("Failed to encode content: %r", event.content) - raise + # Ensure that we can round trip before trying to persist in db + try: + dump = json_encoder.encode(event.content) + json_decoder.decode(dump) + except Exception: + logger.exception("Failed to encode content: %r", event.content) + raise # We now persist the event (and update the cache in parallel, since we # don't want to block on it). + event, context = events_and_context[0] try: result, _ = await make_deferred_yieldable( gather_results( ( run_in_background( - self._persist_event, + self._persist_events, requester=requester, - event=event, - context=context, + events_and_context=events_and_context, ratelimit=ratelimit, extra_users=extra_users, ), @@ -1407,45 +1409,47 @@ class EventCreationHandler: return result - async def _persist_event( + async def _persist_events( self, requester: Requester, - event: EventBase, - context: EventContext, + events_and_context: List[Tuple[EventBase, EventContext]], ratelimit: bool = True, extra_users: Optional[List[UserID]] = None, ) -> EventBase: - """Actually persists the event. Should only be called by + """Actually persists new events. Should only be called by `handle_new_client_event`, and see its docstring for documentation of - the arguments. + the arguments. Please note that if batch persisting events, an error in + handling any one of these events will result in all of the events being dropped. PartialStateConflictError: if attempting to persist a partial state event in a room that has been un-partial stated. """ - # Skip push notification actions for historical messages - # because we don't want to notify people about old history back in time. - # The historical messages also do not have the proper `context.current_state_ids` - # and `state_groups` because they have `prev_events` that aren't persisted yet - # (historical messages persisted in reverse-chronological order). - if not event.internal_metadata.is_historical(): - with opentracing.start_active_span("calculate_push_actions"): - await self._bulk_push_rule_evaluator.action_for_event_by_user( - event, context - ) + for event, context in events_and_context: + # Skip push notification actions for historical messages + # because we don't want to notify people about old history back in time. + # The historical messages also do not have the proper `context.current_state_ids` + # and `state_groups` because they have `prev_events` that aren't persisted yet + # (historical messages persisted in reverse-chronological order). + if not event.internal_metadata.is_historical(): + with opentracing.start_active_span("calculate_push_actions"): + await self._bulk_push_rule_evaluator.action_for_event_by_user( + event, context + ) try: # If we're a worker we need to hit out to the master. - writer_instance = self._events_shard_config.get_instance(event.room_id) + first_event, _ = events_and_context[0] + writer_instance = self._events_shard_config.get_instance( + first_event.room_id + ) if writer_instance != self._instance_name: try: - result = await self.send_event( + result = await self.send_events( instance_name=writer_instance, - event_id=event.event_id, + events_and_context=events_and_context, store=self.store, requester=requester, - event=event, - context=context, ratelimit=ratelimit, extra_users=extra_users, ) @@ -1455,6 +1459,11 @@ class EventCreationHandler: raise stream_id = result["stream_id"] event_id = result["event_id"] + + # If we batch persisted events we return the last persisted event, otherwise + # we return the one event that was persisted + event, _ = events_and_context[-1] + if event_id != event.event_id: # If we get a different event back then it means that its # been de-duplicated, so we replace the given event with the @@ -1467,15 +1476,19 @@ class EventCreationHandler: event.internal_metadata.stream_ordering = stream_id return event - event = await self.persist_and_notify_client_event( - requester, event, context, ratelimit=ratelimit, extra_users=extra_users + event = await self.persist_and_notify_client_events( + requester, + events_and_context, + ratelimit=ratelimit, + extra_users=extra_users, ) return event except Exception: - # Ensure that we actually remove the entries in the push actions - # staging area, if we calculated them. - await self.store.remove_push_actions_from_staging(event.event_id) + for event, _ in events_and_context: + # Ensure that we actually remove the entries in the push actions + # staging area, if we calculated them. + await self.store.remove_push_actions_from_staging(event.event_id) raise async def cache_joined_hosts_for_event( @@ -1569,23 +1582,26 @@ class EventCreationHandler: Codes.BAD_ALIAS, ) - async def persist_and_notify_client_event( + async def persist_and_notify_client_events( self, requester: Requester, - event: EventBase, - context: EventContext, + events_and_context: List[Tuple[EventBase, EventContext]], ratelimit: bool = True, extra_users: Optional[List[UserID]] = None, ) -> EventBase: - """Called when we have fully built the event, have already - calculated the push actions for the event, and checked auth. + """Called when we have fully built the events, have already + calculated the push actions for the events, and checked auth. This should only be run on the instance in charge of persisting events. + Please note that if batch persisting events, an error in + handling any one of these events will result in all of the events being dropped. + Returns: - The persisted event. This may be different than the given event if - it was de-duplicated (e.g. because we had already persisted an - event with the same transaction ID.) + The persisted event, if one event is passed in, or the last event in the + list in the case of batch persisting. If only one event was persisted, the + returned event may be different than the given event if it was de-duplicated + (e.g. because we had already persisted an event with the same transaction ID.) Raises: PartialStateConflictError: if attempting to persist a partial state event in @@ -1593,277 +1609,297 @@ class EventCreationHandler: """ extra_users = extra_users or [] - assert self._storage_controllers.persistence is not None - assert self._events_shard_config.should_handle( - self._instance_name, event.room_id - ) + for event, context in events_and_context: + assert self._events_shard_config.should_handle( + self._instance_name, event.room_id + ) - if ratelimit: - # We check if this is a room admin redacting an event so that we - # can apply different ratelimiting. We do this by simply checking - # it's not a self-redaction (to avoid having to look up whether the - # user is actually admin or not). - is_admin_redaction = False - if event.type == EventTypes.Redaction: - assert event.redacts is not None + if ratelimit: + # We check if this is a room admin redacting an event so that we + # can apply different ratelimiting. We do this by simply checking + # it's not a self-redaction (to avoid having to look up whether the + # user is actually admin or not). + is_admin_redaction = False + if event.type == EventTypes.Redaction: + assert event.redacts is not None + + original_event = await self.store.get_event( + event.redacts, + redact_behaviour=EventRedactBehaviour.as_is, + get_prev_content=False, + allow_rejected=False, + allow_none=True, + ) - original_event = await self.store.get_event( - event.redacts, - redact_behaviour=EventRedactBehaviour.as_is, - get_prev_content=False, - allow_rejected=False, - allow_none=True, + is_admin_redaction = bool( + original_event and event.sender != original_event.sender + ) + + await self.request_ratelimiter.ratelimit( + requester, is_admin_redaction=is_admin_redaction ) - is_admin_redaction = bool( - original_event and event.sender != original_event.sender + # run checks/actions on event based on type + if event.type == EventTypes.Member and event.membership == Membership.JOIN: + ( + current_membership, + _, + ) = await self.store.get_local_current_membership_for_user_in_room( + event.state_key, event.room_id ) + if current_membership != Membership.JOIN: + self._notifier.notify_user_joined_room( + event.event_id, event.room_id + ) - await self.request_ratelimiter.ratelimit( - requester, is_admin_redaction=is_admin_redaction - ) + await self._maybe_kick_guest_users(event, context) - if event.type == EventTypes.Member and event.membership == Membership.JOIN: - ( - current_membership, - _, - ) = await self.store.get_local_current_membership_for_user_in_room( - event.state_key, event.room_id - ) - if current_membership != Membership.JOIN: - self._notifier.notify_user_joined_room(event.event_id, event.room_id) + if event.type == EventTypes.CanonicalAlias: + # Validate a newly added alias or newly added alt_aliases. - await self._maybe_kick_guest_users(event, context) + original_alias = None + original_alt_aliases: object = [] - if event.type == EventTypes.CanonicalAlias: - # Validate a newly added alias or newly added alt_aliases. + original_event_id = event.unsigned.get("replaces_state") + if original_event_id: + original_alias_event = await self.store.get_event(original_event_id) - original_alias = None - original_alt_aliases: object = [] + if original_alias_event: + original_alias = original_alias_event.content.get("alias", None) + original_alt_aliases = original_alias_event.content.get( + "alt_aliases", [] + ) - original_event_id = event.unsigned.get("replaces_state") - if original_event_id: - original_event = await self.store.get_event(original_event_id) + # Check the alias is currently valid (if it has changed). + room_alias_str = event.content.get("alias", None) + directory_handler = self.hs.get_directory_handler() + if room_alias_str and room_alias_str != original_alias: + await self._validate_canonical_alias( + directory_handler, room_alias_str, event.room_id + ) - if original_event: - original_alias = original_event.content.get("alias", None) - original_alt_aliases = original_event.content.get("alt_aliases", []) - - # Check the alias is currently valid (if it has changed). - room_alias_str = event.content.get("alias", None) - directory_handler = self.hs.get_directory_handler() - if room_alias_str and room_alias_str != original_alias: - await self._validate_canonical_alias( - directory_handler, room_alias_str, event.room_id - ) + # Check that alt_aliases is the proper form. + alt_aliases = event.content.get("alt_aliases", []) + if not isinstance(alt_aliases, (list, tuple)): + raise SynapseError( + 400, + "The alt_aliases property must be a list.", + Codes.INVALID_PARAM, + ) - # Check that alt_aliases is the proper form. - alt_aliases = event.content.get("alt_aliases", []) - if not isinstance(alt_aliases, (list, tuple)): - raise SynapseError( - 400, "The alt_aliases property must be a list.", Codes.INVALID_PARAM - ) + # If the old version of alt_aliases is of an unknown form, + # completely replace it. + if not isinstance(original_alt_aliases, (list, tuple)): + # TODO: check that the original_alt_aliases' entries are all strings + original_alt_aliases = [] + + # Check that each alias is currently valid. + new_alt_aliases = set(alt_aliases) - set(original_alt_aliases) + if new_alt_aliases: + for alias_str in new_alt_aliases: + await self._validate_canonical_alias( + directory_handler, alias_str, event.room_id + ) - # If the old version of alt_aliases is of an unknown form, - # completely replace it. - if not isinstance(original_alt_aliases, (list, tuple)): - # TODO: check that the original_alt_aliases' entries are all strings - original_alt_aliases = [] + federation_handler = self.hs.get_federation_handler() - # Check that each alias is currently valid. - new_alt_aliases = set(alt_aliases) - set(original_alt_aliases) - if new_alt_aliases: - for alias_str in new_alt_aliases: - await self._validate_canonical_alias( - directory_handler, alias_str, event.room_id + if event.type == EventTypes.Member: + if event.content["membership"] == Membership.INVITE: + event.unsigned[ + "invite_room_state" + ] = await self.store.get_stripped_room_state_from_event_context( + context, + self.room_prejoin_state_types, + membership_user_id=event.sender, ) - federation_handler = self.hs.get_federation_handler() + invitee = UserID.from_string(event.state_key) + if not self.hs.is_mine(invitee): + # TODO: Can we add signature from remote server in a nicer + # way? If we have been invited by a remote server, we need + # to get them to sign the event. - if event.type == EventTypes.Member: - if event.content["membership"] == Membership.INVITE: - event.unsigned[ - "invite_room_state" - ] = await self.store.get_stripped_room_state_from_event_context( - context, - self.room_prejoin_state_types, - membership_user_id=event.sender, - ) + returned_invite = await federation_handler.send_invite( + invitee.domain, event + ) + event.unsigned.pop("room_state", None) - invitee = UserID.from_string(event.state_key) - if not self.hs.is_mine(invitee): - # TODO: Can we add signature from remote server in a nicer - # way? If we have been invited by a remote server, we need - # to get them to sign the event. + # TODO: Make sure the signatures actually are correct. + event.signatures.update(returned_invite.signatures) - returned_invite = await federation_handler.send_invite( - invitee.domain, event + if event.content["membership"] == Membership.KNOCK: + event.unsigned[ + "knock_room_state" + ] = await self.store.get_stripped_room_state_from_event_context( + context, + self.room_prejoin_state_types, ) - event.unsigned.pop("room_state", None) - # TODO: Make sure the signatures actually are correct. - event.signatures.update(returned_invite.signatures) + if event.type == EventTypes.Redaction: + assert event.redacts is not None - if event.content["membership"] == Membership.KNOCK: - event.unsigned[ - "knock_room_state" - ] = await self.store.get_stripped_room_state_from_event_context( - context, - self.room_prejoin_state_types, + original_event = await self.store.get_event( + event.redacts, + redact_behaviour=EventRedactBehaviour.as_is, + get_prev_content=False, + allow_rejected=False, + allow_none=True, ) - if event.type == EventTypes.Redaction: - assert event.redacts is not None + room_version = await self.store.get_room_version_id(event.room_id) + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - original_event = await self.store.get_event( - event.redacts, - redact_behaviour=EventRedactBehaviour.as_is, - get_prev_content=False, - allow_rejected=False, - allow_none=True, - ) + # we can make some additional checks now if we have the original event. + if original_event: + if original_event.type == EventTypes.Create: + raise AuthError(403, "Redacting create events is not permitted") - room_version = await self.store.get_room_version_id(event.room_id) - room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - - # we can make some additional checks now if we have the original event. - if original_event: - if original_event.type == EventTypes.Create: - raise AuthError(403, "Redacting create events is not permitted") - - if original_event.room_id != event.room_id: - raise SynapseError(400, "Cannot redact event from a different room") - - if original_event.type == EventTypes.ServerACL: - raise AuthError(403, "Redacting server ACL events is not permitted") - - # Add a little safety stop-gap to prevent people from trying to - # redact MSC2716 related events when they're in a room version - # which does not support it yet. We allow people to use MSC2716 - # events in existing room versions but only from the room - # creator since it does not require any changes to the auth - # rules and in effect, the redaction algorithm . In the - # supported room version, we add the `historical` power level to - # auth the MSC2716 related events and adjust the redaction - # algorthim to keep the `historical` field around (redacting an - # event should only strip fields which don't affect the - # structural protocol level). - is_msc2716_event = ( - original_event.type == EventTypes.MSC2716_INSERTION - or original_event.type == EventTypes.MSC2716_BATCH - or original_event.type == EventTypes.MSC2716_MARKER - ) - if not room_version_obj.msc2716_historical and is_msc2716_event: - raise AuthError( - 403, - "Redacting MSC2716 events is not supported in this room version", - ) + if original_event.room_id != event.room_id: + raise SynapseError( + 400, "Cannot redact event from a different room" + ) - event_types = event_auth.auth_types_for_event(event.room_version, event) - prev_state_ids = await context.get_prev_state_ids( - StateFilter.from_types(event_types) - ) + if original_event.type == EventTypes.ServerACL: + raise AuthError( + 403, "Redacting server ACL events is not permitted" + ) - auth_events_ids = self._event_auth_handler.compute_auth_events( - event, prev_state_ids, for_verification=True - ) - auth_events_map = await self.store.get_events(auth_events_ids) - auth_events = {(e.type, e.state_key): e for e in auth_events_map.values()} + # Add a little safety stop-gap to prevent people from trying to + # redact MSC2716 related events when they're in a room version + # which does not support it yet. We allow people to use MSC2716 + # events in existing room versions but only from the room + # creator since it does not require any changes to the auth + # rules and in effect, the redaction algorithm . In the + # supported room version, we add the `historical` power level to + # auth the MSC2716 related events and adjust the redaction + # algorthim to keep the `historical` field around (redacting an + # event should only strip fields which don't affect the + # structural protocol level). + is_msc2716_event = ( + original_event.type == EventTypes.MSC2716_INSERTION + or original_event.type == EventTypes.MSC2716_BATCH + or original_event.type == EventTypes.MSC2716_MARKER + ) + if not room_version_obj.msc2716_historical and is_msc2716_event: + raise AuthError( + 403, + "Redacting MSC2716 events is not supported in this room version", + ) - if event_auth.check_redaction( - room_version_obj, event, auth_events=auth_events - ): - # this user doesn't have 'redact' rights, so we need to do some more - # checks on the original event. Let's start by checking the original - # event exists. - if not original_event: - raise NotFoundError("Could not find event %s" % (event.redacts,)) - - if event.user_id != original_event.user_id: - raise AuthError(403, "You don't have permission to redact events") - - # all the checks are done. - event.internal_metadata.recheck_redaction = False - - if event.type == EventTypes.Create: - prev_state_ids = await context.get_prev_state_ids() - if prev_state_ids: - raise AuthError(403, "Changing the room create event is forbidden") - - if event.type == EventTypes.MSC2716_INSERTION: - room_version = await self.store.get_room_version_id(event.room_id) - room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - - create_event = await self.store.get_create_event_for_room(event.room_id) - room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) - - # Only check an insertion event if the room version - # supports it or the event is from the room creator. - if room_version_obj.msc2716_historical or ( - self.config.experimental.msc2716_enabled - and event.sender == room_creator - ): - next_batch_id = event.content.get( - EventContentFields.MSC2716_NEXT_BATCH_ID + event_types = event_auth.auth_types_for_event(event.room_version, event) + prev_state_ids = await context.get_prev_state_ids( + StateFilter.from_types(event_types) ) - conflicting_insertion_event_id = None - if next_batch_id: - conflicting_insertion_event_id = ( - await self.store.get_insertion_event_id_by_batch_id( - event.room_id, next_batch_id + + auth_events_ids = self._event_auth_handler.compute_auth_events( + event, prev_state_ids, for_verification=True + ) + auth_events_map = await self.store.get_events(auth_events_ids) + auth_events = { + (e.type, e.state_key): e for e in auth_events_map.values() + } + + if event_auth.check_redaction( + room_version_obj, event, auth_events=auth_events + ): + # this user doesn't have 'redact' rights, so we need to do some more + # checks on the original event. Let's start by checking the original + # event exists. + if not original_event: + raise NotFoundError( + "Could not find event %s" % (event.redacts,) ) + + if event.user_id != original_event.user_id: + raise AuthError( + 403, "You don't have permission to redact events" + ) + + # all the checks are done. + event.internal_metadata.recheck_redaction = False + + if event.type == EventTypes.Create: + prev_state_ids = await context.get_prev_state_ids() + if prev_state_ids: + raise AuthError(403, "Changing the room create event is forbidden") + + if event.type == EventTypes.MSC2716_INSERTION: + room_version = await self.store.get_room_version_id(event.room_id) + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + + create_event = await self.store.get_create_event_for_room(event.room_id) + room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) + + # Only check an insertion event if the room version + # supports it or the event is from the room creator. + if room_version_obj.msc2716_historical or ( + self.config.experimental.msc2716_enabled + and event.sender == room_creator + ): + next_batch_id = event.content.get( + EventContentFields.MSC2716_NEXT_BATCH_ID ) - if conflicting_insertion_event_id is not None: - # The current insertion event that we're processing is invalid - # because an insertion event already exists in the room with the - # same next_batch_id. We can't allow multiple because the batch - # pointing will get weird, e.g. we can't determine which insertion - # event the batch event is pointing to. - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "Another insertion event already exists with the same next_batch_id", - errcode=Codes.INVALID_PARAM, - ) + conflicting_insertion_event_id = None + if next_batch_id: + conflicting_insertion_event_id = ( + await self.store.get_insertion_event_id_by_batch_id( + event.room_id, next_batch_id + ) + ) + if conflicting_insertion_event_id is not None: + # The current insertion event that we're processing is invalid + # because an insertion event already exists in the room with the + # same next_batch_id. We can't allow multiple because the batch + # pointing will get weird, e.g. we can't determine which insertion + # event the batch event is pointing to. + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Another insertion event already exists with the same next_batch_id", + errcode=Codes.INVALID_PARAM, + ) - # Mark any `m.historical` messages as backfilled so they don't appear - # in `/sync` and have the proper decrementing `stream_ordering` as we import - backfilled = False - if event.internal_metadata.is_historical(): - backfilled = True + # Mark any `m.historical` messages as backfilled so they don't appear + # in `/sync` and have the proper decrementing `stream_ordering` as we import + backfilled = False + if event.internal_metadata.is_historical(): + backfilled = True - # Note that this returns the event that was persisted, which may not be - # the same as we passed in if it was deduplicated due transaction IDs. + assert self._storage_controllers.persistence is not None ( - event, - event_pos, + persisted_events, max_stream_token, - ) = await self._storage_controllers.persistence.persist_event( - event, context=context, backfilled=backfilled + ) = await self._storage_controllers.persistence.persist_events( + events_and_context, backfilled=backfilled ) - if self._ephemeral_events_enabled: - # If there's an expiry timestamp on the event, schedule its expiry. - self._message_handler.maybe_schedule_expiry(event) + for event in persisted_events: + if self._ephemeral_events_enabled: + # If there's an expiry timestamp on the event, schedule its expiry. + self._message_handler.maybe_schedule_expiry(event) - async def _notify() -> None: - try: - await self.notifier.on_new_room_event( - event, event_pos, max_stream_token, extra_users=extra_users - ) - except Exception: - logger.exception( - "Error notifying about new room event %s", - event.event_id, - ) + stream_ordering = event.internal_metadata.stream_ordering + assert stream_ordering is not None + pos = PersistedEventPosition(self._instance_name, stream_ordering) + + async def _notify() -> None: + try: + await self.notifier.on_new_room_event( + event, pos, max_stream_token, extra_users=extra_users + ) + except Exception: + logger.exception( + "Error notifying about new room event %s", + event.event_id, + ) - run_in_background(_notify) + run_in_background(_notify) - if event.type == EventTypes.Message: - # We don't want to block sending messages on any presence code. This - # matters as sometimes presence code can take a while. - run_in_background(self._bump_active_time, requester.user) + if event.type == EventTypes.Message: + # We don't want to block sending messages on any presence code. This + # matters as sometimes presence code can take a while. + run_in_background(self._bump_active_time, requester.user) - return event + return persisted_events[-1] async def _maybe_kick_guest_users( self, event: EventBase, context: EventContext @@ -1952,8 +1988,7 @@ class EventCreationHandler: # shadow-banned user. await self.handle_new_client_event( requester, - event, - context, + events_and_context=[(event, context)], ratelimit=False, ignore_shadow_ban=True, ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 09a1a82e6c..b220238e55 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -301,8 +301,7 @@ class RoomCreationHandler: # now send the tombstone await self.event_creation_handler.handle_new_client_event( requester=requester, - event=tombstone_event, - context=tombstone_context, + events_and_context=[(tombstone_event, tombstone_context)], ) state_filter = StateFilter.from_types( @@ -1057,8 +1056,10 @@ class RoomCreationHandler: creator_id = creator.user.to_string() event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""} depth = 1 + # the last event sent/persisted to the db last_sent_event_id: Optional[str] = None + # the most recently created event prev_event: List[str] = [] # a map of event types, state keys -> event_ids. We collect these mappings this as events are @@ -1112,8 +1113,7 @@ class RoomCreationHandler: ev = await self.event_creation_handler.handle_new_client_event( requester=creator, - event=event, - context=context, + events_and_context=[(event, context)], ratelimit=False, ignore_shadow_ban=True, ) @@ -1152,7 +1152,6 @@ class RoomCreationHandler: prev_event_ids=[last_sent_event_id], depth=depth, ) - last_sent_event_id = member_event_id prev_event = [member_event_id] # update the depth and state map here as the membership event has been created @@ -1168,7 +1167,7 @@ class RoomCreationHandler: EventTypes.PowerLevels, pl_content, False ) current_state_group = power_context._state_group - last_sent_stream_id = await send(power_event, power_context, creator) + await send(power_event, power_context, creator) else: power_level_content: JsonDict = { "users": {creator_id: 100}, @@ -1217,7 +1216,7 @@ class RoomCreationHandler: False, ) current_state_group = pl_context._state_group - last_sent_stream_id = await send(pl_event, pl_context, creator) + await send(pl_event, pl_context, creator) events_to_send = [] if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state: @@ -1271,9 +1270,11 @@ class RoomCreationHandler: ) events_to_send.append((encryption_event, encryption_context)) - for event, context in events_to_send: - last_sent_stream_id = await send(event, context, creator) - return last_sent_stream_id, last_sent_event_id, depth + last_event = await self.event_creation_handler.handle_new_client_event( + creator, events_to_send, ignore_shadow_ban=True + ) + assert last_event.internal_metadata.stream_ordering is not None + return last_event.internal_metadata.stream_ordering, last_event.event_id, depth def _generate_room_id(self) -> str: """Generates a random room ID. diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index 1414e575d6..411a6fb22f 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -379,8 +379,7 @@ class RoomBatchHandler: await self.create_requester_for_user_id_from_app_service( event.sender, app_service_requester.app_service ), - event=event, - context=context, + events_and_context=[(event, context)], ) return event_ids diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 8d01f4bf2b..88158822e0 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -432,8 +432,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): with opentracing.start_active_span("handle_new_client_event"): result_event = await self.event_creation_handler.handle_new_client_event( requester, - event, - context, + events_and_context=[(event, context)], extra_users=[target], ratelimit=ratelimit, ) @@ -1252,7 +1251,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): raise SynapseError(403, "This room has been blocked on this server") event = await self.event_creation_handler.handle_new_client_event( - requester, event, context, extra_users=[target_user], ratelimit=ratelimit + requester, + events_and_context=[(event, context)], + extra_users=[target_user], + ratelimit=ratelimit, ) prev_member_event_id = prev_state_ids.get( @@ -1860,8 +1862,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): result_event = await self.event_creation_handler.handle_new_client_event( requester, - event, - context, + events_and_context=[(event, context)], extra_users=[UserID.from_string(target_user)], ) # we know it was persisted, so must have a stream ordering diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 53aa7fa4c6..ac9a92240a 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -25,6 +25,7 @@ from synapse.replication.http import ( push, register, send_event, + send_events, state, streams, ) @@ -43,6 +44,7 @@ class ReplicationRestResource(JsonResource): def register_servlets(self, hs: "HomeServer") -> None: send_event.register_servlets(hs, self) + send_events.register_servlets(hs, self) federation.register_servlets(hs, self) presence.register_servlets(hs, self) membership.register_servlets(hs, self) diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 486f04723c..4215a1c1bc 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -141,8 +141,8 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): "Got event to send with ID: %s into room: %s", event.event_id, event.room_id ) - event = await self.event_creation_handler.persist_and_notify_client_event( - requester, event, context, ratelimit=ratelimit, extra_users=extra_users + event = await self.event_creation_handler.persist_and_notify_client_events( + requester, [(event, context)], ratelimit=ratelimit, extra_users=extra_users ) return ( diff --git a/synapse/replication/http/send_events.py b/synapse/replication/http/send_events.py new file mode 100644 index 0000000000..8889bbb644 --- /dev/null +++ b/synapse/replication/http/send_events.py @@ -0,0 +1,171 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING, List, Tuple + +from twisted.web.server import Request + +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.events import EventBase, make_event_from_dict +from synapse.events.snapshot import EventContext +from synapse.http.server import HttpServer +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import JsonDict, Requester, UserID +from synapse.util.metrics import Measure + +if TYPE_CHECKING: + from synapse.server import HomeServer + from synapse.storage.databases.main import DataStore + +logger = logging.getLogger(__name__) + + +class ReplicationSendEventsRestServlet(ReplicationEndpoint): + """Handles batches of newly created events on workers, including persisting and + notifying. + + The API looks like: + + POST /_synapse/replication/send_events/:txn_id + + { + "events": [{ + "event": { .. serialized event .. }, + "room_version": .., // "1", "2", "3", etc: the version of the room + // containing the event + "event_format_version": .., // 1,2,3 etc: the event format version + "internal_metadata": { .. serialized internal_metadata .. }, + "outlier": true|false, + "rejected_reason": .., // The event.rejected_reason field + "context": { .. serialized event context .. }, + "requester": { .. serialized requester .. }, + "ratelimit": true, + }] + } + + 200 OK + + { "stream_id": 12345, "event_id": "$abcdef..." } + + Responds with a 409 when a `PartialStateConflictError` is raised due to an event + context that needs to be recomputed due to the un-partial stating of a room. + + """ + + NAME = "send_events" + PATH_ARGS = () + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.event_creation_handler = hs.get_event_creation_handler() + self.store = hs.get_datastores().main + self._storage_controllers = hs.get_storage_controllers() + self.clock = hs.get_clock() + + @staticmethod + async def _serialize_payload( # type: ignore[override] + events_and_context: List[Tuple[EventBase, EventContext]], + store: "DataStore", + requester: Requester, + ratelimit: bool, + extra_users: List[UserID], + ) -> JsonDict: + """ + Args: + store + requester + events_and_ctx + ratelimit + """ + serialized_events = [] + + for event, context in events_and_context: + serialized_context = await context.serialize(event, store) + serialized_event = { + "event": event.get_pdu_json(), + "room_version": event.room_version.identifier, + "event_format_version": event.format_version, + "internal_metadata": event.internal_metadata.get_dict(), + "outlier": event.internal_metadata.is_outlier(), + "rejected_reason": event.rejected_reason, + "context": serialized_context, + "requester": requester.serialize(), + "ratelimit": ratelimit, + "extra_users": [u.to_string() for u in extra_users], + } + serialized_events.append(serialized_event) + + payload = {"events": serialized_events} + + return payload + + async def _handle_request( # type: ignore[override] + self, request: Request + ) -> Tuple[int, JsonDict]: + with Measure(self.clock, "repl_send_events_parse"): + payload = parse_json_object_from_request(request) + events_and_context = [] + events = payload["events"] + + for event_payload in events: + event_dict = event_payload["event"] + room_ver = KNOWN_ROOM_VERSIONS[event_payload["room_version"]] + internal_metadata = event_payload["internal_metadata"] + rejected_reason = event_payload["rejected_reason"] + + event = make_event_from_dict( + event_dict, room_ver, internal_metadata, rejected_reason + ) + event.internal_metadata.outlier = event_payload["outlier"] + + requester = Requester.deserialize( + self.store, event_payload["requester"] + ) + context = EventContext.deserialize( + self._storage_controllers, event_payload["context"] + ) + + ratelimit = event_payload["ratelimit"] + events_and_context.append((event, context)) + + extra_users = [ + UserID.from_string(u) for u in event_payload["extra_users"] + ] + + logger.info( + "Got batch of events to send, last ID of batch is: %s, sending into room: %s", + event.event_id, + event.room_id, + ) + + last_event = ( + await self.event_creation_handler.persist_and_notify_client_events( + requester, events_and_context, ratelimit, extra_users + ) + ) + + return ( + 200, + { + "stream_id": last_event.internal_metadata.stream_ordering, + "event_id": last_event.event_id, + }, + ) + + +def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: + ReplicationSendEventsRestServlet(hs).register(http_server) diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py index 986b50ce0c..99384837d0 100644 --- a/tests/handlers/test_message.py +++ b/tests/handlers/test_message.py @@ -105,7 +105,10 @@ class EventCreationTestCase(unittest.HomeserverTestCase): event1, context = self._create_duplicate_event(txn_id) ret_event1 = self.get_success( - self.handler.handle_new_client_event(self.requester, event1, context) + self.handler.handle_new_client_event( + self.requester, + events_and_context=[(event1, context)], + ) ) stream_id1 = ret_event1.internal_metadata.stream_ordering @@ -118,7 +121,10 @@ class EventCreationTestCase(unittest.HomeserverTestCase): self.assertNotEqual(event1.event_id, event2.event_id) ret_event2 = self.get_success( - self.handler.handle_new_client_event(self.requester, event2, context) + self.handler.handle_new_client_event( + self.requester, + events_and_context=[(event2, context)], + ) ) stream_id2 = ret_event2.internal_metadata.stream_ordering diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py index 86b3d51975..765df75d91 100644 --- a/tests/handlers/test_register.py +++ b/tests/handlers/test_register.py @@ -497,7 +497,9 @@ class RegistrationTestCase(unittest.HomeserverTestCase): ) ) self.get_success( - event_creation_handler.handle_new_client_event(requester, event, context) + event_creation_handler.handle_new_client_event( + requester, events_and_context=[(event, context)] + ) ) # Register a second user, which won't be be in the room (or even have an invite) diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index a0ce077a99..de9f4af2de 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -531,7 +531,9 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase): ) ) self.get_success( - event_handler.handle_new_client_event(self.requester, event, context) + event_handler.handle_new_client_event( + self.requester, events_and_context=[(event, context)] + ) ) state1 = set(self.get_success(context.get_current_state_ids()).values()) @@ -549,7 +551,9 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase): ) ) self.get_success( - event_handler.handle_new_client_event(self.requester, event, context) + event_handler.handle_new_client_event( + self.requester, events_and_context=[(event, context)] + ) ) state2 = set(self.get_success(context.get_current_state_ids()).values()) diff --git a/tests/unittest.py b/tests/unittest.py index 00cb023198..5116be338e 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -734,7 +734,9 @@ class HomeserverTestCase(TestCase): event.internal_metadata.soft_failed = True self.get_success( - event_creator.handle_new_client_event(requester, event, context) + event_creator.handle_new_client_event( + requester, events_and_context=[(event, context)] + ) ) return event.event_id -- cgit 1.5.1 From be76cd8200b18f3c68b895f85ac7ef5b0ddc2466 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 29 Sep 2022 14:23:24 +0100 Subject: Allow admins to require a manual approval process before new accounts can be used (using MSC3866) (#13556) --- changelog.d/13556.feature | 1 + synapse/_scripts/synapse_port_db.py | 2 +- synapse/api/constants.py | 11 ++ synapse/api/errors.py | 16 ++ synapse/config/experimental.py | 19 +++ synapse/handlers/admin.py | 5 + synapse/handlers/auth.py | 11 ++ synapse/handlers/register.py | 8 + synapse/replication/http/register.py | 5 + synapse/rest/admin/users.py | 43 ++++- synapse/rest/client/login.py | 37 +++- synapse/rest/client/register.py | 22 ++- synapse/storage/databases/main/__init__.py | 9 +- synapse/storage/databases/main/registration.py | 150 +++++++++++++++-- .../main/delta/73/03users_approved_column.sql | 20 +++ tests/rest/admin/test_user.py | 186 ++++++++++++++++++++- tests/rest/client/test_auth.py | 33 +++- tests/rest/client/test_login.py | 41 +++++ tests/rest/client/test_register.py | 32 +++- tests/rest/client/utils.py | 12 +- tests/storage/test_registration.py | 102 ++++++++++- 21 files changed, 731 insertions(+), 34 deletions(-) create mode 100644 changelog.d/13556.feature create mode 100644 synapse/storage/schema/main/delta/73/03users_approved_column.sql (limited to 'synapse/replication/http') diff --git a/changelog.d/13556.feature b/changelog.d/13556.feature new file mode 100644 index 0000000000..f9d63db6c0 --- /dev/null +++ b/changelog.d/13556.feature @@ -0,0 +1 @@ +Allow server admins to require a manual approval process before new accounts can be used (using [MSC3866](https://github.com/matrix-org/matrix-spec-proposals/pull/3866)). diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 450ba462ba..5fa599e70e 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -107,7 +107,7 @@ BOOLEAN_COLUMNS = { "redactions": ["have_censored"], "room_stats_state": ["is_federatable"], "local_media_repository": ["safe_from_quarantine"], - "users": ["shadow_banned"], + "users": ["shadow_banned", "approved"], "e2e_fallback_keys_json": ["used"], "access_tokens": ["used"], "device_lists_changes_in_room": ["converted_to_destinations"], diff --git a/synapse/api/constants.py b/synapse/api/constants.py index c178ddf070..c031903b1a 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -269,3 +269,14 @@ class PublicRoomsFilterFields: GENERIC_SEARCH_TERM: Final = "generic_search_term" ROOM_TYPES: Final = "room_types" + + +class ApprovalNoticeMedium: + """Identifier for the medium this server will use to serve notice of approval for a + specific user's registration. + + As defined in https://github.com/matrix-org/matrix-spec-proposals/blob/babolivier/m_not_approved/proposals/3866-user-not-approved-error.md + """ + + NONE = "org.matrix.msc3866.none" + EMAIL = "org.matrix.msc3866.email" diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 1c6b53aa24..c606207569 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -106,6 +106,8 @@ class Codes(str, Enum): # Part of MSC3895. UNABLE_DUE_TO_PARTIAL_STATE = "ORG.MATRIX.MSC3895_UNABLE_DUE_TO_PARTIAL_STATE" + USER_AWAITING_APPROVAL = "ORG.MATRIX.MSC3866_USER_AWAITING_APPROVAL" + class CodeMessageException(RuntimeError): """An exception with integer code and message string attributes. @@ -566,6 +568,20 @@ class UnredactedContentDeletedError(SynapseError): return cs_error(self.msg, self.errcode, **extra) +class NotApprovedError(SynapseError): + def __init__( + self, + msg: str, + approval_notice_medium: str, + ): + super().__init__( + code=403, + msg=msg, + errcode=Codes.USER_AWAITING_APPROVAL, + additional_fields={"approval_notice_medium": approval_notice_medium}, + ) + + def cs_error(msg: str, code: str = Codes.UNKNOWN, **kwargs: Any) -> "JsonDict": """Utility method for constructing an error response for client-server interactions. diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 933779c23a..31834fb27d 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -14,10 +14,25 @@ from typing import Any +import attr + from synapse.config._base import Config from synapse.types import JsonDict +@attr.s(auto_attribs=True, frozen=True, slots=True) +class MSC3866Config: + """Configuration for MSC3866 (mandating approval for new users)""" + + # Whether the base support for the approval process is enabled. This includes the + # ability for administrators to check and update the approval of users, even if no + # approval is currently required. + enabled: bool = False + # Whether to require that new users are approved by an admin before their account + # can be used. Note that this setting is ignored if 'enabled' is false. + require_approval_for_new_accounts: bool = False + + class ExperimentalConfig(Config): """Config section for enabling experimental features""" @@ -97,6 +112,10 @@ class ExperimentalConfig(Config): # MSC3852: Expose last seen user agent field on /_matrix/client/v3/devices. self.msc3852_enabled: bool = experimental.get("msc3852_enabled", False) + # MSC3866: M_USER_AWAITING_APPROVAL error code + raw_msc3866_config = experimental.get("msc3866", {}) + self.msc3866 = MSC3866Config(**raw_msc3866_config) + # MSC3881: Remotely toggle push notifications for another client self.msc3881_enabled: bool = experimental.get("msc3881_enabled", False) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index cf9f19608a..f2989cc4a2 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -32,6 +32,7 @@ class AdminHandler: self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self._state_storage_controller = self._storage_controllers.state + self._msc3866_enabled = hs.config.experimental.msc3866.enabled async def get_whois(self, user: UserID) -> JsonDict: connections = [] @@ -75,6 +76,10 @@ class AdminHandler: "is_guest", } + if self._msc3866_enabled: + # Only include the approved flag if support for MSC3866 is enabled. + user_info_to_return.add("approved") + # Restrict returned keys to a known set. user_info_dict = { key: value diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index eacd631ee0..f5f0e0e7a7 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1009,6 +1009,17 @@ class AuthHandler: return res[0] return None + async def is_user_approved(self, user_id: str) -> bool: + """Checks if a user is approved and therefore can be allowed to log in. + + Args: + user_id: the user to check the approval status of. + + Returns: + A boolean that is True if the user is approved, False otherwise. + """ + return await self.store.is_user_approved(user_id) + async def _find_user_id_and_pwd_hash( self, user_id: str ) -> Optional[Tuple[str, str]]: diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index cfcadb34db..ca1c7a1866 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -220,6 +220,7 @@ class RegistrationHandler: by_admin: bool = False, user_agent_ips: Optional[List[Tuple[str, str]]] = None, auth_provider_id: Optional[str] = None, + approved: bool = False, ) -> str: """Registers a new client on the server. @@ -246,6 +247,8 @@ class RegistrationHandler: user_agent_ips: Tuples of user-agents and IP addresses used during the registration process. auth_provider_id: The SSO IdP the user used, if any. + approved: True if the new user should be considered already + approved by an administrator. Returns: The registered user_id. Raises: @@ -307,6 +310,7 @@ class RegistrationHandler: user_type=user_type, address=address, shadow_banned=shadow_banned, + approved=approved, ) profile = await self.store.get_profileinfo(localpart) @@ -695,6 +699,7 @@ class RegistrationHandler: user_type: Optional[str] = None, address: Optional[str] = None, shadow_banned: bool = False, + approved: bool = False, ) -> None: """Register user in the datastore. @@ -713,6 +718,7 @@ class RegistrationHandler: api.constants.UserTypes, or None for a normal user. address: the IP address used to perform the registration. shadow_banned: Whether to shadow-ban the user + approved: Whether to mark the user as approved by an administrator """ if self.hs.config.worker.worker_app: await self._register_client( @@ -726,6 +732,7 @@ class RegistrationHandler: user_type=user_type, address=address, shadow_banned=shadow_banned, + approved=approved, ) else: await self.store.register_user( @@ -738,6 +745,7 @@ class RegistrationHandler: admin=admin, user_type=user_type, shadow_banned=shadow_banned, + approved=approved, ) # Only call the account validity module(s) on the main process, to avoid diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py index 6c8f8388fd..61abb529c8 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py @@ -51,6 +51,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint): user_type: Optional[str], address: Optional[str], shadow_banned: bool, + approved: bool, ) -> JsonDict: """ Args: @@ -68,6 +69,8 @@ class ReplicationRegisterServlet(ReplicationEndpoint): or None for a normal user. address: the IP address used to perform the regitration. shadow_banned: Whether to shadow-ban the user + approved: Whether the user should be considered already approved by an + administrator. """ return { "password_hash": password_hash, @@ -79,6 +82,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint): "user_type": user_type, "address": address, "shadow_banned": shadow_banned, + "approved": approved, } async def _handle_request( # type: ignore[override] @@ -99,6 +103,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint): user_type=content["user_type"], address=content["address"], shadow_banned=content["shadow_banned"], + approved=content["approved"], ) return 200, {} diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 1274773d7e..15ac2059aa 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -69,6 +69,7 @@ class UsersRestServletV2(RestServlet): self.store = hs.get_datastores().main self.auth = hs.get_auth() self.admin_handler = hs.get_admin_handler() + self._msc3866_enabled = hs.config.experimental.msc3866.enabled async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) @@ -95,6 +96,13 @@ class UsersRestServletV2(RestServlet): guests = parse_boolean(request, "guests", default=True) deactivated = parse_boolean(request, "deactivated", default=False) + # If support for MSC3866 is not enabled, apply no filtering based on the + # `approved` column. + if self._msc3866_enabled: + approved = parse_boolean(request, "approved", default=True) + else: + approved = True + order_by = parse_string( request, "order_by", @@ -115,8 +123,22 @@ class UsersRestServletV2(RestServlet): direction = parse_string(request, "dir", default="f", allowed_values=("f", "b")) users, total = await self.store.get_users_paginate( - start, limit, user_id, name, guests, deactivated, order_by, direction + start, + limit, + user_id, + name, + guests, + deactivated, + order_by, + direction, + approved, ) + + # If support for MSC3866 is not enabled, don't show the approval flag. + if not self._msc3866_enabled: + for user in users: + del user["approved"] + ret = {"users": users, "total": total} if (start + limit) < total: ret["next_token"] = str(start + len(users)) @@ -163,6 +185,7 @@ class UserRestServletV2(RestServlet): self.deactivate_account_handler = hs.get_deactivate_account_handler() self.registration_handler = hs.get_registration_handler() self.pusher_pool = hs.get_pusherpool() + self._msc3866_enabled = hs.config.experimental.msc3866.enabled async def on_GET( self, request: SynapseRequest, user_id: str @@ -239,6 +262,15 @@ class UserRestServletV2(RestServlet): HTTPStatus.BAD_REQUEST, "'deactivated' parameter is not of type boolean" ) + approved: Optional[bool] = None + if "approved" in body and self._msc3866_enabled: + approved = body["approved"] + if not isinstance(approved, bool): + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "'approved' parameter is not of type boolean", + ) + # convert List[Dict[str, str]] into List[Tuple[str, str]] if external_ids is not None: new_external_ids = [ @@ -343,6 +375,9 @@ class UserRestServletV2(RestServlet): if "user_type" in body: await self.store.set_user_type(target_user, user_type) + if approved is not None: + await self.store.update_user_approval_status(target_user, approved) + user = await self.admin_handler.get_user(target_user) assert user is not None @@ -355,6 +390,10 @@ class UserRestServletV2(RestServlet): if password is not None: password_hash = await self.auth_handler.hash(password) + new_user_approved = True + if self._msc3866_enabled and approved is not None: + new_user_approved = approved + user_id = await self.registration_handler.register_user( localpart=target_user.localpart, password_hash=password_hash, @@ -362,6 +401,7 @@ class UserRestServletV2(RestServlet): default_display_name=displayname, user_type=user_type, by_admin=True, + approved=new_user_approved, ) if threepids is not None: @@ -550,6 +590,7 @@ class UserRegisterServlet(RestServlet): user_type=user_type, default_display_name=displayname, by_admin=True, + approved=True, ) result = await register._create_registration_details(user_id, body) diff --git a/synapse/rest/client/login.py b/synapse/rest/client/login.py index 0437c87d8d..f554586ac3 100644 --- a/synapse/rest/client/login.py +++ b/synapse/rest/client/login.py @@ -28,7 +28,14 @@ from typing import ( from typing_extensions import TypedDict -from synapse.api.errors import Codes, InvalidClientTokenError, LoginError, SynapseError +from synapse.api.constants import ApprovalNoticeMedium +from synapse.api.errors import ( + Codes, + InvalidClientTokenError, + LoginError, + NotApprovedError, + SynapseError, +) from synapse.api.ratelimiting import Ratelimiter from synapse.api.urls import CLIENT_API_PREFIX from synapse.appservice import ApplicationService @@ -55,11 +62,11 @@ logger = logging.getLogger(__name__) class LoginResponse(TypedDict, total=False): user_id: str - access_token: str + access_token: Optional[str] home_server: str expires_in_ms: Optional[int] refresh_token: Optional[str] - device_id: str + device_id: Optional[str] well_known: Optional[Dict[str, Any]] @@ -92,6 +99,12 @@ class LoginRestServlet(RestServlet): hs.config.registration.refreshable_access_token_lifetime is not None ) + # Whether we need to check if the user has been approved or not. + self._require_approval = ( + hs.config.experimental.msc3866.enabled + and hs.config.experimental.msc3866.require_approval_for_new_accounts + ) + self.auth = hs.get_auth() self.clock = hs.get_clock() @@ -220,6 +233,14 @@ class LoginRestServlet(RestServlet): except KeyError: raise SynapseError(400, "Missing JSON keys.") + if self._require_approval: + approved = await self.auth_handler.is_user_approved(result["user_id"]) + if not approved: + raise NotApprovedError( + msg="This account is pending approval by a server administrator.", + approval_notice_medium=ApprovalNoticeMedium.NONE, + ) + well_known_data = self._well_known_builder.get_well_known() if well_known_data: result["well_known"] = well_known_data @@ -356,6 +377,16 @@ class LoginRestServlet(RestServlet): errcode=Codes.INVALID_PARAM, ) + if self._require_approval: + approved = await self.auth_handler.is_user_approved(user_id) + if not approved: + # If the user isn't approved (and needs to be) we won't allow them to + # actually log in, so we don't want to create a device/access token. + return LoginResponse( + user_id=user_id, + home_server=self.hs.hostname, + ) + initial_display_name = login_submission.get("initial_device_display_name") ( device_id, diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py index 20bab20c8f..de810ae3ec 100644 --- a/synapse/rest/client/register.py +++ b/synapse/rest/client/register.py @@ -21,10 +21,15 @@ from twisted.web.server import Request import synapse import synapse.api.auth import synapse.types -from synapse.api.constants import APP_SERVICE_REGISTRATION_TYPE, LoginType +from synapse.api.constants import ( + APP_SERVICE_REGISTRATION_TYPE, + ApprovalNoticeMedium, + LoginType, +) from synapse.api.errors import ( Codes, InteractiveAuthIncompleteError, + NotApprovedError, SynapseError, ThreepidValidationError, UnrecognizedRequestError, @@ -414,6 +419,11 @@ class RegisterRestServlet(RestServlet): hs.config.registration.inhibit_user_in_use_error ) + self._require_approval = ( + hs.config.experimental.msc3866.enabled + and hs.config.experimental.msc3866.require_approval_for_new_accounts + ) + self._registration_flows = _calculate_registration_flows( hs.config, self.auth_handler ) @@ -734,6 +744,12 @@ class RegisterRestServlet(RestServlet): access_token=return_dict.get("access_token"), ) + if self._require_approval: + raise NotApprovedError( + msg="This account needs to be approved by an administrator before it can be used.", + approval_notice_medium=ApprovalNoticeMedium.NONE, + ) + return 200, return_dict async def _do_appservice_registration( @@ -778,7 +794,9 @@ class RegisterRestServlet(RestServlet): "user_id": user_id, "home_server": self.hs.hostname, } - if not params.get("inhibit_login", False): + # We don't want to log the user in if we're going to deny them access because + # they need to be approved first. + if not params.get("inhibit_login", False) and not self._require_approval: device_id = params.get("device_id") initial_display_name = params.get("initial_device_display_name") ( diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 0843f10340..a62b4abd4e 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -203,6 +203,7 @@ class DataStore( deactivated: bool = False, order_by: str = UserSortOrder.USER_ID.value, direction: str = "f", + approved: bool = True, ) -> Tuple[List[JsonDict], int]: """Function to retrieve a paginated list of users from users list. This will return a json list of users and the @@ -217,6 +218,7 @@ class DataStore( deactivated: whether to include deactivated users order_by: the sort order of the returned list direction: sort ascending or descending + approved: whether to include approved users Returns: A tuple of a list of mappings from user to information and a count of total users. """ @@ -249,6 +251,11 @@ class DataStore( if not deactivated: filters.append("deactivated = 0") + if not approved: + # We ignore NULL values for the approved flag because these should only + # be already existing users that we consider as already approved. + filters.append("approved IS FALSE") + where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else "" sql_base = f""" @@ -262,7 +269,7 @@ class DataStore( sql = f""" SELECT name, user_type, is_guest, admin, deactivated, shadow_banned, - displayname, avatar_url, creation_ts * 1000 as creation_ts + displayname, avatar_url, creation_ts * 1000 as creation_ts, approved {sql_base} ORDER BY {order_by_column} {order}, u.name ASC LIMIT ? OFFSET ? diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index ac821878b0..2996d6bb4d 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -166,27 +166,49 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): @cached() async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]: """Deprecated: use get_userinfo_by_id instead""" - return await self.db_pool.simple_select_one( - table="users", - keyvalues={"name": user_id}, - retcols=[ - "name", - "password_hash", - "is_guest", - "admin", - "consent_version", - "consent_ts", - "consent_server_notice_sent", - "appservice_id", - "creation_ts", - "user_type", - "deactivated", - "shadow_banned", - ], - allow_none=True, + + def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]: + # We could technically use simple_select_one here, but it would not perform + # the COALESCEs (unless hacked into the column names), which could yield + # confusing results. + txn.execute( + """ + SELECT + name, password_hash, is_guest, admin, consent_version, consent_ts, + consent_server_notice_sent, appservice_id, creation_ts, user_type, + deactivated, COALESCE(shadow_banned, FALSE) AS shadow_banned, + COALESCE(approved, TRUE) AS approved + FROM users + WHERE name = ? + """, + (user_id,), + ) + + rows = self.db_pool.cursor_to_dict(txn) + + if len(rows) == 0: + return None + + return rows[0] + + row = await self.db_pool.runInteraction( desc="get_user_by_id", + func=get_user_by_id_txn, ) + if row is not None: + # If we're using SQLite our boolean values will be integers. Because we + # present some of this data as is to e.g. server admins via REST APIs, we + # want to make sure we're returning the right type of data. + # Note: when adding a column name to this list, be wary of NULLable columns, + # since NULL values will be turned into False. + boolean_columns = ["admin", "deactivated", "shadow_banned", "approved"] + for column in boolean_columns: + if not isinstance(row[column], bool): + row[column] = bool(row[column]) + + return row + async def get_userinfo_by_id(self, user_id: str) -> Optional[UserInfo]: """Get a UserInfo object for a user by user ID. @@ -1779,6 +1801,40 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): return res if res else False + @cached() + async def is_user_approved(self, user_id: str) -> bool: + """Checks if a user is approved and therefore can be allowed to log in. + + If the user's 'approved' column is NULL, we consider it as true given it means + the user was registered when support for an approval flow was either disabled + or nonexistent. + + Args: + user_id: the user to check the approval status of. + + Returns: + A boolean that is True if the user is approved, False otherwise. + """ + + def is_user_approved_txn(txn: LoggingTransaction) -> bool: + txn.execute( + """ + SELECT COALESCE(approved, TRUE) AS approved FROM users WHERE name = ? + """, + (user_id,), + ) + + rows = self.db_pool.cursor_to_dict(txn) + + # We cast to bool because the value returned by the database engine might + # be an integer if we're using SQLite. + return bool(rows[0]["approved"]) + + return await self.db_pool.runInteraction( + desc="is_user_pending_approval", + func=is_user_approved_txn, + ) + class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): def __init__( @@ -1916,6 +1972,29 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) txn.call_after(self.is_guest.invalidate, (user_id,)) + def update_user_approval_status_txn( + self, txn: LoggingTransaction, user_id: str, approved: bool + ) -> None: + """Set the user's 'approved' flag to the given value. + + The boolean is turned into an int because the column is a smallint. + + Args: + txn: the current database transaction. + user_id: the user to update the flag for. + approved: the value to set the flag to. + """ + self.db_pool.simple_update_one_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + updatevalues={"approved": approved}, + ) + + # Invalidate the caches of methods that read the value of the 'approved' flag. + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) + self._invalidate_cache_and_stream(txn, self.is_user_approved, (user_id,)) + class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): def __init__( @@ -1933,6 +2012,13 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") + # If support for MSC3866 is enabled and configured to require approval for new + # account, we will create new users with an 'approved' flag set to false. + self._require_approval = ( + hs.config.experimental.msc3866.enabled + and hs.config.experimental.msc3866.require_approval_for_new_accounts + ) + async def add_access_token_to_user( self, user_id: str, @@ -2065,6 +2151,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): admin: bool = False, user_type: Optional[str] = None, shadow_banned: bool = False, + approved: bool = False, ) -> None: """Attempts to register an account. @@ -2083,6 +2170,8 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): or None for a normal user. shadow_banned: Whether the user is shadow-banned, i.e. they may be told their requests succeeded but we ignore them. + approved: Whether to consider the user has already been approved by an + administrator. Raises: StoreError if the user_id could not be registered. @@ -2099,6 +2188,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): admin, user_type, shadow_banned, + approved, ) def _register_user( @@ -2113,11 +2203,14 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): admin: bool, user_type: Optional[str], shadow_banned: bool, + approved: bool, ) -> None: user_id_obj = UserID.from_string(user_id) now = int(self._clock.time()) + user_approved = approved or not self._require_approval + try: if was_guest: # Ensure that the guest user actually exists @@ -2143,6 +2236,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): "admin": 1 if admin else 0, "user_type": user_type, "shadow_banned": shadow_banned, + "approved": user_approved, }, ) else: @@ -2158,6 +2252,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): "admin": 1 if admin else 0, "user_type": user_type, "shadow_banned": shadow_banned, + "approved": user_approved, }, ) @@ -2503,6 +2598,25 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): start_or_continue_validation_session_txn, ) + async def update_user_approval_status( + self, user_id: UserID, approved: bool + ) -> None: + """Set the user's 'approved' flag to the given value. + + The boolean will be turned into an int (in update_user_approval_status_txn) + because the column is a smallint. + + Args: + user_id: the user to update the flag for. + approved: the value to set the flag to. + """ + await self.db_pool.runInteraction( + "update_user_approval_status", + self.update_user_approval_status_txn, + user_id.to_string(), + approved, + ) + def find_max_generated_user_id_localpart(cur: Cursor) -> int: """ diff --git a/synapse/storage/schema/main/delta/73/03users_approved_column.sql b/synapse/storage/schema/main/delta/73/03users_approved_column.sql new file mode 100644 index 0000000000..5328d592ea --- /dev/null +++ b/synapse/storage/schema/main/delta/73/03users_approved_column.sql @@ -0,0 +1,20 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a column to the users table to track whether the user needs to be approved by an +-- administrator. +-- A NULL column means the user was created before this feature was supported by Synapse, +-- and should be considered as TRUE. +ALTER TABLE users ADD COLUMN approved BOOLEAN; diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 1847e6ad6b..4c1ce33463 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -25,10 +25,10 @@ from parameterized import parameterized, parameterized_class from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin -from synapse.api.constants import UserTypes +from synapse.api.constants import ApprovalNoticeMedium, LoginType, UserTypes from synapse.api.errors import Codes, HttpResponseException, ResourceLimitError from synapse.api.room_versions import RoomVersions -from synapse.rest.client import devices, login, logout, profile, room, sync +from synapse.rest.client import devices, login, logout, profile, register, room, sync from synapse.rest.media.v1.filepath import MediaFilePaths from synapse.server import HomeServer from synapse.types import JsonDict, UserID @@ -578,6 +578,16 @@ class UsersListTestCase(unittest.HomeserverTestCase): _search_test(None, "foo", "user_id") _search_test(None, "bar", "user_id") + @override_config( + { + "experimental_features": { + "msc3866": { + "enabled": True, + "require_approval_for_new_accounts": True, + } + } + } + ) def test_invalid_parameter(self) -> None: """ If parameters are invalid, an error is returned. @@ -623,6 +633,16 @@ class UsersListTestCase(unittest.HomeserverTestCase): self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"]) + # invalid approved + channel = self.make_request( + "GET", + self.url + "?approved=not_bool", + access_token=self.admin_user_tok, + ) + + self.assertEqual(400, channel.code, msg=channel.json_body) + self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"]) + # unkown order_by channel = self.make_request( "GET", @@ -841,6 +861,69 @@ class UsersListTestCase(unittest.HomeserverTestCase): self._order_test([self.admin_user, user1, user2], "creation_ts", "f") self._order_test([user2, user1, self.admin_user], "creation_ts", "b") + @override_config( + { + "experimental_features": { + "msc3866": { + "enabled": True, + "require_approval_for_new_accounts": True, + } + } + } + ) + def test_filter_out_approved(self) -> None: + """Tests that the endpoint can filter out approved users.""" + # Create our users. + self._create_users(2) + + # Get the list of users. + channel = self.make_request( + "GET", + self.url, + access_token=self.admin_user_tok, + ) + self.assertEqual(200, channel.code, channel.result) + + # Exclude the admin, because we don't want to accidentally un-approve the admin. + non_admin_user_ids = [ + user["name"] + for user in channel.json_body["users"] + if user["name"] != self.admin_user + ] + + self.assertEqual(2, len(non_admin_user_ids), non_admin_user_ids) + + # Select a user and un-approve them. We do this rather than the other way around + # because, since these users are created by an admin, we consider them already + # approved. + not_approved_user = non_admin_user_ids[0] + + channel = self.make_request( + "PUT", + f"/_synapse/admin/v2/users/{not_approved_user}", + {"approved": False}, + access_token=self.admin_user_tok, + ) + self.assertEqual(200, channel.code, channel.result) + + # Now get the list of users again, this time filtering out approved users. + channel = self.make_request( + "GET", + self.url + "?approved=false", + access_token=self.admin_user_tok, + ) + self.assertEqual(200, channel.code, channel.result) + + non_admin_user_ids = [ + user["name"] + for user in channel.json_body["users"] + if user["name"] != self.admin_user + ] + + # We should only have our unapproved user now. + self.assertEqual(1, len(non_admin_user_ids), non_admin_user_ids) + self.assertEqual(not_approved_user, non_admin_user_ids[0]) + def _order_test( self, expected_user_list: List[str], @@ -1272,6 +1355,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): synapse.rest.admin.register_servlets, login.register_servlets, sync.register_servlets, + register.register_servlets, ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: @@ -2536,6 +2620,104 @@ class UserRestTestCase(unittest.HomeserverTestCase): # Ensure they're still alive self.assertEqual(0, channel.json_body["deactivated"]) + @override_config( + { + "experimental_features": { + "msc3866": { + "enabled": True, + "require_approval_for_new_accounts": True, + } + } + } + ) + def test_approve_account(self) -> None: + """Tests that approving an account correctly sets the approved flag for the user.""" + url = self.url_prefix % "@bob:test" + + # Create the user using the client-server API since otherwise the user will be + # marked as approved automatically. + channel = self.make_request( + "POST", + "register", + { + "username": "bob", + "password": "test", + "auth": {"type": LoginType.DUMMY}, + }, + ) + self.assertEqual(403, channel.code, channel.result) + self.assertEqual(Codes.USER_AWAITING_APPROVAL, channel.json_body["errcode"]) + self.assertEqual( + ApprovalNoticeMedium.NONE, channel.json_body["approval_notice_medium"] + ) + + # Get user + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertIs(False, channel.json_body["approved"]) + + # Approve user + channel = self.make_request( + "PUT", + url, + access_token=self.admin_user_tok, + content={"approved": True}, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertIs(True, channel.json_body["approved"]) + + # Check that the user is now approved + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertIs(True, channel.json_body["approved"]) + + @override_config( + { + "experimental_features": { + "msc3866": { + "enabled": True, + "require_approval_for_new_accounts": True, + } + } + } + ) + def test_register_approved(self) -> None: + url = self.url_prefix % "@bob:test" + + # Create user + channel = self.make_request( + "PUT", + url, + access_token=self.admin_user_tok, + content={"password": "abc123", "approved": True}, + ) + + self.assertEqual(201, channel.code, msg=channel.json_body) + self.assertEqual("@bob:test", channel.json_body["name"]) + self.assertEqual(1, channel.json_body["approved"]) + + # Get user + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertEqual("@bob:test", channel.json_body["name"]) + self.assertEqual(1, channel.json_body["approved"]) + def _is_erased(self, user_id: str, expect: bool) -> None: """Assert that the user is erased or not""" d = self.store.is_user_erased(user_id) diff --git a/tests/rest/client/test_auth.py b/tests/rest/client/test_auth.py index 05355c7fb6..090cef5216 100644 --- a/tests/rest/client/test_auth.py +++ b/tests/rest/client/test_auth.py @@ -20,7 +20,8 @@ from twisted.test.proto_helpers import MemoryReactor from twisted.web.resource import Resource import synapse.rest.admin -from synapse.api.constants import LoginType +from synapse.api.constants import ApprovalNoticeMedium, LoginType +from synapse.api.errors import Codes from synapse.handlers.ui_auth.checkers import UserInteractiveAuthChecker from synapse.rest.client import account, auth, devices, login, logout, register from synapse.rest.synapse.client import build_synapse_client_resource_tree @@ -567,6 +568,36 @@ class UIAuthTests(unittest.HomeserverTestCase): body={"auth": {"session": session_id}}, ) + @skip_unless(HAS_OIDC, "requires OIDC") + @override_config( + { + "oidc_config": TEST_OIDC_CONFIG, + "experimental_features": { + "msc3866": { + "enabled": True, + "require_approval_for_new_accounts": True, + } + }, + } + ) + def test_sso_not_approved(self) -> None: + """Tests that if we register a user via SSO while requiring approval for new + accounts, we still raise the correct error before logging the user in. + """ + login_resp = self.helper.login_via_oidc("username", expected_status=403) + + self.assertEqual(login_resp["errcode"], Codes.USER_AWAITING_APPROVAL) + self.assertEqual( + ApprovalNoticeMedium.NONE, login_resp["approval_notice_medium"] + ) + + # Check that we didn't register a device for the user during the login attempt. + devices = self.get_success( + self.hs.get_datastores().main.get_devices_by_user("@username:test") + ) + + self.assertEqual(len(devices), 0) + class RefreshAuthTests(unittest.HomeserverTestCase): servlets = [ diff --git a/tests/rest/client/test_login.py b/tests/rest/client/test_login.py index e2a4d98275..e801ba8c8b 100644 --- a/tests/rest/client/test_login.py +++ b/tests/rest/client/test_login.py @@ -23,6 +23,8 @@ from twisted.test.proto_helpers import MemoryReactor from twisted.web.resource import Resource import synapse.rest.admin +from synapse.api.constants import ApprovalNoticeMedium, LoginType +from synapse.api.errors import Codes from synapse.appservice import ApplicationService from synapse.rest.client import devices, login, logout, register from synapse.rest.client.account import WhoamiRestServlet @@ -94,6 +96,7 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): logout.register_servlets, devices.register_servlets, lambda hs, http_server: WhoamiRestServlet(hs).register(http_server), + register.register_servlets, ] def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: @@ -406,6 +409,44 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 400) self.assertEqual(channel.json_body["errcode"], "M_INVALID_PARAM") + @override_config( + { + "experimental_features": { + "msc3866": { + "enabled": True, + "require_approval_for_new_accounts": True, + } + } + } + ) + def test_require_approval(self) -> None: + channel = self.make_request( + "POST", + "register", + { + "username": "kermit", + "password": "monkey", + "auth": {"type": LoginType.DUMMY}, + }, + ) + self.assertEqual(403, channel.code, channel.result) + self.assertEqual(Codes.USER_AWAITING_APPROVAL, channel.json_body["errcode"]) + self.assertEqual( + ApprovalNoticeMedium.NONE, channel.json_body["approval_notice_medium"] + ) + + params = { + "type": LoginType.PASSWORD, + "identifier": {"type": "m.id.user", "user": "kermit"}, + "password": "monkey", + } + channel = self.make_request("POST", LOGIN_URL, params) + self.assertEqual(403, channel.code, channel.result) + self.assertEqual(Codes.USER_AWAITING_APPROVAL, channel.json_body["errcode"]) + self.assertEqual( + ApprovalNoticeMedium.NONE, channel.json_body["approval_notice_medium"] + ) + @skip_unless(has_saml2 and HAS_OIDC, "Requires SAML2 and OIDC") class MultiSSOTestCase(unittest.HomeserverTestCase): diff --git a/tests/rest/client/test_register.py b/tests/rest/client/test_register.py index b781875d52..11cf3939d8 100644 --- a/tests/rest/client/test_register.py +++ b/tests/rest/client/test_register.py @@ -22,7 +22,11 @@ import pkg_resources from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin -from synapse.api.constants import APP_SERVICE_REGISTRATION_TYPE, LoginType +from synapse.api.constants import ( + APP_SERVICE_REGISTRATION_TYPE, + ApprovalNoticeMedium, + LoginType, +) from synapse.api.errors import Codes from synapse.appservice import ApplicationService from synapse.rest.client import account, account_validity, login, logout, register, sync @@ -765,6 +769,32 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 400, channel.json_body) self.assertEqual(channel.json_body["errcode"], Codes.USER_IN_USE) + @override_config( + { + "experimental_features": { + "msc3866": { + "enabled": True, + "require_approval_for_new_accounts": True, + } + } + } + ) + def test_require_approval(self) -> None: + channel = self.make_request( + "POST", + "register", + { + "username": "kermit", + "password": "monkey", + "auth": {"type": LoginType.DUMMY}, + }, + ) + self.assertEqual(403, channel.code, channel.result) + self.assertEqual(Codes.USER_AWAITING_APPROVAL, channel.json_body["errcode"]) + self.assertEqual( + ApprovalNoticeMedium.NONE, channel.json_body["approval_notice_medium"] + ) + class AccountValidityTestCase(unittest.HomeserverTestCase): diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index dd26145bf8..c249a42bb6 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -543,8 +543,12 @@ class RestHelper: return channel.json_body - def login_via_oidc(self, remote_user_id: str) -> JsonDict: - """Log in (as a new user) via OIDC + def login_via_oidc( + self, + remote_user_id: str, + expected_status: int = 200, + ) -> JsonDict: + """Log in via OIDC Returns the result of the final token login. @@ -578,7 +582,9 @@ class RestHelper: "/login", content={"type": "m.login.token", "token": login_token}, ) - assert channel.code == HTTPStatus.OK + assert ( + channel.code == expected_status + ), f"unexpected status in response: {channel.code}" return channel.json_body def auth_via_oidc( diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py index 853a93afab..05ea802008 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py @@ -16,9 +16,10 @@ from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import UserTypes from synapse.api.errors import ThreepidValidationError from synapse.server import HomeServer +from synapse.types import JsonDict, UserID from synapse.util import Clock -from tests.unittest import HomeserverTestCase +from tests.unittest import HomeserverTestCase, override_config class RegistrationStoreTestCase(HomeserverTestCase): @@ -48,6 +49,7 @@ class RegistrationStoreTestCase(HomeserverTestCase): "user_type": None, "deactivated": 0, "shadow_banned": 0, + "approved": 1, }, (self.get_success(self.store.get_user_by_id(self.user_id))), ) @@ -166,3 +168,101 @@ class RegistrationStoreTestCase(HomeserverTestCase): ThreepidValidationError, ) self.assertEqual(e.value.msg, "Validation token not found or has expired", e) + + +class ApprovalRequiredRegistrationTestCase(HomeserverTestCase): + def default_config(self) -> JsonDict: + config = super().default_config() + + # If there's already some config for this feature in the default config, it + # means we're overriding it with @override_config. In this case we don't want + # to do anything more with it. + msc3866_config = config.get("experimental_features", {}).get("msc3866") + if msc3866_config is not None: + return config + + # Require approval for all new accounts. + config["experimental_features"] = { + "msc3866": { + "enabled": True, + "require_approval_for_new_accounts": True, + } + } + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.user_id = "@my-user:test" + self.pwhash = "{xx1}123456789" + + @override_config( + { + "experimental_features": { + "msc3866": { + "enabled": True, + "require_approval_for_new_accounts": False, + } + } + } + ) + def test_approval_not_required(self) -> None: + """Tests that if we don't require approval for new accounts, newly created + accounts are automatically marked as approved. + """ + self.get_success(self.store.register_user(self.user_id, self.pwhash)) + + user = self.get_success(self.store.get_user_by_id(self.user_id)) + assert user is not None + self.assertTrue(user["approved"]) + + approved = self.get_success(self.store.is_user_approved(self.user_id)) + self.assertTrue(approved) + + def test_approval_required(self) -> None: + """Tests that if we require approval for new accounts, newly created accounts + are not automatically marked as approved. + """ + self.get_success(self.store.register_user(self.user_id, self.pwhash)) + + user = self.get_success(self.store.get_user_by_id(self.user_id)) + assert user is not None + self.assertFalse(user["approved"]) + + approved = self.get_success(self.store.is_user_approved(self.user_id)) + self.assertFalse(approved) + + def test_override(self) -> None: + """Tests that if we require approval for new accounts, but we explicitly say the + new user should be considered approved, they're marked as approved. + """ + self.get_success( + self.store.register_user( + self.user_id, + self.pwhash, + approved=True, + ) + ) + + user = self.get_success(self.store.get_user_by_id(self.user_id)) + self.assertIsNotNone(user) + assert user is not None + self.assertEqual(user["approved"], 1) + + approved = self.get_success(self.store.is_user_approved(self.user_id)) + self.assertTrue(approved) + + def test_approve_user(self) -> None: + """Tests that approving the user updates their approval status.""" + self.get_success(self.store.register_user(self.user_id, self.pwhash)) + + approved = self.get_success(self.store.is_user_approved(self.user_id)) + self.assertFalse(approved) + + self.get_success( + self.store.update_user_approval_status( + UserID.from_string(self.user_id), True + ) + ) + + approved = self.get_success(self.store.is_user_approved(self.user_id)) + self.assertTrue(approved) -- cgit 1.5.1 From 422cff7df6df3ac3691829fbce3fbd486f399869 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 11 Oct 2022 14:41:06 +0200 Subject: Fallback if 'approved' isn't included in a registration replication request (#14135) --- changelog.d/14135.bugfix | 1 + synapse/replication/http/register.py | 18 +++++++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 changelog.d/14135.bugfix (limited to 'synapse/replication/http') diff --git a/changelog.d/14135.bugfix b/changelog.d/14135.bugfix new file mode 100644 index 0000000000..6d1d7816e8 --- /dev/null +++ b/changelog.d/14135.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse 1.69.0rc1 which would cause registration replication requests to fail if the worker sending the request is not running Synapse 1.69. diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py index 61abb529c8..976c283360 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py @@ -39,6 +39,16 @@ class ReplicationRegisterServlet(ReplicationEndpoint): self.store = hs.get_datastores().main self.registration_handler = hs.get_registration_handler() + # Default value if the worker that sent the replication request did not include + # an 'approved' property. + if ( + hs.config.experimental.msc3866.enabled + and hs.config.experimental.msc3866.require_approval_for_new_accounts + ): + self._approval_default = False + else: + self._approval_default = True + @staticmethod async def _serialize_payload( # type: ignore[override] user_id: str, @@ -92,6 +102,12 @@ class ReplicationRegisterServlet(ReplicationEndpoint): await self.registration_handler.check_registration_ratelimit(content["address"]) + # Always default admin users to approved (since it means they were created by + # an admin). + approved_default = self._approval_default + if content["admin"]: + approved_default = True + await self.registration_handler.register_with_store( user_id=user_id, password_hash=content["password_hash"], @@ -103,7 +119,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint): user_type=content["user_type"], address=content["address"], shadow_banned=content["shadow_banned"], - approved=content["approved"], + approved=content.get("approved", approved_default), ) return 200, {} -- cgit 1.5.1 From b5ab2c428a1c5edd634ff084019811e5f6b963d8 Mon Sep 17 00:00:00 2001 From: Tuomas Ojamies Date: Tue, 15 Nov 2022 13:55:00 +0100 Subject: Support using SSL on worker endpoints. (#14128) * Fix missing SSL support in worker endpoints. * Add changelog * SSL for Replication endpoint * Remove unit test change * Refactor listener creation to reduce duplicated code * Fix the logger message * Update synapse/app/_base.py Co-authored-by: Patrick Cloke * Update synapse/app/_base.py Co-authored-by: Patrick Cloke * Update synapse/app/_base.py Co-authored-by: Patrick Cloke * Add config documentation for new TLS option Co-authored-by: Tuomas Ojamies Co-authored-by: Patrick Cloke Co-authored-by: Olivier Wilkinson (reivilibre) --- changelog.d/14128.misc | 1 + docs/usage/configuration/config_documentation.md | 20 +++++++++ synapse/app/_base.py | 53 +++++++++++++++++++++++- synapse/app/generic_worker.py | 28 ++++--------- synapse/app/homeserver.py | 34 ++------------- synapse/config/workers.py | 7 ++++ synapse/replication/http/_base.py | 10 ++++- 7 files changed, 100 insertions(+), 53 deletions(-) create mode 100644 changelog.d/14128.misc (limited to 'synapse/replication/http') diff --git a/changelog.d/14128.misc b/changelog.d/14128.misc new file mode 100644 index 0000000000..29168ef955 --- /dev/null +++ b/changelog.d/14128.misc @@ -0,0 +1 @@ +Add TLS support for generic worker endpoints. diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 9a6bd08d01..f5937dd902 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3893,6 +3893,26 @@ Example configuration: worker_replication_http_port: 9093 ``` --- +### `worker_replication_http_tls` + +Whether TLS should be used for talking to the HTTP replication port on the main +Synapse process. +The main Synapse process defines this with the `tls` option on its [listener](#listeners) that +has the `replication` resource enabled. + +**Please note:** by default, it is not safe to expose replication ports to the +public Internet, even with TLS enabled. +See [`worker_replication_secret`](#worker_replication_secret). + +Defaults to `false`. + +*Added in Synapse 1.72.0.* + +Example configuration: +```yaml +worker_replication_http_tls: true +``` +--- ### `worker_listeners` A worker can handle HTTP requests. To do so, a `worker_listeners` option diff --git a/synapse/app/_base.py b/synapse/app/_base.py index a683ebf4cb..8f5b1a20f5 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -47,6 +47,7 @@ from twisted.internet.tcp import Port from twisted.logger import LoggingFile, LogLevel from twisted.protocols.tls import TLSMemoryBIOFactory from twisted.python.threadpool import ThreadPool +from twisted.web.resource import Resource import synapse.util.caches from synapse.api.constants import MAX_PDU_SIZE @@ -55,12 +56,13 @@ from synapse.app.phone_stats_home import start_phone_stats_home from synapse.config import ConfigError from synapse.config._base import format_config_error from synapse.config.homeserver import HomeServerConfig -from synapse.config.server import ManholeConfig +from synapse.config.server import ListenerConfig, ManholeConfig from synapse.crypto import context_factory from synapse.events.presence_router import load_legacy_presence_router from synapse.events.spamcheck import load_legacy_spam_checkers from synapse.events.third_party_rules import load_legacy_third_party_event_rules from synapse.handlers.auth import load_legacy_password_auth_providers +from synapse.http.site import SynapseSite from synapse.logging.context import PreserveLoggingContext from synapse.logging.opentracing import init_tracer from synapse.metrics import install_gc_manager, register_threadpool @@ -357,6 +359,55 @@ def listen_tcp( return r # type: ignore[return-value] +def listen_http( + listener_config: ListenerConfig, + root_resource: Resource, + version_string: str, + max_request_body_size: int, + context_factory: IOpenSSLContextFactory, + reactor: IReactorSSL = reactor, +) -> List[Port]: + port = listener_config.port + bind_addresses = listener_config.bind_addresses + tls = listener_config.tls + + assert listener_config.http_options is not None + + site_tag = listener_config.http_options.tag + if site_tag is None: + site_tag = str(port) + + site = SynapseSite( + "synapse.access.%s.%s" % ("https" if tls else "http", site_tag), + site_tag, + listener_config, + root_resource, + version_string, + max_request_body_size=max_request_body_size, + reactor=reactor, + ) + if tls: + # refresh_certificate should have been called before this. + assert context_factory is not None + ports = listen_ssl( + bind_addresses, + port, + site, + context_factory, + reactor=reactor, + ) + logger.info("Synapse now listening on TCP port %d (TLS)", port) + else: + ports = listen_tcp( + bind_addresses, + port, + site, + reactor=reactor, + ) + logger.info("Synapse now listening on TCP port %d", port) + return ports + + def listen_ssl( bind_addresses: Collection[str], port: int, diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 51446b49cd..1d9aef45c2 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -44,7 +44,7 @@ from synapse.config.server import ListenerConfig from synapse.federation.transport.server import TransportLayerServer from synapse.http.server import JsonResource, OptionsResource from synapse.http.servlet import RestServlet, parse_json_object_from_request -from synapse.http.site import SynapseRequest, SynapseSite +from synapse.http.site import SynapseRequest from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource @@ -288,15 +288,9 @@ class GenericWorkerServer(HomeServer): DATASTORE_CLASS = GenericWorkerSlavedStore # type: ignore def _listen_http(self, listener_config: ListenerConfig) -> None: - port = listener_config.port - bind_addresses = listener_config.bind_addresses assert listener_config.http_options is not None - site_tag = listener_config.http_options.tag - if site_tag is None: - site_tag = str(port) - # We always include a health resource. resources: Dict[str, Resource] = {"/health": HealthResource()} @@ -395,23 +389,15 @@ class GenericWorkerServer(HomeServer): root_resource = create_resource_tree(resources, OptionsResource()) - _base.listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - max_request_body_size=max_request_body_size(self.config), - reactor=self.get_reactor(), - ), + _base.listen_http( + listener_config, + root_resource, + self.version_string, + max_request_body_size(self.config), + self.tls_server_context_factory, reactor=self.get_reactor(), ) - logger.info("Synapse worker now listening on port %d", port) - def start_listening(self) -> None: for listener in self.config.worker.worker_listeners: if listener.type == "http": diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index de3f08876f..4f4fee4782 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -37,8 +37,7 @@ from synapse.api.urls import ( from synapse.app import _base from synapse.app._base import ( handle_startup_exception, - listen_ssl, - listen_tcp, + listen_http, max_request_body_size, redirect_stdio_to_logs, register_start, @@ -53,7 +52,6 @@ from synapse.http.server import ( RootOptionsRedirectResource, StaticResource, ) -from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource @@ -83,8 +81,6 @@ class SynapseHomeServer(HomeServer): self, config: HomeServerConfig, listener_config: ListenerConfig ) -> Iterable[Port]: port = listener_config.port - bind_addresses = listener_config.bind_addresses - tls = listener_config.tls # Must exist since this is an HTTP listener. assert listener_config.http_options is not None site_tag = listener_config.http_options.tag @@ -140,37 +136,15 @@ class SynapseHomeServer(HomeServer): else: root_resource = OptionsResource() - site = SynapseSite( - "synapse.access.%s.%s" % ("https" if tls else "http", site_tag), - site_tag, + ports = listen_http( listener_config, create_resource_tree(resources, root_resource), self.version_string, - max_request_body_size=max_request_body_size(self.config), + max_request_body_size(self.config), + self.tls_server_context_factory, reactor=self.get_reactor(), ) - if tls: - # refresh_certificate should have been called before this. - assert self.tls_server_context_factory is not None - ports = listen_ssl( - bind_addresses, - port, - site, - self.tls_server_context_factory, - reactor=self.get_reactor(), - ) - logger.info("Synapse now listening on TCP port %d (TLS)", port) - - else: - ports = listen_tcp( - bind_addresses, - port, - site, - reactor=self.get_reactor(), - ) - logger.info("Synapse now listening on TCP port %d", port) - return ports def _configure_named_resource( diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 0fb725dd8f..88b3168cbc 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -67,6 +67,7 @@ class InstanceLocationConfig: host: str port: int + tls: bool = False @attr.s @@ -149,6 +150,12 @@ class WorkerConfig(Config): # The port on the main synapse for HTTP replication endpoint self.worker_replication_http_port = config.get("worker_replication_http_port") + # The tls mode on the main synapse for HTTP replication endpoint. + # For backward compatibility this defaults to False. + self.worker_replication_http_tls = config.get( + "worker_replication_http_tls", False + ) + # The shared secret used for authentication when connecting to the main synapse. self.worker_replication_secret = config.get("worker_replication_secret", None) diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index acb0bd18f7..5e661f8c73 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -184,8 +184,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): client = hs.get_simple_http_client() local_instance_name = hs.get_instance_name() + # The value of these option should match the replication listener settings master_host = hs.config.worker.worker_replication_host master_port = hs.config.worker.worker_replication_http_port + master_tls = hs.config.worker.worker_replication_http_tls instance_map = hs.config.worker.instance_map @@ -205,9 +207,11 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): if instance_name == "master": host = master_host port = master_port + tls = master_tls elif instance_name in instance_map: host = instance_map[instance_name].host port = instance_map[instance_name].port + tls = instance_map[instance_name].tls else: raise Exception( "Instance %r not in 'instance_map' config" % (instance_name,) @@ -238,7 +242,11 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): "Unknown METHOD on %s replication endpoint" % (cls.NAME,) ) - uri = "http://%s:%s/_synapse/replication/%s/%s" % ( + # Here the protocol is hard coded to be http by default or https in case the replication + # port is set to have tls true. + scheme = "https" if tls else "http" + uri = "%s://%s:%s/_synapse/replication/%s/%s" % ( + scheme, host, port, cls.NAME, -- cgit 1.5.1 From d8cc86eff484b6f570f55a5badb337080c6e4dcd Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 16 Nov 2022 10:25:24 -0500 Subject: Remove redundant types from comments. (#14412) Remove type hints from comments which have been added as Python type hints. This helps avoid drift between comments and reality, as well as removing redundant information. Also adds some missing type hints which were simple to fill in. --- changelog.d/14412.misc | 1 + synapse/api/errors.py | 2 +- synapse/config/logger.py | 5 ++- synapse/crypto/keyring.py | 9 +++-- synapse/events/__init__.py | 3 +- synapse/federation/transport/client.py | 11 +++--- synapse/federation/transport/server/_base.py | 4 +-- synapse/handlers/e2e_keys.py | 2 +- synapse/handlers/e2e_room_keys.py | 5 +-- synapse/handlers/federation.py | 4 +-- synapse/handlers/identity.py | 2 +- synapse/handlers/oidc.py | 2 +- synapse/handlers/presence.py | 2 +- synapse/handlers/saml.py | 4 +-- synapse/http/additional_resource.py | 3 +- synapse/http/federation/matrix_federation_agent.py | 9 +++-- synapse/http/matrixfederationclient.py | 3 +- synapse/http/proxyagent.py | 20 +++++------ synapse/http/server.py | 2 +- synapse/http/site.py | 2 +- synapse/logging/context.py | 39 +++++++++++----------- synapse/logging/opentracing.py | 4 +-- synapse/module_api/__init__.py | 7 ++-- synapse/replication/http/_base.py | 2 +- synapse/rest/admin/users.py | 5 +-- synapse/rest/client/login.py | 2 +- synapse/rest/media/v1/media_repository.py | 4 +-- synapse/rest/media/v1/thumbnailer.py | 4 +-- synapse/server_notices/consent_server_notices.py | 5 ++- .../resource_limits_server_notices.py | 12 ++++--- synapse/storage/controllers/persist_events.py | 5 ++- synapse/storage/databases/main/devices.py | 2 +- synapse/storage/databases/main/e2e_room_keys.py | 8 ++--- synapse/storage/databases/main/end_to_end_keys.py | 7 ++-- synapse/storage/databases/main/events.py | 22 ++++++------ synapse/storage/databases/main/events_worker.py | 2 +- .../storage/databases/main/monthly_active_users.py | 8 ++--- synapse/storage/databases/main/registration.py | 6 ++-- synapse/storage/databases/main/room.py | 8 +++-- synapse/storage/databases/main/user_directory.py | 9 +++-- synapse/types.py | 4 +-- synapse/util/async_helpers.py | 3 +- synapse/util/caches/__init__.py | 2 +- synapse/util/caches/deferred_cache.py | 2 +- synapse/util/caches/dictionary_cache.py | 9 ++--- synapse/util/caches/expiringcache.py | 2 +- synapse/util/caches/lrucache.py | 8 ++--- synapse/util/ratelimitutils.py | 2 +- synapse/util/threepids.py | 2 +- synapse/util/wheel_timer.py | 4 +-- tests/http/__init__.py | 7 ++-- tests/replication/slave/storage/test_events.py | 7 ++-- tests/replication/test_multi_media_repo.py | 14 ++++---- .../test_resource_limits_server_notices.py | 10 +++--- tests/unittest.py | 18 +++++----- 55 files changed, 174 insertions(+), 176 deletions(-) create mode 100644 changelog.d/14412.misc (limited to 'synapse/replication/http') diff --git a/changelog.d/14412.misc b/changelog.d/14412.misc new file mode 100644 index 0000000000..4da061d461 --- /dev/null +++ b/changelog.d/14412.misc @@ -0,0 +1 @@ +Remove duplicated type information from type hints. diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 400dd12aba..e2cfcea0f2 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -713,7 +713,7 @@ class HttpResponseException(CodeMessageException): set to the reason code from the HTTP response. Returns: - SynapseError: + The error converted to a SynapseError. """ # try to parse the body as json, to get better errcode/msg, but # default to M_UNKNOWN with the HTTP status as the error text diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 94d1150415..5468b963a2 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -317,10 +317,9 @@ def setup_logging( Set up the logging subsystem. Args: - config (LoggingConfig | synapse.config.worker.WorkerConfig): - configuration data + config: configuration data - use_worker_options (bool): True to use the 'worker_log_config' option + use_worker_options: True to use the 'worker_log_config' option instead of 'log_config'. logBeginner: The Twisted logBeginner to use. diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index c88afb2986..dd9b8089ec 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -213,7 +213,7 @@ class Keyring: def verify_json_objects_for_server( self, server_and_json: Iterable[Tuple[str, dict, int]] - ) -> List[defer.Deferred]: + ) -> List["defer.Deferred[None]"]: """Bulk verifies signatures of json objects, bulk fetching keys as necessary. @@ -226,10 +226,9 @@ class Keyring: valid. Returns: - List: for each input triplet, a deferred indicating success - or failure to verify each json object's signature for the given - server_name. The deferreds run their callbacks in the sentinel - logcontext. + For each input triplet, a deferred indicating success or failure to + verify each json object's signature for the given server_name. The + deferreds run their callbacks in the sentinel logcontext. """ return [ run_in_background( diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 030c3ca408..8aca9a3ab9 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -597,8 +597,7 @@ def _event_type_from_format_version( format_version: The event format version Returns: - type: A type that can be initialized as per the initializer of - `FrozenEvent` + A type that can be initialized as per the initializer of `FrozenEvent` """ if format_version == EventFormatVersions.ROOM_V1_V2: diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index cd39d4d111..a3cfc701cd 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -280,12 +280,11 @@ class TransportLayerClient: Note that this does not append any events to any graphs. Args: - destination (str): address of remote homeserver - room_id (str): room to join/leave - user_id (str): user to be joined/left - membership (str): one of join/leave - params (dict[str, str|Iterable[str]]): Query parameters to include in the - request. + destination: address of remote homeserver + room_id: room to join/leave + user_id: user to be joined/left + membership: one of join/leave + params: Query parameters to include in the request. Returns: Succeeds when we get a 2xx HTTP response. The result diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index 1db8009d6c..cdaf0d5de7 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -224,10 +224,10 @@ class BaseFederationServlet: With arguments: - origin (unicode|None): The authenticated server_name of the calling server, + origin (str|None): The authenticated server_name of the calling server, unless REQUIRE_AUTH is set to False and authentication failed. - content (unicode|None): decoded json body of the request. None if the + content (str|None): decoded json body of the request. None if the request was a GET. query (dict[bytes, list[bytes]]): Query params from the request. url-decoded diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index a9912c467d..bf1221f523 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -870,7 +870,7 @@ class E2eKeysHandler: - signatures of the user's master key by the user's devices. Args: - user_id (string): the user uploading the keys + user_id: the user uploading the keys signatures (dict[string, dict]): map of devices to signed keys Returns: diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 28dc08c22a..83f53ceb88 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -377,8 +377,9 @@ class E2eRoomKeysHandler: """Deletes a given version of the user's e2e_room_keys backup Args: - user_id(str): the user whose current backup version we're deleting - version(str): the version id of the backup being deleted + user_id: the user whose current backup version we're deleting + version: Optional. the version ID of the backup version we're deleting + If missing, we delete the current backup version info. Raises: NotFoundError: if this backup version doesn't exist """ diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 5fc3b8bc8c..188f0956ef 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1596,8 +1596,8 @@ class FederationHandler: Fetch the complexity of a remote room over federation. Args: - remote_room_hosts (list[str]): The remote servers to ask. - room_id (str): The room ID to ask about. + remote_room_hosts: The remote servers to ask. + room_id: The room ID to ask about. Returns: Dict contains the complexity diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 93d09e9939..848e46eb9b 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -711,7 +711,7 @@ class IdentityHandler: inviter_display_name: The current display name of the inviter. inviter_avatar_url: The URL of the inviter's avatar. - id_access_token (str): The access token to authenticate to the identity + id_access_token: The access token to authenticate to the identity server with Returns: diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py index 867973dcca..41c675f408 100644 --- a/synapse/handlers/oidc.py +++ b/synapse/handlers/oidc.py @@ -787,7 +787,7 @@ class OidcProvider: Must include an ``access_token`` field. Returns: - UserInfo: an object representing the user. + an object representing the user. """ logger.debug("Using the OAuth2 access_token to request userinfo") metadata = await self.load_metadata() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 0066d63987..b7bc787636 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -201,7 +201,7 @@ class BasePresenceHandler(abc.ABC): """Get the current presence state for multiple users. Returns: - dict: `user_id` -> `UserPresenceState` + A mapping of `user_id` -> `UserPresenceState` """ states = {} missing = [] diff --git a/synapse/handlers/saml.py b/synapse/handlers/saml.py index 9602f0d0bb..874860d461 100644 --- a/synapse/handlers/saml.py +++ b/synapse/handlers/saml.py @@ -441,7 +441,7 @@ class DefaultSamlMappingProvider: client_redirect_url: where the client wants to redirect to Returns: - dict: A dict containing new user attributes. Possible keys: + A dict containing new user attributes. Possible keys: * mxid_localpart (str): Required. The localpart of the user's mxid * displayname (str): The displayname of the user * emails (list[str]): Any emails for the user @@ -483,7 +483,7 @@ class DefaultSamlMappingProvider: Args: config: A dictionary containing configuration options for this provider Returns: - SamlConfig: A custom config object for this module + A custom config object for this module """ # Parse config options and use defaults where necessary mxid_source_attribute = config.get("mxid_source_attribute", "uid") diff --git a/synapse/http/additional_resource.py b/synapse/http/additional_resource.py index 6a9f6635d2..8729630581 100644 --- a/synapse/http/additional_resource.py +++ b/synapse/http/additional_resource.py @@ -45,8 +45,7 @@ class AdditionalResource(DirectServeJsonResource): Args: hs: homeserver - handler ((twisted.web.server.Request) -> twisted.internet.defer.Deferred): - function to be called to handle the request. + handler: function to be called to handle the request. """ super().__init__() self._handler = handler diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 2f0177f1e2..0359231e7d 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -155,11 +155,10 @@ class MatrixFederationAgent: a file for a file upload). Or None if the request is to have no body. Returns: - Deferred[twisted.web.iweb.IResponse]: - fires when the header of the response has been received (regardless of the - response status code). Fails if there is any problem which prevents that - response from being received (including problems that prevent the request - from being sent). + A deferred which fires when the header of the response has been received + (regardless of the response status code). Fails if there is any problem + which prevents that response from being received (including problems that + prevent the request from being sent). """ # We use urlparse as that will set `port` to None if there is no # explicit port. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 3c35b1d2c7..b92f1d3d1a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -951,8 +951,7 @@ class MatrixFederationHttpClient: args: query params Returns: - dict|list: Succeeds when we get a 2xx HTTP response. The - result will be the decoded JSON body. + Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. Raises: HttpResponseException: If we get an HTTP response code >= 300 diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 1f8227896f..18899bc6d1 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -34,7 +34,7 @@ from twisted.web.client import ( ) from twisted.web.error import SchemeNotSupported from twisted.web.http_headers import Headers -from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS +from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse from synapse.http import redact_uri from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials @@ -134,7 +134,7 @@ class ProxyAgent(_AgentBase): uri: bytes, headers: Optional[Headers] = None, bodyProducer: Optional[IBodyProducer] = None, - ) -> defer.Deferred: + ) -> "defer.Deferred[IResponse]": """ Issue a request to the server indicated by the given uri. @@ -157,17 +157,17 @@ class ProxyAgent(_AgentBase): a file upload). Or, None if the request is to have no body. Returns: - Deferred[IResponse]: completes when the header of the response has - been received (regardless of the response status code). + A deferred which completes when the header of the response has + been received (regardless of the response status code). - Can fail with: - SchemeNotSupported: if the uri is not http or https + Can fail with: + SchemeNotSupported: if the uri is not http or https - twisted.internet.error.TimeoutError if the server we are connecting - to (proxy or destination) does not accept a connection before - connectTimeout. + twisted.internet.error.TimeoutError if the server we are connecting + to (proxy or destination) does not accept a connection before + connectTimeout. - ... other things too. + ... other things too. """ uri = uri.strip() if not _VALID_URI.match(uri): diff --git a/synapse/http/server.py b/synapse/http/server.py index b26e34bceb..051a1899a0 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -267,7 +267,7 @@ class HttpServer(Protocol): request. The first argument will be the request object and subsequent arguments will be any matched groups from the regex. This should return either tuple of (code, response), or None. - servlet_classname (str): The name of the handler to be used in prometheus + servlet_classname: The name of the handler to be used in prometheus and opentracing logs. """ diff --git a/synapse/http/site.py b/synapse/http/site.py index 3dbd541fed..6a1dbf7f33 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -400,7 +400,7 @@ class SynapseRequest(Request): be sure to call finished_processing. Args: - servlet_name (str): the name of the servlet which will be + servlet_name: the name of the servlet which will be processing this request. This is used in the metrics. It is possible to update this afterwards by updating diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 6a08ffed64..f62bea968f 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -117,8 +117,7 @@ class ContextResourceUsage: """Create a new ContextResourceUsage Args: - copy_from (ContextResourceUsage|None): if not None, an object to - copy stats from + copy_from: if not None, an object to copy stats from """ if copy_from is None: self.reset() @@ -162,7 +161,7 @@ class ContextResourceUsage: """Add another ContextResourceUsage's stats to this one's. Args: - other (ContextResourceUsage): the other resource usage object + other: the other resource usage object """ self.ru_utime += other.ru_utime self.ru_stime += other.ru_stime @@ -342,7 +341,7 @@ class LoggingContext: called directly. Returns: - LoggingContext: the current logging context + The current logging context """ warnings.warn( "synapse.logging.context.LoggingContext.current_context() is deprecated " @@ -362,7 +361,8 @@ class LoggingContext: called directly. Args: - context(LoggingContext): The context to activate. + context: The context to activate. + Returns: The context that was previously active """ @@ -474,8 +474,7 @@ class LoggingContext: """Get resources used by this logcontext so far. Returns: - ContextResourceUsage: a *copy* of the object tracking resource - usage so far + A *copy* of the object tracking resource usage so far """ # we always return a copy, for consistency res = self._resource_usage.copy() @@ -663,7 +662,8 @@ def current_context() -> LoggingContextOrSentinel: def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel: """Set the current logging context in thread local storage Args: - context(LoggingContext): The context to activate. + context: The context to activate. + Returns: The context that was previously active """ @@ -700,7 +700,7 @@ def nested_logging_context(suffix: str) -> LoggingContext: suffix: suffix to add to the parent context's 'name'. Returns: - LoggingContext: new logging context. + A new logging context. """ curr_context = current_context() if not curr_context: @@ -898,20 +898,19 @@ def defer_to_thread( on it. Args: - reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread - the Deferred will be invoked, and whose threadpool we should use for the - function. + reactor: The reactor in whose main thread the Deferred will be invoked, + and whose threadpool we should use for the function. Normally this will be hs.get_reactor(). - f (callable): The function to call. + f: The function to call. args: positional arguments to pass to f. kwargs: keyword arguments to pass to f. Returns: - Deferred: A Deferred which fires a callback with the result of `f`, or an + A Deferred which fires a callback with the result of `f`, or an errback if `f` throws an exception. """ return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs) @@ -939,20 +938,20 @@ def defer_to_threadpool( on it. Args: - reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread - the Deferred will be invoked. Normally this will be hs.get_reactor(). + reactor: The reactor in whose main thread the Deferred will be invoked. + Normally this will be hs.get_reactor(). - threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for - running `f`. Normally this will be hs.get_reactor().getThreadPool(). + threadpool: The threadpool to use for running `f`. Normally this will be + hs.get_reactor().getThreadPool(). - f (callable): The function to call. + f: The function to call. args: positional arguments to pass to f. kwargs: keyword arguments to pass to f. Returns: - Deferred: A Deferred which fires a callback with the result of `f`, or an + A Deferred which fires a callback with the result of `f`, or an errback if `f` throws an exception. """ curr_context = current_context() diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 8ce5a2a338..b69060854f 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -721,7 +721,7 @@ def inject_header_dict( destination: address of entity receiving the span context. Must be given unless check_destination is False. The context will only be injected if the destination matches the opentracing whitelist - check_destination (bool): If false, destination will be ignored and the context + check_destination: If false, destination will be ignored and the context will always be injected. Note: @@ -780,7 +780,7 @@ def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str destination: the name of the remote server. Returns: - dict: the active span's context if opentracing is enabled, otherwise empty. + the active span's context if opentracing is enabled, otherwise empty. """ if destination and not whitelisted_homeserver(destination): diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 30e689d00d..1adc1fd64f 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -787,7 +787,7 @@ class ModuleApi: Added in Synapse v0.25.0. Args: - access_token(str): access token + access_token: access token Returns: twisted.internet.defer.Deferred - resolves once the access token @@ -832,7 +832,7 @@ class ModuleApi: **kwargs: named args to be passed to func Returns: - Deferred[object]: result of func + Result of func """ # type-ignore: See https://github.com/python/mypy/issues/8862 return defer.ensureDeferred( @@ -924,8 +924,7 @@ class ModuleApi: to represent 'any') of the room state to acquire. Returns: - twisted.internet.defer.Deferred[list(synapse.events.FrozenEvent)]: - The filtered state events in the room. + The filtered state events in the room. """ state_ids = yield defer.ensureDeferred( self._storage_controllers.state.get_current_state_ids( diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 5e661f8c73..3f4d3fc51a 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -153,7 +153,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): argument list. Returns: - dict: If POST/PUT request then dictionary must be JSON serialisable, + If POST/PUT request then dictionary must be JSON serialisable, otherwise must be appropriate for adding as query args. """ return {} diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 1951b8a9f2..6e0c44be2a 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -903,8 +903,9 @@ class PushersRestServlet(RestServlet): @user:server/pushers Returns: - pushers: Dictionary containing pushers information. - total: Number of pushers in dictionary `pushers`. + A dictionary with keys: + pushers: Dictionary containing pushers information. + total: Number of pushers in dictionary `pushers`. """ PATTERNS = admin_patterns("/users/(?P[^/]*)/pushers$") diff --git a/synapse/rest/client/login.py b/synapse/rest/client/login.py index 05706b598c..8adced41e5 100644 --- a/synapse/rest/client/login.py +++ b/synapse/rest/client/login.py @@ -350,7 +350,7 @@ class LoginRestServlet(RestServlet): auth_provider_session_id: The session ID got during login from the SSO IdP. Returns: - result: Dictionary of account information after successful login. + Dictionary of account information after successful login. """ # Before we actually log them in we check if they've already logged in diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 328c0c5477..40b0d39eb2 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -344,8 +344,8 @@ class MediaRepository: download from remote server. Args: - server_name (str): Remote server_name where the media originated. - media_id (str): The media ID of the content (as defined by the + server_name: Remote server_name where the media originated. + media_id: The media ID of the content (as defined by the remote server). Returns: diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py index 9b93b9b4f6..a48a4de92a 100644 --- a/synapse/rest/media/v1/thumbnailer.py +++ b/synapse/rest/media/v1/thumbnailer.py @@ -138,7 +138,7 @@ class Thumbnailer: """Rescales the image to the given dimensions. Returns: - BytesIO: the bytes of the encoded image ready to be written to disk + The bytes of the encoded image ready to be written to disk """ with self._resize(width, height) as scaled: return self._encode_image(scaled, output_type) @@ -155,7 +155,7 @@ class Thumbnailer: max_height: The largest possible height. Returns: - BytesIO: the bytes of the encoded image ready to be written to disk + The bytes of the encoded image ready to be written to disk """ if width * self.height > height * self.width: scaled_width = width diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py index 698ca742ed..94025ba41f 100644 --- a/synapse/server_notices/consent_server_notices.py +++ b/synapse/server_notices/consent_server_notices.py @@ -113,9 +113,8 @@ def copy_with_str_subst(x: Any, substitutions: Any) -> Any: """Deep-copy a structure, carrying out string substitutions on any strings Args: - x (object): structure to be copied - substitutions (object): substitutions to be made - passed into the - string '%' operator + x: structure to be copied + substitutions: substitutions to be made - passed into the string '%' operator Returns: copy of x diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py index 3134cd2d3d..a31a2c99a7 100644 --- a/synapse/server_notices/resource_limits_server_notices.py +++ b/synapse/server_notices/resource_limits_server_notices.py @@ -170,11 +170,13 @@ class ResourceLimitsServerNotices: room_id: The room id of the server notices room Returns: - bool: Is the room currently blocked - list: The list of pinned event IDs that are unrelated to limit blocking - This list can be used as a convenience in the case where the block - is to be lifted and the remaining pinned event references need to be - preserved + Tuple of: + Is the room currently blocked + + The list of pinned event IDs that are unrelated to limit blocking + This list can be used as a convenience in the case where the block + is to be lifted and the remaining pinned event references need to be + preserved """ currently_blocked = False pinned_state_event = None diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 48976dc570..33ffef521b 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -204,9 +204,8 @@ class _EventPeristenceQueue(Generic[_PersistResult]): process to to so, calling the per_item_callback for each item. Args: - room_id (str): - task (_EventPersistQueueTask): A _PersistEventsTask or - _UpdateCurrentStateTask to process. + room_id: + task: A _PersistEventsTask or _UpdateCurrentStateTask to process. Returns: the result returned by the `_per_item_callback` passed to diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index aa58c2adc3..e114c733d1 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -535,7 +535,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): limit: Maximum number of device updates to return Returns: - List: List of device update tuples: + List of device update tuples: - user_id - device_id - stream_id diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py index af59be6b48..6240f9a75e 100644 --- a/synapse/storage/databases/main/e2e_room_keys.py +++ b/synapse/storage/databases/main/e2e_room_keys.py @@ -391,10 +391,10 @@ class EndToEndRoomKeyStore(SQLBaseStore): Returns: A dict giving the info metadata for this backup version, with fields including: - version(str) - algorithm(str) - auth_data(object): opaque dict supplied by the client - etag(int): tag of the keys in the backup + version (str) + algorithm (str) + auth_data (object): opaque dict supplied by the client + etag (int): tag of the keys in the backup """ def _get_e2e_room_keys_version_info_txn(txn: LoggingTransaction) -> JsonDict: diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 2a4f58ed92..cf33e73e2b 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -412,10 +412,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker """Retrieve a number of one-time keys for a user Args: - user_id(str): id of user to get keys for - device_id(str): id of device to get keys for - key_ids(list[str]): list of key ids (excluding algorithm) to - retrieve + user_id: id of user to get keys for + device_id: id of device to get keys for + key_ids: list of key ids (excluding algorithm) to retrieve Returns: A map from (algorithm, key_id) to json string for key diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index c4acff5be6..d68f127f9b 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1279,9 +1279,10 @@ class PersistEventsStore: Pick the earliest non-outlier if there is one, else the earliest one. Args: - events_and_contexts (list[(EventBase, EventContext)]): + events_and_contexts: + Returns: - list[(EventBase, EventContext)]: filtered list + filtered list """ new_events_and_contexts: OrderedDict[ str, Tuple[EventBase, EventContext] @@ -1307,9 +1308,8 @@ class PersistEventsStore: """Update min_depth for each room Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting + txn: db connection + events_and_contexts: events we are persisting """ depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: @@ -1580,13 +1580,11 @@ class PersistEventsStore: """Update all the miscellaneous tables for new events Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting - all_events_and_contexts (list[(EventBase, EventContext)]): all - events that we were going to persist. This includes events - we've already persisted, etc, that wouldn't appear in - events_and_context. + txn: db connection + events_and_contexts: events we are persisting + all_events_and_contexts: all events that we were going to persist. + This includes events we've already persisted, etc, that wouldn't + appear in events_and_context. inhibit_local_membership_updates: Stop the local_current_membership from being updated by these events. This should be set to True for backfilled events because backfilled events in the past do diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 467d20253d..8a104f7e93 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1589,7 +1589,7 @@ class EventsWorkerStore(SQLBaseStore): room_id: The room ID to query. Returns: - dict[str:float] of complexity version to complexity. + Map of complexity version to complexity. """ state_events = await self.get_current_state_event_counts(room_id) diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index efd136a864..db9a24db5e 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -217,7 +217,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore): def _reap_users(txn: LoggingTransaction, reserved_users: List[str]) -> None: """ Args: - reserved_users (tuple): reserved users to preserve + reserved_users: reserved users to preserve """ thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) @@ -370,8 +370,8 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore): should not appear in the MAU stats). Args: - txn (cursor): - user_id (str): user to add/update + txn: + user_id: user to add/update """ assert ( self._update_on_this_worker @@ -401,7 +401,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore): add the user to the monthly active tables Args: - user_id(str): the user_id to query + user_id: the user_id to query """ assert ( self._update_on_this_worker diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 5167089e03..31f0f2bd3d 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -953,7 +953,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """Returns user id from threepid Args: - txn (cursor): + txn: medium: threepid medium e.g. email address: threepid address e.g. me@example.com @@ -1283,8 +1283,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """Sets an expiration date to the account with the given user ID. Args: - user_id (str): User ID to set an expiration date for. - use_delta (bool): If set to False, the expiration date for the user will be + user_id: User ID to set an expiration date for. + use_delta: If set to False, the expiration date for the user will be now + validity period. If set to True, this expiration date will be a random value in the [now + period - d ; now + period] range, d being a delta equal to 10% of the validity period. diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 7d97f8f60e..4fbaefad73 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -2057,7 +2057,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): Args: report_id: ID of reported event in database Returns: - event_report: json list of information from event report + JSON dict of information from an event report or None if the + report does not exist. """ def _get_event_report_txn( @@ -2130,8 +2131,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): user_id: search for user_id. Ignored if user_id is None room_id: search for room_id. Ignored if room_id is None Returns: - event_reports: json list of event reports - count: total number of event reports matching the filter criteria + Tuple of: + json list of event reports + total number of event reports matching the filter criteria """ def _get_event_reports_paginate_txn( diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index ddb25b5cea..698d6f7515 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -185,9 +185,8 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): - who should be in the user_directory. Args: - progress (dict) - batch_size (int): Maximum number of state events to process - per cycle. + progress + batch_size: Maximum number of state events to process per cycle. Returns: number of events processed. @@ -708,10 +707,10 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): Returns the rooms that a user is in. Args: - user_id(str): Must be a local user + user_id: Must be a local user Returns: - list: user_id + List of room IDs """ rows = await self.db_pool.simple_select_onecol( table="users_who_share_private_rooms", diff --git a/synapse/types.py b/synapse/types.py index 773f0438d5..f2d436ddc3 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -143,8 +143,8 @@ class Requester: Requester. Args: - store (DataStore): Used to convert AS ID to AS object - input (dict): A dict produced by `serialize` + store: Used to convert AS ID to AS object + input: A dict produced by `serialize` Returns: Requester diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 7f1d41eb3c..d24c4f68c4 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -217,7 +217,8 @@ async def concurrently_execute( limit: Maximum number of conccurent executions. Returns: - Deferred: Resolved when all function invocations have finished. + None, when all function invocations have finished. The return values + from those functions are discarded. """ it = iter(args) diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index f7c3a6794e..9387632d0d 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -197,7 +197,7 @@ def register_cache( resize_callback: A function which can be called to resize the cache. Returns: - CacheMetric: an object which provides inc_{hits,misses,evictions} methods + an object which provides inc_{hits,misses,evictions} methods """ if resizable: if not resize_callback: diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index bcb1cba362..bf7bd351e0 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -153,7 +153,7 @@ class DeferredCache(Generic[KT, VT]): Args: key: callback: Gets called when the entry in the cache is invalidated - update_metrics (bool): whether to update the cache hit rate metrics + update_metrics: whether to update the cache hit rate metrics Returns: A Deferred which completes with the result. Note that this may later fail diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index fa91479c97..5eaf70c7ab 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -169,10 +169,11 @@ class DictionaryCache(Generic[KT, DKT, DV]): if it is in the cache. Returns: - DictionaryEntry: If `dict_keys` is not None then `DictionaryEntry` - will contain include the keys that are in the cache. If None then - will either return the full dict if in the cache, or the empty - dict (with `full` set to False) if it isn't. + If `dict_keys` is not None then `DictionaryEntry` will contain include + the keys that are in the cache. + + If None then will either return the full dict if in the cache, or the + empty dict (with `full` set to False) if it isn't. """ if dict_keys is None: # The caller wants the full set of dictionary keys for this cache key diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index c6a5d0dfc0..01ad02af67 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -207,7 +207,7 @@ class ExpiringCache(Generic[KT, VT]): items from the cache. Returns: - bool: Whether the cache changed size or not. + Whether the cache changed size or not. """ new_size = int(self._original_max_size * factor) if new_size != self._max_size: diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index aa93109d13..dcf0eac3bf 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -389,11 +389,11 @@ class LruCache(Generic[KT, VT]): cache_name: The name of this cache, for the prometheus metrics. If unset, no metrics will be reported on this cache. - cache_type (type): + cache_type: type of underlying cache to be used. Typically one of dict or TreeCache. - size_callback (func(V) -> int | None): + size_callback: metrics_collection_callback: metrics collection callback. This is called early in the metrics @@ -403,7 +403,7 @@ class LruCache(Generic[KT, VT]): Ignored if cache_name is None. - apply_cache_factor_from_config (bool): If true, `max_size` will be + apply_cache_factor_from_config: If true, `max_size` will be multiplied by a cache factor derived from the homeserver config clock: @@ -796,7 +796,7 @@ class LruCache(Generic[KT, VT]): items from the cache. Returns: - bool: Whether the cache changed size or not. + Whether the cache changed size or not. """ if not self.apply_cache_factor_from_config: return False diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 9f64fed0d7..2aceb1a47f 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -183,7 +183,7 @@ class FederationRateLimiter: # Handle request ... Args: - host (str): Origin of incoming request. + host: Origin of incoming request. Returns: context manager which returns a deferred. diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py index 1e9c2faa64..54bc7589fd 100644 --- a/synapse/util/threepids.py +++ b/synapse/util/threepids.py @@ -48,7 +48,7 @@ async def check_3pid_allowed( registration: whether we want to bind the 3PID as part of registering a new user. Returns: - bool: whether the 3PID medium/address is allowed to be added to this HS + whether the 3PID medium/address is allowed to be added to this HS """ if not await hs.get_password_auth_provider().is_3pid_allowed( medium, address, registration diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py index 177e198e7e..b1ec7f4bd8 100644 --- a/synapse/util/wheel_timer.py +++ b/synapse/util/wheel_timer.py @@ -90,10 +90,10 @@ class WheelTimer(Generic[T]): """Fetch any objects that have timed out Args: - now (ms): Current time in msec + now: Current time in msec Returns: - list: List of objects that have timed out + List of objects that have timed out """ now_key = int(now / self.bucket_size) diff --git a/tests/http/__init__.py b/tests/http/__init__.py index e74f7f5b48..093537adef 100644 --- a/tests/http/__init__.py +++ b/tests/http/__init__.py @@ -13,6 +13,7 @@ # limitations under the License. import os.path import subprocess +from typing import List from zope.interface import implementer @@ -70,14 +71,14 @@ subjectAltName = %(sanentries)s """ -def create_test_cert_file(sanlist): +def create_test_cert_file(sanlist: List[bytes]) -> str: """build an x509 certificate file Args: - sanlist: list[bytes]: a list of subjectAltName values for the cert + sanlist: a list of subjectAltName values for the cert Returns: - str: the path to the file + The path to the file """ global cert_file_count csr_filename = "server.csr" diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 96f3880923..dce71f7334 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -143,6 +143,7 @@ class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase): self.persist(type="m.room.create", key="", creator=USER_ID) self.check("get_invited_rooms_for_local_user", [USER_ID_2], []) event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite") + assert event.internal_metadata.stream_ordering is not None self.replicate() @@ -230,6 +231,7 @@ class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase): j2 = self.persist( type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" ) + assert j2.internal_metadata.stream_ordering is not None self.replicate() expected_pos = PersistedEventPosition( @@ -287,6 +289,7 @@ class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase): ) ) self.replicate() + assert j2.internal_metadata.stream_ordering is not None event_source = RoomEventSource(self.hs) event_source.store = self.slaved_store @@ -336,10 +339,10 @@ class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase): event_id = 0 - def persist(self, backfill=False, **kwargs): + def persist(self, backfill=False, **kwargs) -> FrozenEvent: """ Returns: - synapse.events.FrozenEvent: The event that was persisted. + The event that was persisted. """ event, context = self.build_event(**kwargs) diff --git a/tests/replication/test_multi_media_repo.py b/tests/replication/test_multi_media_repo.py index 13aa5eb51a..96cdf2c45b 100644 --- a/tests/replication/test_multi_media_repo.py +++ b/tests/replication/test_multi_media_repo.py @@ -15,8 +15,9 @@ import logging import os from typing import Optional, Tuple +from twisted.internet.interfaces import IOpenSSLServerConnectionCreator from twisted.internet.protocol import Factory -from twisted.protocols.tls import TLSMemoryBIOFactory +from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol from twisted.web.http import HTTPChannel from twisted.web.server import Request @@ -102,7 +103,7 @@ class MediaRepoShardTestCase(BaseMultiWorkerStreamTestCase): ) # fish the test server back out of the server-side TLS protocol. - http_server = server_tls_protocol.wrappedProtocol + http_server: HTTPChannel = server_tls_protocol.wrappedProtocol # type: ignore[assignment] # give the reactor a pump to get the TLS juices flowing. self.reactor.pump((0.1,)) @@ -238,16 +239,15 @@ def get_connection_factory(): return test_server_connection_factory -def _build_test_server(connection_creator): +def _build_test_server( + connection_creator: IOpenSSLServerConnectionCreator, +) -> TLSMemoryBIOProtocol: """Construct a test server This builds an HTTP channel, wrapped with a TLSMemoryBIOProtocol Args: - connection_creator (IOpenSSLServerConnectionCreator): thing to build - SSL connections - sanlist (list[bytes]): list of the SAN entries for the cert returned - by the server + connection_creator: thing to build SSL connections Returns: TLSMemoryBIOProtocol diff --git a/tests/server_notices/test_resource_limits_server_notices.py b/tests/server_notices/test_resource_limits_server_notices.py index bf403045e9..7cbc40736c 100644 --- a/tests/server_notices/test_resource_limits_server_notices.py +++ b/tests/server_notices/test_resource_limits_server_notices.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import Tuple from unittest.mock import Mock from twisted.test.proto_helpers import MemoryReactor @@ -350,14 +351,15 @@ class TestResourceLimitsServerNoticesWithRealRooms(unittest.HomeserverTestCase): self.assertTrue(notice_in_room, "No server notice in room") - def _trigger_notice_and_join(self): + def _trigger_notice_and_join(self) -> Tuple[str, str, str]: """Creates enough active users to hit the MAU limit and trigger a system notice about it, then joins the system notices room with one of the users created. Returns: - user_id (str): The ID of the user that joined the room. - tok (str): The access token of the user that joined the room. - room_id (str): The ID of the room that's been joined. + A tuple of: + user_id: The ID of the user that joined the room. + tok: The access token of the user that joined the room. + room_id: The ID of the room that's been joined. """ user_id = None tok = None diff --git a/tests/unittest.py b/tests/unittest.py index 5116be338e..a120c2976c 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -360,13 +360,13 @@ class HomeserverTestCase(TestCase): store.db_pool.updates.do_next_background_update(False), by=0.1 ) - def make_homeserver(self, reactor, clock): + def make_homeserver(self, reactor: MemoryReactor, clock: Clock): """ Make and return a homeserver. Args: reactor: A Twisted Reactor, or something that pretends to be one. - clock (synapse.util.Clock): The Clock, associated with the reactor. + clock: The Clock, associated with the reactor. Returns: A homeserver suitable for testing. @@ -426,9 +426,8 @@ class HomeserverTestCase(TestCase): Args: reactor: A Twisted Reactor, or something that pretends to be one. - clock (synapse.util.Clock): The Clock, associated with the reactor. - homeserver (synapse.server.HomeServer): The HomeServer to test - against. + clock: The Clock, associated with the reactor. + homeserver: The HomeServer to test against. Function to optionally be overridden in subclasses. """ @@ -452,11 +451,10 @@ class HomeserverTestCase(TestCase): given content. Args: - method (bytes/unicode): The HTTP request method ("verb"). - path (bytes/unicode): The HTTP path, suitably URL encoded (e.g. - escaped UTF-8 & spaces and such). - content (bytes or dict): The body of the request. JSON-encoded, if - a dict. + method: The HTTP request method ("verb"). + path: The HTTP path, suitably URL encoded (e.g. escaped UTF-8 & spaces + and such). content (bytes or dict): The body of the request. + JSON-encoded, if a dict. shorthand: Whether to try and be helpful and prefix the given URL with the usual REST API path, if it doesn't contain it. federation_auth_origin: if set to not-None, we will add a fake -- cgit 1.5.1 From c15e9a0edb696990365ac5a4e5be847b5ae23921 Mon Sep 17 00:00:00 2001 From: realtyem Date: Wed, 16 Nov 2022 16:16:25 -0600 Subject: Remove need for `worker_main_http_uri` setting to use /keys/upload. (#14400) --- changelog.d/14400.misc | 1 + docker/configure_workers_and_start.py | 5 +- docs/workers.md | 7 +-- synapse/app/generic_worker.py | 103 +--------------------------------- synapse/config/workers.py | 6 ++ synapse/replication/http/devices.py | 67 ++++++++++++++++++++++ synapse/rest/client/keys.py | 68 ++++++++++++++++------ 7 files changed, 130 insertions(+), 127 deletions(-) create mode 100644 changelog.d/14400.misc (limited to 'synapse/replication/http') diff --git a/changelog.d/14400.misc b/changelog.d/14400.misc new file mode 100644 index 0000000000..6e025329c4 --- /dev/null +++ b/changelog.d/14400.misc @@ -0,0 +1 @@ +Remove the `worker_main_http_uri` configuration setting. This is now handled via internal replication. diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 62b1bab297..c1e1544536 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -213,10 +213,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "listener_resources": ["client", "replication"], "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], "shared_extra_conf": {}, - "worker_extra_conf": ( - "worker_main_http_uri: http://127.0.0.1:%d" - % (MAIN_PROCESS_HTTP_LISTENER_PORT,) - ), + "worker_extra_conf": "", }, "account_data": { "app": "synapse.app.generic_worker", diff --git a/docs/workers.md b/docs/workers.md index 7ee8801161..4604650803 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -135,8 +135,8 @@ In the config file for each worker, you must specify: [`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)). * If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option with an `http` listener. - * If handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for - the main process (`worker_main_http_uri`). + * **Synapse 1.71 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for + the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.72 and newer. For example: @@ -221,7 +221,6 @@ information. ^/_matrix/client/(api/v1|r0|v3|unstable)/search$ # Encryption requests - # Note that ^/_matrix/client/(r0|v3|unstable)/keys/upload/ requires `worker_main_http_uri` ^/_matrix/client/(r0|v3|unstable)/keys/query$ ^/_matrix/client/(r0|v3|unstable)/keys/changes$ ^/_matrix/client/(r0|v3|unstable)/keys/claim$ @@ -376,7 +375,7 @@ responsible for - persisting them to the DB, and finally - updating the events stream. -Because load is sharded in this way, you *must* restart all worker instances when +Because load is sharded in this way, you *must* restart all worker instances when adding or removing event persisters. An `event_persister` should not be mistaken for an `event_creator`. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 1d9aef45c2..74909b7d4a 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -14,14 +14,12 @@ # limitations under the License. import logging import sys -from typing import Dict, List, Optional, Tuple +from typing import Dict, List -from twisted.internet import address from twisted.web.resource import Resource import synapse import synapse.events -from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.api.urls import ( CLIENT_API_PREFIX, FEDERATION_PREFIX, @@ -43,8 +41,6 @@ from synapse.config.logger import setup_logging from synapse.config.server import ListenerConfig from synapse.federation.transport.server import TransportLayerServer from synapse.http.server import JsonResource, OptionsResource -from synapse.http.servlet import RestServlet, parse_json_object_from_request -from synapse.http.site import SynapseRequest from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource @@ -70,12 +66,12 @@ from synapse.rest.client import ( versions, voip, ) -from synapse.rest.client._base import client_patterns from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet from synapse.rest.client.devices import DevicesRestServlet from synapse.rest.client.keys import ( KeyChangesServlet, KeyQueryServlet, + KeyUploadServlet, OneTimeKeyServlet, ) from synapse.rest.client.register import ( @@ -132,107 +128,12 @@ from synapse.storage.databases.main.transactions import TransactionWorkerStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore from synapse.storage.databases.main.user_directory import UserDirectoryStore from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore -from synapse.types import JsonDict from synapse.util import SYNAPSE_VERSION from synapse.util.httpresourcetree import create_resource_tree logger = logging.getLogger("synapse.app.generic_worker") -class KeyUploadServlet(RestServlet): - """An implementation of the `KeyUploadServlet` that responds to read only - requests, but otherwise proxies through to the master instance. - """ - - PATTERNS = client_patterns("/keys/upload(/(?P[^/]+))?$") - - def __init__(self, hs: HomeServer): - """ - Args: - hs: server - """ - super().__init__() - self.auth = hs.get_auth() - self.store = hs.get_datastores().main - self.http_client = hs.get_simple_http_client() - self.main_uri = hs.config.worker.worker_main_http_uri - - async def on_POST( - self, request: SynapseRequest, device_id: Optional[str] - ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=True) - user_id = requester.user.to_string() - body = parse_json_object_from_request(request) - - if device_id is not None: - # passing the device_id here is deprecated; however, we allow it - # for now for compatibility with older clients. - if requester.device_id is not None and device_id != requester.device_id: - logger.warning( - "Client uploading keys for a different device " - "(logged in as %s, uploading for %s)", - requester.device_id, - device_id, - ) - else: - device_id = requester.device_id - - if device_id is None: - raise SynapseError( - 400, "To upload keys, you must pass device_id when authenticating" - ) - - if body: - # They're actually trying to upload something, proxy to main synapse. - - # Proxy headers from the original request, such as the auth headers - # (in case the access token is there) and the original IP / - # User-Agent of the request. - headers: Dict[bytes, List[bytes]] = { - header: list(request.requestHeaders.getRawHeaders(header, [])) - for header in (b"Authorization", b"User-Agent") - } - # Add the previous hop to the X-Forwarded-For header. - x_forwarded_for = list( - request.requestHeaders.getRawHeaders(b"X-Forwarded-For", []) - ) - # we use request.client here, since we want the previous hop, not the - # original client (as returned by request.getClientAddress()). - if isinstance(request.client, (address.IPv4Address, address.IPv6Address)): - previous_host = request.client.host.encode("ascii") - # If the header exists, add to the comma-separated list of the first - # instance of the header. Otherwise, generate a new header. - if x_forwarded_for: - x_forwarded_for = [x_forwarded_for[0] + b", " + previous_host] - x_forwarded_for.extend(x_forwarded_for[1:]) - else: - x_forwarded_for = [previous_host] - headers[b"X-Forwarded-For"] = x_forwarded_for - - # Replicate the original X-Forwarded-Proto header. Note that - # XForwardedForRequest overrides isSecure() to give us the original protocol - # used by the client, as opposed to the protocol used by our upstream proxy - # - which is what we want here. - headers[b"X-Forwarded-Proto"] = [ - b"https" if request.isSecure() else b"http" - ] - - try: - result = await self.http_client.post_json_get_json( - self.main_uri + request.uri.decode("ascii"), body, headers=headers - ) - except HttpResponseException as e: - raise e.to_synapse_error() from e - except RequestSendFailed as e: - raise SynapseError(502, "Failed to talk to master") from e - - return 200, result - else: - # Just interested in counts. - result = await self.store.count_e2e_one_time_keys(user_id, device_id) - return 200, {"one_time_key_counts": result} - - class GenericWorkerSlavedStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 88b3168cbc..c4e2273a95 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -162,7 +162,13 @@ class WorkerConfig(Config): self.worker_name = config.get("worker_name", self.worker_app) self.instance_name = self.worker_name or "master" + # FIXME: Remove this check after a suitable amount of time. self.worker_main_http_uri = config.get("worker_main_http_uri", None) + if self.worker_main_http_uri is not None: + logger.warning( + "The config option worker_main_http_uri is unused since Synapse 1.72. " + "It can be safely removed from your configuration." + ) # This option is really only here to support `--manhole` command line # argument. diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index 3d63645726..c21629def8 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Tuple from twisted.web.server import Request from synapse.http.server import HttpServer +from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint from synapse.types import JsonDict @@ -78,5 +79,71 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): return 200, user_devices +class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint): + """Ask master to upload keys for the user and send them out over federation to + update other servers. + + For now, only the master is permitted to handle key upload requests; + any worker can handle key query requests (since they're read-only). + + Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on + the main process to accomplish this. + + Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload + Request format(borrowed and expanded from KeyUploadServlet): + + POST /_synapse/replication/upload_keys_for_user + + { + "user_id": "", + "device_id": "", + "keys": { + ....this part can be found in KeyUploadServlet in rest/client/keys.py.... + } + } + + Response is equivalent to ` /_matrix/client/v3/keys/upload` found in KeyUploadServlet + + """ + + NAME = "upload_keys_for_user" + PATH_ARGS = () + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.e2e_keys_handler = hs.get_e2e_keys_handler() + self.store = hs.get_datastores().main + self.clock = hs.get_clock() + + @staticmethod + async def _serialize_payload( # type: ignore[override] + user_id: str, device_id: str, keys: JsonDict + ) -> JsonDict: + + return { + "user_id": user_id, + "device_id": device_id, + "keys": keys, + } + + async def _handle_request( # type: ignore[override] + self, request: Request + ) -> Tuple[int, JsonDict]: + content = parse_json_object_from_request(request) + + user_id = content["user_id"] + device_id = content["device_id"] + keys = content["keys"] + + results = await self.e2e_keys_handler.upload_keys_for_user( + user_id, device_id, keys + ) + + return 200, results + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: ReplicationUserDevicesResyncRestServlet(hs).register(http_server) + ReplicationUploadKeysForUserRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py index f653d2a3e1..ee038c7192 100644 --- a/synapse/rest/client/keys.py +++ b/synapse/rest/client/keys.py @@ -27,6 +27,7 @@ from synapse.http.servlet import ( ) from synapse.http.site import SynapseRequest from synapse.logging.opentracing import log_kv, set_tag +from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet from synapse.rest.client._base import client_patterns, interactive_auth_handler from synapse.types import JsonDict, StreamToken from synapse.util.cancellation import cancellable @@ -43,24 +44,48 @@ class KeyUploadServlet(RestServlet): Content-Type: application/json { - "device_keys": { - "user_id": "", - "device_id": "", - "valid_until_ts": , - "algorithms": [ - "m.olm.curve25519-aes-sha2", - ] - "keys": { - ":": "", + "device_keys": { + "user_id": "", + "device_id": "", + "valid_until_ts": , + "algorithms": [ + "m.olm.curve25519-aes-sha2", + ] + "keys": { + ":": "", + }, + "signatures:" { + "" { + ":": "" + } + } + }, + "fallback_keys": { + ":": "", + "signed_:": { + "fallback": true, + "key": "", + "signatures": { + "": { + ":": "" + } + } + } + } + "one_time_keys": { + ":": "" }, - "signatures:" { - "" { - ":": "" - } } }, - "one_time_keys": { - ":": "" - }, } + + response, e.g.: + + { + "one_time_key_counts": { + "curve25519": 10, + "signed_curve25519": 20 + } + } + """ PATTERNS = client_patterns("/keys/upload(/(?P[^/]+))?$") @@ -71,6 +96,13 @@ class KeyUploadServlet(RestServlet): self.e2e_keys_handler = hs.get_e2e_keys_handler() self.device_handler = hs.get_device_handler() + if hs.config.worker.worker_app is None: + # if main process + self.key_uploader = self.e2e_keys_handler.upload_keys_for_user + else: + # then a worker + self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs) + async def on_POST( self, request: SynapseRequest, device_id: Optional[str] ) -> Tuple[int, JsonDict]: @@ -109,8 +141,8 @@ class KeyUploadServlet(RestServlet): 400, "To upload keys, you must pass device_id when authenticating" ) - result = await self.e2e_keys_handler.upload_keys_for_user( - user_id, device_id, body + result = await self.key_uploader( + user_id=user_id, device_id=device_id, keys=body ) return 200, result -- cgit 1.5.1 From 6d47b7e32589e816eb766446cc1ff19ea73fc7c1 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 22 Nov 2022 14:08:04 -0500 Subject: Add a type hint for `get_device_handler()` and fix incorrect types. (#14055) This was the last untyped handler from the HomeServer object. Since it was being treated as Any (and thus unchecked) it was being used incorrectly in a few places. --- changelog.d/14055.misc | 1 + synapse/handlers/deactivate_account.py | 4 +++ synapse/handlers/device.py | 65 ++++++++++++++++++++++++++-------- synapse/handlers/e2e_keys.py | 61 ++++++++++++++++--------------- synapse/handlers/register.py | 4 +++ synapse/handlers/set_password.py | 6 +++- synapse/handlers/sso.py | 9 +++++ synapse/module_api/__init__.py | 10 +++++- synapse/replication/http/devices.py | 11 ++++-- synapse/rest/admin/__init__.py | 26 ++++++++------ synapse/rest/admin/devices.py | 13 +++++-- synapse/rest/client/devices.py | 17 ++++++--- synapse/rest/client/logout.py | 9 +++-- synapse/server.py | 2 +- tests/handlers/test_device.py | 19 ++++++---- tests/rest/admin/test_device.py | 5 ++- 16 files changed, 185 insertions(+), 77 deletions(-) create mode 100644 changelog.d/14055.misc (limited to 'synapse/replication/http') diff --git a/changelog.d/14055.misc b/changelog.d/14055.misc new file mode 100644 index 0000000000..02980bc528 --- /dev/null +++ b/changelog.d/14055.misc @@ -0,0 +1 @@ +Add missing type hints to `HomeServer`. diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 816e1a6d79..d74d135c0c 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -16,6 +16,7 @@ import logging from typing import TYPE_CHECKING, Optional from synapse.api.errors import SynapseError +from synapse.handlers.device import DeviceHandler from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import Codes, Requester, UserID, create_requester @@ -76,6 +77,9 @@ class DeactivateAccountHandler: True if identity server supports removing threepids, otherwise False. """ + # This can only be called on the main process. + assert isinstance(self._device_handler, DeviceHandler) + # Check if this user can be deactivated if not await self._third_party_rules.check_can_deactivate_user( user_id, by_admin diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index da3ddafeae..b1e55e1b9e 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -65,6 +65,8 @@ DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000 class DeviceWorkerHandler: + device_list_updater: "DeviceListWorkerUpdater" + def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.hs = hs @@ -76,6 +78,8 @@ class DeviceWorkerHandler: self.server_name = hs.hostname self._msc3852_enabled = hs.config.experimental.msc3852_enabled + self.device_list_updater = DeviceListWorkerUpdater(hs) + @trace async def get_devices_by_user(self, user_id: str) -> List[JsonDict]: """ @@ -99,6 +103,19 @@ class DeviceWorkerHandler: log_kv(device_map) return devices + async def get_dehydrated_device( + self, user_id: str + ) -> Optional[Tuple[str, JsonDict]]: + """Retrieve the information for a dehydrated device. + + Args: + user_id: the user whose dehydrated device we are looking for + Returns: + a tuple whose first item is the device ID, and the second item is + the dehydrated device information + """ + return await self.store.get_dehydrated_device(user_id) + @trace async def get_device(self, user_id: str, device_id: str) -> JsonDict: """Retrieve the given device @@ -127,7 +144,7 @@ class DeviceWorkerHandler: @cancellable async def get_device_changes_in_shared_rooms( self, user_id: str, room_ids: Collection[str], from_token: StreamToken - ) -> Collection[str]: + ) -> Set[str]: """Get the set of users whose devices have changed who share a room with the given user. """ @@ -320,6 +337,8 @@ class DeviceWorkerHandler: class DeviceHandler(DeviceWorkerHandler): + device_list_updater: "DeviceListUpdater" + def __init__(self, hs: "HomeServer"): super().__init__(hs) @@ -606,19 +625,6 @@ class DeviceHandler(DeviceWorkerHandler): await self.delete_devices(user_id, [old_device_id]) return device_id - async def get_dehydrated_device( - self, user_id: str - ) -> Optional[Tuple[str, JsonDict]]: - """Retrieve the information for a dehydrated device. - - Args: - user_id: the user whose dehydrated device we are looking for - Returns: - a tuple whose first item is the device ID, and the second item is - the dehydrated device information - """ - return await self.store.get_dehydrated_device(user_id) - async def rehydrate_device( self, user_id: str, access_token: str, device_id: str ) -> dict: @@ -882,7 +888,36 @@ def _update_device_from_client_ips( ) -class DeviceListUpdater: +class DeviceListWorkerUpdater: + "Handles incoming device list updates from federation and contacts the main process over replication" + + def __init__(self, hs: "HomeServer"): + from synapse.replication.http.devices import ( + ReplicationUserDevicesResyncRestServlet, + ) + + self._user_device_resync_client = ( + ReplicationUserDevicesResyncRestServlet.make_client(hs) + ) + + async def user_device_resync( + self, user_id: str, mark_failed_as_stale: bool = True + ) -> Optional[JsonDict]: + """Fetches all devices for a user and updates the device cache with them. + + Args: + user_id: The user's id whose device_list will be updated. + mark_failed_as_stale: Whether to mark the user's device list as stale + if the attempt to resync failed. + Returns: + A dict with device info as under the "devices" in the result of this + request: + https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid + """ + return await self._user_device_resync_client(user_id=user_id) + + +class DeviceListUpdater(DeviceListWorkerUpdater): "Handles incoming device list updates from federation and updates the DB" def __init__(self, hs: "HomeServer", device_handler: DeviceHandler): diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index bf1221f523..5fe102e2f2 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -27,9 +27,9 @@ from twisted.internet import defer from synapse.api.constants import EduTypes from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError +from synapse.handlers.device import DeviceHandler from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace -from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.types import ( JsonDict, UserID, @@ -56,27 +56,23 @@ class E2eKeysHandler: self.is_mine = hs.is_mine self.clock = hs.get_clock() - self._edu_updater = SigningKeyEduUpdater(hs, self) - federation_registry = hs.get_federation_registry() - self._is_master = hs.config.worker.worker_app is None - if not self._is_master: - self._user_device_resync_client = ( - ReplicationUserDevicesResyncRestServlet.make_client(hs) - ) - else: + is_master = hs.config.worker.worker_app is None + if is_master: + edu_updater = SigningKeyEduUpdater(hs) + # Only register this edu handler on master as it requires writing # device updates to the db federation_registry.register_edu_handler( EduTypes.SIGNING_KEY_UPDATE, - self._edu_updater.incoming_signing_key_update, + edu_updater.incoming_signing_key_update, ) # also handle the unstable version # FIXME: remove this when enough servers have upgraded federation_registry.register_edu_handler( EduTypes.UNSTABLE_SIGNING_KEY_UPDATE, - self._edu_updater.incoming_signing_key_update, + edu_updater.incoming_signing_key_update, ) # doesn't really work as part of the generic query API, because the @@ -319,14 +315,13 @@ class E2eKeysHandler: # probably be tracking their device lists. However, we haven't # done an initial sync on the device list so we do it now. try: - if self._is_master: - resync_results = await self.device_handler.device_list_updater.user_device_resync( + resync_results = ( + await self.device_handler.device_list_updater.user_device_resync( user_id ) - else: - resync_results = await self._user_device_resync_client( - user_id=user_id - ) + ) + if resync_results is None: + raise ValueError("Device resync failed") # Add the device keys to the results. user_devices = resync_results["devices"] @@ -605,6 +600,8 @@ class E2eKeysHandler: async def upload_keys_for_user( self, user_id: str, device_id: str, keys: JsonDict ) -> JsonDict: + # This can only be called from the main process. + assert isinstance(self.device_handler, DeviceHandler) time_now = self.clock.time_msec() @@ -732,6 +729,8 @@ class E2eKeysHandler: user_id: the user uploading the keys keys: the signing keys """ + # This can only be called from the main process. + assert isinstance(self.device_handler, DeviceHandler) # if a master key is uploaded, then check it. Otherwise, load the # stored master key, to check signatures on other keys @@ -823,6 +822,9 @@ class E2eKeysHandler: Raises: SynapseError: if the signatures dict is not valid. """ + # This can only be called from the main process. + assert isinstance(self.device_handler, DeviceHandler) + failures = {} # signatures to be stored. Each item will be a SignatureListItem @@ -1200,6 +1202,9 @@ class E2eKeysHandler: A tuple of the retrieved key content, the key's ID and the matching VerifyKey. If the key cannot be retrieved, all values in the tuple will instead be None. """ + # This can only be called from the main process. + assert isinstance(self.device_handler, DeviceHandler) + try: remote_result = await self.federation.query_user_devices( user.domain, user.to_string() @@ -1396,11 +1401,14 @@ class SignatureListItem: class SigningKeyEduUpdater: """Handles incoming signing key updates from federation and updates the DB""" - def __init__(self, hs: "HomeServer", e2e_keys_handler: E2eKeysHandler): + def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main self.federation = hs.get_federation_client() self.clock = hs.get_clock() - self.e2e_keys_handler = e2e_keys_handler + + device_handler = hs.get_device_handler() + assert isinstance(device_handler, DeviceHandler) + self._device_handler = device_handler self._remote_edu_linearizer = Linearizer(name="remote_signing_key") @@ -1445,9 +1453,6 @@ class SigningKeyEduUpdater: user_id: the user whose updates we are processing """ - device_handler = self.e2e_keys_handler.device_handler - device_list_updater = device_handler.device_list_updater - async with self._remote_edu_linearizer.queue(user_id): pending_updates = self._pending_updates.pop(user_id, []) if not pending_updates: @@ -1459,13 +1464,11 @@ class SigningKeyEduUpdater: logger.info("pending updates: %r", pending_updates) for master_key, self_signing_key in pending_updates: - new_device_ids = ( - await device_list_updater.process_cross_signing_key_update( - user_id, - master_key, - self_signing_key, - ) + new_device_ids = await self._device_handler.device_list_updater.process_cross_signing_key_update( + user_id, + master_key, + self_signing_key, ) device_ids = device_ids + new_device_ids - await device_handler.notify_device_update(user_id, device_ids) + await self._device_handler.notify_device_update(user_id, device_ids) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index ca1c7a1866..6307fa9c5d 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -38,6 +38,7 @@ from synapse.api.errors import ( ) from synapse.appservice import ApplicationService from synapse.config.server import is_threepid_reserved +from synapse.handlers.device import DeviceHandler from synapse.http.servlet import assert_params_in_dict from synapse.replication.http.login import RegisterDeviceReplicationServlet from synapse.replication.http.register import ( @@ -841,6 +842,9 @@ class RegistrationHandler: refresh_token = None refresh_token_id = None + # This can only run on the main process. + assert isinstance(self.device_handler, DeviceHandler) + registered_device_id = await self.device_handler.check_device_registered( user_id, device_id, diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py index 73861bbd40..bd9d0bb34b 100644 --- a/synapse/handlers/set_password.py +++ b/synapse/handlers/set_password.py @@ -15,6 +15,7 @@ import logging from typing import TYPE_CHECKING, Optional from synapse.api.errors import Codes, StoreError, SynapseError +from synapse.handlers.device import DeviceHandler from synapse.types import Requester if TYPE_CHECKING: @@ -29,7 +30,10 @@ class SetPasswordHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main self._auth_handler = hs.get_auth_handler() - self._device_handler = hs.get_device_handler() + # This can only be instantiated on the main process. + device_handler = hs.get_device_handler() + assert isinstance(device_handler, DeviceHandler) + self._device_handler = device_handler async def set_password( self, diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index 749d7e93b0..e1c0bff1b2 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -37,6 +37,7 @@ from twisted.web.server import Request from synapse.api.constants import LoginType from synapse.api.errors import Codes, NotFoundError, RedirectException, SynapseError from synapse.config.sso import SsoAttributeRequirement +from synapse.handlers.device import DeviceHandler from synapse.handlers.register import init_counters_for_auth_provider from synapse.handlers.ui_auth import UIAuthSessionDataConstants from synapse.http import get_request_user_agent @@ -1035,6 +1036,8 @@ class SsoHandler: ) -> None: """Revoke any devices and in-flight logins tied to a provider session. + Can only be called from the main process. + Args: auth_provider_id: A unique identifier for this SSO provider, e.g. "oidc" or "saml". @@ -1042,6 +1045,12 @@ class SsoHandler: expected_user_id: The user we're expecting to logout. If set, it will ignore sessions belonging to other users and log an error. """ + + # It is expected that this is the main process. + assert isinstance( + self._device_handler, DeviceHandler + ), "revoking SSO sessions can only be called on the main process" + # Invalidate any running user-mapping sessions to_delete = [] for session_id, session in self._username_mapping_sessions.items(): diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 1adc1fd64f..96a661177a 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -86,6 +86,7 @@ from synapse.handlers.auth import ( ON_LOGGED_OUT_CALLBACK, AuthHandler, ) +from synapse.handlers.device import DeviceHandler from synapse.handlers.push_rules import RuleSpec, check_actions from synapse.http.client import SimpleHttpClient from synapse.http.server import ( @@ -207,6 +208,7 @@ class ModuleApi: self._registration_handler = hs.get_registration_handler() self._send_email_handler = hs.get_send_email_handler() self._push_rules_handler = hs.get_push_rules_handler() + self._device_handler = hs.get_device_handler() self.custom_template_dir = hs.config.server.custom_template_directory try: @@ -784,6 +786,8 @@ class ModuleApi: ) -> Generator["defer.Deferred[Any]", Any, None]: """Invalidate an access token for a user + Can only be called from the main process. + Added in Synapse v0.25.0. Args: @@ -796,6 +800,10 @@ class ModuleApi: Raises: synapse.api.errors.AuthError: the access token is invalid """ + assert isinstance( + self._device_handler, DeviceHandler + ), "invalidate_access_token can only be called on the main process" + # see if the access token corresponds to a device user_info = yield defer.ensureDeferred( self._auth.get_user_by_access_token(access_token) @@ -805,7 +813,7 @@ class ModuleApi: if device_id: # delete the device, which will also delete its access tokens yield defer.ensureDeferred( - self._hs.get_device_handler().delete_devices(user_id, [device_id]) + self._device_handler.delete_devices(user_id, [device_id]) ) else: # no associated device. Just delete the access token. diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index c21629def8..7c4941c3d3 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Tuple +from typing import TYPE_CHECKING, Optional, Tuple from twisted.web.server import Request @@ -63,7 +63,12 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.device_list_updater = hs.get_device_handler().device_list_updater + from synapse.handlers.device import DeviceHandler + + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.device_list_updater = handler.device_list_updater + self.store = hs.get_datastores().main self.clock = hs.get_clock() @@ -73,7 +78,7 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): async def _handle_request( # type: ignore[override] self, request: Request, user_id: str - ) -> Tuple[int, JsonDict]: + ) -> Tuple[int, Optional[JsonDict]]: user_devices = await self.device_list_updater.user_device_resync(user_id) return 200, user_devices diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index c62ea22116..fb73886df0 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -238,6 +238,10 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: """ Register all the admin servlets. """ + # Admin servlets aren't registered on workers. + if hs.config.worker.worker_app is not None: + return + register_servlets_for_client_rest_resource(hs, http_server) BlockRoomRestServlet(hs).register(http_server) ListRoomRestServlet(hs).register(http_server) @@ -254,9 +258,6 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: UserTokenRestServlet(hs).register(http_server) UserRestServletV2(hs).register(http_server) UsersRestServletV2(hs).register(http_server) - DeviceRestServlet(hs).register(http_server) - DevicesRestServlet(hs).register(http_server) - DeleteDevicesRestServlet(hs).register(http_server) UserMediaStatisticsRestServlet(hs).register(http_server) EventReportDetailRestServlet(hs).register(http_server) EventReportsRestServlet(hs).register(http_server) @@ -280,12 +281,13 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: UserByExternalId(hs).register(http_server) UserByThreePid(hs).register(http_server) - # Some servlets only get registered for the main process. - if hs.config.worker.worker_app is None: - SendServerNoticeServlet(hs).register(http_server) - BackgroundUpdateEnabledRestServlet(hs).register(http_server) - BackgroundUpdateRestServlet(hs).register(http_server) - BackgroundUpdateStartJobRestServlet(hs).register(http_server) + DeviceRestServlet(hs).register(http_server) + DevicesRestServlet(hs).register(http_server) + DeleteDevicesRestServlet(hs).register(http_server) + SendServerNoticeServlet(hs).register(http_server) + BackgroundUpdateEnabledRestServlet(hs).register(http_server) + BackgroundUpdateRestServlet(hs).register(http_server) + BackgroundUpdateStartJobRestServlet(hs).register(http_server) def register_servlets_for_client_rest_resource( @@ -294,9 +296,11 @@ def register_servlets_for_client_rest_resource( """Register only the servlets which need to be exposed on /_matrix/client/xxx""" WhoisRestServlet(hs).register(http_server) PurgeHistoryStatusRestServlet(hs).register(http_server) - DeactivateAccountRestServlet(hs).register(http_server) PurgeHistoryRestServlet(hs).register(http_server) - ResetPasswordRestServlet(hs).register(http_server) + # The following resources can only be run on the main process. + if hs.config.worker.worker_app is None: + DeactivateAccountRestServlet(hs).register(http_server) + ResetPasswordRestServlet(hs).register(http_server) SearchUsersRestServlet(hs).register(http_server) UserRegisterServlet(hs).register(http_server) AccountValidityRenewServlet(hs).register(http_server) diff --git a/synapse/rest/admin/devices.py b/synapse/rest/admin/devices.py index d934880102..3b2f2d9abb 100644 --- a/synapse/rest/admin/devices.py +++ b/synapse/rest/admin/devices.py @@ -16,6 +16,7 @@ from http import HTTPStatus from typing import TYPE_CHECKING, Tuple from synapse.api.errors import NotFoundError, SynapseError +from synapse.handlers.device import DeviceHandler from synapse.http.servlet import ( RestServlet, assert_params_in_dict, @@ -43,7 +44,9 @@ class DeviceRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() self.auth = hs.get_auth() - self.device_handler = hs.get_device_handler() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.device_handler = handler self.store = hs.get_datastores().main self.is_mine = hs.is_mine @@ -112,7 +115,9 @@ class DevicesRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): self.auth = hs.get_auth() - self.device_handler = hs.get_device_handler() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.device_handler = handler self.store = hs.get_datastores().main self.is_mine = hs.is_mine @@ -143,7 +148,9 @@ class DeleteDevicesRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): self.auth = hs.get_auth() - self.device_handler = hs.get_device_handler() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.device_handler = handler self.store = hs.get_datastores().main self.is_mine = hs.is_mine diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py index 8f3cbd4ea2..69b803f9f8 100644 --- a/synapse/rest/client/devices.py +++ b/synapse/rest/client/devices.py @@ -20,6 +20,7 @@ from pydantic import Extra, StrictStr from synapse.api import errors from synapse.api.errors import NotFoundError +from synapse.handlers.device import DeviceHandler from synapse.http.server import HttpServer from synapse.http.servlet import ( RestServlet, @@ -80,7 +81,9 @@ class DeleteDevicesRestServlet(RestServlet): super().__init__() self.hs = hs self.auth = hs.get_auth() - self.device_handler = hs.get_device_handler() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.device_handler = handler self.auth_handler = hs.get_auth_handler() class PostBody(RequestBodyModel): @@ -125,7 +128,9 @@ class DeviceRestServlet(RestServlet): super().__init__() self.hs = hs self.auth = hs.get_auth() - self.device_handler = hs.get_device_handler() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.device_handler = handler self.auth_handler = hs.get_auth_handler() self._msc3852_enabled = hs.config.experimental.msc3852_enabled @@ -256,7 +261,9 @@ class DehydratedDeviceServlet(RestServlet): super().__init__() self.hs = hs self.auth = hs.get_auth() - self.device_handler = hs.get_device_handler() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.device_handler = handler async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) @@ -313,7 +320,9 @@ class ClaimDehydratedDeviceServlet(RestServlet): super().__init__() self.hs = hs self.auth = hs.get_auth() - self.device_handler = hs.get_device_handler() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.device_handler = handler class PostBody(RequestBodyModel): device_id: StrictStr diff --git a/synapse/rest/client/logout.py b/synapse/rest/client/logout.py index 23dfa4518f..6d34625ad5 100644 --- a/synapse/rest/client/logout.py +++ b/synapse/rest/client/logout.py @@ -15,6 +15,7 @@ import logging from typing import TYPE_CHECKING, Tuple +from synapse.handlers.device import DeviceHandler from synapse.http.server import HttpServer from synapse.http.servlet import RestServlet from synapse.http.site import SynapseRequest @@ -34,7 +35,9 @@ class LogoutRestServlet(RestServlet): super().__init__() self.auth = hs.get_auth() self._auth_handler = hs.get_auth_handler() - self._device_handler = hs.get_device_handler() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self._device_handler = handler async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_expired=True) @@ -59,7 +62,9 @@ class LogoutAllRestServlet(RestServlet): super().__init__() self.auth = hs.get_auth() self._auth_handler = hs.get_auth_handler() - self._device_handler = hs.get_device_handler() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self._device_handler = handler async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_expired=True) diff --git a/synapse/server.py b/synapse/server.py index f0a60d0056..5baae2325e 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -510,7 +510,7 @@ class HomeServer(metaclass=abc.ABCMeta): ) @cache_in_self - def get_device_handler(self): + def get_device_handler(self) -> DeviceWorkerHandler: if self.config.worker.worker_app: return DeviceWorkerHandler(self) else: diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index b8b465d35b..ce7525e29c 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -19,7 +19,7 @@ from typing import Optional from twisted.test.proto_helpers import MemoryReactor from synapse.api.errors import NotFoundError, SynapseError -from synapse.handlers.device import MAX_DEVICE_DISPLAY_NAME_LEN +from synapse.handlers.device import MAX_DEVICE_DISPLAY_NAME_LEN, DeviceHandler from synapse.server import HomeServer from synapse.util import Clock @@ -32,7 +32,9 @@ user2 = "@theresa:bbb" class DeviceTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: hs = self.setup_test_homeserver("server", federation_http_client=None) - self.handler = hs.get_device_handler() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.handler = handler self.store = hs.get_datastores().main return hs @@ -61,6 +63,7 @@ class DeviceTestCase(unittest.HomeserverTestCase): self.assertEqual(res, "fco") dev = self.get_success(self.handler.store.get_device("@boris:foo", "fco")) + assert dev is not None self.assertEqual(dev["display_name"], "display name") def test_device_is_preserved_if_exists(self) -> None: @@ -83,6 +86,7 @@ class DeviceTestCase(unittest.HomeserverTestCase): self.assertEqual(res2, "fco") dev = self.get_success(self.handler.store.get_device("@boris:foo", "fco")) + assert dev is not None self.assertEqual(dev["display_name"], "display name") def test_device_id_is_made_up_if_unspecified(self) -> None: @@ -95,6 +99,7 @@ class DeviceTestCase(unittest.HomeserverTestCase): ) dev = self.get_success(self.handler.store.get_device("@theresa:foo", device_id)) + assert dev is not None self.assertEqual(dev["display_name"], "display") def test_get_devices_by_user(self) -> None: @@ -264,7 +269,9 @@ class DeviceTestCase(unittest.HomeserverTestCase): class DehydrationTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: hs = self.setup_test_homeserver("server", federation_http_client=None) - self.handler = hs.get_device_handler() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.handler = handler self.registration = hs.get_registration_handler() self.auth = hs.get_auth() self.store = hs.get_datastores().main @@ -284,9 +291,9 @@ class DehydrationTestCase(unittest.HomeserverTestCase): ) ) - retrieved_device_id, device_data = self.get_success( - self.handler.get_dehydrated_device(user_id=user_id) - ) + result = self.get_success(self.handler.get_dehydrated_device(user_id=user_id)) + assert result is not None + retrieved_device_id, device_data = result self.assertEqual(retrieved_device_id, stored_dehydrated_device_id) self.assertEqual(device_data, {"device_data": {"foo": "bar"}}) diff --git a/tests/rest/admin/test_device.py b/tests/rest/admin/test_device.py index d52aee8f92..03f2112b07 100644 --- a/tests/rest/admin/test_device.py +++ b/tests/rest/admin/test_device.py @@ -19,6 +19,7 @@ from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin from synapse.api.errors import Codes +from synapse.handlers.device import DeviceHandler from synapse.rest.client import login from synapse.server import HomeServer from synapse.util import Clock @@ -34,7 +35,9 @@ class DeviceRestTestCase(unittest.HomeserverTestCase): ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - self.handler = hs.get_device_handler() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.handler = handler self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") -- cgit 1.5.1