summary refs log tree commit diff
path: root/synapse/app/federation_reader.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-05-18 13:54:27 +0100
committerErik Johnston <erik@matrix.org>2017-05-18 13:54:27 +0100
commit3accee1a8c2804620713ac4ff068a4a18a7de192 (patch)
tree5bfad3c1e8653713f8e14062732f892797279a92 /synapse/app/federation_reader.py
parentMerge pull request #2136 from bbigras/patch-1 (diff)
parentBump changelog and version (diff)
downloadsynapse-3accee1a8c2804620713ac4ff068a4a18a7de192.tar.xz
Merge branch 'release-v0.21.0' of github.com:matrix-org/synapse v0.21.0
Diffstat (limited to 'synapse/app/federation_reader.py')
-rw-r--r--synapse/app/federation_reader.py22
1 files changed, 5 insertions, 17 deletions
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index e52b0f240d..eb392e1c9d 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -27,9 +27,9 @@ from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
@@ -42,7 +42,7 @@ from synapse.crypto import context_factory
 from synapse import events
 
 
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -134,21 +134,10 @@ class FederationReaderServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
+        self.get_tcp_replication().start_replication(self)
 
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(5)
+    def build_tcp_replication(self):
+        return ReplicationClientHandler(self.get_datastore())
 
 
 def start(config_options):
@@ -198,7 +187,6 @@ def start(config_options):
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
-        ss.replicate()
 
     reactor.callWhenRunning(start)