summary refs log tree commit diff
path: root/synapse/replication/http
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/http')
-rw-r--r--synapse/replication/http/_base.py32
-rw-r--r--synapse/replication/http/federation.py51
-rw-r--r--synapse/replication/http/login.py7
-rw-r--r--synapse/replication/http/membership.py34
-rw-r--r--synapse/replication/http/register.py17
-rw-r--r--synapse/replication/http/send_event.py13
6 files changed, 65 insertions, 89 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index e81456ab2b..fe482e279f 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -17,11 +17,17 @@ import abc
 import logging
 import re
 
+from six import raise_from
 from six.moves import urllib
 
 from twisted.internet import defer
 
-from synapse.api.errors import CodeMessageException, HttpResponseException
+from synapse.api.errors import (
+    CodeMessageException,
+    HttpResponseException,
+    RequestSendFailed,
+    SynapseError,
+)
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.stringutils import random_string
 
@@ -77,8 +83,7 @@ class ReplicationEndpoint(object):
     def __init__(self, hs):
         if self.CACHE:
             self.response_cache = ResponseCache(
-                hs, "repl." + self.NAME,
-                timeout_ms=30 * 60 * 1000,
+                hs, "repl." + self.NAME, timeout_ms=30 * 60 * 1000
             )
 
         assert self.METHOD in ("PUT", "POST", "GET")
@@ -128,8 +133,7 @@ class ReplicationEndpoint(object):
             data = yield cls._serialize_payload(**kwargs)
 
             url_args = [
-                urllib.parse.quote(kwargs[name], safe='')
-                for name in cls.PATH_ARGS
+                urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
             ]
 
             if cls.CACHE:
@@ -150,7 +154,10 @@ class ReplicationEndpoint(object):
                 )
 
             uri = "http://%s:%s/_synapse/replication/%s/%s" % (
-                host, port, cls.NAME, "/".join(url_args)
+                host,
+                port,
+                cls.NAME,
+                "/".join(url_args),
             )
 
             try:
@@ -175,6 +182,8 @@ class ReplicationEndpoint(object):
                 # on the master process that we should send to the client. (And
                 # importantly, not stack traces everywhere)
                 raise e.to_synapse_error()
+            except RequestSendFailed as e:
+                raise_from(SynapseError(502, "Failed to talk to master"), e)
 
             defer.returnValue(result)
 
@@ -194,10 +203,7 @@ class ReplicationEndpoint(object):
             url_args.append("txn_id")
 
         args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
-        pattern = re.compile("^/_synapse/replication/%s/%s$" % (
-            self.NAME,
-            args
-        ))
+        pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args))
 
         http_server.register_paths(method, [pattern], handler)
 
@@ -211,8 +217,4 @@ class ReplicationEndpoint(object):
 
         assert self.CACHE
 
-        return self.response_cache.wrap(
-            txn_id,
-            self._handle_request,
-            request, **kwargs
-        )
+        return self.response_cache.wrap(txn_id, self._handle_request, request, **kwargs)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 0f0a07c422..61eafbe708 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -68,18 +68,17 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         for event, context in event_and_contexts:
             serialized_context = yield context.serialize(event, store)
 
-            event_payloads.append({
-                "event": event.get_pdu_json(),
-                "event_format_version": event.format_version,
-                "internal_metadata": event.internal_metadata.get_dict(),
-                "rejected_reason": event.rejected_reason,
-                "context": serialized_context,
-            })
-
-        payload = {
-            "events": event_payloads,
-            "backfilled": backfilled,
-        }
+            event_payloads.append(
+                {
+                    "event": event.get_pdu_json(),
+                    "event_format_version": event.format_version,
+                    "internal_metadata": event.internal_metadata.get_dict(),
+                    "rejected_reason": event.rejected_reason,
+                    "context": serialized_context,
+                }
+            )
+
+        payload = {"events": event_payloads, "backfilled": backfilled}
 
         defer.returnValue(payload)
 
@@ -103,18 +102,15 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
                 event = EventType(event_dict, internal_metadata, rejected_reason)
 
                 context = yield EventContext.deserialize(
-                    self.store, event_payload["context"],
+                    self.store, event_payload["context"]
                 )
 
                 event_and_contexts.append((event, context))
 
-        logger.info(
-            "Got %d events from federation",
-            len(event_and_contexts),
-        )
+        logger.info("Got %d events from federation", len(event_and_contexts))
 
         yield self.federation_handler.persist_events_and_notify(
-            event_and_contexts, backfilled,
+            event_and_contexts, backfilled
         )
 
         defer.returnValue((200, {}))
@@ -146,10 +142,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
 
     @staticmethod
     def _serialize_payload(edu_type, origin, content):
-        return {
-            "origin": origin,
-            "content": content,
-        }
+        return {"origin": origin, "content": content}
 
     @defer.inlineCallbacks
     def _handle_request(self, request, edu_type):
@@ -159,10 +152,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
             origin = content["origin"]
             edu_content = content["content"]
 
-        logger.info(
-            "Got %r edu from %s",
-            edu_type, origin,
-        )
+        logger.info("Got %r edu from %s", edu_type, origin)
 
         result = yield self.registry.on_edu(edu_type, origin, edu_content)
 
