diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/app/_base.py | 12 | ||||
-rw-r--r-- | synapse/app/admin_cmd.py | 4 | ||||
-rw-r--r-- | synapse/app/appservice.py | 4 | ||||
-rw-r--r-- | synapse/app/client_reader.py | 6 | ||||
-rw-r--r-- | synapse/app/event_creator.py | 4 | ||||
-rw-r--r-- | synapse/app/federation_reader.py | 4 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 4 | ||||
-rw-r--r-- | synapse/app/frontend_proxy.py | 12 | ||||
-rw-r--r-- | synapse/app/homeserver.py | 10 | ||||
-rw-r--r-- | synapse/app/media_repository.py | 4 | ||||
-rw-r--r-- | synapse/app/pusher.py | 4 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 4 | ||||
-rw-r--r-- | synapse/app/user_dir.py | 4 | ||||
-rw-r--r-- | synapse/appservice/api.py | 3 | ||||
-rw-r--r-- | synapse/appservice/scheduler.py | 153 |
15 files changed, 130 insertions, 102 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 69dcf3523f..c30fdeee9a 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -36,18 +36,20 @@ from synapse.util.versionstring import get_version_string logger = logging.getLogger(__name__) +# list of tuples of function, args list, kwargs dict _sighup_callbacks = [] -def register_sighup(func): +def register_sighup(func, *args, **kwargs): """ Register a function to be called when a SIGHUP occurs. Args: func (function): Function to be called when sent a SIGHUP signal. - Will be called with a single argument, the homeserver. + Will be called with a single default argument, the homeserver. + *args, **kwargs: args and kwargs to be passed to the target function. """ - _sighup_callbacks.append(func) + _sighup_callbacks.append((func, args, kwargs)) def start_worker_reactor(appname, config, run_command=reactor.run): @@ -248,8 +250,8 @@ def start(hs, listeners=None): # we're not using systemd. sdnotify(b"RELOADING=1") - for i in _sighup_callbacks: - i(hs) + for i, args, kwargs in _sighup_callbacks: + i(hs, *args, **kwargs) sdnotify(b"READY=1") diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 1fd52a5526..04751a6a5e 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -227,8 +227,6 @@ def start(config_options): config.start_pushers = False config.send_federation = False - setup_logging(config, use_worker_options=True) - synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -241,6 +239,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() # We use task.react as the basic run command as it correctly handles tearing diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 54bb114dec..767b87d2db 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -141,8 +141,6 @@ def start(config_options): assert config.worker_app == "synapse.app.appservice" - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -167,6 +165,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ps, config, use_worker_options=True) + ps.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ps, config.worker_listeners diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 721bb5b119..dbcc414c42 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -119,7 +119,7 @@ class ClientReaderServer(HomeServer): KeyChangesServlet(self).register(resource) VoipRestServlet(self).register(resource) PushRuleRestServlet(self).register(resource) - VersionsRestServlet().register(resource) + VersionsRestServlet(self).register(resource) resources.update({"/_matrix/client": resource}) @@ -179,8 +179,6 @@ def start(config_options): assert config.worker_app == "synapse.app.client_reader" - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -193,6 +191,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index 473c8895d0..c67fe69a50 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -175,8 +175,6 @@ def start(config_options): assert config.worker_replication_http_port is not None - setup_logging(config, use_worker_options=True) - # This should only be done on the user directory worker or the master config.update_user_directory = False @@ -192,6 +190,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 5255d9e8cc..1ef027a88c 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -160,8 +160,6 @@ def start(config_options): assert config.worker_app == "synapse.app.federation_reader" - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -174,6 +172,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index c5a2880e69..04fbb407af 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -171,8 +171,6 @@ def start(config_options): assert config.worker_app == "synapse.app.federation_sender" - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -197,6 +195,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index e2822ca848..9504bfbc70 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -70,12 +70,12 @@ class PresenceStatusStubServlet(RestServlet): except HttpResponseException as e: raise e.to_synapse_error() - return (200, result) + return 200, result @defer.inlineCallbacks def on_PUT(self, request, user_id): yield self.auth.get_user_by_req(request) - return (200, {}) + return 200, {} class KeyUploadServlet(RestServlet): @@ -126,11 +126,11 @@ class KeyUploadServlet(RestServlet): self.main_uri + request.uri.decode("ascii"), body, headers=headers ) - return (200, result) + return 200, result else: # Just interested in counts. result = yield self.store.count_e2e_one_time_keys(user_id, device_id) - return (200, {"one_time_key_counts": result}) + return 200, {"one_time_key_counts": result} class FrontendProxySlavedStore( @@ -232,8 +232,6 @@ def start(config_options): assert config.worker_main_http_uri is not None - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -246,6 +244,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 8233905844..774326dff9 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -341,8 +341,6 @@ def setup(config_options): # generating config files and shouldn't try to continue. sys.exit(0) - synapse.config.logger.setup_logging(config, use_worker_options=False) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -356,6 +354,8 @@ def setup(config_options): database_engine=database_engine, ) + synapse.config.logger.setup_logging(hs, config, use_worker_options=False) + logger.info("Preparing database: %s...", config.database_config["name"]) try: @@ -561,10 +561,12 @@ def run(hs): stats["database_engine"] = hs.get_datastore().database_engine_name stats["database_server_version"] = hs.get_datastore().get_server_version() - logger.info("Reporting stats to matrix.org: %s" % (stats,)) + logger.info( + "Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats) + ) try: yield hs.get_simple_http_client().put_json( - "https://matrix.org/report-usage-stats/push", stats + hs.config.report_stats_endpoint, stats ) except Exception as e: logger.warn("Error reporting stats: %s", e) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 3a168577c7..2ac783ffa3 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -155,8 +155,6 @@ def start(config_options): "Please add ``enable_media_repo: false`` to the main config\n" ) - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -169,6 +167,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 692ffa2f04..d84732ee3c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -184,8 +184,6 @@ def start(config_options): assert config.worker_app == "synapse.app.pusher" - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts if config.start_pushers: @@ -210,6 +208,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ps, config, use_worker_options=True) + ps.setup() def start(): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index a1c3b162f7..473026fce5 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -435,8 +435,6 @@ def start(config_options): assert config.worker_app == "synapse.app.synchrotron" - setup_logging(config, use_worker_options=True) - synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -450,6 +448,8 @@ def start(config_options): application_service_handler=SynchrotronApplicationService(), ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index cb29a1afab..e01afb39f2 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -197,8 +197,6 @@ def start(config_options): assert config.worker_app == "synapse.app.user_dir" - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -223,6 +221,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 007ca75a94..3e25bf5747 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -107,7 +107,6 @@ class ApplicationServiceApi(SimpleHttpClient): except CodeMessageException as e: if e.code == 404: return False - return logger.warning("query_user to %s received %s", uri, e.code) except Exception as ex: logger.warning("query_user to %s threw exception %s", uri, ex) @@ -127,7 +126,6 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning("query_alias to %s received %s", uri, e.code) if e.code == 404: return False - return except Exception as ex: logger.warning("query_alias to %s threw exception %s", uri, ex) return False @@ -230,7 +228,6 @@ class ApplicationServiceApi(SimpleHttpClient): sent_transactions_counter.labels(service.id).inc() sent_events_counter.labels(service.id).inc(len(events)) return True - return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 42a350bff8..9998f822f1 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object): self.store = hs.get_datastore() self.as_api = hs.get_application_service_api() - def create_recoverer(service, callback): - return _Recoverer(self.clock, self.store, self.as_api, service, callback) - - self.txn_ctrl = _TransactionController( - self.clock, self.store, self.as_api, create_recoverer - ) + self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api) self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock) @defer.inlineCallbacks def start(self): logger.info("Starting appservice scheduler") + # check for any DOWN ASes and start recoverers for them. - recoverers = yield _Recoverer.start( - self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN ) - self.txn_ctrl.add_recoverers(recoverers) + + for service in services: + self.txn_ctrl.start_recoverer(service) def submit_event_for_as(self, service, event): self.queuer.enqueue(service, event) class _ServiceQueuer(object): - """Queues events for the same application service together, sending - transactions as soon as possible. Once a transaction is sent successfully, - this schedules any other events in the queue to run. + """Queue of events waiting to be sent to appservices. + + Groups events into transactions per-appservice, and sends them on to the + TransactionController. Makes sure that we only have one transaction in flight per + appservice at a given time. """ def __init__(self, txn_ctrl, clock): self.queued_events = {} # dict of {service_id: [events]} + + # the appservices which currently have a transaction in flight self.requests_in_flight = set() self.txn_ctrl = txn_ctrl self.clock = clock @@ -136,13 +138,29 @@ class _ServiceQueuer(object): class _TransactionController(object): - def __init__(self, clock, store, as_api, recoverer_fn): + """Transaction manager. + + Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer + if a transaction fails. + + (Note we have only have one of these in the homeserver.) + + Args: + clock (synapse.util.Clock): + store (synapse.storage.DataStore): + as_api (synapse.appservice.api.ApplicationServiceApi): + """ + + def __init__(self, clock, store, as_api): self.clock = clock self.store = store self.as_api = as_api - self.recoverer_fn = recoverer_fn - # keep track of how many recoverers there are - self.recoverers = [] + + # map from service id to recoverer instance + self.recoverers = {} + + # for UTs + self.RECOVERER_CLASS = _Recoverer @defer.inlineCallbacks def send(self, service, events): @@ -154,42 +172,45 @@ class _TransactionController(object): if sent: yield txn.complete(self.store) else: - run_in_background(self._start_recoverer, service) + run_in_background(self._on_txn_fail, service) except Exception: logger.exception("Error creating appservice transaction") - run_in_background(self._start_recoverer, service) + run_in_background(self._on_txn_fail, service) @defer.inlineCallbacks def on_recovered(self, recoverer): - self.recoverers.remove(recoverer) logger.info( "Successfully recovered application service AS ID %s", recoverer.service.id ) + self.recoverers.pop(recoverer.service.id) logger.info("Remaining active recoverers: %s", len(self.recoverers)) yield self.store.set_appservice_state( recoverer.service, ApplicationServiceState.UP ) - def add_recoverers(self, recoverers): - for r in recoverers: - self.recoverers.append(r) - if len(recoverers) > 0: - logger.info("New active recoverers: %s", len(self.recoverers)) - @defer.inlineCallbacks - def _start_recoverer(self, service): + def _on_txn_fail(self, service): try: yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN) - logger.info( - "Application service falling behind. Starting recoverer. AS ID %s", - service.id, - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() + self.start_recoverer(service) except Exception: logger.exception("Error starting AS recoverer") + def start_recoverer(self, service): + """Start a Recoverer for the given service + + Args: + service (synapse.appservice.ApplicationService): + """ + logger.info("Starting recoverer for AS ID %s", service.id) + assert service.id not in self.recoverers + recoverer = self.RECOVERER_CLASS( + self.clock, self.store, self.as_api, service, self.on_recovered + ) + self.recoverers[service.id] = recoverer + recoverer.recover() + logger.info("Now %i active recoverers", len(self.recoverers)) + @defer.inlineCallbacks def _is_service_up(self, service): state = yield self.store.get_appservice_state(service) @@ -197,18 +218,17 @@ class _TransactionController(object): class _Recoverer(object): - @staticmethod - @defer.inlineCallbacks - def start(clock, store, as_api, callback): - services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN) - recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services] - for r in recoverers: - logger.info( - "Starting recoverer for AS ID %s which was marked as " "DOWN", - r.service.id, - ) - r.recover() - return recoverers + """Manages retries and backoff for a DOWN appservice. + + We have one of these for each appservice which is currently considered DOWN. + + Args: + clock (synapse.util.Clock): + store (synapse.storage.DataStore): + as_api (synapse.appservice.api.ApplicationServiceApi): + service (synapse.appservice.ApplicationService): the service we are managing + callback (callable[_Recoverer]): called once the service recovers. + """ def __init__(self, clock, store, as_api, service, callback): self.clock = clock @@ -224,7 +244,9 @@ class _Recoverer(object): "as-recoverer-%s" % (self.service.id,), self.retry ) - self.clock.call_later((2 ** self.backoff_counter), _retry) + delay = 2 ** self.backoff_counter + logger.info("Scheduling retries on %s in %fs", self.service.id, delay) + self.clock.call_later(delay, _retry) def _backoff(self): # cap the backoff to be around 8.5min => (2^9) = 512 secs @@ -234,25 +256,30 @@ class _Recoverer(object): @defer.inlineCallbacks def retry(self): + logger.info("Starting retries on %s", self.service.id) try: - txn = yield self.store.get_oldest_unsent_txn(self.service) - if txn: + while True: + txn = yield self.store.get_oldest_unsent_txn(self.service) + if not txn: + # nothing left: we're done! + self.callback(self) + return + logger.info( "Retrying transaction %s for AS ID %s", txn.id, txn.service.id ) sent = yield txn.send(self.as_api) - if sent: - yield txn.complete(self.store) - # reset the backoff counter and retry immediately - self.backoff_counter = 1 - yield self.retry() - else: - self._backoff() - else: - self._set_service_recovered() - except Exception as e: - logger.exception(e) - self._backoff() - - def _set_service_recovered(self): - self.callback(self) + if not sent: + break + + yield txn.complete(self.store) + + # reset the backoff counter and then process the next transaction + self.backoff_counter = 1 + + except Exception: + logger.exception("Unexpected error running retries") + + # we didn't manage to send all of the transactions before we got an error of + # some flavour: reschedule the next retry. + self._backoff() |