summary refs log tree commit diff
diff options
context:
space:
mode:
-rwxr-xr-xsynapse/app/homeserver.py2
-rw-r--r--synapse/federation/federation_server.py10
-rw-r--r--synapse/federation/transport/server.py2
-rw-r--r--synapse/handlers/device.py3
-rw-r--r--synapse/handlers/directory.py2
-rw-r--r--synapse/handlers/e2e_keys.py2
-rw-r--r--synapse/handlers/federation.py4
-rw-r--r--synapse/handlers/presence.py1
-rw-r--r--synapse/handlers/profile.py2
-rw-r--r--synapse/handlers/room_list.py2
-rw-r--r--synapse/handlers/room_member.py3
-rw-r--r--synapse/server.py13
12 files changed, 19 insertions, 27 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e375f2bbcf..503f461ab4 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -348,7 +348,7 @@ def setup(config_options):
         hs.get_state_handler().start_caching()
         hs.get_datastore().start_profiling()
         hs.get_datastore().start_doing_background_updates()
-        hs.get_replication_layer().start_get_pdu_cache()
+        hs.get_replication_client().start_get_pdu_cache()
 
         register_memory_metrics(hs)
 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index dd73fc50b2..740ef96280 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -54,27 +54,19 @@ class FederationServer(FederationBase):
         super(FederationServer, self).__init__(hs)
 
         self.auth = hs.get_auth()
+        self.handler = hs.get_handlers().federation_handler
 
         self._server_linearizer = async.Linearizer("fed_server")
         self._transaction_linearizer = async.Linearizer("fed_txn_handler")
 
         self.transaction_actions = TransactionActions(self.store)
 
-        self.handler = None
-
         self.registry = hs.get_federation_registry()
 
         # We cache responses to state queries, as they take a while and often
         # come in waves.
         self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
 
-    def set_handler(self, handler):
-        """Sets the handler that the replication layer will use to communicate
-        receipt of new PDUs from other home servers. The required methods are
-        documented on :py:class:`.ReplicationHandler`.
-        """
-        self.handler = handler
-
     @defer.inlineCallbacks
     @log_function
     def on_backfill_request(self, origin, room_id, versions, limit):
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 06c16ba4fa..04b83e691a 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -1190,7 +1190,7 @@ GROUP_ATTESTATION_SERVLET_CLASSES = (
 def register_servlets(hs, resource, authenticator, ratelimiter):
     for servletclass in FEDERATION_SERVLET_CLASSES:
         servletclass(
-            handler=hs.get_replication_layer(),
+            handler=hs.get_replication_server(),
             authenticator=authenticator,
             ratelimiter=ratelimiter,
             server_name=hs.hostname,
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9e58dbe64e..fcf41630d6 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -37,7 +37,6 @@ class DeviceHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self._auth_handler = hs.get_auth_handler()
         self.federation_sender = hs.get_federation_sender()
-        self.federation = hs.get_replication_layer()
 
         self._edu_updater = DeviceListEduUpdater(hs, self)
 
@@ -432,7 +431,7 @@ class DeviceListEduUpdater(object):
 
     def __init__(self, hs, device_handler):
         self.store = hs.get_datastore()
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_replication_client()
         self.clock = hs.get_clock()
         self.device_handler = device_handler
 
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index e955cb1f3c..dfe04eb1c1 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -36,7 +36,7 @@ class DirectoryHandler(BaseHandler):
         self.appservice_handler = hs.get_application_service_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
 
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_replication_client()
         hs.get_federation_registry().register_query_handler(
             "directory", self.on_directory_query
         )
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 57f50a4e27..0ca8d036ee 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
 class E2eKeysHandler(object):
     def __init__(self, hs):
         self.store = hs.get_datastore()
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_replication_client()
         self.device_handler = hs.get_device_handler()
         self.is_mine = hs.is_mine
         self.clock = hs.get_clock()
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 520612683e..cfd4379160 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -68,7 +68,7 @@ class FederationHandler(BaseHandler):
         self.hs = hs
 
         self.store = hs.get_datastore()
-        self.replication_layer = hs.get_replication_layer()
+        self.replication_layer = hs.get_replication_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
@@ -78,8 +78,6 @@ class FederationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
 
-        self.replication_layer.set_handler(self)
-
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b11ae78350..a5e501897c 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -93,7 +93,6 @@ class PresenceHandler(object):
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
-        self.replication = hs.get_replication_layer()
         self.federation = hs.get_federation_sender()
 
         self.state = hs.get_state_handler()
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index c386c79bbd..0cfac60d74 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -31,7 +31,7 @@ class ProfileHandler(BaseHandler):
     def __init__(self, hs):
         super(ProfileHandler, self).__init__(hs)
 
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_replication_client()
         hs.get_federation_registry().register_query_handler(
             "profile", self.on_profile_query
         )
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index dfa09141ed..f79bd8902f 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -409,7 +409,7 @@ class RoomListHandler(BaseHandler):
     def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
                                 search_filter=None, include_all_networks=False,
                                 third_party_instance_id=None,):
-        repl_layer = self.hs.get_replication_layer()
+        repl_layer = self.hs.get_replication_client()
         if search_filter:
             # We can't cache when asking for search
             return repl_layer.get_public_rooms(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ed3b97730d..e2f0527712 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -55,7 +55,6 @@ class RoomMemberHandler(object):
         self.registration_handler = hs.get_handlers().registration_handler
         self.profile_handler = hs.get_profile_handler()
         self.event_creation_hander = hs.get_event_creation_handler()
-        self.replication_layer = hs.get_replication_layer()
 
         self.member_linearizer = Linearizer(name="member")
 
@@ -212,7 +211,7 @@ class RoomMemberHandler(object):
         # if this is a join with a 3pid signature, we may need to turn a 3pid
         # invite into a normal invite before we can handle the join.
         if third_party_signed is not None:
-            yield self.replication_layer.exchange_third_party_invite(
+            yield self.federation_handler.exchange_third_party_invite(
                 third_party_signed["sender"],
                 target.to_string(),
                 room_id,
diff --git a/synapse/server.py b/synapse/server.py
index 1bc8d6f702..894e9c2acf 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -32,7 +32,8 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler
 from synapse.crypto.keyring import Keyring
 from synapse.events.builder import EventBuilderFactory
 from synapse.events.spamcheck import SpamChecker
-from synapse.federation import initialize_http_replication
+from synapse.federation.federation_client import FederationClient
+from synapse.federation.federation_server import FederationServer
 from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.federation_server import FederationHandlerRegistry
 from synapse.federation.transport.client import TransportLayerClient
@@ -100,7 +101,8 @@ class HomeServer(object):
     DEPENDENCIES = [
         'http_client',
         'db_pool',
-        'replication_layer',
+        'replication_client',
+        'replication_server',
         'handlers',
         'v1auth',
         'auth',
@@ -197,8 +199,11 @@ class HomeServer(object):
     def get_ratelimiter(self):
         return self.ratelimiter
 
-    def build_replication_layer(self):
-        return initialize_http_replication(self)
+    def build_replication_client(self):
+        return FederationClient(self)
+
+    def build_replication_server(self):
+        return FederationServer(self)
 
     def build_handlers(self):
         return Handlers(self)