diff options
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/appservice.py | 58 | ||||
-rw-r--r-- | synapse/app/client_reader.py | 34 | ||||
-rw-r--r-- | synapse/app/federation_reader.py | 32 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 138 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 24 | ||||
-rw-r--r-- | synapse/app/media_repository.py | 34 | ||||
-rw-r--r-- | synapse/app/pusher.py | 133 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 350 | ||||
-rwxr-xr-x | synapse/app/synctl.py | 50 |
9 files changed, 426 insertions, 427 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 1900930053..9a476efa63 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -26,17 +26,17 @@ from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine -from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string from synapse import events -from twisted.internet import reactor, defer +from twisted.internet import reactor from twisted.web.resource import Resource from daemonize import Daemonize @@ -120,30 +120,25 @@ class AppserviceServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url - appservice_handler = self.get_application_service_handler() - - @defer.inlineCallbacks - def replicate(results): - stream = results.get("events") - if stream: - max_stream_id = stream["position"] - yield appservice_handler.notify_interested_services(max_stream_id) - - while True: - try: - args = store.stream_positions() - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - replicate(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(30) + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return ASReplicationHandler(self) + + +class ASReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(ASReplicationHandler, self).__init__(hs.get_datastore()) + self.appservice_handler = hs.get_application_service_handler() + + def on_rdata(self, stream_name, token, rows): + super(ASReplicationHandler, self).on_rdata(stream_name, token, rows) + + if stream_name == "events": + max_stream_id = self.store.get_room_max_stream_ordering() + preserve_fn( + self.appservice_handler.notify_interested_services + )(max_stream_id) def start(config_options): @@ -157,7 +152,7 @@ def start(config_options): assert config.worker_app == "synapse.app.appservice" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -187,7 +182,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: @@ -195,7 +194,6 @@ def start(config_options): reactor.run() def start(): - ps.replicate() ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 4d081eccd1..9b72c649ac 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -29,13 +29,14 @@ from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.room import PublicRoomListRestServlet from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine -from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -44,7 +45,7 @@ from synapse.crypto import context_factory from synapse import events -from twisted.internet import reactor, defer +from twisted.internet import reactor from twisted.web.resource import Resource from daemonize import Daemonize @@ -63,6 +64,7 @@ class ClientReaderSlavedStore( DirectoryStore, SlavedApplicationServiceStore, SlavedRegistrationStore, + TransactionStore, BaseSlavedStore, ClientIpStore, # After BaseSlavedStore because the constructor is different ): @@ -143,21 +145,10 @@ class ClientReaderServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url + self.get_tcp_replication().start_replication(self) - while True: - try: - args = store.stream_positions() - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(5) + def build_tcp_replication(self): + return ReplicationClientHandler(self.get_datastore()) def start(config_options): @@ -171,7 +162,7 @@ def start(config_options): assert config.worker_app == "synapse.app.client_reader" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -193,7 +184,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: @@ -203,7 +198,6 @@ def start(config_options): def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() - ss.replicate() reactor.callWhenRunning(start) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 90a4816753..eb392e1c9d 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -27,11 +27,11 @@ from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.directory import DirectoryStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine -from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -42,7 +42,7 @@ from synapse.crypto import context_factory from synapse import events -from twisted.internet import reactor, defer +from twisted.internet import reactor from twisted.web.resource import Resource from daemonize import Daemonize @@ -134,21 +134,10 @@ class FederationReaderServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url + self.get_tcp_replication().start_replication(self) - while True: - try: - args = store.stream_positions() - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(5) + def build_tcp_replication(self): + return ReplicationClientHandler(self.get_datastore()) def start(config_options): @@ -162,7 +151,7 @@ def start(config_options): assert config.worker_app == "synapse.app.federation_reader" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -184,7 +173,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: @@ -194,7 +187,6 @@ def start(config_options): def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() - ss.replicate() reactor.callWhenRunning(start) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 411e47d98d..145c01f3a3 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -31,11 +31,12 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState -from synapse.util.async import sleep +from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -50,7 +51,6 @@ from daemonize import Daemonize import sys import logging import gc -import ujson as json logger = logging.getLogger("synapse.app.appservice") @@ -59,7 +59,28 @@ class FederationSenderSlaveStore( SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, SlavedRegistrationStore, SlavedDeviceStore, ): - pass + def __init__(self, db_conn, hs): + super(FederationSenderSlaveStore, self).__init__(db_conn, hs) + + # We pull out the current federation stream position now so that we + # always have a known value for the federation position in memory so + # that we don't have to bounce via a deferred once when we start the + # replication streams. + self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) + + def _get_federation_out_pos(self, db_conn): + sql = ( + "SELECT stream_id FROM federation_stream_position" + " WHERE type = ?" + ) + sql = self.database_engine.convert_param_style(sql) + + txn = db_conn.cursor() + txn.execute(sql, ("federation",)) + rows = txn.fetchall() + txn.close() + + return rows[0][0] if rows else -1 class FederationSenderServer(HomeServer): @@ -127,26 +148,27 @@ class FederationSenderServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url - send_handler = FederationSenderHandler(self) - - send_handler.on_start() - - while True: - try: - args = store.stream_positions() - args.update((yield send_handler.stream_positions())) - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - yield send_handler.process_replication(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(30) + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return FederationSenderReplicationHandler(self) + + +class FederationSenderReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) + self.send_handler = FederationSenderHandler(hs, self) + + def on_rdata(self, stream_name, token, rows): + super(FederationSenderReplicationHandler, self).on_rdata( + stream_name, token, rows + ) + self.send_handler.process_replication_rows(stream_name, token, rows) + + def get_streams_to_replicate(self): + args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate() + args.update(self.send_handler.stream_positions()) + return args def start(config_options): @@ -160,7 +182,7 @@ def start(config_options): assert config.worker_app == "synapse.app.federation_sender" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -193,7 +215,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: @@ -201,7 +227,6 @@ def start(config_options): reactor.run() def start(): - ps.replicate() ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() @@ -225,9 +250,15 @@ class FederationSenderHandler(object): """Processes the replication stream and forwards the appropriate entries to the federation sender. """ - def __init__(self, hs): + def __init__(self, hs, replication_client): self.store = hs.get_datastore() self.federation_sender = hs.get_federation_sender() + self.replication_client = replication_client + + self.federation_position = self.store.federation_out_pos_startup + self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") + + self._last_ack = self.federation_position self._room_serials = {} self._room_typing = {} @@ -239,25 +270,13 @@ class FederationSenderHandler(object): self.store.get_room_max_stream_ordering() ) - @defer.inlineCallbacks def stream_positions(self): - stream_id = yield self.store.get_federation_out_pos("federation") - defer.returnValue({ - "federation": stream_id, - - # Ack stuff we've "processed", this should only be called from - # one process. - "federation_ack": stream_id, - }) + return {"federation": self.federation_position} - @defer.inlineCallbacks - def process_replication(self, result): + def process_replication_rows(self, stream_name, token, rows): # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. - fed_stream = result.get("federation") - if fed_stream: - latest_id = int(fed_stream["position"]) - + if stream_name == "federation": # The federation stream containis a bunch of different types of # rows that need to be handled differently. We parse the rows, put # them into the appropriate collection and then send them off. @@ -268,9 +287,9 @@ class FederationSenderHandler(object): device_destinations = set() # Parse the rows in the stream - for row in fed_stream["rows"]: - position, typ, content_js = row - content = json.loads(content_js) + for row in rows: + typ = row.type + content = row.data if typ == send_queue.PRESENCE_TYPE: destination = content["destination"] @@ -321,16 +340,27 @@ class FederationSenderHandler(object): for destination in device_destinations: self.federation_sender.send_device_messages(destination) - # Record where we are in the stream. - yield self.store.update_federation_out_pos( - "federation", latest_id - ) + preserve_fn(self.update_token)(token) # We also need to poke the federation sender when new events happen - event_stream = result.get("events") - if event_stream: - latest_pos = event_stream["position"] - self.federation_sender.notify_new_events(latest_pos) + elif stream_name == "events": + self.federation_sender.notify_new_events(token) + + @defer.inlineCallbacks + def update_token(self, token): + self.federation_position = token + + # We linearize here to ensure we don't have races updating the token + with (yield self._fed_position_linearizer.queue(None)): + if self._last_ack < self.federation_position: + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) + + # We ACK this token over replication so that the master can drop + # its in memory queues + self.replication_client.send_federation_ack(self.federation_position) + self._last_ack = self.federation_position if __name__ == '__main__': diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e0b87468fe..990eb477e5 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -20,6 +20,8 @@ import gc import logging import os import sys + +import synapse.config.logger from synapse.config._base import ConfigError from synapse.python_dependencies import ( @@ -50,10 +52,11 @@ from synapse.api.urls import ( ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX +from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit @@ -220,6 +223,16 @@ class SynapseHomeServer(HomeServer): ), interface=address ) + elif listener["type"] == "replication": + bind_addresses = listener["bind_addresses"] + for address in bind_addresses: + factory = ReplicationStreamProtocolFactory(self) + server_listener = reactor.listenTCP( + listener["port"], factory, interface=address + ) + reactor.addSystemEventTrigger( + "before", "shutdown", server_listener.stopListening, + ) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -286,7 +299,7 @@ def setup(config_options): # generating config files and shouldn't try to continue. sys.exit(0) - config.setup_logging() + synapse.config.logger.setup_logging(config, use_worker_options=False) # check any extra requirements we have now we have a config check_requirements(config) @@ -454,7 +467,12 @@ def run(hs): def in_thread(): # Uncomment to enable tracing of log context changes. # sys.settrace(logcontext_tracer) - with LoggingContext("run"): + + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): change_resource_limit(hs.config.soft_file_limit) if hs.config.gc_thresholds: gc.set_threshold(*hs.config.gc_thresholds) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index ef17b158a5..26c4416956 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -24,15 +24,16 @@ from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore -from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -44,7 +45,7 @@ from synapse.crypto import context_factory from synapse import events -from twisted.internet import reactor, defer +from twisted.internet import reactor from twisted.web.resource import Resource from daemonize import Daemonize @@ -59,6 +60,7 @@ logger = logging.getLogger("synapse.app.media_repository") class MediaRepositorySlavedStore( SlavedApplicationServiceStore, SlavedRegistrationStore, + TransactionStore, BaseSlavedStore, MediaRepositoryStore, ClientIpStore, @@ -140,21 +142,10 @@ class MediaRepositoryServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url + self.get_tcp_replication().start_replication(self) - while True: - try: - args = store.stream_positions() - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(5) + def build_tcp_replication(self): + return ReplicationClientHandler(self.get_datastore()) def start(config_options): @@ -168,7 +159,7 @@ def start(config_options): assert config.worker_app == "synapse.app.media_repository" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -190,7 +181,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: @@ -200,7 +195,6 @@ def start(config_options): def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() - ss.replicate() reactor.callWhenRunning(start) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 073f2c2489..f9114acfcb 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -27,11 +27,12 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine from synapse.storage import DataStore -from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn, \ + PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -88,7 +89,6 @@ class PusherSlaveStore( class PusherServer(HomeServer): - def get_db_conn(self, run_new_connection=True): # Any param beginning with cp_ is a parameter for adbapi, and should # not be passed to the database engine. @@ -108,16 +108,7 @@ class PusherServer(HomeServer): logger.info("Finished setting up.") def remove_pusher(self, app_id, push_key, user_id): - http_client = self.get_simple_http_client() - replication_url = self.config.worker_replication_url - url = replication_url + "/remove_pushers" - return http_client.post_json_get_json(url, { - "remove": [{ - "app_id": app_id, - "push_key": push_key, - "user_id": user_id, - }] - }) + self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id) def _listen_http(self, listener_config): port = listener_config["port"] @@ -165,73 +156,52 @@ class PusherServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return PusherReplicationHandler(self) + + +class PusherReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(PusherReplicationHandler, self).__init__(hs.get_datastore()) + + self.pusher_pool = hs.get_pusherpool() + + def on_rdata(self, stream_name, token, rows): + super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) + preserve_fn(self.poke_pushers)(stream_name, token, rows) + @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url - pusher_pool = self.get_pusherpool() - - def stop_pusher(user_id, app_id, pushkey): - key = "%s:%s" % (app_id, pushkey) - pushers_for_user = pusher_pool.pushers.get(user_id, {}) - pusher = pushers_for_user.pop(key, None) - if pusher is None: - return - logger.info("Stopping pusher %r / %r", user_id, key) - pusher.on_stop() - - def start_pusher(user_id, app_id, pushkey): - key = "%s:%s" % (app_id, pushkey) - logger.info("Starting pusher %r / %r", user_id, key) - return pusher_pool._refresh_pusher(app_id, pushkey, user_id) - - @defer.inlineCallbacks - def poke_pushers(results): - pushers_rows = set( - map(tuple, results.get("pushers", {}).get("rows", [])) + def poke_pushers(self, stream_name, token, rows): + if stream_name == "pushers": + for row in rows: + if row.deleted: + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) + elif stream_name == "events": + yield self.pusher_pool.on_new_notifications( + token, token, ) - deleted_pushers_rows = set( - map(tuple, results.get("deleted_pushers", {}).get("rows", [])) + elif stream_name == "receipts": + yield self.pusher_pool.on_new_receipts( + token, token, set(row.room_id for row in rows) ) - for row in sorted(pushers_rows | deleted_pushers_rows): - if row in deleted_pushers_rows: - user_id, app_id, pushkey = row[1:4] - stop_pusher(user_id, app_id, pushkey) - elif row in pushers_rows: - user_id = row[1] - app_id = row[5] - pushkey = row[8] - yield start_pusher(user_id, app_id, pushkey) - - stream = results.get("events") - if stream and stream["rows"]: - min_stream_id = stream["rows"][0][0] - max_stream_id = stream["position"] - preserve_fn(pusher_pool.on_new_notifications)( - min_stream_id, max_stream_id - ) - - stream = results.get("receipts") - if stream and stream["rows"]: - rows = stream["rows"] - affected_room_ids = set(row[1] for row in rows) - min_stream_id = rows[0][0] - max_stream_id = stream["position"] - preserve_fn(pusher_pool.on_new_receipts)( - min_stream_id, max_stream_id, affected_room_ids - ) - - while True: - try: - args = store.stream_positions() - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - poke_pushers(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(30) + + def stop_pusher(self, user_id, app_id, pushkey): + key = "%s:%s" % (app_id, pushkey) + pushers_for_user = self.pusher_pool.pushers.get(user_id, {}) + pusher = pushers_for_user.pop(key, None) + if pusher is None: + return + logger.info("Stopping pusher %r / %r", user_id, key) + pusher.on_stop() + + def start_pusher(self, user_id, app_id, pushkey): + key = "%s:%s" % (app_id, pushkey) + logger.info("Starting pusher %r / %r", user_id, key) + return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id) def start(config_options): @@ -245,7 +215,7 @@ def start(config_options): assert config.worker_app == "synapse.app.pusher" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -275,7 +245,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: @@ -283,7 +257,6 @@ def start(config_options): reactor.run() def start(): - ps.replicate() ps.get_pusherpool().start() ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 3f29595256..d39e3161fe 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -16,11 +16,10 @@ import synapse -from synapse.api.constants import EventTypes, PresenceState +from synapse.api.constants import EventTypes from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.events import FrozenEvent from synapse.handlers.presence import PresenceHandler from synapse.http.site import SynapseSite from synapse.http.server import JsonResource @@ -41,14 +40,14 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.storage.presence import PresenceStore, UserPresenceState from synapse.storage.roommember import RoomMemberStore -from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -63,7 +62,6 @@ import sys import logging import contextlib import gc -import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -107,11 +105,11 @@ UPDATE_SYNCING_USERS_MS = 10 * 1000 class SynchrotronPresence(object): def __init__(self, hs): + self.hs = hs self.is_mine_id = hs.is_mine_id self.http_client = hs.get_simple_http_client() self.store = hs.get_datastore() self.user_to_num_current_syncs = {} - self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users" self.clock = hs.get_clock() self.notifier = hs.get_notifier() @@ -121,17 +119,52 @@ class SynchrotronPresence(object): for state in active_presence } - self.process_id = random_string(16) - logger.info("Presence process_id is %r", self.process_id) + # user_id -> last_sync_ms. Lists the users that have stopped syncing + # but we haven't notified the master of that yet + self.users_going_offline = {} - self._sending_sync = False - self._need_to_send_sync = False - self.clock.looping_call( - self._send_syncing_users_regularly, - UPDATE_SYNCING_USERS_MS, + self._send_stop_syncing_loop = self.clock.looping_call( + self.send_stop_syncing, 10 * 1000 ) - reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown) + self.process_id = random_string(16) + logger.info("Presence process_id is %r", self.process_id) + + def send_user_sync(self, user_id, is_syncing, last_sync_ms): + self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms) + + def mark_as_coming_online(self, user_id): + """A user has started syncing. Send a UserSync to the master, unless they + had recently stopped syncing. + + Args: + user_id (str) + """ + going_offline = self.users_going_offline.pop(user_id, None) + if not going_offline: + # Safe to skip because we haven't yet told the master they were offline + self.send_user_sync(user_id, True, self.clock.time_msec()) + + def mark_as_going_offline(self, user_id): + """A user has stopped syncing. We wait before notifying the master as + its likely they'll come back soon. This allows us to avoid sending + a stopped syncing immediately followed by a started syncing notification + to the master + + Args: + user_id (str) + """ + self.users_going_offline[user_id] = self.clock.time_msec() + + def send_stop_syncing(self): + """Check if there are any users who have stopped syncing a while ago + and haven't come back yet. If there are poke the master about them. + """ + now = self.clock.time_msec() + for user_id, last_sync_ms in self.users_going_offline.items(): + if now - last_sync_ms > 10 * 1000: + self.users_going_offline.pop(user_id, None) + self.send_user_sync(user_id, False, last_sync_ms) def set_state(self, user, state, ignore_status_msg=False): # TODO Hows this supposed to work? @@ -142,15 +175,14 @@ class SynchrotronPresence(object): _get_interested_parties = PresenceHandler._get_interested_parties.__func__ current_state_for_users = PresenceHandler.current_state_for_users.__func__ - @defer.inlineCallbacks def user_syncing(self, user_id, affect_presence): if affect_presence: curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 - prev_states = yield self.current_state_for_users([user_id]) - if prev_states[user_id].state == PresenceState.OFFLINE: - # TODO: Don't block the sync request on this HTTP hit. - yield self._send_syncing_users_now() + + # If we went from no in flight sync to some, notify replication + if self.user_to_num_current_syncs[user_id] == 1: + self.mark_as_coming_online(user_id) def _end(): # We check that the user_id is in user_to_num_current_syncs because @@ -159,6 +191,10 @@ class SynchrotronPresence(object): if affect_presence and user_id in self.user_to_num_current_syncs: self.user_to_num_current_syncs[user_id] -= 1 + # If we went from one in flight sync to non, notify replication + if self.user_to_num_current_syncs[user_id] == 0: + self.mark_as_going_offline(user_id) + @contextlib.contextmanager def _user_syncing(): try: @@ -166,49 +202,7 @@ class SynchrotronPresence(object): finally: _end() - defer.returnValue(_user_syncing()) - - @defer.inlineCallbacks - def _on_shutdown(self): - # When the synchrotron is shutdown tell the master to clear the in - # progress syncs for this process - self.user_to_num_current_syncs.clear() - yield self._send_syncing_users_now() - - def _send_syncing_users_regularly(self): - # Only send an update if we aren't in the middle of sending one. - if not self._sending_sync: - preserve_fn(self._send_syncing_users_now)() - - @defer.inlineCallbacks - def _send_syncing_users_now(self): - if self._sending_sync: - # We don't want to race with sending another update. - # Instead we wait for that update to finish and send another - # update afterwards. - self._need_to_send_sync = True - return - - # Flag that we are sending an update. - self._sending_sync = True - - yield self.http_client.post_json_get_json(self.syncing_users_url, { - "process_id": self.process_id, - "syncing_users": [ - user_id for user_id, count in self.user_to_num_current_syncs.items() - if count > 0 - ], - }) - - # Unset the flag as we are no longer sending an update. - self._sending_sync = False - if self._need_to_send_sync: - # If something happened while we were sending the update then - # we might need to send another update. - # TODO: Check if the update that was sent matches the current state - # as we only need to send an update if they are different. - self._need_to_send_sync = False - yield self._send_syncing_users_now() + return defer.succeed(_user_syncing()) @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): @@ -223,26 +217,24 @@ class SynchrotronPresence(object): ) @defer.inlineCallbacks - def process_replication(self, result): - stream = result.get("presence", {"rows": []}) - states = [] - for row in stream["rows"]: - ( - position, user_id, state, last_active_ts, - last_federation_update_ts, last_user_sync_ts, status_msg, - currently_active - ) = row - state = UserPresenceState( - user_id, state, last_active_ts, - last_federation_update_ts, last_user_sync_ts, status_msg, - currently_active - ) - self.user_to_current_state[user_id] = state - states.append(state) + def process_replication_rows(self, token, rows): + states = [UserPresenceState( + row.user_id, row.state, row.last_active_ts, + row.last_federation_update_ts, row.last_user_sync_ts, row.status_msg, + row.currently_active + ) for row in rows] + + for state in states: + self.user_to_current_state[row.user_id] = state - if states and "position" in stream: - stream_id = int(stream["position"]) - yield self.notify_from_replication(states, stream_id) + stream_id = token + yield self.notify_from_replication(states, stream_id) + + def get_currently_syncing_users(self): + return [ + user_id for user_id, count in self.user_to_num_current_syncs.iteritems() + if count > 0 + ] class SynchrotronTyping(object): @@ -257,16 +249,12 @@ class SynchrotronTyping(object): # value which we *must* use for the next replication request. return {"typing": self._latest_room_serial} - def process_replication(self, result): - stream = result.get("typing") - if stream: - self._latest_room_serial = int(stream["position"]) + def process_replication_rows(self, token, rows): + self._latest_room_serial = token - for row in stream["rows"]: - position, room_id, typing_json = row - typing = json.loads(typing_json) - self._room_serials[room_id] = position - self._room_typing[room_id] = typing + for row in rows: + self._room_serials[row.room_id] = token + self._room_typing[row.room_id] = row.user_ids class SynchrotronApplicationService(object): @@ -351,120 +339,89 @@ class SynchrotronServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url - notifier = self.get_notifier() - presence_handler = self.get_presence_handler() - typing_handler = self.get_typing_handler() - - def notify_from_stream( - result, stream_name, stream_key, room=None, user=None - ): - stream = result.get(stream_name) - if stream: - position_index = stream["field_names"].index("position") - if room: - room_index = stream["field_names"].index(room) - if user: - user_index = stream["field_names"].index(user) - - users = () - rooms = () - for row in stream["rows"]: - position = row[position_index] - - if user: - users = (row[user_index],) - - if room: - rooms = (row[room_index],) - - notifier.on_new_event( - stream_key, position, users=users, rooms=rooms - ) + self.get_tcp_replication().start_replication(self) - @defer.inlineCallbacks - def notify_device_list_update(result): - stream = result.get("device_lists") - if not stream: - return + def build_tcp_replication(self): + return SyncReplicationHandler(self) - position_index = stream["field_names"].index("position") - user_index = stream["field_names"].index("user_id") + def build_presence_handler(self): + return SynchrotronPresence(self) - for row in stream["rows"]: - position = row[position_index] - user_id = row[user_index] + def build_typing_handler(self): + return SynchrotronTyping(self) - rooms = yield store.get_rooms_for_user(user_id) - room_ids = [r.room_id for r in rooms] - notifier.on_new_event( - "device_list_key", position, rooms=room_ids, - ) +class SyncReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(SyncReplicationHandler, self).__init__(hs.get_datastore()) - @defer.inlineCallbacks - def notify(result): - stream = result.get("events") - if stream: - max_position = stream["position"] - for row in stream["rows"]: - position = row[0] - internal = json.loads(row[1]) - event_json = json.loads(row[2]) - event = FrozenEvent(event_json, internal_metadata_dict=internal) - extra_users = () - if event.type == EventTypes.Member: - extra_users = (event.state_key,) - notifier.on_new_room_event( - event, position, max_position, extra_users - ) + self.store = hs.get_datastore() + self.typing_handler = hs.get_typing_handler() + self.presence_handler = hs.get_presence_handler() + self.notifier = hs.get_notifier() - notify_from_stream( - result, "push_rules", "push_rules_key", user="user_id" - ) - notify_from_stream( - result, "user_account_data", "account_data_key", user="user_id" - ) - notify_from_stream( - result, "room_account_data", "account_data_key", user="user_id" + self.presence_handler.sync_callback = self.send_user_sync + + def on_rdata(self, stream_name, token, rows): + super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) + + preserve_fn(self.process_and_notify)(stream_name, token, rows) + + def get_streams_to_replicate(self): + args = super(SyncReplicationHandler, self).get_streams_to_replicate() + args.update(self.typing_handler.stream_positions()) + return args + + def get_currently_syncing_users(self): + return self.presence_handler.get_currently_syncing_users() + + @defer.inlineCallbacks + def process_and_notify(self, stream_name, token, rows): + if stream_name == "events": + # We shouldn't get multiple rows per token for events stream, so + # we don't need to optimise this for multiple rows. + for row in rows: + event = yield self.store.get_event(row.event_id) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + max_token = self.store.get_room_max_stream_ordering() + self.notifier.on_new_room_event( + event, token, max_token, extra_users + ) + elif stream_name == "push_rules": + self.notifier.on_new_event( + "push_rules_key", token, users=[row.user_id for row in rows], ) - notify_from_stream( - result, "tag_account_data", "account_data_key", user="user_id" + elif stream_name in ("account_data", "tag_account_data",): + self.notifier.on_new_event( + "account_data_key", token, users=[row.user_id for row in rows], ) - notify_from_stream( - result, "receipts", "receipt_key", room="room_id" + elif stream_name == "receipts": + self.notifier.on_new_event( + "receipt_key", token, rooms=[row.room_id for row in rows], ) - notify_from_stream( - result, "typing", "typing_key", room="room_id" + elif stream_name == "typing": + self.typing_handler.process_replication_rows(token, rows) + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows], ) - notify_from_stream( - result, "to_device", "to_device_key", user="user_id" + elif stream_name == "to_device": + entities = [row.entity for row in rows if row.entity.startswith("@")] + if entities: + self.notifier.on_new_event( + "to_device_key", token, users=entities, + ) + elif stream_name == "device_lists": + all_room_ids = set() + for row in rows: + room_ids = yield self.store.get_rooms_for_user(row.user_id) + all_room_ids.update(room_ids) + self.notifier.on_new_event( + "device_list_key", token, rooms=all_room_ids, ) - yield notify_device_list_update(result) - - while True: - try: - args = store.stream_positions() - args.update(typing_handler.stream_positions()) - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - typing_handler.process_replication(result) - yield presence_handler.process_replication(result) - yield notify(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(5) - - def build_presence_handler(self): - return SynchrotronPresence(self) - - def build_typing_handler(self): - return SynchrotronTyping(self) + elif stream_name == "presence": + yield self.presence_handler.process_replication_rows(token, rows) def start(config_options): @@ -478,7 +435,7 @@ def start(config_options): assert config.worker_app == "synapse.app.synchrotron" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -497,7 +454,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: @@ -506,7 +467,6 @@ def start(config_options): def start(): ss.get_datastore().start_profiling() - ss.replicate() ss.get_state_handler().start_caching() reactor.callWhenRunning(start) diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index e84045c7d1..8223734845 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -23,14 +23,27 @@ import signal import subprocess import sys import yaml +import errno +import time SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"] GREEN = "\x1b[1;32m" +YELLOW = "\x1b[1;33m" RED = "\x1b[1;31m" NORMAL = "\x1b[m" +def pid_running(pid): + try: + os.kill(pid, 0) + return True + except OSError, err: + if err.errno == errno.EPERM: + return True + return False + + def write(message, colour=NORMAL, stream=sys.stdout): if colour == NORMAL: stream.write(message + "\n") @@ -38,6 +51,11 @@ def write(message, colour=NORMAL, stream=sys.stdout): stream.write(colour + message + NORMAL + "\n") +def abort(message, colour=RED, stream=sys.stderr): + write(message, colour, stream) + sys.exit(1) + + def start(configfile): write("Starting ...") args = SYNAPSE @@ -45,7 +63,8 @@ def start(configfile): try: subprocess.check_call(args) - write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN) + write("started synapse.app.homeserver(%r)" % + (configfile,), colour=GREEN) except subprocess.CalledProcessError as e: write( "error starting (exit code: %d); see above for logs" % e.returncode, @@ -76,8 +95,16 @@ def start_worker(app, configfile, worker_configfile): def stop(pidfile, app): if os.path.exists(pidfile): pid = int(open(pidfile).read()) - os.kill(pid, signal.SIGTERM) - write("stopped %s" % (app,), colour=GREEN) + try: + os.kill(pid, signal.SIGTERM) + write("stopped %s" % (app,), colour=GREEN) + except OSError, err: + if err.errno == errno.ESRCH: + write("%s not running" % (app,), colour=YELLOW) + elif err.errno == errno.EPERM: + abort("Cannot stop %s: Operation not permitted" % (app,)) + else: + abort("Cannot stop %s: Unknown error" % (app,)) Worker = collections.namedtuple("Worker", [ @@ -175,7 +202,8 @@ def main(): worker_app = worker_config["worker_app"] worker_pidfile = worker_config["worker_pid_file"] worker_daemonize = worker_config["worker_daemonize"] - assert worker_daemonize # TODO print something more user friendly + assert worker_daemonize, "In config %r: expected '%s' to be True" % ( + worker_configfile, "worker_daemonize") worker_cache_factor = worker_config.get("synctl_cache_factor") workers.append(Worker( worker_app, worker_configfile, worker_pidfile, worker_cache_factor, @@ -190,7 +218,19 @@ def main(): if start_stop_synapse: stop(pidfile, "synapse.app.homeserver") - # TODO: Wait for synapse to actually shutdown before starting it again + # Wait for synapse to actually shutdown before starting it again + if action == "restart": + running_pids = [] + if start_stop_synapse and os.path.exists(pidfile): + running_pids.append(int(open(pidfile).read())) + for worker in workers: + if os.path.exists(worker.pidfile): + running_pids.append(int(open(worker.pidfile).read())) + if len(running_pids) > 0: + write("Waiting for process to exit before restarting...") + for running_pid in running_pids: + while pid_running(running_pid): + time.sleep(0.2) if action == "start" or action == "restart": if start_stop_synapse: |