From 305d16d61200362f807fa5d97d415043f2a09315 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Wed, 29 Mar 2017 18:17:42 +0530 Subject: add user friendly report of assertion error in synctl.py Signed-off-by: Anant Prakash --- synapse/app/synctl.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index c045588866..81510bc5c1 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -175,7 +175,8 @@ def main(): worker_app = worker_config["worker_app"] worker_pidfile = worker_config["worker_pid_file"] worker_daemonize = worker_config["worker_daemonize"] - assert worker_daemonize # TODO print something more user friendly + assert worker_daemonize, "In config %r: expected '%s' to be True" % ( + worker_configfile, "worker_daemonize") worker_cache_factor = worker_config.get("synctl_cache_factor") workers.append(Worker( worker_app, worker_configfile, worker_pidfile, worker_cache_factor, -- cgit 1.5.1 From 3ba2859e0c1fb30a22b70c25de89eb4242d14d6e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Mar 2017 13:31:10 +0100 Subject: Add tcp replication listener type and hook it up --- synapse/app/homeserver.py | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 2cdd2d39ff..990eb477e5 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -56,6 +56,7 @@ from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX +from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit @@ -222,6 +223,16 @@ class SynapseHomeServer(HomeServer): ), interface=address ) + elif listener["type"] == "replication": + bind_addresses = listener["bind_addresses"] + for address in bind_addresses: + factory = ReplicationStreamProtocolFactory(self) + server_listener = reactor.listenTCP( + listener["port"], factory, interface=address + ) + reactor.addSystemEventTrigger( + "before", "shutdown", server_listener.stopListening, + ) else: logger.warn("Unrecognized listener type: %s", listener["type"]) -- cgit 1.5.1 From 36c28bc467e53cf7318964a4dc986428644f27ed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 16:33:44 +0100 Subject: Update all the workers and master to use TCP replication --- synapse/app/appservice.py | 46 +++--- synapse/app/client_reader.py | 22 +-- synapse/app/federation_reader.py | 22 +-- synapse/app/federation_sender.py | 110 ++++++++------ synapse/app/media_repository.py | 22 +-- synapse/app/pusher.py | 125 ++++++---------- synapse/app/synchrotron.py | 306 ++++++++++++++------------------------- synapse/config/workers.py | 3 + synapse/server.py | 4 + 9 files changed, 259 insertions(+), 401 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index a6f1e7594e..13b69d8dee 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -26,8 +26,8 @@ from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine -from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole @@ -36,7 +36,7 @@ from synapse.util.versionstring import get_version_string from synapse import events -from twisted.internet import reactor, defer +from twisted.internet import reactor from twisted.web.resource import Resource from daemonize import Daemonize @@ -120,30 +120,23 @@ class AppserviceServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url - appservice_handler = self.get_application_service_handler() - - @defer.inlineCallbacks - def replicate(results): - stream = results.get("events") - if stream: - max_stream_id = stream["position"] - yield appservice_handler.notify_interested_services(max_stream_id) - - while True: - try: - args = store.stream_positions() - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - replicate(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(30) + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return ASReplicationHandler(self) + + +class ASReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(ASReplicationHandler, self).__init__(hs.get_datastore()) + self.appservice_handler = hs.get_application_service_handler() + + def on_rdata(self, stream_name, token, rows): + super(ASReplicationHandler, self).on_rdata(stream_name, token, rows) + + if stream_name == "events": + max_stream_id = self.store.get_room_max_stream_ordering() + self.appservice_handler.notify_interested_services(max_stream_id) def start(config_options): @@ -199,7 +192,6 @@ def start(config_options): reactor.run() def start(): - ps.replicate() ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index e4ea3ab933..9b72c649ac 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -30,11 +30,11 @@ from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.room import PublicRoomListRestServlet from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine -from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole @@ -45,7 +45,7 @@ from synapse.crypto import context_factory from synapse import events -from twisted.internet import reactor, defer +from twisted.internet import reactor from twisted.web.resource import Resource from daemonize import Daemonize @@ -145,21 +145,10 @@ class ClientReaderServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url + self.get_tcp_replication().start_replication(self) - while True: - try: - args = store.stream_positions() - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(5) + def build_tcp_replication(self): + return ReplicationClientHandler(self.get_datastore()) def start(config_options): @@ -209,7 +198,6 @@ def start(config_options): def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() - ss.replicate() reactor.callWhenRunning(start) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index e52b0f240d..eb392e1c9d 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -27,9 +27,9 @@ from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.directory import DirectoryStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine -from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole @@ -42,7 +42,7 @@ from synapse.crypto import context_factory from synapse import events -from twisted.internet import reactor, defer +from twisted.internet import reactor from twisted.web.resource import Resource from daemonize import Daemonize @@ -134,21 +134,10 @@ class FederationReaderServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url + self.get_tcp_replication().start_replication(self) - while True: - try: - args = store.stream_positions() - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(5) + def build_tcp_replication(self): + return ReplicationClientHandler(self.get_datastore()) def start(config_options): @@ -198,7 +187,6 @@ def start(config_options): def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() - ss.replicate() reactor.callWhenRunning(start) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 76c4cc54d1..8994891aeb 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -31,9 +31,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState -from synapse.util.async import sleep +from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole @@ -59,7 +60,23 @@ class FederationSenderSlaveStore( SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, SlavedRegistrationStore, SlavedDeviceStore, ): - pass + def __init__(self, db_conn, hs): + super(FederationSenderSlaveStore, self).__init__(db_conn, hs) + self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) + + def _get_federation_out_pos(self, db_conn): + sql = ( + "SELECT stream_id FROM federation_stream_position" + " WHERE type = ?" + ) + sql = self.database_engine.convert_param_style(sql) + + txn = db_conn.cursor() + txn.execute(sql, ("federation",)) + rows = txn.fetchall() + txn.close() + + return rows[0][0] if rows else -1 class FederationSenderServer(HomeServer): @@ -127,26 +144,29 @@ class FederationSenderServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url - send_handler = FederationSenderHandler(self) - - send_handler.on_start() - - while True: - try: - args = store.stream_positions() - args.update((yield send_handler.stream_positions())) - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - yield send_handler.process_replication(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(30) + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return FederationSenderReplicationHandler(self) + + +class FederationSenderReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) + self.send_handler = FederationSenderHandler(hs) + + def on_rdata(self, stream_name, token, rows): + super(FederationSenderReplicationHandler, self).on_rdata( + stream_name, token, rows + ) + self.send_handler.process_replication_rows(stream_name, token, rows) + if stream_name == "federation": + self.send_federation_ack(token) + + def get_streams_to_replicate(self): + args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate() + args.update(self.send_handler.stream_positions()) + return args def start(config_options): @@ -205,7 +225,6 @@ def start(config_options): reactor.run() def start(): - ps.replicate() ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() @@ -233,6 +252,9 @@ class FederationSenderHandler(object): self.store = hs.get_datastore() self.federation_sender = hs.get_federation_sender() + self.federation_position = self.store.federation_out_pos_startup + self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") + self._room_serials = {} self._room_typing = {} @@ -243,25 +265,13 @@ class FederationSenderHandler(object): self.store.get_room_max_stream_ordering() ) - @defer.inlineCallbacks def stream_positions(self): - stream_id = yield self.store.get_federation_out_pos("federation") - defer.returnValue({ - "federation": stream_id, - - # Ack stuff we've "processed", this should only be called from - # one process. - "federation_ack": stream_id, - }) + return {"federation": self.federation_position} - @defer.inlineCallbacks - def process_replication(self, result): + def process_replication_rows(self, stream_name, token, rows): # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. - fed_stream = result.get("federation") - if fed_stream: - latest_id = int(fed_stream["position"]) - + if stream_name == "federation": # The federation stream containis a bunch of different types of # rows that need to be handled differently. We parse the rows, put # them into the appropriate collection and then send them off. @@ -272,8 +282,9 @@ class FederationSenderHandler(object): device_destinations = set() # Parse the rows in the stream - for row in fed_stream["rows"]: - position, typ, content_js = row + for row in rows: + typ = row.type + content_js = row.data content = json.loads(content_js) if typ == send_queue.PRESENCE_TYPE: @@ -325,16 +336,19 @@ class FederationSenderHandler(object): for destination in device_destinations: self.federation_sender.send_device_messages(destination) - # Record where we are in the stream. - yield self.store.update_federation_out_pos( - "federation", latest_id - ) + self.update_token(token) # We also need to poke the federation sender when new events happen - event_stream = result.get("events") - if event_stream: - latest_pos = event_stream["position"] - self.federation_sender.notify_new_events(latest_pos) + elif stream_name == "events": + self.federation_sender.notify_new_events(token) + + @defer.inlineCallbacks + def update_token(self, token): + self.federation_position = token + with (yield self._fed_position_linearizer.queue(None)): + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) if __name__ == '__main__': diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 1444e69a42..26c4416956 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -25,13 +25,13 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore -from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole @@ -45,7 +45,7 @@ from synapse.crypto import context_factory from synapse import events -from twisted.internet import reactor, defer +from twisted.internet import reactor from twisted.web.resource import Resource from daemonize import Daemonize @@ -142,21 +142,10 @@ class MediaRepositoryServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url + self.get_tcp_replication().start_replication(self) - while True: - try: - args = store.stream_positions() - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(5) + def build_tcp_replication(self): + return ReplicationClientHandler(self.get_datastore()) def start(config_options): @@ -206,7 +195,6 @@ def start(config_options): def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() - ss.replicate() reactor.callWhenRunning(start) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index ab682e52ec..cb76f058b0 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -27,9 +27,9 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine from synapse.storage import DataStore -from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, preserve_fn, \ PreserveLoggingContext @@ -39,7 +39,7 @@ from synapse.util.versionstring import get_version_string from synapse import events -from twisted.internet import reactor, defer +from twisted.internet import reactor from twisted.web.resource import Resource from daemonize import Daemonize @@ -89,7 +89,6 @@ class PusherSlaveStore( class PusherServer(HomeServer): - def get_db_conn(self, run_new_connection=True): # Any param beginning with cp_ is a parameter for adbapi, and should # not be passed to the database engine. @@ -109,16 +108,7 @@ class PusherServer(HomeServer): logger.info("Finished setting up.") def remove_pusher(self, app_id, push_key, user_id): - http_client = self.get_simple_http_client() - replication_url = self.config.worker_replication_url - url = replication_url + "/remove_pushers" - return http_client.post_json_get_json(url, { - "remove": [{ - "app_id": app_id, - "push_key": push_key, - "user_id": user_id, - }] - }) + self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id) def _listen_http(self, listener_config): port = listener_config["port"] @@ -166,73 +156,51 @@ class PusherServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url - pusher_pool = self.get_pusherpool() - - def stop_pusher(user_id, app_id, pushkey): - key = "%s:%s" % (app_id, pushkey) - pushers_for_user = pusher_pool.pushers.get(user_id, {}) - pusher = pushers_for_user.pop(key, None) - if pusher is None: - return - logger.info("Stopping pusher %r / %r", user_id, key) - pusher.on_stop() - - def start_pusher(user_id, app_id, pushkey): - key = "%s:%s" % (app_id, pushkey) - logger.info("Starting pusher %r / %r", user_id, key) - return pusher_pool._refresh_pusher(app_id, pushkey, user_id) - - @defer.inlineCallbacks - def poke_pushers(results): - pushers_rows = set( - map(tuple, results.get("pushers", {}).get("rows", [])) + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return PusherReplicationHandler(self) + + +class PusherReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(PusherReplicationHandler, self).__init__(hs.get_datastore()) + + self.pusher_pool = hs.get_pusherpool() + + def on_rdata(self, stream_name, token, rows): + super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) + self.poke_pushers(stream_name, token, rows) + + def poke_pushers(self, stream_name, token, rows): + if stream_name == "pushers": + for row in rows: + if row.deleted: + self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + self.start_pusher(row.user_id, row.app_id, row.pushkey) + elif stream_name == "events": + preserve_fn(self.pusher_pool.on_new_notifications)( + token, token, ) - deleted_pushers_rows = set( - map(tuple, results.get("deleted_pushers", {}).get("rows", [])) + elif stream_name == "receipts": + preserve_fn(self.pusher_pool.on_new_receipts)( + token, token, set(row.room_id for row in rows) ) - for row in sorted(pushers_rows | deleted_pushers_rows): - if row in deleted_pushers_rows: - user_id, app_id, pushkey = row[1:4] - stop_pusher(user_id, app_id, pushkey) - elif row in pushers_rows: - user_id = row[1] - app_id = row[5] - pushkey = row[8] - yield start_pusher(user_id, app_id, pushkey) - - stream = results.get("events") - if stream and stream["rows"]: - min_stream_id = stream["rows"][0][0] - max_stream_id = stream["position"] - preserve_fn(pusher_pool.on_new_notifications)( - min_stream_id, max_stream_id - ) - - stream = results.get("receipts") - if stream and stream["rows"]: - rows = stream["rows"] - affected_room_ids = set(row[1] for row in rows) - min_stream_id = rows[0][0] - max_stream_id = stream["position"] - preserve_fn(pusher_pool.on_new_receipts)( - min_stream_id, max_stream_id, affected_room_ids - ) - - while True: - try: - args = store.stream_positions() - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - poke_pushers(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(30) + + def stop_pusher(self, user_id, app_id, pushkey): + key = "%s:%s" % (app_id, pushkey) + pushers_for_user = self.pusher_pool.pushers.get(user_id, {}) + pusher = pushers_for_user.pop(key, None) + if pusher is None: + return + logger.info("Stopping pusher %r / %r", user_id, key) + pusher.on_stop() + + def start_pusher(self, user_id, app_id, pushkey): + key = "%s:%s" % (app_id, pushkey) + logger.info("Starting pusher %r / %r", user_id, key) + return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id) def start(config_options): @@ -288,7 +256,6 @@ def start(config_options): reactor.run() def start(): - ps.replicate() ps.get_pusherpool().start() ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 34e34e5580..c9c48f324f 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -16,7 +16,7 @@ import synapse -from synapse.api.constants import EventTypes, PresenceState +from synapse.api.constants import EventTypes from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging @@ -40,15 +40,14 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.storage.presence import PresenceStore, UserPresenceState from synapse.storage.roommember import RoomMemberStore -from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn, \ - PreserveLoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -111,7 +110,6 @@ class SynchrotronPresence(object): self.http_client = hs.get_simple_http_client() self.store = hs.get_datastore() self.user_to_num_current_syncs = {} - self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users" self.clock = hs.get_clock() self.notifier = hs.get_notifier() @@ -124,14 +122,7 @@ class SynchrotronPresence(object): self.process_id = random_string(16) logger.info("Presence process_id is %r", self.process_id) - self._sending_sync = False - self._need_to_send_sync = False - self.clock.looping_call( - self._send_syncing_users_regularly, - UPDATE_SYNCING_USERS_MS, - ) - - reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown) + self.sync_callback = None def set_state(self, user, state, ignore_status_msg=False): # TODO Hows this supposed to work? @@ -142,15 +133,15 @@ class SynchrotronPresence(object): _get_interested_parties = PresenceHandler._get_interested_parties.__func__ current_state_for_users = PresenceHandler.current_state_for_users.__func__ - @defer.inlineCallbacks def user_syncing(self, user_id, affect_presence): if affect_presence: curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 - prev_states = yield self.current_state_for_users([user_id]) - if prev_states[user_id].state == PresenceState.OFFLINE: - # TODO: Don't block the sync request on this HTTP hit. - yield self._send_syncing_users_now() + + if self.sync_callback: + if self.user_to_num_current_syncs[user_id] == 1: + now = self.clock.time_msec() + self.sync_callback(user_id, True, now) def _end(): # We check that the user_id is in user_to_num_current_syncs because @@ -159,6 +150,11 @@ class SynchrotronPresence(object): if affect_presence and user_id in self.user_to_num_current_syncs: self.user_to_num_current_syncs[user_id] -= 1 + if self.sync_callback: + if self.user_to_num_current_syncs[user_id] == 0: + now = self.clock.time_msec() + self.sync_callback(user_id, False, now) + @contextlib.contextmanager def _user_syncing(): try: @@ -166,49 +162,7 @@ class SynchrotronPresence(object): finally: _end() - defer.returnValue(_user_syncing()) - - @defer.inlineCallbacks - def _on_shutdown(self): - # When the synchrotron is shutdown tell the master to clear the in - # progress syncs for this process - self.user_to_num_current_syncs.clear() - yield self._send_syncing_users_now() - - def _send_syncing_users_regularly(self): - # Only send an update if we aren't in the middle of sending one. - if not self._sending_sync: - preserve_fn(self._send_syncing_users_now)() - - @defer.inlineCallbacks - def _send_syncing_users_now(self): - if self._sending_sync: - # We don't want to race with sending another update. - # Instead we wait for that update to finish and send another - # update afterwards. - self._need_to_send_sync = True - return - - # Flag that we are sending an update. - self._sending_sync = True - - yield self.http_client.post_json_get_json(self.syncing_users_url, { - "process_id": self.process_id, - "syncing_users": [ - user_id for user_id, count in self.user_to_num_current_syncs.items() - if count > 0 - ], - }) - - # Unset the flag as we are no longer sending an update. - self._sending_sync = False - if self._need_to_send_sync: - # If something happened while we were sending the update then - # we might need to send another update. - # TODO: Check if the update that was sent matches the current state - # as we only need to send an update if they are different. - self._need_to_send_sync = False - yield self._send_syncing_users_now() + return defer.succeed(_user_syncing()) @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): @@ -223,26 +177,24 @@ class SynchrotronPresence(object): ) @defer.inlineCallbacks - def process_replication(self, result): - stream = result.get("presence", {"rows": []}) - states = [] - for row in stream["rows"]: - ( - position, user_id, state, last_active_ts, - last_federation_update_ts, last_user_sync_ts, status_msg, - currently_active - ) = row - state = UserPresenceState( - user_id, state, last_active_ts, - last_federation_update_ts, last_user_sync_ts, status_msg, - currently_active - ) - self.user_to_current_state[user_id] = state - states.append(state) + def process_replication_rows(self, token, rows): + states = [UserPresenceState( + row.user_id, row.state, row.last_active_ts, + row.last_federation_update_ts, row.last_user_sync_ts, row.status_msg, + row.currently_active + ) for row in rows] - if states and "position" in stream: - stream_id = int(stream["position"]) - yield self.notify_from_replication(states, stream_id) + for state in states: + self.user_to_current_state[row.user_id] = state + + stream_id = token + yield self.notify_from_replication(states, stream_id) + + def get_currently_syncing_users(self): + return [ + user_id for user_id, count in self.user_to_num_current_syncs.iteritems() + if count > 0 + ] class SynchrotronTyping(object): @@ -257,16 +209,13 @@ class SynchrotronTyping(object): # value which we *must* use for the next replication request. return {"typing": self._latest_room_serial} - def process_replication(self, result): - stream = result.get("typing") - if stream: - self._latest_room_serial = int(stream["position"]) + def process_replication_rows(self, token, rows): + self._latest_room_serial = token - for row in stream["rows"]: - position, room_id, typing_json = row - typing = json.loads(typing_json) - self._room_serials[room_id] = position - self._room_typing[room_id] = typing + for row in rows: + typing = json.loads(row.user_ids) + self._room_serials[row.room_id] = token + self._room_typing[row.room_id] = typing class SynchrotronApplicationService(object): @@ -351,124 +300,90 @@ class SynchrotronServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url - notifier = self.get_notifier() - presence_handler = self.get_presence_handler() - typing_handler = self.get_typing_handler() - - def notify_from_stream( - result, stream_name, stream_key, room=None, user=None - ): - stream = result.get(stream_name) - if stream: - position_index = stream["field_names"].index("position") - if room: - room_index = stream["field_names"].index(room) - if user: - user_index = stream["field_names"].index(user) - - users = () - rooms = () - for row in stream["rows"]: - position = row[position_index] - - if user: - users = (row[user_index],) - - if room: - rooms = (row[room_index],) - - notifier.on_new_event( - stream_key, position, users=users, rooms=rooms - ) + self.get_tcp_replication().start_replication(self) - @defer.inlineCallbacks - def notify_device_list_update(result): - stream = result.get("device_lists") - if not stream: - return + def build_tcp_replication(self): + return SyncReplicationHandler(self) - position_index = stream["field_names"].index("position") - user_index = stream["field_names"].index("user_id") + def build_presence_handler(self): + return SynchrotronPresence(self) - for row in stream["rows"]: - position = row[position_index] - user_id = row[user_index] + def build_typing_handler(self): + return SynchrotronTyping(self) - room_ids = yield store.get_rooms_for_user(user_id) - notifier.on_new_event( - "device_list_key", position, rooms=room_ids, - ) +class SyncReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(SyncReplicationHandler, self).__init__(hs.get_datastore()) - @defer.inlineCallbacks - def notify(result): - stream = result.get("events") - if stream: - max_position = stream["position"] - - event_map = yield store.get_events([row[1] for row in stream["rows"]]) - - for row in stream["rows"]: - position = row[0] - event_id = row[1] - event = event_map.get(event_id, None) - if not event: - continue - - extra_users = () - if event.type == EventTypes.Member: - extra_users = (event.state_key,) - notifier.on_new_room_event( - event, position, max_position, extra_users - ) + self.store = hs.get_datastore() + self.typing_handler = hs.get_typing_handler() + self.presence_handler = hs.get_presence_handler() + self.notifier = hs.get_notifier() - notify_from_stream( - result, "push_rules", "push_rules_key", user="user_id" - ) - notify_from_stream( - result, "user_account_data", "account_data_key", user="user_id" - ) - notify_from_stream( - result, "room_account_data", "account_data_key", user="user_id" + self.presence_handler.sync_callback = self.send_user_sync + + def on_rdata(self, stream_name, token, rows): + super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) + + if stream_name == "typing": + self.typing_handler.process_replication_rows(token, rows) + elif stream_name == "presence": + self.presence_handler.process_replication_rows(token, rows) + self.notify(stream_name, token, rows) + + def get_streams_to_replicate(self): + args = super(SyncReplicationHandler, self).get_streams_to_replicate() + args.update(self.typing_handler.stream_positions()) + return args + + def get_currently_syncing_users(self): + return self.presence_handler.get_currently_syncing_users() + + @defer.inlineCallbacks + def notify(self, stream_name, token, rows): + if stream_name == "events": + # We shouldn't get multiple rows per token for events stream, so + # we don't need to optimise this for multiple rows. + for row in rows: + event = yield self.store.get_event(row.event_id) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + max_token = self.store.get_room_max_stream_ordering() + self.notifier.on_new_room_event( + event, token, max_token, extra_users + ) + elif stream_name == "push_rules": + self.notifier.on_new_event( + "push_rules_key", token, users=[row.user_id for row in rows], ) - notify_from_stream( - result, "tag_account_data", "account_data_key", user="user_id" + elif stream_name in ("account_data", "tag_account_data",): + self.notifier.on_new_event( + "account_data_key", token, users=[row.user_id for row in rows], ) - notify_from_stream( - result, "receipts", "receipt_key", room="room_id" + elif stream_name == "receipts": + self.notifier.on_new_event( + "receipt_key", token, rooms=[row.room_id for row in rows], ) - notify_from_stream( - result, "typing", "typing_key", room="room_id" + elif stream_name == "typing": + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows], ) - notify_from_stream( - result, "to_device", "to_device_key", user="user_id" + elif stream_name == "to_device": + entities = [row.entity for row in rows if row.entity.startswith("@")] + if entities: + self.notifier.on_new_event( + "to_device_key", token, users=entities, + ) + elif stream_name == "device_lists": + all_room_ids = set() + for row in rows: + room_ids = yield self.store.get_rooms_for_user(row.user_id) + all_room_ids.update(room_ids) + self.notifier.on_new_event( + "device_list_key", token, rooms=all_room_ids, ) - yield notify_device_list_update(result) - - while True: - try: - args = store.stream_positions() - args.update(typing_handler.stream_positions()) - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - typing_handler.process_replication(result) - yield presence_handler.process_replication(result) - yield notify(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(5) - - def build_presence_handler(self): - return SynchrotronPresence(self) - - def build_typing_handler(self): - return SynchrotronTyping(self) def start(config_options): @@ -514,7 +429,6 @@ def start(config_options): def start(): ss.get_datastore().start_profiling() - ss.replicate() ss.get_state_handler().start_caching() reactor.callWhenRunning(start) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index b165c67ee7..ad06ba9691 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -29,6 +29,9 @@ class WorkerConfig(Config): self.worker_log_file = config.get("worker_log_file") self.worker_log_config = config.get("worker_log_config") self.worker_replication_url = config.get("worker_replication_url") + self.worker_replication_host = config.get("worker_replication_host", None) + self.worker_replication_port = config.get("worker_replication_port", None) + self.worker_name = config.get("worker_name", self.worker_app) if self.worker_listeners: for listener in self.worker_listeners: diff --git a/synapse/server.py b/synapse/server.py index c577032041..6310152560 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -132,6 +132,7 @@ class HomeServer(object): 'federation_sender', 'receipts_handler', 'macaroon_generator', + 'tcp_replication', ] def __init__(self, hostname, **kwargs): @@ -290,6 +291,9 @@ class HomeServer(object): def build_receipts_handler(self): return ReceiptsHandler(self) + def build_tcp_replication(self): + raise NotImplementedError() + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) -- cgit 1.5.1 From 6ce6bbedcb8721fc99200ecace94eeedb51da6f2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Apr 2017 10:29:29 +0100 Subject: Move where we ack federation --- synapse/app/federation_sender.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 8994891aeb..705fe5e3df 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -153,15 +153,13 @@ class FederationSenderServer(HomeServer): class FederationSenderReplicationHandler(ReplicationClientHandler): def __init__(self, hs): super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) - self.send_handler = FederationSenderHandler(hs) + self.send_handler = FederationSenderHandler(hs, self) def on_rdata(self, stream_name, token, rows): super(FederationSenderReplicationHandler, self).on_rdata( stream_name, token, rows ) self.send_handler.process_replication_rows(stream_name, token, rows) - if stream_name == "federation": - self.send_federation_ack(token) def get_streams_to_replicate(self): args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate() @@ -248,13 +246,16 @@ class FederationSenderHandler(object): """Processes the replication stream and forwards the appropriate entries to the federation sender. """ - def __init__(self, hs): + def __init__(self, hs, replication_client): self.store = hs.get_datastore() self.federation_sender = hs.get_federation_sender() + self.replication_client = replication_client self.federation_position = self.store.federation_out_pos_startup self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") + self._last_ack = self.federation_position + self._room_serials = {} self._room_typing = {} @@ -345,10 +346,18 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def update_token(self, token): self.federation_position = token + + # We linearize here to ensure we don't have races updating the token with (yield self._fed_position_linearizer.queue(None)): - yield self.store.update_federation_out_pos( - "federation", self.federation_position - ) + if self._last_ack < self.federation_position: + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) + + # We ACK this token over replication so that the master can drop + # its in memory queues + self.replication_client.send_federation_ack(self.federation_position) + self._last_ack = self.federation_position if __name__ == '__main__': -- cgit 1.5.1 From 3376f160124ff2f8915341c90a24ba83495fb8d9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Apr 2017 11:09:26 +0100 Subject: Shuffle and comment synchrotron presence --- synapse/app/synchrotron.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index c9c48f324f..7341970a81 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -106,6 +106,7 @@ UPDATE_SYNCING_USERS_MS = 10 * 1000 class SynchrotronPresence(object): def __init__(self, hs): + self.hs = hs self.is_mine_id = hs.is_mine_id self.http_client = hs.get_simple_http_client() self.store = hs.get_datastore() @@ -122,7 +123,8 @@ class SynchrotronPresence(object): self.process_id = random_string(16) logger.info("Presence process_id is %r", self.process_id) - self.sync_callback = None + def send_user_sync(self, user_id, is_syncing, last_sync_ms): + self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms) def set_state(self, user, state, ignore_status_msg=False): # TODO Hows this supposed to work? @@ -138,10 +140,10 @@ class SynchrotronPresence(object): curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 - if self.sync_callback: - if self.user_to_num_current_syncs[user_id] == 1: - now = self.clock.time_msec() - self.sync_callback(user_id, True, now) + # If we went from no in flight sync to some, notify replication + if self.user_to_num_current_syncs[user_id] == 1: + now = self.clock.time_msec() + self.send_user_sync(user_id, True, now) def _end(): # We check that the user_id is in user_to_num_current_syncs because @@ -150,10 +152,10 @@ class SynchrotronPresence(object): if affect_presence and user_id in self.user_to_num_current_syncs: self.user_to_num_current_syncs[user_id] -= 1 - if self.sync_callback: - if self.user_to_num_current_syncs[user_id] == 0: - now = self.clock.time_msec() - self.sync_callback(user_id, False, now) + # If we went from one in flight sync to non, notify replication + if self.user_to_num_current_syncs[user_id] == 0: + now = self.clock.time_msec() + self.send_user_sync(user_id, False, now) @contextlib.contextmanager def _user_syncing(): -- cgit 1.5.1 From ac66e11f2b1235f801195ad0065008cdca2f1b0b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Apr 2017 15:22:54 +0100 Subject: Add the appropriate amount of preserve_fn --- synapse/app/appservice.py | 6 ++++-- synapse/app/federation_sender.py | 4 ++-- synapse/app/pusher.py | 13 +++++++------ synapse/app/synchrotron.py | 13 ++++++------- 4 files changed, 19 insertions(+), 17 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 13b69d8dee..9a476efa63 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -136,7 +136,9 @@ class ASReplicationHandler(ReplicationClientHandler): if stream_name == "events": max_stream_id = self.store.get_room_max_stream_ordering() - self.appservice_handler.notify_interested_services(max_stream_id) + preserve_fn( + self.appservice_handler.notify_interested_services + )(max_stream_id) def start(config_options): diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 705fe5e3df..fa7b19e53a 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -36,7 +36,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -337,7 +337,7 @@ class FederationSenderHandler(object): for destination in device_destinations: self.federation_sender.send_device_messages(destination) - self.update_token(token) + preserve_fn(self.update_token)(token) # We also need to poke the federation sender when new events happen elif stream_name == "events": diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index cb76f058b0..f9114acfcb 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -39,7 +39,7 @@ from synapse.util.versionstring import get_version_string from synapse import events -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import Resource from daemonize import Daemonize @@ -170,21 +170,22 @@ class PusherReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - self.poke_pushers(stream_name, token, rows) + preserve_fn(self.poke_pushers)(stream_name, token, rows) + @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): if stream_name == "pushers": for row in rows: if row.deleted: - self.stop_pusher(row.user_id, row.app_id, row.pushkey) + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) else: - self.start_pusher(row.user_id, row.app_id, row.pushkey) + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) elif stream_name == "events": - preserve_fn(self.pusher_pool.on_new_notifications)( + yield self.pusher_pool.on_new_notifications( token, token, ) elif stream_name == "receipts": - preserve_fn(self.pusher_pool.on_new_receipts)( + yield self.pusher_pool.on_new_receipts( token, token, set(row.room_id for row in rows) ) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 7341970a81..a1ef5dfa77 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -47,7 +47,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import PresenceStore, UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -328,11 +328,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) - if stream_name == "typing": - self.typing_handler.process_replication_rows(token, rows) - elif stream_name == "presence": - self.presence_handler.process_replication_rows(token, rows) - self.notify(stream_name, token, rows) + preserve_fn(self.process_and_notify)(stream_name, token, rows) def get_streams_to_replicate(self): args = super(SyncReplicationHandler, self).get_streams_to_replicate() @@ -343,7 +339,7 @@ class SyncReplicationHandler(ReplicationClientHandler): return self.presence_handler.get_currently_syncing_users() @defer.inlineCallbacks - def notify(self, stream_name, token, rows): + def process_and_notify(self, stream_name, token, rows): if stream_name == "events": # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. @@ -369,6 +365,7 @@ class SyncReplicationHandler(ReplicationClientHandler): "receipt_key", token, rooms=[row.room_id for row in rows], ) elif stream_name == "typing": + self.typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( "typing_key", token, rooms=[row.room_id for row in rows], ) @@ -386,6 +383,8 @@ class SyncReplicationHandler(ReplicationClientHandler): self.notifier.on_new_event( "device_list_key", token, rooms=all_room_ids, ) + elif stream_name == "presence": + yield self.presence_handler.process_replication_rows(token, rows) def start(config_options): -- cgit 1.5.1 From d1d5362267adb316e65f046787207aa7d862ec3b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Apr 2017 16:41:03 +0100 Subject: Add comment --- synapse/app/federation_sender.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index fa7b19e53a..cbddc80ca9 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -62,6 +62,11 @@ class FederationSenderSlaveStore( ): def __init__(self, db_conn, hs): super(FederationSenderSlaveStore, self).__init__(db_conn, hs) + + # We pull out the current federation stream position now so that we + # always have a known value for the federation position in memory so + # that we don't have to bounce via a deferred once when we start the + # replication streams. self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) def _get_federation_out_pos(self, db_conn): -- cgit 1.5.1 From f10ce8944b8ef4c33694654c397bfcda44c6124a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 11:10:28 +0100 Subject: Don't double json encode federation replication data --- synapse/app/federation_sender.py | 4 +--- synapse/federation/send_queue.py | 19 +++++++++---------- 2 files changed, 10 insertions(+), 13 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index cbddc80ca9..145c01f3a3 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -51,7 +51,6 @@ from daemonize import Daemonize import sys import logging import gc -import ujson as json logger = logging.getLogger("synapse.app.appservice") @@ -290,8 +289,7 @@ class FederationSenderHandler(object): # Parse the rows in the stream for row in rows: typ = row.type - content_js = row.data - content = json.loads(content_js) + content = row.data if typ == send_queue.PRESENCE_TYPE: destination = content["destination"] diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 4bde66fbf8..78c852ed69 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -35,7 +35,6 @@ from synapse.util.metrics import Measure import synapse.metrics from blist import sorteddict -import ujson metrics = synapse.metrics.get_metrics_for(__name__) @@ -258,10 +257,10 @@ class FederationRemoteSendQueue(object): ) for (key, (dest, user_id)) in dest_user_ids: - rows.append((key, PRESENCE_TYPE, ujson.dumps({ + rows.append((key, PRESENCE_TYPE, { "destination": dest, "state": self.presence_map[user_id].as_dict(), - }))) + })) # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() @@ -271,10 +270,10 @@ class FederationRemoteSendQueue(object): for (pos, (destination, edu_key)) in keyed_edus: rows.append( - (pos, KEYED_EDU_TYPE, ujson.dumps({ + (pos, KEYED_EDU_TYPE, { "key": edu_key, "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), - })) + }) ) # Fetch changed edus @@ -284,7 +283,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) + rows.append((pos, EDU_TYPE, edu.get_internal_dict())) # Fetch changed failures keys = self.failures.keys() @@ -293,10 +292,10 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: - rows.append((pos, FAILURE_TYPE, ujson.dumps({ + rows.append((pos, FAILURE_TYPE, { "destination": destination, "failure": failure, - }))) + })) # Fetch changed device messages keys = self.device_messages.keys() @@ -305,9 +304,9 @@ class FederationRemoteSendQueue(object): device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: - rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ + rows.append((pos, DEVICE_MESSAGE_TYPE, { "destination": destination, - }))) + })) # Sort rows based on pos rows.sort() -- cgit 1.5.1 From 96b9b6c1275087acf57faf1306bcc392dbd9f842 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 11:34:20 +0100 Subject: Don't double json encode typing replication data --- synapse/app/synchrotron.py | 4 +--- synapse/handlers/typing.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index a1ef5dfa77..1fac021ea9 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -62,7 +62,6 @@ import sys import logging import contextlib import gc -import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -215,9 +214,8 @@ class SynchrotronTyping(object): self._latest_room_serial = token for row in rows: - typing = json.loads(row.user_ids) self._room_serials[row.room_id] = token - self._room_typing[row.room_id] = typing + self._room_typing[row.room_id] = row.user_ids class SynchrotronApplicationService(object): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index d6809862e0..3b7818af5c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -24,7 +24,6 @@ from synapse.types import UserID, get_domain_from_id import logging from collections import namedtuple -import ujson as json logger = logging.getLogger(__name__) @@ -288,8 +287,7 @@ class TypingHandler(object): for room_id, serial in self._room_serials.items(): if last_id < serial and serial <= current_id: typing = self._room_typing[room_id] - typing_bytes = json.dumps(list(typing), ensure_ascii=False) - rows.append((serial, room_id, typing_bytes)) + rows.append((serial, room_id, list(typing))) rows.sort() return rows -- cgit 1.5.1 From b5cb6347a4d9b9fb5c21a2a6e3a2852c14b837fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 13:25:40 +0100 Subject: Don't immediately notify the master about users whose syncs have gone away --- synapse/app/synchrotron.py | 40 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index a1ef5dfa77..a5dfbef8da 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -120,12 +120,46 @@ class SynchrotronPresence(object): for state in active_presence } + # user_id -> last_sync_ms. Lists the users that have stopped syncing + # but we haven't notified the master of that yet + self.users_going_offline = {} + + self._send_stop_syncing_loop = self.clock.looping_call( + self.send_stop_syncing, 10 * 1000 + ) + self.process_id = random_string(16) logger.info("Presence process_id is %r", self.process_id) def send_user_sync(self, user_id, is_syncing, last_sync_ms): self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms) + def mark_as_coming_online(self, user_id): + """A user has started syncing. Send a UserSync to the master, unless they + had recently stopped syncing. + """ + going_offline = self.users_going_offline.pop(user_id, None) + if not going_offline: + self.send_user_sync(user_id, True, self.clock.time_msec()) + + def mark_as_going_offline(self, user_id): + """A user has stopped syncing. We wait before notifying the master as + its likely they'll come back soon. This allows us to avoid sending + a stopped syncing immediately followed by a started syncing notification + to the master + """ + self.users_going_offline[user_id] = self.clock.time_msec() + + def send_stop_syncing(self): + """Check if there are any users who have stopped syncing a while ago + and haven't come back yet. If there are poke the master about them. + """ + now = self.clock.time_msec() + for user_id, last_sync_ms in self.users_going_offline.items(): + if now - last_sync_ms > 10 * 1000: + self.users_going_offline.pop(user_id, None) + self.send_user_sync(user_id, False, last_sync_ms) + def set_state(self, user, state, ignore_status_msg=False): # TODO Hows this supposed to work? pass @@ -142,8 +176,7 @@ class SynchrotronPresence(object): # If we went from no in flight sync to some, notify replication if self.user_to_num_current_syncs[user_id] == 1: - now = self.clock.time_msec() - self.send_user_sync(user_id, True, now) + self.mark_as_coming_online(user_id) def _end(): # We check that the user_id is in user_to_num_current_syncs because @@ -154,8 +187,7 @@ class SynchrotronPresence(object): # If we went from one in flight sync to non, notify replication if self.user_to_num_current_syncs[user_id] == 0: - now = self.clock.time_msec() - self.send_user_sync(user_id, False, now) + self.mark_as_going_offline(user_id) @contextlib.contextmanager def _user_syncing(): -- cgit 1.5.1 From dbf87282d39b9ee503f07d68c99ccc1624fc6e17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Apr 2017 13:11:21 +0100 Subject: Docs --- synapse/app/synchrotron.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index a5dfbef8da..e0716a6226 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -137,9 +137,13 @@ class SynchrotronPresence(object): def mark_as_coming_online(self, user_id): """A user has started syncing. Send a UserSync to the master, unless they had recently stopped syncing. + + Args: + user_id (str) """ going_offline = self.users_going_offline.pop(user_id, None) if not going_offline: + # Safe to skip if we haven't yet told the master they were offline self.send_user_sync(user_id, True, self.clock.time_msec()) def mark_as_going_offline(self, user_id): @@ -147,6 +151,9 @@ class SynchrotronPresence(object): its likely they'll come back soon. This allows us to avoid sending a stopped syncing immediately followed by a started syncing notification to the master + + Args: + user_id (str) """ self.users_going_offline[user_id] = self.clock.time_msec() -- cgit 1.5.1 From 391712a4f9c43359c9f6c36a6ed51800b370ffc0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Apr 2017 13:35:00 +0100 Subject: Comment --- synapse/app/synchrotron.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e0716a6226..67d9210f2a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -143,7 +143,7 @@ class SynchrotronPresence(object): """ going_offline = self.users_going_offline.pop(user_id, None) if not going_offline: - # Safe to skip if we haven't yet told the master they were offline + # Safe to skip because we haven't yet told the master they were offline self.send_user_sync(user_id, True, self.clock.time_msec()) def mark_as_going_offline(self, user_id): -- cgit 1.5.1 From 8a1137ceab54b5b2643f1be3995b2977f5cd2c63 Mon Sep 17 00:00:00 2001 From: Kim Brose Date: Thu, 6 Apr 2017 17:10:20 +0200 Subject: fix typo in synctl help --- synapse/app/synctl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index c045588866..e84045c7d1 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -98,7 +98,7 @@ def main(): "configfile", nargs="?", default="homeserver.yaml", - help="the homeserver config file, defaults to homserver.yaml", + help="the homeserver config file, defaults to homeserver.yaml", ) parser.add_argument( "-w", "--worker", -- cgit 1.5.1 From 449d1297cae8bb05140b257f6e08e4cc036bf481 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Apr 2017 11:48:27 +0100 Subject: Fix up federation SendQueue and document types --- synapse/app/federation_sender.py | 66 +---------- synapse/federation/send_queue.py | 246 ++++++++++++++++++++++++++++++++++----- 2 files changed, 221 insertions(+), 91 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 145c01f3a3..477e16e0fa 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -23,7 +23,6 @@ from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.http.site import SynapseSite from synapse.federation import send_queue -from synapse.federation.units import Edu from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.events import SlavedEventStore @@ -33,7 +32,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine -from synapse.storage.presence import UserPresenceState from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -277,69 +275,7 @@ class FederationSenderHandler(object): # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. if stream_name == "federation": - # The federation stream containis a bunch of different types of - # rows that need to be handled differently. We parse the rows, put - # them into the appropriate collection and then send them off. - presence_to_send = {} - keyed_edus = {} - edus = {} - failures = {} - device_destinations = set() - - # Parse the rows in the stream - for row in rows: - typ = row.type - content = row.data - - if typ == send_queue.PRESENCE_TYPE: - destination = content["destination"] - state = UserPresenceState.from_dict(content["state"]) - - presence_to_send.setdefault(destination, []).append(state) - elif typ == send_queue.KEYED_EDU_TYPE: - key = content["key"] - edu = Edu(**content["edu"]) - - keyed_edus.setdefault( - edu.destination, {} - )[(edu.destination, tuple(key))] = edu - elif typ == send_queue.EDU_TYPE: - edu = Edu(**content) - - edus.setdefault(edu.destination, []).append(edu) - elif typ == send_queue.FAILURE_TYPE: - destination = content["destination"] - failure = content["failure"] - - failures.setdefault(destination, []).append(failure) - elif typ == send_queue.DEVICE_MESSAGE_TYPE: - device_destinations.add(content["destination"]) - else: - raise Exception("Unrecognised federation type: %r", typ) - - # We've finished collecting, send everything off - for destination, states in presence_to_send.items(): - self.federation_sender.send_presence(destination, states) - - for destination, edu_map in keyed_edus.items(): - for key, edu in edu_map.items(): - self.federation_sender.send_edu( - edu.destination, edu.edu_type, edu.content, key=key, - ) - - for destination, edu_list in edus.items(): - for edu in edu_list: - self.federation_sender.send_edu( - edu.destination, edu.edu_type, edu.content, key=None, - ) - - for destination, failure_list in failures.items(): - for failure in failure_list: - self.federation_sender.send_failure(destination, failure) - - for destination in device_destinations: - self.federation_sender.send_device_messages(destination) - + send_queue.process_rows_for_federation(self.federation_sender, rows) preserve_fn(self.update_token)(token) # We also need to poke the federation sender when new events happen diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 78c852ed69..8a6392c697 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -31,22 +31,17 @@ Events are replicated via a separate events stream. from .units import Edu +from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure import synapse.metrics from blist import sorteddict +from collections import namedtuple metrics = synapse.metrics.get_metrics_for(__name__) -PRESENCE_TYPE = "p" -KEYED_EDU_TYPE = "k" -EDU_TYPE = "e" -FAILURE_TYPE = "f" -DEVICE_MESSAGE_TYPE = "d" - - class FederationRemoteSendQueue(object): """A drop in replacement for TransactionQueue""" @@ -257,10 +252,10 @@ class FederationRemoteSendQueue(object): ) for (key, (dest, user_id)) in dest_user_ids: - rows.append((key, PRESENCE_TYPE, { - "destination": dest, - "state": self.presence_map[user_id].as_dict(), - })) + rows.append((key, PresenceRow( + destination=dest, + state=self.presence_map[user_id], + ))) # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() @@ -269,12 +264,10 @@ class FederationRemoteSendQueue(object): keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) for (pos, (destination, edu_key)) in keyed_edus: - rows.append( - (pos, KEYED_EDU_TYPE, { - "key": edu_key, - "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), - }) - ) + rows.append((pos, KeyedEduRow( + key=edu_key, + edu=self.keyed_edu[(destination, edu_key)], + ))) # Fetch changed edus keys = self.edus.keys() @@ -283,7 +276,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, edu.get_internal_dict())) + rows.append((pos, EduRow(edu))) # Fetch changed failures keys = self.failures.keys() @@ -292,10 +285,10 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: - rows.append((pos, FAILURE_TYPE, { - "destination": destination, - "failure": failure, - })) + rows.append((pos, FailureRow( + destination=destination, + failure=failure, + ))) # Fetch changed device messages keys = self.device_messages.keys() @@ -304,11 +297,212 @@ class FederationRemoteSendQueue(object): device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: - rows.append((pos, DEVICE_MESSAGE_TYPE, { - "destination": destination, - })) + rows.append((pos, DeviceRow( + destination=destination, + ))) # Sort rows based on pos rows.sort() - return rows + return [(pos, row.TypeId, row.to_data()) for pos, row in rows] + + +class BaseFederationRow(object): + TypeId = None + + @staticmethod + def from_data(data): + """Parse the data from the federation stream into a row. + """ + raise NotImplementedError() + + def to_data(self): + """Serialize this row to be sent over the federation stream + """ + raise NotImplementedError() + + def add_to_buffer(self, buff): + """Add this row to the appropriate field in the buffer ready for this + to be sent over federation. + + We use a buffer so that we can batch up events that have come in at + the same time and send them all at once. + + Args: + buff (BufferedToSend) + """ + raise NotImplementedError() + + +class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( + "destination", # str + "state", # UserPresenceState +))): + TypeId = "p" + + @staticmethod + def from_data(data): + return PresenceRow( + destination=data["destination"], + state=UserPresenceState.from_dict(data["state"]) + ) + + def to_data(self): + return { + "destination": self.destination, + "state": self.state.as_dict() + } + + def add_to_buffer(self, buff): + buff.presence.setdefault(self.destination, []).append(self.state) + + +class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( + "key", # tuple(str) - the edu key passed to send_edu + "edu", # Edu +))): + TypeId = "k" + + @staticmethod + def from_data(data): + return KeyedEduRow( + key=tuple(data["key"]), + edu=Edu(**data["edu"]), + ) + + def to_data(self): + return { + "key": self.key, + "edu": self.edu.get_internal_dict(), + } + + def add_to_buffer(self, buff): + buff.keyed_edus.setdefault( + self.edu.destination, {} + )[self.key] = self.edu + + +class EduRow(BaseFederationRow, namedtuple("EduRow", ( + "edu", # Edu +))): + TypeId = "e" + + @staticmethod + def from_data(data): + return EduRow(Edu(**data)) + + def to_data(self): + return self.edu.get_internal_dict() + + def add_to_buffer(self, buff): + buff.edus.setdefault(self.edu.destination, []).append(self.edu) + + +class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( + "destination", # str + "failure", +))): + TypeId = "f" + + @staticmethod + def from_data(data): + return FailureRow( + destination=data["destination"], + failure=data["failure"], + ) + + def to_data(self): + return { + "destination": self.destination, + "failure": self.failure, + } + + def add_to_buffer(self, buff): + buff.failures.setdefault(self.destination, []).append(self.failure) + + +class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( + "destination", # str +))): + TypeId = "d" + + @staticmethod + def from_data(data): + return DeviceRow(destination=data) + + def to_data(self): + return self.destination + + def add_to_buffer(self, buff): + buff.device_destinations.add(self.destination) + + +TypeToRow = { + Row.TypeId: Row + for Row in ( + PresenceRow, + KeyedEduRow, + EduRow, + FailureRow, + DeviceRow, + ) +} + + +BufferedToSend = namedtuple("BufferedToSend", ( + "presence", # dict of destination -> [UserPresenceState] + "keyed_edus", # dict of destination -> { key -> Edu } + "edus", # dict of destination -> [Edu] + "failures", # dict of destination -> [failures] + "device_destinations", # set of destinations +)) + + +def process_rows_for_federation(federation_sender, rows): + """Parse a list of rows from the federation stream and them send them out. + + Args: + federation_sender (TransactionQueue) + rows (list(FederationStreamRow)) + """ + + # The federation stream containis a bunch of different types of + # rows that need to be handled differently. We parse the rows, put + # them into the appropriate collection and then send them off. + + buff = BufferedToSend( + presence={}, + keyed_edus={}, + edus={}, + failures={}, + device_destinations=set(), + ) + + # Parse the rows in the stream and add to the buffer + for row in rows: + RowType = TypeToRow[row.type] + parsed_row = RowType.from_data(row.data) + parsed_row.add_to_buffer(buff) + + # We've finished collecting, send everything off + for destination, states in buff.presence.iteritems(): + federation_sender.send_presence(destination, states) + + for destination, edu_map in buff.keyed_edus.iteritems(): + for key, edu in edu_map.items(): + federation_sender.send_edu( + edu.destination, edu.edu_type, edu.content, key=key, + ) + + for destination, edu_list in buff.edus.iteritems(): + for edu in edu_list: + federation_sender.send_edu( + edu.destination, edu.edu_type, edu.content, key=None, + ) + + for destination, failure_list in buff.failures.iteritems(): + for failure in failure_list: + federation_sender.send_failure(destination, failure) + + for destination in buff.device_destinations: + federation_sender.send_device_messages(destination) -- cgit 1.5.1 From 29574fd5b3537cc272a4d792669b8d5be2a92b6f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 16:48:30 +0100 Subject: Reduce federation presence replication traffic This is mainly done by moving the calculation of where to send presence updates from the presence handler to the transaction queue, so we only need to send the presence event (and not the destinations) across the replication connection. Before we were duplicating by sending the full state across once per destination. --- synapse/app/federation_sender.py | 12 ++++ synapse/app/synchrotron.py | 6 +- synapse/federation/send_queue.py | 47 ++++++-------- synapse/federation/transaction_queue.py | 99 ++++++++++++++++++++++++++--- synapse/handlers/presence.py | 54 ++++------------ synapse/replication/slave/storage/events.py | 1 + 6 files changed, 139 insertions(+), 80 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 477e16e0fa..49efb602bc 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -32,6 +32,7 @@ from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine +from synapse.storage.presence import PresenceStore from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -80,6 +81,17 @@ class FederationSenderSlaveStore( return rows[0][0] if rows else -1 + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + # This is fine since in practice nobody uses the presence list stuff... + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + get_presence_list_observers_accepted = PresenceStore.__dict__[ + "get_presence_list_observers_accepted" + ] + class FederationSenderServer(HomeServer): def get_db_conn(self, run_new_connection=True): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index d39e3161fe..7b6f82abdc 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -206,10 +206,8 @@ class SynchrotronPresence(object): @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): - parties = yield self._get_interested_parties( - states, calculate_remote_hosts=False - ) - room_ids_to_states, users_to_states, _ = parties + parties = yield self._get_interested_parties(states) + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 748548bbe2..a12c18f4df 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -53,6 +53,7 @@ class FederationRemoteSendQueue(object): self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id self.presence_map = {} self.presence_changed = sorteddict() @@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object): del self.presence_changed[key] user_ids = set( - user_id for uids in self.presence_changed.values() for _, user_id in uids + user_id + for uids in self.presence_changed.itervalues() + for user_id in uids ) to_del = [ @@ -187,18 +190,14 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() - def send_presence(self, destination, states): + def send_presence(self, states): """As per TransactionQueue""" pos = self._next_pos() - self.presence_map.update({ - state.user_id: state - for state in states - }) + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - self.presence_changed[pos] = [ - (destination, state.user_id) for state in states - ] + self.presence_map.update({state.user_id: state for state in local_states}) + self.presence_changed[pos] = [state.user_id for state in local_states] self.notifier.on_new_replication_data() @@ -251,15 +250,14 @@ class FederationRemoteSendQueue(object): keys = self.presence_changed.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - dest_user_ids = set( - (pos, dest_user_id) + dest_user_ids = [ + (pos, user_id) for pos in keys[i:j] - for dest_user_id in self.presence_changed[pos] - ) + for user_id in self.presence_changed[pos] + ] - for (key, (dest, user_id)) in dest_user_ids: + for (key, user_id) in dest_user_ids: rows.append((key, PresenceRow( - destination=dest, state=self.presence_map[user_id], ))) @@ -354,7 +352,6 @@ class BaseFederationRow(object): class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( - "destination", # str "state", # UserPresenceState ))): TypeId = "p" @@ -362,18 +359,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( @staticmethod def from_data(data): return PresenceRow( - destination=data["destination"], - state=UserPresenceState.from_dict(data["state"]) + state=UserPresenceState.from_dict(data) ) def to_data(self): - return { - "destination": self.destination, - "state": self.state.as_dict() - } + return self.state.as_dict() def add_to_buffer(self, buff): - buff.presence.setdefault(self.destination, []).append(self.state) + buff.presence.append(self.state) class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( @@ -469,7 +462,7 @@ TypeToRow = { ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( - "presence", # dict of destination -> [UserPresenceState] + "presence", # list(UserPresenceState) "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] "failures", # dict of destination -> [failures] @@ -491,7 +484,7 @@ def process_rows_for_federation(transaction_queue, rows): # them into the appropriate collection and then send them off. buff = ParsedFederationStreamData( - presence={}, + presence=[], keyed_edus={}, edus={}, failures={}, @@ -508,8 +501,8 @@ def process_rows_for_federation(transaction_queue, rows): parsed_row = RowType.from_data(row.data) parsed_row.add_to_buffer(buff) - for destination, states in buff.presence.iteritems(): - transaction_queue.send_presence(destination, states) + if buff.presence: + transaction_queue.send_presence(buff.presence) for destination, edu_map in buff.keyed_edus.iteritems(): for key, edu in edu_map.items(): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c27ce7c5f3..fd9e1fa01c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -21,7 +21,7 @@ from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor -from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id @@ -78,6 +78,7 @@ class TransactionQueue(object): self.pending_edus_by_dest = edus = {} # Presence needs to be separate as we send single aggragate EDUs + self.pending_presence = {} self.pending_presence_by_dest = presence = {} self.pending_edus_keyed_by_dest = edus_keyed = {} @@ -113,6 +114,8 @@ class TransactionQueue(object): self._is_processing = False self._last_poked_id = -1 + self._processing_pending_presence = False + def can_send_to(self, destination): """Can we send messages to the given server? @@ -224,17 +227,95 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def send_presence(self, destination, states): - if not self.can_send_to(destination): + @preserve_fn + @defer.inlineCallbacks + def send_presence(self, states): + """Send the new presence states to the appropriate destinations. + + Args: + states (list(UserPresenceState)) + """ + + # First we queue up the new presence by user ID, so multiple presence + # updates in quick successtion are correctly handled + self.pending_presence.update({state.user_id: state for state in states}) + + # We then handle the new pending presence in batches, first figuring + # out the destinations we need to send each state to and then poking it + # to attempt a new transaction. We linearize this so that we don't + # accidentally mess up the ordering and send multiple presence updates + # in the wrong order + if self._processing_pending_presence: return - self.pending_presence_by_dest.setdefault(destination, {}).update({ - state.user_id: state for state in states - }) + self._processing_pending_presence = True + try: + while True: + states = self.pending_presence + self.pending_presence = {} - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + if not states: + break + + yield self._process_presence_inner(states) + finally: + self._processing_pending_presence = False + + @measure_func("txnqueue._process_presence") + @defer.inlineCallbacks + def _process_presence_inner(self, states): + """Given a list of states populate self.pending_presence_by_dest and + poke to send a new transaction to each destination + + Args: + states (list(UserPresenceState)) + """ + # First we look up the rooms each user is in (as well as any explicit + # subscriptions), then for each distinct room we look up the remote + # hosts in those rooms. + room_ids_to_states = {} + users_to_states = {} + for state in states.itervalues(): + room_ids = yield self.store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield self.store.get_presence_list_observers_accepted( + state.user_id, + ) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + hosts_and_states = [] + for room_id, states in room_ids_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue + + hosts = yield self.store.get_hosts_in_room(room_id) + hosts_and_states.append((hosts, local_states)) + + for user_id, states in users_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue + + host = get_domain_from_id(user_id) + hosts_and_states.append(([host], local_states)) + + # And now finally queue up new transactions + for destinations, states in hosts_and_states: + for destination in destinations: + if not self.can_send_to(destination): + continue + + self.pending_presence_by_dest.setdefault( + destination, {} + ).update({ + state.user_id: state for state in states + }) + + preserve_fn(self._attempt_new_transaction)(destination) def send_edu(self, destination, edu_type, content, key=None): edu = Edu( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9ed5af3cb4..c1c0dd4d3d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -318,11 +318,7 @@ class PresenceHandler(object): if to_federation_ping: federation_presence_out_counter.inc_by(len(to_federation_ping)) - _, _, hosts_to_states = yield self._get_interested_parties( - to_federation_ping.values() - ) - - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(to_federation_ping.values()) def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -615,12 +611,12 @@ class PresenceHandler(object): defer.returnValue(states) @defer.inlineCallbacks - def _get_interested_parties(self, states, calculate_remote_hosts=True): + def _get_interested_parties(self, states): """Given a list of states return which entities (rooms, users, servers) are interested in the given states. Returns: - 3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`, + 2-tuple: `(room_ids_to_states, users_to_states)`, with each item being a dict of `entity_name` -> `[UserPresenceState]` """ room_ids_to_states = {} @@ -637,30 +633,10 @@ class PresenceHandler(object): # Always notify self users_to_states.setdefault(state.user_id, []).append(state) - hosts_to_states = {} - if calculate_remote_hosts: - for room_id, states in room_ids_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - hosts = yield self.store.get_hosts_in_room(room_id) - - for host in hosts: - hosts_to_states.setdefault(host, []).extend(local_states) - - for user_id, states in users_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - host = get_domain_from_id(user_id) - hosts_to_states.setdefault(host, []).extend(local_states) - # TODO: de-dup hosts_to_states, as a single host might have multiple # of same presence - defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states)) + defer.returnValue((room_ids_to_states, users_to_states)) @defer.inlineCallbacks def _persist_and_notify(self, states): @@ -670,33 +646,32 @@ class PresenceHandler(object): stream_id, max_token = yield self.store.update_presence(states) parties = yield self._get_interested_parties(states) - room_ids_to_states, users_to_states, hosts_to_states = parties + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(states) @defer.inlineCallbacks def notify_for_states(self, state, stream_id): parties = yield self._get_interested_parties([state]) - room_ids_to_states, users_to_states, hosts_to_states = parties + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - def _push_to_remotes(self, hosts_to_states): + def _push_to_remotes(self, states): """Sends state updates to remote servers. Args: - hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]` + hosts_to_states (list): list(state) """ - for host, states in hosts_to_states.items(): - self.federation.send_presence(host, states) + self.federation.send_presence(states) @defer.inlineCallbacks def incoming_presence(self, origin, content): @@ -837,14 +812,13 @@ class PresenceHandler(object): if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) - hosts = set(get_domain_from_id(u) for u in user_ids) - self._push_to_remotes({host: (state,) for host in hosts}) + self._push_to_remotes([state]) else: user_ids = filter(self.is_mine_id, user_ids) states = yield self.current_state_for_users(user_ids) - self._push_to_remotes({user.domain: states.values()}) + self._push_to_remotes(states.values()) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 5fd47706ef..4ca1e5aa8c 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -71,6 +71,7 @@ class SlavedEventStore(BaseSlavedStore): # to reach inside the __dict__ to extract them. get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"] + get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"] get_users_who_share_room_with_user = ( RoomMemberStore.__dict__["get_users_who_share_room_with_user"] ) -- cgit 1.5.1 From 28a46497856a2140e8828787af4051c79d12b57e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 09:52:11 +0100 Subject: Remove HTTP replication APIs --- synapse/app/homeserver.py | 4 - synapse/replication/expire_cache.py | 60 ---- synapse/replication/presence_resource.py | 59 ---- synapse/replication/pusher_resource.py | 54 --- synapse/replication/resource.py | 576 ------------------------------- 5 files changed, 753 deletions(-) delete mode 100644 synapse/replication/expire_cache.py delete mode 100644 synapse/replication/presence_resource.py delete mode 100644 synapse/replication/pusher_resource.py delete mode 100644 synapse/replication/resource.py (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 990eb477e5..6f5924d2c7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -55,7 +55,6 @@ from synapse.crypto import context_factory from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.federation.transport.server import TransportLayerServer @@ -167,9 +166,6 @@ class SynapseHomeServer(HomeServer): if name == "metrics" and self.get_config().enable_metrics: resources[METRICS_PREFIX] = MetricsResource(self) - if name == "replication": - resources[REPLICATION_PREFIX] = ReplicationResource(self) - if WEB_CLIENT_PREFIX in resources: root_resource = RootRedirect(WEB_CLIENT_PREFIX) else: diff --git a/synapse/replication/expire_cache.py b/synapse/replication/expire_cache.py deleted file mode 100644 index c05a50d7a6..0000000000 --- a/synapse/replication/expire_cache.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.http.server import respond_with_json_bytes, request_handler -from synapse.http.servlet import parse_json_object_from_request - -from twisted.web.resource import Resource -from twisted.web.server import NOT_DONE_YET - - -class ExpireCacheResource(Resource): - """ - HTTP endpoint for expiring storage caches. - - POST /_synapse/replication/expire_cache HTTP/1.1 - Content-Type: application/json - - { - "invalidate": [ - { - "name": "func_name", - "keys": ["key1", "key2"] - } - ] - } - """ - - def __init__(self, hs): - Resource.__init__(self) # Resource is old-style, so no super() - - self.store = hs.get_datastore() - self.version_string = hs.version_string - self.clock = hs.get_clock() - - def render_POST(self, request): - self._async_render_POST(request) - return NOT_DONE_YET - - @request_handler() - def _async_render_POST(self, request): - content = parse_json_object_from_request(request) - - for row in content["invalidate"]: - name = row["name"] - keys = tuple(row["keys"]) - - getattr(self.store, name).invalidate(keys) - - respond_with_json_bytes(request, 200, "{}") diff --git a/synapse/replication/presence_resource.py b/synapse/replication/presence_resource.py deleted file mode 100644 index fc18130ab4..0000000000 --- a/synapse/replication/presence_resource.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.http.server import respond_with_json_bytes, request_handler -from synapse.http.servlet import parse_json_object_from_request - -from twisted.web.resource import Resource -from twisted.web.server import NOT_DONE_YET -from twisted.internet import defer - - -class PresenceResource(Resource): - """ - HTTP endpoint for marking users as syncing. - - POST /_synapse/replication/presence HTTP/1.1 - Content-Type: application/json - - { - "process_id": "", - "syncing_users": [""] - } - """ - - def __init__(self, hs): - Resource.__init__(self) # Resource is old-style, so no super() - - self.version_string = hs.version_string - self.clock = hs.get_clock() - self.presence_handler = hs.get_presence_handler() - - def render_POST(self, request): - self._async_render_POST(request) - return NOT_DONE_YET - - @request_handler() - @defer.inlineCallbacks - def _async_render_POST(self, request): - content = parse_json_object_from_request(request) - - process_id = content["process_id"] - syncing_user_ids = content["syncing_users"] - - yield self.presence_handler.update_external_syncs( - process_id, set(syncing_user_ids) - ) - - respond_with_json_bytes(request, 200, "{}") diff --git a/synapse/replication/pusher_resource.py b/synapse/replication/pusher_resource.py deleted file mode 100644 index 9b01ab3c13..0000000000 --- a/synapse/replication/pusher_resource.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.http.server import respond_with_json_bytes, request_handler -from synapse.http.servlet import parse_json_object_from_request - -from twisted.web.resource import Resource -from twisted.web.server import NOT_DONE_YET -from twisted.internet import defer - - -class PusherResource(Resource): - """ - HTTP endpoint for deleting rejected pushers - """ - - def __init__(self, hs): - Resource.__init__(self) # Resource is old-style, so no super() - - self.version_string = hs.version_string - self.store = hs.get_datastore() - self.notifier = hs.get_notifier() - self.clock = hs.get_clock() - - def render_POST(self, request): - self._async_render_POST(request) - return NOT_DONE_YET - - @request_handler() - @defer.inlineCallbacks - def _async_render_POST(self, request): - content = parse_json_object_from_request(request) - - for remove in content["remove"]: - yield self.store.delete_pusher_by_app_id_pushkey_user_id( - remove["app_id"], - remove["push_key"], - remove["user_id"], - ) - - self.notifier.on_new_replication_data() - - respond_with_json_bytes(request, 200, "{}") diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py deleted file mode 100644 index abd3fe7665..0000000000 --- a/synapse/replication/resource.py +++ /dev/null @@ -1,576 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.http.servlet import parse_integer, parse_string -from synapse.http.server import request_handler, finish_request -from synapse.replication.pusher_resource import PusherResource -from synapse.replication.presence_resource import PresenceResource -from synapse.replication.expire_cache import ExpireCacheResource -from synapse.api.errors import SynapseError - -from twisted.web.resource import Resource -from twisted.web.server import NOT_DONE_YET -from twisted.internet import defer - -import ujson as json - -import collections -import logging - -logger = logging.getLogger(__name__) - -REPLICATION_PREFIX = "/_synapse/replication" - -STREAM_NAMES = ( - ("events",), - ("presence",), - ("typing",), - ("receipts",), - ("user_account_data", "room_account_data", "tag_account_data",), - ("backfill",), - ("push_rules",), - ("pushers",), - ("caches",), - ("to_device",), - ("public_rooms",), - ("federation",), - ("device_lists",), -) - - -class ReplicationResource(Resource): - """ - HTTP endpoint for extracting data from synapse. - - The streams of data returned by the endpoint are controlled by the - parameters given to the API. To return a given stream pass a query - parameter with a position in the stream to return data from or the - special value "-1" to return data from the start of the stream. - - If there is no data for any of the supplied streams after the given - position then the request will block until there is data for one - of the streams. This allows clients to long-poll this API. - - The possible streams are: - - * "streams": A special stream returing the positions of other streams. - * "events": The new events seen on the server. - * "presence": Presence updates. - * "typing": Typing updates. - * "receipts": Receipt updates. - * "user_account_data": Top-level per user account data. - * "room_account_data: Per room per user account data. - * "tag_account_data": Per room per user tags. - * "backfill": Old events that have been backfilled from other servers. - * "push_rules": Per user changes to push rules. - * "pushers": Per user changes to their pushers. - * "caches": Cache invalidations. - - The API takes two additional query parameters: - - * "timeout": How long to wait before returning an empty response. - * "limit": The maximum number of rows to return for the selected streams. - - The response is a JSON object with keys for each stream with updates. Under - each key is a JSON object with: - - * "position": The current position of the stream. - * "field_names": The names of the fields in each row. - * "rows": The updates as an array of arrays. - - There are a number of ways this API could be used: - - 1) To replicate the contents of the backing database to another database. - 2) To be notified when the contents of a shared backing database changes. - 3) To "tail" the activity happening on a server for debugging. - - In the first case the client would track all of the streams and store it's - own copy of the data. - - In the second case the client might theoretically just be able to follow - the "streams" stream to track where the other streams are. However in - practise it will probably need to get the contents of the streams in - order to expire the any in-memory caches. Whether it gets the contents - of the streams from this replication API or directly from the backing - store is a matter of taste. - - In the third case the client would use the "streams" stream to find what - streams are available and their current positions. Then it can start - long-polling this replication API for new data on those streams. - """ - - def __init__(self, hs): - Resource.__init__(self) # Resource is old-style, so no super() - - self.version_string = hs.version_string - self.store = hs.get_datastore() - self.sources = hs.get_event_sources() - self.presence_handler = hs.get_presence_handler() - self.typing_handler = hs.get_typing_handler() - self.federation_sender = hs.get_federation_sender() - self.notifier = hs.notifier - self.clock = hs.get_clock() - self.config = hs.get_config() - - self.putChild("remove_pushers", PusherResource(hs)) - self.putChild("syncing_users", PresenceResource(hs)) - self.putChild("expire_cache", ExpireCacheResource(hs)) - - def render_GET(self, request): - self._async_render_GET(request) - return NOT_DONE_YET - - @defer.inlineCallbacks - def current_replication_token(self): - stream_token = yield self.sources.get_current_token() - backfill_token = yield self.store.get_current_backfill_token() - push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() - pushers_token = self.store.get_pushers_stream_token() - caches_token = self.store.get_cache_stream_token() - public_rooms_token = self.store.get_current_public_room_stream_id() - federation_token = self.federation_sender.get_current_token() - device_list_token = self.store.get_device_stream_token() - - defer.returnValue(_ReplicationToken( - room_stream_token, - int(stream_token.presence_key), - int(stream_token.typing_key), - int(stream_token.receipt_key), - int(stream_token.account_data_key), - backfill_token, - push_rules_token, - pushers_token, - 0, # State stream is no longer a thing - caches_token, - int(stream_token.to_device_key), - int(public_rooms_token), - int(federation_token), - int(device_list_token), - )) - - @request_handler() - @defer.inlineCallbacks - def _async_render_GET(self, request): - limit = parse_integer(request, "limit", 100) - timeout = parse_integer(request, "timeout", 10 * 1000) - - request.setHeader(b"Content-Type", b"application/json") - - request_streams = { - name: parse_integer(request, name) - for names in STREAM_NAMES for name in names - } - request_streams["streams"] = parse_string(request, "streams") - - federation_ack = parse_integer(request, "federation_ack", None) - - def replicate(): - return self.replicate( - request_streams, limit, - federation_ack=federation_ack - ) - - writer = yield self.notifier.wait_for_replication(replicate, timeout) - result = writer.finish() - - for stream_name, stream_content in result.items(): - logger.info( - "Replicating %d rows of %s from %s -> %s", - len(stream_content["rows"]), - stream_name, - request_streams.get(stream_name), - stream_content["position"], - ) - - request.write(json.dumps(result, ensure_ascii=False)) - finish_request(request) - - @defer.inlineCallbacks - def replicate(self, request_streams, limit, federation_ack=None): - writer = _Writer() - current_token = yield self.current_replication_token() - logger.debug("Replicating up to %r", current_token) - - if limit == 0: - raise SynapseError(400, "Limit cannot be 0") - - yield self.account_data(writer, current_token, limit, request_streams) - yield self.events(writer, current_token, limit, request_streams) - # TODO: implement limit - yield self.presence(writer, current_token, request_streams) - yield self.typing(writer, current_token, request_streams) - yield self.receipts(writer, current_token, limit, request_streams) - yield self.push_rules(writer, current_token, limit, request_streams) - yield self.pushers(writer, current_token, limit, request_streams) - yield self.caches(writer, current_token, limit, request_streams) - yield self.to_device(writer, current_token, limit, request_streams) - yield self.public_rooms(writer, current_token, limit, request_streams) - yield self.device_lists(writer, current_token, limit, request_streams) - self.federation(writer, current_token, limit, request_streams, federation_ack) - self.streams(writer, current_token, request_streams) - - logger.debug("Replicated %d rows", writer.total) - defer.returnValue(writer) - - def streams(self, writer, current_token, request_streams): - request_token = request_streams.get("streams") - - streams = [] - - if request_token is not None: - if request_token == "-1": - for names, position in zip(STREAM_NAMES, current_token): - streams.extend((name, position) for name in names) - else: - items = zip( - STREAM_NAMES, - current_token, - _ReplicationToken(request_token) - ) - for names, current_id, last_id in items: - if last_id < current_id: - streams.extend((name, current_id) for name in names) - - if streams: - writer.write_header_and_rows( - "streams", streams, ("name", "position"), - position=str(current_token) - ) - - @defer.inlineCallbacks - def events(self, writer, current_token, limit, request_streams): - request_events = request_streams.get("events") - request_backfill = request_streams.get("backfill") - - if request_events is not None or request_backfill is not None: - if request_events is None: - request_events = current_token.events - if request_backfill is None: - request_backfill = current_token.backfill - - no_new_tokens = ( - request_events == current_token.events - and request_backfill == current_token.backfill - ) - if no_new_tokens: - return - - res = yield self.store.get_all_new_events( - request_backfill, request_events, - current_token.backfill, current_token.events, - limit - ) - - upto_events_token = _position_from_rows( - res.new_forward_events, current_token.events - ) - - upto_backfill_token = _position_from_rows( - res.new_backfill_events, current_token.backfill - ) - - if request_events != upto_events_token: - writer.write_header_and_rows("events", res.new_forward_events, ( - "position", "event_id", "room_id", "type", "state_key", - ), position=upto_events_token) - - if request_backfill != upto_backfill_token: - writer.write_header_and_rows("backfill", res.new_backfill_events, ( - "position", "event_id", "room_id", "type", "state_key", "redacts", - ), position=upto_backfill_token) - - writer.write_header_and_rows( - "forward_ex_outliers", res.forward_ex_outliers, - ("position", "event_id", "state_group"), - ) - writer.write_header_and_rows( - "backward_ex_outliers", res.backward_ex_outliers, - ("position", "event_id", "state_group"), - ) - - @defer.inlineCallbacks - def presence(self, writer, current_token, request_streams): - current_position = current_token.presence - - request_presence = request_streams.get("presence") - - if request_presence is not None and request_presence != current_position: - presence_rows = yield self.presence_handler.get_all_presence_updates( - request_presence, current_position - ) - upto_token = _position_from_rows(presence_rows, current_position) - writer.write_header_and_rows("presence", presence_rows, ( - "position", "user_id", "state", "last_active_ts", - "last_federation_update_ts", "last_user_sync_ts", - "status_msg", "currently_active", - ), position=upto_token) - - @defer.inlineCallbacks - def typing(self, writer, current_token, request_streams): - current_position = current_token.typing - - request_typing = request_streams.get("typing") - - if request_typing is not None and request_typing != current_position: - # If they have a higher token than current max, we can assume that - # they had been talking to a previous instance of the master. Since - # we reset the token on restart, the best (but hacky) thing we can - # do is to simply resend down all the typing notifications. - if request_typing > current_position: - request_typing = 0 - - typing_rows = yield self.typing_handler.get_all_typing_updates( - request_typing, current_position - ) - upto_token = _position_from_rows(typing_rows, current_position) - writer.write_header_and_rows("typing", typing_rows, ( - "position", "room_id", "typing" - ), position=upto_token) - - @defer.inlineCallbacks - def receipts(self, writer, current_token, limit, request_streams): - current_position = current_token.receipts - - request_receipts = request_streams.get("receipts") - - if request_receipts is not None and request_receipts != current_position: - receipts_rows = yield self.store.get_all_updated_receipts( - request_receipts, current_position, limit - ) - upto_token = _position_from_rows(receipts_rows, current_position) - writer.write_header_and_rows("receipts", receipts_rows, ( - "position", "room_id", "receipt_type", "user_id", "event_id", "data" - ), position=upto_token) - - @defer.inlineCallbacks - def account_data(self, writer, current_token, limit, request_streams): - current_position = current_token.account_data - - user_account_data = request_streams.get("user_account_data") - room_account_data = request_streams.get("room_account_data") - tag_account_data = request_streams.get("tag_account_data") - - if user_account_data is not None or room_account_data is not None: - if user_account_data is None: - user_account_data = current_position - if room_account_data is None: - room_account_data = current_position - - no_new_tokens = ( - user_account_data == current_position - and room_account_data == current_position - ) - if no_new_tokens: - return - - user_rows, room_rows = yield self.store.get_all_updated_account_data( - user_account_data, room_account_data, current_position, limit - ) - - upto_users_token = _position_from_rows(user_rows, current_position) - upto_rooms_token = _position_from_rows(room_rows, current_position) - - writer.write_header_and_rows("user_account_data", user_rows, ( - "position", "user_id", "type", "content" - ), position=upto_users_token) - writer.write_header_and_rows("room_account_data", room_rows, ( - "position", "user_id", "room_id", "type", "content" - ), position=upto_rooms_token) - - if tag_account_data is not None: - tag_rows = yield self.store.get_all_updated_tags( - tag_account_data, current_position, limit - ) - upto_tag_token = _position_from_rows(tag_rows, current_position) - writer.write_header_and_rows("tag_account_data", tag_rows, ( - "position", "user_id", "room_id", "tags" - ), position=upto_tag_token) - - @defer.inlineCallbacks - def push_rules(self, writer, current_token, limit, request_streams): - current_position = current_token.push_rules - - push_rules = request_streams.get("push_rules") - - if push_rules is not None and push_rules != current_position: - rows = yield self.store.get_all_push_rule_updates( - push_rules, current_position, limit - ) - upto_token = _position_from_rows(rows, current_position) - writer.write_header_and_rows("push_rules", rows, ( - "position", "event_stream_ordering", "user_id", "rule_id", "op", - "priority_class", "priority", "conditions", "actions" - ), position=upto_token) - - @defer.inlineCallbacks - def pushers(self, writer, current_token, limit, request_streams): - current_position = current_token.pushers - - pushers = request_streams.get("pushers") - - if pushers is not None and pushers != current_position: - updated, deleted = yield self.store.get_all_updated_pushers( - pushers, current_position, limit - ) - upto_token = _position_from_rows(updated, current_position) - writer.write_header_and_rows("pushers", updated, ( - "position", "user_id", "access_token", "profile_tag", "kind", - "app_id", "app_display_name", "device_display_name", "pushkey", - "ts", "lang", "data" - ), position=upto_token) - writer.write_header_and_rows("deleted_pushers", deleted, ( - "position", "user_id", "app_id", "pushkey" - ), position=upto_token) - - @defer.inlineCallbacks - def caches(self, writer, current_token, limit, request_streams): - current_position = current_token.caches - - caches = request_streams.get("caches") - - if caches is not None and caches != current_position: - updated_caches = yield self.store.get_all_updated_caches( - caches, current_position, limit - ) - upto_token = _position_from_rows(updated_caches, current_position) - writer.write_header_and_rows("caches", updated_caches, ( - "position", "cache_func", "keys", "invalidation_ts" - ), position=upto_token) - - @defer.inlineCallbacks - def to_device(self, writer, current_token, limit, request_streams): - current_position = current_token.to_device - - to_device = request_streams.get("to_device") - - if to_device is not None and to_device != current_position: - to_device_rows = yield self.store.get_all_new_device_messages( - to_device, current_position, limit - ) - upto_token = _position_from_rows(to_device_rows, current_position) - writer.write_header_and_rows("to_device", to_device_rows, ( - "position", "user_id", "device_id", "message_json" - ), position=upto_token) - - @defer.inlineCallbacks - def public_rooms(self, writer, current_token, limit, request_streams): - current_position = current_token.public_rooms - - public_rooms = request_streams.get("public_rooms") - - if public_rooms is not None and public_rooms != current_position: - public_rooms_rows = yield self.store.get_all_new_public_rooms( - public_rooms, current_position, limit - ) - upto_token = _position_from_rows(public_rooms_rows, current_position) - writer.write_header_and_rows("public_rooms", public_rooms_rows, ( - "position", "room_id", "visibility", "appservice_id", "network_id", - ), position=upto_token) - - def federation(self, writer, current_token, limit, request_streams, federation_ack): - if self.config.send_federation: - return - - current_position = current_token.federation - - federation = request_streams.get("federation") - - if federation is not None and federation != current_position: - federation_rows = self.federation_sender.get_replication_rows( - federation, current_position, limit, federation_ack=federation_ack, - ) - upto_token = _position_from_rows(federation_rows, current_position) - writer.write_header_and_rows("federation", federation_rows, ( - "position", "type", "content", - ), position=upto_token) - - @defer.inlineCallbacks - def device_lists(self, writer, current_token, limit, request_streams): - current_position = current_token.device_lists - - device_lists = request_streams.get("device_lists") - - if device_lists is not None and device_lists != current_position: - changes = yield self.store.get_all_device_list_changes_for_remotes( - device_lists, current_position, - ) - writer.write_header_and_rows("device_lists", changes, ( - "position", "user_id", "destination", - ), position=current_position) - - -class _Writer(object): - """Writes the streams as a JSON object as the response to the request""" - def __init__(self): - self.streams = {} - self.total = 0 - - def write_header_and_rows(self, name, rows, fields, position=None): - if position is None: - if rows: - position = rows[-1][0] - else: - return - - self.streams[name] = { - "position": position if type(position) is int else str(position), - "field_names": fields, - "rows": rows, - } - - self.total += len(rows) - - def __nonzero__(self): - return bool(self.total) - - def finish(self): - return self.streams - - -class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( - "events", "presence", "typing", "receipts", "account_data", "backfill", - "push_rules", "pushers", "state", "caches", "to_device", "public_rooms", - "federation", "device_lists", -))): - __slots__ = [] - - def __new__(cls, *args): - if len(args) == 1: - streams = [int(value) for value in args[0].split("_")] - if len(streams) < len(cls._fields): - streams.extend([0] * (len(cls._fields) - len(streams))) - return cls(*streams) - else: - return super(_ReplicationToken, cls).__new__(cls, *args) - - def __str__(self): - return "_".join(str(value) for value in self) - - -def _position_from_rows(rows, current_position): - """Calculates a position to return for a stream. Ideally we want to return the - position of the last row, as that will be the most correct. However, if there - are no rows we fall back to using the current position to stop us from - repeatedly hitting the storage layer unncessarily thinking there are updates. - (Not all advances of the token correspond to an actual update) - - We can't just always return the current position, as we often limit the - number of rows we replicate, and so the stream may lag. The assumption is - that if the storage layer returns no new rows then we are not lagging and - we are at the `current_position`. - """ - if rows: - return rows[-1][0] - return current_position -- cgit 1.5.1 From 414522aed59aca9b711253aba98d15e2d59e14f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 15:30:02 +0100 Subject: Move get_interested_parties --- synapse/app/synchrotron.py | 5 ++-- synapse/handlers/presence.py | 69 ++++++++++++++++++++++---------------------- 2 files changed, 36 insertions(+), 38 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 7b6f82abdc..e3fbf02c9c 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -20,7 +20,7 @@ from synapse.api.constants import EventTypes from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.handlers.presence import PresenceHandler +from synapse.handlers.presence import PresenceHandler, get_interested_parties from synapse.http.site import SynapseSite from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX @@ -172,7 +172,6 @@ class SynchrotronPresence(object): get_states = PresenceHandler.get_states.__func__ get_state = PresenceHandler.get_state.__func__ - _get_interested_parties = PresenceHandler._get_interested_parties.__func__ current_state_for_users = PresenceHandler.current_state_for_users.__func__ def user_syncing(self, user_id, affect_presence): @@ -206,7 +205,7 @@ class SynchrotronPresence(object): @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): - parties = yield self._get_interested_parties(states) + parties = yield get_interested_parties(self.store, states) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 98e736be5b..b9ce997a94 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -610,31 +610,6 @@ class PresenceHandler(object): defer.returnValue(states) - @defer.inlineCallbacks - def _get_interested_parties(self, states): - """Given a list of states return which entities (rooms, users, servers) - are interested in the given states. - - Returns: - 2-tuple: `(room_ids_to_states, users_to_states)`, - with each item being a dict of `entity_name` -> `[UserPresenceState]` - """ - room_ids_to_states = {} - users_to_states = {} - for state in states: - room_ids = yield self.store.get_rooms_for_user(state.user_id) - for room_id in room_ids: - room_ids_to_states.setdefault(room_id, []).append(state) - - plist = yield self.store.get_presence_list_observers_accepted(state.user_id) - for u in plist: - users_to_states.setdefault(u, []).append(state) - - # Always notify self - users_to_states.setdefault(state.user_id, []).append(state) - - defer.returnValue((room_ids_to_states, users_to_states)) - @defer.inlineCallbacks def _persist_and_notify(self, states): """Persist states in the database, poke the notifier and send to @@ -642,7 +617,7 @@ class PresenceHandler(object): """ stream_id, max_token = yield self.store.update_presence(states) - parties = yield self._get_interested_parties(states) + parties = yield get_interested_parties(self.store, states) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( @@ -654,7 +629,7 @@ class PresenceHandler(object): @defer.inlineCallbacks def notify_for_states(self, state, stream_id): - parties = yield self._get_interested_parties([state]) + parties = yield get_interested_parties(self.store, [state]) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( @@ -1316,6 +1291,36 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): return new_state, persist_and_notify, federation_ping + +@defer.inlineCallbacks +def get_interested_parties(store, states): + """Given a list of states return which entities (rooms, users) + are interested in the given states. + + Args: + states (list(UserPresenceState)) + + Returns: + 2-tuple: `(room_ids_to_states, users_to_states)`, + with each item being a dict of `entity_name` -> `[UserPresenceState]` + """ + room_ids_to_states = {} + users_to_states = {} + for state in states: + room_ids = yield store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield store.get_presence_list_observers_accepted(state.user_id) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + # Always notify self + users_to_states.setdefault(state.user_id, []).append(state) + + defer.returnValue((room_ids_to_states, users_to_states)) + + @defer.inlineCallbacks def get_interested_remotes(store, states): """Given a list of presence states figure out which remote servers @@ -1351,17 +1356,11 @@ def get_interested_remotes(store, states): users_to_states.setdefault(u, []).append(state) for room_id, states in room_ids_to_states.items(): - if not local_states: - continue - hosts = yield store.get_hosts_in_room(room_id) - hosts_and_states.append((hosts, local_states)) + hosts_and_states.append((hosts, states)) for user_id, states in users_to_states.items(): - if not local_states: - continue - host = get_domain_from_id(user_id) - hosts_and_states.append(([host], local_states)) + hosts_and_states.append(([host], states)) defer.returnValue(hosts_and_states) -- cgit 1.5.1 From 9c712a366fbbba90dbe18f7246c279d3cbb1cf10 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 16:07:33 +0100 Subject: Move get_presence_list_* to SlaveStore --- synapse/app/federation_sender.py | 15 ++------------- synapse/app/synchrotron.py | 12 +----------- synapse/replication/slave/storage/presence.py | 10 ++++++++++ 3 files changed, 13 insertions(+), 24 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 49efb602bc..e51a69074d 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -28,11 +28,11 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine -from synapse.storage.presence import PresenceStore from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -56,7 +56,7 @@ logger = logging.getLogger("synapse.app.appservice") class FederationSenderSlaveStore( SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, - SlavedRegistrationStore, SlavedDeviceStore, + SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore, ): def __init__(self, db_conn, hs): super(FederationSenderSlaveStore, self).__init__(db_conn, hs) @@ -81,17 +81,6 @@ class FederationSenderSlaveStore( return rows[0][0] if rows else -1 - # XXX: This is a bit broken because we don't persist the accepted list in a - # way that can be replicated. This means that we don't have a way to - # invalidate the cache correctly. - # This is fine since in practice nobody uses the presence list stuff... - get_presence_list_accepted = PresenceStore.__dict__[ - "get_presence_list_accepted" - ] - get_presence_list_observers_accepted = PresenceStore.__dict__[ - "get_presence_list_observers_accepted" - ] - class FederationSenderServer(HomeServer): def get_db_conn(self, run_new_connection=True): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e3fbf02c9c..13c00ef2ba 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -44,7 +44,7 @@ from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine -from synapse.storage.presence import PresenceStore, UserPresenceState +from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -89,16 +89,6 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["did_forget"] ) - # XXX: This is a bit broken because we don't persist the accepted list in a - # way that can be replicated. This means that we don't have a way to - # invalidate the cache correctly. - get_presence_list_accepted = PresenceStore.__dict__[ - "get_presence_list_accepted" - ] - get_presence_list_observers_accepted = PresenceStore.__dict__[ - "get_presence_list_observers_accepted" - ] - UPDATE_SYNCING_USERS_MS = 10 * 1000 diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index dffc80adc3..cfb9280181 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -39,6 +39,16 @@ class SlavedPresenceStore(BaseSlavedStore): _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"] get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"] + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + get_presence_list_observers_accepted = PresenceStore.__dict__[ + "get_presence_list_observers_accepted" + ] + def get_current_presence_token(self): return self._presence_id_gen.get_current_token() -- cgit 1.5.1 From c1f52a321d010abd6a5f24da18e871a5e4b9ce12 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Thu, 13 Apr 2017 17:59:34 +0530 Subject: synctl.py: Check if synapse is already running --- synapse/app/synctl.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 8223734845..3bd7ef7bba 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -234,6 +234,9 @@ def main(): if action == "start" or action == "restart": if start_stop_synapse: + # Check if synapse is already running + if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())): + abort("synapse.app.homeserver already running") start(configfile) for worker in workers: -- cgit 1.5.1 From 8e780b113d1223994bf77a5075e91ff56dd20f67 Mon Sep 17 00:00:00 2001 From: Matthew Wolff Date: Mon, 17 Apr 2017 00:44:05 -0500 Subject: web_server_root documentation fix Signed-off-by: Matthew Wolff --- README.rst | 7 +++++-- synapse/app/homeserver.py | 3 +-- synapse/config/server.py | 6 ++++++ 3 files changed, 12 insertions(+), 4 deletions(-) (limited to 'synapse/app') diff --git a/README.rst b/README.rst index 759197f5ff..8de7cd3f3b 100644 --- a/README.rst +++ b/README.rst @@ -335,8 +335,11 @@ ArchLinux --------- The quickest way to get up and running with ArchLinux is probably with the community package -https://www.archlinux.org/packages/community/any/matrix-synapse/, which should pull in all -the necessary dependencies. +https://www.archlinux.org/packages/community/any/matrix-synapse/, which should pull in most of +the necessary dependencies. If the default web client is to be served (enabled by default in +the generated config), +https://www.archlinux.org/packages/community/any/python2-matrix-angular-sdk/ will also need to +be installed. Alternatively, to install using pip a few changes may be needed as ArchLinux defaults to python 3, but synapse currently assumes python 2.7 by default: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 6f5924d2c7..c9521a10fb 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -88,11 +88,10 @@ def build_resource_for_web_client(hs): "the location of the source to serve via the configuration\n" "option `web_client_location`\n\n" "To install the `matrix-angular-sdk` via pip, run:\n\n" - " pip install '%(dep)s'\n" + " pip install matrix-angular-sdk\n" "\n" "You can also disable hosting of the webclient via the\n" "configuration option `web_client`\n" - % {"dep": DEPENDENCY_LINKS["matrix-angular-sdk"]} ) syweb_path = os.path.dirname(syweb.__file__) webclient_path = os.path.join(syweb_path, "webclient") diff --git a/synapse/config/server.py b/synapse/config/server.py index 1f9999d57a..25e6666238 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -144,6 +144,12 @@ class ServerConfig(Config): # Whether to serve a web client from the HTTP/HTTPS root resource. web_client: True + # The root directory to server for the above web client. + # If left undefined, synapse will serve the matrix-angular-sdk web client. + # Make sure matrix-angular-sdk is installed with pip if web_client is True + # and web_client_location is undefined + # web_client_location: "/path/to/web/root" + # The public-facing base URL for the client API (not including _matrix/...) # public_baseurl: https://example.com:8448/ -- cgit 1.5.1 From 54f9a4cb59e358941b0b410633cd568b79f3e393 Mon Sep 17 00:00:00 2001 From: Matthew Wolff Date: Mon, 17 Apr 2017 01:38:27 -0500 Subject: Fixed travis build failure Signed-off-by: Matthew Wolff --- synapse/app/homeserver.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index c9521a10fb..56f97ea845 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -24,9 +24,7 @@ import sys import synapse.config.logger from synapse.config._base import ConfigError -from synapse.python_dependencies import ( - check_requirements, DEPENDENCY_LINKS -) +from synapse.python_dependencies import check_requirements from synapse.rest import ClientRestResource from synapse.storage.engines import create_engine, IncorrectDatabaseSetup -- cgit 1.5.1 From d4040e9e284b413ceecdbe927ff3be02a1aa9b70 Mon Sep 17 00:00:00 2001 From: Matthew Wolff Date: Tue, 18 Apr 2017 16:19:48 -0500 Subject: Queried CONDITIONAL_REQUIREMENTS --- synapse/app/homeserver.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 56f97ea845..3457402596 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -24,7 +24,9 @@ import sys import synapse.config.logger from synapse.config._base import ConfigError -from synapse.python_dependencies import check_requirements +from synapse.python_dependencies import ( + check_requirements, CONDITIONAL_REQUIREMENTS +) from synapse.rest import ClientRestResource from synapse.storage.engines import create_engine, IncorrectDatabaseSetup @@ -86,10 +88,11 @@ def build_resource_for_web_client(hs): "the location of the source to serve via the configuration\n" "option `web_client_location`\n\n" "To install the `matrix-angular-sdk` via pip, run:\n\n" - " pip install matrix-angular-sdk\n" + " pip install '%(dep)s'\n" "\n" "You can also disable hosting of the webclient via the\n" "configuration option `web_client`\n" + % {"dep": CONDITIONAL_REQUIREMENTS["web_client"].keys()[0]} ) syweb_path = os.path.dirname(syweb.__file__) webclient_path = os.path.join(syweb_path, "webclient") -- cgit 1.5.1