summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/app/_base.py12
-rw-r--r--synapse/app/admin_cmd.py4
-rw-r--r--synapse/app/appservice.py4
-rw-r--r--synapse/app/client_reader.py6
-rw-r--r--synapse/app/event_creator.py4
-rw-r--r--synapse/app/federation_reader.py4
-rw-r--r--synapse/app/federation_sender.py4
-rw-r--r--synapse/app/frontend_proxy.py12
-rw-r--r--synapse/app/homeserver.py10
-rw-r--r--synapse/app/media_repository.py4
-rw-r--r--synapse/app/pusher.py4
-rw-r--r--synapse/app/synchrotron.py4
-rw-r--r--synapse/app/user_dir.py4
-rw-r--r--synapse/appservice/api.py3
-rw-r--r--synapse/appservice/scheduler.py153
15 files changed, 130 insertions, 102 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 69dcf3523f..c30fdeee9a 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -36,18 +36,20 @@ from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger(__name__)
 
+# list of tuples of function, args list, kwargs dict
 _sighup_callbacks = []
 
 
-def register_sighup(func):
+def register_sighup(func, *args, **kwargs):
     """
     Register a function to be called when a SIGHUP occurs.
 
     Args:
         func (function): Function to be called when sent a SIGHUP signal.
-            Will be called with a single argument, the homeserver.
+            Will be called with a single default argument, the homeserver.
+        *args, **kwargs: args and kwargs to be passed to the target function.
     """
-    _sighup_callbacks.append(func)
+    _sighup_callbacks.append((func, args, kwargs))
 
 
 def start_worker_reactor(appname, config, run_command=reactor.run):
@@ -248,8 +250,8 @@ def start(hs, listeners=None):
                 # we're not using systemd.
                 sdnotify(b"RELOADING=1")
 
-                for i in _sighup_callbacks:
-                    i(hs)
+                for i, args, kwargs in _sighup_callbacks:
+                    i(hs, *args, **kwargs)
 
                 sdnotify(b"READY=1")
 
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index 1fd52a5526..04751a6a5e 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -227,8 +227,6 @@ def start(config_options):
     config.start_pushers = False
     config.send_federation = False
 
-    setup_logging(config, use_worker_options=True)
-
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -241,6 +239,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
 
     # We use task.react as the basic run command as it correctly handles tearing
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 54bb114dec..767b87d2db 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -141,8 +141,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.appservice"
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -167,6 +165,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ps, config, use_worker_options=True)
+
     ps.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ps, config.worker_listeners
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 721bb5b119..dbcc414c42 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -119,7 +119,7 @@ class ClientReaderServer(HomeServer):
                     KeyChangesServlet(self).register(resource)
                     VoipRestServlet(self).register(resource)
                     PushRuleRestServlet(self).register(resource)
-                    VersionsRestServlet().register(resource)
+                    VersionsRestServlet(self).register(resource)
 
                     resources.update({"/_matrix/client": resource})
 
@@ -179,8 +179,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.client_reader"
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -193,6 +191,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 473c8895d0..c67fe69a50 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -175,8 +175,6 @@ def start(config_options):
 
     assert config.worker_replication_http_port is not None
 
-    setup_logging(config, use_worker_options=True)
-
     # This should only be done on the user directory worker or the master
     config.update_user_directory = False
 
@@ -192,6 +190,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 5255d9e8cc..1ef027a88c 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -160,8 +160,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.federation_reader"
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -174,6 +172,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index c5a2880e69..04fbb407af 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -171,8 +171,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.federation_sender"
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -197,6 +195,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index e2822ca848..9504bfbc70 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -70,12 +70,12 @@ class PresenceStatusStubServlet(RestServlet):
         except HttpResponseException as e:
             raise e.to_synapse_error()
 
-        return (200, result)
+        return 200, result
 
     @defer.inlineCallbacks
     def on_PUT(self, request, user_id):
         yield self.auth.get_user_by_req(request)
-        return (200, {})
+        return 200, {}
 
 
 class KeyUploadServlet(RestServlet):
@@ -126,11 +126,11 @@ class KeyUploadServlet(RestServlet):
                 self.main_uri + request.uri.decode("ascii"), body, headers=headers
             )
 
-            return (200, result)
+            return 200, result
         else:
             # Just interested in counts.
             result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
-            return (200, {"one_time_key_counts": result})
+            return 200, {"one_time_key_counts": result}
 
 
 class FrontendProxySlavedStore(
@@ -232,8 +232,6 @@ def start(config_options):
 
     assert config.worker_main_http_uri is not None
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -246,6 +244,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 8233905844..774326dff9 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -341,8 +341,6 @@ def setup(config_options):
         # generating config files and shouldn't try to continue.
         sys.exit(0)
 
-    synapse.config.logger.setup_logging(config, use_worker_options=False)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -356,6 +354,8 @@ def setup(config_options):
         database_engine=database_engine,
     )
 
+    synapse.config.logger.setup_logging(hs, config, use_worker_options=False)
+
     logger.info("Preparing database: %s...", config.database_config["name"])
 
     try:
