summary refs log tree commit diff
path: root/synapse/app/appservice.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-03-27 16:33:44 +0100
committerErik Johnston <erik@matrix.org>2017-04-03 15:35:52 +0100
commit36c28bc467e53cf7318964a4dc986428644f27ed (patch)
treec2b46831a4298f68ae7489ababeb22767b56c26b /synapse/app/appservice.py
parentChange slave storage to use new replication interface (diff)
downloadsynapse-36c28bc467e53cf7318964a4dc986428644f27ed.tar.xz
Update all the workers and master to use TCP replication
Diffstat (limited to 'synapse/app/appservice.py')
-rw-r--r--synapse/app/appservice.py46
1 files changed, 19 insertions, 27 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index a6f1e7594e..13b69d8dee 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -26,8 +26,8 @@ from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
@@ -36,7 +36,7 @@ from synapse.util.versionstring import get_version_string
 
 from synapse import events
 
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -120,30 +120,23 @@ class AppserviceServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-        appservice_handler = self.get_application_service_handler()
-
-        @defer.inlineCallbacks
-        def replicate(results):
-            stream = results.get("events")
-            if stream:
-                max_stream_id = stream["position"]
-                yield appservice_handler.notify_interested_services(max_stream_id)
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-                replicate(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(30)
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return ASReplicationHandler(self)
+
+
+class ASReplicationHandler(ReplicationClientHandler):
+    def __init__(self, hs):
+        super(ASReplicationHandler, self).__init__(hs.get_datastore())
+        self.appservice_handler = hs.get_application_service_handler()
+
+    def on_rdata(self, stream_name, token, rows):
+        super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+
+        if stream_name == "events":
+            max_stream_id = self.store.get_room_max_stream_ordering()
+            self.appservice_handler.notify_interested_services(max_stream_id)
 
 
 def start(config_options):
@@ -199,7 +192,6 @@ def start(config_options):
             reactor.run()
 
     def start():
-        ps.replicate()
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()