From 64ec45fc1b0856dc7daacca7d3ab75d50bd89f84 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 1 Feb 2022 14:13:38 +0000 Subject: Send to-device messages to application services (#11215) Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/handlers/appservice.py | 136 +++++++++++++++++++++++++++++++++++------ synapse/handlers/sync.py | 4 +- 2 files changed, 121 insertions(+), 19 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 7833e77e2b..0fb919acf6 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -55,6 +55,9 @@ class ApplicationServicesHandler: self.clock = hs.get_clock() self.notify_appservices = hs.config.appservice.notify_appservices self.event_sources = hs.get_event_sources() + self._msc2409_to_device_messages_enabled = ( + hs.config.experimental.msc2409_to_device_messages_enabled + ) self.current_max = 0 self.is_processing = False @@ -132,7 +135,9 @@ class ApplicationServicesHandler: # Fork off pushes to these services for service in services: - self.scheduler.submit_event_for_as(service, event) + self.scheduler.enqueue_for_appservice( + service, events=[event] + ) now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -199,8 +204,9 @@ class ApplicationServicesHandler: Args: stream_key: The stream the event came from. - `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other - value for `stream_key` will cause this function to return early. + `stream_key` can be "typing_key", "receipt_key", "presence_key" or + "to_device_key". Any other value for `stream_key` will cause this function + to return early. Ephemeral events will only be pushed to appservices that have opted into receiving them by setting `push_ephemeral` to true in their registration @@ -216,8 +222,15 @@ class ApplicationServicesHandler: if not self.notify_appservices: return - # Ignore any unsupported streams - if stream_key not in ("typing_key", "receipt_key", "presence_key"): + # Notify appservices of updates in ephemeral event streams. + # Only the following streams are currently supported. + # FIXME: We should use constants for these values. + if stream_key not in ( + "typing_key", + "receipt_key", + "presence_key", + "to_device_key", + ): return # Assert that new_token is an integer (and not a RoomStreamToken). @@ -233,6 +246,13 @@ class ApplicationServicesHandler: # Additional context: https://github.com/matrix-org/synapse/pull/11137 assert isinstance(new_token, int) + # Ignore to-device messages if the feature flag is not enabled + if ( + stream_key == "to_device_key" + and not self._msc2409_to_device_messages_enabled + ): + return + # Check whether there are any appservices which have registered to receive # ephemeral events. # @@ -266,7 +286,7 @@ class ApplicationServicesHandler: with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: if stream_key == "typing_key": - # Note that we don't persist the token (via set_type_stream_id_for_appservice) + # Note that we don't persist the token (via set_appservice_stream_type_pos) # for typing_key due to performance reasons and due to their highly # ephemeral nature. # @@ -274,7 +294,7 @@ class ApplicationServicesHandler: # and, if they apply to this application service, send it off. 
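# An illustrative sketch of the constants requested by the FIXME above ("We
# should use constants for these values"); the class and attribute names are
# hypothetical, not part of this changeset:
from typing import Final

class StreamKeyType:
    TYPING: Final = "typing_key"
    RECEIPT: Final = "receipt_key"
    PRESENCE: Final = "presence_key"
    TO_DEVICE: Final = "to_device_key"

# The membership test above would then read:
#     if stream_key not in (StreamKeyType.TYPING, StreamKeyType.RECEIPT,
#                           StreamKeyType.PRESENCE, StreamKeyType.TO_DEVICE):
#         return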
events = await self._handle_typing(service, new_token) if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) continue # Since we read/update the stream position for this AS/stream @@ -285,28 +305,37 @@ class ApplicationServicesHandler: ): if stream_key == "receipt_key": events = await self._handle_receipts(service, new_token) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( + await self.store.set_appservice_stream_type_pos( service, "read_receipt", new_token ) elif stream_key == "presence_key": events = await self._handle_presence(service, users, new_token) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( + await self.store.set_appservice_stream_type_pos( service, "presence", new_token ) + elif stream_key == "to_device_key": + # Retrieve a list of to-device message events, as well as the + # maximum stream token of the messages we were able to retrieve. + to_device_messages = await self._get_to_device_messages( + service, new_token, users + ) + self.scheduler.enqueue_for_appservice( + service, to_device_messages=to_device_messages + ) + + # Persist the latest handled stream token for this appservice + await self.store.set_appservice_stream_type_pos( + service, "to_device", new_token + ) + async def _handle_typing( self, service: ApplicationService, new_token: int ) -> List[JsonDict]: @@ -440,6 +469,79 @@ class ApplicationServicesHandler: return events + async def _get_to_device_messages( + self, + service: ApplicationService, + new_token: int, + users: Collection[Union[str, UserID]], + ) -> List[JsonDict]: + """ + Given an application service, determine which events it should receive + from those between the last-recorded to-device message stream token for this + appservice and the given stream token. + + Args: + service: The application service to check for which events it should receive. + new_token: The latest to-device event stream token. + users: The users to be notified for the new to-device messages + (ie, the recipients of the messages). + + Returns: + A list of JSON dictionaries containing data derived from the to-device events + that should be sent to the given application service. + """ + # Get the stream token that this application service has processed up until + from_key = await self.store.get_type_stream_id_for_appservice( + service, "to_device" + ) + + # Filter out users that this appservice is not interested in + users_appservice_is_interested_in: List[str] = [] + for user in users: + # FIXME: We should do this farther up the call stack. We currently repeat + # this operation in _handle_presence. 
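# A minimal sketch of the de-duplication the FIXME above asks for: normalise
# the mixed str/UserID collection once, higher up the call stack, rather than
# repeating the conversion here and in _handle_presence. The helper name is
# hypothetical, not part of this changeset.
from typing import Collection, List, Union

from synapse.types import UserID

def normalise_user_ids(users: Collection[Union[str, UserID]]) -> List[str]:
    """Convert any UserID objects to their "@localpart:server" string form."""
    return [u if isinstance(u, str) else u.to_string() for u in users]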
+ if isinstance(user, UserID): + user = user.to_string() + + if service.is_interested_in_user(user): + users_appservice_is_interested_in.append(user) + + if not users_appservice_is_interested_in: + # Return early if the AS was not interested in any of these users + return [] + + # Retrieve the to-device messages for each user + recipient_device_to_messages = await self.store.get_messages_for_user_devices( + users_appservice_is_interested_in, + from_key, + new_token, + ) + + # According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields + # to the event JSON so that the application service will know which user/device + # combination this message was intended for. + # + # So we mangle this dict into a flat list of to-device messages with the relevant + # user ID and device ID embedded inside each message dict. + message_payload: List[JsonDict] = [] + for ( + user_id, + device_id, + ), messages in recipient_device_to_messages.items(): + for message_json in messages: + # Remove 'message_id' from the to-device message, as it's an internal ID + message_json.pop("message_id", None) + + message_payload.append( + { + "to_user_id": user_id, + "to_device_id": device_id, + **message_json, + } + ) + + return message_payload + async def query_user_exists(self, user_id: str) -> bool: """Check if any application service knows this user_id exists. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c72ed7c290..aa9a76f8a9 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1348,8 +1348,8 @@ class SyncHandler: if sync_result_builder.since_token is not None: since_stream_id = int(sync_result_builder.since_token.to_device_key) - if since_stream_id != int(now_token.to_device_key): - messages, stream_id = await self.store.get_new_messages_for_device( + if device_id is not None and since_stream_id != int(now_token.to_device_key): + messages, stream_id = await self.store.get_messages_for_device( user_id, device_id, since_stream_id, now_token.to_device_key ) -- cgit 1.4.1 From d80d39b0359d3891d0078e9c8b53e189986d417f Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Feb 2022 14:28:15 +0100 Subject: Add a ratelimiter for 3pid invite (#11892) --- changelog.d/11892.feature | 1 + docs/sample_config.yaml | 7 +++++++ synapse/config/ratelimiting.py | 15 +++++++++++++++ synapse/handlers/room_member.py | 9 ++++++++- 4 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 changelog.d/11892.feature (limited to 'synapse/handlers') diff --git a/changelog.d/11892.feature b/changelog.d/11892.feature new file mode 100644 index 0000000000..86e21a7f84 --- /dev/null +++ b/changelog.d/11892.feature @@ -0,0 +1 @@ +Use a dedicated configurable rate limiter for 3PID invites. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 689b207fc0..946cd281d2 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -857,6 +857,9 @@ log_config: "CONFDIR/SERVERNAME.log.config" # - one for ratelimiting how often a user or IP can attempt to validate a 3PID. # - two for ratelimiting how often invites can be sent in a room or to a # specific user. +# - one for ratelimiting 3PID invites (i.e. invites sent to a third-party ID +# such as an email address or a phone number) based on the account that's +# sending the invite. # # The defaults are as shown below.
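# A rough reading of the rc_third_party_invite defaults below: burst_count
# caps how many 3PID invites one account can send back-to-back, and
# per_second caps the sustained rate afterwards, so 0.2/s works out to
# roughly one further invite every 5 seconds once a burst of 10 is spent.
# For example, to halve the sustained rate while keeping the burst (the
# values here are illustrative, not recommendations):
#
#rc_third_party_invite:
#  per_second: 0.1
#  burst_count: 10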
# @@ -906,6 +909,10 @@ log_config: "CONFDIR/SERVERNAME.log.config" # per_user: # per_second: 0.003 # burst_count: 5 +# +#rc_third_party_invite: +# per_second: 0.2 +# burst_count: 10 # Ratelimiting settings for incoming federation # diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 36636ab07e..e9ccf1bd62 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -134,6 +134,14 @@ class RatelimitConfig(Config): defaults={"per_second": 0.003, "burst_count": 5}, ) + self.rc_third_party_invite = RateLimitConfig( + config.get("rc_third_party_invite", {}), + defaults={ + "per_second": self.rc_message.per_second, + "burst_count": self.rc_message.burst_count, + }, + ) + def generate_config_section(self, **kwargs): return """\ ## Ratelimiting ## @@ -168,6 +176,9 @@ class RatelimitConfig(Config): # - one for ratelimiting how often a user or IP can attempt to validate a 3PID. # - two for ratelimiting how often invites can be sent in a room or to a # specific user. + # - one for ratelimiting 3PID invites (i.e. invites sent to a third-party ID + # such as an email address or a phone number) based on the account that's + # sending the invite. # # The defaults are as shown below. # @@ -217,6 +228,10 @@ class RatelimitConfig(Config): # per_user: # per_second: 0.003 # burst_count: 5 + # + #rc_third_party_invite: + # per_second: 0.2 + # burst_count: 10 # Ratelimiting settings for incoming federation # diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 3dd5e1b6e4..efe6b4c9aa 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -116,6 +116,13 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): burst_count=hs.config.ratelimiting.rc_invites_per_user.burst_count, ) + self._third_party_invite_limiter = Ratelimiter( + store=self.store, + clock=self.clock, + rate_hz=hs.config.ratelimiting.rc_third_party_invite.per_second, + burst_count=hs.config.ratelimiting.rc_third_party_invite.burst_count, + ) + self.request_ratelimiter = hs.get_request_ratelimiter() @abc.abstractmethod @@ -1295,7 +1302,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # We need to rate limit *before* we send out any 3PID invites, so we # can't just rely on the standard ratelimiting of events. - await self.request_ratelimiter.ratelimit(requester) + await self._third_party_invite_limiter.ratelimit(requester) can_invite = await self.third_party_event_rules.check_threepid_can_be_invited( medium, address, room_id -- cgit 1.4.1 From 02632b3504ad4512c5f5a4f859b3fe326b19c788 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Fri, 4 Feb 2022 13:15:13 +0100 Subject: Stabilise MSC3231 (Token Based Registration) (#11867) --- changelog.d/11867.feature | 5 +++++ docs/modules/password_auth_provider_callbacks.md | 2 +- docs/upgrade.md | 15 +++++++++++++++ docs/workers.md | 2 +- synapse/api/constants.py | 2 +- synapse/handlers/ui_auth/__init__.py | 2 +- synapse/rest/client/register.py | 7 +++---- tests/rest/client/test_register.py | 2 +- 8 files changed, 28 insertions(+), 9 deletions(-) create mode 100644 changelog.d/11867.feature (limited to 'synapse/handlers') diff --git a/changelog.d/11867.feature b/changelog.d/11867.feature new file mode 100644 index 0000000000..dbd9de0e4c --- /dev/null +++ b/changelog.d/11867.feature @@ -0,0 +1,5 @@ +Stabilize [MSC3231](https://github.com/matrix-org/matrix-doc/pull/3231). 
+ + Client implementations using `m.login.registration_token` should switch to the stable identifiers: +* `org.matrix.msc3231.login.registration_token` in query parameters and request/response bodies becomes `m.login.registration_token`. +* `/_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity` becomes `/_matrix/client/v1/register/m.login.registration_token/validity`. \ No newline at end of file diff --git a/docs/modules/password_auth_provider_callbacks.md b/docs/modules/password_auth_provider_callbacks.md index ec8324d292..3697e3782e 100644 --- a/docs/modules/password_auth_provider_callbacks.md +++ b/docs/modules/password_auth_provider_callbacks.md @@ -148,7 +148,7 @@ Here's an example featuring all currently supported keys: "address": "33123456789", "validated_at": 1642701357084, }, - "org.matrix.msc3231.login.registration_token": "sometoken", # User has registered through the flow described in MSC3231 + "m.login.registration_token": "sometoken", # User has registered through a registration token } ``` diff --git a/docs/upgrade.md b/docs/upgrade.md index 75febb4adf..8ce37bcdee 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -84,6 +84,21 @@ process, for example: wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb ``` +# Upgrading to v1.(next) + +## Stabilisation of MSC3231 + +The unstable validity-check endpoint for the +[Registration Tokens](https://spec.matrix.org/v1.2/client-server-api/#get_matrixclientv1registermloginregistration_tokenvalidity) +feature has been stabilised and moved from: + +`/_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity` + +to: + +`/_matrix/client/v1/register/m.login.registration_token/validity` + +Please update any relevant reverse proxy or firewall configurations appropriately.
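A quick way to confirm that a reverse proxy forwards the stable path is to query the endpoint directly. A minimal sketch, assuming a homeserver at `matrix.example.com` and that the endpoint returns a JSON body with a boolean `valid` field (the token value is a placeholder):

```python
import json
from urllib.request import urlopen

base_url = "https://matrix.example.com"  # substitute your homeserver
token = "abcd"  # a registration token to check
url = (
    f"{base_url}/_matrix/client/v1/register/"
    f"m.login.registration_token/validity?token={token}"
)
with urlopen(url, timeout=10) as resp:
    print(json.load(resp)["valid"])
```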
# Upgrading to v1.53.0 diff --git a/docs/workers.md b/docs/workers.md index fd83e2ddeb..dadde4d726 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -241,7 +241,7 @@ expressions: # Registration/login requests ^/_matrix/client/(api/v1|r0|v3|unstable)/login$ ^/_matrix/client/(r0|v3|unstable)/register$ - ^/_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity$ + ^/_matrix/client/v1/register/m.login.registration_token/validity$ # Event sending requests ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 52c083a20b..36ace7c613 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -81,7 +81,7 @@ class LoginType: TERMS: Final = "m.login.terms" SSO: Final = "m.login.sso" DUMMY: Final = "m.login.dummy" - REGISTRATION_TOKEN: Final = "org.matrix.msc3231.login.registration_token" + REGISTRATION_TOKEN: Final = "m.login.registration_token" # This is used in the `type` parameter for /register when called by diff --git a/synapse/handlers/ui_auth/__init__.py b/synapse/handlers/ui_auth/__init__.py index 13b0c61d2e..56eee4057f 100644 --- a/synapse/handlers/ui_auth/__init__.py +++ b/synapse/handlers/ui_auth/__init__.py @@ -38,4 +38,4 @@ class UIAuthSessionDataConstants: # used during registration to store the registration token used (if required) so that: # - we can prevent a token being used twice by one session # - we can 'use up' the token after registration has successfully completed - REGISTRATION_TOKEN = "org.matrix.msc3231.login.registration_token" + REGISTRATION_TOKEN = "m.login.registration_token" diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py index e3492f9f93..c283313e8d 100644 --- a/synapse/rest/client/register.py +++ b/synapse/rest/client/register.py @@ -368,7 +368,7 @@ class RegistrationTokenValidityRestServlet(RestServlet): Example: - GET /_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity?token=abcd + GET /_matrix/client/v1/register/m.login.registration_token/validity?token=abcd 200 OK @@ -378,9 +378,8 @@ class RegistrationTokenValidityRestServlet(RestServlet): """ PATTERNS = client_patterns( - f"/org.matrix.msc3231/register/{LoginType.REGISTRATION_TOKEN}/validity", - releases=(), - unstable=True, + f"/register/{LoginType.REGISTRATION_TOKEN}/validity", + releases=("v1",), ) def __init__(self, hs: "HomeServer"): diff --git a/tests/rest/client/test_register.py b/tests/rest/client/test_register.py index 407dd32a73..0f1c47dcbb 100644 --- a/tests/rest/client/test_register.py +++ b/tests/rest/client/test_register.py @@ -1154,7 +1154,7 @@ class AccountValidityBackgroundJobTestCase(unittest.HomeserverTestCase): class RegistrationTokenValidityRestServletTestCase(unittest.HomeserverTestCase): servlets = [register.register_servlets] - url = "/_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity" + url = "/_matrix/client/v1/register/m.login.registration_token/validity" def default_config(self): config = super().default_config() -- cgit 1.4.1 From e03dde259b741ae824a2569d23ba8bdbc336a54a Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Mon, 7 Feb 2022 13:25:09 +0000 Subject: Clean up an indirect reference to the homeserver datastore (#11914) --- changelog.d/11914.misc | 1 + synapse/handlers/typing.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 
changelog.d/11914.misc (limited to 'synapse/handlers') diff --git a/changelog.d/11914.misc b/changelog.d/11914.misc new file mode 100644 index 0000000000..c288d43455 --- /dev/null +++ b/changelog.d/11914.misc @@ -0,0 +1 @@ +Various refactors to the typing notifications code. \ No newline at end of file diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index e43c22832d..e4bed1c937 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -446,7 +446,7 @@ class TypingWriterHandler(FollowerTypingHandler): class TypingNotificationEventSource(EventSource[int, JsonDict]): def __init__(self, hs: "HomeServer"): - self.hs = hs + self._main_store = hs.get_datastore() self.clock = hs.get_clock() # We can't call get_typing_handler here because there's a cycle: # @@ -487,7 +487,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]): continue if not await service.matches_user_in_member_list( - room_id, handler.store + room_id, self._main_store ): continue -- cgit 1.4.1 From cf06783d54b2b9090aef595a9094ddd857c1155b Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Mon, 7 Feb 2022 18:26:42 +0000 Subject: Remove optional state of `ApplicationService.is_interested`'s `store` parameter (#11911) --- changelog.d/11911.misc | 1 + synapse/appservice/__init__.py | 23 +++++----------------- synapse/handlers/appservice.py | 2 +- tests/appservice/test_appservice.py | 38 +++++++++++++++++++++++++++++++------ 4 files changed, 39 insertions(+), 25 deletions(-) create mode 100644 changelog.d/11911.misc (limited to 'synapse/handlers') diff --git a/changelog.d/11911.misc b/changelog.d/11911.misc new file mode 100644 index 0000000000..805588c2e9 --- /dev/null +++ b/changelog.d/11911.misc @@ -0,0 +1 @@ +Various refactors to the application service notifier code. \ No newline at end of file diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 7dbebd97b5..a340a8c9c7 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -165,23 +165,16 @@ class ApplicationService: return namespace.exclusive return False - async def _matches_user( - self, event: Optional[EventBase], store: Optional["DataStore"] = None - ) -> bool: - if not event: - return False - + async def _matches_user(self, event: EventBase, store: "DataStore") -> bool: if self.is_interested_in_user(event.sender): return True + # also check m.room.member state key if event.type == EventTypes.Member and self.is_interested_in_user( event.state_key ): return True - if not store: - return False - does_match = await self.matches_user_in_member_list(event.room_id, store) return does_match @@ -216,21 +209,15 @@ class ApplicationService: return self.is_interested_in_room(event.room_id) return False - async def _matches_aliases( - self, event: EventBase, store: Optional["DataStore"] = None - ) -> bool: - if not store or not event: - return False - + async def _matches_aliases(self, event: EventBase, store: "DataStore") -> bool: alias_list = await store.get_aliases_for_room(event.room_id) for alias in alias_list: if self.is_interested_in_alias(alias): return True + return False - async def is_interested( - self, event: EventBase, store: Optional["DataStore"] = None - ) -> bool: + async def is_interested(self, event: EventBase, store: "DataStore") -> bool: """Check if this service is interested in this event. 
Args: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 0fb919acf6..a42c3558e4 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -649,7 +649,7 @@ class ApplicationServicesHandler: """Retrieve a list of application services interested in this event. Args: - event: The event to check. Can be None if alias_list is not. + event: The event to check. Returns: A list of services interested in this event based on the service regex. """ diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index 07d8105f41..9bd6275e92 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -40,13 +40,19 @@ class ApplicationServiceTestCase(unittest.TestCase): ) self.store = Mock() + self.store.get_aliases_for_room = simple_async_mock([]) + self.store.get_users_in_room = simple_async_mock([]) @defer.inlineCallbacks def test_regex_user_id_prefix_match(self): self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*")) self.event.sender = "@irc_foobar:matrix.org" self.assertTrue( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -54,7 +60,11 @@ class ApplicationServiceTestCase(unittest.TestCase): self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*")) self.event.sender = "@someone_else:matrix.org" self.assertFalse( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -64,7 +74,11 @@ class ApplicationServiceTestCase(unittest.TestCase): self.event.type = "m.room.member" self.event.state_key = "@irc_foobar:matrix.org" self.assertTrue( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -74,7 +88,11 @@ class ApplicationServiceTestCase(unittest.TestCase): ) self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org" self.assertTrue( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -84,7 +102,11 @@ class ApplicationServiceTestCase(unittest.TestCase): ) self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org" self.assertFalse( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -183,7 +205,11 @@ class ApplicationServiceTestCase(unittest.TestCase): self.event.content = {"membership": "invite"} self.event.state_key = self.service.sender self.assertTrue( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks -- cgit 1.4.1 From fef2e792beec9c126953b4f6b6d2d9f6e31ed96f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 7 Feb 2022 15:54:13 -0600 Subject: Fix historical messages backfilling in random order on remote homeservers (MSC2716) (#11114) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix https://github.com/matrix-org/synapse/issues/11091 Fix 
https://github.com/matrix-org/synapse/issues/10764 (side-stepping the issue because we no longer have to deal with `fake_prev_event_id`) 1. Made the `/backfill` response return messages in `(depth, stream_ordering)` order (previously only sorted by `depth`) - Technically, it shouldn't really matter how `/backfill` returns things but I'm just trying to make the `stream_ordering` a little more consistent from the origin to the remote homeservers in order to get the order of messages from `/messages` consistent ([sorted by `(topological_ordering, stream_ordering)`](https://github.com/matrix-org/synapse/blob/develop/docs/development/room-dag-concepts.md#depth-and-stream-ordering)). - Even now that we return backfilled messages in order, it still doesn't guarantee the same `stream_ordering` (and more importantly the [`/messages` order](https://github.com/matrix-org/synapse/blob/develop/docs/development/room-dag-concepts.md#depth-and-stream-ordering)) on the other server. For example, if a room has a bunch of history imported and someone visits a permalink to a historical message back in time, their homeserver will skip over the historical messages in between and insert the permalink as the next message in the `stream_order` and totally throw off the sort. - This will be even more the case when we add the [MSC3030 jump to date API endpoint](https://github.com/matrix-org/matrix-doc/pull/3030) so the static archives can navigate and jump to a certain date. - We're solving this in the future by switching to [online topological ordering](https://github.com/matrix-org/gomatrixserverlib/issues/187) and [chunking](https://github.com/matrix-org/synapse/issues/3785) which by its nature will apply retroactively to fix any inconsistencies introduced by people permalinking 2. As we're navigating `prev_events` to return in `/backfill`, we order by `depth` first (newest -> oldest) and now also tie-break based on the `stream_ordering` (newest -> oldest). This is technically important because MSC2716 inserts a bunch of historical messages at the same `depth` so it's best to be prescriptive about which ones we should process first. In reality, I think the code already looped over the historical messages as expected because the database is already in order. 3. Making the historical state chain and historical event chain float on their own by having no `prev_events` instead of a fake `prev_event` which caused backfill to get clogged with an unresolvable event. Fixes https://github.com/matrix-org/synapse/issues/11091 and https://github.com/matrix-org/synapse/issues/10764 4. We no longer find connected insertion events by finding a potential `prev_event` connection to the current event we're iterating over. We now solely rely on marker events which when processed, add the insertion event as an extremity and the federating homeserver can ask about it when time calls. - Related discussion, https://github.com/matrix-org/synapse/pull/11114#discussion_r741514793 Before | After --- | --- ![](https://user-images.githubusercontent.com/558581/139218681-b465c862-5c49-4702-a59e-466733b0cf45.png) | ![](https://user-images.githubusercontent.com/558581/146453159-a1609e0a-8324-439d-ae44-e4bce43ac6d1.png) #### Why aren't we sorting topologically when receiving backfill events? > The main reason we're going to opt to not sort topologically when receiving backfill events is because it's probably best to do whatever is easiest to make it just work. 
People will probably have opinions once they look at [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) which could change whatever implementation anyway. > > As mentioned, ideally we would do this but code necessary to make the fake edges but it gets confusing and gives an impression of “just whyyyy” (feels icky). This problem also dissolves with online topological ordering. > > -- https://github.com/matrix-org/synapse/pull/11114#discussion_r741517138 See https://github.com/matrix-org/synapse/pull/11114#discussion_r739610091 for the technical difficulties --- changelog.d/11114.bugfix | 1 + synapse/handlers/federation.py | 27 +- synapse/handlers/federation_event.py | 34 ++- synapse/handlers/message.py | 20 +- synapse/handlers/room_batch.py | 44 +-- synapse/handlers/room_member.py | 22 +- synapse/rest/client/room_batch.py | 17 +- synapse/storage/databases/main/event_federation.py | 313 ++++++++++++++------- synapse/storage/databases/main/events.py | 13 +- 9 files changed, 342 insertions(+), 149 deletions(-) create mode 100644 changelog.d/11114.bugfix (limited to 'synapse/handlers') diff --git a/changelog.d/11114.bugfix b/changelog.d/11114.bugfix new file mode 100644 index 0000000000..c6e65df97f --- /dev/null +++ b/changelog.d/11114.bugfix @@ -0,0 +1 @@ +Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a37ae0ca09..c0f642005f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -166,9 +166,14 @@ class FederationHandler: oldest_events_with_depth = ( await self.store.get_oldest_event_ids_with_depth_in_room(room_id) ) - insertion_events_to_be_backfilled = ( - await self.store.get_insertion_event_backwards_extremities_in_room(room_id) - ) + + insertion_events_to_be_backfilled: Dict[str, int] = {} + if self.hs.config.experimental.msc2716_enabled: + insertion_events_to_be_backfilled = ( + await self.store.get_insertion_event_backward_extremities_in_room( + room_id + ) + ) logger.debug( "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s", oldest_events_with_depth, @@ -271,11 +276,12 @@ class FederationHandler: ] logger.debug( - "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s", + "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s", room_id, current_depth, limit, max_depth, + len(sorted_extremeties_tuple), sorted_extremeties_tuple, filtered_sorted_extremeties_tuple, ) @@ -1047,6 +1053,19 @@ class FederationHandler: limit = min(limit, 100) events = await self.store.get_backfill_events(room_id, pdu_list, limit) + logger.debug( + "on_backfill_request: backfill events=%s", + [ + "event_id=%s,depth=%d,body=%s,prevs=%s\n" + % ( + event.event_id, + event.depth, + event.content.get("body", event.type), + event.prev_event_ids(), + ) + for event in events + ], + ) events = await filter_events_for_server(self.storage, origin, events) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 3905f60b3a..9edc7369d6 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -508,7 +508,11 @@ class FederationEventHandler: f"room {ev.room_id}, when we were backfilling in {room_id}" ) - await self._process_pulled_events(dest, events, 
backfilled=True) + await self._process_pulled_events( + dest, + events, + backfilled=True, + ) async def _get_missing_events_for_pdu( self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int @@ -626,11 +630,24 @@ class FederationEventHandler: backfilled: True if this is part of a historical batch of events (inhibits notification to clients, and validation of device keys.) """ + logger.debug( + "processing pulled backfilled=%s events=%s", + backfilled, + [ + "event_id=%s,depth=%d,body=%s,prevs=%s\n" + % ( + event.event_id, + event.depth, + event.content.get("body", event.type), + event.prev_event_ids(), + ) + for event in events + ], + ) # We want to sort these by depth so we process them and # tell clients about them in order. sorted_events = sorted(events, key=lambda x: x.depth) - for ev in sorted_events: with nested_logging_context(ev.event_id): await self._process_pulled_event(origin, ev, backfilled=backfilled) @@ -992,6 +1009,8 @@ class FederationEventHandler: await self._run_push_actions_and_persist_event(event, context, backfilled) + await self._handle_marker_event(origin, event) + if backfilled or context.rejected: return @@ -1071,8 +1090,6 @@ class FederationEventHandler: event.sender, ) - await self._handle_marker_event(origin, event) - async def _resync_device(self, sender: str) -> None: """We have detected that the device list for the given user may be out of sync, so we try and resync them. @@ -1323,7 +1340,14 @@ class FederationEventHandler: return event, context events_to_persist = (x for x in (prep(event) for event in fetched_events) if x) - await self.persist_events_and_notify(room_id, tuple(events_to_persist)) + await self.persist_events_and_notify( + room_id, + tuple(events_to_persist), + # Mark these events backfilled as they're historic events that will + # eventually be backfilled. For example, missing events we fetch + # during backfill should be marked as backfilled as well. + backfilled=True, + ) async def _check_event_auth( self, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b37250aa38..9267e586a8 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -490,12 +490,12 @@ class EventCreationHandler: requester: Requester, event_dict: dict, txn_id: Optional[str] = None, + allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, require_consent: bool = True, outlier: bool = False, historical: bool = False, - allow_no_prev_events: bool = False, depth: Optional[int] = None, ) -> Tuple[EventBase, EventContext]: """ @@ -510,6 +510,10 @@ class EventCreationHandler: requester event_dict: An entire event txn_id + allow_no_prev_events: Whether to allow this event to be created an empty + list of prev_events. Normally this is prohibited just because most + events should have a prev_event and we should only use this in special + cases like MSC2716. prev_event_ids: the forward extremities to use as the prev_events for the new event. @@ -604,10 +608,10 @@ class EventCreationHandler: event, context = await self.create_new_client_event( builder=builder, requester=requester, + allow_no_prev_events=allow_no_prev_events, prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, depth=depth, - allow_no_prev_events=allow_no_prev_events, ) # In an ideal world we wouldn't need the second part of this condition. 
However, @@ -764,6 +768,7 @@ class EventCreationHandler: self, requester: Requester, event_dict: dict, + allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, ratelimit: bool = True, @@ -781,6 +786,10 @@ class EventCreationHandler: Args: requester: The requester sending the event. event_dict: An entire event. + allow_no_prev_events: Whether to allow this event to be created an empty + list of prev_events. Normally this is prohibited just because most + events should have a prev_event and we should only use this in special + cases like MSC2716. prev_event_ids: The event IDs to use as the prev events. Should normally be left as None to automatically request them @@ -880,16 +889,20 @@ class EventCreationHandler: self, builder: EventBuilder, requester: Optional[Requester] = None, + allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, depth: Optional[int] = None, - allow_no_prev_events: bool = False, ) -> Tuple[EventBase, EventContext]: """Create a new event for a local client Args: builder: requester: + allow_no_prev_events: Whether to allow this event to be created an empty + list of prev_events. Normally this is prohibited just because most + events should have a prev_event and we should only use this in special + cases like MSC2716. prev_event_ids: the forward extremities to use as the prev_events for the new event. @@ -908,7 +921,6 @@ class EventCreationHandler: Returns: Tuple of created event, context """ - # Strip down the auth_event_ids to only what we need to auth the event. # For example, we don't need extra m.room.member that don't match event.sender full_state_ids_at_event = None diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index f880aa93d2..f8137ec04c 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -13,10 +13,6 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -def generate_fake_event_id() -> str: - return "$fake_" + random_string(43) - - class RoomBatchHandler: def __init__(self, hs: "HomeServer"): self.hs = hs @@ -182,11 +178,12 @@ class RoomBatchHandler: state_event_ids_at_start = [] auth_event_ids = initial_auth_event_ids.copy() - # Make the state events float off on their own so we don't have a - # bunch of `@mxid joined the room` noise between each batch - prev_event_id_for_state_chain = generate_fake_event_id() + # Make the state events float off on their own by specifying no + # prev_events for the first one in the chain so we don't have a bunch of + # `@mxid joined the room` noise between each batch. + prev_event_ids_for_state_chain: List[str] = [] - for state_event in state_events_at_start: + for index, state_event in enumerate(state_events_at_start): assert_params_in_dict( state_event, ["type", "origin_server_ts", "content", "sender"] ) @@ -222,7 +219,10 @@ class RoomBatchHandler: content=event_dict["content"], outlier=True, historical=True, - prev_event_ids=[prev_event_id_for_state_chain], + # Only the first event in the chain should be floating. + # The rest should hang off each other in a chain. + allow_no_prev_events=index == 0, + prev_event_ids=prev_event_ids_for_state_chain, # Make sure to use a copy of this list because we modify it # later in the loop here. Otherwise it will be the same # reference and also update in the event when we append later. 
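# A self-contained toy version of the chain-building pattern above; the
# create_event function is a stand-in for the real event-creation call and
# simply mints fake event IDs:
from typing import Dict, List

def create_event(ev: Dict, allow_no_prev_events: bool, prev_event_ids: List[str]) -> str:
    return "$event_%d" % ev["i"]

state_events_at_start = [{"i": 0}, {"i": 1}, {"i": 2}]
prev_event_ids_for_state_chain: List[str] = []
for index, state_event in enumerate(state_events_at_start):
    event_id = create_event(
        state_event,
        # Only the first event in the chain floats with no prev_events ...
        allow_no_prev_events=index == 0,
        # ... every later event hangs off the event created just before it.
        prev_event_ids=prev_event_ids_for_state_chain,
    )
    prev_event_ids_for_state_chain = [event_id]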
@@ -242,7 +242,10 @@ class RoomBatchHandler: event_dict, outlier=True, historical=True, - prev_event_ids=[prev_event_id_for_state_chain], + # Only the first event in the chain should be floating. + # The rest should hang off each other in a chain. + allow_no_prev_events=index == 0, + prev_event_ids=prev_event_ids_for_state_chain, # Make sure to use a copy of this list because we modify it # later in the loop here. Otherwise it will be the same # reference and also update in the event when we append later. @@ -253,7 +256,7 @@ class RoomBatchHandler: state_event_ids_at_start.append(event_id) auth_event_ids.append(event_id) # Connect all the state in a floating chain - prev_event_id_for_state_chain = event_id + prev_event_ids_for_state_chain = [event_id] return state_event_ids_at_start @@ -261,7 +264,6 @@ class RoomBatchHandler: self, events_to_create: List[JsonDict], room_id: str, - initial_prev_event_ids: List[str], inherited_depth: int, auth_event_ids: List[str], app_service_requester: Requester, @@ -277,9 +279,6 @@ class RoomBatchHandler: events_to_create: List of historical events to create in JSON dictionary format. room_id: Room where you want the events persisted in. - initial_prev_event_ids: These will be the prev_events for the first - event created. Each event created afterwards will point to the - previous event created. inherited_depth: The depth to create the events at (you will probably by calling inherit_depth_from_prev_ids(...)). auth_event_ids: Define which events allow you to create the given @@ -291,11 +290,14 @@ class RoomBatchHandler: """ assert app_service_requester.app_service - prev_event_ids = initial_prev_event_ids.copy() + # Make the historical event chain float off on its own by specifying no + # prev_events for the first event in the chain which causes the HS to + # ask for the state at the start of the batch later. + prev_event_ids: List[str] = [] event_ids = [] events_to_persist = [] - for ev in events_to_create: + for index, ev in enumerate(events_to_create): assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % ( @@ -319,6 +321,9 @@ class RoomBatchHandler: ev["sender"], app_service_requester.app_service ), event_dict, + # Only the first event in the chain should be floating. + # The rest should hang off each other in a chain. + allow_no_prev_events=index == 0, prev_event_ids=event_dict.get("prev_events"), auth_event_ids=auth_event_ids, historical=True, @@ -370,7 +375,6 @@ class RoomBatchHandler: events_to_create: List[JsonDict], room_id: str, batch_id_to_connect_to: str, - initial_prev_event_ids: List[str], inherited_depth: int, auth_event_ids: List[str], app_service_requester: Requester, @@ -385,9 +389,6 @@ class RoomBatchHandler: room_id: Room where you want the events created in. batch_id_to_connect_to: The batch_id from the insertion event you want this batch to connect to. - initial_prev_event_ids: These will be the prev_events for the first - event created. Each event created afterwards will point to the - previous event created. inherited_depth: The depth to create the events at (you will probably by calling inherit_depth_from_prev_ids(...)). 
auth_event_ids: Define which events allow you to create the given @@ -436,7 +437,6 @@ class RoomBatchHandler: event_ids = await self.persist_historical_events( events_to_create=events_to_create, room_id=room_id, - initial_prev_event_ids=initial_prev_event_ids, inherited_depth=inherited_depth, auth_event_ids=auth_event_ids, app_service_requester=app_service_requester, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index efe6b4c9aa..bf1a47efb0 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -268,7 +268,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): target: UserID, room_id: str, membership: str, - prev_event_ids: List[str], + allow_no_prev_events: bool = False, + prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, txn_id: Optional[str] = None, ratelimit: bool = True, @@ -286,8 +287,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): target: room_id: membership: - prev_event_ids: The event IDs to use as the prev events + allow_no_prev_events: Whether to allow this event to be created an empty + list of prev_events. Normally this is prohibited just because most + events should have a prev_event and we should only use this in special + cases like MSC2716. + prev_event_ids: The event IDs to use as the prev events auth_event_ids: The event ids to use as the auth_events for the new event. Should normally be left as None, which will cause them to be calculated @@ -344,6 +349,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): "membership": membership, }, txn_id=txn_id, + allow_no_prev_events=allow_no_prev_events, prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, require_consent=require_consent, @@ -446,6 +452,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): require_consent: bool = True, outlier: bool = False, historical: bool = False, + allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, ) -> Tuple[str, int]: @@ -470,6 +477,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): historical: Indicates whether the message is being inserted back in time around some existing events. This is used to skip a few checks and mark the event as backfilled. + allow_no_prev_events: Whether to allow this event to be created an empty + list of prev_events. Normally this is prohibited just because most + events should have a prev_event and we should only use this in special + cases like MSC2716. prev_event_ids: The event IDs to use as the prev events auth_event_ids: The event ids to use as the auth_events for the new event. @@ -504,6 +515,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): require_consent=require_consent, outlier=outlier, historical=historical, + allow_no_prev_events=allow_no_prev_events, prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, ) @@ -525,6 +537,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): require_consent: bool = True, outlier: bool = False, historical: bool = False, + allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, ) -> Tuple[str, int]: @@ -551,6 +564,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): historical: Indicates whether the message is being inserted back in time around some existing events. This is used to skip a few checks and mark the event as backfilled. + allow_no_prev_events: Whether to allow this event to be created an empty + list of prev_events. 
Normally this is prohibited just because most + events should have a prev_event and we should only use this in special + cases like MSC2716. prev_event_ids: The event IDs to use as the prev events auth_event_ids: The event ids to use as the auth_events for the new event. @@ -680,6 +697,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): membership=effective_membership_state, txn_id=txn_id, ratelimit=ratelimit, + allow_no_prev_events=allow_no_prev_events, prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, content=content, diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index e4c9451ae0..4b6be38327 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -131,6 +131,14 @@ class RoomBatchSendEventRestServlet(RestServlet): prev_event_ids_from_query ) + if not auth_event_ids: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "No auth events found for given prev_event query parameter. The prev_event=%s probably does not exist." + % prev_event_ids_from_query, + errcode=Codes.INVALID_PARAM, + ) + state_event_ids_at_start = [] # Create and persist all of the state events that float off on their own # before the batch. These will most likely be all of the invite/member @@ -197,21 +205,12 @@ class RoomBatchSendEventRestServlet(RestServlet): EventContentFields.MSC2716_NEXT_BATCH_ID ] - # Also connect the historical event chain to the end of the floating - # state chain, which causes the HS to ask for the state at the start of - # the batch later. If there is no state chain to connect to, just make - # the insertion event float itself. - prev_event_ids = [] - if len(state_event_ids_at_start): - prev_event_ids = [state_event_ids_at_start[-1]] - # Create and persist all of the historical events as well as insertion # and batch meta events to make the batch navigable in the DAG. 
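# The linkage that makes a batch "navigable in the DAG" is, in miniature, a
# match between two content fields (a sketch using MSC2716's unstable field
# names; the literal values are illustrative):
insertion_event_content = {"org.matrix.msc2716.next_batch_id": "batch-123"}
batch_event_content = {"org.matrix.msc2716.batch_id": "batch-123"}

# A batch event points at an insertion event when its batch_id equals the
# insertion event's next_batch_id -- the same join the storage layer performs
# in SQL further down in this patch.
assert (
    batch_event_content["org.matrix.msc2716.batch_id"]
    == insertion_event_content["org.matrix.msc2716.next_batch_id"]
)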
event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events( events_to_create=events_to_create, room_id=room_id, batch_id_to_connect_to=batch_id_to_connect_to, - initial_prev_event_ids=prev_event_ids, inherited_depth=inherited_depth, auth_event_ids=auth_event_ids, app_service_requester=requester, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ca71f073fc..22f6474127 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -16,9 +16,10 @@ import logging from queue import Empty, PriorityQueue from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple +import attr from prometheus_client import Counter, Gauge -from synapse.api.constants import MAX_DEPTH +from synapse.api.constants import MAX_DEPTH, EventTypes from synapse.api.errors import StoreError from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.events import EventBase, make_event_from_dict @@ -60,6 +61,15 @@ pdus_pruned_from_federation_queue = Counter( logger = logging.getLogger(__name__) +# All the info we need while iterating the DAG while backfilling +@attr.s(frozen=True, slots=True, auto_attribs=True) +class BackfillQueueNavigationItem: + depth: int + stream_ordering: int + event_id: str + type: str + + class _NoChainCoverIndex(Exception): def __init__(self, room_id: str): super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,)) @@ -74,6 +84,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas ): super().__init__(database, db_conn, hs) + self.hs = hs + if hs.config.worker.run_background_tasks: hs.get_clock().looping_call( self._delete_old_forward_extrem_cache, 60 * 60 * 1000 @@ -737,7 +749,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas room_id, ) - async def get_insertion_event_backwards_extremities_in_room( + async def get_insertion_event_backward_extremities_in_room( self, room_id ) -> Dict[str, int]: """Get the insertion events we know about that we haven't backfilled yet. @@ -754,7 +766,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas Map from event_id to depth """ - def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id): + def get_insertion_event_backward_extremities_in_room_txn(txn, room_id): sql = """ SELECT b.event_id, MAX(e.depth) FROM insertion_events as i /* We only want insertion events that are also marked as backwards extremities */ @@ -770,8 +782,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas return dict(txn) return await self.db_pool.runInteraction( - "get_insertion_event_backwards_extremities_in_room", - get_insertion_event_backwards_extremities_in_room_txn, + "get_insertion_event_backward_extremities_in_room", + get_insertion_event_backward_extremities_in_room_txn, room_id, ) @@ -997,143 +1009,242 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn ) - async def get_backfill_events(self, room_id: str, event_list: list, limit: int): - """Get a list of Events for a given topic that occurred before (and - including) the events in event_list. 
Return a list of max size `limit` + def _get_connected_batch_event_backfill_results_txn( + self, txn: LoggingTransaction, insertion_event_id: str, limit: int + ) -> List[BackfillQueueNavigationItem]: + """ + Find any batch connections of a given insertion event. + A batch event points at a insertion event via: + batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID] Args: - room_id - event_list - limit + txn: The database transaction to use + insertion_event_id: The event ID to navigate from. We will find + batch events that point back at this insertion event. + limit: Max number of event ID's to query for and return + + Returns: + List of batch events that the backfill queue can process + """ + batch_connection_query = """ + SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i + /* Find the batch that connects to the given insertion event */ + INNER JOIN batch_events AS c + ON i.next_batch_id = c.batch_id + /* Get the depth of the batch start event from the events table */ + INNER JOIN events AS e USING (event_id) + /* Find an insertion event which matches the given event_id */ + WHERE i.event_id = ? + LIMIT ? """ - event_ids = await self.db_pool.runInteraction( - "get_backfill_events", - self._get_backfill_events, - room_id, - event_list, - limit, - ) - events = await self.get_events_as_list(event_ids) - return sorted(events, key=lambda e: -e.depth) - def _get_backfill_events(self, txn, room_id, event_list, limit): - logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit) + # Find any batch connections for the given insertion event + txn.execute( + batch_connection_query, + (insertion_event_id, limit), + ) + return [ + BackfillQueueNavigationItem( + depth=row[0], + stream_ordering=row[1], + event_id=row[2], + type=row[3], + ) + for row in txn + ] - event_results = set() + def _get_connected_prev_event_backfill_results_txn( + self, txn: LoggingTransaction, event_id: str, limit: int + ) -> List[BackfillQueueNavigationItem]: + """ + Find any events connected by prev_event the specified event_id. - # We want to make sure that we do a breadth-first, "depth" ordered - # search. + Args: + txn: The database transaction to use + event_id: The event ID to navigate from + limit: Max number of event ID's to query for and return + Returns: + List of prev events that the backfill queue can process + """ # Look for the prev_event_id connected to the given event_id - query = """ - SELECT depth, prev_event_id FROM event_edges - /* Get the depth of the prev_event_id from the events table */ + connected_prev_event_query = """ + SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges + /* Get the depth and stream_ordering of the prev_event_id from the events table */ INNER JOIN events ON prev_event_id = events.event_id - /* Find an event which matches the given event_id */ + /* Look for an edge which matches the given event_id */ WHERE event_edges.event_id = ? AND event_edges.is_state = ? + /* Because we can have many events at the same depth, + * we want to also tie-break and sort on stream_ordering */ + ORDER BY depth DESC, stream_ordering DESC LIMIT ? 
""" - # Look for the "insertion" events connected to the given event_id - connected_insertion_event_query = """ - SELECT e.depth, i.event_id FROM insertion_event_edges AS i - /* Get the depth of the insertion event from the events table */ - INNER JOIN events AS e USING (event_id) - /* Find an insertion event which points via prev_events to the given event_id */ - WHERE i.insertion_prev_event_id = ? - LIMIT ? + txn.execute( + connected_prev_event_query, + (event_id, False, limit), + ) + return [ + BackfillQueueNavigationItem( + depth=row[0], + stream_ordering=row[1], + event_id=row[2], + type=row[3], + ) + for row in txn + ] + + async def get_backfill_events( + self, room_id: str, seed_event_id_list: list, limit: int + ): + """Get a list of Events for a given topic that occurred before (and + including) the events in seed_event_id_list. Return a list of max size `limit` + + Args: + room_id + seed_event_id_list + limit """ + event_ids = await self.db_pool.runInteraction( + "get_backfill_events", + self._get_backfill_events, + room_id, + seed_event_id_list, + limit, + ) + events = await self.get_events_as_list(event_ids) + return sorted( + events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering) + ) - # Find any batch connections of a given insertion event - batch_connection_query = """ - SELECT e.depth, c.event_id FROM insertion_events AS i - /* Find the batch that connects to the given insertion event */ - INNER JOIN batch_events AS c - ON i.next_batch_id = c.batch_id - /* Get the depth of the batch start event from the events table */ - INNER JOIN events AS e USING (event_id) - /* Find an insertion event which matches the given event_id */ - WHERE i.event_id = ? - LIMIT ? + def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit): + """ + We want to make sure that we do a breadth-first, "depth" ordered search. + We also handle navigating historical branches of history connected by + insertion and batch events. """ + logger.debug( + "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s", + room_id, + seed_event_id_list, + limit, + ) + + event_id_results = set() # In a PriorityQueue, the lowest valued entries are retrieved first. - # We're using depth as the priority in the queue. - # Depth is lowest at the oldest-in-time message and highest and - # newest-in-time message. We add events to the queue with a negative depth so that - # we process the newest-in-time messages first going backwards in time. + # We're using depth as the priority in the queue and tie-break based on + # stream_ordering. Depth is lowest at the oldest-in-time message and + # highest and newest-in-time message. We add events to the queue with a + # negative depth so that we process the newest-in-time messages first + # going backwards in time. stream_ordering follows the same pattern. 
queue = PriorityQueue() - for event_id in event_list: - depth = self.db_pool.simple_select_one_onecol_txn( + for seed_event_id in seed_event_id_list: + event_lookup_result = self.db_pool.simple_select_one_txn( txn, table="events", - keyvalues={"event_id": event_id, "room_id": room_id}, - retcol="depth", + keyvalues={"event_id": seed_event_id, "room_id": room_id}, + retcols=( + "type", + "depth", + "stream_ordering", + ), allow_none=True, ) - if depth: - queue.put((-depth, event_id)) + if event_lookup_result is not None: + logger.debug( + "_get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s", + room_id, + seed_event_id, + event_lookup_result["depth"], + event_lookup_result["stream_ordering"], + event_lookup_result["type"], + ) - while not queue.empty() and len(event_results) < limit: + if event_lookup_result["depth"]: + queue.put( + ( + -event_lookup_result["depth"], + -event_lookup_result["stream_ordering"], + seed_event_id, + event_lookup_result["type"], + ) + ) + + while not queue.empty() and len(event_id_results) < limit: try: - _, event_id = queue.get_nowait() + _, _, event_id, event_type = queue.get_nowait() except Empty: break - if event_id in event_results: + if event_id in event_id_results: continue - event_results.add(event_id) + event_id_results.add(event_id) # Try and find any potential historical batches of message history. - # - # First we look for an insertion event connected to the current - # event (by prev_event). If we find any, we need to go and try to - # find any batch events connected to the insertion event (by - # batch_id). If we find any, we'll add them to the queue and - # navigate up the DAG like normal in the next iteration of the loop. - txn.execute( - connected_insertion_event_query, (event_id, limit - len(event_results)) - ) - connected_insertion_event_id_results = txn.fetchall() - logger.debug( - "_get_backfill_events: connected_insertion_event_query %s", - connected_insertion_event_id_results, - ) - for row in connected_insertion_event_id_results: - connected_insertion_event_depth = row[0] - connected_insertion_event = row[1] - queue.put((-connected_insertion_event_depth, connected_insertion_event)) + if self.hs.config.experimental.msc2716_enabled: + # We need to go and try to find any batch events connected + # to a given insertion event (by batch_id). If we find any, we'll + # add them to the queue and navigate up the DAG like normal in the + # next iteration of the loop. 
+                if event_type == EventTypes.MSC2716_INSERTION:
+                    # Find any batch connections for the given insertion event
+                    connected_batch_event_backfill_results = (
+                        self._get_connected_batch_event_backfill_results_txn(
+                            txn, event_id, limit - len(event_id_results)
+                        )
+                    )
+                    logger.debug(
+                        "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
+                        room_id,
+                        connected_batch_event_backfill_results,
+                    )
+                    for (
+                        connected_batch_event_backfill_item
+                    ) in connected_batch_event_backfill_results:
+                        if (
+                            connected_batch_event_backfill_item.event_id
+                            not in event_id_results
+                        ):
+                            queue.put(
+                                (
+                                    -connected_batch_event_backfill_item.depth,
+                                    -connected_batch_event_backfill_item.stream_ordering,
+                                    connected_batch_event_backfill_item.event_id,
+                                    connected_batch_event_backfill_item.type,
+                                )
+                            )

-            # Find any batch connections for the given insertion event
-            txn.execute(
-                batch_connection_query,
-                (connected_insertion_event, limit - len(event_results)),
-            )
-            batch_start_event_id_results = txn.fetchall()
-            logger.debug(
-                "_get_backfill_events: batch_start_event_id_results %s",
-                batch_start_event_id_results,
+            # Now we just look up the DAG by prev_events as normal
+            connected_prev_event_backfill_results = (
+                self._get_connected_prev_event_backfill_results_txn(
+                    txn, event_id, limit - len(event_id_results)
                 )
-            for row in batch_start_event_id_results:
-                if row[1] not in event_results:
-                    queue.put((-row[0], row[1]))
-
-            txn.execute(query, (event_id, False, limit - len(event_results)))
-            prev_event_id_results = txn.fetchall()
+            )
             logger.debug(
-                "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+                "_get_backfill_events(room_id=%s): connected_prev_event_backfill_results=%s",
+                room_id,
+                connected_prev_event_backfill_results,
             )
+            for (
+                connected_prev_event_backfill_item
+            ) in connected_prev_event_backfill_results:
+                if connected_prev_event_backfill_item.event_id not in event_id_results:
+                    queue.put(
+                        (
+                            -connected_prev_event_backfill_item.depth,
+                            -connected_prev_event_backfill_item.stream_ordering,
+                            connected_prev_event_backfill_item.event_id,
+                            connected_prev_event_backfill_item.type,
+                        )
+                    )

-            for row in prev_event_id_results:
-                if row[1] not in event_results:
-                    queue.put((-row[0], row[1]))
-
-        return event_results
+        return event_id_results

     async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
         ids = await self.db_pool.runInteraction(
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b7554154ac..b804185c40 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2215,9 +2215,14 @@ class PersistEventsStore:
             " SELECT 1 FROM event_backward_extremities"
             " WHERE event_id = ? AND room_id = ?"
             " )"
+            # 1. Don't add an event as an extremity again if we already persisted it
+            # as a non-outlier.
+            # 2. Don't add an outlier as an extremity if it has no prev_events
             " AND NOT EXISTS ("
-            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            " AND outlier = ?"
+            " SELECT 1 FROM events"
+            " LEFT JOIN event_edges edge"
+            " ON edge.event_id = events.event_id"
+            " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = ? OR edge.event_id IS NULL)"
             " )"
         )

@@ -2243,6 +2248,10 @@ class PersistEventsStore:
                 (ev.event_id, ev.room_id)
                 for ev in events
                 if not ev.internal_metadata.is_outlier()
+                # If we encountered an event with no prev_events, then we might
+                # as well remove it now because it won't ever have anything else
+                # to backfill from.
+                or len(ev.prev_event_ids()) == 0
             ],
         )
-- cgit 1.4.1


From 0640f8ebaa34e10a69ad7481b738ae36fda1c103 Mon Sep 17 00:00:00 2001
From: Brendan Abolivier
Date: Tue, 8 Feb 2022 11:20:32 +0100
Subject: Add a callback to allow modules to deny 3PID (#11854)

Part of the Tchap Synapse mainlining.

This allows modules to implement extra logic to figure out whether a
given 3PID can be added to the local homeserver. In the Tchap use case,
this will allow a Synapse module to interface with the custom endpoint
/internal_info.
---
 changelog.d/11854.feature                        |  1 +
 docs/modules/password_auth_provider_callbacks.md | 19 ++++++
 synapse/handlers/auth.py                         | 44 ++++++++++++++
 synapse/module_api/__init__.py                   |  3 +
 synapse/rest/client/account.py                   |  4 +-
 synapse/rest/client/register.py                  |  8 ++-
 synapse/util/threepids.py                        | 13 +++-
 tests/handlers/test_password_providers.py        | 76 +++++++++++++++++++++++-
 8 files changed, 161 insertions(+), 7 deletions(-)
 create mode 100644 changelog.d/11854.feature
(limited to 'synapse/handlers')

diff --git a/changelog.d/11854.feature b/changelog.d/11854.feature
new file mode 100644
index 0000000000..975e95bc52
--- /dev/null
+++ b/changelog.d/11854.feature
@@ -0,0 +1 @@
+Add a callback to allow modules to allow or forbid a 3PID (email address, phone number) from being associated with a local account.

diff --git a/docs/modules/password_auth_provider_callbacks.md b/docs/modules/password_auth_provider_callbacks.md
index 3697e3782e..88b59bb09e 100644
--- a/docs/modules/password_auth_provider_callbacks.md
+++ b/docs/modules/password_auth_provider_callbacks.md
@@ -166,6 +166,25 @@ any of the subsequent implementations of this callback.
 If every callback returns `None`, the username provided by the user is used, if any
 (otherwise one is automatically generated).

+## `is_3pid_allowed`
+
+_First introduced in Synapse v1.53.0_
+
+```python
+async def is_3pid_allowed(self, medium: str, address: str, registration: bool) -> bool
+```
+
+Called when attempting to bind a third-party identifier (i.e. an email address or a phone
+number). The module is given the medium of the third-party identifier (which is `email` if
+the identifier is an email address, or `msisdn` if the identifier is a phone number) and
+its address, as well as a boolean indicating whether the attempt to bind is happening as
+part of registering a new user. The module must return a boolean indicating whether the
+identifier may be bound to an account on the local homeserver.
+
+If multiple modules implement this callback, they will be considered in order. If a
+callback returns `True`, Synapse falls through to the next one. The value of the first
+callback that does not return `True` will be used. If this happens, Synapse will not call
+any of the subsequent implementations of this callback.
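To make the shape of this callback concrete, here is a hypothetical module using it. The domain policy is invented for illustration; only the signature above and the `register_password_auth_provider_callbacks` keyword (added to the module API later in this patch) come from the source:

```python
class ExampleThreepidPolicyModule:
    def __init__(self, config, api):
        # Hypothetical module wiring; `api` is Synapse's ModuleApi.
        api.register_password_auth_provider_callbacks(
            is_3pid_allowed=self.is_3pid_allowed,
        )

    async def is_3pid_allowed(
        self, medium: str, address: str, registration: bool
    ) -> bool:
        # Leave phone numbers and other media alone.
        if medium != "email":
            return True
        # Invented policy: new registrations must use an example.com address.
        if registration:
            return address.endswith("@example.com")
        return True
```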
## Example

diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index e32c93e234..6959d1aa7e 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -2064,6 +2064,7 @@ GET_USERNAME_FOR_REGISTRATION_CALLBACK = Callable[
     [JsonDict, JsonDict],
     Awaitable[Optional[str]],
 ]
+IS_3PID_ALLOWED_CALLBACK = Callable[[str, str, bool], Awaitable[bool]]


 class PasswordAuthProvider:
@@ -2079,6 +2080,7 @@ class PasswordAuthProvider:
         self.get_username_for_registration_callbacks: List[
             GET_USERNAME_FOR_REGISTRATION_CALLBACK
         ] = []
+        self.is_3pid_allowed_callbacks: List[IS_3PID_ALLOWED_CALLBACK] = []

         # Mapping from login type to login parameters
         self._supported_login_types: Dict[str, Iterable[str]] = {}
@@ -2090,6 +2092,7 @@ class PasswordAuthProvider:
         self,
         check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None,
         on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None,
+        is_3pid_allowed: Optional[IS_3PID_ALLOWED_CALLBACK] = None,
         auth_checkers: Optional[
             Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
         ] = None,
@@ -2145,6 +2148,9 @@ class PasswordAuthProvider:
                 get_username_for_registration,
             )

+        if is_3pid_allowed is not None:
+            self.is_3pid_allowed_callbacks.append(is_3pid_allowed)
+
     def get_supported_login_types(self) -> Mapping[str, Iterable[str]]:
         """Get the login types supported by this password provider

@@ -2343,3 +2349,41 @@ class PasswordAuthProvider:
                 raise SynapseError(code=500, msg="Internal Server Error")

         return None
+
+    async def is_3pid_allowed(
+        self,
+        medium: str,
+        address: str,
+        registration: bool,
+    ) -> bool:
+        """Check whether a user may bind the given 3PID on this homeserver.
+
+        Args:
+            medium: The medium of the 3PID.
+            address: The address of the 3PID.
+            registration: Whether the 3PID is being bound when registering a new user.
+
+        Returns:
+            Whether the 3PID is allowed to be bound on this homeserver.
+        """
+        for callback in self.is_3pid_allowed_callbacks:
+            try:
+                res = await callback(medium, address, registration)
+
+                if res is False:
+                    return res
+                elif not isinstance(res, bool):
+                    # mypy complains that this line is unreachable because it assumes the
+                    # data returned by the module fits the expected type. We just want
+                    # to make sure this is the case.
+                    logger.warning(  # type: ignore[unreachable]
+                        "Ignoring non-boolean value returned by"
+                        " is_3pid_allowed callback %s: %s",
+                        callback,
+                        res,
+                    )
+            except Exception as e:
+                logger.error("Module raised an exception in is_3pid_allowed: %s", e)
+                raise SynapseError(code=500, msg="Internal Server Error")
+
+        return True

diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 29fbc73c97..a91a7fa3ce 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -72,6 +72,7 @@ from synapse.handlers.auth import (
     CHECK_3PID_AUTH_CALLBACK,
     CHECK_AUTH_CALLBACK,
     GET_USERNAME_FOR_REGISTRATION_CALLBACK,
+    IS_3PID_ALLOWED_CALLBACK,
     ON_LOGGED_OUT_CALLBACK,
     AuthHandler,
 )
@@ -312,6 +313,7 @@ class ModuleApi:
         auth_checkers: Optional[
             Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
         ] = None,
+        is_3pid_allowed: Optional[IS_3PID_ALLOWED_CALLBACK] = None,
         get_username_for_registration: Optional[
             GET_USERNAME_FOR_REGISTRATION_CALLBACK
         ] = None,
@@ -323,6 +325,7 @@ class ModuleApi:
         return self._password_auth_provider.register_password_auth_provider_callbacks(
             check_3pid_auth=check_3pid_auth,
             on_logged_out=on_logged_out,
+            is_3pid_allowed=is_3pid_allowed,
             auth_checkers=auth_checkers,
             get_username_for_registration=get_username_for_registration,
         )

diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 6b272658fc..cfa2aee76d 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -385,7 +385,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
         send_attempt = body["send_attempt"]
         next_link = body.get("next_link")  # Optional param

-        if not check_3pid_allowed(self.hs, "email", email):
+        if not await check_3pid_allowed(self.hs, "email", email):
             raise SynapseError(
                 403,
                 "Your email domain is not authorized on this server",
@@ -468,7 +468,7 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):

         msisdn = phone_number_to_msisdn(country, phone_number)

-        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+        if not await check_3pid_allowed(self.hs, "msisdn", msisdn):
             raise SynapseError(
                 403,
                 "Account phone numbers are not authorized on this server",

diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py
index c283313e8d..c965e2bda2 100644
--- a/synapse/rest/client/register.py
+++ b/synapse/rest/client/register.py
@@ -112,7 +112,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
         send_attempt = body["send_attempt"]
         next_link = body.get("next_link")  # Optional param

-        if not check_3pid_allowed(self.hs, "email", email):
+        if not await check_3pid_allowed(self.hs, "email", email, registration=True):
             raise SynapseError(
                 403,
                 "Your email domain is not authorized to register on this server",
@@ -192,7 +192,7 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):

         msisdn = phone_number_to_msisdn(country, phone_number)

-        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+        if not await check_3pid_allowed(self.hs, "msisdn", msisdn, registration=True):
             raise SynapseError(
                 403,
                 "Phone numbers are not authorized to register on this server",
@@ -616,7 +616,9 @@ class RegisterRestServlet(RestServlet):
                 medium = auth_result[login_type]["medium"]
                 address = auth_result[login_type]["address"]

-                if not check_3pid_allowed(self.hs, medium, address):
+                if not await check_3pid_allowed(
+                    self.hs, medium, address, registration=True
+                ):
                     raise SynapseError(
                         403,
                         "Third party identifiers (email/phone numbers)"

diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py
index
index 389adf00f6..1e9c2faa64 100644
--- a/synapse/util/threepids.py
+++ b/synapse/util/threepids.py
@@ -32,7 +32,12 @@ logger = logging.getLogger(__name__)
 MAX_EMAIL_ADDRESS_LENGTH = 500


-def check_3pid_allowed(hs: "HomeServer", medium: str, address: str) -> bool:
+async def check_3pid_allowed(
+    hs: "HomeServer",
+    medium: str,
+    address: str,
+    registration: bool = False,
+) -> bool:
     """Checks whether a given format of 3PID is allowed to be used on this HS

     Args:
@@ -40,9 +45,15 @@ def check_3pid_allowed(hs: "HomeServer", medium: str, address: str) -> bool:
         medium: 3pid medium - e.g. email, msisdn
         address: address within that medium (e.g. "wotan@matrix.org")
             msisdns need to first have been canonicalised
+        registration: whether we want to bind the 3PID as part of registering a new user.
+
     Returns:
         bool: whether the 3PID medium/address is allowed to be added to this HS
     """
+    if not await hs.get_password_auth_provider().is_3pid_allowed(
+        medium, address, registration
+    ):
+        return False

     if hs.config.registration.allowed_local_3pids:
         for constraint in hs.config.registration.allowed_local_3pids:

diff --git a/tests/handlers/test_password_providers.py b/tests/handlers/test_password_providers.py
index 94809cb8be..4740dd0a65 100644
--- a/tests/handlers/test_password_providers.py
+++ b/tests/handlers/test_password_providers.py
@@ -21,13 +21,15 @@ from twisted.internet import defer

 import synapse
 from synapse.api.constants import LoginType
+from synapse.api.errors import Codes
 from synapse.handlers.auth import load_legacy_password_auth_providers
 from synapse.module_api import ModuleApi
-from synapse.rest.client import devices, login, logout, register
+from synapse.rest.client import account, devices, login, logout, register
 from synapse.types import JsonDict, UserID

 from tests import unittest
 from tests.server import FakeChannel
+from tests.test_utils import make_awaitable
 from tests.unittest import override_config

 # (possibly experimental) login flows we expect to appear in the list after the normal
@@ -158,6 +160,7 @@ class PasswordAuthProviderTests(unittest.HomeserverTestCase):
         devices.register_servlets,
         logout.register_servlets,
         register.register_servlets,
+        account.register_servlets,
     ]

     def setUp(self):
@@ -803,6 +806,77 @@ class PasswordAuthProviderTests(unittest.HomeserverTestCase):
         # Check that the callback has been called.
         m.assert_called_once()

+    # Set some email configuration so the test doesn't fail because of its absence.
+    @override_config({"email": {"notif_from": "noreply@test"}})
+    def test_3pid_allowed(self):
+        """Tests that an `is_3pid_allowed` callback forbidding a 3PID makes Synapse
+        refuse to bind it, and that one allowing a 3PID makes Synapse bind it. Also
+        checks that the module is passed a boolean indicating whether the 3PID is
+        being bound as part of registering a new user.
+        """
+        self._test_3pid_allowed("rin", False)
+        self._test_3pid_allowed("kitay", True)
+
+    def _test_3pid_allowed(self, username: str, registration: bool):
+        """Tests that the "is_3pid_allowed" module callback is called correctly, using
+        either /register or /account URLs depending on the arguments.
+
+        Args:
+            username: The username to use for the test.
+            registration: Whether to test with registration URLs.
+ """ + self.hs.get_identity_handler().send_threepid_validation = Mock( + return_value=make_awaitable(0), + ) + + m = Mock(return_value=make_awaitable(False)) + self.hs.get_password_auth_provider().is_3pid_allowed_callbacks = [m] + + self.register_user(username, "password") + tok = self.login(username, "password") + + if registration: + url = "/register/email/requestToken" + else: + url = "/account/3pid/email/requestToken" + + channel = self.make_request( + "POST", + url, + { + "client_secret": "foo", + "email": "foo@test.com", + "send_attempt": 0, + }, + access_token=tok, + ) + self.assertEqual(channel.code, 403, channel.result) + self.assertEqual( + channel.json_body["errcode"], + Codes.THREEPID_DENIED, + channel.json_body, + ) + + m.assert_called_once_with("email", "foo@test.com", registration) + + m = Mock(return_value=make_awaitable(True)) + self.hs.get_password_auth_provider().is_3pid_allowed_callbacks = [m] + + channel = self.make_request( + "POST", + url, + { + "client_secret": "foo", + "email": "bar@test.com", + "send_attempt": 0, + }, + access_token=tok, + ) + self.assertEqual(channel.code, 200, channel.result) + self.assertIn("sid", channel.json_body) + + m.assert_called_once_with("email", "bar@test.com", registration) + def _setup_get_username_for_registration(self) -> Mock: """Registers a get_username_for_registration callback that appends "-foo" to the username the client is trying to register. -- cgit 1.4.1 From 6c0984e3f007de469af74d8b6a432c8704633b03 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 8 Feb 2022 09:15:59 -0500 Subject: Remove unnecessary ignores due to Twisted upgrade. (#11939) Twisted 22.1.0 fixed some internal type hints, allowing Synapse to remove ignore calls for parameters to connectTCP. --- changelog.d/11939.misc | 1 + synapse/handlers/send_email.py | 2 +- synapse/replication/tcp/handler.py | 4 ++-- synapse/replication/tcp/redis.py | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) create mode 100644 changelog.d/11939.misc (limited to 'synapse/handlers') diff --git a/changelog.d/11939.misc b/changelog.d/11939.misc new file mode 100644 index 0000000000..317526f9ef --- /dev/null +++ b/changelog.d/11939.misc @@ -0,0 +1 @@ +Remove an unnecessary ignoring of type hints due to fixes in upstream packages. 
diff --git a/synapse/handlers/send_email.py b/synapse/handlers/send_email.py index 1a062a784c..a305a66860 100644 --- a/synapse/handlers/send_email.py +++ b/synapse/handlers/send_email.py @@ -106,7 +106,7 @@ async def _sendmail( factory = build_sender_factory(hostname=smtphost if enable_tls else None) reactor.connectTCP( - smtphost, # type: ignore[arg-type] + smtphost, smtpport, factory, timeout=30, diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 21293038ef..f7e6bc1e62 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -318,7 +318,7 @@ class ReplicationCommandHandler: hs, outbound_redis_connection ) hs.get_reactor().connectTCP( - hs.config.redis.redis_host, # type: ignore[arg-type] + hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory, timeout=30, @@ -330,7 +330,7 @@ class ReplicationCommandHandler: host = hs.config.worker.worker_replication_host port = hs.config.worker.worker_replication_port hs.get_reactor().connectTCP( - host, # type: ignore[arg-type] + host, port, self._factory, timeout=30, diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 8d28bd3f3f..5b37f379d0 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -373,7 +373,7 @@ def lazyConnection( reactor = hs.get_reactor() reactor.connectTCP( - host, # type: ignore[arg-type] + host, port, factory, timeout=30, -- cgit 1.4.1 From 8c94b3abe93fe8c3e2ddd29fa350f54f69714151 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 8 Feb 2022 09:21:20 -0500 Subject: Experimental support to include bundled aggregations in search results (MSC3666) (#11837) --- changelog.d/11837.feature | 1 + synapse/config/experimental.py | 2 ++ synapse/handlers/search.py | 29 +++++++++++++++++---- synapse/storage/databases/main/relations.py | 13 ++++++++-- tests/rest/client/test_relations.py | 39 ++++++++++++++++++++++++++++- 5 files changed, 76 insertions(+), 8 deletions(-) create mode 100644 changelog.d/11837.feature (limited to 'synapse/handlers') diff --git a/changelog.d/11837.feature b/changelog.d/11837.feature new file mode 100644 index 0000000000..62ef707123 --- /dev/null +++ b/changelog.d/11837.feature @@ -0,0 +1 @@ +Experimental support for [MSC3666](https://github.com/matrix-org/matrix-doc/pull/3666): including bundled aggregations in server side search results. diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index e4719d19b8..f05a803a71 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -26,6 +26,8 @@ class ExperimentalConfig(Config): # MSC3440 (thread relation) self.msc3440_enabled: bool = experimental.get("msc3440_enabled", False) + # MSC3666: including bundled relations in /search. + self.msc3666_enabled: bool = experimental.get("msc3666_enabled", False) # MSC3026 (busy presence state) self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 02bb5ae72f..41cb809078 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -43,6 +43,8 @@ class SearchHandler: self.state_store = self.storage.state self.auth = hs.get_auth() + self._msc3666_enabled = hs.config.experimental.msc3666_enabled + async def get_old_rooms_from_upgraded_room(self, room_id: str) -> Iterable[str]: """Retrieves room IDs of old rooms in the history of an upgraded room. 
@@ -238,8 +240,6 @@ class SearchHandler:

         results = search_result["results"]

-        results_map = {r["event"].event_id: r for r in results}
-
         rank_map.update({r["event"].event_id: r["rank"] for r in results})

         filtered_events = await search_filter.filter([r["event"] for r in results])
@@ -420,12 +420,29 @@ class SearchHandler:

         time_now = self.clock.time_msec()

+        aggregations = None
+        if self._msc3666_enabled:
+            aggregations = await self.store.get_bundled_aggregations(
+                # Generate an iterable of EventBase for all the events that will be
+                # returned, including contextual events.
+                itertools.chain(
+                    # The events_before and events_after for each context.
+                    itertools.chain.from_iterable(
+                        itertools.chain(context["events_before"], context["events_after"])  # type: ignore[arg-type]
+                        for context in contexts.values()
+                    ),
+                    # The returned events.
+                    allowed_events,
+                ),
+                user.to_string(),
+            )
+
         for context in contexts.values():
             context["events_before"] = self._event_serializer.serialize_events(
-                context["events_before"], time_now  # type: ignore[arg-type]
+                context["events_before"], time_now, bundle_aggregations=aggregations  # type: ignore[arg-type]
             )
             context["events_after"] = self._event_serializer.serialize_events(
-                context["events_after"], time_now  # type: ignore[arg-type]
+                context["events_after"], time_now, bundle_aggregations=aggregations  # type: ignore[arg-type]
             )

         state_results = {}
@@ -442,7 +459,9 @@ class SearchHandler:
             results.append(
                 {
                     "rank": rank_map[e.event_id],
-                    "result": self._event_serializer.serialize_event(e, time_now),
+                    "result": self._event_serializer.serialize_event(
+                        e, time_now, bundle_aggregations=aggregations
+                    ),
                     "context": contexts.get(e.event_id, {}),
                 }
             )

diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 6180b17296..7718acbf1c 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -715,6 +715,9 @@ class RelationsWorkerStore(SQLBaseStore):
             A map of event ID to the bundled aggregation for the event. Not all
             events may have bundled aggregations in the results.
         """
+        # The already processed event IDs. Tracked separately from the result
+        # since the result omits events which do not have bundled aggregations.
+        seen_event_ids = set()

         # State events and redacted events do not get bundled aggregations.
         events = [
@@ -728,13 +731,19 @@ class RelationsWorkerStore(SQLBaseStore):

         # Fetch other relations per event.
         for event in events:
+            # De-duplicate events by ID to handle the same event requested multiple
+            # times. The caches that _get_bundled_aggregation_for_event uses should
+            # capture this, but it's best to reduce the work anyway.
+            if event.event_id in seen_event_ids:
+                continue
+            seen_event_ids.add(event.event_id)
+
             event_result = await self._get_bundled_aggregation_for_event(event, user_id)
             if event_result:
                 results[event.event_id] = event_result

         # Fetch any edits.
- event_ids = [event.event_id for event in events] - edits = await self._get_applicable_edits(event_ids) + edits = await self._get_applicable_edits(seen_event_ids) for event_id, edit in edits.items(): results.setdefault(event_id, BundledAggregations()).replace = edit diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 96ae7790bb..06721e67c9 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -453,7 +453,9 @@ class RelationsTestCase(unittest.HomeserverTestCase): ) self.assertEquals(400, channel.code, channel.json_body) - @unittest.override_config({"experimental_features": {"msc3440_enabled": True}}) + @unittest.override_config( + {"experimental_features": {"msc3440_enabled": True, "msc3666_enabled": True}} + ) def test_bundled_aggregations(self): """ Test that annotations, references, and threads get correctly bundled. @@ -579,6 +581,23 @@ class RelationsTestCase(unittest.HomeserverTestCase): self.assertTrue(room_timeline["limited"]) assert_bundle(self._find_event_in_chunk(room_timeline["events"])) + # Request search. + channel = self.make_request( + "POST", + "/search", + # Search term matches the parent message. + content={"search_categories": {"room_events": {"search_term": "Hi"}}}, + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + chunk = [ + result["result"] + for result in channel.json_body["search_categories"]["room_events"][ + "results" + ] + ] + assert_bundle(self._find_event_in_chunk(chunk)) + def test_aggregation_get_event_for_annotation(self): """Test that annotations do not get bundled aggregations included when directly requested. @@ -759,6 +778,7 @@ class RelationsTestCase(unittest.HomeserverTestCase): self.assertEquals(200, channel.code, channel.json_body) self.assertNotIn("m.relations", channel.json_body["unsigned"]) + @unittest.override_config({"experimental_features": {"msc3666_enabled": True}}) def test_edit(self): """Test that a simple edit works.""" @@ -825,6 +845,23 @@ class RelationsTestCase(unittest.HomeserverTestCase): self.assertTrue(room_timeline["limited"]) assert_bundle(self._find_event_in_chunk(room_timeline["events"])) + # Request search. + channel = self.make_request( + "POST", + "/search", + # Search term matches the parent message. + content={"search_categories": {"room_events": {"search_term": "Hi"}}}, + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + chunk = [ + result["result"] + for result in channel.json_body["search_categories"]["room_events"][ + "results" + ] + ] + assert_bundle(self._find_event_in_chunk(chunk)) + def test_multi_edit(self): """Test that multiple edits, including attempts by people who shouldn't be allowed, are correctly handled. 
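The nested itertools chaining in the search handler change earlier in this commit is worth isolating: it flattens every context's before/after events and then appends the top-level results, all as one lazy iterable with no intermediate lists. A standalone sketch with invented values:

```python
import itertools

contexts = {
    "$a": {"events_before": ["$a1", "$a2"], "events_after": ["$a3"]},
    "$b": {"events_before": ["$b1"], "events_after": []},
}
allowed_events = ["$a", "$b"]

all_events = itertools.chain(
    # Flatten each context's events_before and events_after...
    itertools.chain.from_iterable(
        itertools.chain(c["events_before"], c["events_after"])
        for c in contexts.values()
    ),
    # ...then the top-level results themselves.
    allowed_events,
)
print(list(all_events))  # ['$a1', '$a2', '$a3', '$b1', '$a', '$b']
```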
-- cgit 1.4.1 From a121507cfec0ffce45a89f5a1019034eda5b0c70 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 11 Feb 2022 07:20:16 -0500 Subject: Adds misc missing type hints (#11953) --- changelog.d/11953.misc | 1 + mypy.ini | 6 +++++ synapse/event_auth.py | 4 +++- synapse/handlers/oidc.py | 4 ++-- synapse/http/client.py | 11 ++++----- synapse/http/matrixfederationclient.py | 3 +-- synapse/notifier.py | 43 +++++++++++++++++++--------------- synapse/server.py | 8 +++---- tests/handlers/test_oidc.py | 9 ++----- 9 files changed, 48 insertions(+), 41 deletions(-) create mode 100644 changelog.d/11953.misc (limited to 'synapse/handlers') diff --git a/changelog.d/11953.misc b/changelog.d/11953.misc new file mode 100644 index 0000000000..d44571b731 --- /dev/null +++ b/changelog.d/11953.misc @@ -0,0 +1 @@ +Add missing type hints. diff --git a/mypy.ini b/mypy.ini index cd28ac0dd2..63848d664c 100644 --- a/mypy.ini +++ b/mypy.ini @@ -142,6 +142,9 @@ disallow_untyped_defs = True [mypy-synapse.crypto.*] disallow_untyped_defs = True +[mypy-synapse.event_auth] +disallow_untyped_defs = True + [mypy-synapse.events.*] disallow_untyped_defs = True @@ -166,6 +169,9 @@ disallow_untyped_defs = True [mypy-synapse.module_api.*] disallow_untyped_defs = True +[mypy-synapse.notifier] +disallow_untyped_defs = True + [mypy-synapse.push.*] disallow_untyped_defs = True diff --git a/synapse/event_auth.py b/synapse/event_auth.py index e885961698..19b55a9559 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -763,7 +763,9 @@ def get_named_level(auth_events: StateMap[EventBase], name: str, default: int) - return default -def _verify_third_party_invite(event: EventBase, auth_events: StateMap[EventBase]): +def _verify_third_party_invite( + event: EventBase, auth_events: StateMap[EventBase] +) -> bool: """ Validates that the invite event is authorized by a previous third-party invite. diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py index deb3539751..8f71d975e9 100644 --- a/synapse/handlers/oidc.py +++ b/synapse/handlers/oidc.py @@ -544,9 +544,9 @@ class OidcProvider: """ metadata = await self.load_metadata() token_endpoint = metadata.get("token_endpoint") - raw_headers = { + raw_headers: Dict[str, str] = { "Content-Type": "application/x-www-form-urlencoded", - "User-Agent": self._http_client.user_agent, + "User-Agent": self._http_client.user_agent.decode("ascii"), "Accept": "application/json", } diff --git a/synapse/http/client.py b/synapse/http/client.py index d617055617..c01d2326cf 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -322,21 +322,20 @@ class SimpleHttpClient: self._ip_whitelist = ip_whitelist self._ip_blacklist = ip_blacklist self._extra_treq_args = treq_args or {} - - self.user_agent = hs.version_string self.clock = hs.get_clock() + + user_agent = hs.version_string if hs.config.server.user_agent_suffix: - self.user_agent = "%s %s" % ( - self.user_agent, + user_agent = "%s %s" % ( + user_agent, hs.config.server.user_agent_suffix, ) + self.user_agent = user_agent.encode("ascii") # We use this for our body producers to ensure that they use the correct # reactor. self._cooperator = Cooperator(scheduler=_make_scheduler(hs.get_reactor())) - self.user_agent = self.user_agent.encode("ascii") - if self._ip_blacklist: # If we have an IP blacklist, we need to use a DNS resolver which # filters out blacklisted IP addresses, to prevent DNS rebinding. 
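The user-agent hunks in this commit (the one above and the matrixfederationclient.py one that follows) share one detail: Twisted's HTTP layer expects header values as bytes, which is why the string is assembled first and encoded exactly once at construction time. A standalone sketch of that convention, with an invented version string:

```python
from twisted.web.http_headers import Headers

# Build the readable string form, then encode once up front.
user_agent = "Synapse/1.53.0 custom-suffix".encode("ascii")

# Twisted's Headers maps bytes names to lists of bytes values.
headers = Headers({b"User-Agent": [user_agent]})
print(headers.getRawHeaders(b"User-Agent"))  # [b'Synapse/1.53.0 custom-suffix']
```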
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 2e668363b2..c5f8fcbb2a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -334,12 +334,11 @@ class MatrixFederationHttpClient: user_agent = hs.version_string if hs.config.server.user_agent_suffix: user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix) - user_agent = user_agent.encode("ascii") federation_agent = MatrixFederationAgent( self.reactor, tls_client_options_factory, - user_agent, + user_agent.encode("ascii"), hs.config.server.federation_ip_range_whitelist, hs.config.server.federation_ip_range_blacklist, ) diff --git a/synapse/notifier.py b/synapse/notifier.py index 5988c67d90..e0fad2da66 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -14,6 +14,7 @@ import logging from typing import ( + TYPE_CHECKING, Awaitable, Callable, Collection, @@ -32,7 +33,6 @@ from prometheus_client import Counter from twisted.internet import defer -import synapse.server from synapse.api.constants import EventTypes, HistoryVisibility, Membership from synapse.api.errors import AuthError from synapse.events import EventBase @@ -53,6 +53,9 @@ from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) notified_events_counter = Counter("synapse_notifier_notified_events", "") @@ -82,7 +85,7 @@ class _NotificationListener: __slots__ = ["deferred"] - def __init__(self, deferred): + def __init__(self, deferred: "defer.Deferred"): self.deferred = deferred @@ -124,7 +127,7 @@ class _NotifierUserStream: stream_key: str, stream_id: Union[int, RoomStreamToken], time_now_ms: int, - ): + ) -> None: """Notify any listeners for this user of a new event from an event source. Args: @@ -152,7 +155,7 @@ class _NotifierUserStream: self.notify_deferred = ObservableDeferred(defer.Deferred()) noify_deferred.callback(self.current_token) - def remove(self, notifier: "Notifier"): + def remove(self, notifier: "Notifier") -> None: """Remove this listener from all the indexes in the Notifier it knows about. """ @@ -188,7 +191,7 @@ class EventStreamResult: start_token: StreamToken end_token: StreamToken - def __bool__(self): + def __bool__(self) -> bool: return bool(self.events) @@ -212,7 +215,7 @@ class Notifier: UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 - def __init__(self, hs: "synapse.server.HomeServer"): + def __init__(self, hs: "HomeServer"): self.user_to_user_stream: Dict[str, _NotifierUserStream] = {} self.room_to_user_streams: Dict[str, Set[_NotifierUserStream]] = {} @@ -248,7 +251,7 @@ class Notifier: # This is not a very cheap test to perform, but it's only executed # when rendering the metrics page, which is likely once per minute at # most when scraping it. - def count_listeners(): + def count_listeners() -> int: all_user_streams: Set[_NotifierUserStream] = set() for streams in list(self.room_to_user_streams.values()): @@ -270,7 +273,7 @@ class Notifier: "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream) ) - def add_replication_callback(self, cb: Callable[[], None]): + def add_replication_callback(self, cb: Callable[[], None]) -> None: """Add a callback that will be called when some new data is available. Callback is not given any arguments. 
It should *not* return a Deferred - if it needs to do any asynchronous work, a background thread should be started and @@ -284,7 +287,7 @@ class Notifier: event_pos: PersistedEventPosition, max_room_stream_token: RoomStreamToken, extra_users: Optional[Collection[UserID]] = None, - ): + ) -> None: """Unwraps event and calls `on_new_room_event_args`.""" await self.on_new_room_event_args( event_pos=event_pos, @@ -307,7 +310,7 @@ class Notifier: event_pos: PersistedEventPosition, max_room_stream_token: RoomStreamToken, extra_users: Optional[Collection[UserID]] = None, - ): + ) -> None: """Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -338,7 +341,9 @@ class Notifier: self.notify_replication() - def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken): + def _notify_pending_new_room_events( + self, max_room_stream_token: RoomStreamToken + ) -> None: """Notify for the room events that were queued waiting for a previous event to be persisted. Args: @@ -374,7 +379,7 @@ class Notifier: ) self._on_updated_room_token(max_room_stream_token) - def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken): + def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken) -> None: """Poke services that might care that the room position has been updated. """ @@ -386,13 +391,13 @@ class Notifier: if self.federation_sender: self.federation_sender.notify_new_events(max_room_stream_token) - def _notify_app_services(self, max_room_stream_token: RoomStreamToken): + def _notify_app_services(self, max_room_stream_token: RoomStreamToken) -> None: try: self.appservice_handler.notify_interested_services(max_room_stream_token) except Exception: logger.exception("Error notifying application services of event") - def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): + def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken) -> None: try: self._pusher_pool.on_new_notifications(max_room_stream_token) except Exception: @@ -475,8 +480,8 @@ class Notifier: user_id: str, timeout: int, callback: Callable[[StreamToken, StreamToken], Awaitable[T]], - room_ids=None, - from_token=StreamToken.START, + room_ids: Optional[Collection[str]] = None, + from_token: StreamToken = StreamToken.START, ) -> T: """Wait until the callback returns a non empty response or the timeout fires. 
@@ -700,14 +705,14 @@ class Notifier: for expired_stream in expired_streams: expired_stream.remove(self) - def _register_with_keys(self, user_stream: _NotifierUserStream): + def _register_with_keys(self, user_stream: _NotifierUserStream) -> None: self.user_to_user_stream[user_stream.user_id] = user_stream for room in user_stream.rooms: s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) - def _user_joined_room(self, user_id: str, room_id: str): + def _user_joined_room(self, user_id: str, room_id: str) -> None: new_user_stream = self.user_to_user_stream.get(user_id) if new_user_stream is not None: room_streams = self.room_to_user_streams.setdefault(room_id, set()) @@ -719,7 +724,7 @@ class Notifier: for cb in self.replication_callbacks: cb() - def notify_remote_server_up(self, server: str): + def notify_remote_server_up(self, server: str) -> None: """Notify any replication that a remote server has come back up""" # We call federation_sender directly rather than registering as a # callback as a) we already have a reference to it and b) it introduces diff --git a/synapse/server.py b/synapse/server.py index 3032f0b738..564afdcb96 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -233,8 +233,8 @@ class HomeServer(metaclass=abc.ABCMeta): self, hostname: str, config: HomeServerConfig, - reactor=None, - version_string="Synapse", + reactor: Optional[ISynapseReactor] = None, + version_string: str = "Synapse", ): """ Args: @@ -244,7 +244,7 @@ class HomeServer(metaclass=abc.ABCMeta): if not reactor: from twisted.internet import reactor as _reactor - reactor = _reactor + reactor = cast(ISynapseReactor, _reactor) self._reactor = reactor self.hostname = hostname @@ -264,7 +264,7 @@ class HomeServer(metaclass=abc.ABCMeta): self._module_web_resources: Dict[str, Resource] = {} self._module_web_resources_consumed = False - def register_module_web_resource(self, path: str, resource: Resource): + def register_module_web_resource(self, path: str, resource: Resource) -> None: """Allows a module to register a web resource to be served at the given path. If multiple modules register a resource for the same path, the module that diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py index cfe3de5266..a552d8182e 100644 --- a/tests/handlers/test_oidc.py +++ b/tests/handlers/test_oidc.py @@ -155,7 +155,7 @@ class OidcHandlerTestCase(HomeserverTestCase): def make_homeserver(self, reactor, clock): self.http_client = Mock(spec=["get_json"]) self.http_client.get_json.side_effect = get_json - self.http_client.user_agent = "Synapse Test" + self.http_client.user_agent = b"Synapse Test" hs = self.setup_test_homeserver(proxied_http_client=self.http_client) @@ -438,12 +438,9 @@ class OidcHandlerTestCase(HomeserverTestCase): state = "state" nonce = "nonce" client_redirect_url = "http://client/redirect" - user_agent = "Browser" ip_address = "10.0.0.1" session = self._generate_oidc_session_token(state, nonce, client_redirect_url) - request = _build_callback_request( - code, state, session, user_agent=user_agent, ip_address=ip_address - ) + request = _build_callback_request(code, state, session, ip_address=ip_address) self.get_success(self.handler.handle_oidc_callback(request)) @@ -1274,7 +1271,6 @@ def _build_callback_request( code: str, state: str, session: str, - user_agent: str = "Browser", ip_address: str = "10.0.0.1", ): """Builds a fake SynapseRequest to mock the browser callback @@ -1289,7 +1285,6 @@ def _build_callback_request( query param. 
            Should be the same as was embedded in the session in
            _build_oidc_session.
        session: the "session" which would have been passed around in the cookie.
-        user_agent: the user-agent to present
        ip_address: the IP address to pretend the request came from
    """
    request = Mock(
-- cgit 1.4.1


From 0171fa5226a6aa808d9965dab20f22f9794810d9 Mon Sep 17 00:00:00 2001
From: Brendan Abolivier
Date: Fri, 11 Feb 2022 14:58:11 +0100
Subject: Remove deprecated user_may_create_room_with_invites callback (#11950)

Co-authored-by: Patrick Cloke
---
 changelog.d/11950.removal       |   1 +
 docs/upgrade.md                 |  29 ++++++----
 synapse/events/spamcheck.py     |  42 --------------
 synapse/handlers/room.py        |   5 --
 synapse/module_api/__init__.py  |   5 --
 tests/rest/client/test_rooms.py | 119 +---------------------------------------
 6 files changed, 22 insertions(+), 179 deletions(-)
 create mode 100644 changelog.d/11950.removal
(limited to 'synapse/handlers')

diff --git a/changelog.d/11950.removal b/changelog.d/11950.removal
new file mode 100644
index 0000000000..f75de40f2f
--- /dev/null
+++ b/changelog.d/11950.removal
@@ -0,0 +1 @@
+Remove deprecated `user_may_create_room_with_invites` spam checker callback. See the [upgrade notes](https://matrix-org.github.io/synapse/latest/upgrade.html#removal-of-user_may_create_room_with_invites) for more information.

diff --git a/docs/upgrade.md b/docs/upgrade.md
index c5e0697333..6f20000295 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -84,7 +84,18 @@ process, for example:
     wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
     dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
     ```
-# Upgrading to v1.(next)
+
+# Upgrading to v1.53.0
+
+## Dropping support for `webclient` listeners and non-HTTP(S) `web_client_location`
+
+Per the deprecation notice in Synapse v1.51.0, listeners of type `webclient`
+are no longer supported and configuring them is now a configuration error.
+
+Configuring a non-HTTP(S) `web_client_location` is now a configuration
+error. Since the `webclient` listener is no longer supported, this
+setting only applies to the root path `/` of Synapse's web server and no longer
+the `/_matrix/client/` path.

 ## Stablisation of MSC3231

@@ -119,17 +130,15 @@ The new `capabilities` are now active by default.

-# Upgrading to v1.53.0
-
-## Dropping support for `webclient` listeners and non-HTTP(S) `web_client_location`
+## Removal of `user_may_create_room_with_invites`

-Per the deprecation notice in Synapse v1.51.0, listeners of type `webclient`
-are no longer supported and configuring them is a now a configuration error.
+As announced with the release of [Synapse 1.47.0](#deprecation-of-the-user_may_create_room_with_invites-module-callback),
+the deprecated `user_may_create_room_with_invites` module callback has been removed.
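The migration path spelled out in the next paragraph, vetting each invite as it happens rather than vetting the whole creation request up front, can be sketched roughly as follows (hypothetical module code; only the `(inviter, invitee, room_id)` callback shape is taken from the `USER_MAY_INVITE_CALLBACK` type visible in the spamcheck.py diff below):

```python
# Hypothetical replacement for the removed callback: decide per invite.
async def user_may_invite(inviter: str, invitee: str, room_id: str) -> bool:
    # Invented policy: only allow invites to users on an allow-listed server.
    return invitee.endswith(":example.com")
```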
+Modules relying on it can instead implement [`user_may_invite`](https://matrix-org.github.io/synapse/latest/modules/spam_checker_callbacks.html#user_may_invite)
+and use the [`get_room_state`](https://github.com/matrix-org/synapse/blob/872f23b95fa980a61b0866c1475e84491991fa20/synapse/module_api/__init__.py#L869-L876)
+module API to infer whether the invite is happening while creating a room (see [this function](https://github.com/matrix-org/synapse-domain-rule-checker/blob/e7d092dd9f2a7f844928771dbfd9fd24c2332e48/synapse_domain_rule_checker/__init__.py#L56-L89)
+as an example). Alternatively, modules can also implement [`on_create_room`](https://matrix-org.github.io/synapse/latest/modules/third_party_rules_callbacks.html#on_create_room).

 # Upgrading to v1.52.0

diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
index 3134beb8d3..04afd48274 100644
--- a/synapse/events/spamcheck.py
+++ b/synapse/events/spamcheck.py
@@ -48,9 +48,6 @@ USER_MAY_JOIN_ROOM_CALLBACK = Callable[[str, str, bool], Awaitable[bool]]
 USER_MAY_INVITE_CALLBACK = Callable[[str, str, str], Awaitable[bool]]
 USER_MAY_SEND_3PID_INVITE_CALLBACK = Callable[[str, str, str, str], Awaitable[bool]]
 USER_MAY_CREATE_ROOM_CALLBACK = Callable[[str], Awaitable[bool]]
-USER_MAY_CREATE_ROOM_WITH_INVITES_CALLBACK = Callable[
-    [str, List[str], List[Dict[str, str]]], Awaitable[bool]
-]
 USER_MAY_CREATE_ROOM_ALIAS_CALLBACK = Callable[[str, RoomAlias], Awaitable[bool]]
 USER_MAY_PUBLISH_ROOM_CALLBACK = Callable[[str, str], Awaitable[bool]]
 CHECK_USERNAME_FOR_SPAM_CALLBACK = Callable[[Dict[str, str]], Awaitable[bool]]
@@ -174,9 +171,6 @@ class SpamChecker:
             USER_MAY_SEND_3PID_INVITE_CALLBACK
         ] = []
         self._user_may_create_room_callbacks: List[USER_MAY_CREATE_ROOM_CALLBACK] = []
-        self._user_may_create_room_with_invites_callbacks: List[
-            USER_MAY_CREATE_ROOM_WITH_INVITES_CALLBACK
-        ] = []
         self._user_may_create_room_alias_callbacks: List[
             USER_MAY_CREATE_ROOM_ALIAS_CALLBACK
         ] = []
@@ -198,9 +192,6 @@ class SpamChecker:
         user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None,
         user_may_send_3pid_invite: Optional[USER_MAY_SEND_3PID_INVITE_CALLBACK] = None,
         user_may_create_room: Optional[USER_MAY_CREATE_ROOM_CALLBACK] = None,
-        user_may_create_room_with_invites: Optional[
-            USER_MAY_CREATE_ROOM_WITH_INVITES_CALLBACK
-        ] = None,
         user_may_create_room_alias: Optional[
             USER_MAY_CREATE_ROOM_ALIAS_CALLBACK
         ] = None,
@@ -229,11 +220,6 @@ class SpamChecker:
         if user_may_create_room is not None:
             self._user_may_create_room_callbacks.append(user_may_create_room)

-        if user_may_create_room_with_invites is not None:
-            self._user_may_create_room_with_invites_callbacks.append(
-                user_may_create_room_with_invites,
-            )
-
         if user_may_create_room_alias is not None:
             self._user_may_create_room_alias_callbacks.append(
                 user_may_create_room_alias,
@@ -359,34 +345,6 @@ class SpamChecker:

         return True

-    async def user_may_create_room_with_invites(
-        self,
-        userid: str,
-        invites: List[str],
-        threepid_invites: List[Dict[str, str]],
-    ) -> bool:
-        """Checks if a given user may create a room with invites
-
-        If this method returns false, the creation request will be rejected.
-
-        Args:
-            userid: The ID of the user attempting to create a room
-            invites: The IDs of the Matrix users to be invited if the room creation is
-                allowed.
-            threepid_invites: The threepids to be invited if the room creation is allowed,
-                as a dict including a "medium" key indicating the threepid's medium (e.g.
-                "email") and an "address" key indicating the threepid's address (e.g.
- "alice@example.com") - - Returns: - True if the user may create the room, otherwise False - """ - for callback in self._user_may_create_room_with_invites_callbacks: - if await callback(userid, invites, threepid_invites) is False: - return False - - return True - async def user_may_create_room_alias( self, userid: str, room_alias: RoomAlias ) -> bool: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 1420d67729..a990727fc5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -694,11 +694,6 @@ class RoomCreationHandler: if not is_requester_admin and not ( await self.spam_checker.user_may_create_room(user_id) - and await self.spam_checker.user_may_create_room_with_invites( - user_id, - invite_list, - invite_3pid_list, - ) ): raise SynapseError( 403, "You are not permitted to create rooms", Codes.FORBIDDEN diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index a91a7fa3ce..c636779308 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -48,7 +48,6 @@ from synapse.events.spamcheck import ( CHECK_USERNAME_FOR_SPAM_CALLBACK, USER_MAY_CREATE_ROOM_ALIAS_CALLBACK, USER_MAY_CREATE_ROOM_CALLBACK, - USER_MAY_CREATE_ROOM_WITH_INVITES_CALLBACK, USER_MAY_INVITE_CALLBACK, USER_MAY_JOIN_ROOM_CALLBACK, USER_MAY_PUBLISH_ROOM_CALLBACK, @@ -217,9 +216,6 @@ class ModuleApi: user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None, user_may_send_3pid_invite: Optional[USER_MAY_SEND_3PID_INVITE_CALLBACK] = None, user_may_create_room: Optional[USER_MAY_CREATE_ROOM_CALLBACK] = None, - user_may_create_room_with_invites: Optional[ - USER_MAY_CREATE_ROOM_WITH_INVITES_CALLBACK - ] = None, user_may_create_room_alias: Optional[ USER_MAY_CREATE_ROOM_ALIAS_CALLBACK ] = None, @@ -240,7 +236,6 @@ class ModuleApi: user_may_invite=user_may_invite, user_may_send_3pid_invite=user_may_send_3pid_invite, user_may_create_room=user_may_create_room, - user_may_create_room_with_invites=user_may_create_room_with_invites, user_may_create_room_alias=user_may_create_room_alias, user_may_publish_room=user_may_publish_room, check_username_for_spam=check_username_for_spam, diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 10a4a4dc5e..b7f086927b 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -18,7 +18,7 @@ """Tests REST events for /rooms paths.""" import json -from typing import Dict, Iterable, List, Optional +from typing import Iterable, List from unittest.mock import Mock, call from urllib import parse as urlparse @@ -35,7 +35,7 @@ from synapse.api.errors import Codes, HttpResponseException from synapse.handlers.pagination import PurgeStatus from synapse.rest import admin from synapse.rest.client import account, directory, login, profile, room, sync -from synapse.types import JsonDict, Requester, RoomAlias, UserID, create_requester +from synapse.types import JsonDict, RoomAlias, UserID, create_requester from synapse.util.stringutils import random_string from tests import unittest @@ -674,121 +674,6 @@ class RoomsCreateTestCase(RoomBase): channel = self.make_request("POST", "/createRoom", content) self.assertEqual(200, channel.code) - def test_spamchecker_invites(self): - """Tests the user_may_create_room_with_invites spam checker callback.""" - - # Mock do_3pid_invite, so we don't fail from failing to send a 3PID invite to an - # IS. 
- async def do_3pid_invite( - room_id: str, - inviter: UserID, - medium: str, - address: str, - id_server: str, - requester: Requester, - txn_id: Optional[str], - id_access_token: Optional[str] = None, - ) -> int: - return 0 - - do_3pid_invite_mock = Mock(side_effect=do_3pid_invite) - self.hs.get_room_member_handler().do_3pid_invite = do_3pid_invite_mock - - # Add a mock callback for user_may_create_room_with_invites. Make it allow any - # room creation request for now. - return_value = True - - async def user_may_create_room_with_invites( - user: str, - invites: List[str], - threepid_invites: List[Dict[str, str]], - ) -> bool: - return return_value - - callback_mock = Mock(side_effect=user_may_create_room_with_invites) - self.hs.get_spam_checker()._user_may_create_room_with_invites_callbacks.append( - callback_mock, - ) - - # The MXIDs we'll try to invite. - invited_mxids = [ - "@alice1:red", - "@alice2:red", - "@alice3:red", - "@alice4:red", - ] - - # The 3PIDs we'll try to invite. - invited_3pids = [ - { - "id_server": "example.com", - "id_access_token": "sometoken", - "medium": "email", - "address": "alice1@example.com", - }, - { - "id_server": "example.com", - "id_access_token": "sometoken", - "medium": "email", - "address": "alice2@example.com", - }, - { - "id_server": "example.com", - "id_access_token": "sometoken", - "medium": "email", - "address": "alice3@example.com", - }, - ] - - # Create a room and invite the Matrix users, and check that it succeeded. - channel = self.make_request( - "POST", - "/createRoom", - json.dumps({"invite": invited_mxids}).encode("utf8"), - ) - self.assertEqual(200, channel.code) - - # Check that the callback was called with the right arguments. - expected_call_args = ((self.user_id, invited_mxids, []),) - self.assertEquals( - callback_mock.call_args, - expected_call_args, - callback_mock.call_args, - ) - - # Create a room and invite the 3PIDs, and check that it succeeded. - channel = self.make_request( - "POST", - "/createRoom", - json.dumps({"invite_3pid": invited_3pids}).encode("utf8"), - ) - self.assertEqual(200, channel.code) - - # Check that do_3pid_invite was called the right amount of time - self.assertEquals(do_3pid_invite_mock.call_count, len(invited_3pids)) - - # Check that the callback was called with the right arguments. - expected_call_args = ((self.user_id, [], invited_3pids),) - self.assertEquals( - callback_mock.call_args, - expected_call_args, - callback_mock.call_args, - ) - - # Now deny any room creation. - return_value = False - - # Create a room and invite the 3PIDs, and check that it failed. - channel = self.make_request( - "POST", - "/createRoom", - json.dumps({"invite_3pid": invited_3pids}).encode("utf8"), - ) - self.assertEqual(403, channel.code) - - # Check that do_3pid_invite wasn't called this time. - self.assertEquals(do_3pid_invite_mock.call_count, len(invited_3pids)) - def test_spam_checker_may_join_room(self): """Tests that the user_may_join_room spam checker callback is correctly bypassed when creating a new room. 
-- cgit 1.4.1 From 55113dd5e880815b3d7881f72147f25f37b00045 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Sat, 12 Feb 2022 14:33:49 +0000 Subject: Notify users, rather than rooms, of device list updates (#11905) Co-authored-by: Patrick Cloke --- changelog.d/11905.misc | 1 + synapse/handlers/device.py | 8 +++----- 2 files changed, 4 insertions(+), 5 deletions(-) create mode 100644 changelog.d/11905.misc (limited to 'synapse/handlers') diff --git a/changelog.d/11905.misc b/changelog.d/11905.misc new file mode 100644 index 0000000000..4f170cf01a --- /dev/null +++ b/changelog.d/11905.misc @@ -0,0 +1 @@ +Preparation to support sending device list updates to application services. \ No newline at end of file diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index b184a48cb1..36c05f8363 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -495,13 +495,11 @@ class DeviceHandler(DeviceWorkerHandler): "Notifying about update %r/%r, ID: %r", user_id, device_id, position ) - room_ids = await self.store.get_rooms_for_user(user_id) - # specify the user ID too since the user should always get their own device list # updates, even if they aren't in any rooms. - self.notifier.on_new_event( - "device_list_key", position, users=[user_id], rooms=room_ids - ) + users_to_notify = users_who_share_room.union({user_id}) + + self.notifier.on_new_event("device_list_key", position, users=users_to_notify) if hosts: logger.info( -- cgit 1.4.1