From c16e192e2f9970cc62adfd758034244631968102 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Nov 2019 15:49:43 +0000 Subject: Fix caching devices for remote servers in worker. When the `/keys/query` API is hit on client_reader worker Synapse may decide that it needs to resync some remote deivces. Usually this happens on master, and then gets cached. However, that fails on workers and so it falls back to fetching devices from remotes directly, which may in turn fail if the remote is down. --- synapse/replication/http/__init__.py | 10 +++++- synapse/replication/http/devices.py | 69 ++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 synapse/replication/http/devices.py (limited to 'synapse/replication') diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 81b85352b1..28dbc6fcba 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -14,7 +14,14 @@ # limitations under the License. from synapse.http.server import JsonResource -from synapse.replication.http import federation, login, membership, register, send_event +from synapse.replication.http import ( + devices, + federation, + login, + membership, + register, + send_event, +) REPLICATION_PREFIX = "/_synapse/replication" @@ -30,3 +37,4 @@ class ReplicationRestResource(JsonResource): federation.register_servlets(hs, self) login.register_servlets(hs, self) register.register_servlets(hs, self) + devices.register_servlets(hs, self) diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py new file mode 100644 index 0000000000..795ca7b65e --- /dev/null +++ b/synapse/replication/http/devices.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.replication.http._base import ReplicationEndpoint + +logger = logging.getLogger(__name__) + + +class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): + """Notifies that a user has joined or left the room + + Request format: + + POST /_synapse/replication/user_device_resync/:user_id + + {} + + Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id` + response, e.g.: + + { + "user_id": "@alice:example.org", + "devices": [ + { + "device_id": "JLAFKJWSCS", + "keys": { ... }, + "device_display_name": "Alice's Mobile Phone" + } + ] + } + """ + + NAME = "user_device_resync" + PATH_ARGS = ("user_id",) + CACHE = False + + def __init__(self, hs): + super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs) + + self.device_list_updater = hs.get_device_handler().device_list_updater + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + @staticmethod + def _serialize_payload(user_id): + return {} + + async def _handle_request(self, request, user_id): + user_devices = await self.device_list_updater.user_device_resync(user_id) + + return 200, user_devices + + +def register_servlets(hs, http_server): + ReplicationUserDevicesResyncRestServlet(hs).register(http_server) -- cgit 1.5.1 From 4059d61e2608ac823ef04fe37f23fcac2387a37b Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 6 Nov 2019 12:01:54 +0000 Subject: Don't forget to ratelimit calls outside of RegistrationHandler --- synapse/handlers/register.py | 4 ++-- synapse/replication/http/register.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 3c142a4395..8be82e3754 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -168,7 +168,7 @@ class RegistrationHandler(BaseHandler): Raises: RegistrationError if there was a problem registering. """ - yield self._check_registration_ratelimit(address) + yield self.check_registration_ratelimit(address) yield self.auth.check_auth_blocking(threepid=threepid) password_hash = None @@ -415,7 +415,7 @@ class RegistrationHandler(BaseHandler): ratelimit=False, ) - def _check_registration_ratelimit(self, address): + def check_registration_ratelimit(self, address): """A simple helper method to check whether the registration rate limit has been hit for a given IP address diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py index 915cfb9430..6f4bba7aa4 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py @@ -75,6 +75,8 @@ class ReplicationRegisterServlet(ReplicationEndpoint): async def _handle_request(self, request, user_id): content = parse_json_object_from_request(request) + await self.registration_handler.check_registration_ratelimit(content["address"]) + await self.registration_handler.register_with_store( user_id=user_id, password_hash=content["password_hash"], -- cgit 1.5.1 From 1fe3cc2c9c59001a6d3f7b28f81bd6681c3c03ac Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 6 Nov 2019 14:54:24 +0000 Subject: Address review comments --- synapse/handlers/register.py | 24 ++++++++++++------------ synapse/replication/http/register.py | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 8be82e3754..47b9ae8d7f 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -24,7 +24,6 @@ from synapse.api.errors import ( AuthError, Codes, ConsentNotGivenError, - LimitExceededError, RegistrationError, SynapseError, ) @@ -218,8 +217,8 @@ class RegistrationHandler(BaseHandler): else: # autogen a sequential user ID - user = None - while not user: + # Fail after being unable to find a suitable ID a few times + for x in range(10): localpart = yield self._generate_user_id() user = UserID(localpart, self.hs.hostname) user_id = user.to_string() @@ -234,10 +233,12 @@ class RegistrationHandler(BaseHandler): create_profile_with_displayname=default_display_name, address=address, ) + + # Successfully registered + break except SynapseError: # if user id is taken, just generate another - user = None - user_id = None + pass if not self.hs.config.user_consent_at_registration: yield self._auto_join_rooms(user_id) @@ -420,25 +421,24 @@ class RegistrationHandler(BaseHandler): for a given IP address Args: - address (str): the IP address used to perform the registration. + address (str|None): the IP address used to perform the registration. If this is + None, no ratelimiting will be performed. Raises: LimitExceededError: If the rate limit has been exceeded. """ + if not address: + return + time_now = self.clock.time() - allowed, time_allowed = self.ratelimiter.can_do_action( + self.ratelimiter.ratelimit( address, time_now_s=time_now, rate_hz=self.hs.config.rc_registration.per_second, burst_count=self.hs.config.rc_registration.burst_count, ) - if not allowed: - raise LimitExceededError( - retry_after_ms=int(1000 * (time_allowed - time_now)) - ) - def register_with_store( self, user_id, diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py index 6f4bba7aa4..0c4aca1291 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py @@ -75,7 +75,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint): async def _handle_request(self, request, user_id): content = parse_json_object_from_request(request) - await self.registration_handler.check_registration_ratelimit(content["address"]) + self.registration_handler.check_registration_ratelimit(content["address"]) await self.registration_handler.register_with_store( user_id=user_id, -- cgit 1.5.1 From c4bdf2d7855dcddb7e294e9dba458884db2be7e0 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 8 Nov 2019 11:42:55 +0000 Subject: Remove content from being sent for account data rdata stream --- synapse/replication/tcp/streams/_base.py | 6 +++--- synapse/storage/data_stores/main/account_data.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 9e45429d49..dd87733842 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -89,7 +89,7 @@ TagAccountDataStreamRow = namedtuple( ) AccountDataStreamRow = namedtuple( "AccountDataStream", - ("user_id", "room_id", "data_type", "data"), # str # str # str # dict + ("user_id", "room_id", "data_type"), # str # str # str ) GroupsStreamRow = namedtuple( "GroupsStreamRow", @@ -421,8 +421,8 @@ class AccountDataStream(Stream): results = list(room_results) results.extend( - (stream_id, user_id, None, account_data_type, content) - for stream_id, user_id, account_data_type, content in global_results + (stream_id, user_id, None, account_data_type) + for stream_id, user_id, account_data_type in global_results ) return results diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py index 6afbfc0d74..22093484ed 100644 --- a/synapse/storage/data_stores/main/account_data.py +++ b/synapse/storage/data_stores/main/account_data.py @@ -184,14 +184,14 @@ class AccountDataWorkerStore(SQLBaseStore): current_id(int): The position to fetch up to. Returns: A deferred pair of lists of tuples of stream_id int, user_id string, - room_id string, type string, and content string. + room_id string, and type string. """ if last_room_id == current_id and last_global_id == current_id: return defer.succeed(([], [])) def get_updated_account_data_txn(txn): sql = ( - "SELECT stream_id, user_id, account_data_type, content" + "SELECT stream_id, user_id, account_data_type" " FROM account_data WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC LIMIT ?" ) @@ -199,7 +199,7 @@ class AccountDataWorkerStore(SQLBaseStore): global_results = txn.fetchall() sql = ( - "SELECT stream_id, user_id, room_id, account_data_type, content" + "SELECT stream_id, user_id, room_id, account_data_type" " FROM room_account_data WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC LIMIT ?" ) -- cgit 1.5.1 From cd96b4586f8dfa8a6857e1bee7de2bd9660238d5 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 8 Nov 2019 15:45:45 +0000 Subject: lint --- synapse/replication/tcp/streams/_base.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index dd87733842..8512923eae 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -88,8 +88,7 @@ TagAccountDataStreamRow = namedtuple( "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict ) AccountDataStreamRow = namedtuple( - "AccountDataStream", - ("user_id", "room_id", "data_type"), # str # str # str + "AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str ) GroupsStreamRow = namedtuple( "GroupsStreamRow", -- cgit 1.5.1 From 35f9165e96f6261e15aadb439a5d2199bede3c99 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Nov 2019 12:04:48 +0000 Subject: Fixup docs --- changelog.d/6332.bugfix | 2 +- synapse/replication/http/devices.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/replication') diff --git a/changelog.d/6332.bugfix b/changelog.d/6332.bugfix index b14bd7e43c..67d5170ba0 100644 --- a/changelog.d/6332.bugfix +++ b/changelog.d/6332.bugfix @@ -1 +1 @@ -Fix caching devices for remote users when using workers. +Fix caching devices for remote users when using workers, so that we don't attempt to refetch (and potentially fail) each time a user requests devices. diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index 795ca7b65e..e32aac0a25 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -21,7 +21,11 @@ logger = logging.getLogger(__name__) class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): - """Notifies that a user has joined or left the room + """Ask master to resync the device list for a user by contacting their + server. + + This must happen on master so that the results can be correctly cached in + the database and streamed to workers. Request format: -- cgit 1.5.1 From 2173785f0d9124037ca841b568349ad0424b39cd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Nov 2019 11:31:56 +0000 Subject: Propagate reason in remotely rejected invites --- synapse/handlers/federation.py | 4 ++-- synapse/handlers/room_member.py | 13 +++++++++---- synapse/handlers/room_member_worker.py | 5 ++++- synapse/replication/http/membership.py | 7 +++++-- 4 files changed, 20 insertions(+), 9 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a5ae7b77d1..d3267734f7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1428,9 +1428,9 @@ class FederationHandler(BaseHandler): return event @defer.inlineCallbacks - def do_remotely_reject_invite(self, target_hosts, room_id, user_id): + def do_remotely_reject_invite(self, target_hosts, room_id, user_id, content): origin, event, event_format_version = yield self._make_and_verify_event( - target_hosts, room_id, user_id, "leave" + target_hosts, room_id, user_id, "leave", content=content, ) # Mark as outlier as we don't have any state for this event; we're not # even in the room. diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 6cfee4b361..7b7270fc61 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -94,7 +94,9 @@ class RoomMemberHandler(object): raise NotImplementedError() @abc.abstractmethod - def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + def _remote_reject_invite( + self, requester, remote_room_hosts, room_id, target, content + ): """Attempt to reject an invite for a room this server is not in. If we fail to do so we locally mark the invite as rejected. @@ -104,6 +106,7 @@ class RoomMemberHandler(object): reject invite room_id (str) target (UserID): The user rejecting the invite + content (dict): The content for the rejection event Returns: Deferred[dict]: A dictionary to be returned to the client, may @@ -471,7 +474,7 @@ class RoomMemberHandler(object): # send the rejection to the inviter's HS. remote_room_hosts = remote_room_hosts + [inviter.domain] res = yield self._remote_reject_invite( - requester, remote_room_hosts, room_id, target + requester, remote_room_hosts, room_id, target, content, ) return res @@ -971,13 +974,15 @@ class RoomMemberMasterHandler(RoomMemberHandler): ) @defer.inlineCallbacks - def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + def _remote_reject_invite( + self, requester, remote_room_hosts, room_id, target, content + ): """Implements RoomMemberHandler._remote_reject_invite """ fed_handler = self.federation_handler try: ret = yield fed_handler.do_remotely_reject_invite( - remote_room_hosts, room_id, target.to_string() + remote_room_hosts, room_id, target.to_string(), content=content, ) return ret except Exception as e: diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 75e96ae1a2..69be86893b 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -55,7 +55,9 @@ class RoomMemberWorkerHandler(RoomMemberHandler): return ret - def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + def _remote_reject_invite( + self, requester, remote_room_hosts, room_id, target, content + ): """Implements RoomMemberHandler._remote_reject_invite """ return self._remote_reject_client( @@ -63,6 +65,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): remote_room_hosts=remote_room_hosts, room_id=room_id, user_id=target.to_string(), + content=content, ) def _user_joined_room(self, target, room_id): diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index cc1f249740..3577611fd7 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -93,6 +93,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): { "requester": ..., "remote_room_hosts": [...], + "content": { ... } } """ @@ -107,7 +108,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): self.clock = hs.get_clock() @staticmethod - def _serialize_payload(requester, room_id, user_id, remote_room_hosts): + def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content): """ Args: requester(Requester) @@ -118,12 +119,14 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): return { "requester": requester.serialize(), "remote_room_hosts": remote_room_hosts, + "content": content, } async def _handle_request(self, request, room_id, user_id): content = parse_json_object_from_request(request) remote_room_hosts = content["remote_room_hosts"] + event_content = content["content"] requester = Requester.deserialize(self.store, content["requester"]) @@ -134,7 +137,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): try: event = await self.federation_handler.do_remotely_reject_invite( - remote_room_hosts, room_id, user_id + remote_room_hosts, room_id, user_id, event_content, ) ret = event.get_pdu_json() except Exception as e: -- cgit 1.5.1