diff options
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r-- | synapse/app/generic_worker.py | 21 |
1 files changed, 10 insertions, 11 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index f125658615..e0fdef5cdb 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -64,8 +64,9 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore -from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.client import ReplicationClientFactory from synapse.replication.tcp.commands import ClearUserSyncsCommand +from synapse.replication.tcp.handler import WorkerReplicationDataHandler from synapse.replication.tcp.streams import ( AccountDataStream, DeviceListsStream, @@ -598,25 +599,26 @@ class GenericWorkerServer(HomeServer): else: logger.warning("Unrecognized listener type: %s", listener["type"]) - self.get_tcp_replication().start_replication(self) + factory = ReplicationClientFactory(self, self.config.worker_name) + host = self.config.worker_replication_host + port = self.config.worker_replication_port + self.get_reactor().connectTCP(host, port, factory) def remove_pusher(self, app_id, push_key, user_id): self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id) - def build_tcp_replication(self): - return GenericWorkerReplicationHandler(self) - def build_presence_handler(self): return GenericWorkerPresence(self) def build_typing_handler(self): return GenericWorkerTyping(self) + def build_replication_data_handler(self): + return GenericWorkerReplicationHandler(self) -class GenericWorkerReplicationHandler(ReplicationClientHandler): - def __init__(self, hs): - super(GenericWorkerReplicationHandler, self).__init__(hs.get_datastore()) +class GenericWorkerReplicationHandler(WorkerReplicationDataHandler): + def __init__(self, hs): self.store = hs.get_datastore() self.typing_handler = hs.get_typing_handler() # NB this is a SynchrotronPresence, not a normal PresenceHandler @@ -644,9 +646,6 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler): args.update(self.send_handler.stream_positions()) return args - def get_currently_syncing_users(self): - return self.presence_handler.get_currently_syncing_users() - async def process_and_notify(self, stream_name, token, rows): try: if self.send_handler: |