From 729b672823132f413800a10f5fa8cac1f9b99008 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 Jul 2018 13:53:54 +0100 Subject: Use new helper base class for ReplicationSendEventRestServlet --- synapse/handlers/message.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 39d7724778..bcb093ba3e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -30,7 +30,7 @@ from synapse.api.urls import ConsentURIBuilder from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator -from synapse.replication.http.send_event import send_event_to_master +from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.types import RoomAlias, UserID from synapse.util.async import Linearizer from synapse.util.frozenutils import frozendict_json_encoder @@ -171,7 +171,7 @@ class EventCreationHandler(object): self.notifier = hs.get_notifier() self.config = hs.config - self.http_client = hs.get_simple_http_client() + self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs) # This is only used to get at ratelimit function, and maybe_kick_guest_users self.base_handler = BaseHandler(hs) @@ -559,12 +559,9 @@ class EventCreationHandler(object): try: # If we're a worker we need to hit out to the master. if self.config.worker_app: - yield send_event_to_master( - clock=self.hs.get_clock(), + yield self.send_event_to_master( + event_id=event.event_id, store=self.store, - client=self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, requester=requester, event=event, context=context, -- cgit 1.5.1 From 443da003bc46da8d6e46403cfa31ee6a4e4da230 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 Jul 2018 14:31:51 +0100 Subject: Use new helper base class for membership requests --- synapse/handlers/room_member_worker.py | 41 +++--- synapse/replication/http/membership.py | 262 ++++++++++++--------------------- 2 files changed, 108 insertions(+), 195 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 22d8b4b0d3..acc6eb8099 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -20,16 +20,24 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.handlers.room_member import RoomMemberHandler from synapse.replication.http.membership import ( - get_or_register_3pid_guest, - notify_user_membership_change, - remote_join, - remote_reject_invite, + ReplicationRegister3PIDGuestRestServlet as Repl3PID, + ReplicationRemoteJoinRestServlet as ReplRemoteJoin, + ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite, + ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft, ) logger = logging.getLogger(__name__) class RoomMemberWorkerHandler(RoomMemberHandler): + def __init__(self, hs): + super(RoomMemberWorkerHandler, self).__init__(hs) + + self._get_register_3pid_client = Repl3PID.make_client(hs) + self._remote_join_client = ReplRemoteJoin.make_client(hs) + self._remote_reject_client = ReplRejectInvite.make_client(hs) + self._notify_change_client = ReplJoinedLeft.make_client(hs) + @defer.inlineCallbacks def _remote_join(self, requester, remote_room_hosts, room_id, user, content): """Implements RoomMemberHandler._remote_join @@ -37,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): if len(remote_room_hosts) == 0: raise SynapseError(404, "No known servers") - ret = yield remote_join( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + ret = yield self._remote_join_client( requester=requester, remote_room_hosts=remote_room_hosts, room_id=room_id, @@ -55,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): """Implements RoomMemberHandler._remote_reject_invite """ - return remote_reject_invite( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._remote_reject_client( requester=requester, remote_room_hosts=remote_room_hosts, room_id=room_id, @@ -68,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): def _user_joined_room(self, target, room_id): """Implements RoomMemberHandler._user_joined_room """ - return notify_user_membership_change( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._notify_change_client( user_id=target.to_string(), room_id=room_id, change="joined", @@ -80,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): def _user_left_room(self, target, room_id): """Implements RoomMemberHandler._user_left_room """ - return notify_user_membership_change( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._notify_change_client( user_id=target.to_string(), room_id=room_id, change="left", @@ -92,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): """Implements RoomMemberHandler.get_or_register_3pid_guest """ - return get_or_register_3pid_guest( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._get_register_3pid_client( requester=requester, medium=medium, address=address, diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 6bfc8a5b89..8ad83e8421 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -14,182 +14,53 @@ # limitations under the License. import logging -import re from twisted.internet import defer -from synapse.api.errors import MatrixCodeMessageException, SynapseError -from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint from synapse.types import Requester, UserID from synapse.util.distributor import user_joined_room, user_left_room logger = logging.getLogger(__name__) -@defer.inlineCallbacks -def remote_join(client, host, port, requester, remote_room_hosts, - room_id, user_id, content): - """Ask the master to do a remote join for the given user to the given room - - Args: - client (SimpleHttpClient) - host (str): host of master - port (int): port on master listening for HTTP replication - requester (Requester) - remote_room_hosts (list[str]): Servers to try and join via - room_id (str) - user_id (str) - content (dict): The event content to use for the join event - - Returns: - Deferred - """ - uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port) - - payload = { - "requester": requester.serialize(), - "remote_room_hosts": remote_room_hosts, - "room_id": room_id, - "user_id": user_id, - "content": content, - } - - try: - result = yield client.post_json_get_json(uri, payload) - except MatrixCodeMessageException as e: - # We convert to SynapseError as we know that it was a SynapseError - # on the master process that we should send to the client. (And - # importantly, not stack traces everywhere) - raise SynapseError(e.code, e.msg, e.errcode) - defer.returnValue(result) - - -@defer.inlineCallbacks -def remote_reject_invite(client, host, port, requester, remote_room_hosts, - room_id, user_id): - """Ask master to reject the invite for the user and room. - - Args: - client (SimpleHttpClient) - host (str): host of master - port (int): port on master listening for HTTP replication - requester (Requester) - remote_room_hosts (list[str]): Servers to try and reject via - room_id (str) - user_id (str) - - Returns: - Deferred - """ - uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port) - - payload = { - "requester": requester.serialize(), - "remote_room_hosts": remote_room_hosts, - "room_id": room_id, - "user_id": user_id, - } - - try: - result = yield client.post_json_get_json(uri, payload) - except MatrixCodeMessageException as e: - # We convert to SynapseError as we know that it was a SynapseError - # on the master process that we should send to the client. (And - # importantly, not stack traces everywhere) - raise SynapseError(e.code, e.msg, e.errcode) - defer.returnValue(result) - - -@defer.inlineCallbacks -def get_or_register_3pid_guest(client, host, port, requester, - medium, address, inviter_user_id): - """Ask the master to get/create a guest account for given 3PID. - - Args: - client (SimpleHttpClient) - host (str): host of master - port (int): port on master listening for HTTP replication - requester (Requester) - medium (str) - address (str) - inviter_user_id (str): The user ID who is trying to invite the - 3PID - - Returns: - Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the - 3PID guest account. - """ - - uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port) - - payload = { - "requester": requester.serialize(), - "medium": medium, - "address": address, - "inviter_user_id": inviter_user_id, - } - - try: - result = yield client.post_json_get_json(uri, payload) - except MatrixCodeMessageException as e: - # We convert to SynapseError as we know that it was a SynapseError - # on the master process that we should send to the client. (And - # importantly, not stack traces everywhere) - raise SynapseError(e.code, e.msg, e.errcode) - defer.returnValue(result) - - -@defer.inlineCallbacks -def notify_user_membership_change(client, host, port, user_id, room_id, change): - """Notify master that a user has joined or left the room - - Args: - client (SimpleHttpClient) - host (str): host of master - port (int): port on master listening for HTTP replication. - user_id (str) - room_id (str) - change (str): Either "join" or "left" - - Returns: - Deferred +class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): + """Does a remote join for the given user to the given room """ - assert change in ("joined", "left") - - uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change) - - payload = { - "user_id": user_id, - "room_id": room_id, - } - - try: - result = yield client.post_json_get_json(uri, payload) - except MatrixCodeMessageException as e: - # We convert to SynapseError as we know that it was a SynapseError - # on the master process that we should send to the client. (And - # importantly, not stack traces everywhere) - raise SynapseError(e.code, e.msg, e.errcode) - defer.returnValue(result) - -class ReplicationRemoteJoinRestServlet(RestServlet): - PATTERNS = [re.compile("^/_synapse/replication/remote_join$")] + NAME = "remote_join" + PATH_ARGS = ("room_id", "user_id",) def __init__(self, hs): - super(ReplicationRemoteJoinRestServlet, self).__init__() + super(ReplicationRemoteJoinRestServlet, self).__init__(hs) self.federation_handler = hs.get_handlers().federation_handler self.store = hs.get_datastore() self.clock = hs.get_clock() + @staticmethod + def _serialize_payload(requester, room_id, user_id, remote_room_hosts, + content): + """ + Args: + requester(Requester) + room_id (str) + user_id (str) + remote_room_hosts (list[str]): Servers to try and join via + content(dict): The event content to use for the join event + """ + return { + "requester": requester.serialize(), + "remote_room_hosts": remote_room_hosts, + "content": content, + } + @defer.inlineCallbacks - def on_POST(self, request): + def _handle_request(self, request, room_id, user_id): content = parse_json_object_from_request(request) remote_room_hosts = content["remote_room_hosts"] - room_id = content["room_id"] - user_id = content["user_id"] event_content = content["content"] requester = Requester.deserialize(self.store, content["requester"]) @@ -212,23 +83,39 @@ class ReplicationRemoteJoinRestServlet(RestServlet): defer.returnValue((200, {})) -class ReplicationRemoteRejectInviteRestServlet(RestServlet): - PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")] +class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): + """Rejects the invite for the user and room. + """ + + NAME = "remote_reject_invite" + PATH_ARGS = ("room_id", "user_id",) def __init__(self, hs): - super(ReplicationRemoteRejectInviteRestServlet, self).__init__() + super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs) self.federation_handler = hs.get_handlers().federation_handler self.store = hs.get_datastore() self.clock = hs.get_clock() + @staticmethod + def _serialize_payload(requester, room_id, user_id, remote_room_hosts): + """ + Args: + requester(Requester) + room_id (str) + user_id (str) + remote_room_hosts (list[str]): Servers to try and reject via + """ + return { + "requester": requester.serialize(), + "remote_room_hosts": remote_room_hosts, + } + @defer.inlineCallbacks - def on_POST(self, request): + def _handle_request(self, request, room_id, user_id): content = parse_json_object_from_request(request) remote_room_hosts = content["remote_room_hosts"] - room_id = content["room_id"] - user_id = content["user_id"] requester = Requester.deserialize(self.store, content["requester"]) @@ -264,18 +151,39 @@ class ReplicationRemoteRejectInviteRestServlet(RestServlet): defer.returnValue((200, ret)) -class ReplicationRegister3PIDGuestRestServlet(RestServlet): - PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")] +class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint): + """Gets/creates a guest account for given 3PID. + """ + + NAME = "get_or_register_3pid_guest" + PATH_ARGS = () def __init__(self, hs): - super(ReplicationRegister3PIDGuestRestServlet, self).__init__() + super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs) self.registeration_handler = hs.get_handlers().registration_handler self.store = hs.get_datastore() self.clock = hs.get_clock() + @staticmethod + def _serialize_payload(requester, medium, address, inviter_user_id): + """ + Args: + requester(Requester) + medium (str) + address (str) + inviter_user_id (str): The user ID who is trying to invite the + 3PID + """ + return { + "requester": requester.serialize(), + "medium": medium, + "address": address, + "inviter_user_id": inviter_user_id, + } + @defer.inlineCallbacks - def on_POST(self, request): + def _handle_request(self, request): content = parse_json_object_from_request(request) medium = content["medium"] @@ -296,23 +204,35 @@ class ReplicationRegister3PIDGuestRestServlet(RestServlet): defer.returnValue((200, ret)) -class ReplicationUserJoinedLeftRoomRestServlet(RestServlet): - PATTERNS = [re.compile("^/_synapse/replication/user_(?Pjoined|left)_room$")] +class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): + """Notifies that a user has joined or left the room + """ + + NAME = "membership_change" + PATH_ARGS = ("room_id", "user_id", "change") + CACHE = False # No point caching as should return instantly. def __init__(self, hs): - super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__() + super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs) self.registeration_handler = hs.get_handlers().registration_handler self.store = hs.get_datastore() self.clock = hs.get_clock() self.distributor = hs.get_distributor() - def on_POST(self, request, change): - content = parse_json_object_from_request(request) + @staticmethod + def _serialize_payload(room_id, user_id, change): + """ + Args: + room_id (str) + user_id (str) + change (str): Either "joined" or "left" + """ + assert change in ("joined", "left",) - user_id = content["user_id"] - room_id = content["room_id"] + return {} + def _handle_request(self, request, room_id, user_id, change): logger.info("user membership change: %s in %s", user_id, room_id) user = UserID.from_string(user_id) -- cgit 1.5.1 From 0ca459ea334ff86016bda241c0c823178789c215 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 25 Jul 2018 22:10:39 +0100 Subject: Basic support for room versioning This is the first tranche of support for room versioning. It includes: * setting the default room version in the config file * new room_version param on the createRoom API * storing the version of newly-created rooms in the m.room.create event * fishing the version of existing rooms out of the m.room.create event --- synapse/api/constants.py | 6 ++++++ synapse/api/errors.py | 2 ++ synapse/config/server.py | 14 ++++++++++++ synapse/handlers/room.py | 27 ++++++++++++++++++++++- synapse/replication/slave/storage/events.py | 2 +- synapse/storage/state.py | 33 ++++++++++++++++++++++++++--- tests/utils.py | 4 ++++ 7 files changed, 83 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 4df930c8d1..a27bf3b32d 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd +# Copyright 2018 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -94,3 +95,8 @@ class RoomCreationPreset(object): class ThirdPartyEntityKind(object): USER = "user" LOCATION = "location" + + +# vdh-test-version is a placeholder to get room versioning support working and tested +# until we have a working v2. +KNOWN_ROOM_VERSIONS = {"1", "vdh-test-version"} diff --git a/synapse/api/errors.py b/synapse/api/errors.py index b41d595059..477ca07a24 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -56,6 +57,7 @@ class Codes(object): CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN" CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM" MAU_LIMIT_EXCEEDED = "M_MAU_LIMIT_EXCEEDED" + UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION" class CodeMessageException(RuntimeError): diff --git a/synapse/config/server.py b/synapse/config/server.py index 6a471a0a5e..68ef2789d3 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -16,6 +16,7 @@ import logging +from synapse.api.constants import KNOWN_ROOM_VERSIONS from synapse.http.endpoint import parse_and_validate_server_name from ._base import Config, ConfigError @@ -75,6 +76,16 @@ class ServerConfig(Config): ) else: self.max_mau_value = 0 + + # the version of rooms created by default on this server + self.default_room_version = str(config.get( + "default_room_version", "1", + )) + if self.default_room_version not in KNOWN_ROOM_VERSIONS: + raise ConfigError("Unrecognised value '%s' for default_room_version" % ( + self.default_room_version, + )) + # FIXME: federation_domain_whitelist needs sytests self.federation_domain_whitelist = None federation_domain_whitelist = config.get( @@ -249,6 +260,9 @@ class ServerConfig(Config): # (except those sent by local server admins). The default is False. # block_non_admin_invites: True + # The room_version of rooms which are created by default by this server. + # default_room_version: 1 + # Restrict federation to the following whitelist of domains. # N.B. we recommend also firewalling your federation listener to limit # inbound federation traffic as early as possible, rather than relying diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7b7804d9b2..a526b684e9 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -21,9 +21,16 @@ import math import string from collections import OrderedDict +from six import string_types + from twisted.internet import defer -from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset +from synapse.api.constants import ( + KNOWN_ROOM_VERSIONS, + EventTypes, + JoinRules, + RoomCreationPreset, +) from synapse.api.errors import AuthError, Codes, StoreError, SynapseError from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID from synapse.util import stringutils @@ -99,6 +106,21 @@ class RoomCreationHandler(BaseHandler): if ratelimit: yield self.ratelimit(requester) + room_version = config.get("room_version", self.hs.config.default_room_version) + if not isinstance(room_version, string_types): + raise SynapseError( + 400, + "room_version must be a string", + Codes.BAD_JSON, + ) + + if room_version not in KNOWN_ROOM_VERSIONS: + raise SynapseError( + 400, + "Your homeserver does not support this room version", + Codes.UNSUPPORTED_ROOM_VERSION, + ) + if "room_alias_name" in config: for wchar in string.whitespace: if wchar in config["room_alias_name"]: @@ -184,6 +206,9 @@ class RoomCreationHandler(BaseHandler): creation_content = config.get("creation_content", {}) + # override any attempt to set room versions via the creation_content + creation_content["room_version"] = room_version + room_member_handler = self.hs.get_room_member_handler() yield self._send_events_for_new_room( diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index bdb5eee4af..4830c68f35 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -44,8 +44,8 @@ class SlavedEventStore(EventFederationWorkerStore, RoomMemberWorkerStore, EventPushActionsWorkerStore, StreamWorkerStore, - EventsWorkerStore, StateGroupWorkerStore, + EventsWorkerStore, SignatureWorkerStore, UserErasureWorkerStore, BaseSlavedStore): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index b27b3ae144..17b14d464b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -21,15 +21,17 @@ from six.moves import range from twisted.internet import defer +from synapse.api.constants import EventTypes +from synapse.api.errors import NotFoundError +from synapse.storage._base import SQLBaseStore from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.engines import PostgresEngine +from synapse.storage.events_worker import EventsWorkerStore from synapse.util.caches import get_cache_factor_for, intern_string from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.stringutils import to_ascii -from ._base import SQLBaseStore - logger = logging.getLogger(__name__) @@ -46,7 +48,8 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt return len(self.delta_ids) if self.delta_ids else 0 -class StateGroupWorkerStore(SQLBaseStore): +# this inherits from EventsWorkerStore because it calls self.get_events +class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): """The parts of StateGroupStore that can be called from workers. """ @@ -61,6 +64,30 @@ class StateGroupWorkerStore(SQLBaseStore): "*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache") ) + @defer.inlineCallbacks + def get_room_version(self, room_id): + """Get the room_version of a given room + + Args: + room_id (str) + + Returns: + Deferred[str] + + Raises: + NotFoundError if the room is unknown + """ + # for now we do this by looking at the create event. We may want to cache this + # more intelligently in future. + state_ids = yield self.get_current_state_ids(room_id) + create_id = state_ids.get((EventTypes.Create, "")) + + if not create_id: + raise NotFoundError("Unknown room") + + create_event = yield self.get_event(create_id) + defer.returnValue(create_event.content.get("room_version", "1")) + @cached(max_entries=100000, iterable=True) def get_current_state_ids(self, room_id): """Get the current state event ids for a room based on the diff --git a/tests/utils.py b/tests/utils.py index 9bff3ff3b9..9e188a8ed4 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -74,6 +74,10 @@ def setup_test_homeserver(name="test", datastore=None, config=None, reactor=None config.media_storage_providers = [] config.auto_join_rooms = [] + # we need a sane default_room_version, otherwise attempts to create rooms will + # fail. + config.default_room_version = "1" + # disable user directory updates, because they get done in the # background, which upsets the test runner. config.update_user_directory = False -- cgit 1.5.1 From f900d508244b4277065d34dd9a05224fd60d5221 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 6 Aug 2018 13:45:37 +0100 Subject: include known room versions in outgoing make_joins --- synapse/federation/federation_client.py | 8 +++++--- synapse/federation/transport/client.py | 5 ++++- synapse/handlers/federation.py | 13 +++++++++++-- synapse/http/matrixfederationclient.py | 7 +++++-- 4 files changed, 25 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index de4b813a15..7ec1d7a889 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -521,7 +521,7 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to %s via any server", description) def make_membership_event(self, destinations, room_id, user_id, membership, - content={},): + content, params): """ Creates an m.room.member event, with context, without participating in the room. @@ -537,8 +537,10 @@ class FederationClient(FederationBase): user_id (str): The user whose membership is being evented. membership (str): The "membership" property of the event. Must be one of "join" or "leave". - content (object): Any additional data to put into the content field + content (dict): Any additional data to put into the content field of the event. + params (dict[str, str|Iterable[str]]): Query parameters to include in the + request. Return: Deferred: resolves to a tuple of (origin (str), event (object)) where origin is the remote homeserver which generated the event. @@ -558,7 +560,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def send_request(destination): ret = yield self.transport_layer.make_membership_event( - destination, room_id, user_id, membership + destination, room_id, user_id, membership, params, ) pdu_dict = ret.get("event", None) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 4529d454af..b4fbe2c9d5 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -195,7 +195,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def make_membership_event(self, destination, room_id, user_id, membership): + def make_membership_event(self, destination, room_id, user_id, membership, params): """Asks a remote server to build and sign us a membership event Note that this does not append any events to any graphs. @@ -205,6 +205,8 @@ class TransportLayerClient(object): room_id (str): room to join/leave user_id (str): user to be joined/left membership (str): one of join/leave + params (dict[str, str|Iterable[str]]): Query parameters to include in the + request. Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result @@ -241,6 +243,7 @@ class TransportLayerClient(object): content = yield self.client.get_json( destination=destination, path=path, + args=params, retry_on_dns_fail=retry_on_dns_fail, timeout=20000, ignore_backoff=ignore_backoff, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 533b82c783..0dffd44e22 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -30,7 +30,12 @@ from unpaddedbase64 import decode_base64 from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.api.constants import ( + KNOWN_ROOM_VERSIONS, + EventTypes, + Membership, + RejectedReason, +) from synapse.api.errors import ( AuthError, CodeMessageException, @@ -922,6 +927,9 @@ class FederationHandler(BaseHandler): joinee, "join", content, + params={ + "ver": KNOWN_ROOM_VERSIONS, + }, ) # This shouldn't happen, because the RoomMemberHandler has a @@ -1187,13 +1195,14 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, - content={},): + content={}, params=None): origin, pdu = yield self.federation_client.make_membership_event( target_hosts, room_id, user_id, membership, content, + params=params, ) logger.debug("Got response to make_%s: %s", membership, pdu) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index bf1aa29502..b3f5415aa6 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -439,7 +439,7 @@ class MatrixFederationHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks - def get_json(self, destination, path, args={}, retry_on_dns_fail=True, + def get_json(self, destination, path, args=None, retry_on_dns_fail=True, timeout=None, ignore_backoff=False): """ GETs some json from the given host homeserver and path @@ -447,7 +447,7 @@ class MatrixFederationHttpClient(object): destination (str): The remote server to send the HTTP request to. path (str): The HTTP path. - args (dict): A dictionary used to create query strings, defaults to + args (dict|None): A dictionary used to create query strings, defaults to None. timeout (int): How long to try (in ms) the destination for before giving up. None indicates no timeout and that the request will @@ -702,6 +702,9 @@ def check_content_type_is_json(headers): def encode_query_args(args): + if args is None: + return b"" + encoded_args = {} for k, vs in args.items(): if isinstance(vs, string_types): -- cgit 1.5.1 From 3523f5432a79659ac365dc2ff1212aa1a3707d8e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 7 Aug 2018 12:51:57 +0100 Subject: Don't expose default_room_version as config opt --- synapse/api/constants.py | 3 +++ synapse/config/server.py | 14 -------------- synapse/handlers/room.py | 3 ++- 3 files changed, 5 insertions(+), 15 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index a27bf3b32d..b0da506f6d 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -97,6 +97,9 @@ class ThirdPartyEntityKind(object): LOCATION = "location" +# the version we will give rooms which are created on this server +DEFAULT_ROOM_VERSION = "1" + # vdh-test-version is a placeholder to get room versioning support working and tested # until we have a working v2. KNOWN_ROOM_VERSIONS = {"1", "vdh-test-version"} diff --git a/synapse/config/server.py b/synapse/config/server.py index 68ef2789d3..6a471a0a5e 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -16,7 +16,6 @@ import logging -from synapse.api.constants import KNOWN_ROOM_VERSIONS from synapse.http.endpoint import parse_and_validate_server_name from ._base import Config, ConfigError @@ -76,16 +75,6 @@ class ServerConfig(Config): ) else: self.max_mau_value = 0 - - # the version of rooms created by default on this server - self.default_room_version = str(config.get( - "default_room_version", "1", - )) - if self.default_room_version not in KNOWN_ROOM_VERSIONS: - raise ConfigError("Unrecognised value '%s' for default_room_version" % ( - self.default_room_version, - )) - # FIXME: federation_domain_whitelist needs sytests self.federation_domain_whitelist = None federation_domain_whitelist = config.get( @@ -260,9 +249,6 @@ class ServerConfig(Config): # (except those sent by local server admins). The default is False. # block_non_admin_invites: True - # The room_version of rooms which are created by default by this server. - # default_room_version: 1 - # Restrict federation to the following whitelist of domains. # N.B. we recommend also firewalling your federation listener to limit # inbound federation traffic as early as possible, rather than relying diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a526b684e9..6a17c42238 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -26,6 +26,7 @@ from six import string_types from twisted.internet import defer from synapse.api.constants import ( + DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS, EventTypes, JoinRules, @@ -106,7 +107,7 @@ class RoomCreationHandler(BaseHandler): if ratelimit: yield self.ratelimit(requester) - room_version = config.get("room_version", self.hs.config.default_room_version) + room_version = config.get("room_version", DEFAULT_ROOM_VERSION) if not isinstance(room_version, string_types): raise SynapseError( 400, -- cgit 1.5.1 From 53bca4690b5c94fb1506dbc628ce2ef0f770745f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 7 Aug 2018 19:09:48 +0100 Subject: more metrics for the federation and appservice senders --- synapse/federation/transaction_queue.py | 10 +++++++++- synapse/handlers/appservice.py | 10 ++++++++++ synapse/metrics/__init__.py | 13 +++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 78f9d40a3a..f603c8a368 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,8 @@ from synapse.api.errors import FederationDeniedError, HttpResponseException from synapse.handlers.presence import format_user_presence_state, get_interested_remotes from synapse.metrics import ( LaterGauge, + event_processing_loop_counter, + event_processing_loop_room_count, events_processed_counter, sent_edus_counter, sent_transactions_counter, @@ -253,7 +255,13 @@ class TransactionQueue(object): synapse.metrics.event_processing_last_ts.labels( "federation_sender").set(ts) - events_processed_counter.inc(len(events)) + events_processed_counter.inc(len(events)) + + event_processing_loop_room_count.labels( + "federation_sender" + ).inc(len(events_by_room)) + + event_processing_loop_counter.labels("federation_sender").inc() synapse.metrics.event_processing_positions.labels( "federation_sender").set(next_token) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ee41aed69e..f0f89af7dc 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -23,6 +23,10 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes +from synapse.metrics import ( + event_processing_loop_counter, + event_processing_loop_room_count, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.metrics import Measure @@ -136,6 +140,12 @@ class ApplicationServicesHandler(object): events_processed_counter.inc(len(events)) + event_processing_loop_room_count.labels( + "appservice_sender" + ).inc(len(events_by_room)) + + event_processing_loop_counter.labels("appservice_sender").inc() + synapse.metrics.event_processing_lag.labels( "appservice_sender").set(now - ts) synapse.metrics.event_processing_last_ts.labels( diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a9158fc066..b7cb3a730a 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -174,6 +174,19 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions events_processed_counter = Counter("synapse_federation_client_events_processed", "") +event_processing_loop_counter = Counter( + "synapse_event_processing_loop", + "Event processing loop iterations", + ["name"], +) + +event_processing_loop_room_count = Counter( + "synapse_event_processing_loop_room_count", + "Rooms seen per event processing loop iteration", + ["name"], +) + + # Used to track where various components have processed in the event stream, # e.g. federation sending, appservice sending, etc. event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"]) -- cgit 1.5.1