summary refs log tree commit diff
path: root/synapse/app/generic_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r--synapse/app/generic_worker.py21
1 files changed, 10 insertions, 11 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index f125658615..e0fdef5cdb 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -64,8 +64,9 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
-from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.client import ReplicationClientFactory
 from synapse.replication.tcp.commands import ClearUserSyncsCommand
+from synapse.replication.tcp.handler import WorkerReplicationDataHandler
 from synapse.replication.tcp.streams import (
     AccountDataStream,
     DeviceListsStream,
@@ -598,25 +599,26 @@ class GenericWorkerServer(HomeServer):
             else:
                 logger.warning("Unrecognized listener type: %s", listener["type"])
 
-        self.get_tcp_replication().start_replication(self)
+        factory = ReplicationClientFactory(self, self.config.worker_name)
+        host = self.config.worker_replication_host
+        port = self.config.worker_replication_port
+        self.get_reactor().connectTCP(host, port, factory)
 
     def remove_pusher(self, app_id, push_key, user_id):
         self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
 
-    def build_tcp_replication(self):
-        return GenericWorkerReplicationHandler(self)
-
     def build_presence_handler(self):
         return GenericWorkerPresence(self)
 
     def build_typing_handler(self):
         return GenericWorkerTyping(self)
 
+    def build_replication_data_handler(self):
+        return GenericWorkerReplicationHandler(self)
 
-class GenericWorkerReplicationHandler(ReplicationClientHandler):
-    def __init__(self, hs):
-        super(GenericWorkerReplicationHandler, self).__init__(hs.get_datastore())
 
+class GenericWorkerReplicationHandler(WorkerReplicationDataHandler):
+    def __init__(self, hs):
         self.store = hs.get_datastore()
         self.typing_handler = hs.get_typing_handler()
         # NB this is a SynchrotronPresence, not a normal PresenceHandler
@@ -644,9 +646,6 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
             args.update(self.send_handler.stream_positions())
         return args
 
-    def get_currently_syncing_users(self):
-        return self.presence_handler.get_currently_syncing_users()
-
     async def process_and_notify(self, stream_name, token, rows):
         try:
             if self.send_handler: