summary refs log tree commit diff
path: root/synapse/app/federation_reader.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-03-27 16:33:44 +0100
committerErik Johnston <erik@matrix.org>2017-04-03 15:35:52 +0100
commit36c28bc467e53cf7318964a4dc986428644f27ed (patch)
treec2b46831a4298f68ae7489ababeb22767b56c26b /synapse/app/federation_reader.py
parentChange slave storage to use new replication interface (diff)
downloadsynapse-36c28bc467e53cf7318964a4dc986428644f27ed.tar.xz
Update all the workers and master to use TCP replication
Diffstat (limited to 'synapse/app/federation_reader.py')
-rw-r--r--synapse/app/federation_reader.py22
1 files changed, 5 insertions, 17 deletions
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index e52b0f240d..eb392e1c9d 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -27,9 +27,9 @@ from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
@@ -42,7 +42,7 @@ from synapse.crypto import context_factory
 from synapse import events
 
 
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -134,21 +134,10 @@ class FederationReaderServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
+        self.get_tcp_replication().start_replication(self)
 
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(5)
+    def build_tcp_replication(self):
+        return ReplicationClientHandler(self.get_datastore())
 
 
 def start(config_options):
@@ -198,7 +187,6 @@ def start(config_options):
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
-        ss.replicate()
 
     reactor.callWhenRunning(start)