diff options
Diffstat (limited to 'synapse/replication/http')
-rw-r--r-- | synapse/replication/http/__init__.py | 23 | ||||
-rw-r--r-- | synapse/replication/http/_base.py | 57 | ||||
-rw-r--r-- | synapse/replication/http/devices.py | 73 | ||||
-rw-r--r-- | synapse/replication/http/federation.py | 85 | ||||
-rw-r--r-- | synapse/replication/http/login.py | 7 | ||||
-rw-r--r-- | synapse/replication/http/membership.py | 69 | ||||
-rw-r--r-- | synapse/replication/http/presence.py | 116 | ||||
-rw-r--r-- | synapse/replication/http/register.py | 14 | ||||
-rw-r--r-- | synapse/replication/http/send_event.py | 24 | ||||
-rw-r--r-- | synapse/replication/http/streams.py | 79 |
10 files changed, 470 insertions, 77 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 81b85352b1..19b69e0e11 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -14,7 +14,16 @@ # limitations under the License. from synapse.http.server import JsonResource -from synapse.replication.http import federation, login, membership, register, send_event +from synapse.replication.http import ( + devices, + federation, + login, + membership, + presence, + register, + send_event, + streams, +) REPLICATION_PREFIX = "/_synapse/replication" @@ -26,7 +35,13 @@ class ReplicationRestResource(JsonResource): def register_servlets(self, hs): send_event.register_servlets(hs, self) - membership.register_servlets(hs, self) federation.register_servlets(hs, self) - login.register_servlets(hs, self) - register.register_servlets(hs, self) + presence.register_servlets(hs, self) + membership.register_servlets(hs, self) + + # The following can't currently be instantiated on workers. + if hs.config.worker.worker_app is None: + login.register_servlets(hs, self) + register.register_servlets(hs, self) + devices.register_servlets(hs, self) + streams.register_servlets(hs, self) diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 03560c1f0e..793cef6c26 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -16,6 +16,8 @@ import abc import logging import re +from inspect import signature +from typing import Dict, List, Tuple from six import raise_from from six.moves import urllib @@ -43,7 +45,7 @@ class ReplicationEndpoint(object): """Helper base class for defining new replication HTTP endpoints. This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..` - (with an `/:txn_id` prefix for cached requests.), where NAME is a name, + (with a `/:txn_id` suffix for cached requests), where NAME is a name, PATH_ARGS are a tuple of parameters to be encoded in the URL. For example, if `NAME` is "send_event" and `PATH_ARGS` is `("event_id",)`, @@ -59,6 +61,8 @@ class ReplicationEndpoint(object): must call `register` to register the path with the HTTP server. Requests can be sent by calling the client returned by `make_client`. + Requests are sent to master process by default, but can be sent to other + named processes by specifying an `instance_name` keyword argument. Attributes: NAME (str): A name for the endpoint, added to the path as well as used @@ -78,9 +82,8 @@ class ReplicationEndpoint(object): __metaclass__ = abc.ABCMeta - NAME = abc.abstractproperty() - PATH_ARGS = abc.abstractproperty() - + NAME = abc.abstractproperty() # type: str # type: ignore + PATH_ARGS = abc.abstractproperty() # type: Tuple[str, ...] # type: ignore METHOD = "POST" CACHE = True RETRY_ON_TIMEOUT = True @@ -91,6 +94,16 @@ class ReplicationEndpoint(object): hs, "repl." + self.NAME, timeout_ms=30 * 60 * 1000 ) + # We reserve `instance_name` as a parameter to sending requests, so we + # assert here that sub classes don't try and use the name. + assert ( + "instance_name" not in self.PATH_ARGS + ), "`instance_name` is a reserved paramater name" + assert ( + "instance_name" + not in signature(self.__class__._serialize_payload).parameters + ), "`instance_name` is a reserved paramater name" + assert self.METHOD in ("PUT", "POST", "GET") @abc.abstractmethod @@ -110,14 +123,14 @@ class ReplicationEndpoint(object): return {} @abc.abstractmethod - def _handle_request(self, request, **kwargs): + async def _handle_request(self, request, **kwargs): """Handle incoming request. This is called with the request object and PATH_ARGS. Returns: - Deferred[dict]: A JSON serialisable dict to be used as response - body of request. + tuple[int, dict]: HTTP status code and a JSON serialisable dict + to be used as response body of request. """ pass @@ -128,14 +141,30 @@ class ReplicationEndpoint(object): Returns a callable that accepts the same parameters as `_serialize_payload`. """ clock = hs.get_clock() - host = hs.config.worker_replication_host - port = hs.config.worker_replication_http_port - client = hs.get_simple_http_client() + local_instance_name = hs.get_instance_name() + + master_host = hs.config.worker_replication_host + master_port = hs.config.worker_replication_http_port + + instance_map = hs.config.worker.instance_map @trace(opname="outgoing_replication_request") @defer.inlineCallbacks - def send_request(**kwargs): + def send_request(instance_name="master", **kwargs): + if instance_name == local_instance_name: + raise Exception("Trying to send HTTP request to self") + if instance_name == "master": + host = master_host + port = master_port + elif instance_name in instance_map: + host = instance_map[instance_name].host + port = instance_map[instance_name].port + else: + raise Exception( + "Instance %r not in 'instance_map' config" % (instance_name,) + ) + data = yield cls._serialize_payload(**kwargs) url_args = [ @@ -171,7 +200,7 @@ class ReplicationEndpoint(object): # have a good idea that the request has either succeeded or failed on # the master, and so whether we should clean up or not. while True: - headers = {} + headers = {} # type: Dict[bytes, List[bytes]] inject_active_span_byte_dict(headers, None, check_destination=False) try: result = yield request_func(uri, data, headers=headers) @@ -180,7 +209,7 @@ class ReplicationEndpoint(object): if e.code != 504 or not cls.RETRY_ON_TIMEOUT: raise - logger.warn("%s request timed out", cls.NAME) + logger.warning("%s request timed out", cls.NAME) # If we timed out we probably don't need to worry about backing # off too much, but lets just wait a little anyway. @@ -207,7 +236,7 @@ class ReplicationEndpoint(object): method = self.METHOD if self.CACHE: - handler = self._cached_handler + handler = self._cached_handler # type: ignore url_args.append("txn_id") args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args) diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py new file mode 100644 index 0000000000..e32aac0a25 --- /dev/null +++ b/synapse/replication/http/devices.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.replication.http._base import ReplicationEndpoint + +logger = logging.getLogger(__name__) + + +class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): + """Ask master to resync the device list for a user by contacting their + server. + + This must happen on master so that the results can be correctly cached in + the database and streamed to workers. + + Request format: + + POST /_synapse/replication/user_device_resync/:user_id + + {} + + Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id` + response, e.g.: + + { + "user_id": "@alice:example.org", + "devices": [ + { + "device_id": "JLAFKJWSCS", + "keys": { ... }, + "device_display_name": "Alice's Mobile Phone" + } + ] + } + """ + + NAME = "user_device_resync" + PATH_ARGS = ("user_id",) + CACHE = False + + def __init__(self, hs): + super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs) + + self.device_list_updater = hs.get_device_handler().device_list_updater + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + @staticmethod + def _serialize_payload(user_id): + return {} + + async def _handle_request(self, request, user_id): + user_devices = await self.device_list_updater.user_device_resync(user_id) + + return 200, user_devices + + +def register_servlets(hs, http_server): + ReplicationUserDevicesResyncRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 2f16955954..c287c4e269 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -17,7 +17,8 @@ import logging from twisted.internet import defer -from synapse.events import event_type_from_format_version +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.events import make_event_from_dict from synapse.events.snapshot import EventContext from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint @@ -28,7 +29,7 @@ logger = logging.getLogger(__name__) class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): """Handles events newly received from federation, including persisting and - notifying. + notifying. Returns the maximum stream ID of the persisted events. The API looks like: @@ -37,11 +38,21 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): { "events": [{ "event": { .. serialized event .. }, + "room_version": .., // "1", "2", "3", etc: the version of the room + // containing the event + "event_format_version": .., // 1,2,3 etc: the event format version "internal_metadata": { .. serialized internal_metadata .. }, "rejected_reason": .., // The event.rejected_reason field "context": { .. serialized event context .. }, }], "backfilled": false + } + + 200 OK + + { + "max_stream_id": 32443, + } """ NAME = "fed_send_events" @@ -51,6 +62,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): super(ReplicationFederationSendEventsRestServlet, self).__init__(hs) self.store = hs.get_datastore() + self.storage = hs.get_storage() self.clock = hs.get_clock() self.federation_handler = hs.get_handlers().federation_handler @@ -71,6 +83,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): event_payloads.append( { "event": event.get_pdu_json(), + "room_version": event.room_version.identifier, "event_format_version": event.format_version, "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, @@ -82,8 +95,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): return payload - @defer.inlineCallbacks - def _handle_request(self, request): + async def _handle_request(self, request): with Measure(self.clock, "repl_fed_send_events_parse"): content = parse_json_object_from_request(request) @@ -94,26 +106,27 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): event_and_contexts = [] for event_payload in event_payloads: event_dict = event_payload["event"] - format_ver = event_payload["event_format_version"] + room_ver = KNOWN_ROOM_VERSIONS[event_payload["room_version"]] internal_metadata = event_payload["internal_metadata"] rejected_reason = event_payload["rejected_reason"] - EventType = event_type_from_format_version(format_ver) - event = EventType(event_dict, internal_metadata, rejected_reason) + event = make_event_from_dict( + event_dict, room_ver, internal_metadata, rejected_reason + ) - context = yield EventContext.deserialize( - self.store, event_payload["context"] + context = EventContext.deserialize( + self.storage, event_payload["context"] ) event_and_contexts.append((event, context)) logger.info("Got %d events from federation", len(event_and_contexts)) - yield self.federation_handler.persist_events_and_notify( + max_stream_id = await self.federation_handler.persist_events_and_notify( event_and_contexts, backfilled ) - return 200, {} + return 200, {"max_stream_id": max_stream_id} class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): @@ -144,8 +157,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): def _serialize_payload(edu_type, origin, content): return {"origin": origin, "content": content} - @defer.inlineCallbacks - def _handle_request(self, request, edu_type): + async def _handle_request(self, request, edu_type): with Measure(self.clock, "repl_fed_send_edu_parse"): content = parse_json_object_from_request(request) @@ -154,7 +166,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): logger.info("Got %r edu from %s", edu_type, origin) - result = yield self.registry.on_edu(edu_type, origin, edu_content) + result = await self.registry.on_edu(edu_type, origin, edu_content) return 200, result @@ -193,8 +205,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint): """ return {"args": args} - @defer.inlineCallbacks - def _handle_request(self, request, query_type): + async def _handle_request(self, request, query_type): with Measure(self.clock, "repl_fed_query_parse"): content = parse_json_object_from_request(request) @@ -202,7 +213,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint): logger.info("Got %r query", query_type) - result = yield self.registry.on_query(query_type, args) + result = await self.registry.on_query(query_type, args) return 200, result @@ -213,7 +224,7 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint): Request format: - POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id + POST /_synapse/replication/fed_cleanup_room/:room_id/:txn_id {} """ @@ -234,10 +245,41 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint): """ return {} - @defer.inlineCallbacks - def _handle_request(self, request, room_id): - yield self.store.clean_room_for_join(room_id) + async def _handle_request(self, request, room_id): + await self.store.clean_room_for_join(room_id) + + return 200, {} + + +class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint): + """Called to clean up any data in DB for a given room, ready for the + server to join the room. + + Request format: + + POST /_synapse/replication/store_room_on_invite/:room_id/:txn_id + + { + "room_version": "1", + } + """ + + NAME = "store_room_on_invite" + PATH_ARGS = ("room_id",) + + def __init__(self, hs): + super().__init__(hs) + + self.store = hs.get_datastore() + + @staticmethod + def _serialize_payload(room_id, room_version): + return {"room_version": room_version.identifier} + async def _handle_request(self, request, room_id): + content = parse_json_object_from_request(request) + room_version = KNOWN_ROOM_VERSIONS[content["room_version"]] + await self.store.maybe_store_room_on_invite(room_id, room_version) return 200, {} @@ -246,3 +288,4 @@ def register_servlets(hs, http_server): ReplicationFederationSendEduRestServlet(hs).register(http_server) ReplicationGetQueryRestServlet(hs).register(http_server) ReplicationCleanRoomRestServlet(hs).register(http_server) + ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py index 786f5232b2..798b9d3af5 100644 --- a/synapse/replication/http/login.py +++ b/synapse/replication/http/login.py @@ -15,8 +15,6 @@ import logging -from twisted.internet import defer - from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint @@ -52,15 +50,14 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint): "is_guest": is_guest, } - @defer.inlineCallbacks - def _handle_request(self, request, user_id): + async def _handle_request(self, request, user_id): content = parse_json_object_from_request(request) device_id = content["device_id"] initial_display_name = content["initial_display_name"] is_guest = content["is_guest"] - device_id, access_token = yield self.registration_handler.register_device( + device_id, access_token = await self.registration_handler.register_device( user_id, device_id, initial_display_name, is_guest ) diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index b9ce3477ad..a7174c4a8f 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -14,14 +14,16 @@ # limitations under the License. import logging - -from twisted.internet import defer +from typing import TYPE_CHECKING from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint from synapse.types import Requester, UserID from synapse.util.distributor import user_joined_room, user_left_room +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -65,8 +67,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): "content": content, } - @defer.inlineCallbacks - def _handle_request(self, request, room_id, user_id): + async def _handle_request(self, request, room_id, user_id): content = parse_json_object_from_request(request) remote_room_hosts = content["remote_room_hosts"] @@ -79,11 +80,11 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): logger.info("remote_join: %s into room: %s", user_id, room_id) - yield self.federation_handler.do_invite_join( + event_id, stream_id = await self.federation_handler.do_invite_join( remote_room_hosts, room_id, user_id, event_content ) - return 200, {} + return 200, {"event_id": event_id, "stream_id": stream_id} class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): @@ -96,6 +97,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): { "requester": ..., "remote_room_hosts": [...], + "content": { ... } } """ @@ -108,9 +110,10 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): self.federation_handler = hs.get_handlers().federation_handler self.store = hs.get_datastore() self.clock = hs.get_clock() + self.member_handler = hs.get_room_member_handler() @staticmethod - def _serialize_payload(requester, room_id, user_id, remote_room_hosts): + def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content): """ Args: requester(Requester) @@ -121,13 +124,14 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): return { "requester": requester.serialize(), "remote_room_hosts": remote_room_hosts, + "content": content, } - @defer.inlineCallbacks - def _handle_request(self, request, room_id, user_id): + async def _handle_request(self, request, room_id, user_id): content = parse_json_object_from_request(request) remote_room_hosts = content["remote_room_hosts"] + event_content = content["content"] requester = Requester.deserialize(self.store, content["requester"]) @@ -137,10 +141,10 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id) try: - event = yield self.federation_handler.do_remotely_reject_invite( - remote_room_hosts, room_id, user_id + event, stream_id = await self.federation_handler.do_remotely_reject_invite( + remote_room_hosts, room_id, user_id, event_content, ) - ret = event.get_pdu_json() + event_id = event.event_id except Exception as e: # if we were unable to reject the exception, just mark # it as rejected on our end and plough ahead. @@ -148,12 +152,44 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): # The 'except' clause is very broad, but we need to # capture everything from DNS failures upwards # - logger.warn("Failed to reject invite: %s", e) + logger.warning("Failed to reject invite: %s", e) + + stream_id = await self.member_handler.locally_reject_invite( + user_id, room_id + ) + event_id = None + + return 200, {"event_id": event_id, "stream_id": stream_id} + + +class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint): + """Rejects the invite for the user and room locally. + + Request format: + + POST /_synapse/replication/locally_reject_invite/:room_id/:user_id + + {} + """ + + NAME = "locally_reject_invite" + PATH_ARGS = ("room_id", "user_id") + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.member_handler = hs.get_room_member_handler() + + @staticmethod + def _serialize_payload(room_id, user_id): + return {} + + async def _handle_request(self, request, room_id, user_id): + logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id) - yield self.store.locally_reject_invite(user_id, room_id) - ret = {} + stream_id = await self.member_handler.locally_reject_invite(user_id, room_id) - return 200, ret + return 200, {"stream_id": stream_id} class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): @@ -209,3 +245,4 @@ def register_servlets(hs, http_server): ReplicationRemoteJoinRestServlet(hs).register(http_server) ReplicationRemoteRejectInviteRestServlet(hs).register(http_server) ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server) + ReplicationLocallyRejectInviteRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py new file mode 100644 index 0000000000..ea1b33331b --- /dev/null +++ b/synapse/replication/http/presence.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING + +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import UserID + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class ReplicationBumpPresenceActiveTime(ReplicationEndpoint): + """We've seen the user do something that indicates they're interacting + with the app. + + The POST looks like: + + POST /_synapse/replication/bump_presence_active_time/<user_id> + + 200 OK + + {} + """ + + NAME = "bump_presence_active_time" + PATH_ARGS = ("user_id",) + METHOD = "POST" + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._presence_handler = hs.get_presence_handler() + + @staticmethod + def _serialize_payload(user_id): + return {} + + async def _handle_request(self, request, user_id): + await self._presence_handler.bump_presence_active_time( + UserID.from_string(user_id) + ) + + return ( + 200, + {}, + ) + + +class ReplicationPresenceSetState(ReplicationEndpoint): + """Set the presence state for a user. + + The POST looks like: + + POST /_synapse/replication/presence_set_state/<user_id> + + { + "state": { ... }, + "ignore_status_msg": false, + } + + 200 OK + + {} + """ + + NAME = "presence_set_state" + PATH_ARGS = ("user_id",) + METHOD = "POST" + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._presence_handler = hs.get_presence_handler() + + @staticmethod + def _serialize_payload(user_id, state, ignore_status_msg=False): + return { + "state": state, + "ignore_status_msg": ignore_status_msg, + } + + async def _handle_request(self, request, user_id): + content = parse_json_object_from_request(request) + + await self._presence_handler.set_state( + UserID.from_string(user_id), content["state"], content["ignore_status_msg"] + ) + + return ( + 200, + {}, + ) + + +def register_servlets(hs, http_server): + ReplicationBumpPresenceActiveTime(hs).register(http_server) + ReplicationPresenceSetState(hs).register(http_server) diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py index 38260256cf..0c4aca1291 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py @@ -15,8 +15,6 @@ import logging -from twisted.internet import defer - from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint @@ -74,11 +72,12 @@ class ReplicationRegisterServlet(ReplicationEndpoint): "address": address, } - @defer.inlineCallbacks - def _handle_request(self, request, user_id): + async def _handle_request(self, request, user_id): content = parse_json_object_from_request(request) - yield self.registration_handler.register_with_store( + self.registration_handler.check_registration_ratelimit(content["address"]) + + await self.registration_handler.register_with_store( user_id=user_id, password_hash=content["password_hash"], was_guest=content["was_guest"], @@ -117,14 +116,13 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint): """ return {"auth_result": auth_result, "access_token": access_token} - @defer.inlineCallbacks - def _handle_request(self, request, user_id): + async def _handle_request(self, request, user_id): content = parse_json_object_from_request(request) auth_result = content["auth_result"] access_token = content["access_token"] - yield self.registration_handler.post_registration_actions( + await self.registration_handler.post_registration_actions( user_id=user_id, auth_result=auth_result, access_token=access_token ) diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index adb9b2f7f4..c981723c1a 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -17,7 +17,8 @@ import logging from twisted.internet import defer -from synapse.events import event_type_from_format_version +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.events import make_event_from_dict from synapse.events.snapshot import EventContext from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint @@ -37,6 +38,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): { "event": { .. serialized event .. }, + "room_version": .., // "1", "2", "3", etc: the version of the room + // containing the event + "event_format_version": .., // 1,2,3 etc: the event format version "internal_metadata": { .. serialized internal_metadata .. }, "rejected_reason": .., // The event.rejected_reason field "context": { .. serialized event context .. }, @@ -54,6 +58,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): self.event_creation_handler = hs.get_event_creation_handler() self.store = hs.get_datastore() + self.storage = hs.get_storage() self.clock = hs.get_clock() @staticmethod @@ -76,6 +81,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): payload = { "event": event.get_pdu_json(), + "room_version": event.room_version.identifier, "event_format_version": event.format_version, "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, @@ -87,21 +93,21 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): return payload - @defer.inlineCallbacks - def _handle_request(self, request, event_id): + async def _handle_request(self, request, event_id): with Measure(self.clock, "repl_send_event_parse"): content = parse_json_object_from_request(request) event_dict = content["event"] - format_ver = content["event_format_version"] + room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]] internal_metadata = content["internal_metadata"] rejected_reason = content["rejected_reason"] - EventType = event_type_from_format_version(format_ver) - event = EventType(event_dict, internal_metadata, rejected_reason) + event = make_event_from_dict( + event_dict, room_ver, internal_metadata, rejected_reason + ) requester = Requester.deserialize(self.store, content["requester"]) - context = yield EventContext.deserialize(self.store, content["context"]) + context = EventContext.deserialize(self.storage, content["context"]) ratelimit = content["ratelimit"] extra_users = [UserID.from_string(u) for u in content["extra_users"]] @@ -113,11 +119,11 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): "Got event to send with ID: %s into room: %s", event.event_id, event.room_id ) - yield self.event_creation_handler.persist_and_notify_client_event( + stream_id = await self.event_creation_handler.persist_and_notify_client_event( requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) - return 200, {} + return 200, {"stream_id": stream_id} def register_servlets(hs, http_server): diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py new file mode 100644 index 0000000000..bde97eef32 --- /dev/null +++ b/synapse/replication/http/streams.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.api.errors import SynapseError +from synapse.http.servlet import parse_integer +from synapse.replication.http._base import ReplicationEndpoint + +logger = logging.getLogger(__name__) + + +class ReplicationGetStreamUpdates(ReplicationEndpoint): + """Fetches stream updates from a server. Used for streams not persisted to + the database, e.g. typing notifications. + + The API looks like: + + GET /_synapse/replication/get_repl_stream_updates/<stream name>?from_token=0&to_token=10 + + 200 OK + + { + updates: [ ... ], + upto_token: 10, + limited: False, + } + + If there are more rows than can sensibly be returned in one lump, `limited` will be + set to true, and the caller should call again with a new `from_token`. + + """ + + NAME = "get_repl_stream_updates" + PATH_ARGS = ("stream_name",) + METHOD = "GET" + + def __init__(self, hs): + super().__init__(hs) + + self._instance_name = hs.get_instance_name() + self.streams = hs.get_replication_streams() + + @staticmethod + def _serialize_payload(stream_name, from_token, upto_token): + return {"from_token": from_token, "upto_token": upto_token} + + async def _handle_request(self, request, stream_name): + stream = self.streams.get(stream_name) + if stream is None: + raise SynapseError(400, "Unknown stream") + + from_token = parse_integer(request, "from_token", required=True) + upto_token = parse_integer(request, "upto_token", required=True) + + updates, upto_token, limited = await stream.get_updates_since( + self._instance_name, from_token, upto_token + ) + + return ( + 200, + {"updates": updates, "upto_token": upto_token, "limited": limited}, + ) + + +def register_servlets(hs, http_server): + ReplicationGetStreamUpdates(hs).register(http_server) |