diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 3b3352798d..0a8ce9bc66 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -156,7 +156,6 @@ def start(config_options):
)
ss.setup()
- ss.get_handlers()
ss.start_listening(config.worker_listeners)
def start():
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index fc0b9e8c04..eb593c5278 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -161,7 +161,6 @@ def start(config_options):
)
ss.setup()
- ss.get_handlers()
ss.start_listening(config.worker_listeners)
def start():
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 4de43c41f0..20d157911b 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -144,7 +144,6 @@ def start(config_options):
)
ss.setup()
- ss.get_handlers()
ss.start_listening(config.worker_listeners)
def start():
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index e32ee8fe93..816c080d18 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -211,7 +211,6 @@ def start(config_options):
)
ss.setup()
- ss.get_handlers()
ss.start_listening(config.worker_listeners)
def start():
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e375f2bbcf..503f461ab4 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -348,7 +348,7 @@ def setup(config_options):
hs.get_state_handler().start_caching()
hs.get_datastore().start_profiling()
hs.get_datastore().start_doing_background_updates()
- hs.get_replication_layer().start_get_pdu_cache()
+ hs.get_replication_client().start_get_pdu_cache()
register_memory_metrics(hs)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 1ed1ca8772..84c5791b3b 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -158,7 +158,6 @@ def start(config_options):
)
ss.setup()
- ss.get_handlers()
ss.start_listening(config.worker_listeners)
def start():
diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py
index 2e32d245ba..f5f0bdfca3 100644
--- a/synapse/federation/__init__.py
+++ b/synapse/federation/__init__.py
@@ -15,11 +15,3 @@
""" This package includes all the federation specific logic.
"""
-
-from .replication import ReplicationLayer
-
-
-def initialize_http_replication(hs):
- transport = hs.get_federation_transport_client()
-
- return ReplicationLayer(hs, transport)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index f6fd2e86e9..bea7fd0b71 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -54,27 +54,19 @@ class FederationServer(FederationBase):
super(FederationServer, self).__init__(hs)
self.auth = hs.get_auth()
+ self.handler = hs.get_handlers().federation_handler
self._server_linearizer = async.Linearizer("fed_server")
self._transaction_linearizer = async.Linearizer("fed_txn_handler")
self.transaction_actions = TransactionActions(self.store)
- self.handler = None
-
self.registry = hs.get_federation_registry()
# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
- def set_handler(self, handler):
- """Sets the handler that the replication layer will use to communicate
- receipt of new PDUs from other home servers. The required methods are
- documented on :py:class:`.ReplicationHandler`.
- """
- self.handler = handler
-
@defer.inlineCallbacks
@log_function
def on_backfill_request(self, origin, room_id, versions, limit):
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
deleted file mode 100644
index b8b3a3f933..0000000000
--- a/synapse/federation/replication.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""This layer is responsible for replicating with remote home servers using
-a given transport.
-"""
-
-from .federation_client import FederationClient
-from .federation_server import FederationServer
-
-import logging
-
-
-logger = logging.getLogger(__name__)
-
-
-class ReplicationLayer(FederationClient, FederationServer):
- """This layer is responsible for replicating with remote home servers over
- the given transport. I.e., does the sending and receiving of PDUs to
- remote home servers.
-
- The layer communicates with the rest of the server via a registered
- ReplicationHandler.
-
- In more detail, the layer:
- * Receives incoming data and processes it into transactions and pdus.
- * Fetches any PDUs it thinks it might have missed.
- * Keeps the current state for contexts up to date by applying the
- suitable conflict resolution.
- * Sends outgoing pdus wrapped in transactions.
- * Fills out the references to previous pdus/transactions appropriately
- for outgoing data.
- """
-
- def __init__(self, hs, transport_layer):
- super(ReplicationLayer, self).__init__(hs)
-
- def __str__(self):
- return "<ReplicationLayer(%s)>" % self.server_name
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 06c16ba4fa..04b83e691a 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -1190,7 +1190,7 @@ GROUP_ATTESTATION_SERVLET_CLASSES = (
def register_servlets(hs, resource, authenticator, ratelimiter):
for servletclass in FEDERATION_SERVLET_CLASSES:
servletclass(
- handler=hs.get_replication_layer(),
+ handler=hs.get_replication_server(),
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9e58dbe64e..fcf41630d6 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -37,7 +37,6 @@ class DeviceHandler(BaseHandler):
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()
self.federation_sender = hs.get_federation_sender()
- self.federation = hs.get_replication_layer()
self._edu_updater = DeviceListEduUpdater(hs, self)
@@ -432,7 +431,7 @@ class DeviceListEduUpdater(object):
def __init__(self, hs, device_handler):
self.store = hs.get_datastore()
- self.federation = hs.get_replication_layer()
+ self.federation = hs.get_replication_client()
self.clock = hs.get_clock()
self.device_handler = device_handler
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index e955cb1f3c..dfe04eb1c1 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -36,7 +36,7 @@ class DirectoryHandler(BaseHandler):
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
- self.federation = hs.get_replication_layer()
+ self.federation = hs.get_replication_client()
hs.get_federation_registry().register_query_handler(
"directory", self.on_directory_query
)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 57f50a4e27..0ca8d036ee 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
class E2eKeysHandler(object):
def __init__(self, hs):
self.store = hs.get_datastore()
- self.federation = hs.get_replication_layer()
+ self.federation = hs.get_replication_client()
self.device_handler = hs.get_device_handler()
self.is_mine = hs.is_mine
self.clock = hs.get_clock()
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 520612683e..cfd4379160 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -68,7 +68,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.store = hs.get_datastore()
- self.replication_layer = hs.get_replication_layer()
+ self.replication_layer = hs.get_replication_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
@@ -78,8 +78,6 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
- self.replication_layer.set_handler(self)
-
# When joining a room we need to queue any events for that room up
self.room_queues = {}
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b11ae78350..a5e501897c 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -93,7 +93,6 @@ class PresenceHandler(object):
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
- self.replication = hs.get_replication_layer()
self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler()
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index c386c79bbd..0cfac60d74 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -31,7 +31,7 @@ class ProfileHandler(BaseHandler):
def __init__(self, hs):
super(ProfileHandler, self).__init__(hs)
- self.federation = hs.get_replication_layer()
+ self.federation = hs.get_replication_client()
hs.get_federation_registry().register_query_handler(
"profile", self.on_profile_query
)
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index dfa09141ed..f79bd8902f 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -409,7 +409,7 @@ class RoomListHandler(BaseHandler):
def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
search_filter=None, include_all_networks=False,
third_party_instance_id=None,):
- repl_layer = self.hs.get_replication_layer()
+ repl_layer = self.hs.get_replication_client()
if search_filter:
# We can't cache when asking for search
return repl_layer.get_public_rooms(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0cac039c6c..2dbd6b5184 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -55,7 +55,6 @@ class RoomMemberHandler(object):
self.registration_handler = hs.get_handlers().registration_handler
self.profile_handler = hs.get_profile_handler()
self.event_creation_hander = hs.get_event_creation_handler()
- self.replication_layer = hs.get_replication_layer()
self.member_linearizer = Linearizer(name="member")
@@ -212,7 +211,7 @@ class RoomMemberHandler(object):
# if this is a join with a 3pid signature, we may need to turn a 3pid
# invite into a normal invite before we can handle the join.
if third_party_signed is not None:
- yield self.replication_layer.exchange_third_party_invite(
+ yield self.federation_handler.exchange_third_party_invite(
third_party_signed["sender"],
target.to_string(),
room_id,
diff --git a/synapse/server.py b/synapse/server.py
index 1bc8d6f702..894e9c2acf 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -32,7 +32,8 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler
from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory
from synapse.events.spamcheck import SpamChecker
-from synapse.federation import initialize_http_replication
+from synapse.federation.federation_client import FederationClient
+from synapse.federation.federation_server import FederationServer
from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.federation_server import FederationHandlerRegistry
from synapse.federation.transport.client import TransportLayerClient
@@ -100,7 +101,8 @@ class HomeServer(object):
DEPENDENCIES = [
'http_client',
'db_pool',
- 'replication_layer',
+ 'replication_client',
+ 'replication_server',
'handlers',
'v1auth',
'auth',
@@ -197,8 +199,11 @@ class HomeServer(object):
def get_ratelimiter(self):
return self.ratelimiter
- def build_replication_layer(self):
- return initialize_http_replication(self)
+ def build_replication_client(self):
+ return FederationClient(self)
+
+ def build_replication_server(self):
+ return FederationServer(self)
def build_handlers(self):
return Handlers(self)
|