@@ -201,9 +191,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
             query_type (str)
             args (dict): The arguments received for the given query type
         """
-        return {
-            "args": args,
-        }
+        return {"args": args}
 
     @defer.inlineCallbacks
     def _handle_request(self, request, query_type):
@@ -212,10 +200,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
 
             args = content["args"]
 
-        logger.info(
-            "Got %r query",
-            query_type,
-        )
+        logger.info("Got %r query", query_type)
 
         result = yield self.registry.on_query(query_type, args)
 
diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py
index 63bc0405ea..7c1197e5dd 100644
--- a/synapse/replication/http/login.py
+++ b/synapse/replication/http/login.py
@@ -61,13 +61,10 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
         is_guest = content["is_guest"]
 
         device_id, access_token = yield self.registration_handler.register_device(
-            user_id, device_id, initial_display_name, is_guest,
+            user_id, device_id, initial_display_name, is_guest
         )
 
-        defer.returnValue((200, {
-            "device_id": device_id,
-            "access_token": access_token,
-        }))
+        defer.returnValue((200, {"device_id": device_id, "access_token": access_token}))
 
 
 def register_servlets(hs, http_server):
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 81a2b204c7..0a76a3762f 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -40,7 +40,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
     """
 
     NAME = "remote_join"
-    PATH_ARGS = ("room_id", "user_id",)
+    PATH_ARGS = ("room_id", "user_id")
 
     def __init__(self, hs):
         super(ReplicationRemoteJoinRestServlet, self).__init__(hs)
@@ -50,8 +50,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    def _serialize_payload(requester, room_id, user_id, remote_room_hosts,
-                           content):
+    def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
         """
         Args:
             requester(Requester)
@@ -78,16 +77,10 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
         if requester.user:
             request.authenticated_entity = requester.user.to_string()
 
-        logger.info(
-            "remote_join: %s into room: %s",
-            user_id, room_id,
-        )
+        logger.info("remote_join: %s into room: %s", user_id, room_id)
 
         yield self.federation_handler.do_invite_join(
-            remote_room_hosts,
-            room_id,
-            user_id,
-            event_content,
+            remote_room_hosts, room_id, user_id, event_content
         )
 
         defer.returnValue((200, {}))
@@ -107,7 +100,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
     """
 
     NAME = "remote_reject_invite"
-    PATH_ARGS = ("room_id", "user_id",)
+    PATH_ARGS = ("room_id", "user_id")
 
     def __init__(self, hs):
         super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs)
@@ -141,16 +134,11 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
         if requester.user:
             request.authenticated_entity = requester.user.to_string()
 
-        logger.info(
-            "remote_reject_invite: %s out of room: %s",
-            user_id, room_id,
-        )
+        logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id)
 
         try:
             event = yield self.federation_handler.do_remotely_reject_invite(
-                remote_room_hosts,
-                room_id,
-                user_id,
+                remote_room_hosts, room_id, user_id
             )
             ret = event.get_pdu_json()
         except Exception as e:
@@ -162,9 +150,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
             #
             logger.warn("Failed to reject invite: %s", e)
 
-            yield self.store.locally_reject_invite(
-                user_id, room_id
-            )
+            yield self.store.locally_reject_invite(user_id, room_id)
             ret = {}
 
         defer.returnValue((200, ret))
@@ -228,7 +214,7 @@ class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint):
         logger.info("get_or_register_3pid_guest: %r", content)
 
         ret = yield self.registeration_handler.get_or_register_3pid_guest(
-            medium, address, inviter_user_id,
+            medium, address, inviter_user_id
         )
 
         defer.returnValue((200, ret))
@@ -264,7 +250,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
             user_id (str)
             change (str): Either "joined" or "left"
         """
-        assert change in ("joined", "left",)
+        assert change in ("joined", "left")
 
         return {}
 
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index 912a5ac341..f81a0f1b8f 100644
--- a/synapse/replication/http/register.py
+++ b/synapse/replication/http/register.py
@@ -37,8 +37,16 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
 
     @staticmethod
     def _serialize_payload(
-        user_id, token, password_hash, was_guest, make_guest, appservice_id,
-        create_profile_with_displayname, admin, user_type, address,
+        user_id,
+        token,
+        password_hash,
+        was_guest,
+        make_guest,
+        appservice_id,
+        create_profile_with_displayname,
+        admin,
+        user_type,
+        address,
     ):
         """
         Args:
@@ -85,7 +93,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
             create_profile_with_displayname=content["create_profile_with_displayname"],
             admin=content["admin"],
             user_type=content["user_type"],
-            address=content["address"]
+            address=content["address"],
         )
 
         defer.returnValue((200, {}))
@@ -104,8 +112,7 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
         self.registration_handler = hs.get_registration_handler()
 
     @staticmethod
-    def _serialize_payload(user_id, auth_result, access_token, bind_email,
-                           bind_msisdn):
+    def _serialize_payload(user_id, auth_result, access_token, bind_email, bind_msisdn):
         """
         Args:
             user_id (str): The user ID that consented
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 3635015eda..034763fe99 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -45,6 +45,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             "extra_users": [],
         }
     """
+
     NAME = "send_event"
     PATH_ARGS = ("event_id",)
 
@@ -57,8 +58,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
 
     @staticmethod
     @defer.inlineCallbacks
-    def _serialize_payload(event_id, store, event, context, requester,
-                           ratelimit, extra_users):
+    def _serialize_payload(
+        event_id, store, event, context, requester, ratelimit, extra_users
+    ):
         """
         Args:
             event_id (str)
@@ -108,14 +110,11 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             request.authenticated_entity = requester.user.to_string()
 
         logger.info(
-            "Got event to send with ID: %s into room: %s",
-            event.event_id, event.room_id,
+            "Got event to send with ID: %s into room: %s", event.event_id, event.room_id
         )
 
         yield self.event_creation_handler.persist_and_notify_client_event(
-            requester, event, context,
-            ratelimit=ratelimit,
-            extra_users=extra_users,
+            requester, event, context, ratelimit=ratelimit, extra_users=extra_users
         )
 
         defer.returnValue((200, {}))