@@ -561,10 +561,12 @@ def run(hs):
 
         stats["database_engine"] = hs.get_datastore().database_engine_name
         stats["database_server_version"] = hs.get_datastore().get_server_version()
-        logger.info("Reporting stats to matrix.org: %s" % (stats,))
+        logger.info(
+            "Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats)
+        )
         try:
             yield hs.get_simple_http_client().put_json(
-                "https://matrix.org/report-usage-stats/push", stats
+                hs.config.report_stats_endpoint, stats
             )
         except Exception as e:
             logger.warn("Error reporting stats: %s", e)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 3a168577c7..2ac783ffa3 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -155,8 +155,6 @@ def start(config_options):
             "Please add ``enable_media_repo: false`` to the main config\n"
         )
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -169,6 +167,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 692ffa2f04..d84732ee3c 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -184,8 +184,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.pusher"
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     if config.start_pushers:
@@ -210,6 +208,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ps, config, use_worker_options=True)
+
     ps.setup()
 
     def start():
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index a1c3b162f7..473026fce5 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -435,8 +435,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.synchrotron"
 
-    setup_logging(config, use_worker_options=True)
-
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -450,6 +448,8 @@ def start(config_options):
         application_service_handler=SynchrotronApplicationService(),
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index cb29a1afab..e01afb39f2 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -197,8 +197,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.user_dir"
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -223,6 +221,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 007ca75a94..3e25bf5747 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -107,7 +107,6 @@ class ApplicationServiceApi(SimpleHttpClient):
         except CodeMessageException as e:
             if e.code == 404:
                 return False
-                return
             logger.warning("query_user to %s received %s", uri, e.code)
         except Exception as ex:
             logger.warning("query_user to %s threw exception %s", uri, ex)
@@ -127,7 +126,6 @@ class ApplicationServiceApi(SimpleHttpClient):
             logger.warning("query_alias to %s received %s", uri, e.code)
             if e.code == 404:
                 return False
-                return
         except Exception as ex:
             logger.warning("query_alias to %s threw exception %s", uri, ex)
         return False
@@ -230,7 +228,6 @@ class ApplicationServiceApi(SimpleHttpClient):
             sent_transactions_counter.labels(service.id).inc()
             sent_events_counter.labels(service.id).inc(len(events))
             return True
-            return
         except CodeMessageException as e:
             logger.warning("push_bulk to %s received %s", uri, e.code)
         except Exception as ex:
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 42a350bff8..9998f822f1 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object):
         self.store = hs.get_datastore()
         self.as_api = hs.get_application_service_api()
 
-        def create_recoverer(service, callback):
-            return _Recoverer(self.clock, self.store, self.as_api, service, callback)
-
-        self.txn_ctrl = _TransactionController(
-            self.clock, self.store, self.as_api, create_recoverer
-        )
+        self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
         self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
 
     @defer.inlineCallbacks
     def start(self):
         logger.info("Starting appservice scheduler")
+
         # check for any DOWN ASes and start recoverers for them.
