diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index a6f1e7594e..13b69d8dee 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -26,8 +26,8 @@ from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
@@ -36,7 +36,7 @@ from synapse.util.versionstring import get_version_string
from synapse import events
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
from twisted.web.resource import Resource
from daemonize import Daemonize
@@ -120,30 +120,23 @@ class AppserviceServer(HomeServer):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
- @defer.inlineCallbacks
- def replicate(self):
- http_client = self.get_simple_http_client()
- store = self.get_datastore()
- replication_url = self.config.worker_replication_url
- appservice_handler = self.get_application_service_handler()
-
- @defer.inlineCallbacks
- def replicate(results):
- stream = results.get("events")
- if stream:
- max_stream_id = stream["position"]
- yield appservice_handler.notify_interested_services(max_stream_id)
-
- while True:
- try:
- args = store.stream_positions()
- args["timeout"] = 30000
- result = yield http_client.get_json(replication_url, args=args)
- yield store.process_replication(result)
- replicate(result)
- except:
- logger.exception("Error replicating from %r", replication_url)
- yield sleep(30)
+ self.get_tcp_replication().start_replication(self)
+
+ def build_tcp_replication(self):
+ return ASReplicationHandler(self)
+
+
+class ASReplicationHandler(ReplicationClientHandler):
+ def __init__(self, hs):
+ super(ASReplicationHandler, self).__init__(hs.get_datastore())
+ self.appservice_handler = hs.get_application_service_handler()
+
+ def on_rdata(self, stream_name, token, rows):
+ super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+
+ if stream_name == "events":
+ max_stream_id = self.store.get_room_max_stream_ordering()
+ self.appservice_handler.notify_interested_services(max_stream_id)
def start(config_options):
@@ -199,7 +192,6 @@ def start(config_options):
reactor.run()
def start():
- ps.replicate()
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
|