summary refs log tree commit diff
path: root/synapse/app/appservice.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/appservice.py')
-rw-r--r--synapse/app/appservice.py50
1 files changed, 22 insertions, 28 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index a6f1e7594e..9a476efa63 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -26,17 +26,17 @@ from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
 
 from synapse import events
 
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -120,30 +120,25 @@ class AppserviceServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-        appservice_handler = self.get_application_service_handler()
-
-        @defer.inlineCallbacks
-        def replicate(results):
-            stream = results.get("events")
-            if stream:
-                max_stream_id = stream["position"]
-                yield appservice_handler.notify_interested_services(max_stream_id)
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-                replicate(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(30)
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return ASReplicationHandler(self)
+
+
+class ASReplicationHandler(ReplicationClientHandler):
+    def __init__(self, hs):
+        super(ASReplicationHandler, self).__init__(hs.get_datastore())
+        self.appservice_handler = hs.get_application_service_handler()
+
+    def on_rdata(self, stream_name, token, rows):
+        super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+
+        if stream_name == "events":
+            max_stream_id = self.store.get_room_max_stream_ordering()
+            preserve_fn(
+                self.appservice_handler.notify_interested_services
+            )(max_stream_id)
 
 
 def start(config_options):
@@ -199,7 +194,6 @@ def start(config_options):
             reactor.run()
 
     def start():
-        ps.replicate()
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()