summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-07-26 11:44:22 +0100
committerErik Johnston <erik@matrix.org>2018-08-06 15:23:31 +0100
commita3f5bf79a0fc0ea6d59069945f53717a3e9c6581 (patch)
tree5e8cabe33af2923ff80502b9e3e4eda6118af65b /synapse
parentAdd replication APIs for persisting federation events (diff)
downloadsynapse-a3f5bf79a0fc0ea6d59069945f53717a3e9c6581.tar.xz
Add EDU/query handling over replication
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/federation_server.py43
-rw-r--r--synapse/handlers/federation.py24
-rw-r--r--synapse/replication/http/federation.py2
-rw-r--r--synapse/server.py6
4 files changed, 62 insertions, 13 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bf89d568af..941e30a596 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -33,6 +33,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
 from synapse.http.endpoint import parse_server_name
+from synapse.replication.http.federation import (
+    ReplicationFederationSendEduRestServlet,
+    ReplicationGetQueryRestServlet,
+)
 from synapse.types import get_domain_from_id
 from synapse.util import async
 from synapse.util.caches.response_cache import ResponseCache
@@ -745,6 +749,8 @@ class FederationHandlerRegistry(object):
         if edu_type in self.edu_handlers:
             raise KeyError("Already have an EDU handler for %s" % (edu_type,))
 
+        logger.info("Registering federation EDU handler for %r", edu_type)
+
         self.edu_handlers[edu_type] = handler
 
     def register_query_handler(self, query_type, handler):
@@ -763,6 +769,8 @@ class FederationHandlerRegistry(object):
                 "Already have a Query handler for %s" % (query_type,)
             )
 
+        logger.info("Registering federation query handler for %r", query_type)
+
         self.query_handlers[query_type] = handler
 
     @defer.inlineCallbacks
@@ -785,3 +793,38 @@ class FederationHandlerRegistry(object):
             raise NotFoundError("No handler for Query type '%s'" % (query_type,))
 
         return handler(args)
+
+
+class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
+    def __init__(self, hs):
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+        self.clock = hs.get_clock()
+
+        self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+        self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+        super(ReplicationFederationHandlerRegistry, self).__init__()
+
+    def on_edu(self, edu_type, origin, content):
+        handler = self.edu_handlers.get(edu_type)
+        if handler:
+            return super(ReplicationFederationHandlerRegistry, self).on_edu(
+                edu_type, origin, content,
+            )
+
+        return self._send_edu(
+                edu_type=edu_type,
+                origin=origin,
+                content=content,
+        )
+
+    def on_query(self, query_type, args):
+        handler = self.query_handlers.get(query_type)
+        if handler:
+            return handler(args)
+
+        return self._get_query_client(
+                query_type=query_type,
+                args=args,
+        )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0524dec942..d2cbb12df3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -44,8 +44,10 @@ from synapse.crypto.event_signing import (
     compute_event_signature,
 )
 from synapse.events.validator import EventValidator
-from synapse.replication.http.federation import send_federation_events_to_master
-from synapse.replication.http.membership import notify_user_membership_change
+from synapse.replication.http.federation import (
+    ReplicationFederationSendEventsRestServlet,
+)
+from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
 from synapse.state import resolve_events_with_factory
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import logcontext, unwrapFirstError
@@ -91,6 +93,13 @@ class FederationHandler(BaseHandler):
         self.config = hs.config
         self.http_client = hs.get_simple_http_client()
 
+        self._send_events_to_master = (
+            ReplicationFederationSendEventsRestServlet.make_client(hs)
+        )
+        self._notify_user_membership_change = (
+            ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
+        )
+
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
@@ -2318,12 +2327,8 @@ class FederationHandler(BaseHandler):
             Deferred
         """
         if self.config.worker_app:
-            yield send_federation_events_to_master(
-                clock=self.hs.get_clock(),
+            yield self._send_events_to_master(
                 store=self.store,
-                client=self.http_client,
-                host=self.config.worker_replication_host,
-                port=self.config.worker_replication_http_port,
                 event_and_contexts=event_and_contexts,
                 backfilled=backfilled
             )
@@ -2381,10 +2386,7 @@ class FederationHandler(BaseHandler):
         """Called when a new user has joined the room
         """
         if self.config.worker_app:
-            return notify_user_membership_change(
-                client=self.http_client,
-                host=self.config.worker_replication_host,
-                port=self.config.worker_replication_http_port,
+            return self._notify_user_membership_change(
                 room_id=room_id,
                 user_id=user.to_string(),
                 change="joined",
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index f39aaa89be..3fa7bd64c7 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -59,8 +59,8 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         self.notifier = hs.get_notifier()
         self.pusher_pool = hs.get_pusherpool()
 
-    @defer.inlineCallbacks
     @staticmethod
+    @defer.inlineCallbacks
     def _serialize_payload(store, event_and_contexts, backfilled):
         """
         Args:
diff --git a/synapse/server.py b/synapse/server.py
index 140be9ebe8..26228d8c72 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -36,6 +36,7 @@ from synapse.federation.federation_client import FederationClient
 from synapse.federation.federation_server import (
     FederationHandlerRegistry,
     FederationServer,
+    ReplicationFederationHandlerRegistry,
 )
 from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.transaction_queue import TransactionQueue
@@ -423,7 +424,10 @@ class HomeServer(object):
         return RoomMemberMasterHandler(self)
 
     def build_federation_registry(self):
-        return FederationHandlerRegistry()
+        if self.config.worker_app:
+            return ReplicationFederationHandlerRegistry(self)
+        else:
+            return FederationHandlerRegistry()
 
     def build_server_notices_manager(self):
         if self.config.worker_app: