From 09957ce0e4dcfd84c2de4039653059faae03065b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 4 Nov 2019 17:09:22 +0000 Subject: Implement per-room message retention policies --- synapse/handlers/federation.py | 2 +- synapse/handlers/message.py | 4 +- synapse/handlers/pagination.py | 111 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8cafcfdab0..3994137d18 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2454,7 +2454,7 @@ class FederationHandler(BaseHandler): room_version, event_dict, event, context ) - EventValidator().validate_new(event) + EventValidator().validate_new(event, self.config) # We need to tell the transaction queue to send this out, even # though the sender isn't a local user. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d682dc2b7a..155ed6e06a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -417,7 +417,7 @@ class EventCreationHandler(object): 403, "You must be in the room to create an alias for it" ) - self.validator.validate_new(event) + self.validator.validate_new(event, self.config) return (event, context) @@ -634,7 +634,7 @@ class EventCreationHandler(object): if requester: context.app_service = requester.app_service - self.validator.validate_new(event) + self.validator.validate_new(event, self.config) # If this event is an annotation then we check that that the sender # can't annotate the same way twice (e.g. stops users from liking an diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 97f15a1c32..e1800177fa 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -15,12 +15,15 @@ # limitations under the License. import logging +from six import iteritems + from twisted.internet import defer from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership from synapse.api.errors import SynapseError from synapse.logging.context import run_in_background +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.state import StateFilter from synapse.types import RoomStreamToken from synapse.util.async_helpers import ReadWriteLock @@ -80,6 +83,114 @@ class PaginationHandler(object): self._purges_by_id = {} self._event_serializer = hs.get_event_client_serializer() + self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime + + if hs.config.retention_enabled: + # Run the purge jobs described in the configuration file. + for job in hs.config.retention_purge_jobs: + self.clock.looping_call( + run_as_background_process, + job["interval"], + "purge_history_for_rooms_in_range", + self.purge_history_for_rooms_in_range, + job["shortest_max_lifetime"], + job["longest_max_lifetime"], + ) + + @defer.inlineCallbacks + def purge_history_for_rooms_in_range(self, min_ms, max_ms): + """Purge outdated events from rooms within the given retention range. + + If a default retention policy is defined in the server's configuration and its + 'max_lifetime' is within this range, also targets rooms which don't have a + retention policy. + + Args: + min_ms (int|None): Duration in milliseconds that define the lower limit of + the range to handle (exclusive). If None, it means that the range has no + lower limit. + max_ms (int|None): Duration in milliseconds that define the upper limit of + the range to handle (inclusive). If None, it means that the range has no + upper limit. + """ + # We want the storage layer to to include rooms with no retention policy in its + # return value only if a default retention policy is defined in the server's + # configuration and that policy's 'max_lifetime' is either lower (or equal) than + # max_ms or higher than min_ms (or both). + if self._retention_default_max_lifetime is not None: + include_null = True + + if min_ms is not None and min_ms >= self._retention_default_max_lifetime: + # The default max_lifetime is lower than (or equal to) min_ms. + include_null = False + + if max_ms is not None and max_ms < self._retention_default_max_lifetime: + # The default max_lifetime is higher than max_ms. + include_null = False + else: + include_null = False + + rooms = yield self.store.get_rooms_for_retention_period_in_range( + min_ms, max_ms, include_null + ) + + for room_id, retention_policy in iteritems(rooms): + if room_id in self._purges_in_progress_by_room: + logger.warning( + "[purge] not purging room %s as there's an ongoing purge running" + " for this room", + room_id, + ) + continue + + max_lifetime = retention_policy["max_lifetime"] + + if max_lifetime is None: + # If max_lifetime is None, it means that include_null equals True, + # therefore we can safely assume that there is a default policy defined + # in the server's configuration. + max_lifetime = self._retention_default_max_lifetime + + # Figure out what token we should start purging at. + ts = self.clock.time_msec() - max_lifetime + + stream_ordering = ( + yield self.store.find_first_stream_ordering_after_ts(ts) + ) + + r = ( + yield self.store.get_room_event_after_stream_ordering( + room_id, stream_ordering, + ) + ) + if not r: + logger.warning( + "[purge] purging events not possible: No event found " + "(ts %i => stream_ordering %i)", + ts, stream_ordering, + ) + continue + + (stream, topo, _event_id) = r + token = "t%d-%d" % (topo, stream) + + purge_id = random_string(16) + + self._purges_by_id[purge_id] = PurgeStatus() + + logger.info( + "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id) + ) + + # We want to purge everything, including local events, and to run the purge in + # the background so that it's not blocking any other operation apart from + # other purges in the same room. + run_as_background_process( + "_purge_history", + self._purge_history, + purge_id, room_id, token, True, + ) + def start_purge_history(self, room_id, token, delete_local_events=False): """Start off a history purge on a room. -- cgit 1.5.1 From 1dffa78701d43b299419090d544fb8bb91ab4d5b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 5 Nov 2019 12:21:59 +0000 Subject: Filter events_before and events_after in /context requests While the current version of the spec doesn't say much about how this endpoint uses filters (see https://github.com/matrix-org/matrix-doc/issues/2338), the current implementation is that some fields of an EventFilter apply (the ones that are used when running the SQL query) and others don't (the ones that are used by the filter itself) because we don't call event_filter.filter(...). This seems counter-intuitive and probably not what we want so this commit fixes it. --- synapse/handlers/room.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index e92b2eafd5..899bb63114 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -874,8 +874,10 @@ class RoomContextHandler(object): room_id, event_id, before_limit, after_limit, event_filter ) - results["events_before"] = yield filter_evts(results["events_before"]) - results["events_after"] = yield filter_evts(results["events_after"]) + filtered_before_events = event_filter.filter(results["events_before"]) + results["events_before"] = yield filter_evts(filtered_before_events) + filtered_after_events = event_filter.filter(results["events_after"]) + results["events_after"] = yield filter_evts(filtered_after_events) results["event"] = event if results["events_after"]: -- cgit 1.5.1 From f141af4c79b2be8e87d683420e2d8117e2a8525c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 5 Nov 2019 14:52:38 +0000 Subject: Update copyright --- synapse/handlers/room.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 899bb63114..f6e162484c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd +# Copyright 2018-2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. -- cgit 1.5.1 From cb2cbe4d26b5d0c082c82a62260c0c05afde8aeb Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 5 Nov 2019 15:27:38 +0000 Subject: Only filter if a filter was provided --- synapse/handlers/room.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f6e162484c..f47237b3fb 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -875,10 +875,12 @@ class RoomContextHandler(object): room_id, event_id, before_limit, after_limit, event_filter ) - filtered_before_events = event_filter.filter(results["events_before"]) - results["events_before"] = yield filter_evts(filtered_before_events) - filtered_after_events = event_filter.filter(results["events_after"]) - results["events_after"] = yield filter_evts(filtered_after_events) + if event_filter: + results["events_before"] = event_filter.filter(results["events_before"]) + results["events_after"] = event_filter.filter(results["events_after"]) + + results["events_before"] = yield filter_evts(results["events_before"]) + results["events_after"] = yield filter_evts(results["events_after"]) results["event"] = event if results["events_after"]: -- cgit 1.5.1 From c16e192e2f9970cc62adfd758034244631968102 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Nov 2019 15:49:43 +0000 Subject: Fix caching devices for remote servers in worker. When the `/keys/query` API is hit on client_reader worker Synapse may decide that it needs to resync some remote deivces. Usually this happens on master, and then gets cached. However, that fails on workers and so it falls back to fetching devices from remotes directly, which may in turn fail if the remote is down. --- synapse/handlers/e2e_keys.py | 19 ++++++++-- synapse/replication/http/__init__.py | 10 +++++- synapse/replication/http/devices.py | 69 ++++++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 synapse/replication/http/devices.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index f09a0b73c8..28c12753c1 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -30,6 +30,7 @@ from twisted.internet import defer from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace +from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.types import ( UserID, get_domain_from_id, @@ -53,6 +54,12 @@ class E2eKeysHandler(object): self._edu_updater = SigningKeyEduUpdater(hs, self) + self._is_master = hs.config.worker_app is None + if not self._is_master: + self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client( + hs + ) + federation_registry = hs.get_federation_registry() # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec @@ -191,9 +198,15 @@ class E2eKeysHandler(object): # probably be tracking their device lists. However, we haven't # done an initial sync on the device list so we do it now. try: - user_devices = yield self.device_handler.device_list_updater.user_device_resync( - user_id - ) + if self._is_master: + user_devices = yield self.device_handler.device_list_updater.user_device_resync( + user_id + ) + else: + user_devices = yield self._user_device_resync_client( + user_id=user_id + ) + user_devices = user_devices["devices"] for device in user_devices: results[user_id] = {device["device_id"]: device["keys"]} diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 81b85352b1..28dbc6fcba 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -14,7 +14,14 @@ # limitations under the License. from synapse.http.server import JsonResource -from synapse.replication.http import federation, login, membership, register, send_event +from synapse.replication.http import ( + devices, + federation, + login, + membership, + register, + send_event, +) REPLICATION_PREFIX = "/_synapse/replication" @@ -30,3 +37,4 @@ class ReplicationRestResource(JsonResource): federation.register_servlets(hs, self) login.register_servlets(hs, self) register.register_servlets(hs, self) + devices.register_servlets(hs, self) diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py new file mode 100644 index 0000000000..795ca7b65e --- /dev/null +++ b/synapse/replication/http/devices.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.replication.http._base import ReplicationEndpoint + +logger = logging.getLogger(__name__) + + +class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): + """Notifies that a user has joined or left the room + + Request format: + + POST /_synapse/replication/user_device_resync/:user_id + + {} + + Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id` + response, e.g.: + + { + "user_id": "@alice:example.org", + "devices": [ + { + "device_id": "JLAFKJWSCS", + "keys": { ... }, + "device_display_name": "Alice's Mobile Phone" + } + ] + } + """ + + NAME = "user_device_resync" + PATH_ARGS = ("user_id",) + CACHE = False + + def __init__(self, hs): + super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs) + + self.device_list_updater = hs.get_device_handler().device_list_updater + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + @staticmethod + def _serialize_payload(user_id): + return {} + + async def _handle_request(self, request, user_id): + user_devices = await self.device_list_updater.user_device_resync(user_id) + + return 200, user_devices + + +def register_servlets(hs, http_server): + ReplicationUserDevicesResyncRestServlet(hs).register(http_server) -- cgit 1.5.1 From eda14737cf0faf789ec587633b12bb2cf65fa305 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 6 Nov 2019 18:14:03 +0000 Subject: Also filter state events --- synapse/handlers/room.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f47237b3fb..3148df0de9 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -907,7 +907,13 @@ class RoomContextHandler(object): state = yield self.state_store.get_state_for_events( [last_event_id], state_filter=state_filter ) - results["state"] = list(state[last_event_id].values()) + + # Apply the filter on state events. + state_events = list(state[last_event_id].values()) + if event_filter: + state_events = event_filter.filter(state_events) + + results["state"] = list(state_events) # We use a dummy token here as we only care about the room portion of # the token, which we replace. -- cgit 1.5.1 From 772d414975608c03d5690e2d8f65c7f382403a99 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 11 Oct 2019 16:05:21 +0100 Subject: Simplify _update_auth_events_and_context_for_auth move event_key calculation into _update_context_for_auth_events, since it's only used there. --- synapse/handlers/federation.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 05dd8d2671..ab152e8dcd 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2098,11 +2098,6 @@ class FederationHandler(BaseHandler): """ event_auth_events = set(event.auth_event_ids()) - if event.is_state(): - event_key = (event.type, event.state_key) - else: - event_key = None - # if the event's auth_events refers to events which are not in our # calculated auth_events, we need to fetch those events from somewhere. # @@ -2231,13 +2226,13 @@ class FederationHandler(BaseHandler): auth_events.update(new_state) context = yield self._update_context_for_auth_events( - event, context, auth_events, event_key + event, context, auth_events ) return context @defer.inlineCallbacks - def _update_context_for_auth_events(self, event, context, auth_events, event_key): + def _update_context_for_auth_events(self, event, context, auth_events): """Update the state_ids in an event context after auth event resolution, storing the changes as a new state group. @@ -2246,18 +2241,21 @@ class FederationHandler(BaseHandler): context (synapse.events.snapshot.EventContext): initial event context - auth_events (dict[(str, str)->str]): Events to update in the event + auth_events (dict[(str, str)->EventBase]): Events to update in the event context. - event_key ((str, str)): (type, state_key) for the current event. - this will not be included in the current_state in the context. - Returns: Deferred[EventContext]: new event context """ + # exclude the state key of the new event from the current_state in the context. + if event.is_state(): + event_key = (event.type, event.state_key) + else: + event_key = None state_updates = { k: a.event_id for k, a in iteritems(auth_events) if k != event_key } + current_state_ids = yield context.get_current_state_ids(self.store) current_state_ids = dict(current_state_ids) -- cgit 1.5.1 From f8407975e7ad080d04a028771ae5a84590a19da1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 8 Nov 2019 12:18:20 +0000 Subject: Update some docstrings and comments --- synapse/handlers/federation.py | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ab152e8dcd..4bc4d57efb 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2040,8 +2040,10 @@ class FederationHandler(BaseHandler): auth_events (dict[(str, str)->synapse.events.EventBase]): Map from (event_type, state_key) to event - What we expect the event's auth_events to be, based on the event's - position in the dag. I think? maybe?? + Normally, our calculated auth_events based on the state of the room + at the event's position in the DAG, though occasionally (eg if the + event is an outlier), may be the auth events claimed by the remote + server. Also NB that this function adds entries to it. Returns: @@ -2091,25 +2093,35 @@ class FederationHandler(BaseHandler): origin (str): event (synapse.events.EventBase): context (synapse.events.snapshot.EventContext): + auth_events (dict[(str, str)->synapse.events.EventBase]): + Map from (event_type, state_key) to event + + Normally, our calculated auth_events based on the state of the room + at the event's position in the DAG, though occasionally (eg if the + event is an outlier), may be the auth events claimed by the remote + server. + + Also NB that this function adds entries to it. Returns: defer.Deferred[EventContext]: updated context """ event_auth_events = set(event.auth_event_ids()) - # if the event's auth_events refers to events which are not in our - # calculated auth_events, we need to fetch those events from somewhere. - # - # we start by fetching them from the store, and then try calling /event_auth/. + # missing_auth is the set of the event's auth_events which we don't yet have + # in auth_events. missing_auth = event_auth_events.difference( e.event_id for e in auth_events.values() ) + # if we have missing events, we need to fetch those events from somewhere. + # + # we start by checking if they are in the store, and then try calling /event_auth/. if missing_auth: # TODO: can we use store.have_seen_events here instead? have_events = yield self.store.get_seen_events_with_rejections(missing_auth) - logger.debug("Got events %s from store", have_events) + logger.debug("Found events %s in the store", have_events) missing_auth.difference_update(have_events.keys()) else: have_events = {} @@ -2164,15 +2176,23 @@ class FederationHandler(BaseHandler): event.auth_event_ids() ) except Exception: - # FIXME: logger.exception("Failed to get auth chain") if event.internal_metadata.is_outlier(): + # XXX: given that, for an outlier, we'll be working with the + # event's *claimed* auth events rather than those we calculated: + # (a) is there any point in this test, since different_auth below will + # obviously be empty + # (b) alternatively, why don't we do it earlier? logger.info("Skipping auth_event fetch for outlier") return context # FIXME: Assumes we have and stored all the state for all the # prev_events + # + # FIXME: what does the fixme above mean? where do prev_events come into + # it, why do we care about the state for those events, and what does "have and + # stored" mean? Seems erik wrote it in c1d860870b different_auth = event_auth_events.difference( e.event_id for e in auth_events.values() ) @@ -2186,6 +2206,9 @@ class FederationHandler(BaseHandler): different_auth, ) + # now we state-resolve between our own idea of the auth events, and the remote's + # idea of them. + room_version = yield self.store.get_room_version(event.room_id) different_events = yield make_deferred_yieldable( -- cgit 1.5.1 From f41027f74678f35ad9e9eb2531c416dd58a65127 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 8 Nov 2019 12:21:28 +0000 Subject: Use get_events_as_list rather than lots of calls to get_event It's more efficient and clearer. --- synapse/handlers/federation.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4bc4d57efb..3d4197ed69 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2210,26 +2210,18 @@ class FederationHandler(BaseHandler): # idea of them. room_version = yield self.store.get_room_version(event.room_id) + different_event_ids = [ + d for d in different_auth if d in have_events and not have_events[d] + ] - different_events = yield make_deferred_yieldable( - defer.gatherResults( - [ - run_in_background( - self.store.get_event, d, allow_none=True, allow_rejected=False - ) - for d in different_auth - if d in have_events and not have_events[d] - ], - consumeErrors=True, - ) - ).addErrback(unwrapFirstError) + if different_event_ids: + # XXX: currently this checks for redactions but I'm not convinced that is + # necessary? + different_events = yield self.store.get_events_as_list(different_event_ids) - if different_events: local_view = dict(auth_events) remote_view = dict(auth_events) - remote_view.update( - {(d.type, d.state_key): d for d in different_events if d} - ) + remote_view.update({(d.type, d.state_key): d for d in different_events}) new_state = yield self.state_handler.resolve_events( room_version, -- cgit 1.5.1 From 7c24d0f443724082376c89f9f75954d81f524a8e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 19 Nov 2019 13:22:37 +0000 Subject: Lint --- synapse/config/server.py | 39 ++++++++++------- synapse/handlers/pagination.py | 17 +++----- synapse/storage/data_stores/main/room.py | 49 +++++++++++---------- tests/rest/client/test_retention.py | 73 ++++++++++---------------------- 4 files changed, 77 insertions(+), 101 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/config/server.py b/synapse/config/server.py index aa93a416f1..8a55ffac4f 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -19,7 +19,7 @@ import logging import os.path import re from textwrap import indent -from typing import List +from typing import List, Dict, Optional import attr import yaml @@ -287,13 +287,17 @@ class ServerConfig(Config): self.retention_default_min_lifetime = None self.retention_default_max_lifetime = None - self.retention_allowed_lifetime_min = retention_config.get("allowed_lifetime_min") + self.retention_allowed_lifetime_min = retention_config.get( + "allowed_lifetime_min" + ) if self.retention_allowed_lifetime_min is not None: self.retention_allowed_lifetime_min = self.parse_duration( self.retention_allowed_lifetime_min ) - self.retention_allowed_lifetime_max = retention_config.get("allowed_lifetime_max") + self.retention_allowed_lifetime_max = retention_config.get( + "allowed_lifetime_max" + ) if self.retention_allowed_lifetime_max is not None: self.retention_allowed_lifetime_max = self.parse_duration( self.retention_allowed_lifetime_max @@ -302,14 +306,15 @@ class ServerConfig(Config): if ( self.retention_allowed_lifetime_min is not None and self.retention_allowed_lifetime_max is not None - and self.retention_allowed_lifetime_min > self.retention_allowed_lifetime_max + and self.retention_allowed_lifetime_min + > self.retention_allowed_lifetime_max ): raise ConfigError( "Invalid retention policy limits: 'allowed_lifetime_min' can not be" " greater than 'allowed_lifetime_max'" ) - self.retention_purge_jobs = [] + self.retention_purge_jobs = [] # type: List[Dict[str, Optional[int]]] for purge_job_config in retention_config.get("purge_jobs", []): interval_config = purge_job_config.get("interval") @@ -342,18 +347,22 @@ class ServerConfig(Config): " 'longest_max_lifetime' value." ) - self.retention_purge_jobs.append({ - "interval": interval, - "shortest_max_lifetime": shortest_max_lifetime, - "longest_max_lifetime": longest_max_lifetime, - }) + self.retention_purge_jobs.append( + { + "interval": interval, + "shortest_max_lifetime": shortest_max_lifetime, + "longest_max_lifetime": longest_max_lifetime, + } + ) if not self.retention_purge_jobs: - self.retention_purge_jobs = [{ - "interval": self.parse_duration("1d"), - "shortest_max_lifetime": None, - "longest_max_lifetime": None, - }] + self.retention_purge_jobs = [ + { + "interval": self.parse_duration("1d"), + "shortest_max_lifetime": None, + "longest_max_lifetime": None, + } + ] self.listeners = [] # type: List[dict] for listener in config.get("listeners", []): diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index e1800177fa..d122c11a4d 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -154,20 +154,17 @@ class PaginationHandler(object): # Figure out what token we should start purging at. ts = self.clock.time_msec() - max_lifetime - stream_ordering = ( - yield self.store.find_first_stream_ordering_after_ts(ts) - ) + stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts) - r = ( - yield self.store.get_room_event_after_stream_ordering( - room_id, stream_ordering, - ) + r = yield self.store.get_room_event_after_stream_ordering( + room_id, stream_ordering, ) if not r: logger.warning( "[purge] purging events not possible: No event found " "(ts %i => stream_ordering %i)", - ts, stream_ordering, + ts, + stream_ordering, ) continue @@ -186,9 +183,7 @@ class PaginationHandler(object): # the background so that it's not blocking any other operation apart from # other purges in the same room. run_as_background_process( - "_purge_history", - self._purge_history, - purge_id, room_id, token, True, + "_purge_history", self._purge_history, purge_id, room_id, token, True, ) def start_purge_history(self, room_id, token, delete_local_events=False): diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index 54a7d24c73..7fceae59ca 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -334,8 +334,9 @@ class RoomStore(RoomWorkerStore, SearchStore): WHERE state.room_id > ? AND state.type = '%s' ORDER BY state.room_id ASC LIMIT ?; - """ % EventTypes.Retention, - (last_room, batch_size) + """ + % EventTypes.Retention, + (last_room, batch_size), ) rows = self.cursor_to_dict(txn) @@ -358,15 +359,13 @@ class RoomStore(RoomWorkerStore, SearchStore): "event_id": row["event_id"], "min_lifetime": retention_policy.get("min_lifetime"), "max_lifetime": retention_policy.get("max_lifetime"), - } + }, ) logger.info("Inserted %d rows into room_retention", len(rows)) self._background_update_progress_txn( - txn, "insert_room_retention", { - "room_id": rows[-1]["room_id"], - } + txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]} ) if batch_size > len(rows): @@ -375,8 +374,7 @@ class RoomStore(RoomWorkerStore, SearchStore): return False end = yield self.runInteraction( - "insert_room_retention", - _background_insert_retention_txn, + "insert_room_retention", _background_insert_retention_txn, ) if end: @@ -585,17 +583,15 @@ class RoomStore(RoomWorkerStore, SearchStore): ) def _store_retention_policy_for_room_txn(self, txn, event): - if ( - hasattr(event, "content") - and ("min_lifetime" in event.content or "max_lifetime" in event.content) + if hasattr(event, "content") and ( + "min_lifetime" in event.content or "max_lifetime" in event.content ): if ( - ("min_lifetime" in event.content and not isinstance( - event.content.get("min_lifetime"), integer_types - )) - or ("max_lifetime" in event.content and not isinstance( - event.content.get("max_lifetime"), integer_types - )) + "min_lifetime" in event.content + and not isinstance(event.content.get("min_lifetime"), integer_types) + ) or ( + "max_lifetime" in event.content + and not isinstance(event.content.get("max_lifetime"), integer_types) ): # Ignore the event if one of the value isn't an integer. return @@ -798,7 +794,9 @@ class RoomStore(RoomWorkerStore, SearchStore): return local_media_mxcs, remote_media_mxcs @defer.inlineCallbacks - def get_rooms_for_retention_period_in_range(self, min_ms, max_ms, include_null=False): + def get_rooms_for_retention_period_in_range( + self, min_ms, max_ms, include_null=False + ): """Retrieves all of the rooms within the given retention range. Optionally includes the rooms which don't have a retention policy. @@ -904,23 +902,24 @@ class RoomStore(RoomWorkerStore, SearchStore): INNER JOIN current_state_events USING (event_id, room_id) WHERE room_id = ?; """, - (room_id,) + (room_id,), ) return self.cursor_to_dict(txn) ret = yield self.runInteraction( - "get_retention_policy_for_room", - get_retention_policy_for_room_txn, + "get_retention_policy_for_room", get_retention_policy_for_room_txn, ) # If we don't know this room ID, ret will be None, in this case return the default # policy. if not ret: - defer.returnValue({ - "min_lifetime": self.config.retention_default_min_lifetime, - "max_lifetime": self.config.retention_default_max_lifetime, - }) + defer.returnValue( + { + "min_lifetime": self.config.retention_default_min_lifetime, + "max_lifetime": self.config.retention_default_max_lifetime, + } + ) row = ret[0] diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py index 7b6f25a838..6bf485c239 100644 --- a/tests/rest/client/test_retention.py +++ b/tests/rest/client/test_retention.py @@ -61,9 +61,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.helper.send_state( room_id=room_id, event_type=EventTypes.Retention, - body={ - "max_lifetime": one_day_ms * 4, - }, + body={"max_lifetime": one_day_ms * 4}, tok=self.token, expect_code=400, ) @@ -71,9 +69,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.helper.send_state( room_id=room_id, event_type=EventTypes.Retention, - body={ - "max_lifetime": one_hour_ms, - }, + body={"max_lifetime": one_hour_ms}, tok=self.token, expect_code=400, ) @@ -89,9 +85,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.helper.send_state( room_id=room_id, event_type=EventTypes.Retention, - body={ - "max_lifetime": lifetime, - }, + body={"max_lifetime": lifetime}, tok=self.token, ) @@ -115,20 +109,12 @@ class RetentionTestCase(unittest.HomeserverTestCase): events = [] # Send a first event, which should be filtered out at the end of the test. - resp = self.helper.send( - room_id=room_id, - body="1", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="1", tok=self.token) # Get the event from the store so that we end up with a FrozenEvent that we can # give to filter_events_for_client. We need to do this now because the event won't # be in the database anymore after it has expired. - events.append(self.get_success( - store.get_event( - resp.get("event_id") - ) - )) + events.append(self.get_success(store.get_event(resp.get("event_id")))) # Advance the time by 2 days. We're using the default retention policy, therefore # after this the first event will still be valid. @@ -143,20 +129,16 @@ class RetentionTestCase(unittest.HomeserverTestCase): valid_event_id = resp.get("event_id") - events.append(self.get_success( - store.get_event( - valid_event_id - ) - )) + events.append(self.get_success(store.get_event(valid_event_id))) # Advance the time by anothe 2 days. After this, the first event should be # outdated but not the second one. self.reactor.advance(one_day_ms * 2 / 1000) # Run filter_events_for_client with our list of FrozenEvents. - filtered_events = self.get_success(filter_events_for_client( - storage, self.user_id, events - )) + filtered_events = self.get_success( + filter_events_for_client(storage, self.user_id, events) + ) # We should only get one event back. self.assertEqual(len(filtered_events), 1, filtered_events) @@ -172,28 +154,22 @@ class RetentionTestCase(unittest.HomeserverTestCase): # Send a first event to the room. This is the event we'll want to be purged at the # end of the test. - resp = self.helper.send( - room_id=room_id, - body="1", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="1", tok=self.token) expired_event_id = resp.get("event_id") # Check that we can retrieve the event. expired_event = self.get_event(room_id, expired_event_id) - self.assertEqual(expired_event.get("content", {}).get("body"), "1", expired_event) + self.assertEqual( + expired_event.get("content", {}).get("body"), "1", expired_event + ) # Advance the time. self.reactor.advance(increment / 1000) # Send another event. We need this because the purge job won't purge the most # recent event in the room. - resp = self.helper.send( - room_id=room_id, - body="2", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="2", tok=self.token) valid_event_id = resp.get("event_id") @@ -240,8 +216,7 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): mock_federation_client = Mock(spec=["backfill"]) self.hs = self.setup_test_homeserver( - config=config, - federation_client=mock_federation_client, + config=config, federation_client=mock_federation_client, ) return self.hs @@ -268,9 +243,7 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): self.helper.send_state( room_id=room_id, event_type=EventTypes.Retention, - body={ - "max_lifetime": one_day_ms * 35, - }, + body={"max_lifetime": one_day_ms * 35}, tok=self.token, ) @@ -289,18 +262,16 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): # Check that we can retrieve the event. expired_event = self.get_event(room_id, first_event_id) - self.assertEqual(expired_event.get("content", {}).get("body"), "1", expired_event) + self.assertEqual( + expired_event.get("content", {}).get("body"), "1", expired_event + ) # Advance the time by a month. self.reactor.advance(one_day_ms * 30 / 1000) # Send another event. We need this because the purge job won't purge the most # recent event in the room. - resp = self.helper.send( - room_id=room_id, - body="2", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="2", tok=self.token) second_event_id = resp.get("event_id") @@ -313,7 +284,9 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): ) if expected_code_for_first_event == 200: - self.assertEqual(first_event.get("content", {}).get("body"), "1", first_event) + self.assertEqual( + first_event.get("content", {}).get("body"), "1", first_event + ) # Check that the event that hasn't been purged can still be retrieved. second_event = self.get_event(room_id, second_event_id) -- cgit 1.5.1 From a6fc6754f8a680aa31e2be2b10d02e953d4ff368 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 19 Nov 2019 14:07:39 +0000 Subject: Fix 3PID invite exchange --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3994137d18..ab82f83625 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2569,7 +2569,7 @@ class FederationHandler(BaseHandler): event, context = yield self.event_creation_handler.create_new_client_event( builder=builder ) - EventValidator().validate_new(event) + EventValidator().validate_new(event, self.config) return (event, context) @defer.inlineCallbacks -- cgit 1.5.1 From 3916e1b97a1ffc481dfdf66f7da58201a52140a9 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Thu, 21 Nov 2019 12:00:14 +0000 Subject: Clean up newline quote marks around the codebase (#6362) --- changelog.d/6362.misc | 1 + synapse/app/federation_sender.py | 2 +- synapse/appservice/api.py | 2 +- synapse/config/appservice.py | 2 +- synapse/config/room_directory.py | 2 +- synapse/config/server.py | 6 +++--- synapse/federation/persistence.py | 4 ++-- synapse/federation/sender/__init__.py | 2 +- synapse/federation/sender/transaction_manager.py | 4 ++-- synapse/handlers/directory.py | 2 +- synapse/http/servlet.py | 2 +- synapse/push/httppusher.py | 5 ++--- synapse/push/mailer.py | 4 ++-- synapse/rest/media/v1/preview_url_resource.py | 2 +- synapse/server_notices/consent_server_notices.py | 2 +- synapse/storage/_base.py | 2 +- synapse/storage/data_stores/main/deviceinbox.py | 2 +- synapse/storage/data_stores/main/end_to_end_keys.py | 6 +++--- synapse/storage/data_stores/main/events.py | 8 +++----- synapse/storage/data_stores/main/filtering.py | 2 +- synapse/storage/data_stores/main/media_repository.py | 6 +++--- synapse/storage/data_stores/main/registration.py | 4 +--- synapse/storage/data_stores/main/stream.py | 2 +- synapse/storage/data_stores/main/tags.py | 4 +--- synapse/storage/prepare_database.py | 2 +- synapse/streams/config.py | 9 ++++++--- 26 files changed, 43 insertions(+), 46 deletions(-) create mode 100644 changelog.d/6362.misc (limited to 'synapse/handlers') diff --git a/changelog.d/6362.misc b/changelog.d/6362.misc new file mode 100644 index 0000000000..b79a5bea99 --- /dev/null +++ b/changelog.d/6362.misc @@ -0,0 +1 @@ +Clean up some unnecessary quotation marks around the codebase. \ No newline at end of file diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 139221ad34..448e45e00f 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -69,7 +69,7 @@ class FederationSenderSlaveStore( self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) def _get_federation_out_pos(self, db_conn): - sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?" + sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?" sql = self.database_engine.convert_param_style(sql) txn = db_conn.cursor() diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 3e25bf5747..57174da021 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -185,7 +185,7 @@ class ApplicationServiceApi(SimpleHttpClient): if not _is_valid_3pe_metadata(info): logger.warning( - "query_3pe_protocol to %s did not return a" " valid result", uri + "query_3pe_protocol to %s did not return a valid result", uri ) return None diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index e77d3387ff..ca43e96bd1 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -134,7 +134,7 @@ def _load_appservice(hostname, as_info, config_filename): for regex_obj in as_info["namespaces"][ns]: if not isinstance(regex_obj, dict): raise ValueError( - "Expected namespace entry in %s to be an object," " but got %s", + "Expected namespace entry in %s to be an object, but got %s", ns, regex_obj, ) diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py index 7c9f05bde4..7ac7699676 100644 --- a/synapse/config/room_directory.py +++ b/synapse/config/room_directory.py @@ -170,7 +170,7 @@ class _RoomDirectoryRule(object): self.action = action else: raise ConfigError( - "%s rules can only have action of 'allow'" " or 'deny'" % (option_name,) + "%s rules can only have action of 'allow' or 'deny'" % (option_name,) ) self._alias_matches_all = alias == "*" diff --git a/synapse/config/server.py b/synapse/config/server.py index 00d01c43af..11336d7549 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -223,7 +223,7 @@ class ServerConfig(Config): self.federation_ip_range_blacklist.update(["0.0.0.0", "::"]) except Exception as e: raise ConfigError( - "Invalid range(s) provided in " "federation_ip_range_blacklist: %s" % e + "Invalid range(s) provided in federation_ip_range_blacklist: %s" % e ) if self.public_baseurl is not None: @@ -787,14 +787,14 @@ class ServerConfig(Config): "--print-pidfile", action="store_true", default=None, - help="Print the path to the pidfile just" " before daemonizing", + help="Print the path to the pidfile just before daemonizing", ) server_group.add_argument( "--manhole", metavar="PORT", dest="manhole", type=int, - help="Turn on the twisted telnet manhole" " service on the given port.", + help="Turn on the twisted telnet manhole service on the given port.", ) diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 44edcabed4..d68b4bd670 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -44,7 +44,7 @@ class TransactionActions(object): response code and response body. """ if not transaction.transaction_id: - raise RuntimeError("Cannot persist a transaction with no " "transaction_id") + raise RuntimeError("Cannot persist a transaction with no transaction_id") return self.store.get_received_txn_response(transaction.transaction_id, origin) @@ -56,7 +56,7 @@ class TransactionActions(object): Deferred """ if not transaction.transaction_id: - raise RuntimeError("Cannot persist a transaction with no " "transaction_id") + raise RuntimeError("Cannot persist a transaction with no transaction_id") return self.store.set_received_txn_response( transaction.transaction_id, origin, code, response diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 2b2ee8612a..4ebb0e8bc0 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -49,7 +49,7 @@ sent_pdus_destination_dist_count = Counter( sent_pdus_destination_dist_total = Counter( "synapse_federation_client_sent_pdu_destinations:total", - "" "Total number of PDUs queued for sending across all destinations", + "Total number of PDUs queued for sending across all destinations", ) diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 67b3e1ab6e..5fed626d5b 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -84,7 +84,7 @@ class TransactionManager(object): txn_id = str(self._next_txn_id) logger.debug( - "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)", + "TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)", destination, txn_id, len(pdus), @@ -103,7 +103,7 @@ class TransactionManager(object): self._next_txn_id += 1 logger.info( - "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)", + "TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)", destination, txn_id, transaction.transaction_id, diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 69051101a6..a07d2f1a17 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler): if not service.is_interested_in_alias(room_alias.to_string()): raise SynapseError( 400, - "This application service has not reserved" " this kind of alias.", + "This application service has not reserved this kind of alias.", errcode=Codes.EXCLUSIVE, ) else: diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index e9a5e46ced..13fcb408a6 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -96,7 +96,7 @@ def parse_boolean_from_args(args, name, default=None, required=False): return {b"true": True, b"false": False}[args[name][0]] except Exception: message = ( - "Boolean query parameter %r must be one of" " ['true', 'false']" + "Boolean query parameter %r must be one of ['true', 'false']" ) % (name,) raise SynapseError(400, message) else: diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index e994037be6..d0879b0490 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -246,7 +246,7 @@ class HttpPusher(object): # fixed, we don't suddenly deliver a load # of old notifications. logger.warning( - "Giving up on a notification to user %s, " "pushkey %s", + "Giving up on a notification to user %s, pushkey %s", self.user_id, self.pushkey, ) @@ -299,8 +299,7 @@ class HttpPusher(object): # for sanity, we only remove the pushkey if it # was the one we actually sent... logger.warning( - ("Ignoring rejected pushkey %s because we" " didn't send it"), - pk, + ("Ignoring rejected pushkey %s because we didn't send it"), pk, ) else: logger.info("Pushkey %s was rejected: removing", pk) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 1d15a06a58..b13b646bfd 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -43,7 +43,7 @@ logger = logging.getLogger(__name__) MESSAGE_FROM_PERSON_IN_ROOM = ( - "You have a message on %(app)s from %(person)s " "in the %(room)s room..." + "You have a message on %(app)s from %(person)s in the %(room)s room..." ) MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..." MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..." @@ -55,7 +55,7 @@ MESSAGES_FROM_PERSON_AND_OTHERS = ( "You have messages on %(app)s from %(person)s and others..." ) INVITE_FROM_PERSON_TO_ROOM = ( - "%(person)s has invited you to join the " "%(room)s room on %(app)s..." + "%(person)s has invited you to join the %(room)s room on %(app)s..." ) INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..." diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 15c15a12f5..a23d6f5c75 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -122,7 +122,7 @@ class PreviewUrlResource(DirectServeResource): pattern = entry[attrib] value = getattr(url_tuple, attrib) logger.debug( - "Matching attrib '%s' with value '%s' against" " pattern '%s'", + "Matching attrib '%s' with value '%s' against pattern '%s'", attrib, value, pattern, diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py index 415e9c17d8..5736c56032 100644 --- a/synapse/server_notices/consent_server_notices.py +++ b/synapse/server_notices/consent_server_notices.py @@ -54,7 +54,7 @@ class ConsentServerNotices(object): ) if "body" not in self._server_notice_content: raise ConfigError( - "user_consent server_notice_consent must contain a 'body' " "key." + "user_consent server_notice_consent must contain a 'body' key." ) self._consent_uri_builder = ConsentURIBuilder(hs.config) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ab596fa68d..6b8a9cd89a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -851,7 +851,7 @@ class SQLBaseStore(object): allvalues.update(values) latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values) - sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % ( + sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % ( table, ", ".join(k for k in allvalues), ", ".join("?" for _ in allvalues), diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py index 96cd0fb77a..a23744f11c 100644 --- a/synapse/storage/data_stores/main/deviceinbox.py +++ b/synapse/storage/data_stores/main/deviceinbox.py @@ -380,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) devices = list(messages_by_device.keys()) if len(devices) == 1 and devices[0] == "*": # Handle wildcard device_ids. - sql = "SELECT device_id FROM devices" " WHERE user_id = ?" + sql = "SELECT device_id FROM devices WHERE user_id = ?" txn.execute(sql, (user_id,)) message_json = json.dumps(messages_by_device["*"]) for row in txn: diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index 073412a78d..d8ad59ad93 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -138,9 +138,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore): result.setdefault(user_id, {})[device_id] = None # get signatures on the device - signature_sql = ( - "SELECT * " " FROM e2e_cross_signing_signatures " " WHERE %s" - ) % (" OR ".join("(" + q + ")" for q in signature_query_clauses)) + signature_sql = ("SELECT * FROM e2e_cross_signing_signatures WHERE %s") % ( + " OR ".join("(" + q + ")" for q in signature_query_clauses) + ) txn.execute(signature_sql, signature_query_params) rows = self.cursor_to_dict(txn) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 878f7568a6..627c0b67f1 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -713,9 +713,7 @@ class EventsStore( metadata_json = encode_json(event.internal_metadata.get_dict()) - sql = ( - "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?" - ) + sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?" txn.execute(sql, (metadata_json, event.event_id)) # Add an entry to the ex_outlier_stream table to replicate the @@ -732,7 +730,7 @@ class EventsStore( }, ) - sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?" + sql = "UPDATE events SET outlier = ? WHERE event_id = ?" txn.execute(sql, (False, event.event_id)) # Update the event_backward_extremities table now that this @@ -1479,7 +1477,7 @@ class EventsStore( # We do joins against events_to_purge for e.g. calculating state # groups to purge, etc., so lets make an index. - txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)") + txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)") txn.execute("SELECT event_id, should_delete FROM events_to_purge") event_rows = txn.fetchall() diff --git a/synapse/storage/data_stores/main/filtering.py b/synapse/storage/data_stores/main/filtering.py index a2a2a67927..f05ace299a 100644 --- a/synapse/storage/data_stores/main/filtering.py +++ b/synapse/storage/data_stores/main/filtering.py @@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore): if filter_id_response is not None: return filter_id_response[0] - sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?" + sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?" txn.execute(sql, (user_localpart,)) max_id = txn.fetchone()[0] if max_id is None: diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py index 84b5f3ad5e..0f2887bdce 100644 --- a/synapse/storage/data_stores/main/media_repository.py +++ b/synapse/storage/data_stores/main/media_repository.py @@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): if len(media_ids) == 0: return - sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?" + sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?" def _delete_url_cache_txn(txn): txn.executemany(sql, [(media_id,) for media_id in media_ids]) @@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): return def _delete_url_cache_media_txn(txn): - sql = "DELETE FROM local_media_repository" " WHERE media_id = ?" + sql = "DELETE FROM local_media_repository WHERE media_id = ?" txn.executemany(sql, [(media_id,) for media_id in media_ids]) - sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?" + sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?" txn.executemany(sql, [(media_id,) for media_id in media_ids]) diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py index ee1b2b2bbf..6a594c160c 100644 --- a/synapse/storage/data_stores/main/registration.py +++ b/synapse/storage/data_stores/main/registration.py @@ -377,9 +377,7 @@ class RegistrationWorkerStore(SQLBaseStore): """ def f(txn): - sql = ( - "SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)" - ) + sql = "SELECT name, password_hash FROM users WHERE lower(name) = lower(?)" txn.execute(sql, (user_id,)) return dict(txn) diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 8780fdd989..9ae4a913a1 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -616,7 +616,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def _get_max_topological_txn(self, txn, room_id): txn.execute( - "SELECT MAX(topological_ordering) FROM events" " WHERE room_id = ?", + "SELECT MAX(topological_ordering) FROM events WHERE room_id = ?", (room_id,), ) diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py index 10d1887f75..aa24339717 100644 --- a/synapse/storage/data_stores/main/tags.py +++ b/synapse/storage/data_stores/main/tags.py @@ -83,9 +83,7 @@ class TagsWorkerStore(AccountDataWorkerStore): ) def get_tag_content(txn, tag_ids): - sql = ( - "SELECT tag, content" " FROM room_tags" " WHERE user_id=? AND room_id=?" - ) + sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?" results = [] for stream_id, user_id, room_id in tag_ids: txn.execute(sql, (user_id, room_id)) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 2e7753820e..731e1c9d9c 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -447,7 +447,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams) # Mark as done. cur.execute( database_engine.convert_param_style( - "INSERT INTO applied_module_schemas (module_name, file)" " VALUES (?,?)" + "INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)" ), (modname, name), ) diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 02994ab2a5..cd56cd91ed 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -88,9 +88,12 @@ class PaginationConfig(object): raise SynapseError(400, "Invalid request.") def __repr__(self): - return ( - "PaginationConfig(from_tok=%r, to_tok=%r," " direction=%r, limit=%r)" - ) % (self.from_token, self.to_token, self.direction, self.limit) + return ("PaginationConfig(from_tok=%r, to_tok=%r, direction=%r, limit=%r)") % ( + self.from_token, + self.to_token, + self.direction, + self.limit, + ) def get_source_config(self, source_name): keyname = "%s_key" % source_name -- cgit 1.5.1 From 4d394d6415e3e4b64a577a1c83ee8acf147ce0af Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 26 Nov 2019 12:32:37 +0000 Subject: remove confusing fixme --- synapse/handlers/federation.py | 6 ------ 1 file changed, 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 66852153c4..97d045db10 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2187,12 +2187,6 @@ class FederationHandler(BaseHandler): logger.info("Skipping auth_event fetch for outlier") return context - # FIXME: Assumes we have and stored all the state for all the - # prev_events - # - # FIXME: what does the fixme above mean? where do prev_events come into - # it, why do we care about the state for those events, and what does "have and - # stored" mean? Seems erik wrote it in c1d860870b different_auth = event_auth_events.difference( e.event_id for e in auth_events.values() ) -- cgit 1.5.1 From 0d27aba900136514a8801b902f9a8ac69150e2c0 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 27 Nov 2019 16:14:44 -0500 Subject: add etag and count to key backup endpoints (#5858) --- changelog.d/5858.feature | 1 + synapse/handlers/e2e_room_keys.py | 130 +++++++----- synapse/rest/client/v2_alpha/room_keys.py | 8 +- synapse/storage/data_stores/main/e2e_room_keys.py | 226 +++++++++++++++------ .../main/schema/delta/56/room_key_etag.sql | 17 ++ tests/handlers/test_e2e_room_keys.py | 31 +++ tests/storage/test_e2e_room_keys.py | 8 +- 7 files changed, 297 insertions(+), 124 deletions(-) create mode 100644 changelog.d/5858.feature create mode 100644 synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql (limited to 'synapse/handlers') diff --git a/changelog.d/5858.feature b/changelog.d/5858.feature new file mode 100644 index 0000000000..55ee93051e --- /dev/null +++ b/changelog.d/5858.feature @@ -0,0 +1 @@ +Add etag and count fields to key backup endpoints to help clients guess if there are new keys. diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 0cea445f0d..f1b4424a02 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2017, 2018 New Vector Ltd +# Copyright 2019 Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -103,14 +104,35 @@ class E2eRoomKeysHandler(object): rooms session_id(string): session ID to delete keys for, for None to delete keys for all sessions + Raises: + NotFoundError: if the backup version does not exist Returns: - A deferred of the deletion transaction + A dict containing the count and etag for the backup version """ # lock for consistency with uploading with (yield self._upload_linearizer.queue(user_id)): + # make sure the backup version exists + try: + version_info = yield self.store.get_e2e_room_keys_version_info( + user_id, version + ) + except StoreError as e: + if e.code == 404: + raise NotFoundError("Unknown backup version") + else: + raise + yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id) + version_etag = version_info["etag"] + 1 + yield self.store.update_e2e_room_keys_version( + user_id, version, None, version_etag + ) + + count = yield self.store.count_e2e_room_keys(user_id, version) + return {"etag": str(version_etag), "count": count} + @trace @defer.inlineCallbacks def upload_room_keys(self, user_id, version, room_keys): @@ -138,6 +160,9 @@ class E2eRoomKeysHandler(object): } } + Returns: + A dict containing the count and etag for the backup version + Raises: NotFoundError: if there are no versions defined RoomKeysVersionError: if the uploaded version is not the current version @@ -171,59 +196,62 @@ class E2eRoomKeysHandler(object): else: raise - # go through the room_keys. - # XXX: this should/could be done concurrently, given we're in a lock. + # Fetch any existing room keys for the sessions that have been + # submitted. Then compare them with the submitted keys. If the + # key is new, insert it; if the key should be updated, then update + # it; otherwise, drop it. + existing_keys = yield self.store.get_e2e_room_keys_multi( + user_id, version, room_keys["rooms"] + ) + to_insert = [] # batch the inserts together + changed = False # if anything has changed, we need to update the etag for room_id, room in iteritems(room_keys["rooms"]): - for session_id, session in iteritems(room["sessions"]): - yield self._upload_room_key( - user_id, version, room_id, session_id, session + for session_id, room_key in iteritems(room["sessions"]): + log_kv( + { + "message": "Trying to upload room key", + "room_id": room_id, + "session_id": session_id, + "user_id": user_id, + } ) - - @defer.inlineCallbacks - def _upload_room_key(self, user_id, version, room_id, session_id, room_key): - """Upload a given room_key for a given room and session into a given - version of the backup. Merges the key with any which might already exist. - - Args: - user_id(str): the user whose backup we're setting - version(str): the version ID of the backup we're updating - room_id(str): the ID of the room whose keys we're setting - session_id(str): the session whose room_key we're setting - room_key(dict): the room_key being set - """ - log_kv( - { - "message": "Trying to upload room key", - "room_id": room_id, - "session_id": session_id, - "user_id": user_id, - } - ) - # get the room_key for this particular row - current_room_key = None - try: - current_room_key = yield self.store.get_e2e_room_key( - user_id, version, room_id, session_id - ) - except StoreError as e: - if e.code == 404: - log_kv( - { - "message": "Room key not found.", - "room_id": room_id, - "user_id": user_id, - } + current_room_key = existing_keys.get(room_id, {}).get(session_id) + if current_room_key: + if self._should_replace_room_key(current_room_key, room_key): + log_kv({"message": "Replacing room key."}) + # updates are done one at a time in the DB, so send + # updates right away rather than batching them up, + # like we do with the inserts + yield self.store.update_e2e_room_key( + user_id, version, room_id, session_id, room_key + ) + changed = True + else: + log_kv({"message": "Not replacing room_key."}) + else: + log_kv( + { + "message": "Room key not found.", + "room_id": room_id, + "user_id": user_id, + } + ) + log_kv({"message": "Replacing room key."}) + to_insert.append((room_id, session_id, room_key)) + changed = True + + if len(to_insert): + yield self.store.add_e2e_room_keys(user_id, version, to_insert) + + version_etag = version_info["etag"] + if changed: + version_etag = version_etag + 1 + yield self.store.update_e2e_room_keys_version( + user_id, version, None, version_etag ) - else: - raise - if self._should_replace_room_key(current_room_key, room_key): - log_kv({"message": "Replacing room key."}) - yield self.store.set_e2e_room_key( - user_id, version, room_id, session_id, room_key - ) - else: - log_kv({"message": "Not replacing room_key."}) + count = yield self.store.count_e2e_room_keys(user_id, version) + return {"etag": str(version_etag), "count": count} @staticmethod def _should_replace_room_key(current_room_key, room_key): @@ -314,6 +342,8 @@ class E2eRoomKeysHandler(object): raise NotFoundError("Unknown backup version") else: raise + + res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"]) return res @trace diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py index d596786430..d83ac8e3c5 100644 --- a/synapse/rest/client/v2_alpha/room_keys.py +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -134,8 +134,8 @@ class RoomKeysServlet(RestServlet): if room_id: body = {"rooms": {room_id: body}} - yield self.e2e_room_keys_handler.upload_room_keys(user_id, version, body) - return 200, {} + ret = yield self.e2e_room_keys_handler.upload_room_keys(user_id, version, body) + return 200, ret @defer.inlineCallbacks def on_GET(self, request, room_id, session_id): @@ -239,10 +239,10 @@ class RoomKeysServlet(RestServlet): user_id = requester.user.to_string() version = parse_string(request, "version") - yield self.e2e_room_keys_handler.delete_room_keys( + ret = yield self.e2e_room_keys_handler.delete_room_keys( user_id, version, room_id, session_id ) - return 200, {} + return 200, ret class RoomKeysNewVersionServlet(RestServlet): diff --git a/synapse/storage/data_stores/main/e2e_room_keys.py b/synapse/storage/data_stores/main/e2e_room_keys.py index 1cbbae5b63..113224fd7c 100644 --- a/synapse/storage/data_stores/main/e2e_room_keys.py +++ b/synapse/storage/data_stores/main/e2e_room_keys.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2017 New Vector Ltd +# Copyright 2019 Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,49 +25,8 @@ from synapse.storage._base import SQLBaseStore class EndToEndRoomKeyStore(SQLBaseStore): @defer.inlineCallbacks - def get_e2e_room_key(self, user_id, version, room_id, session_id): - """Get the encrypted E2E room key for a given session from a given - backup version of room_keys. We only store the 'best' room key for a given - session at a given time, as determined by the handler. - - Args: - user_id(str): the user whose backup we're querying - version(str): the version ID of the backup for the set of keys we're querying - room_id(str): the ID of the room whose keys we're querying. - This is a bit redundant as it's implied by the session_id, but - we include for consistency with the rest of the API. - session_id(str): the session whose room_key we're querying. - - Returns: - A deferred dict giving the session_data and message metadata for - this room key. - """ - - row = yield self._simple_select_one( - table="e2e_room_keys", - keyvalues={ - "user_id": user_id, - "version": version, - "room_id": room_id, - "session_id": session_id, - }, - retcols=( - "first_message_index", - "forwarded_count", - "is_verified", - "session_data", - ), - desc="get_e2e_room_key", - ) - - row["session_data"] = json.loads(row["session_data"]) - - return row - - @defer.inlineCallbacks - def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key): - """Replaces or inserts the encrypted E2E room key for a given session in - a given backup + def update_e2e_room_key(self, user_id, version, room_id, session_id, room_key): + """Replaces the encrypted E2E room key for a given session in a given backup Args: user_id(str): the user whose backup we're setting @@ -78,7 +38,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): StoreError """ - yield self._simple_upsert( + yield self._simple_update_one( table="e2e_room_keys", keyvalues={ "user_id": user_id, @@ -86,21 +46,51 @@ class EndToEndRoomKeyStore(SQLBaseStore): "room_id": room_id, "session_id": session_id, }, - values={ + updatevalues={ "first_message_index": room_key["first_message_index"], "forwarded_count": room_key["forwarded_count"], "is_verified": room_key["is_verified"], "session_data": json.dumps(room_key["session_data"]), }, - lock=False, + desc="update_e2e_room_key", ) - log_kv( - { - "message": "Set room key", - "room_id": room_id, - "session_id": session_id, - "room_key": room_key, - } + + @defer.inlineCallbacks + def add_e2e_room_keys(self, user_id, version, room_keys): + """Bulk add room keys to a given backup. + + Args: + user_id (str): the user whose backup we're adding to + version (str): the version ID of the backup for the set of keys we're adding to + room_keys (iterable[(str, str, dict)]): the keys to add, in the form + (roomID, sessionID, keyData) + """ + + values = [] + for (room_id, session_id, room_key) in room_keys: + values.append( + { + "user_id": user_id, + "version": version, + "room_id": room_id, + "session_id": session_id, + "first_message_index": room_key["first_message_index"], + "forwarded_count": room_key["forwarded_count"], + "is_verified": room_key["is_verified"], + "session_data": json.dumps(room_key["session_data"]), + } + ) + log_kv( + { + "message": "Set room key", + "room_id": room_id, + "session_id": session_id, + "room_key": room_key, + } + ) + + yield self._simple_insert_many( + table="e2e_room_keys", values=values, desc="add_e2e_room_keys" ) @trace @@ -110,11 +100,11 @@ class EndToEndRoomKeyStore(SQLBaseStore): room, or a given session. Args: - user_id(str): the user whose backup we're querying - version(str): the version ID of the backup for the set of keys we're querying - room_id(str): Optional. the ID of the room whose keys we're querying, if any. + user_id (str): the user whose backup we're querying + version (str): the version ID of the backup for the set of keys we're querying + room_id (str): Optional. the ID of the room whose keys we're querying, if any. If not specified, we return the keys for all the rooms in the backup. - session_id(str): Optional. the session whose room_key we're querying, if any. + session_id (str): Optional. the session whose room_key we're querying, if any. If specified, we also require the room_id to be specified. If not specified, we return all the keys in this version of the backup (or for the specified room) @@ -162,6 +152,95 @@ class EndToEndRoomKeyStore(SQLBaseStore): return sessions + def get_e2e_room_keys_multi(self, user_id, version, room_keys): + """Get multiple room keys at a time. The difference between this function and + get_e2e_room_keys is that this function can be used to retrieve + multiple specific keys at a time, whereas get_e2e_room_keys is used for + getting all the keys in a backup version, all the keys for a room, or a + specific key. + + Args: + user_id (str): the user whose backup we're querying + version (str): the version ID of the backup we're querying about + room_keys (dict[str, dict[str, iterable[str]]]): a map from + room ID -> {"session": [session ids]} indicating the session IDs + that we want to query + + Returns: + Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key + """ + + return self.runInteraction( + "get_e2e_room_keys_multi", + self._get_e2e_room_keys_multi_txn, + user_id, + version, + room_keys, + ) + + @staticmethod + def _get_e2e_room_keys_multi_txn(txn, user_id, version, room_keys): + if not room_keys: + return {} + + where_clauses = [] + params = [user_id, version] + for room_id, room in room_keys.items(): + sessions = list(room["sessions"]) + if not sessions: + continue + params.append(room_id) + params.extend(sessions) + where_clauses.append( + "(room_id = ? AND session_id IN (%s))" + % (",".join(["?" for _ in sessions]),) + ) + + # check if we're actually querying something + if not where_clauses: + return {} + + sql = """ + SELECT room_id, session_id, first_message_index, forwarded_count, + is_verified, session_data + FROM e2e_room_keys + WHERE user_id = ? AND version = ? AND (%s) + """ % ( + " OR ".join(where_clauses) + ) + + txn.execute(sql, params) + + ret = {} + + for row in txn: + room_id = row[0] + session_id = row[1] + ret.setdefault(room_id, {}) + ret[room_id][session_id] = { + "first_message_index": row[2], + "forwarded_count": row[3], + "is_verified": row[4], + "session_data": json.loads(row[5]), + } + + return ret + + def count_e2e_room_keys(self, user_id, version): + """Get the number of keys in a backup version. + + Args: + user_id (str): the user whose backup we're querying + version (str): the version ID of the backup we're querying about + """ + + return self._simple_select_one_onecol( + table="e2e_room_keys", + keyvalues={"user_id": user_id, "version": version}, + retcol="COUNT(*)", + desc="count_e2e_room_keys", + ) + @trace @defer.inlineCallbacks def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): @@ -219,6 +298,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): version(str) algorithm(str) auth_data(object): opaque dict supplied by the client + etag(int): tag of the keys in the backup """ def _get_e2e_room_keys_version_info_txn(txn): @@ -236,10 +316,12 @@ class EndToEndRoomKeyStore(SQLBaseStore): txn, table="e2e_room_keys_versions", keyvalues={"user_id": user_id, "version": this_version, "deleted": 0}, - retcols=("version", "algorithm", "auth_data"), + retcols=("version", "algorithm", "auth_data", "etag"), ) result["auth_data"] = json.loads(result["auth_data"]) result["version"] = str(result["version"]) + if result["etag"] is None: + result["etag"] = 0 return result return self.runInteraction( @@ -288,21 +370,33 @@ class EndToEndRoomKeyStore(SQLBaseStore): ) @trace - def update_e2e_room_keys_version(self, user_id, version, info): + def update_e2e_room_keys_version( + self, user_id, version, info=None, version_etag=None + ): """Update a given backup version Args: user_id(str): the user whose backup version we're updating version(str): the version ID of the backup version we're updating - info(dict): the new backup version info to store + info (dict): the new backup version info to store. If None, then + the backup version info is not updated + version_etag (Optional[int]): etag of the keys in the backup. If + None, then the etag is not updated """ + updatevalues = {} - return self._simple_update( - table="e2e_room_keys_versions", - keyvalues={"user_id": user_id, "version": version}, - updatevalues={"auth_data": json.dumps(info["auth_data"])}, - desc="update_e2e_room_keys_version", - ) + if info is not None and "auth_data" in info: + updatevalues["auth_data"] = json.dumps(info["auth_data"]) + if version_etag is not None: + updatevalues["etag"] = version_etag + + if updatevalues: + return self._simple_update( + table="e2e_room_keys_versions", + keyvalues={"user_id": user_id, "version": version}, + updatevalues=updatevalues, + desc="update_e2e_room_keys_version", + ) @trace def delete_e2e_room_keys_version(self, user_id, version=None): diff --git a/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql b/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql new file mode 100644 index 0000000000..7d70dd071e --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql @@ -0,0 +1,17 @@ +/* Copyright 2019 Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- store the current etag of backup version +ALTER TABLE e2e_room_keys_versions ADD COLUMN etag BIGINT; diff --git a/tests/handlers/test_e2e_room_keys.py b/tests/handlers/test_e2e_room_keys.py index 0bb96674a2..70f172eb02 100644 --- a/tests/handlers/test_e2e_room_keys.py +++ b/tests/handlers/test_e2e_room_keys.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # Copyright 2017 New Vector Ltd +# Copyright 2019 Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -94,23 +95,29 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase): # check we can retrieve it as the current version res = yield self.handler.get_version_info(self.local_user) + version_etag = res["etag"] + del res["etag"] self.assertDictEqual( res, { "version": "1", "algorithm": "m.megolm_backup.v1", "auth_data": "first_version_auth_data", + "count": 0, }, ) # check we can retrieve it as a specific version res = yield self.handler.get_version_info(self.local_user, "1") + self.assertEqual(res["etag"], version_etag) + del res["etag"] self.assertDictEqual( res, { "version": "1", "algorithm": "m.megolm_backup.v1", "auth_data": "first_version_auth_data", + "count": 0, }, ) @@ -126,12 +133,14 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase): # check we can retrieve it as the current version res = yield self.handler.get_version_info(self.local_user) + del res["etag"] self.assertDictEqual( res, { "version": "2", "algorithm": "m.megolm_backup.v1", "auth_data": "second_version_auth_data", + "count": 0, }, ) @@ -158,12 +167,14 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase): # check we can retrieve it as the current version res = yield self.handler.get_version_info(self.local_user) + del res["etag"] self.assertDictEqual( res, { "algorithm": "m.megolm_backup.v1", "auth_data": "revised_first_version_auth_data", "version": version, + "count": 0, }, ) @@ -207,12 +218,14 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase): # check we can retrieve it as the current version res = yield self.handler.get_version_info(self.local_user) + del res["etag"] # etag is opaque, so don't test its contents self.assertDictEqual( res, { "algorithm": "m.megolm_backup.v1", "auth_data": "revised_first_version_auth_data", "version": version, + "count": 0, }, ) @@ -409,6 +422,11 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase): yield self.handler.upload_room_keys(self.local_user, version, room_keys) + # get the etag to compare to future versions + res = yield self.handler.get_version_info(self.local_user) + backup_etag = res["etag"] + self.assertEqual(res["count"], 1) + new_room_keys = copy.deepcopy(room_keys) new_room_key = new_room_keys["rooms"]["!abc:matrix.org"]["sessions"]["c0ff33"] @@ -423,6 +441,10 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase): "SSBBTSBBIEZJU0gK", ) + # the etag should be the same since the session did not change + res = yield self.handler.get_version_info(self.local_user) + self.assertEqual(res["etag"], backup_etag) + # test that marking the session as verified however /does/ replace it new_room_key["is_verified"] = True yield self.handler.upload_room_keys(self.local_user, version, new_room_keys) @@ -432,6 +454,11 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase): res["rooms"]["!abc:matrix.org"]["sessions"]["c0ff33"]["session_data"], "new" ) + # the etag should NOT be equal now, since the key changed + res = yield self.handler.get_version_info(self.local_user) + self.assertNotEqual(res["etag"], backup_etag) + backup_etag = res["etag"] + # test that a session with a higher forwarded_count doesn't replace one # with a lower forwarding count new_room_key["forwarded_count"] = 2 @@ -443,6 +470,10 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase): res["rooms"]["!abc:matrix.org"]["sessions"]["c0ff33"]["session_data"], "new" ) + # the etag should be the same since the session did not change + res = yield self.handler.get_version_info(self.local_user) + self.assertEqual(res["etag"], backup_etag) + # TODO: check edge cases as well as the common variations here @defer.inlineCallbacks diff --git a/tests/storage/test_e2e_room_keys.py b/tests/storage/test_e2e_room_keys.py index d128fde441..35dafbb904 100644 --- a/tests/storage/test_e2e_room_keys.py +++ b/tests/storage/test_e2e_room_keys.py @@ -39,8 +39,8 @@ class E2eRoomKeysHandlerTestCase(unittest.HomeserverTestCase): ) self.get_success( - self.store.set_e2e_room_key( - "user_id", version1, "room", "session", room_key + self.store.add_e2e_room_keys( + "user_id", version1, [("room", "session", room_key)] ) ) @@ -51,8 +51,8 @@ class E2eRoomKeysHandlerTestCase(unittest.HomeserverTestCase): ) self.get_success( - self.store.set_e2e_room_key( - "user_id", version2, "room", "session", room_key + self.store.add_e2e_room_keys( + "user_id", version2, [("room", "session", room_key)] ) ) -- cgit 1.5.1 From a9c44d4008deb29503e2de00e5aae1a56a72d630 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Thu, 28 Nov 2019 10:40:42 +0000 Subject: Remove local threepids on account deactivation (#6426) --- changelog.d/6426.bugfix | 1 + synapse/handlers/deactivate_account.py | 3 +++ synapse/storage/data_stores/main/registration.py | 13 +++++++++++++ 3 files changed, 17 insertions(+) create mode 100644 changelog.d/6426.bugfix (limited to 'synapse/handlers') diff --git a/changelog.d/6426.bugfix b/changelog.d/6426.bugfix new file mode 100644 index 0000000000..3acfde4211 --- /dev/null +++ b/changelog.d/6426.bugfix @@ -0,0 +1 @@ +Clean up local threepids from user on account deactivation. \ No newline at end of file diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 63267a0a4c..6dedaaff8d 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -95,6 +95,9 @@ class DeactivateAccountHandler(BaseHandler): user_id, threepid["medium"], threepid["address"] ) + # Remove all 3PIDs this user has bound to the homeserver + yield self.store.user_delete_threepids(user_id) + # delete any devices belonging to the user, which will also # delete corresponding access tokens. yield self._device_handler.delete_all_devices_for_user(user_id) diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py index 0a3c1f0510..98cf6427c3 100644 --- a/synapse/storage/data_stores/main/registration.py +++ b/synapse/storage/data_stores/main/registration.py @@ -569,6 +569,19 @@ class RegistrationWorkerStore(SQLBaseStore): return self._simple_delete( "user_threepids", keyvalues={"user_id": user_id, "medium": medium, "address": address}, + desc="user_delete_threepid", + ) + + def user_delete_threepids(self, user_id: str): + """Delete all threepid this user has bound + + Args: + user_id: The user id to delete all threepids of + + """ + return self._simple_delete( + "user_threepids", + keyvalues={"user_id": user_id}, desc="user_delete_threepids", ) -- cgit 1.5.1 From 2173785f0d9124037ca841b568349ad0424b39cd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Nov 2019 11:31:56 +0000 Subject: Propagate reason in remotely rejected invites --- synapse/handlers/federation.py | 4 ++-- synapse/handlers/room_member.py | 13 +++++++++---- synapse/handlers/room_member_worker.py | 5 ++++- synapse/replication/http/membership.py | 7 +++++-- 4 files changed, 20 insertions(+), 9 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a5ae7b77d1..d3267734f7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1428,9 +1428,9 @@ class FederationHandler(BaseHandler): return event @defer.inlineCallbacks - def do_remotely_reject_invite(self, target_hosts, room_id, user_id): + def do_remotely_reject_invite(self, target_hosts, room_id, user_id, content): origin, event, event_format_version = yield self._make_and_verify_event( - target_hosts, room_id, user_id, "leave" + target_hosts, room_id, user_id, "leave", content=content, ) # Mark as outlier as we don't have any state for this event; we're not # even in the room. diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 6cfee4b361..7b7270fc61 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -94,7 +94,9 @@ class RoomMemberHandler(object): raise NotImplementedError() @abc.abstractmethod - def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + def _remote_reject_invite( + self, requester, remote_room_hosts, room_id, target, content + ): """Attempt to reject an invite for a room this server is not in. If we fail to do so we locally mark the invite as rejected. @@ -104,6 +106,7 @@ class RoomMemberHandler(object): reject invite room_id (str) target (UserID): The user rejecting the invite + content (dict): The content for the rejection event Returns: Deferred[dict]: A dictionary to be returned to the client, may @@ -471,7 +474,7 @@ class RoomMemberHandler(object): # send the rejection to the inviter's HS. remote_room_hosts = remote_room_hosts + [inviter.domain] res = yield self._remote_reject_invite( - requester, remote_room_hosts, room_id, target + requester, remote_room_hosts, room_id, target, content, ) return res @@ -971,13 +974,15 @@ class RoomMemberMasterHandler(RoomMemberHandler): ) @defer.inlineCallbacks - def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + def _remote_reject_invite( + self, requester, remote_room_hosts, room_id, target, content + ): """Implements RoomMemberHandler._remote_reject_invite """ fed_handler = self.federation_handler try: ret = yield fed_handler.do_remotely_reject_invite( - remote_room_hosts, room_id, target.to_string() + remote_room_hosts, room_id, target.to_string(), content=content, ) return ret except Exception as e: diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 75e96ae1a2..69be86893b 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -55,7 +55,9 @@ class RoomMemberWorkerHandler(RoomMemberHandler): return ret - def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + def _remote_reject_invite( + self, requester, remote_room_hosts, room_id, target, content + ): """Implements RoomMemberHandler._remote_reject_invite """ return self._remote_reject_client( @@ -63,6 +65,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): remote_room_hosts=remote_room_hosts, room_id=room_id, user_id=target.to_string(), + content=content, ) def _user_joined_room(self, target, room_id): diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index cc1f249740..3577611fd7 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -93,6 +93,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): { "requester": ..., "remote_room_hosts": [...], + "content": { ... } } """ @@ -107,7 +108,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): self.clock = hs.get_clock() @staticmethod - def _serialize_payload(requester, room_id, user_id, remote_room_hosts): + def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content): """ Args: requester(Requester) @@ -118,12 +119,14 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): return { "requester": requester.serialize(), "remote_room_hosts": remote_room_hosts, + "content": content, } async def _handle_request(self, request, room_id, user_id): content = parse_json_object_from_request(request) remote_room_hosts = content["remote_room_hosts"] + event_content = content["content"] requester = Requester.deserialize(self.store, content["requester"]) @@ -134,7 +137,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): try: event = await self.federation_handler.do_remotely_reject_invite( - remote_room_hosts, room_id, user_id + remote_room_hosts, room_id, user_id, event_content, ) ret = event.get_pdu_json() except Exception as e: -- cgit 1.5.1 From 708cef88cfbf8dd6df44d2da4ab4dbc7eb584f74 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 28 Nov 2019 19:26:13 +0000 Subject: Discard retention policies when retrieving state Purge jobs don't delete the latest event in a room in order to keep the forward extremity and not break the room. On the other hand, get_state_events, when given an at_token argument calls filter_events_for_client to know if the user can see the event that matches that (sync) token. That function uses the retention policies of the events it's given to filter out those that are too old from a client's view. Some clients, such as Riot, when loading a room, request the list of members for the latest sync token it knows about, and get confused to the point of refusing to send any message if the server tells it that it can't get that information. This can happen very easily with the message retention feature turned on and a room with low activity so that the last event sent becomes too old according to the room's retention policy. An easy and clean fix for that issue is to discard the room's retention policies when retrieving state. --- synapse/handlers/message.py | 2 +- synapse/visibility.py | 22 ++++++++++++++-------- 2 files changed, 15 insertions(+), 9 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 155ed6e06a..3b0156f516 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -138,7 +138,7 @@ class MessageHandler(object): raise NotFoundError("Can't find event for token %s" % (at_token,)) visible_events = yield filter_events_for_client( - self.storage, user_id, last_events + self.storage, user_id, last_events, apply_retention_policies=False ) event = last_events[0] diff --git a/synapse/visibility.py b/synapse/visibility.py index 4d4141dacc..7b037eeb0c 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -44,7 +44,8 @@ MEMBERSHIP_PRIORITY = ( @defer.inlineCallbacks def filter_events_for_client( - storage: Storage, user_id, events, is_peeking=False, always_include_ids=frozenset() + storage: Storage, user_id, events, is_peeking=False, always_include_ids=frozenset(), + apply_retention_policies=True, ): """ Check which events a user is allowed to see @@ -59,6 +60,10 @@ def filter_events_for_client( events always_include_ids (set(event_id)): set of event ids to specifically include (unless sender is ignored) + apply_retention_policies (bool): Whether to filter out events that's older than + allowed by the room's retention policy. Useful when this function is called + to e.g. check whether a user should be allowed to see the state at a given + event rather than to know if it should send an event to a user's client(s). Returns: Deferred[list[synapse.events.EventBase]] @@ -86,13 +91,14 @@ def filter_events_for_client( erased_senders = yield storage.main.are_users_erased((e.sender for e in events)) - room_ids = set(e.room_id for e in events) - retention_policies = {} + if apply_retention_policies: + room_ids = set(e.room_id for e in events) + retention_policies = {} - for room_id in room_ids: - retention_policies[room_id] = yield storage.main.get_retention_policy_for_room( - room_id - ) + for room_id in room_ids: + retention_policies[room_id] = ( + yield storage.main.get_retention_policy_for_room(room_id) + ) def allowed(event): """ @@ -113,7 +119,7 @@ def filter_events_for_client( # Don't try to apply the room's retention policy if the event is a state event, as # MSC1763 states that retention is only considered for non-state events. - if not event.is_state(): + if apply_retention_policies and not event.is_state(): retention_policy = retention_policies[event.room_id] max_lifetime = retention_policy.get("max_lifetime") -- cgit 1.5.1 From 72078e4be56d42421e8748e0e45d0fe1204853dd Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Mon, 2 Dec 2019 15:11:32 +0000 Subject: Transfer power level state events on room upgrade (#6237) --- changelog.d/6237.bugfix | 1 + synapse/handlers/room.py | 36 +++++++++++++++++++++++++++++++----- 2 files changed, 32 insertions(+), 5 deletions(-) create mode 100644 changelog.d/6237.bugfix (limited to 'synapse/handlers') diff --git a/changelog.d/6237.bugfix b/changelog.d/6237.bugfix new file mode 100644 index 0000000000..9285600b00 --- /dev/null +++ b/changelog.d/6237.bugfix @@ -0,0 +1 @@ +Transfer non-standard power levels on room upgrade. \ No newline at end of file diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index e92b2eafd5..35a759f2fe 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -198,21 +198,21 @@ class RoomCreationHandler(BaseHandler): # finally, shut down the PLs in the old room, and update them in the new # room. yield self._update_upgraded_room_pls( - requester, old_room_id, new_room_id, old_room_state + requester, old_room_id, new_room_id, old_room_state, ) return new_room_id @defer.inlineCallbacks def _update_upgraded_room_pls( - self, requester, old_room_id, new_room_id, old_room_state + self, requester, old_room_id, new_room_id, old_room_state, ): """Send updated power levels in both rooms after an upgrade Args: requester (synapse.types.Requester): the user requesting the upgrade - old_room_id (unicode): the id of the room to be replaced - new_room_id (unicode): the id of the replacement room + old_room_id (str): the id of the room to be replaced + new_room_id (str): the id of the replacement room old_room_state (dict[tuple[str, str], str]): the state map for the old room Returns: @@ -298,7 +298,7 @@ class RoomCreationHandler(BaseHandler): tombstone_event_id (unicode|str): the ID of the tombstone event in the old room. Returns: - Deferred[None] + Deferred """ user_id = requester.user.to_string() @@ -333,6 +333,7 @@ class RoomCreationHandler(BaseHandler): (EventTypes.Encryption, ""), (EventTypes.ServerACL, ""), (EventTypes.RelatedGroups, ""), + (EventTypes.PowerLevels, ""), ) old_room_state_ids = yield self.store.get_filtered_current_state_ids( @@ -346,6 +347,31 @@ class RoomCreationHandler(BaseHandler): if old_event: initial_state[k] = old_event.content + # Resolve the minimum power level required to send any state event + # We will give the upgrading user this power level temporarily (if necessary) such that + # they are able to copy all of the state events over, then revert them back to their + # original power level afterwards in _update_upgraded_room_pls + + # Copy over user power levels now as this will not be possible with >100PL users once + # the room has been created + + power_levels = initial_state[(EventTypes.PowerLevels, "")] + + # Calculate the minimum power level needed to clone the room + event_power_levels = power_levels.get("events", {}) + state_default = power_levels.get("state_default", 0) + ban = power_levels.get("ban") + needed_power_level = max(state_default, ban, max(event_power_levels.values())) + + # Raise the requester's power level in the new room if necessary + current_power_level = power_levels["users"][requester.user.to_string()] + if current_power_level < needed_power_level: + # Assign this power level to the requester + power_levels["users"][requester.user.to_string()] = needed_power_level + + # Set the power levels to the modified state + initial_state[(EventTypes.PowerLevels, "")] = power_levels + yield self._send_events_for_new_room( requester, new_room_id, -- cgit 1.5.1 From 54dd5dc12b0ac5c48303144c4a73ce3822209488 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 19:19:45 +0000 Subject: Add ephemeral messages support (MSC2228) (#6409) Implement part [MSC2228](https://github.com/matrix-org/matrix-doc/pull/2228). The parts that differ are: * the feature is hidden behind a configuration flag (`enable_ephemeral_messages`) * self-destruction doesn't happen for state events * only implement support for the `m.self_destruct_after` field (not the `m.self_destruct` one) * doesn't send synthetic redactions to clients because for this specific case we consider the clients to be able to destroy an event themselves, instead we just censor it (by pruning its JSON) in the database --- changelog.d/6409.feature | 1 + synapse/api/constants.py | 4 + synapse/config/server.py | 2 + synapse/handlers/federation.py | 8 ++ synapse/handlers/message.py | 123 +++++++++++++++++++- synapse/storage/data_stores/main/events.py | 126 ++++++++++++++++++++- .../main/schema/delta/56/event_expiry.sql | 21 ++++ tests/rest/client/test_ephemeral_message.py | 101 +++++++++++++++++ 8 files changed, 379 insertions(+), 7 deletions(-) create mode 100644 changelog.d/6409.feature create mode 100644 synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql create mode 100644 tests/rest/client/test_ephemeral_message.py (limited to 'synapse/handlers') diff --git a/changelog.d/6409.feature b/changelog.d/6409.feature new file mode 100644 index 0000000000..653ff5a5ad --- /dev/null +++ b/changelog.d/6409.feature @@ -0,0 +1 @@ +Add ephemeral messages support by partially implementing [MSC2228](https://github.com/matrix-org/matrix-doc/pull/2228). diff --git a/synapse/api/constants.py b/synapse/api/constants.py index e3f086f1c3..69cef369a5 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -147,3 +147,7 @@ class EventContentFields(object): # Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326 LABELS = "org.matrix.labels" + + # Timestamp to delete the event after + # cf https://github.com/matrix-org/matrix-doc/pull/2228 + SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after" diff --git a/synapse/config/server.py b/synapse/config/server.py index 7a9d711669..837fbe1582 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -490,6 +490,8 @@ class ServerConfig(Config): "cleanup_extremities_with_dummy_events", True ) + self.enable_ephemeral_messages = config.get("enable_ephemeral_messages", False) + def has_tls_listener(self) -> bool: return any(l["tls"] for l in self.listeners) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d3267734f7..d9d0cd9eef 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -121,6 +121,7 @@ class FederationHandler(BaseHandler): self.pusher_pool = hs.get_pusherpool() self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() + self._message_handler = hs.get_message_handler() self._server_notices_mxid = hs.config.server_notices_mxid self.config = hs.config self.http_client = hs.get_simple_http_client() @@ -141,6 +142,8 @@ class FederationHandler(BaseHandler): self.third_party_event_rules = hs.get_third_party_event_rules() + self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages + @defer.inlineCallbacks def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False): """ Process a PDU received via a federation /send/ transaction, or @@ -2715,6 +2718,11 @@ class FederationHandler(BaseHandler): event_and_contexts, backfilled=backfilled ) + if self._ephemeral_messages_enabled: + for (event, context) in event_and_contexts: + # If there's an expiry timestamp on the event, schedule its expiry. + self._message_handler.maybe_schedule_expiry(event) + if not backfilled: # Never notify for backfilled events for event, _ in event_and_contexts: yield self._notify_persisted_event(event, max_stream_id) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3b0156f516..4f53a5f5dc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import Optional from six import iteritems, itervalues, string_types @@ -22,9 +23,16 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer from twisted.internet.defer import succeed +from twisted.internet.interfaces import IDelayedCall from synapse import event_auth -from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes +from synapse.api.constants import ( + EventContentFields, + EventTypes, + Membership, + RelationTypes, + UserTypes, +) from synapse.api.errors import ( AuthError, Codes, @@ -62,6 +70,17 @@ class MessageHandler(object): self.storage = hs.get_storage() self.state_store = self.storage.state self._event_serializer = hs.get_event_client_serializer() + self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages + self._is_worker_app = bool(hs.config.worker_app) + + # The scheduled call to self._expire_event. None if no call is currently + # scheduled. + self._scheduled_expiry = None # type: Optional[IDelayedCall] + + if not hs.config.worker_app: + run_as_background_process( + "_schedule_next_expiry", self._schedule_next_expiry + ) @defer.inlineCallbacks def get_room_data( @@ -225,6 +244,100 @@ class MessageHandler(object): for user_id, profile in iteritems(users_with_profile) } + def maybe_schedule_expiry(self, event): + """Schedule the expiry of an event if there's not already one scheduled, + or if the one running is for an event that will expire after the provided + timestamp. + + This function needs to invalidate the event cache, which is only possible on + the master process, and therefore needs to be run on there. + + Args: + event (EventBase): The event to schedule the expiry of. + """ + assert not self._is_worker_app + + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) + if not isinstance(expiry_ts, int) or event.is_state(): + return + + # _schedule_expiry_for_event won't actually schedule anything if there's already + # a task scheduled for a timestamp that's sooner than the provided one. + self._schedule_expiry_for_event(event.event_id, expiry_ts) + + @defer.inlineCallbacks + def _schedule_next_expiry(self): + """Retrieve the ID and the expiry timestamp of the next event to be expired, + and schedule an expiry task for it. + + If there's no event left to expire, set _expiry_scheduled to None so that a + future call to save_expiry_ts can schedule a new expiry task. + """ + # Try to get the expiry timestamp of the next event to expire. + res = yield self.store.get_next_event_to_expire() + if res: + event_id, expiry_ts = res + self._schedule_expiry_for_event(event_id, expiry_ts) + + def _schedule_expiry_for_event(self, event_id, expiry_ts): + """Schedule an expiry task for the provided event if there's not already one + scheduled at a timestamp that's sooner than the provided one. + + Args: + event_id (str): The ID of the event to expire. + expiry_ts (int): The timestamp at which to expire the event. + """ + if self._scheduled_expiry: + # If the provided timestamp refers to a time before the scheduled time of the + # next expiry task, cancel that task and reschedule it for this timestamp. + next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 + if expiry_ts < next_scheduled_expiry_ts: + self._scheduled_expiry.cancel() + else: + return + + # Figure out how many seconds we need to wait before expiring the event. + now_ms = self.clock.time_msec() + delay = (expiry_ts - now_ms) / 1000 + + # callLater doesn't support negative delays, so trim the delay to 0 if we're + # in that case. + if delay < 0: + delay = 0 + + logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay) + + self._scheduled_expiry = self.clock.call_later( + delay, + run_as_background_process, + "_expire_event", + self._expire_event, + event_id, + ) + + @defer.inlineCallbacks + def _expire_event(self, event_id): + """Retrieve and expire an event that needs to be expired from the database. + + If the event doesn't exist in the database, log it and delete the expiry date + from the database (so that we don't try to expire it again). + """ + assert self._ephemeral_events_enabled + + self._scheduled_expiry = None + + logger.info("Expiring event %s", event_id) + + try: + # Expire the event if we know about it. This function also deletes the expiry + # date from the database in the same database transaction. + yield self.store.expire_event(event_id) + except Exception as e: + logger.error("Could not expire event %s: %r", event_id, e) + + # Schedule the expiry of the next event to expire. + yield self._schedule_next_expiry() + # The duration (in ms) after which rooms should be removed # `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try @@ -295,6 +408,10 @@ class EventCreationHandler(object): 5 * 60 * 1000, ) + self._message_handler = hs.get_message_handler() + + self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages + @defer.inlineCallbacks def create_event( self, @@ -877,6 +994,10 @@ class EventCreationHandler(object): event, context=context ) + if self._ephemeral_events_enabled: + # If there's an expiry timestamp on the event, schedule its expiry. + self._message_handler.maybe_schedule_expiry(event) + yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) def _notify(): diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 2737a1d3ae..79c91fe284 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -130,6 +130,8 @@ class EventsStore( if self.hs.config.redaction_retention_period is not None: hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) + self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages + @defer.inlineCallbacks def _read_forward_extremities(self): def fetch(txn): @@ -940,6 +942,12 @@ class EventsStore( txn, event.event_id, labels, event.room_id, event.depth ) + if self._ephemeral_messages_enabled: + # If there's an expiry timestamp on the event, store it. + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) + if isinstance(expiry_ts, int) and not event.is_state(): + self._insert_event_expiry_txn(txn, event.event_id, expiry_ts) + # Insert into the room_memberships table. self._store_room_members_txn( txn, @@ -1101,12 +1109,7 @@ class EventsStore( def _update_censor_txn(txn): for redaction_id, event_id, pruned_json in updates: if pruned_json: - self._simple_update_one_txn( - txn, - table="event_json", - keyvalues={"event_id": event_id}, - updatevalues={"json": pruned_json}, - ) + self._censor_event_txn(txn, event_id, pruned_json) self._simple_update_one_txn( txn, @@ -1117,6 +1120,22 @@ class EventsStore( yield self.runInteraction("_update_censor_txn", _update_censor_txn) + def _censor_event_txn(self, txn, event_id, pruned_json): + """Censor an event by replacing its JSON in the event_json table with the + provided pruned JSON. + + Args: + txn (LoggingTransaction): The database transaction. + event_id (str): The ID of the event to censor. + pruned_json (str): The pruned JSON + """ + self._simple_update_one_txn( + txn, + table="event_json", + keyvalues={"event_id": event_id}, + updatevalues={"json": pruned_json}, + ) + @defer.inlineCallbacks def count_daily_messages(self): """ @@ -1957,6 +1976,101 @@ class EventsStore( ], ) + def _insert_event_expiry_txn(self, txn, event_id, expiry_ts): + """Save the expiry timestamp associated with a given event ID. + + Args: + txn (LoggingTransaction): The database transaction to use. + event_id (str): The event ID the expiry timestamp is associated with. + expiry_ts (int): The timestamp at which to expire (delete) the event. + """ + return self._simple_insert_txn( + txn=txn, + table="event_expiry", + values={"event_id": event_id, "expiry_ts": expiry_ts}, + ) + + @defer.inlineCallbacks + def expire_event(self, event_id): + """Retrieve and expire an event that has expired, and delete its associated + expiry timestamp. If the event can't be retrieved, delete its associated + timestamp so we don't try to expire it again in the future. + + Args: + event_id (str): The ID of the event to delete. + """ + # Try to retrieve the event's content from the database or the event cache. + event = yield self.get_event(event_id) + + def delete_expired_event_txn(txn): + # Delete the expiry timestamp associated with this event from the database. + self._delete_event_expiry_txn(txn, event_id) + + if not event: + # If we can't find the event, log a warning and delete the expiry date + # from the database so that we don't try to expire it again in the + # future. + logger.warning( + "Can't expire event %s because we don't have it.", event_id + ) + return + + # Prune the event's dict then convert it to JSON. + pruned_json = encode_json(prune_event_dict(event.get_dict())) + + # Update the event_json table to replace the event's JSON with the pruned + # JSON. + self._censor_event_txn(txn, event.event_id, pruned_json) + + # We need to invalidate the event cache entry for this event because we + # changed its content in the database. We can't call + # self._invalidate_cache_and_stream because self.get_event_cache isn't of the + # right type. + txn.call_after(self._get_event_cache.invalidate, (event.event_id,)) + # Send that invalidation to replication so that other workers also invalidate + # the event cache. + self._send_invalidation_to_replication( + txn, "_get_event_cache", (event.event_id,) + ) + + yield self.runInteraction("delete_expired_event", delete_expired_event_txn) + + def _delete_event_expiry_txn(self, txn, event_id): + """Delete the expiry timestamp associated with an event ID without deleting the + actual event. + + Args: + txn (LoggingTransaction): The transaction to use to perform the deletion. + event_id (str): The event ID to delete the associated expiry timestamp of. + """ + return self._simple_delete_txn( + txn=txn, table="event_expiry", keyvalues={"event_id": event_id} + ) + + def get_next_event_to_expire(self): + """Retrieve the entry with the lowest expiry timestamp in the event_expiry + table, or None if there's no more event to expire. + + Returns: Deferred[Optional[Tuple[str, int]]] + A tuple containing the event ID as its first element and an expiry timestamp + as its second one, if there's at least one row in the event_expiry table. + None otherwise. + """ + + def get_next_event_to_expire_txn(txn): + txn.execute( + """ + SELECT event_id, expiry_ts FROM event_expiry + ORDER BY expiry_ts ASC LIMIT 1 + """ + ) + + return txn.fetchone() + + return self.runInteraction( + desc="get_next_event_to_expire", func=get_next_event_to_expire_txn + ) + AllNewEventsResult = namedtuple( "AllNewEventsResult", diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql new file mode 100644 index 0000000000..81a36a8b1d --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql @@ -0,0 +1,21 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_expiry ( + event_id TEXT PRIMARY KEY, + expiry_ts BIGINT NOT NULL +); + +CREATE INDEX event_expiry_expiry_ts_idx ON event_expiry(expiry_ts); diff --git a/tests/rest/client/test_ephemeral_message.py b/tests/rest/client/test_ephemeral_message.py new file mode 100644 index 0000000000..5e9c07ebf3 --- /dev/null +++ b/tests/rest/client/test_ephemeral_message.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.api.constants import EventContentFields, EventTypes +from synapse.rest import admin +from synapse.rest.client.v1 import room + +from tests import unittest + + +class EphemeralMessageTestCase(unittest.HomeserverTestCase): + + user_id = "@user:test" + + servlets = [ + admin.register_servlets, + room.register_servlets, + ] + + def make_homeserver(self, reactor, clock): + config = self.default_config() + + config["enable_ephemeral_messages"] = True + + self.hs = self.setup_test_homeserver(config=config) + return self.hs + + def prepare(self, reactor, clock, homeserver): + self.room_id = self.helper.create_room_as(self.user_id) + + def test_message_expiry_no_delay(self): + """Tests that sending a message sent with a m.self_destruct_after field set to the + past results in that event being deleted right away. + """ + # Send a message in the room that has expired. From here, the reactor clock is + # at 200ms, so 0 is in the past, and even if that wasn't the case and the clock + # is at 0ms the code path is the same if the event's expiry timestamp is the + # current timestamp. + res = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={ + "msgtype": "m.text", + "body": "hello", + EventContentFields.SELF_DESTRUCT_AFTER: 0, + }, + ) + event_id = res["event_id"] + + # Check that we can't retrieve the content of the event. + event_content = self.get_event(self.room_id, event_id)["content"] + self.assertFalse(bool(event_content), event_content) + + def test_message_expiry_delay(self): + """Tests that sending a message with a m.self_destruct_after field set to the + future results in that event not being deleted right away, but advancing the + clock to after that expiry timestamp causes the event to be deleted. + """ + # Send a message in the room that'll expire in 1s. + res = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={ + "msgtype": "m.text", + "body": "hello", + EventContentFields.SELF_DESTRUCT_AFTER: self.clock.time_msec() + 1000, + }, + ) + event_id = res["event_id"] + + # Check that we can retrieve the content of the event before it has expired. + event_content = self.get_event(self.room_id, event_id)["content"] + self.assertTrue(bool(event_content), event_content) + + # Advance the clock to after the deletion. + self.reactor.advance(1) + + # Check that we can't retrieve the content of the event anymore. + event_content = self.get_event(self.room_id, event_id)["content"] + self.assertFalse(bool(event_content), event_content) + + def get_event(self, room_id, event_id, expected_code=200): + url = "/_matrix/client/r0/rooms/%s/event/%s" % (room_id, event_id) + + request, channel = self.make_request("GET", url) + self.render(request) + + self.assertEqual(channel.code, expected_code, channel.result) + + return channel.json_body -- cgit 1.5.1 From ce1c975ebc3f2ba6448ec97dcc94ff7f4da8d4c4 Mon Sep 17 00:00:00 2001 From: Syam G Krishnan Date: Fri, 29 Nov 2019 22:50:13 +0530 Subject: Issue #6406 Fix parameter mismatch Signed-off-by: Syam G Krishnan --- synapse/handlers/register.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 95806af41e..8a7d965feb 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -266,7 +266,7 @@ class RegistrationHandler(BaseHandler): } # Bind email to new account - yield self._register_email_threepid(user_id, threepid_dict, None, False) + yield self._register_email_threepid(user_id, threepid_dict, None) return user_id -- cgit 1.5.1 From 08a436ecb25de2c4c8f2daf423bfcaf72e985143 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 4 Dec 2019 14:18:46 +0000 Subject: Incorporate review --- changelog.d/6329.bugfix | 1 + changelog.d/6329.feature | 1 - synapse/handlers/room.py | 3 +-- 3 files changed, 2 insertions(+), 3 deletions(-) create mode 100644 changelog.d/6329.bugfix delete mode 100644 changelog.d/6329.feature (limited to 'synapse/handlers') diff --git a/changelog.d/6329.bugfix b/changelog.d/6329.bugfix new file mode 100644 index 0000000000..e558d13b7d --- /dev/null +++ b/changelog.d/6329.bugfix @@ -0,0 +1 @@ +Correctly apply the event filter to the `state`, `events_before` and `events_after` fields in the response to `/context` requests. \ No newline at end of file diff --git a/changelog.d/6329.feature b/changelog.d/6329.feature deleted file mode 100644 index c27dbb06a4..0000000000 --- a/changelog.d/6329.feature +++ /dev/null @@ -1 +0,0 @@ -Filter `state`, `events_before` and `events_after` in `/context` requests. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3148df0de9..fd3ea8daf8 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -908,12 +908,11 @@ class RoomContextHandler(object): [last_event_id], state_filter=state_filter ) - # Apply the filter on state events. state_events = list(state[last_event_id].values()) if event_filter: state_events = event_filter.filter(state_events) - results["state"] = list(state_events) + results["state"] = state_events # We use a dummy token here as we only care about the room portion of # the token, which we replace. -- cgit 1.5.1 From e203874caaae2a378ccbb6b827b6847b3d9a06b8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 4 Dec 2019 17:27:32 +0000 Subject: get rid of (most of) have_events from _update_auth_events_and_context_for_auth (#6468) have_events was a map from event_id to rejection reason (or None) for events which are in our local database. It was used as filter on the list of event_ids being passed into get_events_as_list. However, since get_events_as_list will ignore any event_ids that are unknown or rejected, we can equivalently just leave it to get_events_as_list to do the filtering. That means that we don't have to keep `have_events` up-to-date, and can use `have_seen_events` instead of `get_seen_events_with_rejection` in the one place we do need it. --- changelog.d/6468.misc | 1 + synapse/handlers/federation.py | 62 +++++++++-------------- synapse/storage/data_stores/main/events_worker.py | 34 ------------- 3 files changed, 25 insertions(+), 72 deletions(-) create mode 100644 changelog.d/6468.misc (limited to 'synapse/handlers') diff --git a/changelog.d/6468.misc b/changelog.d/6468.misc new file mode 100644 index 0000000000..d9a44389b9 --- /dev/null +++ b/changelog.d/6468.misc @@ -0,0 +1 @@ +Refactor some code in the event authentication path for clarity. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d9d0cd9eef..7784b80b77 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2122,14 +2122,9 @@ class FederationHandler(BaseHandler): # # we start by checking if they are in the store, and then try calling /event_auth/. if missing_auth: - # TODO: can we use store.have_seen_events here instead? - have_events = yield self.store.get_seen_events_with_rejections(missing_auth) - logger.debug("Found events %s in the store", have_events) - missing_auth.difference_update(have_events.keys()) - else: - have_events = {} - - have_events.update({e.event_id: "" for e in auth_events.values()}) + have_events = yield self.store.have_seen_events(missing_auth) + logger.debug("Events %s are in the store", have_events) + missing_auth.difference_update(have_events) if missing_auth: # If we don't have all the auth events, we need to get them. @@ -2175,9 +2170,6 @@ class FederationHandler(BaseHandler): except AuthError: pass - have_events = yield self.store.get_seen_events_with_rejections( - event.auth_event_ids() - ) except Exception: logger.exception("Failed to get auth chain") @@ -2207,39 +2199,33 @@ class FederationHandler(BaseHandler): # idea of them. room_version = yield self.store.get_room_version(event.room_id) - different_event_ids = [ - d for d in different_auth if d in have_events and not have_events[d] - ] - if different_event_ids: - # XXX: currently this checks for redactions but I'm not convinced that is - # necessary? - different_events = yield self.store.get_events_as_list(different_event_ids) + # XXX: currently this checks for redactions but I'm not convinced that is + # necessary? + different_events = yield self.store.get_events_as_list(different_auth) - local_view = dict(auth_events) - remote_view = dict(auth_events) - remote_view.update({(d.type, d.state_key): d for d in different_events}) + local_view = dict(auth_events) + remote_view = dict(auth_events) + remote_view.update({(d.type, d.state_key): d for d in different_events}) - new_state = yield self.state_handler.resolve_events( - room_version, - [list(local_view.values()), list(remote_view.values())], - event, - ) + new_state = yield self.state_handler.resolve_events( + room_version, [list(local_view.values()), list(remote_view.values())], event + ) - logger.info( - "After state res: updating auth_events with new state %s", - { - (d.type, d.state_key): d.event_id - for d in new_state.values() - if auth_events.get((d.type, d.state_key)) != d - }, - ) + logger.info( + "After state res: updating auth_events with new state %s", + { + (d.type, d.state_key): d.event_id + for d in new_state.values() + if auth_events.get((d.type, d.state_key)) != d + }, + ) - auth_events.update(new_state) + auth_events.update(new_state) - context = yield self._update_context_for_auth_events( - event, context, auth_events - ) + context = yield self._update_context_for_auth_events( + event, context, auth_events + ) return context diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index e782e8f481..eaddca65b7 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -783,40 +783,6 @@ class EventsWorkerStore(SQLBaseStore): yield self.runInteraction("have_seen_events", have_seen_events_txn, chunk) return results - def get_seen_events_with_rejections(self, event_ids): - """Given a list of event ids, check if we rejected them. - - Args: - event_ids (list[str]) - - Returns: - Deferred[dict[str, str|None): - Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps - to None. - """ - if not event_ids: - return defer.succeed({}) - - def f(txn): - sql = ( - "SELECT e.event_id, reason FROM events as e " - "LEFT JOIN rejections as r ON e.event_id = r.event_id " - "WHERE e.event_id = ?" - ) - - res = {} - for event_id in event_ids: - txn.execute(sql, (event_id,)) - row = txn.fetchone() - if row: - _, rejected = row - res[event_id] = rejected - - return res - - return self.runInteraction("get_seen_events_with_rejections", f) - def _get_total_state_event_counts_txn(self, txn, room_id): """ See get_total_state_event_counts. -- cgit 1.5.1 From e1f4c83f41bf6f06bef3d160eb94eacabe59eff1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 5 Dec 2019 14:14:45 +0000 Subject: Sanity-check the rooms of auth events before pulling them in. (#6472) --- changelog.d/6472.bugfix | 1 + synapse/handlers/federation.py | 34 +++++++++++++++++++++++++--------- 2 files changed, 26 insertions(+), 9 deletions(-) create mode 100644 changelog.d/6472.bugfix (limited to 'synapse/handlers') diff --git a/changelog.d/6472.bugfix b/changelog.d/6472.bugfix new file mode 100644 index 0000000000..598efb79fc --- /dev/null +++ b/changelog.d/6472.bugfix @@ -0,0 +1 @@ +Improve sanity-checking when receiving events over federation. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7784b80b77..f5d04cdf91 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2195,21 +2195,37 @@ class FederationHandler(BaseHandler): different_auth, ) - # now we state-resolve between our own idea of the auth events, and the remote's - # idea of them. - - room_version = yield self.store.get_room_version(event.room_id) - # XXX: currently this checks for redactions but I'm not convinced that is # necessary? different_events = yield self.store.get_events_as_list(different_auth) - local_view = dict(auth_events) - remote_view = dict(auth_events) - remote_view.update({(d.type, d.state_key): d for d in different_events}) + for d in different_events: + if d.room_id != event.room_id: + logger.warning( + "Event %s refers to auth_event %s which is in a different room", + event.event_id, + d.event_id, + ) + + # don't attempt to resolve the claimed auth events against our own + # in this case: just use our own auth events. + # + # XXX: should we reject the event in this case? It feels like we should, + # but then shouldn't we also do so if we've failed to fetch any of the + # auth events? + return context + # now we state-resolve between our own idea of the auth events, and the remote's + # idea of them. + + local_state = auth_events.values() + remote_auth_events = dict(auth_events) + remote_auth_events.update({(d.type, d.state_key): d for d in different_events}) + remote_state = remote_auth_events.values() + + room_version = yield self.store.get_room_version(event.room_id) new_state = yield self.state_handler.resolve_events( - room_version, [list(local_view.values()), list(remote_view.values())], event + room_version, (local_state, remote_state), event ) logger.info( -- cgit 1.5.1 From 63d6ad1064c1a5fe23da3b6b64474a2b211f5eea Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 5 Dec 2019 15:02:35 +0000 Subject: Stronger typing in the federation handler (#6480) replace the event_info dict with an attrs thing --- changelog.d/6480.misc | 1 + synapse/handlers/federation.py | 81 +++++++++++++++++++++++++++++------------- 2 files changed, 58 insertions(+), 24 deletions(-) create mode 100644 changelog.d/6480.misc (limited to 'synapse/handlers') diff --git a/changelog.d/6480.misc b/changelog.d/6480.misc new file mode 100644 index 0000000000..d9a44389b9 --- /dev/null +++ b/changelog.d/6480.misc @@ -0,0 +1 @@ +Refactor some code in the event authentication path for clarity. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f5d04cdf91..bc26921768 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -19,11 +19,13 @@ import itertools import logging +from typing import Dict, Iterable, Optional, Sequence, Tuple import six from six import iteritems, itervalues from six.moves import http_client, zip +import attr from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 @@ -45,6 +47,7 @@ from synapse.api.errors import ( from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions from synapse.crypto.event_signing import compute_event_signature from synapse.event_auth import auth_types_for_event +from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.events.validator import EventValidator from synapse.logging.context import ( @@ -72,6 +75,23 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) +@attr.s +class _NewEventInfo: + """Holds information about a received event, ready for passing to _handle_new_events + + Attributes: + event: the received event + + state: the state at that event + + auth_events: the auth_event map for that event + """ + + event = attr.ib(type=EventBase) + state = attr.ib(type=Optional[Sequence[EventBase]], default=None) + auth_events = attr.ib(type=Optional[Dict[Tuple[str, str], EventBase]], default=None) + + def shortstr(iterable, maxitems=5): """If iterable has maxitems or fewer, return the stringification of a list containing those items. @@ -597,14 +617,14 @@ class FederationHandler(BaseHandler): for e in auth_chain if e.event_id in auth_ids or e.type == EventTypes.Create } - event_infos.append({"event": e, "auth_events": auth}) + event_infos.append(_NewEventInfo(event=e, auth_events=auth)) seen_ids.add(e.event_id) logger.info( "[%s %s] persisting newly-received auth/state events %s", room_id, event_id, - [e["event"].event_id for e in event_infos], + [e.event.event_id for e in event_infos], ) yield self._handle_new_events(origin, event_infos) @@ -795,9 +815,9 @@ class FederationHandler(BaseHandler): a.internal_metadata.outlier = True ev_infos.append( - { - "event": a, - "auth_events": { + _NewEventInfo( + event=a, + auth_events={ ( auth_events[a_id].type, auth_events[a_id].state_key, @@ -805,7 +825,7 @@ class FederationHandler(BaseHandler): for a_id in a.auth_event_ids() if a_id in auth_events }, - } + ) ) # Step 1b: persist the events in the chunk we fetched state for (i.e. @@ -817,10 +837,10 @@ class FederationHandler(BaseHandler): assert not ev.internal_metadata.is_outlier() ev_infos.append( - { - "event": ev, - "state": events_to_state[e_id], - "auth_events": { + _NewEventInfo( + event=ev, + state=events_to_state[e_id], + auth_events={ ( auth_events[a_id].type, auth_events[a_id].state_key, @@ -828,7 +848,7 @@ class FederationHandler(BaseHandler): for a_id in ev.auth_event_ids() if a_id in auth_events }, - } + ) ) yield self._handle_new_events(dest, ev_infos, backfilled=True) @@ -1713,7 +1733,12 @@ class FederationHandler(BaseHandler): return context @defer.inlineCallbacks - def _handle_new_events(self, origin, event_infos, backfilled=False): + def _handle_new_events( + self, + origin: str, + event_infos: Iterable[_NewEventInfo], + backfilled: bool = False, + ): """Creates the appropriate contexts and persists events. The events should not depend on one another, e.g. this should be used to persist a bunch of outliers, but not a chunk of individual events that depend @@ -1723,14 +1748,14 @@ class FederationHandler(BaseHandler): """ @defer.inlineCallbacks - def prep(ev_info): - event = ev_info["event"] + def prep(ev_info: _NewEventInfo): + event = ev_info.event with nested_logging_context(suffix=event.event_id): res = yield self._prep_event( origin, event, - state=ev_info.get("state"), - auth_events=ev_info.get("auth_events"), + state=ev_info.state, + auth_events=ev_info.auth_events, backfilled=backfilled, ) return res @@ -1744,7 +1769,7 @@ class FederationHandler(BaseHandler): yield self.persist_events_and_notify( [ - (ev_info["event"], context) + (ev_info.event, context) for ev_info, context in zip(event_infos, contexts) ], backfilled=backfilled, @@ -1846,7 +1871,14 @@ class FederationHandler(BaseHandler): yield self.persist_events_and_notify([(event, new_event_context)]) @defer.inlineCallbacks - def _prep_event(self, origin, event, state, auth_events, backfilled): + def _prep_event( + self, + origin: str, + event: EventBase, + state: Optional[Iterable[EventBase]], + auth_events: Optional[Dict[Tuple[str, str], EventBase]], + backfilled: bool, + ): """ Args: @@ -1854,7 +1886,7 @@ class FederationHandler(BaseHandler): event: state: auth_events: - backfilled (bool) + backfilled: Returns: Deferred, which resolves to synapse.events.snapshot.EventContext @@ -1890,15 +1922,16 @@ class FederationHandler(BaseHandler): return context @defer.inlineCallbacks - def _check_for_soft_fail(self, event, state, backfilled): + def _check_for_soft_fail( + self, event: EventBase, state: Optional[Iterable[EventBase]], backfilled: bool + ): """Checks if we should soft fail the event, if so marks the event as such. Args: - event (FrozenEvent) - state (dict|None): The state at the event if we don't have all the - event's prev events - backfilled (bool): Whether the event is from backfill + event + state: The state at the event if we don't have all the event's prev events + backfilled: Whether the event is from backfill Returns: Deferred -- cgit 1.5.1 From 8437e2383ed2dffacca5395851023eeacb33d7ba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Dec 2019 17:58:25 +0000 Subject: Port SyncHandler to async/await --- synapse/handlers/events.py | 30 +++--- synapse/handlers/sync.py | 251 +++++++++++++++++++++----------------------- synapse/notifier.py | 29 +++-- synapse/util/metrics.py | 23 ++-- tests/handlers/test_sync.py | 33 +++--- tests/unittest.py | 7 +- 6 files changed, 182 insertions(+), 191 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 45fe13c62f..ec18a42a68 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -16,8 +16,6 @@ import logging import random -from twisted.internet import defer - from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, SynapseError from synapse.events import EventBase @@ -50,9 +48,8 @@ class EventStreamHandler(BaseHandler): self._server_notices_sender = hs.get_server_notices_sender() self._event_serializer = hs.get_event_client_serializer() - @defer.inlineCallbacks @log_function - def get_stream( + async def get_stream( self, auth_user_id, pagin_config, @@ -69,17 +66,17 @@ class EventStreamHandler(BaseHandler): """ if room_id: - blocked = yield self.store.is_room_blocked(room_id) + blocked = await self.store.is_room_blocked(room_id) if blocked: raise SynapseError(403, "This room has been blocked on this server") # send any outstanding server notices to the user. - yield self._server_notices_sender.on_user_syncing(auth_user_id) + await self._server_notices_sender.on_user_syncing(auth_user_id) auth_user = UserID.from_string(auth_user_id) presence_handler = self.hs.get_presence_handler() - context = yield presence_handler.user_syncing( + context = await presence_handler.user_syncing( auth_user_id, affect_presence=affect_presence ) with context: @@ -91,7 +88,7 @@ class EventStreamHandler(BaseHandler): # thundering herds on restart. timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1)) - events, tokens = yield self.notifier.get_events_for( + events, tokens = await self.notifier.get_events_for( auth_user, pagin_config, timeout, @@ -112,14 +109,14 @@ class EventStreamHandler(BaseHandler): # Send down presence. if event.state_key == auth_user_id: # Send down presence for everyone in the room. - users = yield self.state.get_current_users_in_room( + users = await self.state.get_current_users_in_room( event.room_id ) - states = yield presence_handler.get_states(users, as_event=True) + states = await presence_handler.get_states(users, as_event=True) to_add.extend(states) else: - ev = yield presence_handler.get_state( + ev = await presence_handler.get_state( UserID.from_string(event.state_key), as_event=True ) to_add.append(ev) @@ -128,7 +125,7 @@ class EventStreamHandler(BaseHandler): time_now = self.clock.time_msec() - chunks = yield self._event_serializer.serialize_events( + chunks = await self._event_serializer.serialize_events( events, time_now, as_client_event=as_client_event, @@ -151,8 +148,7 @@ class EventHandler(BaseHandler): super(EventHandler, self).__init__(hs) self.storage = hs.get_storage() - @defer.inlineCallbacks - def get_event(self, user, room_id, event_id): + async def get_event(self, user, room_id, event_id): """Retrieve a single specified event. Args: @@ -167,15 +163,15 @@ class EventHandler(BaseHandler): AuthError if the user does not have the rights to inspect this event. """ - event = yield self.store.get_event(event_id, check_room_id=room_id) + event = await self.store.get_event(event_id, check_room_id=room_id) if not event: return None - users = yield self.store.get_users_in_room(event.room_id) + users = await self.store.get_users_in_room(event.room_id) is_peeking = user.to_string() not in users - filtered = yield filter_events_for_client( + filtered = await filter_events_for_client( self.storage, user.to_string(), [event], is_peeking=is_peeking ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b536d410e5..12751fd8c0 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -22,8 +22,6 @@ from six import iteritems, itervalues from prometheus_client import Counter -from twisted.internet import defer - from synapse.api.constants import EventTypes, Membership from synapse.logging.context import LoggingContext from synapse.push.clientformat import format_push_rules_for_user @@ -241,8 +239,7 @@ class SyncHandler(object): expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE, ) - @defer.inlineCallbacks - def wait_for_sync_for_user( + async def wait_for_sync_for_user( self, sync_config, since_token=None, timeout=0, full_state=False ): """Get the sync for a client if we have new data for it now. Otherwise @@ -255,9 +252,9 @@ class SyncHandler(object): # not been exceeded (if not part of the group by this point, almost certain # auth_blocking will occur) user_id = sync_config.user.to_string() - yield self.auth.check_auth_blocking(user_id) + await self.auth.check_auth_blocking(user_id) - res = yield self.response_cache.wrap( + res = await self.response_cache.wrap( sync_config.request_key, self._wait_for_sync_for_user, sync_config, @@ -267,8 +264,9 @@ class SyncHandler(object): ) return res - @defer.inlineCallbacks - def _wait_for_sync_for_user(self, sync_config, since_token, timeout, full_state): + async def _wait_for_sync_for_user( + self, sync_config, since_token, timeout, full_state + ): if since_token is None: sync_type = "initial_sync" elif full_state: @@ -283,7 +281,7 @@ class SyncHandler(object): if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. - result = yield self.current_sync_for_user( + result = await self.current_sync_for_user( sync_config, since_token, full_state=full_state ) else: @@ -291,7 +289,7 @@ class SyncHandler(object): def current_sync_callback(before_token, after_token): return self.current_sync_for_user(sync_config, since_token) - result = yield self.notifier.wait_for_events( + result = await self.notifier.wait_for_events( sync_config.user.to_string(), timeout, current_sync_callback, @@ -314,15 +312,13 @@ class SyncHandler(object): """ return self.generate_sync_result(sync_config, since_token, full_state) - @defer.inlineCallbacks - def push_rules_for_user(self, user): + async def push_rules_for_user(self, user): user_id = user.to_string() - rules = yield self.store.get_push_rules_for_user(user_id) + rules = await self.store.get_push_rules_for_user(user_id) rules = format_push_rules_for_user(user, rules) return rules - @defer.inlineCallbacks - def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None): + async def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None): """Get the ephemeral events for each room the user is in Args: sync_result_builder(SyncResultBuilder) @@ -343,7 +339,7 @@ class SyncHandler(object): room_ids = sync_result_builder.joined_room_ids typing_source = self.event_sources.sources["typing"] - typing, typing_key = yield typing_source.get_new_events( + typing, typing_key = typing_source.get_new_events( user=sync_config.user, from_key=typing_key, limit=sync_config.filter_collection.ephemeral_limit(), @@ -365,7 +361,7 @@ class SyncHandler(object): receipt_key = since_token.receipt_key if since_token else "0" receipt_source = self.event_sources.sources["receipt"] - receipts, receipt_key = yield receipt_source.get_new_events( + receipts, receipt_key = await receipt_source.get_new_events( user=sync_config.user, from_key=receipt_key, limit=sync_config.filter_collection.ephemeral_limit(), @@ -382,8 +378,7 @@ class SyncHandler(object): return now_token, ephemeral_by_room - @defer.inlineCallbacks - def _load_filtered_recents( + async def _load_filtered_recents( self, room_id, sync_config, @@ -415,10 +410,10 @@ class SyncHandler(object): # ensure that we always include current state in the timeline current_state_ids = frozenset() if any(e.is_state() for e in recents): - current_state_ids = yield self.state.get_current_state_ids(room_id) + current_state_ids = await self.state.get_current_state_ids(room_id) current_state_ids = frozenset(itervalues(current_state_ids)) - recents = yield filter_events_for_client( + recents = await filter_events_for_client( self.storage, sync_config.user.to_string(), recents, @@ -449,14 +444,14 @@ class SyncHandler(object): # Otherwise, we want to return the last N events in the room # in toplogical ordering. if since_key: - events, end_key = yield self.store.get_room_events_stream_for_room( + events, end_key = await self.store.get_room_events_stream_for_room( room_id, limit=load_limit + 1, from_key=since_key, to_key=end_key, ) else: - events, end_key = yield self.store.get_recent_events_for_room( + events, end_key = await self.store.get_recent_events_for_room( room_id, limit=load_limit + 1, end_token=end_key ) loaded_recents = sync_config.filter_collection.filter_room_timeline( @@ -468,10 +463,10 @@ class SyncHandler(object): # ensure that we always include current state in the timeline current_state_ids = frozenset() if any(e.is_state() for e in loaded_recents): - current_state_ids = yield self.state.get_current_state_ids(room_id) + current_state_ids = await self.state.get_current_state_ids(room_id) current_state_ids = frozenset(itervalues(current_state_ids)) - loaded_recents = yield filter_events_for_client( + loaded_recents = await filter_events_for_client( self.storage, sync_config.user.to_string(), loaded_recents, @@ -498,8 +493,7 @@ class SyncHandler(object): limited=limited or newly_joined_room, ) - @defer.inlineCallbacks - def get_state_after_event(self, event, state_filter=StateFilter.all()): + async def get_state_after_event(self, event, state_filter=StateFilter.all()): """ Get the room state after the given event @@ -511,7 +505,7 @@ class SyncHandler(object): Returns: A Deferred map from ((type, state_key)->Event) """ - state_ids = yield self.state_store.get_state_ids_for_event( + state_ids = await self.state_store.get_state_ids_for_event( event.event_id, state_filter=state_filter ) if event.is_state(): @@ -519,8 +513,9 @@ class SyncHandler(object): state_ids[(event.type, event.state_key)] = event.event_id return state_ids - @defer.inlineCallbacks - def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()): + async def get_state_at( + self, room_id, stream_position, state_filter=StateFilter.all() + ): """ Get the room state at a particular stream position Args: @@ -536,13 +531,13 @@ class SyncHandler(object): # get_recent_events_for_room operates by topo ordering. This therefore # does not reliably give you the state at the given stream position. # (https://github.com/matrix-org/synapse/issues/3305) - last_events, _ = yield self.store.get_recent_events_for_room( + last_events, _ = await self.store.get_recent_events_for_room( room_id, end_token=stream_position.room_key, limit=1 ) if last_events: last_event = last_events[-1] - state = yield self.get_state_after_event( + state = await self.get_state_after_event( last_event, state_filter=state_filter ) @@ -551,8 +546,7 @@ class SyncHandler(object): state = {} return state - @defer.inlineCallbacks - def compute_summary(self, room_id, sync_config, batch, state, now_token): + async def compute_summary(self, room_id, sync_config, batch, state, now_token): """ Works out a room summary block for this room, summarising the number of joined members in the room, and providing the 'hero' members if the room has no name so clients can consistently name rooms. Also adds @@ -574,7 +568,7 @@ class SyncHandler(object): # FIXME: we could/should get this from room_stats when matthew/stats lands # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305 - last_events, _ = yield self.store.get_recent_event_ids_for_room( + last_events, _ = await self.store.get_recent_event_ids_for_room( room_id, end_token=now_token.room_key, limit=1 ) @@ -582,7 +576,7 @@ class SyncHandler(object): return None last_event = last_events[-1] - state_ids = yield self.state_store.get_state_ids_for_event( + state_ids = await self.state_store.get_state_ids_for_event( last_event.event_id, state_filter=StateFilter.from_types( [(EventTypes.Name, ""), (EventTypes.CanonicalAlias, "")] @@ -590,7 +584,7 @@ class SyncHandler(object): ) # this is heavily cached, thus: fast. - details = yield self.store.get_room_summary(room_id) + details = await self.store.get_room_summary(room_id) name_id = state_ids.get((EventTypes.Name, "")) canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, "")) @@ -608,12 +602,12 @@ class SyncHandler(object): # calculating heroes. Empty strings are falsey, so we check # for the "name" value and default to an empty string. if name_id: - name = yield self.store.get_event(name_id, allow_none=True) + name = await self.store.get_event(name_id, allow_none=True) if name and name.content.get("name"): return summary if canonical_alias_id: - canonical_alias = yield self.store.get_event( + canonical_alias = await self.store.get_event( canonical_alias_id, allow_none=True ) if canonical_alias and canonical_alias.content.get("alias"): @@ -678,7 +672,7 @@ class SyncHandler(object): ) ] - missing_hero_state = yield self.store.get_events(missing_hero_event_ids) + missing_hero_state = await self.store.get_events(missing_hero_event_ids) missing_hero_state = missing_hero_state.values() for s in missing_hero_state: @@ -697,8 +691,7 @@ class SyncHandler(object): logger.debug("found LruCache for %r", cache_key) return cache - @defer.inlineCallbacks - def compute_state_delta( + async def compute_state_delta( self, room_id, batch, sync_config, since_token, now_token, full_state ): """ Works out the difference in state between the start of the timeline @@ -759,16 +752,16 @@ class SyncHandler(object): if full_state: if batch: - current_state_ids = yield self.state_store.get_state_ids_for_event( + current_state_ids = await self.state_store.get_state_ids_for_event( batch.events[-1].event_id, state_filter=state_filter ) - state_ids = yield self.state_store.get_state_ids_for_event( + state_ids = await self.state_store.get_state_ids_for_event( batch.events[0].event_id, state_filter=state_filter ) else: - current_state_ids = yield self.get_state_at( + current_state_ids = await self.get_state_at( room_id, stream_position=now_token, state_filter=state_filter ) @@ -783,13 +776,13 @@ class SyncHandler(object): ) elif batch.limited: if batch: - state_at_timeline_start = yield self.state_store.get_state_ids_for_event( + state_at_timeline_start = await self.state_store.get_state_ids_for_event( batch.events[0].event_id, state_filter=state_filter ) else: # We can get here if the user has ignored the senders of all # the recent events. - state_at_timeline_start = yield self.get_state_at( + state_at_timeline_start = await self.get_state_at( room_id, stream_position=now_token, state_filter=state_filter ) @@ -807,19 +800,19 @@ class SyncHandler(object): # about them). state_filter = StateFilter.all() - state_at_previous_sync = yield self.get_state_at( + state_at_previous_sync = await self.get_state_at( room_id, stream_position=since_token, state_filter=state_filter ) if batch: - current_state_ids = yield self.state_store.get_state_ids_for_event( + current_state_ids = await self.state_store.get_state_ids_for_event( batch.events[-1].event_id, state_filter=state_filter ) else: # Its not clear how we get here, but empirically we do # (#5407). Logging has been added elsewhere to try and # figure out where this state comes from. - current_state_ids = yield self.get_state_at( + current_state_ids = await self.get_state_at( room_id, stream_position=now_token, state_filter=state_filter ) @@ -843,7 +836,7 @@ class SyncHandler(object): # So we fish out all the member events corresponding to the # timeline here, and then dedupe any redundant ones below. - state_ids = yield self.state_store.get_state_ids_for_event( + state_ids = await self.state_store.get_state_ids_for_event( batch.events[0].event_id, # we only want members! state_filter=StateFilter.from_types( @@ -883,7 +876,7 @@ class SyncHandler(object): state = {} if state_ids: - state = yield self.store.get_events(list(state_ids.values())) + state = await self.store.get_events(list(state_ids.values())) return { (e.type, e.state_key): e @@ -892,10 +885,9 @@ class SyncHandler(object): ) } - @defer.inlineCallbacks - def unread_notifs_for_room_id(self, room_id, sync_config): + async def unread_notifs_for_room_id(self, room_id, sync_config): with Measure(self.clock, "unread_notifs_for_room_id"): - last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user( + last_unread_event_id = await self.store.get_last_receipt_event_id_for_user( user_id=sync_config.user.to_string(), room_id=room_id, receipt_type="m.read", @@ -903,7 +895,7 @@ class SyncHandler(object): notifs = [] if last_unread_event_id: - notifs = yield self.store.get_unread_event_push_actions_by_room_for_user( + notifs = await self.store.get_unread_event_push_actions_by_room_for_user( room_id, sync_config.user.to_string(), last_unread_event_id ) return notifs @@ -912,8 +904,9 @@ class SyncHandler(object): # count is whatever it was last time. return None - @defer.inlineCallbacks - def generate_sync_result(self, sync_config, since_token=None, full_state=False): + async def generate_sync_result( + self, sync_config, since_token=None, full_state=False + ): """Generates a sync result. Args: @@ -928,7 +921,7 @@ class SyncHandler(object): # this is due to some of the underlying streams not supporting the ability # to query up to a given point. # Always use the `now_token` in `SyncResultBuilder` - now_token = yield self.event_sources.get_current_token() + now_token = await self.event_sources.get_current_token() logger.info( "Calculating sync response for %r between %s and %s", @@ -944,10 +937,9 @@ class SyncHandler(object): # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() else: - joined_room_ids = yield self.get_rooms_for_user_at( + joined_room_ids = await self.get_rooms_for_user_at( user_id, now_token.room_stream_id ) - sync_result_builder = SyncResultBuilder( sync_config, full_state, @@ -956,11 +948,11 @@ class SyncHandler(object): joined_room_ids=joined_room_ids, ) - account_data_by_room = yield self._generate_sync_entry_for_account_data( + account_data_by_room = await self._generate_sync_entry_for_account_data( sync_result_builder ) - res = yield self._generate_sync_entry_for_rooms( + res = await self._generate_sync_entry_for_rooms( sync_result_builder, account_data_by_room ) newly_joined_rooms, newly_joined_or_invited_users, _, _ = res @@ -970,13 +962,13 @@ class SyncHandler(object): since_token is None and sync_config.filter_collection.blocks_all_presence() ) if self.hs_config.use_presence and not block_all_presence_data: - yield self._generate_sync_entry_for_presence( + await self._generate_sync_entry_for_presence( sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users ) - yield self._generate_sync_entry_for_to_device(sync_result_builder) + await self._generate_sync_entry_for_to_device(sync_result_builder) - device_lists = yield self._generate_sync_entry_for_device_list( + device_lists = await self._generate_sync_entry_for_device_list( sync_result_builder, newly_joined_rooms=newly_joined_rooms, newly_joined_or_invited_users=newly_joined_or_invited_users, @@ -987,11 +979,11 @@ class SyncHandler(object): device_id = sync_config.device_id one_time_key_counts = {} if device_id: - one_time_key_counts = yield self.store.count_e2e_one_time_keys( + one_time_key_counts = await self.store.count_e2e_one_time_keys( user_id, device_id ) - yield self._generate_sync_entry_for_groups(sync_result_builder) + await self._generate_sync_entry_for_groups(sync_result_builder) # debug for https://github.com/matrix-org/synapse/issues/4422 for joined_room in sync_result_builder.joined: @@ -1015,18 +1007,17 @@ class SyncHandler(object): ) @measure_func("_generate_sync_entry_for_groups") - @defer.inlineCallbacks - def _generate_sync_entry_for_groups(self, sync_result_builder): + async def _generate_sync_entry_for_groups(self, sync_result_builder): user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token if since_token and since_token.groups_key: - results = yield self.store.get_groups_changes_for_user( + results = self.store.get_groups_changes_for_user( user_id, since_token.groups_key, now_token.groups_key ) else: - results = yield self.store.get_all_groups_for_user( + results = await self.store.get_all_groups_for_user( user_id, now_token.groups_key ) @@ -1059,8 +1050,7 @@ class SyncHandler(object): ) @measure_func("_generate_sync_entry_for_device_list") - @defer.inlineCallbacks - def _generate_sync_entry_for_device_list( + async def _generate_sync_entry_for_device_list( self, sync_result_builder, newly_joined_rooms, @@ -1108,32 +1098,32 @@ class SyncHandler(object): # room with by looking at all users that have left a room plus users # that were in a room we've left. - users_who_share_room = yield self.store.get_users_who_share_room_with_user( + users_who_share_room = await self.store.get_users_who_share_room_with_user( user_id ) # Step 1a, check for changes in devices of users we share a room with - users_that_have_changed = yield self.store.get_users_whose_devices_changed( + users_that_have_changed = await self.store.get_users_whose_devices_changed( since_token.device_list_key, users_who_share_room ) # Step 1b, check for newly joined rooms for room_id in newly_joined_rooms: - joined_users = yield self.state.get_current_users_in_room(room_id) + joined_users = await self.state.get_current_users_in_room(room_id) newly_joined_or_invited_users.update(joined_users) # TODO: Check that these users are actually new, i.e. either they # weren't in the previous sync *or* they left and rejoined. users_that_have_changed.update(newly_joined_or_invited_users) - user_signatures_changed = yield self.store.get_users_whose_signatures_changed( + user_signatures_changed = await self.store.get_users_whose_signatures_changed( user_id, since_token.device_list_key ) users_that_have_changed.update(user_signatures_changed) # Now find users that we no longer track for room_id in newly_left_rooms: - left_users = yield self.state.get_current_users_in_room(room_id) + left_users = await self.state.get_current_users_in_room(room_id) newly_left_users.update(left_users) # Remove any users that we still share a room with. @@ -1143,8 +1133,7 @@ class SyncHandler(object): else: return DeviceLists(changed=[], left=[]) - @defer.inlineCallbacks - def _generate_sync_entry_for_to_device(self, sync_result_builder): + async def _generate_sync_entry_for_to_device(self, sync_result_builder): """Generates the portion of the sync response. Populates `sync_result_builder` with the result. @@ -1165,14 +1154,14 @@ class SyncHandler(object): # We only delete messages when a new message comes in, but that's # fine so long as we delete them at some point. - deleted = yield self.store.delete_messages_for_device( + deleted = await self.store.delete_messages_for_device( user_id, device_id, since_stream_id ) logger.debug( "Deleted %d to-device messages up to %d", deleted, since_stream_id ) - messages, stream_id = yield self.store.get_new_messages_for_device( + messages, stream_id = await self.store.get_new_messages_for_device( user_id, device_id, since_stream_id, now_token.to_device_key ) @@ -1190,8 +1179,7 @@ class SyncHandler(object): else: sync_result_builder.to_device = [] - @defer.inlineCallbacks - def _generate_sync_entry_for_account_data(self, sync_result_builder): + async def _generate_sync_entry_for_account_data(self, sync_result_builder): """Generates the account data portion of the sync response. Populates `sync_result_builder` with the result. @@ -1209,25 +1197,25 @@ class SyncHandler(object): ( account_data, account_data_by_room, - ) = yield self.store.get_updated_account_data_for_user( + ) = self.store.get_updated_account_data_for_user( user_id, since_token.account_data_key ) - push_rules_changed = yield self.store.have_push_rules_changed_for_user( + push_rules_changed = await self.store.have_push_rules_changed_for_user( user_id, int(since_token.push_rules_key) ) if push_rules_changed: - account_data["m.push_rules"] = yield self.push_rules_for_user( + account_data["m.push_rules"] = await self.push_rules_for_user( sync_config.user ) else: ( account_data, account_data_by_room, - ) = yield self.store.get_account_data_for_user(sync_config.user.to_string()) + ) = await self.store.get_account_data_for_user(sync_config.user.to_string()) - account_data["m.push_rules"] = yield self.push_rules_for_user( + account_data["m.push_rules"] = await self.push_rules_for_user( sync_config.user ) @@ -1242,8 +1230,7 @@ class SyncHandler(object): return account_data_by_room - @defer.inlineCallbacks - def _generate_sync_entry_for_presence( + async def _generate_sync_entry_for_presence( self, sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users ): """Generates the presence portion of the sync response. Populates the @@ -1271,7 +1258,7 @@ class SyncHandler(object): presence_key = None include_offline = False - presence, presence_key = yield presence_source.get_new_events( + presence, presence_key = await presence_source.get_new_events( user=user, from_key=presence_key, is_guest=sync_config.is_guest, @@ -1283,12 +1270,12 @@ class SyncHandler(object): extra_users_ids = set(newly_joined_or_invited_users) for room_id in newly_joined_rooms: - users = yield self.state.get_current_users_in_room(room_id) + users = await self.state.get_current_users_in_room(room_id) extra_users_ids.update(users) extra_users_ids.discard(user.to_string()) if extra_users_ids: - states = yield self.presence_handler.get_states(extra_users_ids) + states = await self.presence_handler.get_states(extra_users_ids) presence.extend(states) # Deduplicate the presence entries so that there's at most one per user @@ -1298,8 +1285,9 @@ class SyncHandler(object): sync_result_builder.presence = presence - @defer.inlineCallbacks - def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room): + async def _generate_sync_entry_for_rooms( + self, sync_result_builder, account_data_by_room + ): """Generates the rooms portion of the sync response. Populates the `sync_result_builder` with the result. @@ -1321,7 +1309,7 @@ class SyncHandler(object): if block_all_room_ephemeral: ephemeral_by_room = {} else: - now_token, ephemeral_by_room = yield self.ephemeral_by_room( + now_token, ephemeral_by_room = await self.ephemeral_by_room( sync_result_builder, now_token=sync_result_builder.now_token, since_token=sync_result_builder.since_token, @@ -1333,16 +1321,16 @@ class SyncHandler(object): since_token = sync_result_builder.since_token if not sync_result_builder.full_state: if since_token and not ephemeral_by_room and not account_data_by_room: - have_changed = yield self._have_rooms_changed(sync_result_builder) + have_changed = await self._have_rooms_changed(sync_result_builder) if not have_changed: - tags_by_room = yield self.store.get_updated_tags( + tags_by_room = await self.store.get_updated_tags( user_id, since_token.account_data_key ) if not tags_by_room: logger.debug("no-oping sync") return [], [], [], [] - ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( + ignored_account_data = await self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id ) @@ -1352,18 +1340,18 @@ class SyncHandler(object): ignored_users = frozenset() if since_token: - res = yield self._get_rooms_changed(sync_result_builder, ignored_users) + res = await self._get_rooms_changed(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms, newly_left_rooms = res - tags_by_room = yield self.store.get_updated_tags( + tags_by_room = await self.store.get_updated_tags( user_id, since_token.account_data_key ) else: - res = yield self._get_all_rooms(sync_result_builder, ignored_users) + res = await self._get_all_rooms(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res newly_left_rooms = [] - tags_by_room = yield self.store.get_tags_for_user(user_id) + tags_by_room = await self.store.get_tags_for_user(user_id) def handle_room_entries(room_entry): return self._generate_room_entry( @@ -1376,7 +1364,7 @@ class SyncHandler(object): always_include=sync_result_builder.full_state, ) - yield concurrently_execute(handle_room_entries, room_entries, 10) + await concurrently_execute(handle_room_entries, room_entries, 10) sync_result_builder.invited.extend(invited) @@ -1410,8 +1398,7 @@ class SyncHandler(object): newly_left_users, ) - @defer.inlineCallbacks - def _have_rooms_changed(self, sync_result_builder): + async def _have_rooms_changed(self, sync_result_builder): """Returns whether there may be any new events that should be sent down the sync. Returns True if there are. """ @@ -1422,7 +1409,7 @@ class SyncHandler(object): assert since_token # Get a list of membership change events that have happened. - rooms_changed = yield self.store.get_membership_changes_for_user( + rooms_changed = await self.store.get_membership_changes_for_user( user_id, since_token.room_key, now_token.room_key ) @@ -1435,8 +1422,7 @@ class SyncHandler(object): return True return False - @defer.inlineCallbacks - def _get_rooms_changed(self, sync_result_builder, ignored_users): + async def _get_rooms_changed(self, sync_result_builder, ignored_users): """Gets the the changes that have happened since the last sync. Args: @@ -1461,7 +1447,7 @@ class SyncHandler(object): assert since_token # Get a list of membership change events that have happened. - rooms_changed = yield self.store.get_membership_changes_for_user( + rooms_changed = await self.store.get_membership_changes_for_user( user_id, since_token.room_key, now_token.room_key ) @@ -1499,11 +1485,11 @@ class SyncHandler(object): continue if room_id in sync_result_builder.joined_room_ids or has_join: - old_state_ids = yield self.get_state_at(room_id, since_token) + old_state_ids = await self.get_state_at(room_id, since_token) old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) old_mem_ev = None if old_mem_ev_id: - old_mem_ev = yield self.store.get_event( + old_mem_ev = await self.store.get_event( old_mem_ev_id, allow_none=True ) @@ -1536,13 +1522,13 @@ class SyncHandler(object): newly_left_rooms.append(room_id) else: if not old_state_ids: - old_state_ids = yield self.get_state_at(room_id, since_token) + old_state_ids = await self.get_state_at(room_id, since_token) old_mem_ev_id = old_state_ids.get( (EventTypes.Member, user_id), None ) old_mem_ev = None if old_mem_ev_id: - old_mem_ev = yield self.store.get_event( + old_mem_ev = await self.store.get_event( old_mem_ev_id, allow_none=True ) if old_mem_ev and old_mem_ev.membership == Membership.JOIN: @@ -1566,7 +1552,7 @@ class SyncHandler(object): if leave_events: leave_event = leave_events[-1] - leave_stream_token = yield self.store.get_stream_token_for_event( + leave_stream_token = await self.store.get_stream_token_for_event( leave_event.event_id ) leave_token = since_token.copy_and_replace( @@ -1603,7 +1589,7 @@ class SyncHandler(object): timeline_limit = sync_config.filter_collection.timeline_limit() # Get all events for rooms we're currently joined to. - room_to_events = yield self.store.get_room_events_stream_for_rooms( + room_to_events = await self.store.get_room_events_stream_for_rooms( room_ids=sync_result_builder.joined_room_ids, from_key=since_token.room_key, to_key=now_token.room_key, @@ -1652,8 +1638,7 @@ class SyncHandler(object): return room_entries, invited, newly_joined_rooms, newly_left_rooms - @defer.inlineCallbacks - def _get_all_rooms(self, sync_result_builder, ignored_users): + async def _get_all_rooms(self, sync_result_builder, ignored_users): """Returns entries for all rooms for the user. Args: @@ -1677,7 +1662,7 @@ class SyncHandler(object): Membership.BAN, ) - room_list = yield self.store.get_rooms_for_user_where_membership_is( + room_list = await self.store.get_rooms_for_user_where_membership_is( user_id=user_id, membership_list=membership_list ) @@ -1700,7 +1685,7 @@ class SyncHandler(object): elif event.membership == Membership.INVITE: if event.sender in ignored_users: continue - invite = yield self.store.get_event(event.event_id) + invite = await self.store.get_event(event.event_id) invited.append(InvitedSyncResult(room_id=event.room_id, invite=invite)) elif event.membership in (Membership.LEAVE, Membership.BAN): # Always send down rooms we were banned or kicked from. @@ -1726,8 +1711,7 @@ class SyncHandler(object): return room_entries, invited, [] - @defer.inlineCallbacks - def _generate_room_entry( + async def _generate_room_entry( self, sync_result_builder, ignored_users, @@ -1769,7 +1753,7 @@ class SyncHandler(object): since_token = room_builder.since_token upto_token = room_builder.upto_token - batch = yield self._load_filtered_recents( + batch = await self._load_filtered_recents( room_id, sync_config, now_token=upto_token, @@ -1796,7 +1780,7 @@ class SyncHandler(object): # tag was added by synapse e.g. for server notice rooms. if full_state: user_id = sync_result_builder.sync_config.user.to_string() - tags = yield self.store.get_tags_for_room(user_id, room_id) + tags = await self.store.get_tags_for_room(user_id, room_id) # If there aren't any tags, don't send the empty tags list down # sync @@ -1821,7 +1805,7 @@ class SyncHandler(object): ): return - state = yield self.compute_state_delta( + state = await self.compute_state_delta( room_id, batch, sync_config, since_token, now_token, full_state=full_state ) @@ -1844,7 +1828,7 @@ class SyncHandler(object): ) or since_token is None ): - summary = yield self.compute_summary( + summary = await self.compute_summary( room_id, sync_config, batch, state, now_token ) @@ -1861,7 +1845,7 @@ class SyncHandler(object): ) if room_sync or always_include: - notifs = yield self.unread_notifs_for_room_id(room_id, sync_config) + notifs = await self.unread_notifs_for_room_id(room_id, sync_config) if notifs is not None: unread_notifications["notification_count"] = notifs["notify_count"] @@ -1887,8 +1871,7 @@ class SyncHandler(object): else: raise Exception("Unrecognized rtype: %r", room_builder.rtype) - @defer.inlineCallbacks - def get_rooms_for_user_at(self, user_id, stream_ordering): + async def get_rooms_for_user_at(self, user_id, stream_ordering): """Get set of joined rooms for a user at the given stream ordering. The stream ordering *must* be recent, otherwise this may throw an @@ -1903,7 +1886,7 @@ class SyncHandler(object): Deferred[frozenset[str]]: Set of room_ids the user is in at given stream_ordering. """ - joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(user_id) + joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id) joined_room_ids = set() @@ -1921,10 +1904,10 @@ class SyncHandler(object): logger.info("User joined room after current token: %s", room_id) - extrems = yield self.store.get_forward_extremeties_for_room( + extrems = await self.store.get_forward_extremeties_for_room( room_id, stream_ordering ) - users_in_room = yield self.state.get_current_users_in_room(room_id, extrems) + users_in_room = await self.state.get_current_users_in_room(room_id, extrems) if user_id in users_in_room: joined_room_ids.add(room_id) diff --git a/synapse/notifier.py b/synapse/notifier.py index af161a81d7..5f5f765bea 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -304,8 +304,7 @@ class Notifier(object): without waking up any of the normal user event streams""" self.notify_replication() - @defer.inlineCallbacks - def wait_for_events( + async def wait_for_events( self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START ): """Wait until the callback returns a non empty response or the @@ -313,9 +312,9 @@ class Notifier(object): """ user_stream = self.user_to_user_stream.get(user_id) if user_stream is None: - current_token = yield self.event_sources.get_current_token() + current_token = await self.event_sources.get_current_token() if room_ids is None: - room_ids = yield self.store.get_rooms_for_user(user_id) + room_ids = await self.store.get_rooms_for_user(user_id) user_stream = _NotifierUserStream( user_id=user_id, rooms=room_ids, @@ -344,11 +343,11 @@ class Notifier(object): self.hs.get_reactor(), ) with PreserveLoggingContext(): - yield listener.deferred + await listener.deferred current_token = user_stream.current_token - result = yield callback(prev_token, current_token) + result = await callback(prev_token, current_token) if result: break @@ -364,12 +363,11 @@ class Notifier(object): # This happened if there was no timeout or if the timeout had # already expired. current_token = user_stream.current_token - result = yield callback(prev_token, current_token) + result = await callback(prev_token, current_token) return result - @defer.inlineCallbacks - def get_events_for( + async def get_events_for( self, user, pagination_config, @@ -391,15 +389,14 @@ class Notifier(object): """ from_token = pagination_config.from_token if not from_token: - from_token = yield self.event_sources.get_current_token() + from_token = await self.event_sources.get_current_token() limit = pagination_config.limit - room_ids, is_joined = yield self._get_room_ids(user, explicit_room_id) + room_ids, is_joined = await self._get_room_ids(user, explicit_room_id) is_peeking = not is_joined - @defer.inlineCallbacks - def check_for_updates(before_token, after_token): + async def check_for_updates(before_token, after_token): if not after_token.is_after(before_token): return EventStreamResult([], (from_token, from_token)) @@ -415,7 +412,7 @@ class Notifier(object): if only_keys and name not in only_keys: continue - new_events, new_key = yield source.get_new_events( + new_events, new_key = await source.get_new_events( user=user, from_key=getattr(from_token, keyname), limit=limit, @@ -425,7 +422,7 @@ class Notifier(object): ) if name == "room": - new_events = yield filter_events_for_client( + new_events = await filter_events_for_client( self.storage, user.to_string(), new_events, @@ -461,7 +458,7 @@ class Notifier(object): user_id_for_stream, ) - result = yield self.wait_for_events( + result = await self.wait_for_events( user_id_for_stream, timeout, check_for_updates, diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 3286804322..63ddaaba87 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import inspect import logging from functools import wraps @@ -64,12 +65,22 @@ def measure_func(name=None): def wrapper(func): block_name = func.__name__ if name is None else name - @wraps(func) - @defer.inlineCallbacks - def measured_func(self, *args, **kwargs): - with Measure(self.clock, block_name): - r = yield func(self, *args, **kwargs) - return r + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def measured_func(self, *args, **kwargs): + with Measure(self.clock, block_name): + r = await func(self, *args, **kwargs) + return r + + else: + + @wraps(func) + @defer.inlineCallbacks + def measured_func(self, *args, **kwargs): + with Measure(self.clock, block_name): + r = yield func(self, *args, **kwargs) + return r return measured_func diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 31f54bbd7d..758ee071a5 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -12,54 +12,53 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer from synapse.api.errors import Codes, ResourceLimitError from synapse.api.filtering import DEFAULT_FILTER_COLLECTION -from synapse.handlers.sync import SyncConfig, SyncHandler +from synapse.handlers.sync import SyncConfig from synapse.types import UserID import tests.unittest import tests.utils -from tests.utils import setup_test_homeserver -class SyncTestCase(tests.unittest.TestCase): +class SyncTestCase(tests.unittest.HomeserverTestCase): """ Tests Sync Handler. """ - @defer.inlineCallbacks - def setUp(self): - self.hs = yield setup_test_homeserver(self.addCleanup) - self.sync_handler = SyncHandler(self.hs) + def prepare(self, reactor, clock, hs): + self.hs = hs + self.sync_handler = self.hs.get_sync_handler() self.store = self.hs.get_datastore() - @defer.inlineCallbacks def test_wait_for_sync_for_user_auth_blocking(self): user_id1 = "@user1:server" user_id2 = "@user2:server" sync_config = self._generate_sync_config(user_id1) + self.reactor.advance(100) # So we get not 0 time self.hs.config.limit_usage_by_mau = True self.hs.config.max_mau_value = 1 # Check that the happy case does not throw errors - yield self.store.upsert_monthly_active_user(user_id1) - yield self.sync_handler.wait_for_sync_for_user(sync_config) + self.get_success(self.store.upsert_monthly_active_user(user_id1)) + self.get_success(self.sync_handler.wait_for_sync_for_user(sync_config)) # Test that global lock works self.hs.config.hs_disabled = True - with self.assertRaises(ResourceLimitError) as e: - yield self.sync_handler.wait_for_sync_for_user(sync_config) - self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) + e = self.get_failure( + self.sync_handler.wait_for_sync_for_user(sync_config), ResourceLimitError + ) + self.assertEquals(e.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) self.hs.config.hs_disabled = False sync_config = self._generate_sync_config(user_id2) - with self.assertRaises(ResourceLimitError) as e: - yield self.sync_handler.wait_for_sync_for_user(sync_config) - self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) + e = self.get_failure( + self.sync_handler.wait_for_sync_for_user(sync_config), ResourceLimitError + ) + self.assertEquals(e.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) def _generate_sync_config(self, user_id): return SyncConfig( diff --git a/tests/unittest.py b/tests/unittest.py index 295573bc46..a1bdd963e6 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -18,6 +18,7 @@ import gc import hashlib import hmac +import inspect import logging import time @@ -25,7 +26,7 @@ from mock import Mock from canonicaljson import json -from twisted.internet.defer import Deferred, succeed +from twisted.internet.defer import Deferred, ensureDeferred, succeed from twisted.python.threadpool import ThreadPool from twisted.trial import unittest @@ -415,6 +416,8 @@ class HomeserverTestCase(TestCase): self.reactor.pump([by] * 100) def get_success(self, d, by=0.0): + if inspect.isawaitable(d): + d = ensureDeferred(d) if not isinstance(d, Deferred): return d self.pump(by=by) @@ -424,6 +427,8 @@ class HomeserverTestCase(TestCase): """ Run a Deferred and get a Failure from it. The failure must be of the type `exc`. """ + if inspect.isawaitable(d): + d = ensureDeferred(d) if not isinstance(d, Deferred): return d self.pump() -- cgit 1.5.1 From 649b6bc0888bb1f8c408d72dd92b0c025535a866 Mon Sep 17 00:00:00 2001 From: Manuel Stahl <37705355+awesome-manuel@users.noreply.github.com> Date: Thu, 5 Dec 2019 19:12:23 +0100 Subject: Replace /admin/v1/users_paginate endpoint with /admin/v2/users (#5925) --- changelog.d/5925.feature | 1 + changelog.d/5925.removal | 1 + docs/admin_api/user_admin_api.rst | 45 +++++++++++++++ synapse/handlers/admin.py | 21 ++++--- synapse/rest/admin/__init__.py | 4 +- synapse/rest/admin/users.py | 83 ++++++++++------------------ synapse/storage/_base.py | 50 +++++++++-------- synapse/storage/data_stores/main/__init__.py | 63 ++++++++++++++------- synapse/storage/data_stores/main/stats.py | 2 +- 9 files changed, 161 insertions(+), 109 deletions(-) create mode 100644 changelog.d/5925.feature create mode 100644 changelog.d/5925.removal (limited to 'synapse/handlers') diff --git a/changelog.d/5925.feature b/changelog.d/5925.feature new file mode 100644 index 0000000000..8025cc8231 --- /dev/null +++ b/changelog.d/5925.feature @@ -0,0 +1 @@ +Add admin/v2/users endpoint with pagination. Contributed by Awesome Technologies Innovationslabor GmbH. diff --git a/changelog.d/5925.removal b/changelog.d/5925.removal new file mode 100644 index 0000000000..cbba2855cb --- /dev/null +++ b/changelog.d/5925.removal @@ -0,0 +1 @@ +Remove admin/v1/users_paginate endpoint. Contributed by Awesome Technologies Innovationslabor GmbH. diff --git a/docs/admin_api/user_admin_api.rst b/docs/admin_api/user_admin_api.rst index d0871f9438..b451dc5014 100644 --- a/docs/admin_api/user_admin_api.rst +++ b/docs/admin_api/user_admin_api.rst @@ -1,3 +1,48 @@ +List Accounts +============= + +This API returns all local user accounts. + +The api is:: + + GET /_synapse/admin/v2/users?from=0&limit=10&guests=false + +including an ``access_token`` of a server admin. +The parameters ``from`` and ``limit`` are required only for pagination. +By default, a ``limit`` of 100 is used. +The parameter ``user_id`` can be used to select only users with user ids that +contain this value. +The parameter ``guests=false`` can be used to exclude guest users, +default is to include guest users. +The parameter ``deactivated=true`` can be used to include deactivated users, +default is to exclude deactivated users. +If the endpoint does not return a ``next_token`` then there are no more users left. +It returns a JSON body like the following: + +.. code:: json + + { + "users": [ + { + "name": "", + "password_hash": "", + "is_guest": 0, + "admin": 0, + "user_type": null, + "deactivated": 0 + }, { + "name": "", + "password_hash": "", + "is_guest": 0, + "admin": 1, + "user_type": null, + "deactivated": 0 + } + ], + "next_token": "100" + } + + Query Account ============= diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 6407d56f8e..14449b9a1e 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -56,7 +56,7 @@ class AdminHandler(BaseHandler): @defer.inlineCallbacks def get_users(self): - """Function to reterive a list of users in users table. + """Function to retrieve a list of users in users table. Args: Returns: @@ -67,19 +67,22 @@ class AdminHandler(BaseHandler): return ret @defer.inlineCallbacks - def get_users_paginate(self, order, start, limit): - """Function to reterive a paginated list of users from - users list. This will return a json object, which contains - list of users and the total number of users in users table. + def get_users_paginate(self, start, limit, name, guests, deactivated): + """Function to retrieve a paginated list of users from + users list. This will return a json list of users. Args: - order (str): column name to order the select by this column start (int): start number to begin the query from - limit (int): number of rows to reterive + limit (int): number of rows to retrieve + name (string): filter for user names + guests (bool): whether to in include guest users + deactivated (bool): whether to include deactivated users Returns: - defer.Deferred: resolves to json object {list[dict[str, Any]], count} + defer.Deferred: resolves to json list[dict[str, Any]] """ - ret = yield self.store.get_users_paginate(order, start, limit) + ret = yield self.store.get_users_paginate( + start, limit, name, guests, deactivated + ) return ret diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 68a59a3424..c122c449f4 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -34,12 +34,12 @@ from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet from synapse.rest.admin.users import ( AccountValidityRenewServlet, DeactivateAccountRestServlet, - GetUsersPaginatedRestServlet, ResetPasswordRestServlet, SearchUsersRestServlet, UserAdminServlet, UserRegisterServlet, UsersRestServlet, + UsersRestServletV2, WhoisRestServlet, ) from synapse.util.versionstring import get_version_string @@ -191,6 +191,7 @@ def register_servlets(hs, http_server): SendServerNoticeServlet(hs).register(http_server) VersionServlet(hs).register(http_server) UserAdminServlet(hs).register(http_server) + UsersRestServletV2(hs).register(http_server) def register_servlets_for_client_rest_resource(hs, http_server): @@ -201,7 +202,6 @@ def register_servlets_for_client_rest_resource(hs, http_server): PurgeHistoryRestServlet(hs).register(http_server) UsersRestServlet(hs).register(http_server) ResetPasswordRestServlet(hs).register(http_server) - GetUsersPaginatedRestServlet(hs).register(http_server) SearchUsersRestServlet(hs).register(http_server) ShutdownRoomRestServlet(hs).register(http_server) UserRegisterServlet(hs).register(http_server) diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 58a83f93af..1937879dbe 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -25,6 +25,7 @@ from synapse.api.errors import Codes, SynapseError from synapse.http.servlet import ( RestServlet, assert_params_in_dict, + parse_boolean, parse_integer, parse_json_object_from_request, parse_string, @@ -59,71 +60,45 @@ class UsersRestServlet(RestServlet): return 200, ret -class GetUsersPaginatedRestServlet(RestServlet): - """Get request to get specific number of users from Synapse. +class UsersRestServletV2(RestServlet): + PATTERNS = (re.compile("^/_synapse/admin/v2/users$"),) + + """Get request to list all local users. This needs user to have administrator access in Synapse. - Example: - http://localhost:8008/_synapse/admin/v1/users_paginate/ - @admin:user?access_token=admin_access_token&start=0&limit=10 - Returns: - 200 OK with json object {list[dict[str, Any]], count} or empty object. - """ - PATTERNS = historical_admin_path_patterns( - "/users_paginate/(?P[^/]*)" - ) + GET /_synapse/admin/v2/users?from=0&limit=10&guests=false + + returns: + 200 OK with list of users if success otherwise an error. + + The parameters `from` and `limit` are required only for pagination. + By default, a `limit` of 100 is used. + The parameter `user_id` can be used to filter by user id. + The parameter `guests` can be used to exclude guest users. + The parameter `deactivated` can be used to include deactivated users. + """ def __init__(self, hs): - self.store = hs.get_datastore() self.hs = hs self.auth = hs.get_auth() - self.handlers = hs.get_handlers() + self.admin_handler = hs.get_handlers().admin_handler - async def on_GET(self, request, target_user_id): - """Get request to get specific number of users from Synapse. - This needs user to have administrator access in Synapse. - """ + async def on_GET(self, request): await assert_requester_is_admin(self.auth, request) - target_user = UserID.from_string(target_user_id) - - if not self.hs.is_mine(target_user): - raise SynapseError(400, "Can only users a local user") - - order = "name" # order by name in user table - start = parse_integer(request, "start", required=True) - limit = parse_integer(request, "limit", required=True) - - logger.info("limit: %s, start: %s", limit, start) - - ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit) - return 200, ret + start = parse_integer(request, "from", default=0) + limit = parse_integer(request, "limit", default=100) + user_id = parse_string(request, "user_id", default=None) + guests = parse_boolean(request, "guests", default=True) + deactivated = parse_boolean(request, "deactivated", default=False) - async def on_POST(self, request, target_user_id): - """Post request to get specific number of users from Synapse.. - This needs user to have administrator access in Synapse. - Example: - http://localhost:8008/_synapse/admin/v1/users_paginate/ - @admin:user?access_token=admin_access_token - JsonBodyToSend: - { - "start": "0", - "limit": "10 - } - Returns: - 200 OK with json object {list[dict[str, Any]], count} or empty object. - """ - await assert_requester_is_admin(self.auth, request) - UserID.from_string(target_user_id) - - order = "name" # order by name in user table - params = parse_json_object_from_request(request) - assert_params_in_dict(params, ["limit", "start"]) - limit = params["limit"] - start = params["start"] - logger.info("limit: %s, start: %s", limit, start) + users = await self.admin_handler.get_users_paginate( + start, limit, user_id, guests, deactivated + ) + ret = {"users": users} + if len(users) >= limit: + ret["next_token"] = str(start + len(users)) - ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit) return 200, ret diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 9205e550bb..0d7c7dff27 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1350,11 +1350,12 @@ class SQLBaseStore(object): def simple_select_list_paginate( self, table, - keyvalues, orderby, start, limit, retcols, + filters=None, + keyvalues=None, order_direction="ASC", desc="simple_select_list_paginate", ): @@ -1365,6 +1366,9 @@ class SQLBaseStore(object): Args: table (str): the table name + filters (dict[str, T] | None): + column names and values to filter the rows with, or None to not + apply a WHERE ? LIKE ? clause. keyvalues (dict[str, T] | None): column names and values to select the rows with, or None to not apply a WHERE clause. @@ -1380,11 +1384,12 @@ class SQLBaseStore(object): desc, self.simple_select_list_paginate_txn, table, - keyvalues, orderby, start, limit, retcols, + filters=filters, + keyvalues=keyvalues, order_direction=order_direction, ) @@ -1393,11 +1398,12 @@ class SQLBaseStore(object): cls, txn, table, - keyvalues, orderby, start, limit, retcols, + filters=None, + keyvalues=None, order_direction="ASC", ): """ @@ -1405,16 +1411,23 @@ class SQLBaseStore(object): of row numbers, which may return zero or number of rows from start to limit, returning the result as a list of dicts. + Use `filters` to search attributes using SQL wildcards and/or `keyvalues` to + select attributes with exact matches. All constraints are joined together + using 'AND'. + Args: txn : Transaction object table (str): the table name - keyvalues (dict[str, T] | None): - column names and values to select the rows with, or None to not - apply a WHERE clause. orderby (str): Column to order the results by. start (int): Index to begin the query at. limit (int): Number of results to return. retcols (iterable[str]): the names of the columns to return + filters (dict[str, T] | None): + column names and values to filter the rows with, or None to not + apply a WHERE ? LIKE ? clause. + keyvalues (dict[str, T] | None): + column names and values to select the rows with, or None to not + apply a WHERE clause. order_direction (str): Whether the results should be ordered "ASC" or "DESC". Returns: defer.Deferred: resolves to list[dict[str, Any]] @@ -1422,10 +1435,15 @@ class SQLBaseStore(object): if order_direction not in ["ASC", "DESC"]: raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") + where_clause = "WHERE " if filters or keyvalues else "" + arg_list = [] + if filters: + where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters) + arg_list += list(filters.values()) + where_clause += " AND " if filters and keyvalues else "" if keyvalues: - where_clause = "WHERE " + " AND ".join("%s = ?" % (k,) for k in keyvalues) - else: - where_clause = "" + where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues) + arg_list += list(keyvalues.values()) sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % ( ", ".join(retcols), @@ -1434,22 +1452,10 @@ class SQLBaseStore(object): orderby, order_direction, ) - txn.execute(sql, list(keyvalues.values()) + [limit, start]) + txn.execute(sql, arg_list + [limit, start]) return cls.cursor_to_dict(txn) - def get_user_count_txn(self, txn): - """Get a total number of registered users in the users list. - - Args: - txn : Transaction object - Returns: - int : number of users - """ - sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;" - txn.execute(sql_count) - return txn.fetchone()[0] - def simple_search_list(self, table, term, col, retcols, desc="simple_search_list"): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index 2a5b33dda1..3720ff3088 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -19,8 +19,6 @@ import calendar import logging import time -from twisted.internet import defer - from synapse.api.constants import PresenceState from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import ( @@ -476,7 +474,7 @@ class DataStore( ) def get_users(self): - """Function to reterive a list of users in users table. + """Function to retrieve a list of users in users table. Args: Returns: @@ -485,36 +483,59 @@ class DataStore( return self.simple_select_list( table="users", keyvalues={}, - retcols=["name", "password_hash", "is_guest", "admin", "user_type"], + retcols=[ + "name", + "password_hash", + "is_guest", + "admin", + "user_type", + "deactivated", + ], desc="get_users", ) - @defer.inlineCallbacks - def get_users_paginate(self, order, start, limit): - """Function to reterive a paginated list of users from - users list. This will return a json object, which contains - list of users and the total number of users in users table. + def get_users_paginate( + self, start, limit, name=None, guests=True, deactivated=False + ): + """Function to retrieve a paginated list of users from + users list. This will return a json list of users. Args: - order (str): column name to order the select by this column start (int): start number to begin the query from - limit (int): number of rows to reterive + limit (int): number of rows to retrieve + name (string): filter for user names + guests (bool): whether to in include guest users + deactivated (bool): whether to include deactivated users Returns: - defer.Deferred: resolves to json object {list[dict[str, Any]], count} + defer.Deferred: resolves to list[dict[str, Any]] """ - users = yield self.runInteraction( - "get_users_paginate", - self.simple_select_list_paginate_txn, + name_filter = {} + if name: + name_filter["name"] = "%" + name + "%" + + attr_filter = {} + if not guests: + attr_filter["is_guest"] = False + if not deactivated: + attr_filter["deactivated"] = False + + return self.simple_select_list_paginate( + desc="get_users_paginate", table="users", - keyvalues={"is_guest": False}, - orderby=order, + orderby="name", start=start, limit=limit, - retcols=["name", "password_hash", "is_guest", "admin", "user_type"], + filters=name_filter, + keyvalues=attr_filter, + retcols=[ + "name", + "password_hash", + "is_guest", + "admin", + "user_type", + "deactivated", + ], ) - count = yield self.runInteraction("get_users_paginate", self.get_user_count_txn) - retval = {"users": users, "total": count} - return retval def search_users(self, term): """Function to search users list for one or more users with diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py index 3aeba859fd..b306478824 100644 --- a/synapse/storage/data_stores/main/stats.py +++ b/synapse/storage/data_stores/main/stats.py @@ -260,11 +260,11 @@ class StatsStore(StateDeltasStore): slice_list = self.simple_select_list_paginate_txn( txn, table + "_historical", - {id_col: stats_id}, "end_ts", start, size, retcols=selected_columns + ["bucket_size", "end_ts"], + keyvalues={id_col: stats_id}, order_direction="DESC", ) -- cgit 1.5.1 From b3a4e35ca84a29fe4ccdfb1125ed098c68405d6c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Dec 2019 10:14:59 +0000 Subject: Fixup functions to consistently return deferreds --- synapse/handlers/sync.py | 6 +++--- synapse/handlers/typing.py | 2 +- synapse/storage/data_stores/main/account_data.py | 2 +- synapse/storage/data_stores/main/group_server.py | 4 ++-- tests/handlers/test_typing.py | 24 ++++++++++++++++++------ tests/rest/client/v1/test_typing.py | 4 +++- 6 files changed, 28 insertions(+), 14 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 12751fd8c0..2d3b8ba73c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -339,7 +339,7 @@ class SyncHandler(object): room_ids = sync_result_builder.joined_room_ids typing_source = self.event_sources.sources["typing"] - typing, typing_key = typing_source.get_new_events( + typing, typing_key = await typing_source.get_new_events( user=sync_config.user, from_key=typing_key, limit=sync_config.filter_collection.ephemeral_limit(), @@ -1013,7 +1013,7 @@ class SyncHandler(object): now_token = sync_result_builder.now_token if since_token and since_token.groups_key: - results = self.store.get_groups_changes_for_user( + results = await self.store.get_groups_changes_for_user( user_id, since_token.groups_key, now_token.groups_key ) else: @@ -1197,7 +1197,7 @@ class SyncHandler(object): ( account_data, account_data_by_room, - ) = self.store.get_updated_account_data_for_user( + ) = await self.store.get_updated_account_data_for_user( user_id, since_token.account_data_key ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 856337b7e2..6f78454322 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -313,7 +313,7 @@ class TypingNotificationEventSource(object): events.append(self._make_event_for(room_id)) - return events, handler._latest_room_serial + return defer.succeed((events, handler._latest_room_serial)) def get_current_key(self): return self.get_typing_handler()._latest_room_serial diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py index b0d22faf3f..ed97b3ffe5 100644 --- a/synapse/storage/data_stores/main/account_data.py +++ b/synapse/storage/data_stores/main/account_data.py @@ -250,7 +250,7 @@ class AccountDataWorkerStore(SQLBaseStore): user_id, int(stream_id) ) if not changed: - return {}, {} + return defer.succeed(({}, {})) return self.runInteraction( "get_updated_account_data_for_user", get_updated_account_data_for_user_txn diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py index 9e1d12bcb7..d29155a3b5 100644 --- a/synapse/storage/data_stores/main/group_server.py +++ b/synapse/storage/data_stores/main/group_server.py @@ -1109,7 +1109,7 @@ class GroupServerStore(SQLBaseStore): user_id, from_token ) if not has_changed: - return [] + return defer.succeed([]) def _get_groups_changes_for_user_txn(txn): sql = """ @@ -1139,7 +1139,7 @@ class GroupServerStore(SQLBaseStore): from_token ) if not has_changed: - return [] + return defer.succeed([]) def _get_all_groups_changes_txn(txn): sql = """ diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index f6d8660285..92b8726093 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -163,7 +163,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.on_new_event.assert_has_calls([call("typing_key", 1, rooms=[ROOM_ID])]) self.assertEquals(self.event_source.get_current_key(), 1) - events = self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + events = self.get_success( + self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + ) self.assertEquals( events[0], [ @@ -227,7 +229,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.on_new_event.assert_has_calls([call("typing_key", 1, rooms=[ROOM_ID])]) self.assertEquals(self.event_source.get_current_key(), 1) - events = self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + events = self.get_success( + self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + ) self.assertEquals( events[0], [ @@ -279,7 +283,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ) self.assertEquals(self.event_source.get_current_key(), 1) - events = self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + events = self.get_success( + self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + ) self.assertEquals( events[0], [{"type": "m.typing", "room_id": ROOM_ID, "content": {"user_ids": []}}], @@ -300,7 +306,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.on_new_event.reset_mock() self.assertEquals(self.event_source.get_current_key(), 1) - events = self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + events = self.get_success( + self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + ) self.assertEquals( events[0], [ @@ -317,7 +325,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.on_new_event.assert_has_calls([call("typing_key", 2, rooms=[ROOM_ID])]) self.assertEquals(self.event_source.get_current_key(), 2) - events = self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=1) + events = self.get_success( + self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=1) + ) self.assertEquals( events[0], [{"type": "m.typing", "room_id": ROOM_ID, "content": {"user_ids": []}}], @@ -335,7 +345,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.on_new_event.reset_mock() self.assertEquals(self.event_source.get_current_key(), 3) - events = self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + events = self.get_success( + self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + ) self.assertEquals( events[0], [ diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index 30fb77bac8..4bc3aaf02d 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -109,7 +109,9 @@ class RoomTypingTestCase(unittest.HomeserverTestCase): self.assertEquals(200, channel.code) self.assertEquals(self.event_source.get_current_key(), 1) - events = self.event_source.get_new_events(from_key=0, room_ids=[self.room_id]) + events = self.get_success( + self.event_source.get_new_events(from_key=0, room_ids=[self.room_id]) + ) self.assertEquals( events[0], [ -- cgit 1.5.1 From 8ad8bcbed0bbd8a00ddfbe693b99785b72bb8ee2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Dec 2019 11:50:34 +0000 Subject: Pull out room_invite_state_types config option once. Pulling things out of config is currently surprisingly expensive. --- synapse/handlers/message.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4f53a5f5dc..54fa216d83 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -363,6 +363,8 @@ class EventCreationHandler(object): self.config = hs.config self.require_membership_for_aliases = hs.config.require_membership_for_aliases + self.room_invite_state_types = self.hs.config.room_invite_state_types + self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs) # This is only used to get at ratelimit function, and maybe_kick_guest_users @@ -916,7 +918,7 @@ class EventCreationHandler(object): state_to_include_ids = [ e_id for k, e_id in iteritems(current_state_ids) - if k[0] in self.hs.config.room_invite_state_types + if k[0] in self.room_invite_state_types or k == (EventTypes.Member, event.sender) ] -- cgit 1.5.1 From adfdd82b21ae296ed77453b2f51d55414890f162 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Mon, 9 Dec 2019 13:59:27 +0000 Subject: Back out perf regression from get_cross_signing_keys_from_cache. (#6494) Back out cross-signing code added in Synapse 1.5.0, which caused a performance regression. --- changelog.d/6494.bugfix | 1 + synapse/handlers/e2e_keys.py | 38 ++++++++------------------------------ sytest-blacklist | 3 +++ tests/handlers/test_e2e_keys.py | 8 ++++++++ 4 files changed, 20 insertions(+), 30 deletions(-) create mode 100644 changelog.d/6494.bugfix (limited to 'synapse/handlers') diff --git a/changelog.d/6494.bugfix b/changelog.d/6494.bugfix new file mode 100644 index 0000000000..78726d5d7f --- /dev/null +++ b/changelog.d/6494.bugfix @@ -0,0 +1 @@ +Back out cross-signing code added in Synapse 1.5.0, which caused a performance regression. diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 28c12753c1..57a10daefd 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -264,7 +264,6 @@ class E2eKeysHandler(object): return ret - @defer.inlineCallbacks def get_cross_signing_keys_from_cache(self, query, from_user_id): """Get cross-signing keys for users from the database @@ -284,35 +283,14 @@ class E2eKeysHandler(object): self_signing_keys = {} user_signing_keys = {} - for user_id in query: - # XXX: consider changing the store functions to allow querying - # multiple users simultaneously. - key = yield self.store.get_e2e_cross_signing_key( - user_id, "master", from_user_id - ) - if key: - master_keys[user_id] = key - - key = yield self.store.get_e2e_cross_signing_key( - user_id, "self_signing", from_user_id - ) - if key: - self_signing_keys[user_id] = key - - # users can see other users' master and self-signing keys, but can - # only see their own user-signing keys - if from_user_id == user_id: - key = yield self.store.get_e2e_cross_signing_key( - user_id, "user_signing", from_user_id - ) - if key: - user_signing_keys[user_id] = key - - return { - "master_keys": master_keys, - "self_signing_keys": self_signing_keys, - "user_signing_keys": user_signing_keys, - } + # Currently a stub, implementation coming in https://github.com/matrix-org/synapse/pull/6486 + return defer.succeed( + { + "master_keys": master_keys, + "self_signing_keys": self_signing_keys, + "user_signing_keys": user_signing_keys, + } + ) @trace @defer.inlineCallbacks diff --git a/sytest-blacklist b/sytest-blacklist index 411cce0692..79b2d4402a 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -33,3 +33,6 @@ New federated private chats get full presence information (SYN-115) # Blacklisted due to https://github.com/matrix-org/matrix-doc/pull/2314 removing # this requirement from the spec Inbound federation of state requires event_id as a mandatory paramater + +# Blacklisted until https://github.com/matrix-org/synapse/pull/6486 lands +Can upload self-signing keys diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index 854eb6c024..fdfa2cbbc4 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -183,6 +183,10 @@ class E2eKeysHandlerTestCase(unittest.TestCase): ) self.assertDictEqual(devices["master_keys"], {local_user: keys2["master_key"]}) + test_replace_master_key.skip = ( + "Disabled waiting on #https://github.com/matrix-org/synapse/pull/6486" + ) + @defer.inlineCallbacks def test_reupload_signatures(self): """re-uploading a signature should not fail""" @@ -503,3 +507,7 @@ class E2eKeysHandlerTestCase(unittest.TestCase): ], other_master_key["signatures"][local_user]["ed25519:" + usersigning_pubkey], ) + + test_upload_signatures.skip = ( + "Disabled waiting on #https://github.com/matrix-org/synapse/pull/6486" + ) -- cgit 1.5.1