diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index a6f1e7594e..9a476efa63 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -26,17 +26,17 @@ from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
from synapse import events
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
from twisted.web.resource import Resource
from daemonize import Daemonize
@@ -120,30 +120,25 @@ class AppserviceServer(HomeServer):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
- @defer.inlineCallbacks
- def replicate(self):
- http_client = self.get_simple_http_client()
- store = self.get_datastore()
- replication_url = self.config.worker_replication_url
- appservice_handler = self.get_application_service_handler()
-
- @defer.inlineCallbacks
- def replicate(results):
- stream = results.get("events")
- if stream:
- max_stream_id = stream["position"]
- yield appservice_handler.notify_interested_services(max_stream_id)
-
- while True:
- try:
- args = store.stream_positions()
- args["timeout"] = 30000
- result = yield http_client.get_json(replication_url, args=args)
- yield store.process_replication(result)
- replicate(result)
- except:
- logger.exception("Error replicating from %r", replication_url)
- yield sleep(30)
+ self.get_tcp_replication().start_replication(self)
+
+ def build_tcp_replication(self):
+ return ASReplicationHandler(self)
+
+
+class ASReplicationHandler(ReplicationClientHandler):
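+ """Forwards events-stream updates from the TCP replication connection
+ to the application service handler so that interested appservices are
+ notified as the event stream advances.
+ """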
+ def __init__(self, hs):
+ super(ASReplicationHandler, self).__init__(hs.get_datastore())
+ self.appservice_handler = hs.get_application_service_handler()
+
+ def on_rdata(self, stream_name, token, rows):
+ super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+
+ if stream_name == "events":
+ max_stream_id = self.store.get_room_max_stream_ordering()
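+ # Fire off the notification without waiting on it; preserve_fn keeps
+ # the logcontext intact around the background deferred.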
+ preserve_fn(
+ self.appservice_handler.notify_interested_services
+ )(max_stream_id)
def start(config_options):
@@ -199,7 +194,6 @@ def start(config_options):
reactor.run()
def start():
- ps.replicate()
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index e4ea3ab933..9b72c649ac 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -30,11 +30,11 @@ from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import PublicRoomListRestServlet
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
@@ -45,7 +45,7 @@ from synapse.crypto import context_factory
from synapse import events
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
from twisted.web.resource import Resource
from daemonize import Daemonize
@@ -145,21 +145,10 @@ class ClientReaderServer(HomeServer):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
- @defer.inlineCallbacks
- def replicate(self):
- http_client = self.get_simple_http_client()
- store = self.get_datastore()
- replication_url = self.config.worker_replication_url
+ self.get_tcp_replication().start_replication(self)
- while True:
- try:
- args = store.stream_positions()
- args["timeout"] = 30000
- result = yield http_client.get_json(replication_url, args=args)
- yield store.process_replication(result)
- except:
- logger.exception("Error replicating from %r", replication_url)
- yield sleep(5)
+ def build_tcp_replication(self):
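+ # The old replicate() loop only fed results into the slaved stores, so
+ # the stock ReplicationClientHandler is all this worker needs.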
+ return ReplicationClientHandler(self.get_datastore())
def start(config_options):
@@ -209,7 +198,6 @@ def start(config_options):
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
- ss.replicate()
reactor.callWhenRunning(start)
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index e52b0f240d..eb392e1c9d 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -27,9 +27,9 @@ from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
@@ -42,7 +42,7 @@ from synapse.crypto import context_factory
from synapse import events
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
from twisted.web.resource import Resource
from daemonize import Daemonize
@@ -134,21 +134,10 @@ class FederationReaderServer(HomeServer):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
- @defer.inlineCallbacks
- def replicate(self):
- http_client = self.get_simple_http_client()
- store = self.get_datastore()
- replication_url = self.config.worker_replication_url
+ self.get_tcp_replication().start_replication(self)
- while True:
- try:
- args = store.stream_positions()
- args["timeout"] = 30000
- result = yield http_client.get_json(replication_url, args=args)
- yield store.process_replication(result)
- except:
- logger.exception("Error replicating from %r", replication_url)
- yield sleep(5)
+ def build_tcp_replication(self):
+ return ReplicationClientHandler(self.get_datastore())
def start(config_options):
@@ -198,7 +187,6 @@ def start(config_options):
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
- ss.replicate()
reactor.callWhenRunning(start)
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 76c4cc54d1..cbddc80ca9 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -31,11 +31,12 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
-from synapse.util.async import sleep
+from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -59,7 +60,28 @@ class FederationSenderSlaveStore(
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore,
):
- pass
+ def __init__(self, db_conn, hs):
+ super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
+
+ # We pull out the current federation stream position now so that we
+ # always have a known value for the federation position in memory.
+ # That way we don't have to wait on a deferred for it when we start
+ # the replication streams.
+ self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
+
+ def _get_federation_out_pos(self, db_conn):
+ sql = (
+ "SELECT stream_id FROM federation_stream_position"
+ " WHERE type = ?"
+ )
+ sql = self.database_engine.convert_param_style(sql)
+
+ txn = db_conn.cursor()
+ txn.execute(sql, ("federation",))
+ rows = txn.fetchall()
+ txn.close()
+
+ return rows[0][0] if rows else -1
class FederationSenderServer(HomeServer):
@@ -127,26 +149,27 @@ class FederationSenderServer(HomeServer):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
- @defer.inlineCallbacks
- def replicate(self):
- http_client = self.get_simple_http_client()
- store = self.get_datastore()
- replication_url = self.config.worker_replication_url
- send_handler = FederationSenderHandler(self)
-
- send_handler.on_start()
-
- while True:
- try:
- args = store.stream_positions()
- args.update((yield send_handler.stream_positions()))
- args["timeout"] = 30000
- result = yield http_client.get_json(replication_url, args=args)
- yield store.process_replication(result)
- yield send_handler.process_replication(result)
- except:
- logger.exception("Error replicating from %r", replication_url)
- yield sleep(30)
+ self.get_tcp_replication().start_replication(self)
+
+ def build_tcp_replication(self):
+ return FederationSenderReplicationHandler(self)
+
+
+class FederationSenderReplicationHandler(ReplicationClientHandler):
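+ """Wraps the default replication handler so that federation stream rows
+ are passed on to the FederationSenderHandler, and so that its stream
+ position is included when we tell the master where to stream from.
+ """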
+ def __init__(self, hs):
+ super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
+ self.send_handler = FederationSenderHandler(hs, self)
+
+ def on_rdata(self, stream_name, token, rows):
+ super(FederationSenderReplicationHandler, self).on_rdata(
+ stream_name, token, rows
+ )
+ self.send_handler.process_replication_rows(stream_name, token, rows)
+
+ def get_streams_to_replicate(self):
+ args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate()
+ args.update(self.send_handler.stream_positions())
+ return args
def start(config_options):
@@ -205,7 +228,6 @@ def start(config_options):
reactor.run()
def start():
- ps.replicate()
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
@@ -229,9 +251,15 @@ class FederationSenderHandler(object):
"""Processes the replication stream and forwards the appropriate entries
to the federation sender.
"""
- def __init__(self, hs):
+ def __init__(self, hs, replication_client):
self.store = hs.get_datastore()
self.federation_sender = hs.get_federation_sender()
+ self.replication_client = replication_client
+
+ self.federation_position = self.store.federation_out_pos_startup
+ self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
+
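+ # The last position we persisted and ACKed back to the master; used by
+ # update_token to skip redundant writes.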
+ self._last_ack = self.federation_position
self._room_serials = {}
self._room_typing = {}
@@ -243,25 +271,13 @@ class FederationSenderHandler(object):
self.store.get_room_max_stream_ordering()
)
- @defer.inlineCallbacks
def stream_positions(self):
- stream_id = yield self.store.get_federation_out_pos("federation")
- defer.returnValue({
- "federation": stream_id,
-
- # Ack stuff we've "processed", this should only be called from
- # one process.
- "federation_ack": stream_id,
- })
+ return {"federation": self.federation_position}
- @defer.inlineCallbacks
- def process_replication(self, result):
+ def process_replication_rows(self, stream_name, token, rows):
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
- fed_stream = result.get("federation")
- if fed_stream:
- latest_id = int(fed_stream["position"])
-
+ if stream_name == "federation":
# The federation stream containis a bunch of different types of
# rows that need to be handled differently. We parse the rows, put
# them into the appropriate collection and then send them off.
@@ -272,8 +288,9 @@ class FederationSenderHandler(object):
device_destinations = set()
# Parse the rows in the stream
- for row in fed_stream["rows"]:
- position, typ, content_js = row
+ for row in rows:
+ typ = row.type
+ content_js = row.data
content = json.loads(content_js)
if typ == send_queue.PRESENCE_TYPE:
@@ -325,16 +342,27 @@ class FederationSenderHandler(object):
for destination in device_destinations:
self.federation_sender.send_device_messages(destination)
- # Record where we are in the stream.
- yield self.store.update_federation_out_pos(
- "federation", latest_id
- )
+ preserve_fn(self.update_token)(token)
# We also need to poke the federation sender when new events happen
- event_stream = result.get("events")
- if event_stream:
- latest_pos = event_stream["position"]
- self.federation_sender.notify_new_events(latest_pos)
+ elif stream_name == "events":
+ self.federation_sender.notify_new_events(token)
+
+ @defer.inlineCallbacks
+ def update_token(self, token):
+ self.federation_position = token
+
+ # We linearize here to ensure we don't have races updating the token
+ with (yield self._fed_position_linearizer.queue(None)):
+ if self._last_ack < self.federation_position:
+ yield self.store.update_federation_out_pos(
+ "federation", self.federation_position
+ )
+
+ # We ACK this token over replication so that the master can drop
+ # its in-memory queues.
+ self.replication_client.send_federation_ack(self.federation_position)
+ self._last_ack = self.federation_position
if __name__ == '__main__':
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 1444e69a42..26c4416956 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -25,13 +25,13 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.storage.media_repository import MediaRepositoryStore
-from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
@@ -45,7 +45,7 @@ from synapse.crypto import context_factory
from synapse import events
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
from twisted.web.resource import Resource
from daemonize import Daemonize
@@ -142,21 +142,10 @@ class MediaRepositoryServer(HomeServer):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
- @defer.inlineCallbacks
- def replicate(self):
- http_client = self.get_simple_http_client()
- store = self.get_datastore()
- replication_url = self.config.worker_replication_url
+ self.get_tcp_replication().start_replication(self)
- while True:
- try:
- args = store.stream_positions()
- args["timeout"] = 30000
- result = yield http_client.get_json(replication_url, args=args)
- yield store.process_replication(result)
- except:
- logger.exception("Error replicating from %r", replication_url)
- yield sleep(5)
+ def build_tcp_replication(self):
+ return ReplicationClientHandler(self.get_datastore())
def start(config_options):
@@ -206,7 +195,6 @@ def start(config_options):
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
- ss.replicate()
reactor.callWhenRunning(start)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index ab682e52ec..f9114acfcb 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -27,9 +27,9 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.storage.engines import create_engine
from synapse.storage import DataStore
-from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn, \
PreserveLoggingContext
@@ -89,7 +89,6 @@ class PusherSlaveStore(
class PusherServer(HomeServer):
-
def get_db_conn(self, run_new_connection=True):
# Any param beginning with cp_ is a parameter for adbapi, and should
# not be passed to the database engine.
@@ -109,16 +108,7 @@ class PusherServer(HomeServer):
logger.info("Finished setting up.")
def remove_pusher(self, app_id, push_key, user_id):
- http_client = self.get_simple_http_client()
- replication_url = self.config.worker_replication_url
- url = replication_url + "/remove_pushers"
- return http_client.post_json_get_json(url, {
- "remove": [{
- "app_id": app_id,
- "push_key": push_key,
- "user_id": user_id,
- }]
- })
+ self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -166,73 +156,52 @@ class PusherServer(HomeServer):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+ self.get_tcp_replication().start_replication(self)
+
+ def build_tcp_replication(self):
+ return PusherReplicationHandler(self)
+
+
+class PusherReplicationHandler(ReplicationClientHandler):
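+ """Starts, stops and pokes pushers in response to rows arriving on the
+ pushers, events and receipts replication streams.
+ """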
+ def __init__(self, hs):
+ super(PusherReplicationHandler, self).__init__(hs.get_datastore())
+
+ self.pusher_pool = hs.get_pusherpool()
+
+ def on_rdata(self, stream_name, token, rows):
+ super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+ preserve_fn(self.poke_pushers)(stream_name, token, rows)
+
@defer.inlineCallbacks
- def replicate(self):
- http_client = self.get_simple_http_client()
- store = self.get_datastore()
- replication_url = self.config.worker_replication_url
- pusher_pool = self.get_pusherpool()
-
- def stop_pusher(user_id, app_id, pushkey):
- key = "%s:%s" % (app_id, pushkey)
- pushers_for_user = pusher_pool.pushers.get(user_id, {})
- pusher = pushers_for_user.pop(key, None)
- if pusher is None:
- return
- logger.info("Stopping pusher %r / %r", user_id, key)
- pusher.on_stop()
-
- def start_pusher(user_id, app_id, pushkey):
- key = "%s:%s" % (app_id, pushkey)
- logger.info("Starting pusher %r / %r", user_id, key)
- return pusher_pool._refresh_pusher(app_id, pushkey, user_id)
-
- @defer.inlineCallbacks
- def poke_pushers(results):
- pushers_rows = set(
- map(tuple, results.get("pushers", {}).get("rows", []))
+ def poke_pushers(self, stream_name, token, rows):
+ if stream_name == "pushers":
+ for row in rows:
+ if row.deleted:
+ yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+ else:
+ yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
+ elif stream_name == "events":
+ yield self.pusher_pool.on_new_notifications(
+ token, token,
)
- deleted_pushers_rows = set(
- map(tuple, results.get("deleted_pushers", {}).get("rows", []))
+ elif stream_name == "receipts":
+ yield self.pusher_pool.on_new_receipts(
+ token, token, set(row.room_id for row in rows)
)
- for row in sorted(pushers_rows | deleted_pushers_rows):
- if row in deleted_pushers_rows:
- user_id, app_id, pushkey = row[1:4]
- stop_pusher(user_id, app_id, pushkey)
- elif row in pushers_rows:
- user_id = row[1]
- app_id = row[5]
- pushkey = row[8]
- yield start_pusher(user_id, app_id, pushkey)
-
- stream = results.get("events")
- if stream and stream["rows"]:
- min_stream_id = stream["rows"][0][0]
- max_stream_id = stream["position"]
- preserve_fn(pusher_pool.on_new_notifications)(
- min_stream_id, max_stream_id
- )
-
- stream = results.get("receipts")
- if stream and stream["rows"]:
- rows = stream["rows"]
- affected_room_ids = set(row[1] for row in rows)
- min_stream_id = rows[0][0]
- max_stream_id = stream["position"]
- preserve_fn(pusher_pool.on_new_receipts)(
- min_stream_id, max_stream_id, affected_room_ids
- )
-
- while True:
- try:
- args = store.stream_positions()
- args["timeout"] = 30000
- result = yield http_client.get_json(replication_url, args=args)
- yield store.process_replication(result)
- poke_pushers(result)
- except:
- logger.exception("Error replicating from %r", replication_url)
- yield sleep(30)
+
+ def stop_pusher(self, user_id, app_id, pushkey):
+ key = "%s:%s" % (app_id, pushkey)
+ pushers_for_user = self.pusher_pool.pushers.get(user_id, {})
+ pusher = pushers_for_user.pop(key, None)
+ if pusher is None:
+ return
+ logger.info("Stopping pusher %r / %r", user_id, key)
+ pusher.on_stop()
+
+ def start_pusher(self, user_id, app_id, pushkey):
+ key = "%s:%s" % (app_id, pushkey)
+ logger.info("Starting pusher %r / %r", user_id, key)
+ return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
def start(config_options):
@@ -288,7 +257,6 @@ def start(config_options):
reactor.run()
def start():
- ps.replicate()
ps.get_pusherpool().start()
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 34e34e5580..a1ef5dfa77 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -16,7 +16,7 @@
import synapse
-from synapse.api.constants import EventTypes, PresenceState
+from synapse.api.constants import EventTypes
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
@@ -40,15 +40,14 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.storage.presence import PresenceStore, UserPresenceState
from synapse.storage.roommember import RoomMemberStore
-from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn, \
- PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string
@@ -107,11 +106,11 @@ UPDATE_SYNCING_USERS_MS = 10 * 1000
class SynchrotronPresence(object):
def __init__(self, hs):
+ self.hs = hs
self.is_mine_id = hs.is_mine_id
self.http_client = hs.get_simple_http_client()
self.store = hs.get_datastore()
self.user_to_num_current_syncs = {}
- self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
@@ -124,14 +123,8 @@ class SynchrotronPresence(object):
self.process_id = random_string(16)
logger.info("Presence process_id is %r", self.process_id)
- self._sending_sync = False
- self._need_to_send_sync = False
- self.clock.looping_call(
- self._send_syncing_users_regularly,
- UPDATE_SYNCING_USERS_MS,
- )
-
- reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+ def send_user_sync(self, user_id, is_syncing, last_sync_ms):
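+ # Tell the master over the TCP replication connection that this user
+ # has started or stopped syncing on this worker.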
+ self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
def set_state(self, user, state, ignore_status_msg=False):
# TODO Hows this supposed to work?
@@ -142,15 +135,15 @@ class SynchrotronPresence(object):
_get_interested_parties = PresenceHandler._get_interested_parties.__func__
current_state_for_users = PresenceHandler.current_state_for_users.__func__
- @defer.inlineCallbacks
def user_syncing(self, user_id, affect_presence):
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
- prev_states = yield self.current_state_for_users([user_id])
- if prev_states[user_id].state == PresenceState.OFFLINE:
- # TODO: Don't block the sync request on this HTTP hit.
- yield self._send_syncing_users_now()
+
+ # If we went from no in-flight syncs to some, notify replication
+ if self.user_to_num_current_syncs[user_id] == 1:
+ now = self.clock.time_msec()
+ self.send_user_sync(user_id, True, now)
def _end():
# We check that the user_id is in user_to_num_current_syncs because
@@ -159,6 +152,11 @@ class SynchrotronPresence(object):
if affect_presence and user_id in self.user_to_num_current_syncs:
self.user_to_num_current_syncs[user_id] -= 1
+ # If we went from one in-flight sync to none, notify replication
+ if self.user_to_num_current_syncs[user_id] == 0:
+ now = self.clock.time_msec()
+ self.send_user_sync(user_id, False, now)
+
@contextlib.contextmanager
def _user_syncing():
try:
@@ -166,49 +164,7 @@ class SynchrotronPresence(object):
finally:
_end()
- defer.returnValue(_user_syncing())
-
- @defer.inlineCallbacks
- def _on_shutdown(self):
- # When the synchrotron is shutdown tell the master to clear the in
- # progress syncs for this process
- self.user_to_num_current_syncs.clear()
- yield self._send_syncing_users_now()
-
- def _send_syncing_users_regularly(self):
- # Only send an update if we aren't in the middle of sending one.
- if not self._sending_sync:
- preserve_fn(self._send_syncing_users_now)()
-
- @defer.inlineCallbacks
- def _send_syncing_users_now(self):
- if self._sending_sync:
- # We don't want to race with sending another update.
- # Instead we wait for that update to finish and send another
- # update afterwards.
- self._need_to_send_sync = True
- return
-
- # Flag that we are sending an update.
- self._sending_sync = True
-
- yield self.http_client.post_json_get_json(self.syncing_users_url, {
- "process_id": self.process_id,
- "syncing_users": [
- user_id for user_id, count in self.user_to_num_current_syncs.items()
- if count > 0
- ],
- })
-
- # Unset the flag as we are no longer sending an update.
- self._sending_sync = False
- if self._need_to_send_sync:
- # If something happened while we were sending the update then
- # we might need to send another update.
- # TODO: Check if the update that was sent matches the current state
- # as we only need to send an update if they are different.
- self._need_to_send_sync = False
- yield self._send_syncing_users_now()
+ return defer.succeed(_user_syncing())
@defer.inlineCallbacks
def notify_from_replication(self, states, stream_id):
@@ -223,26 +179,24 @@ class SynchrotronPresence(object):
)
@defer.inlineCallbacks
- def process_replication(self, result):
- stream = result.get("presence", {"rows": []})
- states = []
- for row in stream["rows"]:
- (
- position, user_id, state, last_active_ts,
- last_federation_update_ts, last_user_sync_ts, status_msg,
- currently_active
- ) = row
- state = UserPresenceState(
- user_id, state, last_active_ts,
- last_federation_update_ts, last_user_sync_ts, status_msg,
- currently_active
- )
- self.user_to_current_state[user_id] = state
- states.append(state)
+ def process_replication_rows(self, token, rows):
+ states = [UserPresenceState(
+ row.user_id, row.state, row.last_active_ts,
+ row.last_federation_update_ts, row.last_user_sync_ts, row.status_msg,
+ row.currently_active
+ ) for row in rows]
- if states and "position" in stream:
- stream_id = int(stream["position"])
- yield self.notify_from_replication(states, stream_id)
+ for state in states:
+ self.user_to_current_state[state.user_id] = state
+
+ stream_id = token
+ yield self.notify_from_replication(states, stream_id)
+
+ def get_currently_syncing_users(self):
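+ # Reports which users have at least one active /sync request, replacing
+ # the old periodic POST of syncing users to the master.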
+ return [
+ user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
+ if count > 0
+ ]
class SynchrotronTyping(object):
@@ -257,16 +211,13 @@ class SynchrotronTyping(object):
# value which we *must* use for the next replication request.
return {"typing": self._latest_room_serial}
- def process_replication(self, result):
- stream = result.get("typing")
- if stream:
- self._latest_room_serial = int(stream["position"])
+ def process_replication_rows(self, token, rows):
+ self._latest_room_serial = token
- for row in stream["rows"]:
- position, room_id, typing_json = row
- typing = json.loads(typing_json)
- self._room_serials[room_id] = position
- self._room_typing[room_id] = typing
+ for row in rows:
+ typing = json.loads(row.user_ids)
+ self._room_serials[row.room_id] = token
+ self._room_typing[row.room_id] = typing
class SynchrotronApplicationService(object):
@@ -351,124 +302,89 @@ class SynchrotronServer(HomeServer):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
- @defer.inlineCallbacks
- def replicate(self):
- http_client = self.get_simple_http_client()
- store = self.get_datastore()
- replication_url = self.config.worker_replication_url
- notifier = self.get_notifier()
- presence_handler = self.get_presence_handler()
- typing_handler = self.get_typing_handler()
-
- def notify_from_stream(
- result, stream_name, stream_key, room=None, user=None
- ):
- stream = result.get(stream_name)
- if stream:
- position_index = stream["field_names"].index("position")
- if room:
- room_index = stream["field_names"].index(room)
- if user:
- user_index = stream["field_names"].index(user)
-
- users = ()
- rooms = ()
- for row in stream["rows"]:
- position = row[position_index]
-
- if user:
- users = (row[user_index],)
-
- if room:
- rooms = (row[room_index],)
-
- notifier.on_new_event(
- stream_key, position, users=users, rooms=rooms
- )
+ self.get_tcp_replication().start_replication(self)
- @defer.inlineCallbacks
- def notify_device_list_update(result):
- stream = result.get("device_lists")
- if not stream:
- return
+ def build_tcp_replication(self):
+ return SyncReplicationHandler(self)
- position_index = stream["field_names"].index("position")
- user_index = stream["field_names"].index("user_id")
+ def build_presence_handler(self):
+ return SynchrotronPresence(self)
- for row in stream["rows"]:
- position = row[position_index]
- user_id = row[user_index]
+ def build_typing_handler(self):
+ return SynchrotronTyping(self)
- room_ids = yield store.get_rooms_for_user(user_id)
- notifier.on_new_event(
- "device_list_key", position, rooms=room_ids,
- )
+class SyncReplicationHandler(ReplicationClientHandler):
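+ """Applies replication rows to this worker's stores and handlers, then
+ wakes up any waiting sync streams via the notifier.
+ """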
+ def __init__(self, hs):
+ super(SyncReplicationHandler, self).__init__(hs.get_datastore())
- @defer.inlineCallbacks
- def notify(result):
- stream = result.get("events")
- if stream:
- max_position = stream["position"]
-
- event_map = yield store.get_events([row[1] for row in stream["rows"]])
-
- for row in stream["rows"]:
- position = row[0]
- event_id = row[1]
- event = event_map.get(event_id, None)
- if not event:
- continue
-
- extra_users = ()
- if event.type == EventTypes.Member:
- extra_users = (event.state_key,)
- notifier.on_new_room_event(
- event, position, max_position, extra_users
- )
+ self.store = hs.get_datastore()
+ self.typing_handler = hs.get_typing_handler()
+ self.presence_handler = hs.get_presence_handler()
+ self.notifier = hs.get_notifier()
- notify_from_stream(
- result, "push_rules", "push_rules_key", user="user_id"
- )
- notify_from_stream(
- result, "user_account_data", "account_data_key", user="user_id"
- )
- notify_from_stream(
- result, "room_account_data", "account_data_key", user="user_id"
+ self.presence_handler.sync_callback = self.send_user_sync
+
+ def on_rdata(self, stream_name, token, rows):
+ super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
+
+ preserve_fn(self.process_and_notify)(stream_name, token, rows)
+
+ def get_streams_to_replicate(self):
+ args = super(SyncReplicationHandler, self).get_streams_to_replicate()
+ args.update(self.typing_handler.stream_positions())
+ return args
+
+ def get_currently_syncing_users(self):
+ return self.presence_handler.get_currently_syncing_users()
+
+ @defer.inlineCallbacks
+ def process_and_notify(self, stream_name, token, rows):
+ if stream_name == "events":
+ # We shouldn't get multiple rows per token for the events stream, so
+ # we don't need to optimise this for multiple rows.
+ for row in rows:
+ event = yield self.store.get_event(row.event_id)
+ extra_users = ()
+ if event.type == EventTypes.Member:
+ extra_users = (event.state_key,)
+ max_token = self.store.get_room_max_stream_ordering()
+ self.notifier.on_new_room_event(
+ event, token, max_token, extra_users
+ )
+ elif stream_name == "push_rules":
+ self.notifier.on_new_event(
+ "push_rules_key", token, users=[row.user_id for row in rows],
)
- notify_from_stream(
- result, "tag_account_data", "account_data_key", user="user_id"
+ elif stream_name in ("account_data", "tag_account_data",):
+ self.notifier.on_new_event(
+ "account_data_key", token, users=[row.user_id for row in rows],
)
- notify_from_stream(
- result, "receipts", "receipt_key", room="room_id"
+ elif stream_name == "receipts":
+ self.notifier.on_new_event(
+ "receipt_key", token, rooms=[row.room_id for row in rows],
)
- notify_from_stream(
- result, "typing", "typing_key", room="room_id"
+ elif stream_name == "typing":
+ self.typing_handler.process_replication_rows(token, rows)
+ self.notifier.on_new_event(
+ "typing_key", token, rooms=[row.room_id for row in rows],
)
- notify_from_stream(
- result, "to_device", "to_device_key", user="user_id"
+ elif stream_name == "to_device":
+ entities = [row.entity for row in rows if row.entity.startswith("@")]
+ if entities:
+ self.notifier.on_new_event(
+ "to_device_key", token, users=entities,
+ )
+ elif stream_name == "device_lists":
+ all_room_ids = set()
+ for row in rows:
+ room_ids = yield self.store.get_rooms_for_user(row.user_id)
+ all_room_ids.update(room_ids)
+ self.notifier.on_new_event(
+ "device_list_key", token, rooms=all_room_ids,
)
- yield notify_device_list_update(result)
-
- while True:
- try:
- args = store.stream_positions()
- args.update(typing_handler.stream_positions())
- args["timeout"] = 30000
- result = yield http_client.get_json(replication_url, args=args)
- yield store.process_replication(result)
- typing_handler.process_replication(result)
- yield presence_handler.process_replication(result)
- yield notify(result)
- except:
- logger.exception("Error replicating from %r", replication_url)
- yield sleep(5)
-
- def build_presence_handler(self):
- return SynchrotronPresence(self)
-
- def build_typing_handler(self):
- return SynchrotronTyping(self)
+ elif stream_name == "presence":
+ yield self.presence_handler.process_replication_rows(token, rows)
def start(config_options):
@@ -514,7 +430,6 @@ def start(config_options):
def start():
ss.get_datastore().start_profiling()
- ss.replicate()
ss.get_state_handler().start_caching()
reactor.callWhenRunning(start)
|