diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 81b85352b1..19b69e0e11 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -14,7 +14,16 @@
# limitations under the License.
from synapse.http.server import JsonResource
-from synapse.replication.http import federation, login, membership, register, send_event
+from synapse.replication.http import (
+ devices,
+ federation,
+ login,
+ membership,
+ presence,
+ register,
+ send_event,
+ streams,
+)
REPLICATION_PREFIX = "/_synapse/replication"
@@ -26,7 +35,13 @@ class ReplicationRestResource(JsonResource):
def register_servlets(self, hs):
send_event.register_servlets(hs, self)
- membership.register_servlets(hs, self)
federation.register_servlets(hs, self)
- login.register_servlets(hs, self)
- register.register_servlets(hs, self)
+ presence.register_servlets(hs, self)
+ membership.register_servlets(hs, self)
+
+ # The following can't currently be instantiated on workers.
+ if hs.config.worker.worker_app is None:
+ login.register_servlets(hs, self)
+ register.register_servlets(hs, self)
+ devices.register_servlets(hs, self)
+ streams.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 03560c1f0e..793cef6c26 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -16,6 +16,8 @@
import abc
import logging
import re
+from inspect import signature
+from typing import Dict, List, Tuple
from six import raise_from
from six.moves import urllib
@@ -43,7 +45,7 @@ class ReplicationEndpoint(object):
"""Helper base class for defining new replication HTTP endpoints.
This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
- (with an `/:txn_id` prefix for cached requests.), where NAME is a name,
+ (with a `/:txn_id` suffix for cached requests), where NAME is a name,
PATH_ARGS are a tuple of parameters to be encoded in the URL.
For example, if `NAME` is "send_event" and `PATH_ARGS` is `("event_id",)`,
@@ -59,6 +61,8 @@ class ReplicationEndpoint(object):
must call `register` to register the path with the HTTP server.
Requests can be sent by calling the client returned by `make_client`.
+ Requests are sent to master process by default, but can be sent to other
+ named processes by specifying an `instance_name` keyword argument.
Attributes:
NAME (str): A name for the endpoint, added to the path as well as used
@@ -78,9 +82,8 @@ class ReplicationEndpoint(object):
__metaclass__ = abc.ABCMeta
- NAME = abc.abstractproperty()
- PATH_ARGS = abc.abstractproperty()
-
+ NAME = abc.abstractproperty() # type: str # type: ignore
+ PATH_ARGS = abc.abstractproperty() # type: Tuple[str, ...] # type: ignore
METHOD = "POST"
CACHE = True
RETRY_ON_TIMEOUT = True
@@ -91,6 +94,16 @@ class ReplicationEndpoint(object):
hs, "repl." + self.NAME, timeout_ms=30 * 60 * 1000
)
+ # We reserve `instance_name` as a parameter to sending requests, so we
+ # assert here that sub classes don't try and use the name.
+ assert (
+ "instance_name" not in self.PATH_ARGS
+ ), "`instance_name` is a reserved paramater name"
+ assert (
+ "instance_name"
+ not in signature(self.__class__._serialize_payload).parameters
+ ), "`instance_name` is a reserved paramater name"
+
assert self.METHOD in ("PUT", "POST", "GET")
@abc.abstractmethod
@@ -110,14 +123,14 @@ class ReplicationEndpoint(object):
return {}
@abc.abstractmethod
- def _handle_request(self, request, **kwargs):
+ async def _handle_request(self, request, **kwargs):
"""Handle incoming request.
This is called with the request object and PATH_ARGS.
Returns:
- Deferred[dict]: A JSON serialisable dict to be used as response
- body of request.
+ tuple[int, dict]: HTTP status code and a JSON serialisable dict
+ to be used as response body of request.
"""
pass
@@ -128,14 +141,30 @@ class ReplicationEndpoint(object):
Returns a callable that accepts the same parameters as `_serialize_payload`.
"""
clock = hs.get_clock()
- host = hs.config.worker_replication_host
- port = hs.config.worker_replication_http_port
-
client = hs.get_simple_http_client()
+ local_instance_name = hs.get_instance_name()
+
+ master_host = hs.config.worker_replication_host
+ master_port = hs.config.worker_replication_http_port
+
+ instance_map = hs.config.worker.instance_map
@trace(opname="outgoing_replication_request")
@defer.inlineCallbacks
- def send_request(**kwargs):
+ def send_request(instance_name="master", **kwargs):
+ if instance_name == local_instance_name:
+ raise Exception("Trying to send HTTP request to self")
+ if instance_name == "master":
+ host = master_host
+ port = master_port
+ elif instance_name in instance_map:
+ host = instance_map[instance_name].host
+ port = instance_map[instance_name].port
+ else:
+ raise Exception(
+ "Instance %r not in 'instance_map' config" % (instance_name,)
+ )
+
data = yield cls._serialize_payload(**kwargs)
url_args = [
@@ -171,7 +200,7 @@ class ReplicationEndpoint(object):
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
- headers = {}
+ headers = {} # type: Dict[bytes, List[bytes]]
inject_active_span_byte_dict(headers, None, check_destination=False)
try:
result = yield request_func(uri, data, headers=headers)
@@ -180,7 +209,7 @@ class ReplicationEndpoint(object):
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
raise
- logger.warn("%s request timed out", cls.NAME)
+ logger.warning("%s request timed out", cls.NAME)
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
@@ -207,7 +236,7 @@ class ReplicationEndpoint(object):
method = self.METHOD
if self.CACHE:
- handler = self._cached_handler
+ handler = self._cached_handler # type: ignore
url_args.append("txn_id")
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
new file mode 100644
index 0000000000..e32aac0a25
--- /dev/null
+++ b/synapse/replication/http/devices.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
+ """Ask master to resync the device list for a user by contacting their
+ server.
+
+ This must happen on master so that the results can be correctly cached in
+ the database and streamed to workers.
+
+ Request format:
+
+ POST /_synapse/replication/user_device_resync/:user_id
+
+ {}
+
+ Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
+ response, e.g.:
+
+ {
+ "user_id": "@alice:example.org",
+ "devices": [
+ {
+ "device_id": "JLAFKJWSCS",
+ "keys": { ... },
+ "device_display_name": "Alice's Mobile Phone"
+ }
+ ]
+ }
+ """
+
+ NAME = "user_device_resync"
+ PATH_ARGS = ("user_id",)
+ CACHE = False
+
+ def __init__(self, hs):
+ super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs)
+
+ self.device_list_updater = hs.get_device_handler().device_list_updater
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+
+ @staticmethod
+ def _serialize_payload(user_id):
+ return {}
+
+ async def _handle_request(self, request, user_id):
+ user_devices = await self.device_list_updater.user_device_resync(user_id)
+
+ return 200, user_devices
+
+
+def register_servlets(hs, http_server):
+ ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 2f16955954..c287c4e269 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -17,7 +17,8 @@ import logging
from twisted.internet import defer
-from synapse.events import event_type_from_format_version
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.events import make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
@@ -28,7 +29,7 @@ logger = logging.getLogger(__name__)
class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
"""Handles events newly received from federation, including persisting and
- notifying.
+ notifying. Returns the maximum stream ID of the persisted events.
The API looks like:
@@ -37,11 +38,21 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
{
"events": [{
"event": { .. serialized event .. },
+ "room_version": .., // "1", "2", "3", etc: the version of the room
+ // containing the event
+ "event_format_version": .., // 1,2,3 etc: the event format version
"internal_metadata": { .. serialized internal_metadata .. },
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
}],
"backfilled": false
+ }
+
+ 200 OK
+
+ {
+ "max_stream_id": 32443,
+ }
"""
NAME = "fed_send_events"
@@ -51,6 +62,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
+ self.storage = hs.get_storage()
self.clock = hs.get_clock()
self.federation_handler = hs.get_handlers().federation_handler
@@ -71,6 +83,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
event_payloads.append(
{
"event": event.get_pdu_json(),
+ "room_version": event.room_version.identifier,
"event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
@@ -82,8 +95,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
return payload
- @defer.inlineCallbacks
- def _handle_request(self, request):
+ async def _handle_request(self, request):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)
@@ -94,26 +106,27 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
event_and_contexts = []
for event_payload in event_payloads:
event_dict = event_payload["event"]
- format_ver = event_payload["event_format_version"]
+ room_ver = KNOWN_ROOM_VERSIONS[event_payload["room_version"]]
internal_metadata = event_payload["internal_metadata"]
rejected_reason = event_payload["rejected_reason"]
- EventType = event_type_from_format_version(format_ver)
- event = EventType(event_dict, internal_metadata, rejected_reason)
+ event = make_event_from_dict(
+ event_dict, room_ver, internal_metadata, rejected_reason
+ )
- context = yield EventContext.deserialize(
- self.store, event_payload["context"]
+ context = EventContext.deserialize(
+ self.storage, event_payload["context"]
)
event_and_contexts.append((event, context))
logger.info("Got %d events from federation", len(event_and_contexts))
- yield self.federation_handler.persist_events_and_notify(
+ max_stream_id = await self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled
)
- return 200, {}
+ return 200, {"max_stream_id": max_stream_id}
class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
@@ -144,8 +157,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
def _serialize_payload(edu_type, origin, content):
return {"origin": origin, "content": content}
- @defer.inlineCallbacks
- def _handle_request(self, request, edu_type):
+ async def _handle_request(self, request, edu_type):
with Measure(self.clock, "repl_fed_send_edu_parse"):
content = parse_json_object_from_request(request)
@@ -154,7 +166,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
logger.info("Got %r edu from %s", edu_type, origin)
- result = yield self.registry.on_edu(edu_type, origin, edu_content)
+ result = await self.registry.on_edu(edu_type, origin, edu_content)
return 200, result
@@ -193,8 +205,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
"""
return {"args": args}
- @defer.inlineCallbacks
- def _handle_request(self, request, query_type):
+ async def _handle_request(self, request, query_type):
with Measure(self.clock, "repl_fed_query_parse"):
content = parse_json_object_from_request(request)
@@ -202,7 +213,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
logger.info("Got %r query", query_type)
- result = yield self.registry.on_query(query_type, args)
+ result = await self.registry.on_query(query_type, args)
return 200, result
@@ -213,7 +224,7 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
Request format:
- POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
+ POST /_synapse/replication/fed_cleanup_room/:room_id/:txn_id
{}
"""
@@ -234,10 +245,41 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
"""
return {}
- @defer.inlineCallbacks
- def _handle_request(self, request, room_id):
- yield self.store.clean_room_for_join(room_id)
+ async def _handle_request(self, request, room_id):
+ await self.store.clean_room_for_join(room_id)
+
+ return 200, {}
+
+
+class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint):
+ """Called to clean up any data in DB for a given room, ready for the
+ server to join the room.
+
+ Request format:
+
+ POST /_synapse/replication/store_room_on_invite/:room_id/:txn_id
+
+ {
+ "room_version": "1",
+ }
+ """
+
+ NAME = "store_room_on_invite"
+ PATH_ARGS = ("room_id",)
+
+ def __init__(self, hs):
+ super().__init__(hs)
+
+ self.store = hs.get_datastore()
+
+ @staticmethod
+ def _serialize_payload(room_id, room_version):
+ return {"room_version": room_version.identifier}
+ async def _handle_request(self, request, room_id):
+ content = parse_json_object_from_request(request)
+ room_version = KNOWN_ROOM_VERSIONS[content["room_version"]]
+ await self.store.maybe_store_room_on_invite(room_id, room_version)
return 200, {}
@@ -246,3 +288,4 @@ def register_servlets(hs, http_server):
ReplicationFederationSendEduRestServlet(hs).register(http_server)
ReplicationGetQueryRestServlet(hs).register(http_server)
ReplicationCleanRoomRestServlet(hs).register(http_server)
+ ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py
index 786f5232b2..798b9d3af5 100644
--- a/synapse/replication/http/login.py
+++ b/synapse/replication/http/login.py
@@ -15,8 +15,6 @@
import logging
-from twisted.internet import defer
-
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
@@ -52,15 +50,14 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
"is_guest": is_guest,
}
- @defer.inlineCallbacks
- def _handle_request(self, request, user_id):
+ async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)
device_id = content["device_id"]
initial_display_name = content["initial_display_name"]
is_guest = content["is_guest"]
- device_id, access_token = yield self.registration_handler.register_device(
+ device_id, access_token = await self.registration_handler.register_device(
user_id, device_id, initial_display_name, is_guest
)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index b9ce3477ad..a7174c4a8f 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -14,14 +14,16 @@
# limitations under the License.
import logging
-
-from twisted.internet import defer
+from typing import TYPE_CHECKING
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
@@ -65,8 +67,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
"content": content,
}
- @defer.inlineCallbacks
- def _handle_request(self, request, room_id, user_id):
+ async def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"]
@@ -79,11 +80,11 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
logger.info("remote_join: %s into room: %s", user_id, room_id)
- yield self.federation_handler.do_invite_join(
+ event_id, stream_id = await self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user_id, event_content
)
- return 200, {}
+ return 200, {"event_id": event_id, "stream_id": stream_id}
class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
@@ -96,6 +97,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
{
"requester": ...,
"remote_room_hosts": [...],
+ "content": { ... }
}
"""
@@ -108,9 +110,10 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
+ self.member_handler = hs.get_room_member_handler()
@staticmethod
- def _serialize_payload(requester, room_id, user_id, remote_room_hosts):
+ def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
"""
Args:
requester(Requester)
@@ -121,13 +124,14 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
return {
"requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
+ "content": content,
}
- @defer.inlineCallbacks
- def _handle_request(self, request, room_id, user_id):
+ async def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"]
+ event_content = content["content"]
requester = Requester.deserialize(self.store, content["requester"])
@@ -137,10 +141,10 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id)
try:
- event = yield self.federation_handler.do_remotely_reject_invite(
- remote_room_hosts, room_id, user_id
+ event, stream_id = await self.federation_handler.do_remotely_reject_invite(
+ remote_room_hosts, room_id, user_id, event_content,
)
- ret = event.get_pdu_json()
+ event_id = event.event_id
except Exception as e:
# if we were unable to reject the exception, just mark
# it as rejected on our end and plough ahead.
@@ -148,12 +152,44 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
# The 'except' clause is very broad, but we need to
# capture everything from DNS failures upwards
#
- logger.warn("Failed to reject invite: %s", e)
+ logger.warning("Failed to reject invite: %s", e)
+
+ stream_id = await self.member_handler.locally_reject_invite(
+ user_id, room_id
+ )
+ event_id = None
+
+ return 200, {"event_id": event_id, "stream_id": stream_id}
+
+
+class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint):
+ """Rejects the invite for the user and room locally.
+
+ Request format:
+
+ POST /_synapse/replication/locally_reject_invite/:room_id/:user_id
+
+ {}
+ """
+
+ NAME = "locally_reject_invite"
+ PATH_ARGS = ("room_id", "user_id")
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self.member_handler = hs.get_room_member_handler()
+
+ @staticmethod
+ def _serialize_payload(room_id, user_id):
+ return {}
+
+ async def _handle_request(self, request, room_id, user_id):
+ logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id)
- yield self.store.locally_reject_invite(user_id, room_id)
- ret = {}
+ stream_id = await self.member_handler.locally_reject_invite(user_id, room_id)
- return 200, ret
+ return 200, {"stream_id": stream_id}
class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
@@ -209,3 +245,4 @@ def register_servlets(hs, http_server):
ReplicationRemoteJoinRestServlet(hs).register(http_server)
ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
+ ReplicationLocallyRejectInviteRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py
new file mode 100644
index 0000000000..ea1b33331b
--- /dev/null
+++ b/synapse/replication/http/presence.py
@@ -0,0 +1,116 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING
+
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import UserID
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationBumpPresenceActiveTime(ReplicationEndpoint):
+ """We've seen the user do something that indicates they're interacting
+ with the app.
+
+ The POST looks like:
+
+ POST /_synapse/replication/bump_presence_active_time/<user_id>
+
+ 200 OK
+
+ {}
+ """
+
+ NAME = "bump_presence_active_time"
+ PATH_ARGS = ("user_id",)
+ METHOD = "POST"
+ CACHE = False
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self._presence_handler = hs.get_presence_handler()
+
+ @staticmethod
+ def _serialize_payload(user_id):
+ return {}
+
+ async def _handle_request(self, request, user_id):
+ await self._presence_handler.bump_presence_active_time(
+ UserID.from_string(user_id)
+ )
+
+ return (
+ 200,
+ {},
+ )
+
+
+class ReplicationPresenceSetState(ReplicationEndpoint):
+ """Set the presence state for a user.
+
+ The POST looks like:
+
+ POST /_synapse/replication/presence_set_state/<user_id>
+
+ {
+ "state": { ... },
+ "ignore_status_msg": false,
+ }
+
+ 200 OK
+
+ {}
+ """
+
+ NAME = "presence_set_state"
+ PATH_ARGS = ("user_id",)
+ METHOD = "POST"
+ CACHE = False
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self._presence_handler = hs.get_presence_handler()
+
+ @staticmethod
+ def _serialize_payload(user_id, state, ignore_status_msg=False):
+ return {
+ "state": state,
+ "ignore_status_msg": ignore_status_msg,
+ }
+
+ async def _handle_request(self, request, user_id):
+ content = parse_json_object_from_request(request)
+
+ await self._presence_handler.set_state(
+ UserID.from_string(user_id), content["state"], content["ignore_status_msg"]
+ )
+
+ return (
+ 200,
+ {},
+ )
+
+
+def register_servlets(hs, http_server):
+ ReplicationBumpPresenceActiveTime(hs).register(http_server)
+ ReplicationPresenceSetState(hs).register(http_server)
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index 38260256cf..0c4aca1291 100644
--- a/synapse/replication/http/register.py
+++ b/synapse/replication/http/register.py
@@ -15,8 +15,6 @@
import logging
-from twisted.internet import defer
-
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
@@ -74,11 +72,12 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
"address": address,
}
- @defer.inlineCallbacks
- def _handle_request(self, request, user_id):
+ async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)
- yield self.registration_handler.register_with_store(
+ self.registration_handler.check_registration_ratelimit(content["address"])
+
+ await self.registration_handler.register_with_store(
user_id=user_id,
password_hash=content["password_hash"],
was_guest=content["was_guest"],
@@ -117,14 +116,13 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
"""
return {"auth_result": auth_result, "access_token": access_token}
- @defer.inlineCallbacks
- def _handle_request(self, request, user_id):
+ async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)
auth_result = content["auth_result"]
access_token = content["access_token"]
- yield self.registration_handler.post_registration_actions(
+ await self.registration_handler.post_registration_actions(
user_id=user_id, auth_result=auth_result, access_token=access_token
)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index adb9b2f7f4..c981723c1a 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -17,7 +17,8 @@ import logging
from twisted.internet import defer
-from synapse.events import event_type_from_format_version
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.events import make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
@@ -37,6 +38,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
{
"event": { .. serialized event .. },
+ "room_version": .., // "1", "2", "3", etc: the version of the room
+ // containing the event
+ "event_format_version": .., // 1,2,3 etc: the event format version
"internal_metadata": { .. serialized internal_metadata .. },
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
@@ -54,6 +58,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore()
+ self.storage = hs.get_storage()
self.clock = hs.get_clock()
@staticmethod
@@ -76,6 +81,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
payload = {
"event": event.get_pdu_json(),
+ "room_version": event.room_version.identifier,
"event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
@@ -87,21 +93,21 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
return payload
- @defer.inlineCallbacks
- def _handle_request(self, request, event_id):
+ async def _handle_request(self, request, event_id):
with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request)
event_dict = content["event"]
- format_ver = content["event_format_version"]
+ room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]]
internal_metadata = content["internal_metadata"]
rejected_reason = content["rejected_reason"]
- EventType = event_type_from_format_version(format_ver)
- event = EventType(event_dict, internal_metadata, rejected_reason)
+ event = make_event_from_dict(
+ event_dict, room_ver, internal_metadata, rejected_reason
+ )
requester = Requester.deserialize(self.store, content["requester"])
- context = yield EventContext.deserialize(self.store, content["context"])
+ context = EventContext.deserialize(self.storage, content["context"])
ratelimit = content["ratelimit"]
extra_users = [UserID.from_string(u) for u in content["extra_users"]]
@@ -113,11 +119,11 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
)
- yield self.event_creation_handler.persist_and_notify_client_event(
+ stream_id = await self.event_creation_handler.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)
- return 200, {}
+ return 200, {"stream_id": stream_id}
def register_servlets(hs, http_server):
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
new file mode 100644
index 0000000000..bde97eef32
--- /dev/null
+++ b/synapse/replication/http/streams.py
@@ -0,0 +1,79 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import parse_integer
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationGetStreamUpdates(ReplicationEndpoint):
+ """Fetches stream updates from a server. Used for streams not persisted to
+ the database, e.g. typing notifications.
+
+ The API looks like:
+
+ GET /_synapse/replication/get_repl_stream_updates/<stream name>?from_token=0&to_token=10
+
+ 200 OK
+
+ {
+ updates: [ ... ],
+ upto_token: 10,
+ limited: False,
+ }
+
+ If there are more rows than can sensibly be returned in one lump, `limited` will be
+ set to true, and the caller should call again with a new `from_token`.
+
+ """
+
+ NAME = "get_repl_stream_updates"
+ PATH_ARGS = ("stream_name",)
+ METHOD = "GET"
+
+ def __init__(self, hs):
+ super().__init__(hs)
+
+ self._instance_name = hs.get_instance_name()
+ self.streams = hs.get_replication_streams()
+
+ @staticmethod
+ def _serialize_payload(stream_name, from_token, upto_token):
+ return {"from_token": from_token, "upto_token": upto_token}
+
+ async def _handle_request(self, request, stream_name):
+ stream = self.streams.get(stream_name)
+ if stream is None:
+ raise SynapseError(400, "Unknown stream")
+
+ from_token = parse_integer(request, "from_token", required=True)
+ upto_token = parse_integer(request, "upto_token", required=True)
+
+ updates, upto_token, limited = await stream.get_updates_since(
+ self._instance_name, from_token, upto_token
+ )
+
+ return (
+ 200,
+ {"updates": updates, "upto_token": upto_token, "limited": limited},
+ )
+
+
+def register_servlets(hs, http_server):
+ ReplicationGetStreamUpdates(hs).register(http_server)
|