diff options
Diffstat (limited to 'synapse')
41 files changed, 1845 insertions, 1714 deletions
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index ca96da6a4a..7fa91a3b11 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -57,6 +57,7 @@ from synapse.rest.client.v1.room import ( RoomStateRestServlet, ) from synapse.rest.client.v1.voip import VoipRestServlet +from synapse.rest.client.v2_alpha import groups from synapse.rest.client.v2_alpha.account import ThreepidRestServlet from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet from synapse.rest.client.v2_alpha.register import RegisterRestServlet @@ -124,6 +125,8 @@ class ClientReaderServer(HomeServer): PushRuleRestServlet(self).register(resource) VersionsRestServlet(self).register(resource) + groups.register_servlets(self, resource) + resources.update({"/_matrix/client": resource}) root_resource = create_resource_tree(resources, NoResource()) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 1f1cea1416..d055d11b23 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -33,8 +33,10 @@ from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.profile import SlavedProfileStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore @@ -66,6 +68,8 @@ class FederationReaderSlavedStore( SlavedEventStore, SlavedKeyStore, SlavedRegistrationStore, + SlavedGroupServerStore, + SlavedDeviceStore, RoomStore, DirectoryStore, SlavedTransactionStore, diff --git a/synapse/config/tls.py b/synapse/config/tls.py index 2e9e478a2a..2514b0713d 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -109,6 +109,8 @@ class TlsConfig(Config): fed_whitelist_entries = config.get( "federation_certificate_verification_whitelist", [] ) + if fed_whitelist_entries is None: + fed_whitelist_entries = [] # Support globs (*) in whitelist values self.federation_certificate_verification_whitelist = [] # type: List[str] diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index f813fa2fe7..a842661a90 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -16,13 +16,13 @@ import os from distutils.util import strtobool +from typing import Optional, Type import six from unpaddedbase64 import encode_base64 -from synapse.api.errors import UnsupportedRoomVersionError -from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions +from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions from synapse.types import JsonDict from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze @@ -189,9 +189,15 @@ class EventBase(object): redacts = _event_dict_property("redacts", None) room_id = _event_dict_property("room_id") sender = _event_dict_property("sender") + state_key = _event_dict_property("state_key") + type = _event_dict_property("type") user_id = _event_dict_property("sender") @property + def event_id(self) -> str: + raise NotImplementedError() + + @property def membership(self): return self.content["membership"] @@ -281,10 +287,7 @@ class FrozenEvent(EventBase): else: frozen_dict = event_dict - self.event_id = event_dict["event_id"] - self.type = event_dict["type"] - if "state_key" in event_dict: - self.state_key = event_dict["state_key"] + self._event_id = event_dict["event_id"] super(FrozenEvent, self).__init__( frozen_dict, @@ -294,6 +297,10 @@ class FrozenEvent(EventBase): rejected_reason=rejected_reason, ) + @property + def event_id(self) -> str: + return self._event_id + def __str__(self): return self.__repr__() @@ -332,9 +339,6 @@ class FrozenEventV2(EventBase): frozen_dict = event_dict self._event_id = None - self.type = event_dict["type"] - if "state_key" in event_dict: - self.state_key = event_dict["state_key"] super(FrozenEventV2, self).__init__( frozen_dict, @@ -404,28 +408,7 @@ class FrozenEventV3(FrozenEventV2): return self._event_id -def room_version_to_event_format(room_version): - """Converts a room version string to the event format - - Args: - room_version (str) - - Returns: - int - - Raises: - UnsupportedRoomVersionError if the room version is unknown - """ - v = KNOWN_ROOM_VERSIONS.get(room_version) - - if not v: - # this can happen if support is withdrawn for a room version - raise UnsupportedRoomVersionError() - - return v.event_format - - -def event_type_from_format_version(format_version): +def event_type_from_format_version(format_version: int) -> Type[EventBase]: """Returns the python type to use to construct an Event object for the given event format version. @@ -445,3 +428,14 @@ def event_type_from_format_version(format_version): return FrozenEventV3 else: raise Exception("No event format %r" % (format_version,)) + + +def make_event_from_dict( + event_dict: JsonDict, + room_version: RoomVersion = RoomVersions.V1, + internal_metadata_dict: JsonDict = {}, + rejected_reason: Optional[str] = None, +) -> EventBase: + """Construct an EventBase from the given event dict""" + event_type = event_type_from_format_version(room_version.event_format) + return event_type(event_dict, internal_metadata_dict, rejected_reason) diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 8d63ad6dc3..a0c4a40c27 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -28,11 +28,7 @@ from synapse.api.room_versions import ( RoomVersion, ) from synapse.crypto.event_signing import add_hashes_and_signatures -from synapse.events import ( - EventBase, - _EventInternalMetadata, - event_type_from_format_version, -) +from synapse.events import EventBase, _EventInternalMetadata, make_event_from_dict from synapse.types import EventID, JsonDict from synapse.util import Clock from synapse.util.stringutils import random_string @@ -256,8 +252,8 @@ def create_local_event_from_event_dict( event_dict.setdefault("signatures", {}) add_hashes_and_signatures(room_version, event_dict, hostname, signing_key) - return event_type_from_format_version(format_version)( - event_dict, internal_metadata_dict=internal_metadata_dict + return make_event_from_dict( + event_dict, room_version, internal_metadata_dict=internal_metadata_dict ) diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py index 86f7e5f8aa..459132d388 100644 --- a/synapse/events/third_party_rules.py +++ b/synapse/events/third_party_rules.py @@ -74,15 +74,16 @@ class ThirdPartyEventRules(object): is_requester_admin (bool): If the requester is an admin Returns: - defer.Deferred + defer.Deferred[bool]: Whether room creation is allowed or denied. """ if self.third_party_rules is None: - return + return True - yield self.third_party_rules.on_create_room( + ret = yield self.third_party_rules.on_create_room( requester, config, is_requester_admin ) + return ret @defer.inlineCallbacks def check_threepid_can_be_invited(self, medium, address, room_id): diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 0e22183280..eea64c1c9f 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,9 +23,13 @@ from twisted.internet.defer import DeferredList from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import Codes, SynapseError -from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions +from synapse.api.room_versions import ( + KNOWN_ROOM_VERSIONS, + EventFormatVersions, + RoomVersion, +) from synapse.crypto.event_signing import check_event_content_hash -from synapse.events import event_type_from_format_version +from synapse.events import EventBase, make_event_from_dict from synapse.events.utils import prune_event from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import ( @@ -33,7 +38,7 @@ from synapse.logging.context import ( make_deferred_yieldable, preserve_fn, ) -from synapse.types import get_domain_from_id +from synapse.types import JsonDict, get_domain_from_id from synapse.util import unwrapFirstError logger = logging.getLogger(__name__) @@ -342,16 +347,15 @@ def _is_invite_via_3pid(event): ) -def event_from_pdu_json(pdu_json, event_format_version, outlier=False): - """Construct a FrozenEvent from an event json received over federation +def event_from_pdu_json( + pdu_json: JsonDict, room_version: RoomVersion, outlier: bool = False +) -> EventBase: + """Construct an EventBase from an event json received over federation Args: - pdu_json (object): pdu as received over federation - event_format_version (int): The event format version - outlier (bool): True to mark this event as an outlier - - Returns: - FrozenEvent + pdu_json: pdu as received over federation + room_version: The version of the room this event belongs to + outlier: True to mark this event as an outlier Raises: SynapseError: if the pdu is missing required fields or is otherwise @@ -370,8 +374,7 @@ def event_from_pdu_json(pdu_json, event_format_version, outlier=False): elif depth > MAX_DEPTH: raise SynapseError(400, "Depth too large", Codes.BAD_JSON) - event = event_type_from_format_version(event_format_version)(pdu_json) - + event = make_event_from_dict(pdu_json, room_version) event.internal_metadata.outlier = outlier return event diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f99d17a7de..4870e39652 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -17,7 +17,18 @@ import copy import itertools import logging -from typing import Dict, Iterable +from typing import ( + Any, + Awaitable, + Callable, + Dict, + Iterable, + List, + Optional, + Sequence, + Tuple, + TypeVar, +) from prometheus_client import Counter @@ -35,12 +46,14 @@ from synapse.api.errors import ( from synapse.api.room_versions import ( KNOWN_ROOM_VERSIONS, EventFormatVersions, + RoomVersion, RoomVersions, ) -from synapse.events import builder, room_version_to_event_format +from synapse.events import EventBase, builder from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.logging.context import make_deferred_yieldable from synapse.logging.utils import log_function +from synapse.types import JsonDict from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination @@ -52,6 +65,8 @@ sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["t PDU_RETRY_TIME_MS = 1 * 60 * 1000 +T = TypeVar("T") + class InvalidResponseError(RuntimeError): """Helper for _try_destination_list: indicates that the server returned a response @@ -170,21 +185,17 @@ class FederationClient(FederationBase): sent_queries_counter.labels("client_one_time_keys").inc() return self.transport_layer.claim_client_keys(destination, content, timeout) - @defer.inlineCallbacks - @log_function - def backfill(self, dest, room_id, limit, extremities): - """Requests some more historic PDUs for the given context from the + async def backfill( + self, dest: str, room_id: str, limit: int, extremities: Iterable[str] + ) -> List[EventBase]: + """Requests some more historic PDUs for the given room from the given destination server. Args: dest (str): The remote homeserver to ask. room_id (str): The room_id to backfill. - limit (int): The maximum number of PDUs to return. - extremities (list): List of PDU id and origins of the first pdus - we have seen from the context - - Returns: - Deferred: Results in the received PDUs. + limit (int): The maximum number of events to return. + extremities (list): our current backwards extremities, to backfill from """ logger.debug("backfill extrem=%s", extremities) @@ -192,34 +203,37 @@ class FederationClient(FederationBase): if not extremities: return - transaction_data = yield self.transport_layer.backfill( + transaction_data = await self.transport_layer.backfill( dest, room_id, extremities, limit ) logger.debug("backfill transaction_data=%r", transaction_data) - room_version = yield self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) + room_version = await self.store.get_room_version(room_id) pdus = [ - event_from_pdu_json(p, format_ver, outlier=False) + event_from_pdu_json(p, room_version, outlier=False) for p in transaction_data["pdus"] ] # FIXME: We should handle signature failures more gracefully. - pdus[:] = yield make_deferred_yieldable( + pdus[:] = await make_deferred_yieldable( defer.gatherResults( - self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True + self._check_sigs_and_hashes(room_version.identifier, pdus), + consumeErrors=True, ).addErrback(unwrapFirstError) ) return pdus - @defer.inlineCallbacks - @log_function - def get_pdu( - self, destinations, event_id, room_version, outlier=False, timeout=None - ): + async def get_pdu( + self, + destinations: Iterable[str], + event_id: str, + room_version: RoomVersion, + outlier: bool = False, + timeout: Optional[int] = None, + ) -> Optional[EventBase]: """Requests the PDU with given origin and ID from the remote home servers. @@ -227,18 +241,17 @@ class FederationClient(FederationBase): one succeeds. Args: - destinations (list): Which homeservers to query - event_id (str): event to fetch - room_version (str): version of the room - outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if + destinations: Which homeservers to query + event_id: event to fetch + room_version: version of the room + outlier: Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` - timeout (int): How long to try (in ms) each destination for before + timeout: How long to try (in ms) each destination for before moving to the next destination. None indicates no timeout. Returns: - Deferred: Results in the requested PDU, or None if we were unable to find - it. + The requested PDU, or None if we were unable to find it. """ # TODO: Rate limit the number of times we try and get the same event. @@ -249,8 +262,6 @@ class FederationClient(FederationBase): pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) - format_ver = room_version_to_event_format(room_version) - signed_pdu = None for destination in destinations: now = self._clock.time_msec() @@ -259,7 +270,7 @@ class FederationClient(FederationBase): continue try: - transaction_data = yield self.transport_layer.get_event( + transaction_data = await self.transport_layer.get_event( destination, event_id, timeout=timeout ) @@ -271,7 +282,7 @@ class FederationClient(FederationBase): ) pdu_list = [ - event_from_pdu_json(p, format_ver, outlier=outlier) + event_from_pdu_json(p, room_version, outlier=outlier) for p in transaction_data["pdus"] ] @@ -279,7 +290,9 @@ class FederationClient(FederationBase): pdu = pdu_list[0] # Check signatures are correct. - signed_pdu = yield self._check_sigs_and_hash(room_version, pdu) + signed_pdu = await self._check_sigs_and_hash( + room_version.identifier, pdu + ) break @@ -309,15 +322,16 @@ class FederationClient(FederationBase): return signed_pdu - @defer.inlineCallbacks - def get_room_state_ids(self, destination: str, room_id: str, event_id: str): + async def get_room_state_ids( + self, destination: str, room_id: str, event_id: str + ) -> Tuple[List[str], List[str]]: """Calls the /state_ids endpoint to fetch the state at a particular point in the room, and the auth events for the given event Returns: - Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids) + a tuple of (state event_ids, auth event_ids) """ - result = yield self.transport_layer.get_room_state_ids( + result = await self.transport_layer.get_room_state_ids( destination, room_id, event_id=event_id ) @@ -331,37 +345,39 @@ class FederationClient(FederationBase): return state_event_ids, auth_event_ids - @defer.inlineCallbacks - @log_function - def get_event_auth(self, destination, room_id, event_id): - res = yield self.transport_layer.get_event_auth(destination, room_id, event_id) + async def get_event_auth(self, destination, room_id, event_id): + res = await self.transport_layer.get_event_auth(destination, room_id, event_id) - room_version = yield self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) + room_version = await self.store.get_room_version(room_id) auth_chain = [ - event_from_pdu_json(p, format_ver, outlier=True) for p in res["auth_chain"] + event_from_pdu_json(p, room_version, outlier=True) + for p in res["auth_chain"] ] - signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True, room_version=room_version + signed_auth = await self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True, room_version=room_version.identifier ) signed_auth.sort(key=lambda e: e.depth) return signed_auth - @defer.inlineCallbacks - def _try_destination_list(self, description, destinations, callback): + async def _try_destination_list( + self, + description: str, + destinations: Iterable[str], + callback: Callable[[str], Awaitable[T]], + ) -> T: """Try an operation on a series of servers, until it succeeds Args: - description (unicode): description of the operation we're doing, for logging + description: description of the operation we're doing, for logging - destinations (Iterable[unicode]): list of server_names to try + destinations: list of server_names to try - callback (callable): Function to run for each server. Passed a single - argument: the server_name to try. May return a deferred. + callback: Function to run for each server. Passed a single + argument: the server_name to try. If the callback raises a CodeMessageException with a 300/400 code, attempts to perform the operation stop immediately and the exception is @@ -372,7 +388,7 @@ class FederationClient(FederationBase): suppressed if the exception is an InvalidResponseError. Returns: - The [Deferred] result of callback, if it succeeds + The result of callback, if it succeeds Raises: SynapseError if the chosen remote server returns a 300/400 code, or @@ -383,7 +399,7 @@ class FederationClient(FederationBase): continue try: - res = yield callback(destination) + res = await callback(destination) return res except InvalidResponseError as e: logger.warning("Failed to %s via %s: %s", description, destination, e) @@ -402,12 +418,12 @@ class FederationClient(FederationBase): ) except Exception: logger.warning( - "Failed to %s via %s", description, destination, exc_info=1 + "Failed to %s via %s", description, destination, exc_info=True ) raise SynapseError(502, "Failed to %s via any server" % (description,)) - def make_membership_event( + async def make_membership_event( self, destinations: Iterable[str], room_id: str, @@ -415,7 +431,7 @@ class FederationClient(FederationBase): membership: str, content: dict, params: Dict[str, str], - ): + ) -> Tuple[str, EventBase, RoomVersion]: """ Creates an m.room.member event, with context, without participating in the room. @@ -436,19 +452,19 @@ class FederationClient(FederationBase): content: Any additional data to put into the content field of the event. params: Query parameters to include in the request. - Return: - Deferred[Tuple[str, FrozenEvent, RoomVersion]]: resolves to a tuple of + + Returns: `(origin, event, room_version)` where origin is the remote homeserver which generated the event, and room_version is the version of the room. - Fails with a `UnsupportedRoomVersionError` if remote responds with - a room version we don't understand. + Raises: + UnsupportedRoomVersionError: if remote responds with + a room version we don't understand. - Fails with a ``SynapseError`` if the chosen remote server - returns a 300/400 code. + SynapseError: if the chosen remote server returns a 300/400 code. - Fails with a ``RuntimeError`` if no servers were reachable. + RuntimeError: if no servers were reachable. """ valid_memberships = {Membership.JOIN, Membership.LEAVE} if membership not in valid_memberships: @@ -457,9 +473,8 @@ class FederationClient(FederationBase): % (membership, ",".join(valid_memberships)) ) - @defer.inlineCallbacks - def send_request(destination): - ret = yield self.transport_layer.make_membership_event( + async def send_request(destination: str) -> Tuple[str, EventBase, RoomVersion]: + ret = await self.transport_layer.make_membership_event( destination, room_id, user_id, membership, params ) @@ -492,88 +507,83 @@ class FederationClient(FederationBase): event_dict=pdu_dict, ) - return (destination, ev, room_version) + return destination, ev, room_version - return self._try_destination_list( + return await self._try_destination_list( "make_" + membership, destinations, send_request ) - def send_join(self, destinations, pdu, event_format_version): + async def send_join( + self, destinations: Iterable[str], pdu: EventBase, room_version: RoomVersion + ) -> Dict[str, Any]: """Sends a join event to one of a list of homeservers. Doing so will cause the remote server to add the event to the graph, and send the event out to the rest of the federation. Args: - destinations (str): Candidate homeservers which are probably + destinations: Candidate homeservers which are probably participating in the room. - pdu (BaseEvent): event to be sent - event_format_version (int): The event format version + pdu: event to be sent + room_version: the version of the room (according to the server that + did the make_join) - Return: - Deferred: resolves to a dict with members ``origin`` (a string - giving the serer the event was sent to, ``state`` (?) and + Returns: + a dict with members ``origin`` (a string + giving the server the event was sent to, ``state`` (?) and ``auth_chain``. - Fails with a ``SynapseError`` if the chosen remote server - returns a 300/400 code. + Raises: + SynapseError: if the chosen remote server returns a 300/400 code. - Fails with a ``RuntimeError`` if no servers were reachable. + RuntimeError: if no servers were reachable. """ - def check_authchain_validity(signed_auth_chain): - for e in signed_auth_chain: - if e.type == EventTypes.Create: - create_event = e - break - else: - raise InvalidResponseError("no %s in auth chain" % (EventTypes.Create,)) - - # the room version should be sane. - room_version = create_event.content.get("room_version", "1") - if room_version not in KNOWN_ROOM_VERSIONS: - # This shouldn't be possible, because the remote server should have - # rejected the join attempt during make_join. - raise InvalidResponseError( - "room appears to have unsupported version %s" % (room_version,) - ) - - @defer.inlineCallbacks - def send_request(destination): - content = yield self._do_send_join(destination, pdu) + async def send_request(destination) -> Dict[str, Any]: + content = await self._do_send_join(destination, pdu) logger.debug("Got content: %s", content) state = [ - event_from_pdu_json(p, event_format_version, outlier=True) + event_from_pdu_json(p, room_version, outlier=True) for p in content.get("state", []) ] auth_chain = [ - event_from_pdu_json(p, event_format_version, outlier=True) + event_from_pdu_json(p, room_version, outlier=True) for p in content.get("auth_chain", []) ] pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)} - room_version = None + create_event = None for e in state: if (e.type, e.state_key) == (EventTypes.Create, ""): - room_version = e.content.get( - "room_version", RoomVersions.V1.identifier - ) + create_event = e break - if room_version is None: + if create_event is None: # If the state doesn't have a create event then the room is # invalid, and it would fail auth checks anyway. raise SynapseError(400, "No create event in state") - valid_pdus = yield self._check_sigs_and_hash_and_fetch( + # the room version should be sane. + create_room_version = create_event.content.get( + "room_version", RoomVersions.V1.identifier + ) + if create_room_version != room_version.identifier: + # either the server that fulfilled the make_join, or the server that is + # handling the send_join, is lying. + raise InvalidResponseError( + "Unexpected room version %s in create event" + % (create_room_version,) + ) + + valid_pdus = await self._check_sigs_and_hash_and_fetch( destination, list(pdus.values()), outlier=True, - room_version=room_version, + room_version=room_version.identifier, ) valid_pdus_map = {p.event_id: p for p in valid_pdus} @@ -597,7 +607,17 @@ class FederationClient(FederationBase): for s in signed_state: s.internal_metadata = copy.deepcopy(s.internal_metadata) - check_authchain_validity(signed_auth) + # double-check that the same create event has ended up in the auth chain + auth_chain_create_events = [ + e.event_id + for e in signed_auth + if (e.type, e.state_key) == (EventTypes.Create, "") + ] + if auth_chain_create_events != [create_event.event_id]: + raise InvalidResponseError( + "Unexpected create event(s) in auth chain" + % (auth_chain_create_events,) + ) return { "state": signed_state, @@ -605,14 +625,13 @@ class FederationClient(FederationBase): "origin": destination, } - return self._try_destination_list("send_join", destinations, send_request) + return await self._try_destination_list("send_join", destinations, send_request) - @defer.inlineCallbacks - def _do_send_join(self, destination, pdu): + async def _do_send_join(self, destination: str, pdu: EventBase): time_now = self._clock.time_msec() try: - content = yield self.transport_layer.send_join_v2( + content = await self.transport_layer.send_join_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -634,7 +653,7 @@ class FederationClient(FederationBase): logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API") - resp = yield self.transport_layer.send_join_v1( + resp = await self.transport_layer.send_join_v1( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -645,51 +664,45 @@ class FederationClient(FederationBase): # content. return resp[1] - @defer.inlineCallbacks - def send_invite(self, destination, room_id, event_id, pdu): - room_version = yield self.store.get_room_version_id(room_id) + async def send_invite( + self, destination: str, room_id: str, event_id: str, pdu: EventBase, + ) -> EventBase: + room_version = await self.store.get_room_version(room_id) - content = yield self._do_send_invite(destination, pdu, room_version) + content = await self._do_send_invite(destination, pdu, room_version) pdu_dict = content["event"] logger.debug("Got response to send_invite: %s", pdu_dict) - room_version = yield self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) - - pdu = event_from_pdu_json(pdu_dict, format_ver) + pdu = event_from_pdu_json(pdu_dict, room_version) # Check signatures are correct. - pdu = yield self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) # FIXME: We should handle signature failures more gracefully. return pdu - @defer.inlineCallbacks - def _do_send_invite(self, destination, pdu, room_version): + async def _do_send_invite( + self, destination: str, pdu: EventBase, room_version: RoomVersion + ) -> JsonDict: """Actually sends the invite, first trying v2 API and falling back to v1 API if necessary. - Args: - destination (str): Target server - pdu (FrozenEvent) - room_version (str) - Returns: - dict: The event as a dict as returned by the remote server + The event as a dict as returned by the remote server """ time_now = self._clock.time_msec() try: - content = yield self.transport_layer.send_invite_v2( + content = await self.transport_layer.send_invite_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, content={ "event": pdu.get_pdu_json(time_now), - "room_version": room_version, + "room_version": room_version.identifier, "invite_room_state": pdu.unsigned.get("invite_room_state", []), }, ) @@ -707,8 +720,7 @@ class FederationClient(FederationBase): # Otherwise, we assume that the remote server doesn't understand # the v2 invite API. That's ok provided the room uses old-style event # IDs. - v = KNOWN_ROOM_VERSIONS.get(room_version) - if v.event_format != EventFormatVersions.V1: + if room_version.event_format != EventFormatVersions.V1: raise SynapseError( 400, "User's homeserver does not support this room version", @@ -722,7 +734,7 @@ class FederationClient(FederationBase): # Didn't work, try v1 API. # Note the v1 API returns a tuple of `(200, content)` - _, content = yield self.transport_layer.send_invite_v1( + _, content = await self.transport_layer.send_invite_v1( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -730,7 +742,7 @@ class FederationClient(FederationBase): ) return content - def send_leave(self, destinations, pdu): + async def send_leave(self, destinations: Iterable[str], pdu: EventBase) -> None: """Sends a leave event to one of a list of homeservers. Doing so will cause the remote server to add the event to the graph, @@ -739,34 +751,29 @@ class FederationClient(FederationBase): This is mostly useful to reject received invites. Args: - destinations (str): Candidate homeservers which are probably + destinations: Candidate homeservers which are probably participating in the room. - pdu (BaseEvent): event to be sent + pdu: event to be sent - Return: - Deferred: resolves to None. - - Fails with a ``SynapseError`` if the chosen remote server - returns a 300/400 code. + Raises: + SynapseError if the chosen remote server returns a 300/400 code. - Fails with a ``RuntimeError`` if no servers were reachable. + RuntimeError if no servers were reachable. """ - @defer.inlineCallbacks - def send_request(destination): - content = yield self._do_send_leave(destination, pdu) - + async def send_request(destination: str) -> None: + content = await self._do_send_leave(destination, pdu) logger.debug("Got content: %s", content) - return None - return self._try_destination_list("send_leave", destinations, send_request) + return await self._try_destination_list( + "send_leave", destinations, send_request + ) - @defer.inlineCallbacks - def _do_send_leave(self, destination, pdu): + async def _do_send_leave(self, destination, pdu): time_now = self._clock.time_msec() try: - content = yield self.transport_layer.send_leave_v2( + content = await self.transport_layer.send_leave_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -788,7 +795,7 @@ class FederationClient(FederationBase): logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API") - resp = yield self.transport_layer.send_leave_v1( + resp = await self.transport_layer.send_leave_v1( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -820,34 +827,33 @@ class FederationClient(FederationBase): third_party_instance_id=third_party_instance_id, ) - @defer.inlineCallbacks - def get_missing_events( + async def get_missing_events( self, - destination, - room_id, - earliest_events_ids, - latest_events, - limit, - min_depth, - timeout, - ): + destination: str, + room_id: str, + earliest_events_ids: Sequence[str], + latest_events: Iterable[EventBase], + limit: int, + min_depth: int, + timeout: int, + ) -> List[EventBase]: """Tries to fetch events we are missing. This is called when we receive an event without having received all of its ancestors. Args: - destination (str) - room_id (str) - earliest_events_ids (list): List of event ids. Effectively the + destination + room_id + earliest_events_ids: List of event ids. Effectively the events we expected to receive, but haven't. `get_missing_events` should only return events that didn't happen before these. - latest_events (list): List of events we have received that we don't + latest_events: List of events we have received that we don't have all previous events for. - limit (int): Maximum number of events to return. - min_depth (int): Minimum depth of events tor return. - timeout (int): Max time to wait in ms + limit: Maximum number of events to return. + min_depth: Minimum depth of events to return. + timeout: Max time to wait in ms """ try: - content = yield self.transport_layer.get_missing_events( + content = await self.transport_layer.get_missing_events( destination=destination, room_id=room_id, earliest_events=earliest_events_ids, @@ -857,15 +863,14 @@ class FederationClient(FederationBase): timeout=timeout, ) - room_version = yield self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) + room_version = await self.store.get_room_version(room_id) events = [ - event_from_pdu_json(e, format_ver) for e in content.get("events", []) + event_from_pdu_json(e, room_version) for e in content.get("events", []) ] - signed_events = yield self._check_sigs_and_hash_and_fetch( - destination, events, outlier=False, room_version=room_version + signed_events = await self._check_sigs_and_hash_and_fetch( + destination, events, outlier=False, room_version=room_version.identifier ) except HttpResponseException as e: if not e.code == 400: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a4c97ed458..7f9da49326 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -38,7 +38,6 @@ from synapse.api.errors import ( UnsupportedRoomVersionError, ) from synapse.api.room_versions import KNOWN_ROOM_VERSIONS -from synapse.events import room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction @@ -54,7 +53,7 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, ReplicationGetQueryRestServlet, ) -from synapse.types import get_domain_from_id +from synapse.types import JsonDict, get_domain_from_id from synapse.util import glob_to_regex, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache @@ -82,6 +81,8 @@ class FederationServer(FederationBase): self.handler = hs.get_handlers().federation_handler self.state = hs.get_state_handler() + self.device_handler = hs.get_device_handler() + self._server_linearizer = Linearizer("fed_server") self._transaction_linearizer = Linearizer("fed_txn_handler") @@ -234,24 +235,17 @@ class FederationServer(FederationBase): continue try: - room_version = await self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version(room_id) except NotFoundError: logger.info("Ignoring PDU for unknown room_id: %s", room_id) continue - - try: - format_ver = room_version_to_event_format(room_version) - except UnsupportedRoomVersionError: + except UnsupportedRoomVersionError as e: # this can happen if support for a given room version is withdrawn, # so that we still get events for said room. - logger.info( - "Ignoring PDU for room %s with unknown version %s", - room_id, - room_version, - ) + logger.info("Ignoring PDU: %s", e) continue - event = event_from_pdu_json(p, format_ver) + event = event_from_pdu_json(p, room_version) pdus_by_room.setdefault(room_id, []).append(event) pdu_results = {} @@ -302,7 +296,12 @@ class FederationServer(FederationBase): async def _process_edu(edu_dict): received_edus_counter.inc() - edu = Edu(**edu_dict) + edu = Edu( + origin=origin, + destination=self.server_name, + edu_type=edu_dict["edu_type"], + content=edu_dict["content"], + ) await self.registry.on_edu(edu.edu_type, origin, edu.content) await concurrently_execute( @@ -396,20 +395,21 @@ class FederationServer(FederationBase): time_now = self._clock.time_msec() return {"event": pdu.get_pdu_json(time_now), "room_version": room_version} - async def on_invite_request(self, origin, content, room_version): - if room_version not in KNOWN_ROOM_VERSIONS: + async def on_invite_request( + self, origin: str, content: JsonDict, room_version_id: str + ): + room_version = KNOWN_ROOM_VERSIONS.get(room_version_id) + if not room_version: raise SynapseError( 400, "Homeserver does not support this room version", Codes.UNSUPPORTED_ROOM_VERSION, ) - format_ver = room_version_to_event_format(room_version) - - pdu = event_from_pdu_json(content, format_ver) + pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, pdu.room_id) - pdu = await self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version) time_now = self._clock.time_msec() return {"event": ret_pdu.get_pdu_json(time_now)} @@ -417,16 +417,15 @@ class FederationServer(FederationBase): async def on_send_join_request(self, origin, content, room_id): logger.debug("on_send_join_request: content: %s", content) - room_version = await self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) - pdu = event_from_pdu_json(content, format_ver) + room_version = await self.store.get_room_version(room_id) + pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, pdu.room_id) logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures) - pdu = await self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) res_pdus = await self.handler.on_send_join_request(origin, pdu) time_now = self._clock.time_msec() @@ -448,16 +447,15 @@ class FederationServer(FederationBase): async def on_send_leave_request(self, origin, content, room_id): logger.debug("on_send_leave_request: content: %s", content) - room_version = await self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) - pdu = event_from_pdu_json(content, format_ver) + room_version = await self.store.get_room_version(room_id) + pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, pdu.room_id) logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures) - pdu = await self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) await self.handler.on_send_leave_request(origin, pdu) return {} @@ -495,15 +493,14 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) - room_version = await self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) + room_version = await self.store.get_room_version(room_id) auth_chain = [ - event_from_pdu_json(e, format_ver) for e in content["auth_chain"] + event_from_pdu_json(e, room_version) for e in content["auth_chain"] ] signed_auth = await self._check_sigs_and_hash_and_fetch( - origin, auth_chain, outlier=True, room_version=room_version + origin, auth_chain, outlier=True, room_version=room_version.identifier ) ret = await self.handler.on_query_auth( @@ -528,8 +525,9 @@ class FederationServer(FederationBase): def on_query_client_keys(self, origin, content): return self.on_query_request("client_keys", content) - def on_query_user_devices(self, origin, user_id): - return self.on_query_request("user_devices", user_id) + async def on_query_user_devices(self, origin: str, user_id: str): + keys = await self.device_handler.on_federation_query_user_devices(user_id) + return 200, keys @trace async def on_claim_client_keys(self, origin, content): @@ -570,7 +568,7 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) - logger.info( + logger.debug( "on_get_missing_events: earliest_events: %r, latest_events: %r," " limit: %d", earliest_events, @@ -583,11 +581,11 @@ class FederationServer(FederationBase): ) if len(missing_events) < 5: - logger.info( + logger.debug( "Returning %d events: %r", len(missing_events), missing_events ) else: - logger.info("Returning %d events", len(missing_events)) + logger.debug("Returning %d events", len(missing_events)) time_now = self._clock.time_msec() diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 36c83c3027..233cb33daf 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import Dict, Hashable, Iterable, List, Optional, Set from six import itervalues @@ -23,6 +24,7 @@ from twisted.internet import defer import synapse import synapse.metrics +from synapse.events import EventBase from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu @@ -39,6 +41,8 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage.presence import UserPresenceState +from synapse.types import ReadReceipt from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -68,7 +72,7 @@ class FederationSender(object): self._transaction_manager = TransactionManager(hs) # map from destination to PerDestinationQueue - self._per_destination_queues = {} # type: dict[str, PerDestinationQueue] + self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] LaterGauge( "synapse_federation_transaction_queue_pending_destinations", @@ -84,7 +88,7 @@ class FederationSender(object): # Map of user_id -> UserPresenceState for all the pending presence # to be sent out by user_id. Entries here get processed and put in # pending_presence_by_dest - self.pending_presence = {} + self.pending_presence = {} # type: Dict[str, UserPresenceState] LaterGauge( "synapse_federation_transaction_queue_pending_pdus", @@ -116,20 +120,17 @@ class FederationSender(object): # and that there is a pending call to _flush_rrs_for_room in the system. self._queues_awaiting_rr_flush_by_room = ( {} - ) # type: dict[str, set[PerDestinationQueue]] + ) # type: Dict[str, Set[PerDestinationQueue]] self._rr_txn_interval_per_room_ms = ( - 1000.0 / hs.get_config().federation_rr_transactions_per_room_per_second + 1000.0 / hs.config.federation_rr_transactions_per_room_per_second ) - def _get_per_destination_queue(self, destination): + def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: """Get or create a PerDestinationQueue for the given destination Args: - destination (str): server_name of remote server - - Returns: - PerDestinationQueue + destination: server_name of remote server """ queue = self._per_destination_queues.get(destination) if not queue: @@ -137,7 +138,7 @@ class FederationSender(object): self._per_destination_queues[destination] = queue return queue - def notify_new_events(self, current_id): + def notify_new_events(self, current_id: int) -> None: """This gets called when we have some new events we might want to send out to other servers. """ @@ -151,13 +152,12 @@ class FederationSender(object): "process_event_queue_for_federation", self._process_event_queue_loop ) - @defer.inlineCallbacks - def _process_event_queue_loop(self): + async def _process_event_queue_loop(self) -> None: try: self._is_processing = True while True: - last_token = yield self.store.get_federation_out_pos("events") - next_token, events = yield self.store.get_all_new_events_stream( + last_token = await self.store.get_federation_out_pos("events") + next_token, events = await self.store.get_all_new_events_stream( last_token, self._last_poked_id, limit=100 ) @@ -166,8 +166,7 @@ class FederationSender(object): if not events and next_token >= self._last_poked_id: break - @defer.inlineCallbacks - def handle_event(event): + async def handle_event(event: EventBase) -> None: # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.sender) @@ -184,7 +183,7 @@ class FederationSender(object): # Otherwise if the last member on a server in a room is # banned then it won't receive the event because it won't # be in the room after the ban. - destinations = yield self.state.get_hosts_in_room_at_events( + destinations = await self.state.get_hosts_in_room_at_events( event.room_id, event_ids=event.prev_event_ids() ) except Exception: @@ -206,17 +205,16 @@ class FederationSender(object): self._send_pdu(event, destinations) - @defer.inlineCallbacks - def handle_room_events(events): + async def handle_room_events(events: Iterable[EventBase]) -> None: with Measure(self.clock, "handle_room_events"): for event in events: - yield handle_event(event) + await handle_event(event) - events_by_room = {} + events_by_room = {} # type: Dict[str, List[EventBase]] for event in events: events_by_room.setdefault(event.room_id, []).append(event) - yield make_deferred_yieldable( + await make_deferred_yieldable( defer.gatherResults( [ run_in_background(handle_room_events, evs) @@ -226,11 +224,11 @@ class FederationSender(object): ) ) - yield self.store.update_federation_out_pos("events", next_token) + await self.store.update_federation_out_pos("events", next_token) if events: now = self.clock.time_msec() - ts = yield self.store.get_received_ts(events[-1].event_id) + ts = await self.store.get_received_ts(events[-1].event_id) synapse.metrics.event_processing_lag.labels( "federation_sender" @@ -254,7 +252,7 @@ class FederationSender(object): finally: self._is_processing = False - def _send_pdu(self, pdu, destinations): + def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. @@ -276,11 +274,11 @@ class FederationSender(object): self._get_per_destination_queue(destination).send_pdu(pdu, order) @defer.inlineCallbacks - def send_read_receipt(self, receipt): + def send_read_receipt(self, receipt: ReadReceipt): """Send a RR to any other servers in the room Args: - receipt (synapse.types.ReadReceipt): receipt to be sent + receipt: receipt to be sent """ # Some background on the rate-limiting going on here. @@ -343,7 +341,7 @@ class FederationSender(object): else: queue.flush_read_receipts_for_room(room_id) - def _schedule_rr_flush_for_room(self, room_id, n_domains): + def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None: # that is going to cause approximately len(domains) transactions, so now back # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM backoff_ms = self._rr_txn_interval_per_room_ms * n_domains @@ -352,7 +350,7 @@ class FederationSender(object): self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id) self._queues_awaiting_rr_flush_by_room[room_id] = set() - def _flush_rrs_for_room(self, room_id): + def _flush_rrs_for_room(self, room_id: str) -> None: queues = self._queues_awaiting_rr_flush_by_room.pop(room_id) logger.debug("Flushing RRs in %s to %s", room_id, queues) @@ -368,14 +366,11 @@ class FederationSender(object): @preserve_fn # the caller should not yield on this @defer.inlineCallbacks - def send_presence(self, states): + def send_presence(self, states: List[UserPresenceState]): """Send the new presence states to the appropriate destinations. This actually queues up the presence states ready for sending and triggers a background task to process them and send out the transactions. - - Args: - states (list(UserPresenceState)) """ if not self.hs.config.use_presence: # No-op if presence is disabled. @@ -412,11 +407,10 @@ class FederationSender(object): finally: self._processing_pending_presence = False - def send_presence_to_destinations(self, states, destinations): + def send_presence_to_destinations( + self, states: List[UserPresenceState], destinations: List[str] + ) -> None: """Send the given presence states to the given destinations. - - Args: - states (list[UserPresenceState]) destinations (list[str]) """ @@ -431,12 +425,9 @@ class FederationSender(object): @measure_func("txnqueue._process_presence") @defer.inlineCallbacks - def _process_presence_inner(self, states): + def _process_presence_inner(self, states: List[UserPresenceState]): """Given a list of states populate self.pending_presence_by_dest and poke to send a new transaction to each destination - - Args: - states (list(UserPresenceState)) """ hosts_and_states = yield get_interested_remotes(self.store, states, self.state) @@ -446,14 +437,20 @@ class FederationSender(object): continue self._get_per_destination_queue(destination).send_presence(states) - def build_and_send_edu(self, destination, edu_type, content, key=None): + def build_and_send_edu( + self, + destination: str, + edu_type: str, + content: dict, + key: Optional[Hashable] = None, + ): """Construct an Edu object, and queue it for sending Args: - destination (str): name of server to send to - edu_type (str): type of EDU to send - content (dict): content of EDU - key (Any|None): clobbering key for this edu + destination: name of server to send to + edu_type: type of EDU to send + content: content of EDU + key: clobbering key for this edu """ if destination == self.server_name: logger.info("Not sending EDU to ourselves") @@ -468,12 +465,12 @@ class FederationSender(object): self.send_edu(edu, key) - def send_edu(self, edu, key): + def send_edu(self, edu: Edu, key: Optional[Hashable]): """Queue an EDU for sending Args: - edu (Edu): edu to send - key (Any|None): clobbering key for this edu + edu: edu to send + key: clobbering key for this edu """ queue = self._get_per_destination_queue(edu.destination) if key: @@ -481,7 +478,7 @@ class FederationSender(object): else: queue.send_edu(edu) - def send_device_messages(self, destination): + def send_device_messages(self, destination: str): if destination == self.server_name: logger.warning("Not sending device update to ourselves") return @@ -501,5 +498,5 @@ class FederationSender(object): self._get_per_destination_queue(destination).attempt_new_transaction() - def get_current_token(self): + def get_current_token(self) -> int: return 0 diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 5012aaea35..e13cd20ffa 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -15,11 +15,11 @@ # limitations under the License. import datetime import logging +from typing import Dict, Hashable, Iterable, List, Tuple from prometheus_client import Counter -from twisted.internet import defer - +import synapse.server from synapse.api.errors import ( FederationDeniedError, HttpResponseException, @@ -31,7 +31,7 @@ from synapse.handlers.presence import format_user_presence_state from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.presence import UserPresenceState -from synapse.types import StateMap +from synapse.types import ReadReceipt from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter # This is defined in the Matrix spec and enforced by the receiver. @@ -56,13 +56,18 @@ class PerDestinationQueue(object): Manages the per-destination transmission queues. Args: - hs (synapse.HomeServer): - transaction_sender (TransactionManager): - destination (str): the server_name of the destination that we are managing + hs + transaction_sender + destination: the server_name of the destination that we are managing transmission for. """ - def __init__(self, hs, transaction_manager, destination): + def __init__( + self, + hs: "synapse.server.HomeServer", + transaction_manager: "synapse.federation.sender.TransactionManager", + destination: str, + ): self._server_name = hs.hostname self._clock = hs.get_clock() self._store = hs.get_datastore() @@ -72,20 +77,20 @@ class PerDestinationQueue(object): self.transmission_loop_running = False # a list of tuples of (pending pdu, order) - self._pending_pdus = [] # type: list[tuple[EventBase, int]] - self._pending_edus = [] # type: list[Edu] + self._pending_pdus = [] # type: List[Tuple[EventBase, int]] + self._pending_edus = [] # type: List[Edu] # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered # based on their key (e.g. typing events by room_id) # Map of (edu_type, key) -> Edu - self._pending_edus_keyed = {} # type: StateMap[Edu] + self._pending_edus_keyed = {} # type: Dict[Tuple[str, Hashable], Edu] # Map of user_id -> UserPresenceState of pending presence to be sent to this # destination - self._pending_presence = {} # type: dict[str, UserPresenceState] + self._pending_presence = {} # type: Dict[str, UserPresenceState] # room_id -> receipt_type -> user_id -> receipt_dict - self._pending_rrs = {} + self._pending_rrs = {} # type: Dict[str, Dict[str, Dict[str, dict]]] self._rrs_pending_flush = False # stream_id of last successfully sent to-device message. @@ -95,50 +100,50 @@ class PerDestinationQueue(object): # stream_id of last successfully sent device list update. self._last_device_list_stream_id = 0 - def __str__(self): + def __str__(self) -> str: return "PerDestinationQueue[%s]" % self._destination - def pending_pdu_count(self): + def pending_pdu_count(self) -> int: return len(self._pending_pdus) - def pending_edu_count(self): + def pending_edu_count(self) -> int: return ( len(self._pending_edus) + len(self._pending_presence) + len(self._pending_edus_keyed) ) - def send_pdu(self, pdu, order): + def send_pdu(self, pdu: EventBase, order: int) -> None: """Add a PDU to the queue, and start the transmission loop if neccessary Args: - pdu (EventBase): pdu to send - order (int): + pdu: pdu to send + order """ self._pending_pdus.append((pdu, order)) self.attempt_new_transaction() - def send_presence(self, states): + def send_presence(self, states: Iterable[UserPresenceState]) -> None: """Add presence updates to the queue. Start the transmission loop if neccessary. Args: - states (iterable[UserPresenceState]): presence to send + states: presence to send """ self._pending_presence.update({state.user_id: state for state in states}) self.attempt_new_transaction() - def queue_read_receipt(self, receipt): + def queue_read_receipt(self, receipt: ReadReceipt) -> None: """Add a RR to the list to be sent. Doesn't start the transmission loop yet (see flush_read_receipts_for_room) Args: - receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued + receipt: receipt to be queued """ self._pending_rrs.setdefault(receipt.room_id, {}).setdefault( receipt.receipt_type, {} )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data} - def flush_read_receipts_for_room(self, room_id): + def flush_read_receipts_for_room(self, room_id: str) -> None: # if we don't have any read-receipts for this room, it may be that we've already # sent them out, so we don't need to flush. if room_id not in self._pending_rrs: @@ -146,15 +151,15 @@ class PerDestinationQueue(object): self._rrs_pending_flush = True self.attempt_new_transaction() - def send_keyed_edu(self, edu, key): + def send_keyed_edu(self, edu: Edu, key: Hashable) -> None: self._pending_edus_keyed[(edu.edu_type, key)] = edu self.attempt_new_transaction() - def send_edu(self, edu): + def send_edu(self, edu) -> None: self._pending_edus.append(edu) self.attempt_new_transaction() - def attempt_new_transaction(self): + def attempt_new_transaction(self) -> None: """Try to start a new transaction to this destination If there is already a transaction in progress to this destination, @@ -177,23 +182,22 @@ class PerDestinationQueue(object): self._transaction_transmission_loop, ) - @defer.inlineCallbacks - def _transaction_transmission_loop(self): - pending_pdus = [] + async def _transaction_transmission_loop(self) -> None: + pending_pdus = [] # type: List[Tuple[EventBase, int]] try: self.transmission_loop_running = True # This will throw if we wouldn't retry. We do this here so we fail # quickly, but we will later check this again in the http client, # hence why we throw the result away. - yield get_retry_limiter(self._destination, self._clock, self._store) + await get_retry_limiter(self._destination, self._clock, self._store) pending_pdus = [] while True: # We have to keep 2 free slots for presence and rr_edus limit = MAX_EDUS_PER_TRANSACTION - 2 - device_update_edus, dev_list_id = yield self._get_device_update_edus( + device_update_edus, dev_list_id = await self._get_device_update_edus( limit ) @@ -202,7 +206,7 @@ class PerDestinationQueue(object): ( to_device_edus, device_stream_id, - ) = yield self._get_to_device_message_edus(limit) + ) = await self._get_to_device_message_edus(limit) pending_edus = device_update_edus + to_device_edus @@ -269,7 +273,7 @@ class PerDestinationQueue(object): # END CRITICAL SECTION - success = yield self._transaction_manager.send_new_transaction( + success = await self._transaction_manager.send_new_transaction( self._destination, pending_pdus, pending_edus ) if success: @@ -280,7 +284,7 @@ class PerDestinationQueue(object): # Remove the acknowledged device messages from the database # Only bother if we actually sent some device messages if to_device_edus: - yield self._store.delete_device_msgs_for_remote( + await self._store.delete_device_msgs_for_remote( self._destination, device_stream_id ) @@ -289,7 +293,7 @@ class PerDestinationQueue(object): logger.info( "Marking as sent %r %r", self._destination, dev_list_id ) - yield self._store.mark_as_sent_devices_by_remote( + await self._store.mark_as_sent_devices_by_remote( self._destination, dev_list_id ) @@ -334,7 +338,7 @@ class PerDestinationQueue(object): # We want to be *very* sure we clear this after we stop processing self.transmission_loop_running = False - def _get_rr_edus(self, force_flush): + def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: if not self._pending_rrs: return if not force_flush and not self._rrs_pending_flush: @@ -351,17 +355,16 @@ class PerDestinationQueue(object): self._rrs_pending_flush = False yield edu - def _pop_pending_edus(self, limit): + def _pop_pending_edus(self, limit: int) -> List[Edu]: pending_edus = self._pending_edus pending_edus, self._pending_edus = pending_edus[:limit], pending_edus[limit:] return pending_edus - @defer.inlineCallbacks - def _get_device_update_edus(self, limit): + async def _get_device_update_edus(self, limit: int) -> Tuple[List[Edu], int]: last_device_list = self._last_device_list_stream_id # Retrieve list of new device updates to send to the destination - now_stream_id, results = yield self._store.get_device_updates_by_remote( + now_stream_id, results = await self._store.get_device_updates_by_remote( self._destination, last_device_list, limit=limit ) edus = [ @@ -378,11 +381,10 @@ class PerDestinationQueue(object): return (edus, now_stream_id) - @defer.inlineCallbacks - def _get_to_device_message_edus(self, limit): + async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]: last_device_stream_id = self._last_device_stream_id to_device_stream_id = self._store.get_to_device_stream_token() - contents, stream_id = yield self._store.get_new_device_msgs_for_remote( + contents, stream_id = await self._store.get_new_device_msgs_for_remote( self._destination, last_device_stream_id, to_device_stream_id, limit ) edus = [ diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 5fed626d5b..3c2a02a3b3 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -13,14 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import List from canonicaljson import json -from twisted.internet import defer - +import synapse.server from synapse.api.errors import HttpResponseException +from synapse.events import EventBase from synapse.federation.persistence import TransactionActions -from synapse.federation.units import Transaction +from synapse.federation.units import Edu, Transaction from synapse.logging.opentracing import ( extract_text_map, set_tag, @@ -39,7 +40,7 @@ class TransactionManager(object): shared between PerDestinationQueue objects """ - def __init__(self, hs): + def __init__(self, hs: "synapse.server.HomeServer"): self._server_name = hs.hostname self.clock = hs.get_clock() # nb must be called this for @measure_func self._store = hs.get_datastore() @@ -50,8 +51,9 @@ class TransactionManager(object): self._next_txn_id = int(self.clock.time_msec()) @measure_func("_send_new_transaction") - @defer.inlineCallbacks - def send_new_transaction(self, destination, pending_pdus, pending_edus): + async def send_new_transaction( + self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu] + ): # Make a transaction-sending opentracing span. This span follows on from # all the edus in that transaction. This needs to be done since there is @@ -127,7 +129,7 @@ class TransactionManager(object): return data try: - response = yield self._transport_layer.send_transaction( + response = await self._transport_layer.send_transaction( transaction, json_data_cb ) code = 200 diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 125eadd796..92a9ae2320 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -158,7 +158,7 @@ class Authenticator(object): origin, json_request, now, "Incoming request" ) - logger.info("Request from %s", origin) + logger.debug("Request from %s", origin) request.authenticated_entity = origin # If we get a valid signed request from the other side, its probably @@ -579,7 +579,7 @@ class FederationV1InviteServlet(BaseFederationServlet): # state resolution algorithm, and we don't use that for processing # invites content = await self.handler.on_invite_request( - origin, content, room_version=RoomVersions.V1.identifier + origin, content, room_version_id=RoomVersions.V1.identifier ) # V1 federation API is defined to return a content of `[200, {...}]` @@ -606,7 +606,7 @@ class FederationV2InviteServlet(BaseFederationServlet): event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state content = await self.handler.on_invite_request( - origin, event, room_version=room_version + origin, event, room_version_id=room_version ) return 200, content diff --git a/synapse/federation/units.py b/synapse/federation/units.py index b4d743cde7..6b32e0dcbf 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -19,11 +19,15 @@ server protocol. import logging +import attr + +from synapse.types import JsonDict from synapse.util.jsonobject import JsonEncodedObject logger = logging.getLogger(__name__) +@attr.s(slots=True) class Edu(JsonEncodedObject): """ An Edu represents a piece of data sent from one homeserver to another. @@ -32,11 +36,24 @@ class Edu(JsonEncodedObject): internal ID or previous references graph. """ - valid_keys = ["origin", "destination", "edu_type", "content"] + edu_type = attr.ib(type=str) + content = attr.ib(type=dict) + origin = attr.ib(type=str) + destination = attr.ib(type=str) - required_keys = ["edu_type"] + def get_dict(self) -> JsonDict: + return { + "edu_type": self.edu_type, + "content": self.content, + } - internal_keys = ["origin", "destination"] + def get_internal_dict(self) -> JsonDict: + return { + "edu_type": self.edu_type, + "content": self.content, + "origin": self.origin, + "destination": self.destination, + } def get_context(self): return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}") diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 0ec9be3cb5..c106abae21 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -36,7 +36,7 @@ logger = logging.getLogger(__name__) # TODO: Flairs -class GroupsServerHandler(object): +class GroupsServerWorkerHandler(object): def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() @@ -51,9 +51,6 @@ class GroupsServerHandler(object): self.transport_client = hs.get_federation_transport_client() self.profile_handler = hs.get_profile_handler() - # Ensure attestations get renewed - hs.get_groups_attestation_renewer() - @defer.inlineCallbacks def check_group_is_ours( self, group_id, requester_user_id, and_exists=False, and_is_admin=None @@ -168,6 +165,197 @@ class GroupsServerHandler(object): } @defer.inlineCallbacks + def get_group_categories(self, group_id, requester_user_id): + """Get all categories in a group (as seen by user) + """ + yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + categories = yield self.store.get_group_categories(group_id=group_id) + return {"categories": categories} + + @defer.inlineCallbacks + def get_group_category(self, group_id, requester_user_id, category_id): + """Get a specific category in a group (as seen by user) + """ + yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + res = yield self.store.get_group_category( + group_id=group_id, category_id=category_id + ) + + logger.info("group %s", res) + + return res + + @defer.inlineCallbacks + def get_group_roles(self, group_id, requester_user_id): + """Get all roles in a group (as seen by user) + """ + yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + roles = yield self.store.get_group_roles(group_id=group_id) + return {"roles": roles} + + @defer.inlineCallbacks + def get_group_role(self, group_id, requester_user_id, role_id): + """Get a specific role in a group (as seen by user) + """ + yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + res = yield self.store.get_group_role(group_id=group_id, role_id=role_id) + return res + + @defer.inlineCallbacks + def get_group_profile(self, group_id, requester_user_id): + """Get the group profile as seen by requester_user_id + """ + + yield self.check_group_is_ours(group_id, requester_user_id) + + group = yield self.store.get_group(group_id) + + if group: + cols = [ + "name", + "short_description", + "long_description", + "avatar_url", + "is_public", + ] + group_description = {key: group[key] for key in cols} + group_description["is_openly_joinable"] = group["join_policy"] == "open" + + return group_description + else: + raise SynapseError(404, "Unknown group") + + @defer.inlineCallbacks + def get_users_in_group(self, group_id, requester_user_id): + """Get the users in group as seen by requester_user_id. + + The ordering is arbitrary at the moment + """ + + yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + is_user_in_group = yield self.store.is_user_in_group( + requester_user_id, group_id + ) + + user_results = yield self.store.get_users_in_group( + group_id, include_private=is_user_in_group + ) + + chunk = [] + for user_result in user_results: + g_user_id = user_result["user_id"] + is_public = user_result["is_public"] + is_privileged = user_result["is_admin"] + + entry = {"user_id": g_user_id} + + profile = yield self.profile_handler.get_profile_from_cache(g_user_id) + entry.update(profile) + + entry["is_public"] = bool(is_public) + entry["is_privileged"] = bool(is_privileged) + + if not self.is_mine_id(g_user_id): + attestation = yield self.store.get_remote_attestation( + group_id, g_user_id + ) + if not attestation: + continue + + entry["attestation"] = attestation + else: + entry["attestation"] = self.attestations.create_attestation( + group_id, g_user_id + ) + + chunk.append(entry) + + # TODO: If admin add lists of users whose attestations have timed out + + return {"chunk": chunk, "total_user_count_estimate": len(user_results)} + + @defer.inlineCallbacks + def get_invited_users_in_group(self, group_id, requester_user_id): + """Get the users that have been invited to a group as seen by requester_user_id. + + The ordering is arbitrary at the moment + """ + + yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + is_user_in_group = yield self.store.is_user_in_group( + requester_user_id, group_id + ) + + if not is_user_in_group: + raise SynapseError(403, "User not in group") + + invited_users = yield self.store.get_invited_users_in_group(group_id) + + user_profiles = [] + + for user_id in invited_users: + user_profile = {"user_id": user_id} + try: + profile = yield self.profile_handler.get_profile_from_cache(user_id) + user_profile.update(profile) + except Exception as e: + logger.warning("Error getting profile for %s: %s", user_id, e) + user_profiles.append(user_profile) + + return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)} + + @defer.inlineCallbacks + def get_rooms_in_group(self, group_id, requester_user_id): + """Get the rooms in group as seen by requester_user_id + + This returns rooms in order of decreasing number of joined users + """ + + yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + is_user_in_group = yield self.store.is_user_in_group( + requester_user_id, group_id + ) + + room_results = yield self.store.get_rooms_in_group( + group_id, include_private=is_user_in_group + ) + + chunk = [] + for room_result in room_results: + room_id = room_result["room_id"] + + joined_users = yield self.store.get_users_in_room(room_id) + entry = yield self.room_list_handler.generate_room_entry( + room_id, len(joined_users), with_alias=False, allow_private=True + ) + + if not entry: + continue + + entry["is_public"] = bool(room_result["is_public"]) + + chunk.append(entry) + + chunk.sort(key=lambda e: -e["num_joined_members"]) + + return {"chunk": chunk, "total_room_count_estimate": len(room_results)} + + +class GroupsServerHandler(GroupsServerWorkerHandler): + def __init__(self, hs): + super(GroupsServerHandler, self).__init__(hs) + + # Ensure attestations get renewed + hs.get_groups_attestation_renewer() + + @defer.inlineCallbacks def update_group_summary_room( self, group_id, requester_user_id, room_id, category_id, content ): @@ -230,27 +418,6 @@ class GroupsServerHandler(object): return {} @defer.inlineCallbacks - def get_group_categories(self, group_id, requester_user_id): - """Get all categories in a group (as seen by user) - """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - categories = yield self.store.get_group_categories(group_id=group_id) - return {"categories": categories} - - @defer.inlineCallbacks - def get_group_category(self, group_id, requester_user_id, category_id): - """Get a specific category in a group (as seen by user) - """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - res = yield self.store.get_group_category( - group_id=group_id, category_id=category_id - ) - - return res - - @defer.inlineCallbacks def update_group_category(self, group_id, requester_user_id, category_id, content): """Add/Update a group category """ @@ -285,24 +452,6 @@ class GroupsServerHandler(object): return {} @defer.inlineCallbacks - def get_group_roles(self, group_id, requester_user_id): - """Get all roles in a group (as seen by user) - """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - roles = yield self.store.get_group_roles(group_id=group_id) - return {"roles": roles} - - @defer.inlineCallbacks - def get_group_role(self, group_id, requester_user_id, role_id): - """Get a specific role in a group (as seen by user) - """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - res = yield self.store.get_group_role(group_id=group_id, role_id=role_id) - return res - - @defer.inlineCallbacks def update_group_role(self, group_id, requester_user_id, role_id, content): """Add/update a role in a group """ @@ -371,30 +520,6 @@ class GroupsServerHandler(object): return {} @defer.inlineCallbacks - def get_group_profile(self, group_id, requester_user_id): - """Get the group profile as seen by requester_user_id - """ - - yield self.check_group_is_ours(group_id, requester_user_id) - - group = yield self.store.get_group(group_id) - - if group: - cols = [ - "name", - "short_description", - "long_description", - "avatar_url", - "is_public", - ] - group_description = {key: group[key] for key in cols} - group_description["is_openly_joinable"] = group["join_policy"] == "open" - - return group_description - else: - raise SynapseError(404, "Unknown group") - - @defer.inlineCallbacks def update_group_profile(self, group_id, requester_user_id, content): """Update the group profile """ @@ -413,124 +538,6 @@ class GroupsServerHandler(object): yield self.store.update_group_profile(group_id, profile) @defer.inlineCallbacks - def get_users_in_group(self, group_id, requester_user_id): - """Get the users in group as seen by requester_user_id. - - The ordering is arbitrary at the moment - """ - - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - is_user_in_group = yield self.store.is_user_in_group( - requester_user_id, group_id - ) - - user_results = yield self.store.get_users_in_group( - group_id, include_private=is_user_in_group - ) - - chunk = [] - for user_result in user_results: - g_user_id = user_result["user_id"] - is_public = user_result["is_public"] - is_privileged = user_result["is_admin"] - - entry = {"user_id": g_user_id} - - profile = yield self.profile_handler.get_profile_from_cache(g_user_id) - entry.update(profile) - - entry["is_public"] = bool(is_public) - entry["is_privileged"] = bool(is_privileged) - - if not self.is_mine_id(g_user_id): - attestation = yield self.store.get_remote_attestation( - group_id, g_user_id - ) - if not attestation: - continue - - entry["attestation"] = attestation - else: - entry["attestation"] = self.attestations.create_attestation( - group_id, g_user_id - ) - - chunk.append(entry) - - # TODO: If admin add lists of users whose attestations have timed out - - return {"chunk": chunk, "total_user_count_estimate": len(user_results)} - - @defer.inlineCallbacks - def get_invited_users_in_group(self, group_id, requester_user_id): - """Get the users that have been invited to a group as seen by requester_user_id. - - The ordering is arbitrary at the moment - """ - - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - is_user_in_group = yield self.store.is_user_in_group( - requester_user_id, group_id - ) - - if not is_user_in_group: - raise SynapseError(403, "User not in group") - - invited_users = yield self.store.get_invited_users_in_group(group_id) - - user_profiles = [] - - for user_id in invited_users: - user_profile = {"user_id": user_id} - try: - profile = yield self.profile_handler.get_profile_from_cache(user_id) - user_profile.update(profile) - except Exception as e: - logger.warning("Error getting profile for %s: %s", user_id, e) - user_profiles.append(user_profile) - - return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)} - - @defer.inlineCallbacks - def get_rooms_in_group(self, group_id, requester_user_id): - """Get the rooms in group as seen by requester_user_id - - This returns rooms in order of decreasing number of joined users - """ - - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - is_user_in_group = yield self.store.is_user_in_group( - requester_user_id, group_id - ) - - room_results = yield self.store.get_rooms_in_group( - group_id, include_private=is_user_in_group - ) - - chunk = [] - for room_result in room_results: - room_id = room_result["room_id"] - - joined_users = yield self.store.get_users_in_room(room_id) - entry = yield self.room_list_handler.generate_room_entry( - room_id, len(joined_users), with_alias=False, allow_private=True - ) - - if not entry: - continue - - entry["is_public"] = bool(room_result["is_public"]) - - chunk.append(entry) - - chunk.sort(key=lambda e: -e["num_joined_members"]) - - return {"chunk": chunk, "total_room_count_estimate": len(room_results)} - - @defer.inlineCallbacks def add_room_to_group(self, group_id, requester_user_id, room_id, content): """Add room to group """ diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 9205865231..f3c0aeceb6 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -58,8 +58,10 @@ class AdminHandler(BaseHandler): ret = await self.store.get_user_by_id(user.to_string()) if ret: profile = await self.store.get_profileinfo(user.localpart) + threepids = await self.store.user_get_threepids(user.to_string()) ret["displayname"] = profile.display_name ret["avatar_url"] = profile.avatar_url + ret["threepids"] = threepids return ret async def export_user_data(self, user_id, writer): diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 54a71c49d2..48a88d3c2a 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -816,6 +816,14 @@ class AuthHandler(BaseHandler): @defer.inlineCallbacks def add_threepid(self, user_id, medium, address, validated_at): + # check if medium has a valid value + if medium not in ["email", "msisdn"]: + raise SynapseError( + code=400, + msg=("'%s' is not a valid value for 'medium'" % (medium,)), + errcode=Codes.INVALID_PARAM, + ) + # 'Canonicalise' email addresses down to lower case. # We've now moving towards the homeserver being the entity that # is responsible for validating threepids used for resetting passwords diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index a9bd431486..6d8e48ed39 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -225,6 +225,22 @@ class DeviceWorkerHandler(BaseHandler): return result + @defer.inlineCallbacks + def on_federation_query_user_devices(self, user_id): + stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) + master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master") + self_signing_key = yield self.store.get_e2e_cross_signing_key( + user_id, "self_signing" + ) + + return { + "user_id": user_id, + "stream_id": stream_id, + "devices": devices, + "master_key": master_key, + "self_signing_key": self_signing_key, + } + class DeviceHandler(DeviceWorkerHandler): def __init__(self, hs): @@ -239,9 +255,6 @@ class DeviceHandler(DeviceWorkerHandler): federation_registry.register_edu_handler( "m.device_list_update", self.device_list_updater.incoming_device_list_update ) - federation_registry.register_query_handler( - "user_devices", self.on_federation_query_user_devices - ) hs.get_distributor().observe("user_left_room", self.user_left_room) @@ -457,22 +470,6 @@ class DeviceHandler(DeviceWorkerHandler): self.notifier.on_new_event("device_list_key", position, users=[from_user_id]) @defer.inlineCallbacks - def on_federation_query_user_devices(self, user_id): - stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) - master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master") - self_signing_key = yield self.store.get_e2e_cross_signing_key( - user_id, "self_signing" - ) - - return { - "user_id": user_id, - "stream_id": stream_id, - "devices": devices, - "master_key": master_key, - "self_signing_key": self_signing_key, - } - - @defer.inlineCallbacks def user_left_room(self, user, room_id): user_id = user.to_string() room_ids = yield self.store.get_rooms_for_user(user_id) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e9441bbeff..eb20ef4aec 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -65,7 +65,7 @@ from synapse.replication.http.federation import ( from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour -from synapse.types import StateMap, UserID, get_domain_from_id +from synapse.types import JsonDict, StateMap, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.distributor import user_joined_room from synapse.util.retryutils import NotRetryingDestination @@ -1156,7 +1156,7 @@ class FederationHandler(BaseHandler): Logs a warning if we can't find the given event. """ - room_version = await self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version(room_id) event_infos = [] @@ -1230,13 +1230,12 @@ class FederationHandler(BaseHandler): ) raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events") - @defer.inlineCallbacks - def send_invite(self, target_host, event): + async def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. Invites must be signed by the invitee's server before distribution. """ - pdu = yield self.federation_client.send_invite( + pdu = await self.federation_client.send_invite( destination=target_host, room_id=event.room_id, event_id=event.event_id, @@ -1245,17 +1244,16 @@ class FederationHandler(BaseHandler): return pdu - @defer.inlineCallbacks - def on_event_auth(self, event_id): - event = yield self.store.get_event(event_id) - auth = yield self.store.get_auth_chain( + async def on_event_auth(self, event_id: str) -> List[EventBase]: + event = await self.store.get_event(event_id) + auth = await self.store.get_auth_chain( [auth_id for auth_id in event.auth_event_ids()], include_given=True ) - return [e for e in auth] + return list(auth) - @log_function - @defer.inlineCallbacks - def do_invite_join(self, target_hosts, room_id, joinee, content): + async def do_invite_join( + self, target_hosts: Iterable[str], room_id: str, joinee: str, content: JsonDict + ) -> None: """ Attempts to join the `joinee` to the room `room_id` via the servers contained in `target_hosts`. @@ -1268,17 +1266,17 @@ class FederationHandler(BaseHandler): have finished processing the join. Args: - target_hosts (Iterable[str]): List of servers to attempt to join the room with. + target_hosts: List of servers to attempt to join the room with. - room_id (str): The ID of the room to join. + room_id: The ID of the room to join. - joinee (str): The User ID of the joining user. + joinee: The User ID of the joining user. - content (dict): The event content to use for the join event. + content: The event content to use for the join event. """ logger.debug("Joining %s to %s", joinee, room_id) - origin, event, room_version_obj = yield self._make_and_verify_event( + origin, event, room_version_obj = await self._make_and_verify_event( target_hosts, room_id, joinee, @@ -1294,7 +1292,7 @@ class FederationHandler(BaseHandler): self.room_queues[room_id] = [] - yield self._clean_room_for_join(room_id) + await self._clean_room_for_join(room_id) handled_events = set() @@ -1307,9 +1305,8 @@ class FederationHandler(BaseHandler): except ValueError: pass - event_format_version = room_version_obj.event_format - ret = yield self.federation_client.send_join( - target_hosts, event, event_format_version + ret = await self.federation_client.send_join( + target_hosts, event, room_version_obj ) origin = ret["origin"] @@ -1327,7 +1324,7 @@ class FederationHandler(BaseHandler): logger.debug("do_invite_join event: %s", event) try: - yield self.store.store_room( + await self.store.store_room( room_id=room_id, room_creator_user_id="", is_public=False, @@ -1337,13 +1334,13 @@ class FederationHandler(BaseHandler): # FIXME pass - yield self._persist_auth_tree( + await self._persist_auth_tree( origin, auth_chain, state, event, room_version_obj ) # Check whether this room is the result of an upgrade of a room we already know # about. If so, migrate over user information - predecessor = yield self.store.get_room_predecessor(room_id) + predecessor = await self.store.get_room_predecessor(room_id) if not predecessor or not isinstance(predecessor.get("room_id"), str): return old_room_id = predecessor["room_id"] @@ -1353,7 +1350,7 @@ class FederationHandler(BaseHandler): # We retrieve the room member handler here as to not cause a cyclic dependency member_handler = self.hs.get_room_member_handler() - yield member_handler.transfer_room_state_on_room_upgrade( + await member_handler.transfer_room_state_on_room_upgrade( old_room_id, room_id ) @@ -1370,8 +1367,6 @@ class FederationHandler(BaseHandler): run_in_background(self._handle_queued_pdus, room_queue) - return True - async def _handle_queued_pdus(self, room_queue): """Process PDUs which got queued up while we were busy send_joining. @@ -1394,20 +1389,17 @@ class FederationHandler(BaseHandler): "Error handling queued PDU %s from %s: %s", p.event_id, origin, e ) - @defer.inlineCallbacks - @log_function - def on_make_join_request(self, origin, room_id, user_id): + async def on_make_join_request( + self, origin: str, room_id: str, user_id: str + ) -> EventBase: """ We've received a /make_join/ request, so we create a partial join event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. Args: - origin (str): The (verified) server name of the requesting server. - room_id (str): Room to create join event in - user_id (str): The user to create the join for - - Returns: - Deferred[FrozenEvent] + origin: The (verified) server name of the requesting server. + room_id: Room to create join event in + user_id: The user to create the join for """ if get_domain_from_id(user_id) != origin: logger.info( @@ -1419,7 +1411,7 @@ class FederationHandler(BaseHandler): event_content = {"membership": Membership.JOIN} - room_version = yield self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version_id(room_id) builder = self.event_builder_factory.new( room_version, @@ -1433,14 +1425,14 @@ class FederationHandler(BaseHandler): ) try: - event, context = yield self.event_creation_handler.create_new_client_event( + event, context = await self.event_creation_handler.create_new_client_event( builder=builder ) except AuthError as e: logger.warning("Failed to create join to %s because %s", room_id, e) raise e - event_allowed = yield self.third_party_event_rules.check_event_allowed( + event_allowed = await self.third_party_event_rules.check_event_allowed( event, context ) if not event_allowed: @@ -1451,15 +1443,13 @@ class FederationHandler(BaseHandler): # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_join_request` - yield self.auth.check_from_context( + await self.auth.check_from_context( room_version, event, context, do_sig_check=False ) return event - @defer.inlineCallbacks - @log_function - def on_send_join_request(self, origin, pdu): + async def on_send_join_request(self, origin, pdu): """ We have received a join event for a room. Fully process it and respond with the current state and auth chains. """ @@ -1496,9 +1486,9 @@ class FederationHandler(BaseHandler): # would introduce the danger of backwards-compatibility problems. event.internal_metadata.send_on_behalf_of = origin - context = yield self._handle_new_event(origin, event) + context = await self._handle_new_event(origin, event) - event_allowed = yield self.third_party_event_rules.check_event_allowed( + event_allowed = await self.third_party_event_rules.check_event_allowed( event, context ) if not event_allowed: @@ -1516,19 +1506,18 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) - yield self.user_joined_room(user, event.room_id) + await self.user_joined_room(user, event.room_id) - prev_state_ids = yield context.get_prev_state_ids() + prev_state_ids = await context.get_prev_state_ids() state_ids = list(prev_state_ids.values()) - auth_chain = yield self.store.get_auth_chain(state_ids) + auth_chain = await self.store.get_auth_chain(state_ids) - state = yield self.store.get_events(list(prev_state_ids.values())) + state = await self.store.get_events(list(prev_state_ids.values())) return {"state": list(state.values()), "auth_chain": auth_chain} - @defer.inlineCallbacks - def on_invite_request( + async def on_invite_request( self, origin: str, event: EventBase, room_version: RoomVersion ): """ We've got an invite event. Process and persist it. Sign it. @@ -1538,7 +1527,7 @@ class FederationHandler(BaseHandler): if event.state_key is None: raise SynapseError(400, "The invite event did not have a state key") - is_blocked = yield self.store.is_room_blocked(event.room_id) + is_blocked = await self.store.is_room_blocked(event.room_id) if is_blocked: raise SynapseError(403, "This room has been blocked on this server") @@ -1581,14 +1570,15 @@ class FederationHandler(BaseHandler): ) ) - context = yield self.state_handler.compute_event_context(event) - yield self.persist_events_and_notify([(event, context)]) + context = await self.state_handler.compute_event_context(event) + await self.persist_events_and_notify([(event, context)]) return event - @defer.inlineCallbacks - def do_remotely_reject_invite(self, target_hosts, room_id, user_id, content): - origin, event, room_version = yield self._make_and_verify_event( + async def do_remotely_reject_invite( + self, target_hosts: Iterable[str], room_id: str, user_id: str, content: JsonDict + ) -> EventBase: + origin, event, room_version = await self._make_and_verify_event( target_hosts, room_id, user_id, "leave", content=content ) # Mark as outlier as we don't have any state for this event; we're not @@ -1604,22 +1594,27 @@ class FederationHandler(BaseHandler): except ValueError: pass - yield self.federation_client.send_leave(target_hosts, event) + await self.federation_client.send_leave(target_hosts, event) - context = yield self.state_handler.compute_event_context(event) - yield self.persist_events_and_notify([(event, context)]) + context = await self.state_handler.compute_event_context(event) + await self.persist_events_and_notify([(event, context)]) return event - @defer.inlineCallbacks - def _make_and_verify_event( - self, target_hosts, room_id, user_id, membership, content={}, params=None - ): + async def _make_and_verify_event( + self, + target_hosts: Iterable[str], + room_id: str, + user_id: str, + membership: str, + content: JsonDict = {}, + params: Optional[Dict[str, str]] = None, + ) -> Tuple[str, EventBase, RoomVersion]: ( origin, event, room_version, - ) = yield self.federation_client.make_membership_event( + ) = await self.federation_client.make_membership_event( target_hosts, room_id, user_id, membership, content, params=params ) @@ -1633,20 +1628,17 @@ class FederationHandler(BaseHandler): assert event.room_id == room_id return origin, event, room_version - @defer.inlineCallbacks - @log_function - def on_make_leave_request(self, origin, room_id, user_id): + async def on_make_leave_request( + self, origin: str, room_id: str, user_id: str + ) -> EventBase: """ We've received a /make_leave/ request, so we create a partial leave event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. Args: - origin (str): The (verified) server name of the requesting server. - room_id (str): Room to create leave event in - user_id (str): The user to create the leave for - - Returns: - Deferred[FrozenEvent] + origin: The (verified) server name of the requesting server. + room_id: Room to create leave event in + user_id: The user to create the leave for """ if get_domain_from_id(user_id) != origin: logger.info( @@ -1656,7 +1648,7 @@ class FederationHandler(BaseHandler): ) raise SynapseError(403, "User not from origin", Codes.FORBIDDEN) - room_version = yield self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version_id(room_id) builder = self.event_builder_factory.new( room_version, { @@ -1668,11 +1660,11 @@ class FederationHandler(BaseHandler): }, ) - event, context = yield self.event_creation_handler.create_new_client_event( + event, context = await self.event_creation_handler.create_new_client_event( builder=builder ) - event_allowed = yield self.third_party_event_rules.check_event_allowed( + event_allowed = await self.third_party_event_rules.check_event_allowed( event, context ) if not event_allowed: @@ -1684,7 +1676,7 @@ class FederationHandler(BaseHandler): try: # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_leave_request` - yield self.auth.check_from_context( + await self.auth.check_from_context( room_version, event, context, do_sig_check=False ) except AuthError as e: @@ -1693,9 +1685,7 @@ class FederationHandler(BaseHandler): return event - @defer.inlineCallbacks - @log_function - def on_send_leave_request(self, origin, pdu): + async def on_send_leave_request(self, origin, pdu): """ We have received a leave event for a room. Fully process it.""" event = pdu @@ -1715,9 +1705,9 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event(origin, event) + context = await self._handle_new_event(origin, event) - event_allowed = yield self.third_party_event_rules.check_event_allowed( + event_allowed = await self.third_party_event_rules.check_event_allowed( event, context ) if not event_allowed: @@ -1798,6 +1788,9 @@ class FederationHandler(BaseHandler): if not in_room: raise AuthError(403, "Host not in room.") + # Synapse asks for 100 events per backfill request. Do not allow more. + limit = min(limit, 100) + events = yield self.store.get_backfill_events(room_id, pdu_list, limit) events = yield filter_events_for_server(self.storage, origin, events) @@ -1839,11 +1832,10 @@ class FederationHandler(BaseHandler): def get_min_depth_for_context(self, context): return self.store.get_min_depth(context) - @defer.inlineCallbacks - def _handle_new_event( + async def _handle_new_event( self, origin, event, state=None, auth_events=None, backfilled=False ): - context = yield self._prep_event( + context = await self._prep_event( origin, event, state=state, auth_events=auth_events, backfilled=backfilled ) @@ -1856,11 +1848,11 @@ class FederationHandler(BaseHandler): and not backfilled and not context.rejected ): - yield self.action_generator.handle_push_actions_for_event( + await self.action_generator.handle_push_actions_for_event( event, context ) - yield self.persist_events_and_notify( + await self.persist_events_and_notify( [(event, context)], backfilled=backfilled ) success = True @@ -1872,13 +1864,12 @@ class FederationHandler(BaseHandler): return context - @defer.inlineCallbacks - def _handle_new_events( + async def _handle_new_events( self, origin: str, event_infos: Iterable[_NewEventInfo], backfilled: bool = False, - ): + ) -> None: """Creates the appropriate contexts and persists events. The events should not depend on one another, e.g. this should be used to persist a bunch of outliers, but not a chunk of individual events that depend @@ -1887,11 +1878,10 @@ class FederationHandler(BaseHandler): Notifies about the events where appropriate. """ - @defer.inlineCallbacks - def prep(ev_info: _NewEventInfo): + async def prep(ev_info: _NewEventInfo): event = ev_info.event with nested_logging_context(suffix=event.event_id): - res = yield self._prep_event( + res = await self._prep_event( origin, event, state=ev_info.state, @@ -1900,14 +1890,14 @@ class FederationHandler(BaseHandler): ) return res - contexts = yield make_deferred_yieldable( + contexts = await make_deferred_yieldable( defer.gatherResults( [run_in_background(prep, ev_info) for ev_info in event_infos], consumeErrors=True, ) ) - yield self.persist_events_and_notify( + await self.persist_events_and_notify( [ (ev_info.event, context) for ev_info, context in zip(event_infos, contexts) @@ -1915,15 +1905,14 @@ class FederationHandler(BaseHandler): backfilled=backfilled, ) - @defer.inlineCallbacks - def _persist_auth_tree( + async def _persist_auth_tree( self, origin: str, auth_events: List[EventBase], state: List[EventBase], event: EventBase, room_version: RoomVersion, - ): + ) -> None: """Checks the auth chain is valid (and passes auth checks) for the state and event. Then persists the auth chain and state atomically. Persists the event separately. Notifies about the persisted events @@ -1938,14 +1927,11 @@ class FederationHandler(BaseHandler): event room_version: The room version we expect this room to have, and will raise if it doesn't match the version in the create event. - - Returns: - Deferred """ events_to_context = {} for e in itertools.chain(auth_events, state): e.internal_metadata.outlier = True - ctx = yield self.state_handler.compute_event_context(e) + ctx = await self.state_handler.compute_event_context(e) events_to_context[e.event_id] = ctx event_map = { @@ -1977,12 +1963,8 @@ class FederationHandler(BaseHandler): missing_auth_events.add(e_id) for e_id in missing_auth_events: - m_ev = yield self.federation_client.get_pdu( - [origin], - e_id, - room_version=room_version.identifier, - outlier=True, - timeout=10000, + m_ev = await self.federation_client.get_pdu( + [origin], e_id, room_version=room_version, outlier=True, timeout=10000, ) if m_ev and m_ev.event_id == e_id: event_map[e_id] = m_ev @@ -2013,91 +1995,74 @@ class FederationHandler(BaseHandler): raise events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR - yield self.persist_events_and_notify( + await self.persist_events_and_notify( [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) ] ) - new_event_context = yield self.state_handler.compute_event_context( + new_event_context = await self.state_handler.compute_event_context( event, old_state=state ) - yield self.persist_events_and_notify([(event, new_event_context)]) + await self.persist_events_and_notify([(event, new_event_context)]) - @defer.inlineCallbacks - def _prep_event( + async def _prep_event( self, origin: str, event: EventBase, state: Optional[Iterable[EventBase]], auth_events: Optional[StateMap[EventBase]], backfilled: bool, - ): - """ - - Args: - origin: - event: - state: - auth_events: - backfilled: - - Returns: - Deferred, which resolves to synapse.events.snapshot.EventContext - """ - context = yield self.state_handler.compute_event_context(event, old_state=state) + ) -> EventContext: + context = await self.state_handler.compute_event_context(event, old_state=state) if not auth_events: - prev_state_ids = yield context.get_prev_state_ids() - auth_events_ids = yield self.auth.compute_auth_events( + prev_state_ids = await context.get_prev_state_ids() + auth_events_ids = await self.auth.compute_auth_events( event, prev_state_ids, for_verification=True ) - auth_events = yield self.store.get_events(auth_events_ids) + auth_events = await self.store.get_events(auth_events_ids) auth_events = {(e.type, e.state_key): e for e in auth_events.values()} # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_event_ids(): if len(event.prev_event_ids()) == 1 and event.depth < 5: - c = yield self.store.get_event( + c = await self.store.get_event( event.prev_event_ids()[0], allow_none=True ) if c and c.type == EventTypes.Create: auth_events[(c.type, c.state_key)] = c - context = yield self.do_auth(origin, event, context, auth_events=auth_events) + context = await self.do_auth(origin, event, context, auth_events=auth_events) if not context.rejected: - yield self._check_for_soft_fail(event, state, backfilled) + await self._check_for_soft_fail(event, state, backfilled) if event.type == EventTypes.GuestAccess and not context.rejected: - yield self.maybe_kick_guest_users(event) + await self.maybe_kick_guest_users(event) return context - @defer.inlineCallbacks - def _check_for_soft_fail( + async def _check_for_soft_fail( self, event: EventBase, state: Optional[Iterable[EventBase]], backfilled: bool - ): - """Checks if we should soft fail the event, if so marks the event as + ) -> None: + """Checks if we should soft fail the event; if so, marks the event as such. Args: event state: The state at the event if we don't have all the event's prev events backfilled: Whether the event is from backfill - - Returns: - Deferred """ # For new (non-backfilled and non-outlier) events we check if the event # passes auth based on the current state. If it doesn't then we # "soft-fail" the event. do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier() if do_soft_fail_check: - extrem_ids = yield self.store.get_latest_event_ids_in_room(event.room_id) + extrem_ids = await self.store.get_latest_event_ids_in_room(event.room_id) extrem_ids = set(extrem_ids) prev_event_ids = set(event.prev_event_ids()) @@ -2108,7 +2073,7 @@ class FederationHandler(BaseHandler): do_soft_fail_check = False if do_soft_fail_check: - room_version = yield self.store.get_room_version_id(event.room_id) + room_version = await self.store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] # Calculate the "current state". @@ -2125,19 +2090,19 @@ class FederationHandler(BaseHandler): # given state at the event. This should correctly handle cases # like bans, especially with state res v2. - state_sets = yield self.state_store.get_state_groups( + state_sets = await self.state_store.get_state_groups( event.room_id, extrem_ids ) state_sets = list(state_sets.values()) state_sets.append(state) - current_state_ids = yield self.state_handler.resolve_events( + current_state_ids = await self.state_handler.resolve_events( room_version, state_sets, event ) current_state_ids = { k: e.event_id for k, e in iteritems(current_state_ids) } else: - current_state_ids = yield self.state_handler.get_current_state_ids( + current_state_ids = await self.state_handler.get_current_state_ids( event.room_id, latest_event_ids=extrem_ids ) @@ -2153,7 +2118,7 @@ class FederationHandler(BaseHandler): e for k, e in iteritems(current_state_ids) if k in auth_types ] - current_auth_events = yield self.store.get_events(current_state_ids) + current_auth_events = await self.store.get_events(current_state_ids) current_auth_events = { (e.type, e.state_key): e for e in current_auth_events.values() } @@ -2166,15 +2131,14 @@ class FederationHandler(BaseHandler): logger.warning("Soft-failing %r because %s", event, e) event.internal_metadata.soft_failed = True - @defer.inlineCallbacks - def on_query_auth( + async def on_query_auth( self, origin, event_id, room_id, remote_auth_chain, rejects, missing ): - in_room = yield self.auth.check_host_in_room(room_id, origin) + in_room = await self.auth.check_host_in_room(room_id, origin) if not in_room: raise AuthError(403, "Host not in room.") - event = yield self.store.get_event( + event = await self.store.get_event( event_id, allow_none=False, check_room_id=room_id ) @@ -2182,57 +2146,61 @@ class FederationHandler(BaseHandler): # don't want to fall into the trap of `missing` being wrong. for e in remote_auth_chain: try: - yield self._handle_new_event(origin, e) + await self._handle_new_event(origin, e) except AuthError: pass # Now get the current auth_chain for the event. - local_auth_chain = yield self.store.get_auth_chain( + local_auth_chain = await self.store.get_auth_chain( [auth_id for auth_id in event.auth_event_ids()], include_given=True ) # TODO: Check if we would now reject event_id. If so we need to tell # everyone. - ret = yield self.construct_auth_difference(local_auth_chain, remote_auth_chain) + ret = await self.construct_auth_difference(local_auth_chain, remote_auth_chain) logger.debug("on_query_auth returning: %s", ret) return ret - @defer.inlineCallbacks - def on_get_missing_events( + async def on_get_missing_events( self, origin, room_id, earliest_events, latest_events, limit ): - in_room = yield self.auth.check_host_in_room(room_id, origin) + in_room = await self.auth.check_host_in_room(room_id, origin) if not in_room: raise AuthError(403, "Host not in room.") + # Only allow up to 20 events to be retrieved per request. limit = min(limit, 20) - missing_events = yield self.store.get_missing_events( + missing_events = await self.store.get_missing_events( room_id=room_id, earliest_events=earliest_events, latest_events=latest_events, limit=limit, ) - missing_events = yield filter_events_for_server( + missing_events = await filter_events_for_server( self.storage, origin, missing_events ) return missing_events - @defer.inlineCallbacks - @log_function - def do_auth(self, origin, event, context, auth_events): + async def do_auth( + self, + origin: str, + event: EventBase, + context: EventContext, + auth_events: StateMap[EventBase], + ) -> EventContext: """ Args: - origin (str): - event (synapse.events.EventBase): - context (synapse.events.snapshot.EventContext): - auth_events (dict[(str, str)->synapse.events.EventBase]): + origin: + event: + context: + auth_events: Map from (event_type, state_key) to event Normally, our calculated auth_events based on the state of the room @@ -2242,13 +2210,13 @@ class FederationHandler(BaseHandler): Also NB that this function adds entries to it. Returns: - defer.Deferred[EventContext]: updated context object + updated context object """ - room_version = yield self.store.get_room_version_id(event.room_id) + room_version = await self.store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] try: - context = yield self._update_auth_events_and_context_for_auth( + context = await self._update_auth_events_and_context_for_auth( origin, event, context, auth_events ) except Exception: @@ -2270,10 +2238,13 @@ class FederationHandler(BaseHandler): return context - @defer.inlineCallbacks - def _update_auth_events_and_context_for_auth( - self, origin, event, context, auth_events - ): + async def _update_auth_events_and_context_for_auth( + self, + origin: str, + event: EventBase, + context: EventContext, + auth_events: StateMap[EventBase], + ) -> EventContext: """Helper for do_auth. See there for docs. Checks whether a given event has the expected auth events. If it @@ -2281,16 +2252,16 @@ class FederationHandler(BaseHandler): we can come to a consensus (e.g. if one server missed some valid state). - This attempts to resovle any potential divergence of state between + This attempts to resolve any potential divergence of state between servers, but is not essential and so failures should not block further processing of the event. Args: - origin (str): - event (synapse.events.EventBase): - context (synapse.events.snapshot.EventContext): + origin: + event: + context: - auth_events (dict[(str, str)->synapse.events.EventBase]): + auth_events: Map from (event_type, state_key) to event Normally, our calculated auth_events based on the state of the room @@ -2301,7 +2272,7 @@ class FederationHandler(BaseHandler): Also NB that this function adds entries to it. Returns: - defer.Deferred[EventContext]: updated context + updated context """ event_auth_events = set(event.auth_event_ids()) @@ -2315,7 +2286,7 @@ class FederationHandler(BaseHandler): # # we start by checking if they are in the store, and then try calling /event_auth/. if missing_auth: - have_events = yield self.store.have_seen_events(missing_auth) + have_events = await self.store.have_seen_events(missing_auth) logger.debug("Events %s are in the store", have_events) missing_auth.difference_update(have_events) @@ -2324,7 +2295,7 @@ class FederationHandler(BaseHandler): logger.info("auth_events contains unknown events: %s", missing_auth) try: try: - remote_auth_chain = yield self.federation_client.get_event_auth( + remote_auth_chain = await self.federation_client.get_event_auth( origin, event.room_id, event.event_id ) except RequestSendFailed as e: @@ -2333,7 +2304,7 @@ class FederationHandler(BaseHandler): logger.info("Failed to get event auth from remote: %s", e) return context - seen_remotes = yield self.store.have_seen_events( + seen_remotes = await self.store.have_seen_events( [e.event_id for e in remote_auth_chain] ) @@ -2356,7 +2327,7 @@ class FederationHandler(BaseHandler): logger.debug( "do_auth %s missing_auth: %s", event.event_id, e.event_id ) - yield self._handle_new_event(origin, e, auth_events=auth) + await self._handle_new_event(origin, e, auth_events=auth) if e.event_id in event_auth_events: auth_events[(e.type, e.state_key)] = e @@ -2390,7 +2361,7 @@ class FederationHandler(BaseHandler): # XXX: currently this checks for redactions but I'm not convinced that is # necessary? - different_events = yield self.store.get_events_as_list(different_auth) + different_events = await self.store.get_events_as_list(different_auth) for d in different_events: if d.room_id != event.room_id: @@ -2416,8 +2387,8 @@ class FederationHandler(BaseHandler): remote_auth_events.update({(d.type, d.state_key): d for d in different_events}) remote_state = remote_auth_events.values() - room_version = yield self.store.get_room_version_id(event.room_id) - new_state = yield self.state_handler.resolve_events( + room_version = await self.store.get_room_version_id(event.room_id) + new_state = await self.state_handler.resolve_events( room_version, (local_state, remote_state), event ) @@ -2432,27 +2403,27 @@ class FederationHandler(BaseHandler): auth_events.update(new_state) - context = yield self._update_context_for_auth_events( + context = await self._update_context_for_auth_events( event, context, auth_events ) return context - @defer.inlineCallbacks - def _update_context_for_auth_events(self, event, context, auth_events): + async def _update_context_for_auth_events( + self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase] + ) -> EventContext: """Update the state_ids in an event context after auth event resolution, storing the changes as a new state group. Args: - event (Event): The event we're handling the context for + event: The event we're handling the context for - context (synapse.events.snapshot.EventContext): initial event context + context: initial event context - auth_events (dict[(str, str)->EventBase]): Events to update in the event - context. + auth_events: Events to update in the event context. Returns: - Deferred[EventContext]: new event context + new event context """ # exclude the state key of the new event from the current_state in the context. if event.is_state(): @@ -2463,19 +2434,19 @@ class FederationHandler(BaseHandler): k: a.event_id for k, a in iteritems(auth_events) if k != event_key } - current_state_ids = yield context.get_current_state_ids() + current_state_ids = await context.get_current_state_ids() current_state_ids = dict(current_state_ids) current_state_ids.update(state_updates) - prev_state_ids = yield context.get_prev_state_ids() + prev_state_ids = await context.get_prev_state_ids() prev_state_ids = dict(prev_state_ids) prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)}) # create a new state group as a delta from the existing one. prev_group = context.state_group - state_group = yield self.state_store.store_state_group( + state_group = await self.state_store.store_state_group( event.event_id, event.room_id, prev_group=prev_group, @@ -2492,8 +2463,9 @@ class FederationHandler(BaseHandler): delta_ids=state_updates, ) - @defer.inlineCallbacks - def construct_auth_difference(self, local_auth, remote_auth): + async def construct_auth_difference( + self, local_auth: Iterable[EventBase], remote_auth: Iterable[EventBase] + ) -> Dict: """ Given a local and remote auth chain, find the differences. This assumes that we have already processed all events in remote_auth @@ -2602,7 +2574,7 @@ class FederationHandler(BaseHandler): reason_map = {} for e in base_remote_rejected: - reason = yield self.store.get_rejection_reason(e.event_id) + reason = await self.store.get_rejection_reason(e.event_id) if reason is None: # TODO: e is not in the current state, so we should # construct some proof of that. @@ -2687,33 +2659,31 @@ class FederationHandler(BaseHandler): destinations, room_id, event_dict ) - @defer.inlineCallbacks - @log_function - def on_exchange_third_party_invite_request(self, room_id, event_dict): + async def on_exchange_third_party_invite_request( + self, room_id: str, event_dict: JsonDict + ) -> None: """Handle an exchange_third_party_invite request from a remote server The remote server will call this when it wants to turn a 3pid invite into a normal m.room.member invite. Args: - room_id (str): The ID of the room. + room_id: The ID of the room. event_dict (dict[str, Any]): Dictionary containing the event body. - Returns: - Deferred: resolves (to None) """ - room_version = yield self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version_id(room_id) # NB: event_dict has a particular specced format we might need to fudge # if we change event formats too much. builder = self.event_builder_factory.new(room_version, event_dict) - event, context = yield self.event_creation_handler.create_new_client_event( + event, context = await self.event_creation_handler.create_new_client_event( builder=builder ) - event_allowed = yield self.third_party_event_rules.check_event_allowed( + event_allowed = await self.third_party_event_rules.check_event_allowed( event, context ) if not event_allowed: @@ -2724,16 +2694,16 @@ class FederationHandler(BaseHandler): 403, "This event is not allowed in this context", Codes.FORBIDDEN ) - event, context = yield self.add_display_name_to_third_party_invite( + event, context = await self.add_display_name_to_third_party_invite( room_version, event_dict, event, context ) try: - yield self.auth.check_from_context(room_version, event, context) + await self.auth.check_from_context(room_version, event, context) except AuthError as e: logger.warning("Denying third party invite %r because %s", event, e) raise e - yield self._check_signature(event, context) + await self._check_signature(event, context) # We need to tell the transaction queue to send this out, even # though the sender isn't a local user. @@ -2741,7 +2711,7 @@ class FederationHandler(BaseHandler): # We retrieve the room member handler here as to not cause a cyclic dependency member_handler = self.hs.get_room_member_handler() - yield member_handler.send_membership_event(None, event, context) + await member_handler.send_membership_event(None, event, context) @defer.inlineCallbacks def add_display_name_to_third_party_invite( @@ -2889,27 +2859,27 @@ class FederationHandler(BaseHandler): if "valid" not in response or not response["valid"]: raise AuthError(403, "Third party certificate was invalid") - @defer.inlineCallbacks - def persist_events_and_notify(self, event_and_contexts, backfilled=False): + async def persist_events_and_notify( + self, + event_and_contexts: Sequence[Tuple[EventBase, EventContext]], + backfilled: bool = False, + ) -> None: """Persists events and tells the notifier/pushers about them, if necessary. Args: - event_and_contexts(list[tuple[FrozenEvent, EventContext]]) - backfilled (bool): Whether these events are a result of + event_and_contexts: + backfilled: Whether these events are a result of backfilling or not - - Returns: - Deferred """ if self.config.worker_app: - yield self._send_events_to_master( + await self._send_events_to_master( store=self.store, event_and_contexts=event_and_contexts, backfilled=backfilled, ) else: - max_stream_id = yield self.storage.persistence.persist_events( + max_stream_id = await self.storage.persistence.persist_events( event_and_contexts, backfilled=backfilled ) @@ -2920,15 +2890,17 @@ class FederationHandler(BaseHandler): if not backfilled: # Never notify for backfilled events for event, _ in event_and_contexts: - yield self._notify_persisted_event(event, max_stream_id) + await self._notify_persisted_event(event, max_stream_id) - def _notify_persisted_event(self, event, max_stream_id): + async def _notify_persisted_event( + self, event: EventBase, max_stream_id: int + ) -> None: """Checks to see if notifier/pushers should be notified about the event or not. Args: - event (FrozenEvent) - max_stream_id (int): The max_stream_id returned by persist_events + event: + max_stream_id: The max_stream_id returned by persist_events """ extra_users = [] @@ -2952,29 +2924,29 @@ class FederationHandler(BaseHandler): event, event_stream_id, max_stream_id, extra_users=extra_users ) - return self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) + await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) - def _clean_room_for_join(self, room_id): + async def _clean_room_for_join(self, room_id: str) -> None: """Called to clean up any data in DB for a given room, ready for the server to join the room. Args: - room_id (str) + room_id """ if self.config.worker_app: - return self._clean_room_for_join_client(room_id) + await self._clean_room_for_join_client(room_id) else: - return self.store.clean_room_for_join(room_id) + await self.store.clean_room_for_join(room_id) - def user_joined_room(self, user, room_id): + async def user_joined_room(self, user: UserID, room_id: str) -> None: """Called when a new user has joined the room """ if self.config.worker_app: - return self._notify_user_membership_change( + await self._notify_user_membership_change( room_id=room_id, user_id=user.to_string(), change="joined" ) else: - return defer.succeed(user_joined_room(self.distributor, user, room_id)) + user_joined_room(self.distributor, user, room_id) @defer.inlineCallbacks def get_room_complexity(self, remote_room_hosts, room_id): diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 319565510f..ad22415782 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -63,7 +63,7 @@ def _create_rerouter(func_name): return f -class GroupsLocalHandler(object): +class GroupsLocalWorkerHandler(object): def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() @@ -81,40 +81,17 @@ class GroupsLocalHandler(object): self.profile_handler = hs.get_profile_handler() - # Ensure attestations get renewed - hs.get_groups_attestation_renewer() - # The following functions merely route the query to the local groups server # or federation depending on if the group is local or remote get_group_profile = _create_rerouter("get_group_profile") - update_group_profile = _create_rerouter("update_group_profile") get_rooms_in_group = _create_rerouter("get_rooms_in_group") - get_invited_users_in_group = _create_rerouter("get_invited_users_in_group") - - add_room_to_group = _create_rerouter("add_room_to_group") - update_room_in_group = _create_rerouter("update_room_in_group") - remove_room_from_group = _create_rerouter("remove_room_from_group") - - update_group_summary_room = _create_rerouter("update_group_summary_room") - delete_group_summary_room = _create_rerouter("delete_group_summary_room") - - update_group_category = _create_rerouter("update_group_category") - delete_group_category = _create_rerouter("delete_group_category") get_group_category = _create_rerouter("get_group_category") get_group_categories = _create_rerouter("get_group_categories") - - update_group_summary_user = _create_rerouter("update_group_summary_user") - delete_group_summary_user = _create_rerouter("delete_group_summary_user") - - update_group_role = _create_rerouter("update_group_role") - delete_group_role = _create_rerouter("delete_group_role") get_group_role = _create_rerouter("get_group_role") get_group_roles = _create_rerouter("get_group_roles") - set_group_join_policy = _create_rerouter("set_group_join_policy") - @defer.inlineCallbacks def get_group_summary(self, group_id, requester_user_id): """Get the group summary for a group. @@ -170,6 +147,144 @@ class GroupsLocalHandler(object): return res @defer.inlineCallbacks + def get_users_in_group(self, group_id, requester_user_id): + """Get users in a group + """ + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_users_in_group( + group_id, requester_user_id + ) + return res + + group_server_name = get_domain_from_id(group_id) + + try: + res = yield self.transport_client.get_users_in_group( + get_domain_from_id(group_id), group_id, requester_user_id + ) + except HttpResponseException as e: + raise e.to_synapse_error() + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") + + chunk = res["chunk"] + valid_entries = [] + for entry in chunk: + g_user_id = entry["user_id"] + attestation = entry.pop("attestation", {}) + try: + if get_domain_from_id(g_user_id) != group_server_name: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + server_name=get_domain_from_id(g_user_id), + ) + valid_entries.append(entry) + except Exception as e: + logger.info("Failed to verify user is in group: %s", e) + + res["chunk"] = valid_entries + + return res + + @defer.inlineCallbacks + def get_joined_groups(self, user_id): + group_ids = yield self.store.get_joined_groups(user_id) + return {"groups": group_ids} + + @defer.inlineCallbacks + def get_publicised_groups_for_user(self, user_id): + if self.hs.is_mine_id(user_id): + result = yield self.store.get_publicised_groups_for_user(user_id) + + # Check AS associated groups for this user - this depends on the + # RegExps in the AS registration file (under `users`) + for app_service in self.store.get_app_services(): + result.extend(app_service.get_groups_for_user(user_id)) + + return {"groups": result} + else: + try: + bulk_result = yield self.transport_client.bulk_get_publicised_groups( + get_domain_from_id(user_id), [user_id] + ) + except HttpResponseException as e: + raise e.to_synapse_error() + except RequestSendFailed: + raise SynapseError(502, "Failed to contact group server") + + result = bulk_result.get("users", {}).get(user_id) + # TODO: Verify attestations + return {"groups": result} + + @defer.inlineCallbacks + def bulk_get_publicised_groups(self, user_ids, proxy=True): + destinations = {} + local_users = set() + + for user_id in user_ids: + if self.hs.is_mine_id(user_id): + local_users.add(user_id) + else: + destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id) + + if not proxy and destinations: + raise SynapseError(400, "Some user_ids are not local") + + results = {} + failed_results = [] + for destination, dest_user_ids in iteritems(destinations): + try: + r = yield self.transport_client.bulk_get_publicised_groups( + destination, list(dest_user_ids) + ) + results.update(r["users"]) + except Exception: + failed_results.extend(dest_user_ids) + + for uid in local_users: + results[uid] = yield self.store.get_publicised_groups_for_user(uid) + + # Check AS associated groups for this user - this depends on the + # RegExps in the AS registration file (under `users`) + for app_service in self.store.get_app_services(): + results[uid].extend(app_service.get_groups_for_user(uid)) + + return {"users": results} + + +class GroupsLocalHandler(GroupsLocalWorkerHandler): + def __init__(self, hs): + super(GroupsLocalHandler, self).__init__(hs) + + # Ensure attestations get renewed + hs.get_groups_attestation_renewer() + + # The following functions merely route the query to the local groups server + # or federation depending on if the group is local or remote + + update_group_profile = _create_rerouter("update_group_profile") + + add_room_to_group = _create_rerouter("add_room_to_group") + update_room_in_group = _create_rerouter("update_room_in_group") + remove_room_from_group = _create_rerouter("remove_room_from_group") + + update_group_summary_room = _create_rerouter("update_group_summary_room") + delete_group_summary_room = _create_rerouter("delete_group_summary_room") + + update_group_category = _create_rerouter("update_group_category") + delete_group_category = _create_rerouter("delete_group_category") + + update_group_summary_user = _create_rerouter("update_group_summary_user") + delete_group_summary_user = _create_rerouter("delete_group_summary_user") + + update_group_role = _create_rerouter("update_group_role") + delete_group_role = _create_rerouter("delete_group_role") + + set_group_join_policy = _create_rerouter("set_group_join_policy") + + @defer.inlineCallbacks def create_group(self, group_id, user_id, content): """Create a group """ @@ -220,48 +335,6 @@ class GroupsLocalHandler(object): return res @defer.inlineCallbacks - def get_users_in_group(self, group_id, requester_user_id): - """Get users in a group - """ - if self.is_mine_id(group_id): - res = yield self.groups_server_handler.get_users_in_group( - group_id, requester_user_id - ) - return res - - group_server_name = get_domain_from_id(group_id) - - try: - res = yield self.transport_client.get_users_in_group( - get_domain_from_id(group_id), group_id, requester_user_id - ) - except HttpResponseException as e: - raise e.to_synapse_error() - except RequestSendFailed: - raise SynapseError(502, "Failed to contact group server") - - chunk = res["chunk"] - valid_entries = [] - for entry in chunk: - g_user_id = entry["user_id"] - attestation = entry.pop("attestation", {}) - try: - if get_domain_from_id(g_user_id) != group_server_name: - yield self.attestations.verify_attestation( - attestation, - group_id=group_id, - user_id=g_user_id, - server_name=get_domain_from_id(g_user_id), - ) - valid_entries.append(entry) - except Exception as e: - logger.info("Failed to verify user is in group: %s", e) - - res["chunk"] = valid_entries - - return res - - @defer.inlineCallbacks def join_group(self, group_id, user_id, content): """Request to join a group """ @@ -452,68 +525,3 @@ class GroupsLocalHandler(object): group_id, user_id, membership="leave" ) self.notifier.on_new_event("groups_key", token, users=[user_id]) - - @defer.inlineCallbacks - def get_joined_groups(self, user_id): - group_ids = yield self.store.get_joined_groups(user_id) - return {"groups": group_ids} - - @defer.inlineCallbacks - def get_publicised_groups_for_user(self, user_id): - if self.hs.is_mine_id(user_id): - result = yield self.store.get_publicised_groups_for_user(user_id) - - # Check AS associated groups for this user - this depends on the - # RegExps in the AS registration file (under `users`) - for app_service in self.store.get_app_services(): - result.extend(app_service.get_groups_for_user(user_id)) - - return {"groups": result} - else: - try: - bulk_result = yield self.transport_client.bulk_get_publicised_groups( - get_domain_from_id(user_id), [user_id] - ) - except HttpResponseException as e: - raise e.to_synapse_error() - except RequestSendFailed: - raise SynapseError(502, "Failed to contact group server") - - result = bulk_result.get("users", {}).get(user_id) - # TODO: Verify attestations - return {"groups": result} - - @defer.inlineCallbacks - def bulk_get_publicised_groups(self, user_ids, proxy=True): - destinations = {} - local_users = set() - - for user_id in user_ids: - if self.hs.is_mine_id(user_id): - local_users.add(user_id) - else: - destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id) - - if not proxy and destinations: - raise SynapseError(400, "Some user_ids are not local") - - results = {} - failed_results = [] - for destination, dest_user_ids in iteritems(destinations): - try: - r = yield self.transport_client.bulk_get_publicised_groups( - destination, list(dest_user_ids) - ) - results.update(r["users"]) - except Exception: - failed_results.extend(dest_user_ids) - - for uid in local_users: - results[uid] = yield self.store.get_publicised_groups_for_user(uid) - - # Check AS associated groups for this user - this depends on the - # RegExps in the AS registration file (under `users`) - for app_service in self.store.get_app_services(): - results[uid].extend(app_service.get_groups_for_user(uid)) - - return {"users": results} diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index bdf16c84d3..be6ae18a92 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -932,10 +932,9 @@ class EventCreationHandler(object): # way? If we have been invited by a remote server, we need # to get them to sign the event. - returned_invite = yield federation_handler.send_invite( - invitee.domain, event + returned_invite = yield defer.ensureDeferred( + federation_handler.send_invite(invitee.domain, event) ) - event.unsigned.pop("room_state", None) # TODO: Make sure the signatures actually are correct. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b609a65f47..ab07edd2fc 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -259,7 +259,7 @@ class RoomCreationHandler(BaseHandler): for v in ("invite", "events_default"): current = int(pl_content.get(v, 0)) if current < restricted_level: - logger.info( + logger.debug( "Setting level for %s in %s to %i (was %i)", v, old_room_id, @@ -269,7 +269,7 @@ class RoomCreationHandler(BaseHandler): pl_content[v] = restricted_level updated = True else: - logger.info("Not setting level for %s (already %i)", v, current) + logger.debug("Not setting level for %s (already %i)", v, current) if updated: try: @@ -296,7 +296,7 @@ class RoomCreationHandler(BaseHandler): EventTypes.Aliases, events_default ) - logger.info("Setting correct PLs in new room to %s", new_pl_content) + logger.debug("Setting correct PLs in new room to %s", new_pl_content) yield self.event_creation_handler.create_and_send_nonmember_event( requester, { @@ -579,9 +579,13 @@ class RoomCreationHandler(BaseHandler): # Check whether the third party rules allows/changes the room create # request. - yield self.third_party_event_rules.on_create_room( + event_allowed = yield self.third_party_event_rules.on_create_room( requester, config, is_requester_admin=is_requester_admin ) + if not event_allowed: + raise SynapseError( + 403, "You are not permitted to create rooms", Codes.FORBIDDEN + ) if not is_requester_admin and not self.spam_checker.user_may_create_room( user_id @@ -782,7 +786,7 @@ class RoomCreationHandler(BaseHandler): @defer.inlineCallbacks def send(etype, content, **kwargs): event = create(etype, content, **kwargs) - logger.info("Sending %s in new room", etype) + logger.debug("Sending %s in new room", etype) yield self.event_creation_handler.create_and_send_nonmember_event( creator, event, ratelimit=False ) @@ -796,7 +800,7 @@ class RoomCreationHandler(BaseHandler): creation_content.update({"creator": creator_id}) yield send(etype=EventTypes.Create, content=creation_content) - logger.info("Sending %s in new room", EventTypes.Member) + logger.debug("Sending %s in new room", EventTypes.Member) yield self.room_member_handler.update_membership( creator, creator.user, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 15e8aa5249..4260426369 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -944,8 +944,10 @@ class RoomMemberMasterHandler(RoomMemberHandler): # join dance for now, since we're kinda implicitly checking # that we are allowed to join when we decide whether or not we # need to do the invite/join dance. - yield self.federation_handler.do_invite_join( - remote_room_hosts, room_id, user.to_string(), content + yield defer.ensureDeferred( + self.federation_handler.do_invite_join( + remote_room_hosts, room_id, user.to_string(), content + ) ) yield self._user_joined_room(user, room_id) @@ -982,8 +984,10 @@ class RoomMemberMasterHandler(RoomMemberHandler): """ fed_handler = self.federation_handler try: - ret = yield fed_handler.do_remotely_reject_invite( - remote_room_hosts, room_id, target.to_string(), content=content, + ret = yield defer.ensureDeferred( + fed_handler.do_remotely_reject_invite( + remote_room_hosts, room_id, target.to_string(), content=content, + ) ) return ret except Exception as e: diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 68e6edace5..d93a276693 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -300,7 +300,7 @@ class StatsHandler(StateDeltasHandler): room_state["guest_access"] = event_content.get("guest_access") for room_id, state in room_to_state_updates.items(): - logger.info("Updating room_stats_state for %s: %s", room_id, state) + logger.debug("Updating room_stats_state for %s: %s", room_id, state) yield self.store.update_room_state(room_id, state) return room_to_stats_deltas, user_to_stats_deltas diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index cd95f85e3f..f8d60d32ba 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -14,20 +14,30 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections import itertools import logging +from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple from six import iteritems, itervalues +import attr from prometheus_client import Counter from synapse.api.constants import EventTypes, Membership +from synapse.api.filtering import FilterCollection +from synapse.events import EventBase from synapse.logging.context import LoggingContext from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter -from synapse.types import RoomStreamToken +from synapse.types import ( + Collection, + JsonDict, + RoomStreamToken, + StateMap, + StreamToken, + UserID, +) from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.lrucache import LruCache @@ -62,17 +72,22 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000 LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 -SyncConfig = collections.namedtuple( - "SyncConfig", ["user", "filter_collection", "is_guest", "request_key", "device_id"] -) +@attr.s(slots=True, frozen=True) +class SyncConfig: + user = attr.ib(type=UserID) + filter_collection = attr.ib(type=FilterCollection) + is_guest = attr.ib(type=bool) + request_key = attr.ib(type=Tuple[Any, ...]) + device_id = attr.ib(type=str) -class TimelineBatch( - collections.namedtuple("TimelineBatch", ["prev_batch", "events", "limited"]) -): - __slots__ = [] +@attr.s(slots=True, frozen=True) +class TimelineBatch: + prev_batch = attr.ib(type=StreamToken) + events = attr.ib(type=List[EventBase]) + limited = attr.ib(bool) - def __nonzero__(self): + def __nonzero__(self) -> bool: """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ @@ -81,23 +96,17 @@ class TimelineBatch( __bool__ = __nonzero__ # python3 -class JoinedSyncResult( - collections.namedtuple( - "JoinedSyncResult", - [ - "room_id", # str - "timeline", # TimelineBatch - "state", # dict[(str, str), FrozenEvent] - "ephemeral", - "account_data", - "unread_notifications", - "summary", - ], - ) -): - __slots__ = [] - - def __nonzero__(self): +@attr.s(slots=True, frozen=True) +class JoinedSyncResult: + room_id = attr.ib(type=str) + timeline = attr.ib(type=TimelineBatch) + state = attr.ib(type=StateMap[EventBase]) + ephemeral = attr.ib(type=List[JsonDict]) + account_data = attr.ib(type=List[JsonDict]) + unread_notifications = attr.ib(type=JsonDict) + summary = attr.ib(type=Optional[JsonDict]) + + def __nonzero__(self) -> bool: """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ @@ -113,20 +122,14 @@ class JoinedSyncResult( __bool__ = __nonzero__ # python3 -class ArchivedSyncResult( - collections.namedtuple( - "ArchivedSyncResult", - [ - "room_id", # str - "timeline", # TimelineBatch - "state", # dict[(str, str), FrozenEvent] - "account_data", - ], - ) -): - __slots__ = [] - - def __nonzero__(self): +@attr.s(slots=True, frozen=True) +class ArchivedSyncResult: + room_id = attr.ib(type=str) + timeline = attr.ib(type=TimelineBatch) + state = attr.ib(type=StateMap[EventBase]) + account_data = attr.ib(type=List[JsonDict]) + + def __nonzero__(self) -> bool: """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ @@ -135,70 +138,88 @@ class ArchivedSyncResult( __bool__ = __nonzero__ # python3 -class InvitedSyncResult( - collections.namedtuple( - "InvitedSyncResult", - ["room_id", "invite"], # str # FrozenEvent: the invite event - ) -): - __slots__ = [] +@attr.s(slots=True, frozen=True) +class InvitedSyncResult: + room_id = attr.ib(type=str) + invite = attr.ib(type=EventBase) - def __nonzero__(self): + def __nonzero__(self) -> bool: """Invited rooms should always be reported to the client""" return True __bool__ = __nonzero__ # python3 -class GroupsSyncResult( - collections.namedtuple("GroupsSyncResult", ["join", "invite", "leave"]) -): - __slots__ = [] +@attr.s(slots=True, frozen=True) +class GroupsSyncResult: + join = attr.ib(type=JsonDict) + invite = attr.ib(type=JsonDict) + leave = attr.ib(type=JsonDict) - def __nonzero__(self): + def __nonzero__(self) -> bool: return bool(self.join or self.invite or self.leave) __bool__ = __nonzero__ # python3 -class DeviceLists( - collections.namedtuple( - "DeviceLists", - [ - "changed", # list of user_ids whose devices may have changed - "left", # list of user_ids whose devices we no longer track - ], - ) -): - __slots__ = [] +@attr.s(slots=True, frozen=True) +class DeviceLists: + """ + Attributes: + changed: List of user_ids whose devices may have changed + left: List of user_ids whose devices we no longer track + """ + + changed = attr.ib(type=Collection[str]) + left = attr.ib(type=Collection[str]) - def __nonzero__(self): + def __nonzero__(self) -> bool: return bool(self.changed or self.left) __bool__ = __nonzero__ # python3 -class SyncResult( - collections.namedtuple( - "SyncResult", - [ - "next_batch", # Token for the next sync - "presence", # List of presence events for the user. - "account_data", # List of account_data events for the user. - "joined", # JoinedSyncResult for each joined room. - "invited", # InvitedSyncResult for each invited room. - "archived", # ArchivedSyncResult for each archived room. - "to_device", # List of direct messages for the device. - "device_lists", # List of user_ids whose devices have changed - "device_one_time_keys_count", # Dict of algorithm to count for one time keys - # for this device - "groups", - ], - ) -): - __slots__ = [] - - def __nonzero__(self): +@attr.s +class _RoomChanges: + """The set of room entries to include in the sync, plus the set of joined + and left room IDs since last sync. + """ + + room_entries = attr.ib(type=List["RoomSyncResultBuilder"]) + invited = attr.ib(type=List[InvitedSyncResult]) + newly_joined_rooms = attr.ib(type=List[str]) + newly_left_rooms = attr.ib(type=List[str]) + + +@attr.s(slots=True, frozen=True) +class SyncResult: + """ + Attributes: + next_batch: Token for the next sync + presence: List of presence events for the user. + account_data: List of account_data events for the user. + joined: JoinedSyncResult for each joined room. + invited: InvitedSyncResult for each invited room. + archived: ArchivedSyncResult for each archived room. + to_device: List of direct messages for the device. + device_lists: List of user_ids whose devices have changed + device_one_time_keys_count: Dict of algorithm to count for one time keys + for this device + groups: Group updates, if any + """ + + next_batch = attr.ib(type=StreamToken) + presence = attr.ib(type=List[JsonDict]) + account_data = attr.ib(type=List[JsonDict]) + joined = attr.ib(type=List[JoinedSyncResult]) + invited = attr.ib(type=List[InvitedSyncResult]) + archived = attr.ib(type=List[ArchivedSyncResult]) + to_device = attr.ib(type=List[JsonDict]) + device_lists = attr.ib(type=DeviceLists) + device_one_time_keys_count = attr.ib(type=JsonDict) + groups = attr.ib(type=Optional[GroupsSyncResult]) + + def __nonzero__(self) -> bool: """Make the result appear empty if there are no updates. This is used to tell if the notifier needs to wait for more events when polling for events. @@ -240,13 +261,15 @@ class SyncHandler(object): ) async def wait_for_sync_for_user( - self, sync_config, since_token=None, timeout=0, full_state=False - ): + self, + sync_config: SyncConfig, + since_token: Optional[StreamToken] = None, + timeout: int = 0, + full_state: bool = False, + ) -> SyncResult: """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. - Returns: - Deferred[SyncResult] """ # If the user is not part of the mau group, then check that limits have # not been exceeded (if not part of the group by this point, almost certain @@ -265,8 +288,12 @@ class SyncHandler(object): return res async def _wait_for_sync_for_user( - self, sync_config, since_token, timeout, full_state - ): + self, + sync_config: SyncConfig, + since_token: Optional[StreamToken] = None, + timeout: int = 0, + full_state: bool = False, + ) -> SyncResult: if since_token is None: sync_type = "initial_sync" elif full_state: @@ -305,25 +332,33 @@ class SyncHandler(object): return result - def current_sync_for_user(self, sync_config, since_token=None, full_state=False): + async def current_sync_for_user( + self, + sync_config: SyncConfig, + since_token: Optional[StreamToken] = None, + full_state: bool = False, + ) -> SyncResult: """Get the sync for client needed to match what the server has now. - Returns: - A Deferred SyncResult. """ - return self.generate_sync_result(sync_config, since_token, full_state) + return await self.generate_sync_result(sync_config, since_token, full_state) - async def push_rules_for_user(self, user): + async def push_rules_for_user(self, user: UserID) -> JsonDict: user_id = user.to_string() rules = await self.store.get_push_rules_for_user(user_id) rules = format_push_rules_for_user(user, rules) return rules - async def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None): + async def ephemeral_by_room( + self, + sync_result_builder: "SyncResultBuilder", + now_token: StreamToken, + since_token: Optional[StreamToken] = None, + ) -> Tuple[StreamToken, Dict[str, List[JsonDict]]]: """Get the ephemeral events for each room the user is in Args: - sync_result_builder(SyncResultBuilder) - now_token (StreamToken): Where the server is currently up to. - since_token (StreamToken): Where the server was when the client + sync_result_builder + now_token: Where the server is currently up to. + since_token: Where the server was when the client last synced. Returns: A tuple of the now StreamToken, updated to reflect the which typing @@ -348,7 +383,7 @@ class SyncHandler(object): ) now_token = now_token.copy_and_replace("typing_key", typing_key) - ephemeral_by_room = {} + ephemeral_by_room = {} # type: JsonDict for event in typing: # we want to exclude the room_id from the event, but modifying the @@ -380,13 +415,13 @@ class SyncHandler(object): async def _load_filtered_recents( self, - room_id, - sync_config, - now_token, - since_token=None, - recents=None, - newly_joined_room=False, - ): + room_id: str, + sync_config: SyncConfig, + now_token: StreamToken, + since_token: Optional[StreamToken] = None, + potential_recents: Optional[List[EventBase]] = None, + newly_joined_room: bool = False, + ) -> TimelineBatch: """ Returns: a Deferred TimelineBatch @@ -397,21 +432,29 @@ class SyncHandler(object): sync_config.filter_collection.blocks_all_room_timeline() ) - if recents is None or newly_joined_room or timeline_limit < len(recents): + if ( + potential_recents is None + or newly_joined_room + or timeline_limit < len(potential_recents) + ): limited = True else: limited = False - if recents: - recents = sync_config.filter_collection.filter_room_timeline(recents) + if potential_recents: + recents = sync_config.filter_collection.filter_room_timeline( + potential_recents + ) # We check if there are any state events, if there are then we pass # all current state events to the filter_events function. This is to # ensure that we always include current state in the timeline - current_state_ids = frozenset() + current_state_ids = frozenset() # type: FrozenSet[str] if any(e.is_state() for e in recents): - current_state_ids = await self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(itervalues(current_state_ids)) + current_state_ids_map = await self.state.get_current_state_ids( + room_id + ) + current_state_ids = frozenset(itervalues(current_state_ids_map)) recents = await filter_events_for_client( self.storage, @@ -463,8 +506,10 @@ class SyncHandler(object): # ensure that we always include current state in the timeline current_state_ids = frozenset() if any(e.is_state() for e in loaded_recents): - current_state_ids = await self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(itervalues(current_state_ids)) + current_state_ids_map = await self.state.get_current_state_ids( + room_id + ) + current_state_ids = frozenset(itervalues(current_state_ids_map)) loaded_recents = await filter_events_for_client( self.storage, @@ -493,17 +538,15 @@ class SyncHandler(object): limited=limited or newly_joined_room, ) - async def get_state_after_event(self, event, state_filter=StateFilter.all()): + async def get_state_after_event( + self, event: EventBase, state_filter: StateFilter = StateFilter.all() + ) -> StateMap[str]: """ Get the room state after the given event Args: - event(synapse.events.EventBase): event of interest - state_filter (StateFilter): The state filter used to fetch state - from the database. - - Returns: - A Deferred map from ((type, state_key)->Event) + event: event of interest + state_filter: The state filter used to fetch state from the database. """ state_ids = await self.state_store.get_state_ids_for_event( event.event_id, state_filter=state_filter @@ -514,18 +557,17 @@ class SyncHandler(object): return state_ids async def get_state_at( - self, room_id, stream_position, state_filter=StateFilter.all() - ): + self, + room_id: str, + stream_position: StreamToken, + state_filter: StateFilter = StateFilter.all(), + ) -> StateMap[str]: """ Get the room state at a particular stream position Args: - room_id(str): room for which to get state - stream_position(StreamToken): point at which to get state - state_filter (StateFilter): The state filter used to fetch state - from the database. - - Returns: - A Deferred map from ((type, state_key)->Event) + room_id: room for which to get state + stream_position: point at which to get state + state_filter: The state filter used to fetch state from the database. """ # FIXME this claims to get the state at a stream position, but # get_recent_events_for_room operates by topo ordering. This therefore @@ -546,23 +588,25 @@ class SyncHandler(object): state = {} return state - async def compute_summary(self, room_id, sync_config, batch, state, now_token): + async def compute_summary( + self, + room_id: str, + sync_config: SyncConfig, + batch: TimelineBatch, + state: StateMap[EventBase], + now_token: StreamToken, + ) -> Optional[JsonDict]: """ Works out a room summary block for this room, summarising the number of joined members in the room, and providing the 'hero' members if the room has no name so clients can consistently name rooms. Also adds state events to 'state' if needed to describe the heroes. - Args: - room_id(str): - sync_config(synapse.handlers.sync.SyncConfig): - batch(synapse.handlers.sync.TimelineBatch): The timeline batch for - the room that will be sent to the user. - state(dict): dict of (type, state_key) -> Event as returned by - compute_state_delta - now_token(str): Token of the end of the current batch. - - Returns: - A deferred dict describing the room summary + Args + room_id + sync_config + batch: The timeline batch for the room that will be sent to the user. + state: State as returned by compute_state_delta + now_token: Token of the end of the current batch. """ # FIXME: we could/should get this from room_stats when matthew/stats lands @@ -681,7 +725,7 @@ class SyncHandler(object): return summary - def get_lazy_loaded_members_cache(self, cache_key): + def get_lazy_loaded_members_cache(self, cache_key: Tuple[str, str]) -> LruCache: cache = self.lazy_loaded_members_cache.get(cache_key) if cache is None: logger.debug("creating LruCache for %r", cache_key) @@ -692,23 +736,24 @@ class SyncHandler(object): return cache async def compute_state_delta( - self, room_id, batch, sync_config, since_token, now_token, full_state - ): + self, + room_id: str, + batch: TimelineBatch, + sync_config: SyncConfig, + since_token: Optional[StreamToken], + now_token: StreamToken, + full_state: bool, + ) -> StateMap[EventBase]: """ Works out the difference in state between the start of the timeline and the previous sync. Args: - room_id(str): - batch(synapse.handlers.sync.TimelineBatch): The timeline batch for - the room that will be sent to the user. - sync_config(synapse.handlers.sync.SyncConfig): - since_token(str|None): Token of the end of the previous batch. May - be None. - now_token(str): Token of the end of the current batch. - full_state(bool): Whether to force returning the full state. - - Returns: - A deferred dict of (type, state_key) -> Event + room_id: + batch: The timeline batch for the room that will be sent to the user. + sync_config: + since_token: Token of the end of the previous batch. May be None. + now_token: Token of the end of the current batch. + full_state: Whether to force returning the full state. """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state @@ -800,6 +845,10 @@ class SyncHandler(object): # about them). state_filter = StateFilter.all() + # If this is an initial sync then full_state should be set, and + # that case is handled above. We assert here to ensure that this + # is indeed the case. + assert since_token is not None state_at_previous_sync = await self.get_state_at( room_id, stream_position=since_token, state_filter=state_filter ) @@ -874,7 +923,7 @@ class SyncHandler(object): if t[0] == EventTypes.Member: cache.set(t[1], event_id) - state = {} + state = {} # type: Dict[str, EventBase] if state_ids: state = await self.store.get_events(list(state_ids.values())) @@ -885,7 +934,9 @@ class SyncHandler(object): ) } - async def unread_notifs_for_room_id(self, room_id, sync_config): + async def unread_notifs_for_room_id( + self, room_id: str, sync_config: SyncConfig + ) -> Optional[Dict[str, str]]: with Measure(self.clock, "unread_notifs_for_room_id"): last_unread_event_id = await self.store.get_last_receipt_event_id_for_user( user_id=sync_config.user.to_string(), @@ -893,7 +944,6 @@ class SyncHandler(object): receipt_type="m.read", ) - notifs = [] if last_unread_event_id: notifs = await self.store.get_unread_event_push_actions_by_room_for_user( room_id, sync_config.user.to_string(), last_unread_event_id @@ -905,17 +955,12 @@ class SyncHandler(object): return None async def generate_sync_result( - self, sync_config, since_token=None, full_state=False - ): + self, + sync_config: SyncConfig, + since_token: Optional[StreamToken] = None, + full_state: bool = False, + ) -> SyncResult: """Generates a sync result. - - Args: - sync_config (SyncConfig) - since_token (StreamToken) - full_state (bool) - - Returns: - Deferred(SyncResult) """ # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability @@ -923,7 +968,7 @@ class SyncHandler(object): # Always use the `now_token` in `SyncResultBuilder` now_token = await self.event_sources.get_current_token() - logger.info( + logger.debug( "Calculating sync response for %r between %s and %s", sync_config.user, since_token, @@ -977,7 +1022,7 @@ class SyncHandler(object): ) device_id = sync_config.device_id - one_time_key_counts = {} + one_time_key_counts = {} # type: JsonDict if device_id: one_time_key_counts = await self.store.count_e2e_one_time_keys( user_id, device_id @@ -1007,7 +1052,9 @@ class SyncHandler(object): ) @measure_func("_generate_sync_entry_for_groups") - async def _generate_sync_entry_for_groups(self, sync_result_builder): + async def _generate_sync_entry_for_groups( + self, sync_result_builder: "SyncResultBuilder" + ) -> None: user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token @@ -1052,27 +1099,22 @@ class SyncHandler(object): @measure_func("_generate_sync_entry_for_device_list") async def _generate_sync_entry_for_device_list( self, - sync_result_builder, - newly_joined_rooms, - newly_joined_or_invited_users, - newly_left_rooms, - newly_left_users, - ): + sync_result_builder: "SyncResultBuilder", + newly_joined_rooms: Set[str], + newly_joined_or_invited_users: Set[str], + newly_left_rooms: Set[str], + newly_left_users: Set[str], + ) -> DeviceLists: """Generate the DeviceLists section of sync Args: - sync_result_builder (SyncResultBuilder) - newly_joined_rooms (set[str]): Set of rooms user has joined since - previous sync - newly_joined_or_invited_users (set[str]): Set of users that have - joined or been invited to a room since previous sync. - newly_left_rooms (set[str]): Set of rooms user has left since + sync_result_builder + newly_joined_rooms: Set of rooms user has joined since previous sync + newly_joined_or_invited_users: Set of users that have joined or + been invited to a room since previous sync. + newly_left_rooms: Set of rooms user has left since previous sync + newly_left_users: Set of users that have left a room we're in since previous sync - newly_left_users (set[str]): Set of users that have left a room - we're in since previous sync - - Returns: - Deferred[DeviceLists] """ user_id = sync_result_builder.sync_config.user.to_string() @@ -1133,15 +1175,11 @@ class SyncHandler(object): else: return DeviceLists(changed=[], left=[]) - async def _generate_sync_entry_for_to_device(self, sync_result_builder): + async def _generate_sync_entry_for_to_device( + self, sync_result_builder: "SyncResultBuilder" + ) -> None: """Generates the portion of the sync response. Populates `sync_result_builder` with the result. - - Args: - sync_result_builder(SyncResultBuilder) - - Returns: - Deferred(dict): A dictionary containing the per room account data. """ user_id = sync_result_builder.sync_config.user.to_string() device_id = sync_result_builder.sync_config.device_id @@ -1179,15 +1217,17 @@ class SyncHandler(object): else: sync_result_builder.to_device = [] - async def _generate_sync_entry_for_account_data(self, sync_result_builder): + async def _generate_sync_entry_for_account_data( + self, sync_result_builder: "SyncResultBuilder" + ) -> Dict[str, Dict[str, JsonDict]]: """Generates the account data portion of the sync response. Populates `sync_result_builder` with the result. Args: - sync_result_builder(SyncResultBuilder) + sync_result_builder Returns: - Deferred(dict): A dictionary containing the per room account data. + A dictionary containing the per room account data. """ sync_config = sync_result_builder.sync_config user_id = sync_result_builder.sync_config.user.to_string() @@ -1231,18 +1271,21 @@ class SyncHandler(object): return account_data_by_room async def _generate_sync_entry_for_presence( - self, sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users - ): + self, + sync_result_builder: "SyncResultBuilder", + newly_joined_rooms: Set[str], + newly_joined_or_invited_users: Set[str], + ) -> None: """Generates the presence portion of the sync response. Populates the `sync_result_builder` with the result. Args: - sync_result_builder(SyncResultBuilder) - newly_joined_rooms(list): List of rooms that the user has joined - since the last sync (or empty if an initial sync) - newly_joined_or_invited_users(list): List of users that have joined - or been invited to rooms since the last sync (or empty if an initial - sync) + sync_result_builder + newly_joined_rooms: Set of rooms that the user has joined since + the last sync (or empty if an initial sync) + newly_joined_or_invited_users: Set of users that have joined or + been invited to rooms since the last sync (or empty if an + initial sync) """ now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config @@ -1286,17 +1329,19 @@ class SyncHandler(object): sync_result_builder.presence = presence async def _generate_sync_entry_for_rooms( - self, sync_result_builder, account_data_by_room - ): + self, + sync_result_builder: "SyncResultBuilder", + account_data_by_room: Dict[str, Dict[str, JsonDict]], + ) -> Tuple[Set[str], Set[str], Set[str], Set[str]]: """Generates the rooms portion of the sync response. Populates the `sync_result_builder` with the result. Args: - sync_result_builder(SyncResultBuilder) - account_data_by_room(dict): Dictionary of per room account data + sync_result_builder + account_data_by_room: Dictionary of per room account data Returns: - Deferred(tuple): Returns a 4-tuple of + Returns a 4-tuple of `(newly_joined_rooms, newly_joined_or_invited_users, newly_left_rooms, newly_left_users)` """ @@ -1307,7 +1352,7 @@ class SyncHandler(object): ) if block_all_room_ephemeral: - ephemeral_by_room = {} + ephemeral_by_room = {} # type: Dict[str, List[JsonDict]] else: now_token, ephemeral_by_room = await self.ephemeral_by_room( sync_result_builder, @@ -1328,7 +1373,7 @@ class SyncHandler(object): ) if not tags_by_room: logger.debug("no-oping sync") - return [], [], [], [] + return set(), set(), set(), set() ignored_account_data = await self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id @@ -1340,19 +1385,22 @@ class SyncHandler(object): ignored_users = frozenset() if since_token: - res = await self._get_rooms_changed(sync_result_builder, ignored_users) - room_entries, invited, newly_joined_rooms, newly_left_rooms = res - + room_changes = await self._get_rooms_changed( + sync_result_builder, ignored_users + ) tags_by_room = await self.store.get_updated_tags( user_id, since_token.account_data_key ) else: - res = await self._get_all_rooms(sync_result_builder, ignored_users) - room_entries, invited, newly_joined_rooms = res - newly_left_rooms = [] + room_changes = await self._get_all_rooms(sync_result_builder, ignored_users) tags_by_room = await self.store.get_tags_for_user(user_id) + room_entries = room_changes.room_entries + invited = room_changes.invited + newly_joined_rooms = room_changes.newly_joined_rooms + newly_left_rooms = room_changes.newly_left_rooms + def handle_room_entries(room_entry): return self._generate_room_entry( sync_result_builder, @@ -1392,13 +1440,15 @@ class SyncHandler(object): newly_left_users -= newly_joined_or_invited_users return ( - newly_joined_rooms, + set(newly_joined_rooms), newly_joined_or_invited_users, - newly_left_rooms, + set(newly_left_rooms), newly_left_users, ) - async def _have_rooms_changed(self, sync_result_builder): + async def _have_rooms_changed( + self, sync_result_builder: "SyncResultBuilder" + ) -> bool: """Returns whether there may be any new events that should be sent down the sync. Returns True if there are. """ @@ -1422,22 +1472,10 @@ class SyncHandler(object): return True return False - async def _get_rooms_changed(self, sync_result_builder, ignored_users): + async def _get_rooms_changed( + self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str] + ) -> _RoomChanges: """Gets the the changes that have happened since the last sync. - - Args: - sync_result_builder(SyncResultBuilder) - ignored_users(set(str)): Set of users ignored by user. - - Returns: - Deferred(tuple): Returns a tuple of the form: - `(room_entries, invited_rooms, newly_joined_rooms, newly_left_rooms)` - - where: - room_entries is a list [RoomSyncResultBuilder] - invited_rooms is a list [InvitedSyncResult] - newly_joined_rooms is a list[str] of room ids - newly_left_rooms is a list[str] of room ids """ user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token @@ -1451,7 +1489,7 @@ class SyncHandler(object): user_id, since_token.room_key, now_token.room_key ) - mem_change_events_by_room_id = {} + mem_change_events_by_room_id = {} # type: Dict[str, List[EventBase]] for event in rooms_changed: mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) @@ -1460,7 +1498,7 @@ class SyncHandler(object): room_entries = [] invited = [] for room_id, events in iteritems(mem_change_events_by_room_id): - logger.info( + logger.debug( "Membership changes in %s: [%s]", room_id, ", ".join(("%s (%s)" % (e.event_id, e.membership) for e in events)), @@ -1570,7 +1608,7 @@ class SyncHandler(object): # This is all screaming out for a refactor, as the logic here is # subtle and the moving parts numerous. if leave_event.internal_metadata.is_out_of_band_membership(): - batch_events = [leave_event] + batch_events = [leave_event] # type: Optional[List[EventBase]] else: batch_events = None @@ -1636,18 +1674,17 @@ class SyncHandler(object): ) room_entries.append(entry) - return room_entries, invited, newly_joined_rooms, newly_left_rooms + return _RoomChanges(room_entries, invited, newly_joined_rooms, newly_left_rooms) - async def _get_all_rooms(self, sync_result_builder, ignored_users): + async def _get_all_rooms( + self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str] + ) -> _RoomChanges: """Returns entries for all rooms for the user. Args: - sync_result_builder(SyncResultBuilder) - ignored_users(set(str)): Set of users ignored by user. + sync_result_builder + ignored_users: Set of users ignored by user. - Returns: - Deferred(tuple): Returns a tuple of the form: - `([RoomSyncResultBuilder], [InvitedSyncResult], [])` """ user_id = sync_result_builder.sync_config.user.to_string() @@ -1709,30 +1746,30 @@ class SyncHandler(object): ) ) - return room_entries, invited, [] + return _RoomChanges(room_entries, invited, [], []) async def _generate_room_entry( self, - sync_result_builder, - ignored_users, - room_builder, - ephemeral, - tags, - account_data, - always_include=False, + sync_result_builder: "SyncResultBuilder", + ignored_users: Set[str], + room_builder: "RoomSyncResultBuilder", + ephemeral: List[JsonDict], + tags: Optional[List[JsonDict]], + account_data: Dict[str, JsonDict], + always_include: bool = False, ): """Populates the `joined` and `archived` section of `sync_result_builder` based on the `room_builder`. Args: - sync_result_builder(SyncResultBuilder) - ignored_users(set(str)): Set of users ignored by user. - room_builder(RoomSyncResultBuilder) - ephemeral(list): List of new ephemeral events for room - tags(list): List of *all* tags for room, or None if there has been + sync_result_builder + ignored_users: Set of users ignored by user. + room_builder + ephemeral: List of new ephemeral events for room + tags: List of *all* tags for room, or None if there has been no change. - account_data(list): List of new account data for room - always_include(bool): Always include this room in the sync response, + account_data: List of new account data for room + always_include: Always include this room in the sync response, even if empty. """ newly_joined = room_builder.newly_joined @@ -1758,7 +1795,7 @@ class SyncHandler(object): sync_config, now_token=upto_token, since_token=since_token, - recents=events, + potential_recents=events, newly_joined_room=newly_joined, ) @@ -1809,7 +1846,7 @@ class SyncHandler(object): room_id, batch, sync_config, since_token, now_token, full_state=full_state ) - summary = {} + summary = {} # type: Optional[JsonDict] # we include a summary in room responses when we're lazy loading # members (as the client otherwise doesn't have enough info to form @@ -1833,7 +1870,7 @@ class SyncHandler(object): ) if room_builder.rtype == "joined": - unread_notifications = {} + unread_notifications = {} # type: Dict[str, str] room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, @@ -1855,23 +1892,25 @@ class SyncHandler(object): if batch.limited and since_token: user_id = sync_result_builder.sync_config.user.to_string() - logger.info( + logger.debug( "Incremental gappy sync of %s for user %s with %d state events" % (room_id, user_id, len(state)) ) elif room_builder.rtype == "archived": - room_sync = ArchivedSyncResult( + archived_room_sync = ArchivedSyncResult( room_id=room_id, timeline=batch, state=state, account_data=account_data_events, ) - if room_sync or always_include: - sync_result_builder.archived.append(room_sync) + if archived_room_sync or always_include: + sync_result_builder.archived.append(archived_room_sync) else: raise Exception("Unrecognized rtype: %r", room_builder.rtype) - async def get_rooms_for_user_at(self, user_id, stream_ordering): + async def get_rooms_for_user_at( + self, user_id: str, stream_ordering: int + ) -> FrozenSet[str]: """Get set of joined rooms for a user at the given stream ordering. The stream ordering *must* be recent, otherwise this may throw an @@ -1879,12 +1918,11 @@ class SyncHandler(object): current token, which should be perfectly fine). Args: - user_id (str) - stream_ordering (int) + user_id + stream_ordering ReturnValue: - Deferred[frozenset[str]]: Set of room_ids the user is in at given - stream_ordering. + Set of room_ids the user is in at given stream_ordering. """ joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id) @@ -1911,11 +1949,10 @@ class SyncHandler(object): if user_id in users_in_room: joined_room_ids.add(room_id) - joined_room_ids = frozenset(joined_room_ids) - return joined_room_ids + return frozenset(joined_room_ids) -def _action_has_highlight(actions): +def _action_has_highlight(actions: List[JsonDict]) -> bool: for action in actions: try: if action.get("set_tweak", None) == "highlight": @@ -1927,22 +1964,23 @@ def _action_has_highlight(actions): def _calculate_state( - timeline_contains, timeline_start, previous, current, lazy_load_members -): + timeline_contains: StateMap[str], + timeline_start: StateMap[str], + previous: StateMap[str], + current: StateMap[str], + lazy_load_members: bool, +) -> StateMap[str]: """Works out what state to include in a sync response. Args: - timeline_contains (dict): state in the timeline - timeline_start (dict): state at the start of the timeline - previous (dict): state at the end of the previous sync (or empty dict + timeline_contains: state in the timeline + timeline_start: state at the start of the timeline + previous: state at the end of the previous sync (or empty dict if this is an initial sync) - current (dict): state at the end of the timeline - lazy_load_members (bool): whether to return members from timeline_start + current: state at the end of the timeline + lazy_load_members: whether to return members from timeline_start or not. assumes that timeline_start has already been filtered to include only the members the client needs to know about. - - Returns: - dict """ event_id_to_key = { e: key @@ -1979,15 +2017,16 @@ def _calculate_state( return {event_id_to_key[e]: e for e in state_ids} -class SyncResultBuilder(object): +@attr.s +class SyncResultBuilder: """Used to help build up a new SyncResult for a user Attributes: - sync_config (SyncConfig) - full_state (bool) - since_token (StreamToken) - now_token (StreamToken) - joined_room_ids (list[str]) + sync_config + full_state: The full_state flag as specified by user + since_token: The token supplied by user, or None. + now_token: The token to sync up to. + joined_room_ids: List of rooms the user is joined to # The following mirror the fields in a sync response presence (list) @@ -1995,61 +2034,45 @@ class SyncResultBuilder(object): joined (list[JoinedSyncResult]) invited (list[InvitedSyncResult]) archived (list[ArchivedSyncResult]) - device (list) groups (GroupsSyncResult|None) to_device (list) """ - def __init__( - self, sync_config, full_state, since_token, now_token, joined_room_ids - ): - """ - Args: - sync_config (SyncConfig) - full_state (bool): The full_state flag as specified by user - since_token (StreamToken): The token supplied by user, or None. - now_token (StreamToken): The token to sync up to. - joined_room_ids (list[str]): List of rooms the user is joined to - """ - self.sync_config = sync_config - self.full_state = full_state - self.since_token = since_token - self.now_token = now_token - self.joined_room_ids = joined_room_ids - - self.presence = [] - self.account_data = [] - self.joined = [] - self.invited = [] - self.archived = [] - self.device = [] - self.groups = None - self.to_device = [] + sync_config = attr.ib(type=SyncConfig) + full_state = attr.ib(type=bool) + since_token = attr.ib(type=Optional[StreamToken]) + now_token = attr.ib(type=StreamToken) + joined_room_ids = attr.ib(type=FrozenSet[str]) + + presence = attr.ib(type=List[JsonDict], default=attr.Factory(list)) + account_data = attr.ib(type=List[JsonDict], default=attr.Factory(list)) + joined = attr.ib(type=List[JoinedSyncResult], default=attr.Factory(list)) + invited = attr.ib(type=List[InvitedSyncResult], default=attr.Factory(list)) + archived = attr.ib(type=List[ArchivedSyncResult], default=attr.Factory(list)) + groups = attr.ib(type=Optional[GroupsSyncResult], default=None) + to_device = attr.ib(type=List[JsonDict], default=attr.Factory(list)) +@attr.s class RoomSyncResultBuilder(object): """Stores information needed to create either a `JoinedSyncResult` or `ArchivedSyncResult`. + + Attributes: + room_id + rtype: One of `"joined"` or `"archived"` + events: List of events to include in the room (more events may be added + when generating result). + newly_joined: If the user has newly joined the room + full_state: Whether the full state should be sent in result + since_token: Earliest point to return events from, or None + upto_token: Latest point to return events from. """ - def __init__( - self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token - ): - """ - Args: - room_id(str) - rtype(str): One of `"joined"` or `"archived"` - events(list[FrozenEvent]): List of events to include in the room - (more events may be added when generating result). - newly_joined(bool): If the user has newly joined the room - full_state(bool): Whether the full state should be sent in result - since_token(StreamToken): Earliest point to return events from, or None - upto_token(StreamToken): Latest point to return events from. - """ - self.room_id = room_id - self.rtype = rtype - self.events = events - self.newly_joined = newly_joined - self.full_state = full_state - self.since_token = since_token - self.upto_token = upto_token + room_id = attr.ib(type=str) + rtype = attr.ib(type=str) + events = attr.ib(type=Optional[List[EventBase]]) + newly_joined = attr.ib(type=bool) + full_state = attr.ib(type=bool) + since_token = attr.ib(type=Optional[StreamToken]) + upto_token = attr.ib(type=StreamToken) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 624f05ab5b..81aa58dc8c 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -149,7 +149,7 @@ class UserDirectoryHandler(StateDeltasHandler): self.pos, room_max_stream_ordering ) - logger.info("Handling %d state deltas", len(deltas)) + logger.debug("Handling %d state deltas", len(deltas)) yield self._handle_deltas(deltas) self.pos = max_pos @@ -195,7 +195,7 @@ class UserDirectoryHandler(StateDeltasHandler): room_id, self.server_name ) if not is_in_room: - logger.info("Server left room: %r", room_id) + logger.debug("Server left room: %r", room_id) # Fetch all the users that we marked as being in user # directory due to being in the room and then check if # need to remove those users or not diff --git a/synapse/http/site.py b/synapse/http/site.py index 911251c0bc..e092193c9c 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -225,7 +225,7 @@ class SynapseRequest(Request): self.start_time, name=servlet_name, method=self.get_method() ) - self.site.access_logger.info( + self.site.access_logger.debug( "%s - %s - Received request: %s %s", self.getClientIP(), self.site.site_tag, diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index d0879b0490..5bb17d1228 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -398,7 +398,7 @@ class HttpPusher(object): Args: badge (int): number of unread messages """ - logger.info("Sending updated badge count %d to %s", badge, self.name) + logger.debug("Sending updated badge count %d to %s", badge, self.name) d = { "notification": { "id": "", diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py index 69a4ae42f9..2d4fd08cf5 100644 --- a/synapse/replication/slave/storage/groups.py +++ b/synapse/replication/slave/storage/groups.py @@ -13,15 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import DataStore +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.storage.data_stores.main.group_server import GroupServerWorkerStore from synapse.storage.database import Database from synapse.util.caches.stream_change_cache import StreamChangeCache -from ._base import BaseSlavedStore, __func__ -from ._slaved_id_tracker import SlavedIdTracker - -class SlavedGroupServerStore(BaseSlavedStore): +class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore): def __init__(self, database: Database, db_conn, hs): super(SlavedGroupServerStore, self).__init__(database, db_conn, hs) @@ -35,9 +34,8 @@ class SlavedGroupServerStore(BaseSlavedStore): self._group_updates_id_gen.get_current_token(), ) - get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user) - get_group_stream_token = __func__(DataStore.get_group_stream_token) - get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user) + def get_group_stream_token(self): + return self._group_updates_id_gen.get_current_token() def stream_positions(self): result = super(SlavedGroupServerStore, self).stream_positions() diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 3455741195..e75c5f1370 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -105,7 +105,7 @@ class UsersRestServletV2(RestServlet): class UserRestServletV2(RestServlet): - PATTERNS = (re.compile("^/_synapse/admin/v2/users/(?P<user_id>@[^/]+)$"),) + PATTERNS = (re.compile("^/_synapse/admin/v2/users/(?P<user_id>[^/]+)$"),) """Get request to list user details. This needs user to have administrator access in Synapse. @@ -136,6 +136,8 @@ class UserRestServletV2(RestServlet): self.hs = hs self.auth = hs.get_auth() self.admin_handler = hs.get_handlers().admin_handler + self.store = hs.get_datastore() + self.auth_handler = hs.get_auth_handler() self.profile_handler = hs.get_profile_handler() self.set_password_handler = hs.get_set_password_handler() self.deactivate_account_handler = hs.get_deactivate_account_handler() @@ -163,6 +165,7 @@ class UserRestServletV2(RestServlet): raise SynapseError(400, "This endpoint can only be used with local users") user = await self.admin_handler.get_user(target_user) + user_id = target_user.to_string() if user: # modify user if "displayname" in body: @@ -170,6 +173,29 @@ class UserRestServletV2(RestServlet): target_user, requester, body["displayname"], True ) + if "threepids" in body: + # check for required parameters for each threepid + for threepid in body["threepids"]: + assert_params_in_dict(threepid, ["medium", "address"]) + + # remove old threepids from user + threepids = await self.store.user_get_threepids(user_id) + for threepid in threepids: + try: + await self.auth_handler.delete_threepid( + user_id, threepid["medium"], threepid["address"], None + ) + except Exception: + logger.exception("Failed to remove threepids") + raise SynapseError(500, "Failed to remove threepids") + + # add new threepids to user + current_time = self.hs.get_clock().time_msec() + for threepid in body["threepids"]: + await self.auth_handler.add_threepid( + user_id, threepid["medium"], threepid["address"], current_time + ) + if "avatar_url" in body: await self.profile_handler.set_avatar_url( target_user, requester, body["avatar_url"], True @@ -221,6 +247,7 @@ class UserRestServletV2(RestServlet): admin = body.get("admin", None) user_type = body.get("user_type", None) displayname = body.get("displayname", None) + threepids = body.get("threepids", None) if user_type is not None and user_type not in UserTypes.ALL_USER_TYPES: raise SynapseError(400, "Invalid user type") @@ -232,6 +259,18 @@ class UserRestServletV2(RestServlet): default_display_name=displayname, user_type=user_type, ) + + if "threepids" in body: + # check for required parameters for each threepid + for threepid in body["threepids"]: + assert_params_in_dict(threepid, ["medium", "address"]) + + current_time = self.hs.get_clock().time_msec() + for threepid in body["threepids"]: + await self.auth_handler.add_threepid( + user_id, threepid["medium"], threepid["address"], current_time + ) + if "avatar_url" in body: await self.profile_handler.set_avatar_url( user_id, requester, body["avatar_url"], True @@ -568,7 +607,7 @@ class UserAdminServlet(RestServlet): {} """ - PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>@[^/]*)/admin$"),) + PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>[^/]*)/admin$"),) def __init__(self, hs): self.hs = hs diff --git a/synapse/server.py b/synapse/server.py index 7926867b77..fd2f69e928 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -50,7 +50,7 @@ from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.sender import FederationSender from synapse.federation.transport.client import TransportLayerClient from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer -from synapse.groups.groups_server import GroupsServerHandler +from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler from synapse.handlers import Handlers from synapse.handlers.account_validity import AccountValidityHandler from synapse.handlers.acme import AcmeHandler @@ -62,7 +62,7 @@ from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler from synapse.handlers.events import EventHandler, EventStreamHandler -from synapse.handlers.groups_local import GroupsLocalHandler +from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.message import EventCreationHandler, MessageHandler from synapse.handlers.pagination import PaginationHandler @@ -460,10 +460,16 @@ class HomeServer(object): return UserDirectoryHandler(self) def build_groups_local_handler(self): - return GroupsLocalHandler(self) + if self.config.worker_app: + return GroupsLocalWorkerHandler(self) + else: + return GroupsLocalHandler(self) def build_groups_server_handler(self): - return GroupsServerHandler(self) + if self.config.worker_app: + return GroupsServerWorkerHandler(self) + else: + return GroupsServerHandler(self) def build_groups_attestation_signing(self): return GroupAttestationSigning(self) diff --git a/synapse/server.pyi b/synapse/server.pyi index 90347ac23e..40eabfe5d9 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -107,3 +107,5 @@ class HomeServer(object): self, ) -> synapse.replication.tcp.client.ReplicationClientHandler: pass + def is_mine_id(self, domain_id: str) -> bool: + pass diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py index 6acd45e9f3..0963e6c250 100644 --- a/synapse/storage/data_stores/main/group_server.py +++ b/synapse/storage/data_stores/main/group_server.py @@ -27,21 +27,7 @@ _DEFAULT_CATEGORY_ID = "" _DEFAULT_ROLE_ID = "" -class GroupServerStore(SQLBaseStore): - def set_group_join_policy(self, group_id, join_policy): - """Set the join policy of a group. - - join_policy can be one of: - * "invite" - * "open" - """ - return self.db.simple_update_one( - table="groups", - keyvalues={"group_id": group_id}, - updatevalues={"join_policy": join_policy}, - desc="set_group_join_policy", - ) - +class GroupServerWorkerStore(SQLBaseStore): def get_group(self, group_id): return self.db.simple_select_one( table="groups", @@ -157,6 +143,366 @@ class GroupServerStore(SQLBaseStore): "get_rooms_for_summary", _get_rooms_for_summary_txn ) + @defer.inlineCallbacks + def get_group_categories(self, group_id): + rows = yield self.db.simple_select_list( + table="group_room_categories", + keyvalues={"group_id": group_id}, + retcols=("category_id", "is_public", "profile"), + desc="get_group_categories", + ) + + return { + row["category_id"]: { + "is_public": row["is_public"], + "profile": json.loads(row["profile"]), + } + for row in rows + } + + @defer.inlineCallbacks + def get_group_category(self, group_id, category_id): + category = yield self.db.simple_select_one( + table="group_room_categories", + keyvalues={"group_id": group_id, "category_id": category_id}, + retcols=("is_public", "profile"), + desc="get_group_category", + ) + + category["profile"] = json.loads(category["profile"]) + + return category + + @defer.inlineCallbacks + def get_group_roles(self, group_id): + rows = yield self.db.simple_select_list( + table="group_roles", + keyvalues={"group_id": group_id}, + retcols=("role_id", "is_public", "profile"), + desc="get_group_roles", + ) + + return { + row["role_id"]: { + "is_public": row["is_public"], + "profile": json.loads(row["profile"]), + } + for row in rows + } + + @defer.inlineCallbacks + def get_group_role(self, group_id, role_id): + role = yield self.db.simple_select_one( + table="group_roles", + keyvalues={"group_id": group_id, "role_id": role_id}, + retcols=("is_public", "profile"), + desc="get_group_role", + ) + + role["profile"] = json.loads(role["profile"]) + + return role + + def get_local_groups_for_room(self, room_id): + """Get all of the local group that contain a given room + Args: + room_id (str): The ID of a room + Returns: + Deferred[list[str]]: A twisted.Deferred containing a list of group ids + containing this room + """ + return self.db.simple_select_onecol( + table="group_rooms", + keyvalues={"room_id": room_id}, + retcol="group_id", + desc="get_local_groups_for_room", + ) + + def get_users_for_summary_by_role(self, group_id, include_private=False): + """Get the users and roles that should be included in a summary request + + Returns ([users], [roles]) + """ + + def _get_users_for_summary_txn(txn): + keyvalues = {"group_id": group_id} + if not include_private: + keyvalues["is_public"] = True + + sql = """ + SELECT user_id, is_public, role_id, user_order + FROM group_summary_users + WHERE group_id = ? + """ + + if not include_private: + sql += " AND is_public = ?" + txn.execute(sql, (group_id, True)) + else: + txn.execute(sql, (group_id,)) + + users = [ + { + "user_id": row[0], + "is_public": row[1], + "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None, + "order": row[3], + } + for row in txn + ] + + sql = """ + SELECT role_id, is_public, profile, role_order + FROM group_summary_roles + INNER JOIN group_roles USING (group_id, role_id) + WHERE group_id = ? + """ + + if not include_private: + sql += " AND is_public = ?" + txn.execute(sql, (group_id, True)) + else: + txn.execute(sql, (group_id,)) + + roles = { + row[0]: { + "is_public": row[1], + "profile": json.loads(row[2]), + "order": row[3], + } + for row in txn + } + + return users, roles + + return self.db.runInteraction( + "get_users_for_summary_by_role", _get_users_for_summary_txn + ) + + def is_user_in_group(self, user_id, group_id): + return self.db.simple_select_one_onecol( + table="group_users", + keyvalues={"group_id": group_id, "user_id": user_id}, + retcol="user_id", + allow_none=True, + desc="is_user_in_group", + ).addCallback(lambda r: bool(r)) + + def is_user_admin_in_group(self, group_id, user_id): + return self.db.simple_select_one_onecol( + table="group_users", + keyvalues={"group_id": group_id, "user_id": user_id}, + retcol="is_admin", + allow_none=True, + desc="is_user_admin_in_group", + ) + + def is_user_invited_to_local_group(self, group_id, user_id): + """Has the group server invited a user? + """ + return self.db.simple_select_one_onecol( + table="group_invites", + keyvalues={"group_id": group_id, "user_id": user_id}, + retcol="user_id", + desc="is_user_invited_to_local_group", + allow_none=True, + ) + + def get_users_membership_info_in_group(self, group_id, user_id): + """Get a dict describing the membership of a user in a group. + + Example if joined: + + { + "membership": "join", + "is_public": True, + "is_privileged": False, + } + + Returns an empty dict if the user is not join/invite/etc + """ + + def _get_users_membership_in_group_txn(txn): + row = self.db.simple_select_one_txn( + txn, + table="group_users", + keyvalues={"group_id": group_id, "user_id": user_id}, + retcols=("is_admin", "is_public"), + allow_none=True, + ) + + if row: + return { + "membership": "join", + "is_public": row["is_public"], + "is_privileged": row["is_admin"], + } + + row = self.db.simple_select_one_onecol_txn( + txn, + table="group_invites", + keyvalues={"group_id": group_id, "user_id": user_id}, + retcol="user_id", + allow_none=True, + ) + + if row: + return {"membership": "invite"} + + return {} + + return self.db.runInteraction( + "get_users_membership_info_in_group", _get_users_membership_in_group_txn + ) + + def get_publicised_groups_for_user(self, user_id): + """Get all groups a user is publicising + """ + return self.db.simple_select_onecol( + table="local_group_membership", + keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True}, + retcol="group_id", + desc="get_publicised_groups_for_user", + ) + + def get_attestations_need_renewals(self, valid_until_ms): + """Get all attestations that need to be renewed until givent time + """ + + def _get_attestations_need_renewals_txn(txn): + sql = """ + SELECT group_id, user_id FROM group_attestations_renewals + WHERE valid_until_ms <= ? + """ + txn.execute(sql, (valid_until_ms,)) + return self.db.cursor_to_dict(txn) + + return self.db.runInteraction( + "get_attestations_need_renewals", _get_attestations_need_renewals_txn + ) + + @defer.inlineCallbacks + def get_remote_attestation(self, group_id, user_id): + """Get the attestation that proves the remote agrees that the user is + in the group. + """ + row = yield self.db.simple_select_one( + table="group_attestations_remote", + keyvalues={"group_id": group_id, "user_id": user_id}, + retcols=("valid_until_ms", "attestation_json"), + desc="get_remote_attestation", + allow_none=True, + ) + + now = int(self._clock.time_msec()) + if row and now < row["valid_until_ms"]: + return json.loads(row["attestation_json"]) + + return None + + def get_joined_groups(self, user_id): + return self.db.simple_select_onecol( + table="local_group_membership", + keyvalues={"user_id": user_id, "membership": "join"}, + retcol="group_id", + desc="get_joined_groups", + ) + + def get_all_groups_for_user(self, user_id, now_token): + def _get_all_groups_for_user_txn(txn): + sql = """ + SELECT group_id, type, membership, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND membership != 'leave' + AND stream_id <= ? + """ + txn.execute(sql, (user_id, now_token)) + return [ + { + "group_id": row[0], + "type": row[1], + "membership": row[2], + "content": json.loads(row[3]), + } + for row in txn + ] + + return self.db.runInteraction( + "get_all_groups_for_user", _get_all_groups_for_user_txn + ) + + def get_groups_changes_for_user(self, user_id, from_token, to_token): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_entity_changed( + user_id, from_token + ) + if not has_changed: + return defer.succeed([]) + + def _get_groups_changes_for_user_txn(txn): + sql = """ + SELECT group_id, membership, type, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (user_id, from_token, to_token)) + return [ + { + "group_id": group_id, + "membership": membership, + "type": gtype, + "content": json.loads(content_json), + } + for group_id, membership, gtype, content_json in txn + ] + + return self.db.runInteraction( + "get_groups_changes_for_user", _get_groups_changes_for_user_txn + ) + + def get_all_groups_changes(self, from_token, to_token, limit): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_any_entity_changed( + from_token + ) + if not has_changed: + return defer.succeed([]) + + def _get_all_groups_changes_txn(txn): + sql = """ + SELECT stream_id, group_id, user_id, type, content + FROM local_group_updates + WHERE ? < stream_id AND stream_id <= ? + LIMIT ? + """ + txn.execute(sql, (from_token, to_token, limit)) + return [ + (stream_id, group_id, user_id, gtype, json.loads(content_json)) + for stream_id, group_id, user_id, gtype, content_json in txn + ] + + return self.db.runInteraction( + "get_all_groups_changes", _get_all_groups_changes_txn + ) + + +class GroupServerStore(GroupServerWorkerStore): + def set_group_join_policy(self, group_id, join_policy): + """Set the join policy of a group. + + join_policy can be one of: + * "invite" + * "open" + """ + return self.db.simple_update_one( + table="groups", + keyvalues={"group_id": group_id}, + updatevalues={"join_policy": join_policy}, + desc="set_group_join_policy", + ) + def add_room_to_summary(self, group_id, room_id, category_id, order, is_public): return self.db.runInteraction( "add_room_to_summary", @@ -299,36 +645,6 @@ class GroupServerStore(SQLBaseStore): desc="remove_room_from_summary", ) - @defer.inlineCallbacks - def get_group_categories(self, group_id): - rows = yield self.db.simple_select_list( - table="group_room_categories", - keyvalues={"group_id": group_id}, - retcols=("category_id", "is_public", "profile"), - desc="get_group_categories", - ) - - return { - row["category_id"]: { - "is_public": row["is_public"], - "profile": json.loads(row["profile"]), - } - for row in rows - } - - @defer.inlineCallbacks - def get_group_category(self, group_id, category_id): - category = yield self.db.simple_select_one( - table="group_room_categories", - keyvalues={"group_id": group_id, "category_id": category_id}, - retcols=("is_public", "profile"), - desc="get_group_category", - ) - - category["profile"] = json.loads(category["profile"]) - - return category - def upsert_group_category(self, group_id, category_id, profile, is_public): """Add/update room category for group """ @@ -360,36 +676,6 @@ class GroupServerStore(SQLBaseStore): desc="remove_group_category", ) - @defer.inlineCallbacks - def get_group_roles(self, group_id): - rows = yield self.db.simple_select_list( - table="group_roles", - keyvalues={"group_id": group_id}, - retcols=("role_id", "is_public", "profile"), - desc="get_group_roles", - ) - - return { - row["role_id"]: { - "is_public": row["is_public"], - "profile": json.loads(row["profile"]), - } - for row in rows - } - - @defer.inlineCallbacks - def get_group_role(self, group_id, role_id): - role = yield self.db.simple_select_one( - table="group_roles", - keyvalues={"group_id": group_id, "role_id": role_id}, - retcols=("is_public", "profile"), - desc="get_group_role", - ) - - role["profile"] = json.loads(role["profile"]) - - return role - def upsert_group_role(self, group_id, role_id, profile, is_public): """Add/remove user role """ @@ -555,100 +841,6 @@ class GroupServerStore(SQLBaseStore): desc="remove_user_from_summary", ) - def get_local_groups_for_room(self, room_id): - """Get all of the local group that contain a given room - Args: - room_id (str): The ID of a room - Returns: - Deferred[list[str]]: A twisted.Deferred containing a list of group ids - containing this room - """ - return self.db.simple_select_onecol( - table="group_rooms", - keyvalues={"room_id": room_id}, - retcol="group_id", - desc="get_local_groups_for_room", - ) - - def get_users_for_summary_by_role(self, group_id, include_private=False): - """Get the users and roles that should be included in a summary request - - Returns ([users], [roles]) - """ - - def _get_users_for_summary_txn(txn): - keyvalues = {"group_id": group_id} - if not include_private: - keyvalues["is_public"] = True - - sql = """ - SELECT user_id, is_public, role_id, user_order - FROM group_summary_users - WHERE group_id = ? - """ - - if not include_private: - sql += " AND is_public = ?" - txn.execute(sql, (group_id, True)) - else: - txn.execute(sql, (group_id,)) - - users = [ - { - "user_id": row[0], - "is_public": row[1], - "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None, - "order": row[3], - } - for row in txn - ] - - sql = """ - SELECT role_id, is_public, profile, role_order - FROM group_summary_roles - INNER JOIN group_roles USING (group_id, role_id) - WHERE group_id = ? - """ - - if not include_private: - sql += " AND is_public = ?" - txn.execute(sql, (group_id, True)) - else: - txn.execute(sql, (group_id,)) - - roles = { - row[0]: { - "is_public": row[1], - "profile": json.loads(row[2]), - "order": row[3], - } - for row in txn - } - - return users, roles - - return self.db.runInteraction( - "get_users_for_summary_by_role", _get_users_for_summary_txn - ) - - def is_user_in_group(self, user_id, group_id): - return self.db.simple_select_one_onecol( - table="group_users", - keyvalues={"group_id": group_id, "user_id": user_id}, - retcol="user_id", - allow_none=True, - desc="is_user_in_group", - ).addCallback(lambda r: bool(r)) - - def is_user_admin_in_group(self, group_id, user_id): - return self.db.simple_select_one_onecol( - table="group_users", - keyvalues={"group_id": group_id, "user_id": user_id}, - retcol="is_admin", - allow_none=True, - desc="is_user_admin_in_group", - ) - def add_group_invite(self, group_id, user_id): """Record that the group server has invited a user """ @@ -658,64 +850,6 @@ class GroupServerStore(SQLBaseStore): desc="add_group_invite", ) - def is_user_invited_to_local_group(self, group_id, user_id): - """Has the group server invited a user? - """ - return self.db.simple_select_one_onecol( - table="group_invites", - keyvalues={"group_id": group_id, "user_id": user_id}, - retcol="user_id", - desc="is_user_invited_to_local_group", - allow_none=True, - ) - - def get_users_membership_info_in_group(self, group_id, user_id): - """Get a dict describing the membership of a user in a group. - - Example if joined: - - { - "membership": "join", - "is_public": True, - "is_privileged": False, - } - - Returns an empty dict if the user is not join/invite/etc - """ - - def _get_users_membership_in_group_txn(txn): - row = self.db.simple_select_one_txn( - txn, - table="group_users", - keyvalues={"group_id": group_id, "user_id": user_id}, - retcols=("is_admin", "is_public"), - allow_none=True, - ) - - if row: - return { - "membership": "join", - "is_public": row["is_public"], - "is_privileged": row["is_admin"], - } - - row = self.db.simple_select_one_onecol_txn( - txn, - table="group_invites", - keyvalues={"group_id": group_id, "user_id": user_id}, - retcol="user_id", - allow_none=True, - ) - - if row: - return {"membership": "invite"} - - return {} - - return self.db.runInteraction( - "get_users_membership_info_in_group", _get_users_membership_in_group_txn - ) - def add_user_to_group( self, group_id, @@ -846,16 +980,6 @@ class GroupServerStore(SQLBaseStore): "remove_room_from_group", _remove_room_from_group_txn ) - def get_publicised_groups_for_user(self, user_id): - """Get all groups a user is publicising - """ - return self.db.simple_select_onecol( - table="local_group_membership", - keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True}, - retcol="group_id", - desc="get_publicised_groups_for_user", - ) - def update_group_publicity(self, group_id, user_id, publicise): """Update whether the user is publicising their membership of the group """ @@ -1000,22 +1124,6 @@ class GroupServerStore(SQLBaseStore): desc="update_group_profile", ) - def get_attestations_need_renewals(self, valid_until_ms): - """Get all attestations that need to be renewed until givent time - """ - - def _get_attestations_need_renewals_txn(txn): - sql = """ - SELECT group_id, user_id FROM group_attestations_renewals - WHERE valid_until_ms <= ? - """ - txn.execute(sql, (valid_until_ms,)) - return self.db.cursor_to_dict(txn) - - return self.db.runInteraction( - "get_attestations_need_renewals", _get_attestations_need_renewals_txn - ) - def update_attestation_renewal(self, group_id, user_id, attestation): """Update an attestation that we have renewed """ @@ -1054,112 +1162,6 @@ class GroupServerStore(SQLBaseStore): desc="remove_attestation_renewal", ) - @defer.inlineCallbacks - def get_remote_attestation(self, group_id, user_id): - """Get the attestation that proves the remote agrees that the user is - in the group. - """ - row = yield self.db.simple_select_one( - table="group_attestations_remote", - keyvalues={"group_id": group_id, "user_id": user_id}, - retcols=("valid_until_ms", "attestation_json"), - desc="get_remote_attestation", - allow_none=True, - ) - - now = int(self._clock.time_msec()) - if row and now < row["valid_until_ms"]: - return json.loads(row["attestation_json"]) - - return None - - def get_joined_groups(self, user_id): - return self.db.simple_select_onecol( - table="local_group_membership", - keyvalues={"user_id": user_id, "membership": "join"}, - retcol="group_id", - desc="get_joined_groups", - ) - - def get_all_groups_for_user(self, user_id, now_token): - def _get_all_groups_for_user_txn(txn): - sql = """ - SELECT group_id, type, membership, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND membership != 'leave' - AND stream_id <= ? - """ - txn.execute(sql, (user_id, now_token)) - return [ - { - "group_id": row[0], - "type": row[1], - "membership": row[2], - "content": json.loads(row[3]), - } - for row in txn - ] - - return self.db.runInteraction( - "get_all_groups_for_user", _get_all_groups_for_user_txn - ) - - def get_groups_changes_for_user(self, user_id, from_token, to_token): - from_token = int(from_token) - has_changed = self._group_updates_stream_cache.has_entity_changed( - user_id, from_token - ) - if not has_changed: - return defer.succeed([]) - - def _get_groups_changes_for_user_txn(txn): - sql = """ - SELECT group_id, membership, type, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND ? < stream_id AND stream_id <= ? - """ - txn.execute(sql, (user_id, from_token, to_token)) - return [ - { - "group_id": group_id, - "membership": membership, - "type": gtype, - "content": json.loads(content_json), - } - for group_id, membership, gtype, content_json in txn - ] - - return self.db.runInteraction( - "get_groups_changes_for_user", _get_groups_changes_for_user_txn - ) - - def get_all_groups_changes(self, from_token, to_token, limit): - from_token = int(from_token) - has_changed = self._group_updates_stream_cache.has_any_entity_changed( - from_token - ) - if not has_changed: - return defer.succeed([]) - - def _get_all_groups_changes_txn(txn): - sql = """ - SELECT stream_id, group_id, user_id, type, content - FROM local_group_updates - WHERE ? < stream_id AND stream_id <= ? - LIMIT ? - """ - txn.execute(sql, (from_token, to_token, limit)) - return [ - (stream_id, group_id, user_id, gtype, json.loads(content_json)) - for stream_id, group_id, user_id, gtype, content_json in txn - ] - - return self.db.runInteraction( - "get_all_groups_changes", _get_all_groups_changes_txn - ) - def get_group_stream_token(self): return self._group_updates_id_gen.get_current_token() diff --git a/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column_2.sql.postgres b/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column_2.sql.postgres new file mode 100644 index 0000000000..c601cff6de --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column_2.sql.postgres @@ -0,0 +1,35 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- when we first added the room_version column, it was populated via a background +-- update. We now need it to be populated before synapse starts, so we populate +-- any remaining rows with a NULL room version now. For servers which have completed +-- the background update, this will be pretty quick. + +-- the following query will set room_version to NULL if no create event is found for +-- the room in current_state_events, and will set it to '1' if a create event with no +-- room_version is found. + +UPDATE rooms SET room_version=( + SELECT COALESCE(json::json->'content'->>'room_version','1') + FROM current_state_events cse INNER JOIN event_json ej USING (event_id) + WHERE cse.room_id=rooms.room_id AND cse.type='m.room.create' AND cse.state_key='' +) WHERE rooms.room_version IS NULL; + +-- we still allow the background update to complete: it has the useful side-effect of +-- populating `rooms` with any missing rooms (based on the current_state_events table). + +-- see also rooms_version_column_2.sql.sqlite which has a copy of the above query, using +-- sqlite syntax for the json extraction. diff --git a/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column_2.sql.sqlite b/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column_2.sql.sqlite new file mode 100644 index 0000000000..335c6f2074 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column_2.sql.sqlite @@ -0,0 +1,22 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- see rooms_version_column_2.sql.postgres for details of what's going on here. + +UPDATE rooms SET room_version=( + SELECT COALESCE(json_extract(ej.json, '$.content.room_version'), '1') + FROM current_state_events cse INNER JOIN event_json ej USING (event_id) + WHERE cse.room_id=rooms.room_id AND cse.type='m.room.create' AND cse.state_key='' +) WHERE rooms.room_version IS NULL; diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py index 7af1495e47..380c1ec7da 100644 --- a/synapse/storage/data_stores/main/stats.py +++ b/synapse/storage/data_stores/main/stats.py @@ -271,31 +271,6 @@ class StatsStore(StateDeltasStore): return slice_list - def get_room_stats_state(self, room_id): - """ - Returns the current room_stats_state for a room. - - Args: - room_id (str): The ID of the room to return state for. - - Returns (dict): - Dictionary containing these keys: - "name", "topic", "canonical_alias", "avatar", "join_rules", - "history_visibility" - """ - return self.db.simple_select_one( - "room_stats_state", - {"room_id": room_id}, - retcols=( - "name", - "topic", - "canonical_alias", - "avatar", - "join_rules", - "history_visibility", - ), - ) - @cached() def get_earliest_token_for_stats(self, stats_type, id): """ diff --git a/synapse/storage/data_stores/main/user_directory.py b/synapse/storage/data_stores/main/user_directory.py index 90c180ec6d..6b8130bf0f 100644 --- a/synapse/storage/data_stores/main/user_directory.py +++ b/synapse/storage/data_stores/main/user_directory.py @@ -183,7 +183,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): ) return 1 - logger.info( + logger.debug( "Processing the next %d rooms of %d remaining" % (len(rooms_to_work_on), progress["remaining"]) ) @@ -308,7 +308,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): ) return 1 - logger.info( + logger.debug( "Processing the next %d users of %d remaining" % (len(users_to_work_on), progress["remaining"]) ) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 1003dd84a5..3eeb2f7c04 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -343,7 +343,7 @@ class Database(object): top_three_counters = self._txn_perf_counters.interval(duration, limit=3) - perf_logger.info( + perf_logger.debug( "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters ) diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index af3fd67ab9..a5370ed527 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -390,7 +390,7 @@ class EventsPersistenceStorage(object): state_delta_reuse_delta_counter.inc() break - logger.info("Calculating state delta for room %s", room_id) + logger.debug("Calculating state delta for room %s", room_id) with Measure( self._clock, "persist_events.get_new_state_after_events" ): diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 04b6abdc24..581dffd8a0 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -73,6 +73,10 @@ class ObservableDeferred(object): def errback(f): object.__setattr__(self, "_result", (False, f)) while self._observers: + # This is a little bit of magic to correctly propagate stack + # traces when we `await` on one of the observer deferreds. + f.value.__failure__ = f + try: # TODO: Handle errors here. self._observers.pop().errback(f) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 82d3eefe0e..b68f9fe0d4 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -144,7 +144,7 @@ class ResponseCache(object): """ result = self.get(key) if not result: - logger.info( + logger.debug( "[%s]: no cached result for [%s], calculating new one", self._name, key ) d = run_in_background(callback, *args, **kwargs) |