diff options
Diffstat (limited to 'synapse/replication/http')
-rw-r--r-- | synapse/replication/http/_base.py | 32 | ||||
-rw-r--r-- | synapse/replication/http/federation.py | 51 | ||||
-rw-r--r-- | synapse/replication/http/login.py | 7 | ||||
-rw-r--r-- | synapse/replication/http/membership.py | 34 | ||||
-rw-r--r-- | synapse/replication/http/register.py | 17 | ||||
-rw-r--r-- | synapse/replication/http/send_event.py | 13 |
6 files changed, 65 insertions, 89 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index e81456ab2b..fe482e279f 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -17,11 +17,17 @@ import abc import logging import re +from six import raise_from from six.moves import urllib from twisted.internet import defer -from synapse.api.errors import CodeMessageException, HttpResponseException +from synapse.api.errors import ( + CodeMessageException, + HttpResponseException, + RequestSendFailed, + SynapseError, +) from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import random_string @@ -77,8 +83,7 @@ class ReplicationEndpoint(object): def __init__(self, hs): if self.CACHE: self.response_cache = ResponseCache( - hs, "repl." + self.NAME, - timeout_ms=30 * 60 * 1000, + hs, "repl." + self.NAME, timeout_ms=30 * 60 * 1000 ) assert self.METHOD in ("PUT", "POST", "GET") @@ -128,8 +133,7 @@ class ReplicationEndpoint(object): data = yield cls._serialize_payload(**kwargs) url_args = [ - urllib.parse.quote(kwargs[name], safe='') - for name in cls.PATH_ARGS + urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS ] if cls.CACHE: @@ -150,7 +154,10 @@ class ReplicationEndpoint(object): ) uri = "http://%s:%s/_synapse/replication/%s/%s" % ( - host, port, cls.NAME, "/".join(url_args) + host, + port, + cls.NAME, + "/".join(url_args), ) try: @@ -175,6 +182,8 @@ class ReplicationEndpoint(object): # on the master process that we should send to the client. (And # importantly, not stack traces everywhere) raise e.to_synapse_error() + except RequestSendFailed as e: + raise_from(SynapseError(502, "Failed to talk to master"), e) defer.returnValue(result) @@ -194,10 +203,7 @@ class ReplicationEndpoint(object): url_args.append("txn_id") args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args) - pattern = re.compile("^/_synapse/replication/%s/%s$" % ( - self.NAME, - args - )) + pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args)) http_server.register_paths(method, [pattern], handler) @@ -211,8 +217,4 @@ class ReplicationEndpoint(object): assert self.CACHE - return self.response_cache.wrap( - txn_id, - self._handle_request, - request, **kwargs - ) + return self.response_cache.wrap(txn_id, self._handle_request, request, **kwargs) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 0f0a07c422..61eafbe708 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -68,18 +68,17 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): for event, context in event_and_contexts: serialized_context = yield context.serialize(event, store) - event_payloads.append({ - "event": event.get_pdu_json(), - "event_format_version": event.format_version, - "internal_metadata": event.internal_metadata.get_dict(), - "rejected_reason": event.rejected_reason, - "context": serialized_context, - }) - - payload = { - "events": event_payloads, - "backfilled": backfilled, - } + event_payloads.append( + { + "event": event.get_pdu_json(), + "event_format_version": event.format_version, + "internal_metadata": event.internal_metadata.get_dict(), + "rejected_reason": event.rejected_reason, + "context": serialized_context, + } + ) + + payload = {"events": event_payloads, "backfilled": backfilled} defer.returnValue(payload) @@ -103,18 +102,15 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): event = EventType(event_dict, internal_metadata, rejected_reason) context = yield EventContext.deserialize( - self.store, event_payload["context"], + self.store, event_payload["context"] ) event_and_contexts.append((event, context)) - logger.info( - "Got %d events from federation", - len(event_and_contexts), - ) + logger.info("Got %d events from federation", len(event_and_contexts)) yield self.federation_handler.persist_events_and_notify( - event_and_contexts, backfilled, + event_and_contexts, backfilled ) defer.returnValue((200, {})) @@ -146,10 +142,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): @staticmethod def _serialize_payload(edu_type, origin, content): - return { - "origin": origin, - "content": content, - } + return {"origin": origin, "content": content} @defer.inlineCallbacks def _handle_request(self, request, edu_type): @@ -159,10 +152,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): origin = content["origin"] edu_content = content["content"] - logger.info( - "Got %r edu from %s", - edu_type, origin, - ) + logger.info("Got %r edu from %s", edu_type, origin) result = yield self.registry.on_edu(edu_type, origin, edu_content) @@ -201,9 +191,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint): query_type (str) args (dict): The arguments received for the given query type """ - return { - "args": args, - } + return {"args": args} @defer.inlineCallbacks def _handle_request(self, request, query_type): @@ -212,10 +200,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint): args = content["args"] - logger.info( - "Got %r query", - query_type, - ) + logger.info("Got %r query", query_type) result = yield self.registry.on_query(query_type, args) diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py index 63bc0405ea..7c1197e5dd 100644 --- a/synapse/replication/http/login.py +++ b/synapse/replication/http/login.py @@ -61,13 +61,10 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint): is_guest = content["is_guest"] device_id, access_token = yield self.registration_handler.register_device( - user_id, device_id, initial_display_name, is_guest, + user_id, device_id, initial_display_name, is_guest ) - defer.returnValue((200, { - "device_id": device_id, - "access_token": access_token, - })) + defer.returnValue((200, {"device_id": device_id, "access_token": access_token})) def register_servlets(hs, http_server): diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 81a2b204c7..0a76a3762f 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -40,7 +40,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): """ NAME = "remote_join" - PATH_ARGS = ("room_id", "user_id",) + PATH_ARGS = ("room_id", "user_id") def __init__(self, hs): super(ReplicationRemoteJoinRestServlet, self).__init__(hs) @@ -50,8 +50,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): self.clock = hs.get_clock() @staticmethod - def _serialize_payload(requester, room_id, user_id, remote_room_hosts, - content): + def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content): """ Args: requester(Requester) @@ -78,16 +77,10 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): if requester.user: request.authenticated_entity = requester.user.to_string() - logger.info( - "remote_join: %s into room: %s", - user_id, room_id, - ) + logger.info("remote_join: %s into room: %s", user_id, room_id) yield self.federation_handler.do_invite_join( - remote_room_hosts, - room_id, - user_id, - event_content, + remote_room_hosts, room_id, user_id, event_content ) defer.returnValue((200, {})) @@ -107,7 +100,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): """ NAME = "remote_reject_invite" - PATH_ARGS = ("room_id", "user_id",) + PATH_ARGS = ("room_id", "user_id") def __init__(self, hs): super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs) @@ -141,16 +134,11 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): if requester.user: request.authenticated_entity = requester.user.to_string() - logger.info( - "remote_reject_invite: %s out of room: %s", - user_id, room_id, - ) + logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id) try: event = yield self.federation_handler.do_remotely_reject_invite( - remote_room_hosts, - room_id, - user_id, + remote_room_hosts, room_id, user_id ) ret = event.get_pdu_json() except Exception as e: @@ -162,9 +150,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): # logger.warn("Failed to reject invite: %s", e) - yield self.store.locally_reject_invite( - user_id, room_id - ) + yield self.store.locally_reject_invite(user_id, room_id) ret = {} defer.returnValue((200, ret)) @@ -228,7 +214,7 @@ class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint): logger.info("get_or_register_3pid_guest: %r", content) ret = yield self.registeration_handler.get_or_register_3pid_guest( - medium, address, inviter_user_id, + medium, address, inviter_user_id ) defer.returnValue((200, ret)) @@ -264,7 +250,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): user_id (str) change (str): Either "joined" or "left" """ - assert change in ("joined", "left",) + assert change in ("joined", "left") return {} diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py index 912a5ac341..f81a0f1b8f 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py @@ -37,8 +37,16 @@ class ReplicationRegisterServlet(ReplicationEndpoint): @staticmethod def _serialize_payload( - user_id, token, password_hash, was_guest, make_guest, appservice_id, - create_profile_with_displayname, admin, user_type, address, + user_id, + token, + password_hash, + was_guest, + make_guest, + appservice_id, + create_profile_with_displayname, + admin, + user_type, + address, ): """ Args: @@ -85,7 +93,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint): create_profile_with_displayname=content["create_profile_with_displayname"], admin=content["admin"], user_type=content["user_type"], - address=content["address"] + address=content["address"], ) defer.returnValue((200, {})) @@ -104,8 +112,7 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint): self.registration_handler = hs.get_registration_handler() @staticmethod - def _serialize_payload(user_id, auth_result, access_token, bind_email, - bind_msisdn): + def _serialize_payload(user_id, auth_result, access_token, bind_email, bind_msisdn): """ Args: user_id (str): The user ID that consented diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 3635015eda..034763fe99 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -45,6 +45,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): "extra_users": [], } """ + NAME = "send_event" PATH_ARGS = ("event_id",) @@ -57,8 +58,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): @staticmethod @defer.inlineCallbacks - def _serialize_payload(event_id, store, event, context, requester, - ratelimit, extra_users): + def _serialize_payload( + event_id, store, event, context, requester, ratelimit, extra_users + ): """ Args: event_id (str) @@ -108,14 +110,11 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): request.authenticated_entity = requester.user.to_string() logger.info( - "Got event to send with ID: %s into room: %s", - event.event_id, event.room_id, + "Got event to send with ID: %s into room: %s", event.event_id, event.room_id ) yield self.event_creation_handler.persist_and_notify_client_event( - requester, event, context, - ratelimit=ratelimit, - extra_users=extra_users, + requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) defer.returnValue((200, {})) |