-        recoverers = yield _Recoverer.start(
-            self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+        services = yield self.store.get_appservices_by_state(
+            ApplicationServiceState.DOWN
         )
-        self.txn_ctrl.add_recoverers(recoverers)
+
+        for service in services:
+            self.txn_ctrl.start_recoverer(service)
 
     def submit_event_for_as(self, service, event):
         self.queuer.enqueue(service, event)
 
 
 class _ServiceQueuer(object):
-    """Queues events for the same application service together, sending
-    transactions as soon as possible. Once a transaction is sent successfully,
-    this schedules any other events in the queue to run.
+    """Queue of events waiting to be sent to appservices.
+
+    Groups events into transactions per-appservice, and sends them on to the
+    TransactionController. Makes sure that we only have one transaction in flight per
+    appservice at a given time.
     """
 
     def __init__(self, txn_ctrl, clock):
         self.queued_events = {}  # dict of {service_id: [events]}
+
+        # the appservices which currently have a transaction in flight
         self.requests_in_flight = set()
         self.txn_ctrl = txn_ctrl
         self.clock = clock
@@ -136,13 +138,29 @@ class _ServiceQueuer(object):
 
 
 class _TransactionController(object):
-    def __init__(self, clock, store, as_api, recoverer_fn):
+    """Transaction manager.
+
+    Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
+    if a transaction fails.
+
+    (Note we have only have one of these in the homeserver.)
+
+    Args:
+        clock (synapse.util.Clock):
+        store (synapse.storage.DataStore):
+        as_api (synapse.appservice.api.ApplicationServiceApi):
+    """
+
+    def __init__(self, clock, store, as_api):
         self.clock = clock
         self.store = store
         self.as_api = as_api
-        self.recoverer_fn = recoverer_fn
-        # keep track of how many recoverers there are
-        self.recoverers = []
+
+        # map from service id to recoverer instance
+        self.recoverers = {}
+
+        # for UTs
+        self.RECOVERER_CLASS = _Recoverer
 
     @defer.inlineCallbacks
     def send(self, service, events):
@@ -154,42 +172,45 @@ class _TransactionController(object):
                 if sent:
                     yield txn.complete(self.store)
                 else:
-                    run_in_background(self._start_recoverer, service)
+                    run_in_background(self._on_txn_fail, service)
         except Exception:
             logger.exception("Error creating appservice transaction")
-            run_in_background(self._start_recoverer, service)
+            run_in_background(self._on_txn_fail, service)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
-        self.recoverers.remove(recoverer)
         logger.info(
             "Successfully recovered application service AS ID %s", recoverer.service.id
         )
+        self.recoverers.pop(recoverer.service.id)
         logger.info("Remaining active recoverers: %s", len(self.recoverers))
         yield self.store.set_appservice_state(
             recoverer.service, ApplicationServiceState.UP
         )
 
-    def add_recoverers(self, recoverers):
-        for r in recoverers:
-            self.recoverers.append(r)
-        if len(recoverers) > 0:
-            logger.info("New active recoverers: %s", len(self.recoverers))
-
     @defer.inlineCallbacks
-    def _start_recoverer(self, service):
+    def _on_txn_fail(self, service):
         try:
             yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
-            logger.info(
-                "Application service falling behind. Starting recoverer. AS ID %s",
-                service.id,
-            )
-            recoverer = self.recoverer_fn(service, self.on_recovered)
-            self.add_recoverers([recoverer])
-            recoverer.recover()
+            self.start_recoverer(service)
         except Exception:
             logger.exception("Error starting AS recoverer")
 
+    def start_recoverer(self, service):
+        """Start a Recoverer for the given service
+
+        Args:
+            service (synapse.appservice.ApplicationService):
+        """
+        logger.info("Starting recoverer for AS ID %s", service.id)
+        assert service.id not in self.recoverers
+        recoverer = self.RECOVERER_CLASS(
+            self.clock, self.store, self.as_api, service, self.on_recovered
+        )
+        self.recoverers[service.id] = recoverer
+        recoverer.recover()
+        logger.info("Now %i active recoverers", len(self.recoverers))
+
     @defer.inlineCallbacks
     def _is_service_up(self, service):
         state = yield self.store.get_appservice_state(service)
@@ -197,18 +218,17 @@ class _TransactionController(object):
 
 
 class _Recoverer(object):
-    @staticmethod
-    @defer.inlineCallbacks
-    def start(clock, store, as_api, callback):
-        services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
-        recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
-        for r in recoverers:
-            logger.info(
-                "Starting recoverer for AS ID %s which was marked as " "DOWN",
-                r.service.id,
-            )
-            r.recover()
-        return recoverers
+    """Manages retries and backoff for a DOWN appservice.
+
+    We have one of these for each appservice which is currently considered DOWN.
+
+    Args:
+        clock (synapse.util.Clock):
+        store (synapse.storage.DataStore):
+        as_api (synapse.appservice.api.ApplicationServiceApi):
+        service (synapse.appservice.ApplicationService): the service we are managing
+        callback (callable[_Recoverer]): called once the service recovers.
+    """
 
     def __init__(self, clock, store, as_api, service, callback):
         self.clock = clock
@@ -224,7 +244,9 @@ class _Recoverer(object):
                 "as-recoverer-%s" % (self.service.id,), self.retry
             )
 
-        self.clock.call_later((2 ** self.backoff_counter), _retry)
+        delay = 2 ** self.backoff_counter
+        logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
+        self.clock.call_later(delay, _retry)
 
     def _backoff(self):
         # cap the backoff to be around 8.5min => (2^9) = 512 secs
@@ -234,25 +256,30 @@ class _Recoverer(object):
 
     @defer.inlineCallbacks
     def retry(self):
+        logger.info("Starting retries on %s", self.service.id)
         try:
-            txn = yield self.store.get_oldest_unsent_txn(self.service)
-            if txn:
+            while True:
+                txn = yield self.store.get_oldest_unsent_txn(self.service)
+                if not txn:
+                    # nothing left: we're done!
+                    self.callback(self)
+                    return
+
                 logger.info(
                     "Retrying transaction %s for AS ID %s", txn.id, txn.service.id
                 )
                 sent = yield txn.send(self.as_api)
-                if sent:
-                    yield txn.complete(self.store)
-                    # reset the backoff counter and retry immediately
-                    self.backoff_counter = 1
-                    yield self.retry()
-                else:
-                    self._backoff()
-            else:
-                self._set_service_recovered()
-        except Exception as e:
-            logger.exception(e)
-            self._backoff()
-
-    def _set_service_recovered(self):
-        self.callback(self)
+                if not sent:
+                    break
+
+                yield txn.complete(self.store)
+
+                # reset the backoff counter and then process the next transaction
+                self.backoff_counter = 1
+
+        except Exception:
+            logger.exception("Unexpected error running retries")
+
+        # we didn't manage to send all of the transactions before we got an error of
+        # some flavour: reschedule the next retry.
+        self._backoff()