summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/__init__.py4
-rw-r--r--synapse/app/_base.py107
-rw-r--r--synapse/app/appservice.py29
-rw-r--r--synapse/app/client_reader.py35
-rw-r--r--synapse/app/event_creator.py39
-rw-r--r--synapse/app/federation_reader.py38
-rw-r--r--synapse/app/federation_sender.py46
-rw-r--r--synapse/app/frontend_proxy.py80
-rwxr-xr-xsynapse/app/homeserver.py165
-rw-r--r--synapse/app/media_repository.py37
-rw-r--r--synapse/app/pusher.py60
-rw-r--r--synapse/app/synchrotron.py90
-rw-r--r--synapse/app/user_dir.py45
13 files changed, 376 insertions, 399 deletions
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index f56f5fcc13..d877c77834 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -43,7 +43,7 @@ def check_bind_error(e, address, bind_addresses):
         address (str): Address on which binding was attempted.
         bind_addresses (list): Addresses on which the service listens.
     """
-    if address == '0.0.0.0' and '::' in bind_addresses:
-        logger.warn('Failed to listen on 0.0.0.0, continuing because listening on [::]')
+    if address == "0.0.0.0" and "::" in bind_addresses:
+        logger.warn("Failed to listen on 0.0.0.0, continuing because listening on [::]")
     else:
         raise e
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 08199a5e8d..d50a9840d4 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -19,7 +19,6 @@ import signal
 import sys
 import traceback
 
-import psutil
 from daemonize import Daemonize
 
 from twisted.internet import defer, error, reactor
@@ -68,21 +67,13 @@ def start_worker_reactor(appname, config):
         gc_thresholds=config.gc_thresholds,
         pid_file=config.worker_pid_file,
         daemonize=config.worker_daemonize,
-        cpu_affinity=config.worker_cpu_affinity,
         print_pidfile=config.print_pidfile,
         logger=logger,
     )
 
 
 def start_reactor(
-        appname,
-        soft_file_limit,
-        gc_thresholds,
-        pid_file,
-        daemonize,
-        cpu_affinity,
-        print_pidfile,
-        logger,
+    appname, soft_file_limit, gc_thresholds, pid_file, daemonize, print_pidfile, logger
 ):
     """ Run the reactor in the main process
 
@@ -95,7 +86,6 @@ def start_reactor(
         gc_thresholds:
         pid_file (str): name of pid file to write to if daemonize is True
         daemonize (bool): true to run the reactor in a background process
-        cpu_affinity (int|None): cpu affinity mask
         print_pidfile (bool): whether to print the pid file, if daemonize is True
         logger (logging.Logger): logger instance to pass to Daemonize
     """
@@ -109,20 +99,6 @@ def start_reactor(
         # between the sentinel and `run` logcontexts.
         with PreserveLoggingContext():
             logger.info("Running")
-            if cpu_affinity is not None:
-                # Turn the bitmask into bits, reverse it so we go from 0 up
-                mask_to_bits = bin(cpu_affinity)[2:][::-1]
-
-                cpus = []
-                cpu_num = 0
-
-                for i in mask_to_bits:
-                    if i == "1":
-                        cpus.append(cpu_num)
-                    cpu_num += 1
-
-                p = psutil.Process()
-                p.cpu_affinity(cpus)
 
             change_resource_limit(soft_file_limit)
             if gc_thresholds:
@@ -149,10 +125,10 @@ def start_reactor(
 def quit_with_error(error_string):
     message_lines = error_string.split("\n")
     line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
-    sys.stderr.write("*" * line_length + '\n')
+    sys.stderr.write("*" * line_length + "\n")
     for line in message_lines:
         sys.stderr.write(" %s\n" % (line.rstrip(),))
-    sys.stderr.write("*" * line_length + '\n')
+    sys.stderr.write("*" * line_length + "\n")
     sys.exit(1)
 
 
@@ -178,14 +154,7 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
     r = []
     for address in bind_addresses:
         try:
-            r.append(
-                reactor.listenTCP(
-                    port,
-                    factory,
-                    backlog,
-                    address
-                )
-            )
+            r.append(reactor.listenTCP(port, factory, backlog, address))
         except error.CannotListenError as e:
             check_bind_error(e, address, bind_addresses)
 
@@ -205,13 +174,7 @@ def listen_ssl(
     for address in bind_addresses:
         try:
             r.append(
-                reactor.listenSSL(
-                    port,
-                    factory,
-                    context_factory,
-                    backlog,
-                    address
-                )
+                reactor.listenSSL(port, factory, context_factory, backlog, address)
             )
         except error.CannotListenError as e:
             check_bind_error(e, address, bind_addresses)
@@ -243,15 +206,13 @@ def refresh_certificate(hs):
             if isinstance(i.factory, TLSMemoryBIOFactory):
                 addr = i.getHost()
                 logger.info(
-                    "Replacing TLS context factory on [%s]:%i", addr.host, addr.port,
+                    "Replacing TLS context factory on [%s]:%i", addr.host, addr.port
                 )
                 # We want to replace TLS factories with a new one, with the new
                 # TLS configuration. We do this by reaching in and pulling out
                 # the wrappedFactory, and then re-wrapping it.
                 i.factory = TLSMemoryBIOFactory(
-                    hs.tls_server_context_factory,
-                    False,
-                    i.factory.wrappedFactory
+                    hs.tls_server_context_factory, False, i.factory.wrappedFactory
                 )
         logger.info("Context factories updated.")
 
@@ -267,6 +228,7 @@ def start(hs, listeners=None):
     try:
         # Set up the SIGHUP machinery.
         if hasattr(signal, "SIGHUP"):
+
             def handle_sighup(*args, **kwargs):
                 for i in _sighup_callbacks:
                     i(hs)
@@ -302,10 +264,8 @@ def setup_sentry(hs):
         return
 
     import sentry_sdk
-    sentry_sdk.init(
-        dsn=hs.config.sentry_dsn,
-        release=get_version_string(synapse),
-    )
+
+    sentry_sdk.init(dsn=hs.config.sentry_dsn, release=get_version_string(synapse))
 
     # We set some default tags that give some context to this instance
     with sentry_sdk.configure_scope() as scope:
@@ -326,7 +286,7 @@ def install_dns_limiter(reactor, max_dns_requests_in_flight=100):
     many DNS queries at once
     """
     new_resolver = _LimitedHostnameResolver(
-        reactor.nameResolver, max_dns_requests_in_flight,
+        reactor.nameResolver, max_dns_requests_in_flight
     )
 
     reactor.installNameResolver(new_resolver)
@@ -339,26 +299,44 @@ class _LimitedHostnameResolver(object):
     def __init__(self, resolver, max_dns_requests_in_flight):
         self._resolver = resolver
         self._limiter = Linearizer(
-            name="dns_client_limiter", max_count=max_dns_requests_in_flight,
+            name="dns_client_limiter", max_count=max_dns_requests_in_flight
         )
 
-    def resolveHostName(self, resolutionReceiver, hostName, portNumber=0,
-                        addressTypes=None, transportSemantics='TCP'):
-        # Note this is happening deep within the reactor, so we don't need to
-        # worry about log contexts.
-
+    def resolveHostName(
+        self,
+        resolutionReceiver,
+        hostName,
+        portNumber=0,
+        addressTypes=None,
+        transportSemantics="TCP",
+    ):
         # We need this function to return `resolutionReceiver` so we do all the
         # actual logic involving deferreds in a separate function.
-        self._resolve(
-            resolutionReceiver, hostName, portNumber,
-            addressTypes, transportSemantics,
-        )
+
+        # even though this is happening within the depths of twisted, we need to drop
+        # our logcontext before starting _resolve, otherwise: (a) _resolve will drop
+        # the logcontext if it returns an incomplete deferred; (b) _resolve will
+        # call the resolutionReceiver *with* a logcontext, which it won't be expecting.
+        with PreserveLoggingContext():
+            self._resolve(
+                resolutionReceiver,
+                hostName,
+                portNumber,
+                addressTypes,
+                transportSemantics,
+            )
 
         return resolutionReceiver
 
     @defer.inlineCallbacks
-    def _resolve(self, resolutionReceiver, hostName, portNumber=0,
-                 addressTypes=None, transportSemantics='TCP'):
+    def _resolve(
+        self,
+        resolutionReceiver,
+        hostName,
+        portNumber=0,
+        addressTypes=None,
+        transportSemantics="TCP",
+    ):
 
         with (yield self._limiter.queue(())):
             # resolveHostName doesn't return a Deferred, so we need to hook into
@@ -368,8 +346,7 @@ class _LimitedHostnameResolver(object):
             receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred)
 
             self._resolver.resolveHostName(
-                receiver, hostName, portNumber,
-                addressTypes, transportSemantics,
+                receiver, hostName, portNumber, addressTypes, transportSemantics
             )
 
             yield deferred
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 33107f56d1..9120bdb143 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -44,7 +44,9 @@ logger = logging.getLogger("synapse.app.appservice")
 
 
 class AppserviceSlaveStore(
-    DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore,
+    DirectoryStore,
+    SlavedEventStore,
+    SlavedApplicationServiceStore,
     SlavedRegistrationStore,
 ):
     pass
@@ -74,7 +76,7 @@ class AppserviceServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse appservice now listening on port %d", port)
@@ -88,18 +90,19 @@ class AppserviceServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -132,9 +135,7 @@ class ASReplicationHandler(ReplicationClientHandler):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse appservice", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse appservice", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -173,6 +174,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-appservice", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 864f1eac48..90bc79cdda 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -37,7 +37,9 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.groups import SlavedGroupServerStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
@@ -51,6 +53,7 @@ from synapse.rest.client.v1.room import (
     PublicRoomListRestServlet,
     RoomEventContextServlet,
     RoomMemberListRestServlet,
+    RoomMessageListRestServlet,
     RoomStateRestServlet,
 )
 from synapse.rest.client.v1.voip import VoipRestServlet
@@ -73,6 +76,7 @@ class ClientReaderSlavedStore(
     SlavedDeviceStore,
     SlavedReceiptsStore,
     SlavedPushRuleStore,
+    SlavedGroupServerStore,
     SlavedAccountDataStore,
     SlavedEventStore,
     SlavedKeyStore,
@@ -81,6 +85,7 @@ class ClientReaderSlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     SlavedTransactionStore,
+    SlavedProfileStore,
     SlavedClientIpStore,
     BaseSlavedStore,
 ):
@@ -107,6 +112,7 @@ class ClientReaderServer(HomeServer):
                     JoinedRoomMemberListRestServlet(self).register(resource)
                     RoomStateRestServlet(self).register(resource)
                     RoomEventContextServlet(self).register(resource)
+                    RoomMessageListRestServlet(self).register(resource)
                     RegisterRestServlet(self).register(resource)
                     LoginRestServlet(self).register(resource)
                     ThreepidRestServlet(self).register(resource)
@@ -116,9 +122,7 @@ class ClientReaderServer(HomeServer):
                     PushRuleRestServlet(self).register(resource)
                     VersionsRestServlet().register(resource)
 
-                    resources.update({
-                        "/_matrix/client": resource,
-                    })
+                    resources.update({"/_matrix/client": resource})
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -131,7 +135,7 @@ class ClientReaderServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse client reader now listening on port %d", port)
@@ -145,18 +149,19 @@ class ClientReaderServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -168,9 +173,7 @@ class ClientReaderServer(HomeServer):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse client reader", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse client reader", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -197,6 +200,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-client-reader", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index b8e5196152..ff522e4499 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -109,12 +109,14 @@ class EventCreatorServer(HomeServer):
                     ProfileAvatarURLRestServlet(self).register(resource)
                     ProfileDisplaynameRestServlet(self).register(resource)
                     ProfileRestServlet(self).register(resource)
-                    resources.update({
-                        "/_matrix/client/r0": resource,
-                        "/_matrix/client/unstable": resource,
-                        "/_matrix/client/v2_alpha": resource,
-                        "/_matrix/client/api/v1": resource,
-                    })
+                    resources.update(
+                        {
+                            "/_matrix/client/r0": resource,
+                            "/_matrix/client/unstable": resource,
+                            "/_matrix/client/v2_alpha": resource,
+                            "/_matrix/client/api/v1": resource,
+                        }
+                    )
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -127,7 +129,7 @@ class EventCreatorServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse event creator now listening on port %d", port)
@@ -141,18 +143,19 @@ class EventCreatorServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -164,9 +167,7 @@ class EventCreatorServer(HomeServer):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse event creator", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse event creator", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -198,6 +199,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-event-creator", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 7da79dc827..9421420930 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -86,19 +86,18 @@ class FederationReaderServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "federation":
-                    resources.update({
-                        FEDERATION_PREFIX: TransportLayerServer(self),
-                    })
+                    resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
                 if name == "openid" and "federation" not in res["names"]:
                     # Only load the openid resource separately if federation resource
                     # is not specified since federation resource includes openid
                     # resource.
-                    resources.update({
-                        FEDERATION_PREFIX: TransportLayerServer(
-                            self,
-                            servlet_groups=["openid"],
-                        ),
-                    })
+                    resources.update(
+                        {
+                            FEDERATION_PREFIX: TransportLayerServer(
+                                self, servlet_groups=["openid"]
+                            )
+                        }
+                    )
 
                 if name in ["keys", "federation"]:
                     resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
@@ -115,7 +114,7 @@ class FederationReaderServer(HomeServer):
                 root_resource,
                 self.version_string,
             ),
-            reactor=self.get_reactor()
+            reactor=self.get_reactor(),
         )
 
         logger.info("Synapse federation reader now listening on port %d", port)
@@ -129,18 +128,19 @@ class FederationReaderServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -181,6 +181,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-federation-reader", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 1d43f2b075..969be58d0b 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -52,8 +52,13 @@ logger = logging.getLogger("synapse.app.federation_sender")
 
 
 class FederationSenderSlaveStore(
-    SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
-    SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
+    SlavedDeviceInboxStore,
+    SlavedTransactionStore,
+    SlavedReceiptsStore,
+    SlavedEventStore,
+    SlavedRegistrationStore,
+    SlavedDeviceStore,
+    SlavedPresenceStore,
 ):
     def __init__(self, db_conn, hs):
         super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
@@ -65,10 +70,7 @@ class FederationSenderSlaveStore(
         self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
 
     def _get_federation_out_pos(self, db_conn):
-        sql = (
-            "SELECT stream_id FROM federation_stream_position"
-            " WHERE type = ?"
-        )
+        sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?"
         sql = self.database_engine.convert_param_style(sql)
 
         txn = db_conn.cursor()
@@ -103,7 +105,7 @@ class FederationSenderServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse federation_sender now listening on port %d", port)
@@ -117,18 +119,19 @@ class FederationSenderServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -151,7 +154,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
         self.send_handler.process_replication_rows(stream_name, token, rows)
 
     def get_streams_to_replicate(self):
-        args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate()
+        args = super(
+            FederationSenderReplicationHandler, self
+        ).get_streams_to_replicate()
         args.update(self.send_handler.stream_positions())
         return args
 
@@ -203,6 +208,7 @@ class FederationSenderHandler(object):
     """Processes the replication stream and forwards the appropriate entries
     to the federation sender.
     """
+
     def __init__(self, hs, replication_client):
         self.store = hs.get_datastore()
         self._is_mine_id = hs.is_mine_id
@@ -241,7 +247,7 @@ class FederationSenderHandler(object):
         # ... and when new receipts happen
         elif stream_name == ReceiptsStream.NAME:
             run_as_background_process(
-                "process_receipts_for_federation", self._on_new_receipts, rows,
+                "process_receipts_for_federation", self._on_new_receipts, rows
             )
 
     @defer.inlineCallbacks
@@ -278,12 +284,14 @@ class FederationSenderHandler(object):
 
                     # We ACK this token over replication so that the master can drop
                     # its in memory queues
-                    self.replication_client.send_federation_ack(self.federation_position)
+                    self.replication_client.send_federation_ack(
+                        self.federation_position
+                    )
                     self._last_ack = self.federation_position
         except Exception:
             logger.exception("Error updating federation stream position")
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 8479fee738..2fd7d57ebf 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -37,8 +37,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
-from synapse.rest.client.v2_alpha._base import client_v2_patterns
+from synapse.rest.client.v2_alpha._base import client_patterns
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
@@ -49,11 +48,11 @@ from synapse.util.versionstring import get_version_string
 logger = logging.getLogger("synapse.app.frontend_proxy")
 
 
-class PresenceStatusStubServlet(ClientV1RestServlet):
-    PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
+class PresenceStatusStubServlet(RestServlet):
+    PATTERNS = client_patterns("/presence/(?P<user_id>[^/]*)/status")
 
     def __init__(self, hs):
-        super(PresenceStatusStubServlet, self).__init__(hs)
+        super(PresenceStatusStubServlet, self).__init__()
         self.http_client = hs.get_simple_http_client()
         self.auth = hs.get_auth()
         self.main_uri = hs.config.worker_main_http_uri
@@ -63,14 +62,11 @@ class PresenceStatusStubServlet(ClientV1RestServlet):
         # Pass through the auth headers, if any, in case the access token
         # is there.
         auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
-        headers = {
-            "Authorization": auth_headers,
-        }
+        headers = {"Authorization": auth_headers}
 
         try:
             result = yield self.http_client.get_json(
-                self.main_uri + request.uri.decode('ascii'),
-                headers=headers,
+                self.main_uri + request.uri.decode("ascii"), headers=headers
             )
         except HttpResponseException as e:
             raise e.to_synapse_error()
@@ -84,7 +80,7 @@ class PresenceStatusStubServlet(ClientV1RestServlet):
 
 
 class KeyUploadServlet(RestServlet):
-    PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
+    PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
 
     def __init__(self, hs):
         """
@@ -106,18 +102,19 @@ class KeyUploadServlet(RestServlet):
         if device_id is not None:
             # passing the device_id here is deprecated; however, we allow it
             # for now for compatibility with older clients.
-            if (requester.device_id is not None and
-                    device_id != requester.device_id):
-                logger.warning("Client uploading keys for a different device "
-                               "(logged in as %s, uploading for %s)",
-                               requester.device_id, device_id)
+            if requester.device_id is not None and device_id != requester.device_id:
+                logger.warning(
+                    "Client uploading keys for a different device "
+                    "(logged in as %s, uploading for %s)",
+                    requester.device_id,
+                    device_id,
+                )
         else:
             device_id = requester.device_id
 
         if device_id is None:
             raise SynapseError(
-                400,
-                "To upload keys, you must pass device_id when authenticating"
+                400, "To upload keys, you must pass device_id when authenticating"
             )
 
         if body:
@@ -125,13 +122,9 @@ class KeyUploadServlet(RestServlet):
             # Pass through the auth headers, if any, in case the access token
             # is there.
             auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
-            headers = {
-                "Authorization": auth_headers,
-            }
+            headers = {"Authorization": auth_headers}
             result = yield self.http_client.post_json_get_json(
-                self.main_uri + request.uri.decode('ascii'),
-                body,
-                headers=headers,
+                self.main_uri + request.uri.decode("ascii"), body, headers=headers
             )
 
             defer.returnValue((200, result))
@@ -172,12 +165,14 @@ class FrontendProxyServer(HomeServer):
                     if not self.config.use_presence:
                         PresenceStatusStubServlet(self).register(resource)
 
-                    resources.update({
-                        "/_matrix/client/r0": resource,
-                        "/_matrix/client/unstable": resource,
-                        "/_matrix/client/v2_alpha": resource,
-                        "/_matrix/client/api/v1": resource,
-                    })
+                    resources.update(
+                        {
+                            "/_matrix/client/r0": resource,
+                            "/_matrix/client/unstable": resource,
+                            "/_matrix/client/v2_alpha": resource,
+                            "/_matrix/client/api/v1": resource,
+                        }
+                    )
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -191,7 +186,7 @@ class FrontendProxyServer(HomeServer):
                 root_resource,
                 self.version_string,
             ),
-            reactor=self.get_reactor()
+            reactor=self.get_reactor(),
         )
 
         logger.info("Synapse client reader now listening on port %d", port)
@@ -205,18 +200,19 @@ class FrontendProxyServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -228,9 +224,7 @@ class FrontendProxyServer(HomeServer):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse frontend proxy", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse frontend proxy", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -259,6 +253,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-frontend-proxy", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 1045d28949..49da105cf6 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -101,13 +101,12 @@ class SynapseHomeServer(HomeServer):
                     # Skip loading openid resource if federation is defined
                     # since federation resource will include openid
                     continue
-                resources.update(self._configure_named_resource(
-                    name, res.get("compress", False),
-                ))
+                resources.update(
+                    self._configure_named_resource(name, res.get("compress", False))
+                )
 
         additional_resources = listener_config.get("additional_resources", {})
-        logger.debug("Configuring additional resources: %r",
-                     additional_resources)
+        logger.debug("Configuring additional resources: %r", additional_resources)
         module_api = ModuleApi(self, self.get_auth_handler())
         for path, resmodule in additional_resources.items():
             handler_cls, config = load_module(resmodule)
@@ -174,59 +173,67 @@ class SynapseHomeServer(HomeServer):
             if compress:
                 client_resource = gz_wrap(client_resource)
 
-            resources.update({
-                "/_matrix/client/api/v1": client_resource,
-                "/_matrix/client/r0": client_resource,
-                "/_matrix/client/unstable": client_resource,
-                "/_matrix/client/v2_alpha": client_resource,
-                "/_matrix/client/versions": client_resource,
-                "/.well-known/matrix/client": WellKnownResource(self),
-                "/_synapse/admin": AdminRestResource(self),
-            })
+            resources.update(
+                {
+                    "/_matrix/client/api/v1": client_resource,
+                    "/_matrix/client/r0": client_resource,
+                    "/_matrix/client/unstable": client_resource,
+                    "/_matrix/client/v2_alpha": client_resource,
+                    "/_matrix/client/versions": client_resource,
+                    "/.well-known/matrix/client": WellKnownResource(self),
+                    "/_synapse/admin": AdminRestResource(self),
+                }
+            )
 
             if self.get_config().saml2_enabled:
                 from synapse.rest.saml2 import SAML2Resource
+
                 resources["/_matrix/saml2"] = SAML2Resource(self)
 
         if name == "consent":
             from synapse.rest.consent.consent_resource import ConsentResource
+
             consent_resource = ConsentResource(self)
             if compress:
                 consent_resource = gz_wrap(consent_resource)
-            resources.update({
-                "/_matrix/consent": consent_resource,
-            })
+            resources.update({"/_matrix/consent": consent_resource})
 
         if name == "federation":
-            resources.update({
-                FEDERATION_PREFIX: TransportLayerServer(self),
-            })
+            resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
 
         if name == "openid":
-            resources.update({
-                FEDERATION_PREFIX: TransportLayerServer(self, servlet_groups=["openid"]),
-            })
+            resources.update(
+                {
+                    FEDERATION_PREFIX: TransportLayerServer(
+                        self, servlet_groups=["openid"]
+                    )
+                }
+            )
 
         if name in ["static", "client"]:
-            resources.update({
-                STATIC_PREFIX: File(
-                    os.path.join(os.path.dirname(synapse.__file__), "static")
-                ),
-            })
+            resources.update(
+                {
+                    STATIC_PREFIX: File(
+                        os.path.join(os.path.dirname(synapse.__file__), "static")
+                    )
+                }
+            )
 
         if name in ["media", "federation", "client"]:
             if self.get_config().enable_media_repo:
                 media_repo = self.get_media_repository_resource()
-                resources.update({
-                    MEDIA_PREFIX: media_repo,
-                    LEGACY_MEDIA_PREFIX: media_repo,
-                    CONTENT_REPO_PREFIX: ContentRepoResource(
-                        self, self.config.uploads_path
-                    ),
-                })
+                resources.update(
+                    {
+                        MEDIA_PREFIX: media_repo,
+                        LEGACY_MEDIA_PREFIX: media_repo,
+                        CONTENT_REPO_PREFIX: ContentRepoResource(
+                            self, self.config.uploads_path
+                        ),
+                    }
+                )
             elif name == "media":
                 raise ConfigError(
-                    "'media' resource conflicts with enable_media_repo=False",
+                    "'media' resource conflicts with enable_media_repo=False"
                 )
 
         if name in ["keys", "federation"]:
@@ -257,18 +264,14 @@ class SynapseHomeServer(HomeServer):
 
         for listener in listeners:
             if listener["type"] == "http":
-                self._listening_services.extend(
-                    self._listener_http(config, listener)
-                )
+                self._listening_services.extend(self._listener_http(config, listener))
             elif listener["type"] == "manhole":
                 listen_tcp(
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "replication":
                 services = listen_tcp(
@@ -277,16 +280,17 @@ class SynapseHomeServer(HomeServer):
                     ReplicationStreamProtocolFactory(self),
                 )
                 for s in services:
-                    reactor.addSystemEventTrigger(
-                        "before", "shutdown", s.stopListening,
-                    )
+                    reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -312,7 +316,7 @@ current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
 max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
 registered_reserved_users_mau_gauge = Gauge(
     "synapse_admin_mau:registered_reserved_users",
-    "Registered users with reserved threepids"
+    "Registered users with reserved threepids",
 )
 
 
@@ -327,8 +331,7 @@ def setup(config_options):
     """
     try:
         config = HomeServerConfig.load_or_generate_config(
-            "Synapse Homeserver",
-            config_options,
+            "Synapse Homeserver", config_options
         )
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
@@ -339,10 +342,7 @@ def setup(config_options):
         # generating config files and shouldn't try to continue.
         sys.exit(0)
 
-    synapse.config.logger.setup_logging(
-        config,
-        use_worker_options=False
-    )
+    synapse.config.logger.setup_logging(config, use_worker_options=False)
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
@@ -357,7 +357,7 @@ def setup(config_options):
         database_engine=database_engine,
     )
 
-    logger.info("Preparing database: %s...", config.database_config['name'])
+    logger.info("Preparing database: %s...", config.database_config["name"])
 
     try:
         with hs.get_db_conn(run_new_connection=False) as db_conn:
@@ -375,7 +375,7 @@ def setup(config_options):
         )
         sys.exit(1)
 
-    logger.info("Database prepared in %s.", config.database_config['name'])
+    logger.info("Database prepared in %s.", config.database_config["name"])
 
     hs.setup()
     hs.setup_master()
@@ -391,9 +391,7 @@ def setup(config_options):
         acme = hs.get_acme_handler()
 
         # Check how long the certificate is active for.
-        cert_days_remaining = hs.config.is_disk_cert_valid(
-            allow_self_signed=False
-        )
+        cert_days_remaining = hs.config.is_disk_cert_valid(allow_self_signed=False)
 
         # We want to reprovision if cert_days_remaining is None (meaning no
         # certificate exists), or the days remaining number it returns
@@ -401,8 +399,8 @@ def setup(config_options):
         provision = False
 
         if (
-            cert_days_remaining is None or
-            cert_days_remaining < hs.config.acme_reprovision_threshold
+            cert_days_remaining is None
+            or cert_days_remaining < hs.config.acme_reprovision_threshold
         ):
             provision = True
 
@@ -433,10 +431,7 @@ def setup(config_options):
                 yield do_acme()
 
                 # Check if it needs to be reprovisioned every day.
-                hs.get_clock().looping_call(
-                    reprovision_acme,
-                    24 * 60 * 60 * 1000
-                )
+                hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
 
             _base.start(hs, config.listeners)
 
@@ -463,6 +458,7 @@ class SynapseService(service.Service):
     A twisted Service class that will start synapse. Used to run synapse
     via twistd and a .tac.
     """
+
     def __init__(self, config):
         self.config = config
 
@@ -479,6 +475,7 @@ class SynapseService(service.Service):
 def run(hs):
     PROFILE_SYNAPSE = False
     if PROFILE_SYNAPSE:
+
         def profile(func):
             from cProfile import Profile
             from threading import current_thread
@@ -489,13 +486,14 @@ def run(hs):
                 func(*args, **kargs)
                 profile.disable()
                 ident = current_thread().ident
-                profile.dump_stats("/tmp/%s.%s.%i.pstat" % (
-                    hs.hostname, func.__name__, ident
-                ))
+                profile.dump_stats(
+                    "/tmp/%s.%s.%i.pstat" % (hs.hostname, func.__name__, ident)
+                )
 
             return profiled
 
         from twisted.python.threadpool import ThreadPool
+
         ThreadPool._worker = profile(ThreadPool._worker)
         reactor.run = profile(reactor.run)
 
@@ -540,7 +538,10 @@ def run(hs):
         stats["total_room_count"] = room_count
 
         stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
-        stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
+        stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users()
+        stats[
+            "daily_active_rooms"
+        ] = yield hs.get_datastore().count_daily_active_rooms()
         stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
 
         r30_results = yield hs.get_datastore().count_r30_users()
@@ -564,8 +565,7 @@ def run(hs):
         logger.info("Reporting stats to matrix.org: %s" % (stats,))
         try:
             yield hs.get_simple_http_client().put_json(
-                "https://matrix.org/report-usage-stats/push",
-                stats
+                "https://matrix.org/report-usage-stats/push", stats
             )
         except Exception as e:
             logger.warn("Error reporting stats: %s", e)
@@ -580,14 +580,11 @@ def run(hs):
             logger.info("report_stats can use psutil")
             stats_process.append(process)
         except (AttributeError):
-            logger.warning(
-                "Unable to read memory/cpu stats. Disabling reporting."
-            )
+            logger.warning("Unable to read memory/cpu stats. Disabling reporting.")
 
     def generate_user_daily_visit_stats():
         return run_as_background_process(
-            "generate_user_daily_visits",
-            hs.get_datastore().generate_user_daily_visits,
+            "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits
         )
 
     # Rather than update on per session basis, batch up the requests.
@@ -598,9 +595,9 @@ def run(hs):
     # monthly active user limiting functionality
     def reap_monthly_active_users():
         return run_as_background_process(
-            "reap_monthly_active_users",
-            hs.get_datastore().reap_monthly_active_users,
+            "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users
         )
+
     clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
     reap_monthly_active_users()
 
@@ -618,8 +615,7 @@ def run(hs):
 
     def start_generate_monthly_active_users():
         return run_as_background_process(
-            "generate_monthly_active_users",
-            generate_monthly_active_users,
+            "generate_monthly_active_users", generate_monthly_active_users
         )
 
     start_generate_monthly_active_users()
@@ -645,7 +641,6 @@ def run(hs):
         gc_thresholds=hs.config.gc_thresholds,
         pid_file=hs.config.pid_file,
         daemonize=hs.config.daemonize,
-        cpu_affinity=hs.config.cpu_affinity,
         print_pidfile=hs.config.print_pidfile,
         logger=logger,
     )
@@ -659,5 +654,5 @@ def main():
         run(hs)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index d4cc4e9443..cf0e2036c3 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -72,13 +72,15 @@ class MediaRepositoryServer(HomeServer):
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "media":
                     media_repo = self.get_media_repository_resource()
-                    resources.update({
-                        MEDIA_PREFIX: media_repo,
-                        LEGACY_MEDIA_PREFIX: media_repo,
-                        CONTENT_REPO_PREFIX: ContentRepoResource(
-                            self, self.config.uploads_path
-                        ),
-                    })
+                    resources.update(
+                        {
+                            MEDIA_PREFIX: media_repo,
+                            LEGACY_MEDIA_PREFIX: media_repo,
+                            CONTENT_REPO_PREFIX: ContentRepoResource(
+                                self, self.config.uploads_path
+                            ),
+                        }
+                    )
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -91,7 +93,7 @@ class MediaRepositoryServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse media repository now listening on port %d", port)
@@ -105,18 +107,19 @@ class MediaRepositoryServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -164,6 +167,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-media-repository", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index cbf0d67f51..df29ea5ecb 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -46,36 +46,27 @@ logger = logging.getLogger("synapse.app.pusher")
 
 
 class PusherSlaveStore(
-    SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
-    SlavedAccountDataStore
+    SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedAccountDataStore
 ):
-    update_pusher_last_stream_ordering_and_success = (
-        __func__(DataStore.update_pusher_last_stream_ordering_and_success)
+    update_pusher_last_stream_ordering_and_success = __func__(
+        DataStore.update_pusher_last_stream_ordering_and_success
     )
 
-    update_pusher_failing_since = (
-        __func__(DataStore.update_pusher_failing_since)
-    )
+    update_pusher_failing_since = __func__(DataStore.update_pusher_failing_since)
 
-    update_pusher_last_stream_ordering = (
-        __func__(DataStore.update_pusher_last_stream_ordering)
+    update_pusher_last_stream_ordering = __func__(
+        DataStore.update_pusher_last_stream_ordering
     )
 
-    get_throttle_params_by_room = (
-        __func__(DataStore.get_throttle_params_by_room)
-    )
+    get_throttle_params_by_room = __func__(DataStore.get_throttle_params_by_room)
 
-    set_throttle_params = (
-        __func__(DataStore.set_throttle_params)
-    )
+    set_throttle_params = __func__(DataStore.set_throttle_params)
 
-    get_time_of_last_push_action_before = (
-        __func__(DataStore.get_time_of_last_push_action_before)
+    get_time_of_last_push_action_before = __func__(
+        DataStore.get_time_of_last_push_action_before
     )
 
-    get_profile_displayname = (
-        __func__(DataStore.get_profile_displayname)
-    )
+    get_profile_displayname = __func__(DataStore.get_profile_displayname)
 
 
 class PusherServer(HomeServer):
@@ -105,7 +96,7 @@ class PusherServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse pusher now listening on port %d", port)
@@ -119,18 +110,19 @@ class PusherServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -161,9 +153,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
                     else:
                         yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
             elif stream_name == "events":
-                yield self.pusher_pool.on_new_notifications(
-                    token, token,
-                )
+                yield self.pusher_pool.on_new_notifications(token, token)
             elif stream_name == "receipts":
                 yield self.pusher_pool.on_new_receipts(
                     token, token, set(row.room_id for row in rows)
@@ -188,9 +178,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse pusher", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse pusher", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -234,6 +222,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-pusher", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         ps = start(sys.argv[1:])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 5388def28a..858949910d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -98,10 +98,7 @@ class SynchrotronPresence(object):
         self.notifier = hs.get_notifier()
 
         active_presence = self.store.take_presence_startup_info()
-        self.user_to_current_state = {
-            state.user_id: state
-            for state in active_presence
-        }
+        self.user_to_current_state = {state.user_id: state for state in active_presence}
 
         # user_id -> last_sync_ms. Lists the users that have stopped syncing
         # but we haven't notified the master of that yet
@@ -196,17 +193,26 @@ class SynchrotronPresence(object):
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
-            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=users_to_states.keys()
+            "presence_key",
+            stream_id,
+            rooms=room_ids_to_states.keys(),
+            users=users_to_states.keys(),
         )
 
     @defer.inlineCallbacks
     def process_replication_rows(self, token, rows):
-        states = [UserPresenceState(
-            row.user_id, row.state, row.last_active_ts,
-            row.last_federation_update_ts, row.last_user_sync_ts, row.status_msg,
-            row.currently_active
-        ) for row in rows]
+        states = [
+            UserPresenceState(
+                row.user_id,
+                row.state,
+                row.last_active_ts,
+                row.last_federation_update_ts,
+                row.last_user_sync_ts,
+                row.status_msg,
+                row.currently_active,
+            )
+            for row in rows
+        ]
 
         for state in states:
             self.user_to_current_state[state.user_id] = state
@@ -217,7 +223,8 @@ class SynchrotronPresence(object):
     def get_currently_syncing_users(self):
         if self.hs.config.use_presence:
             return [
-                user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
+                user_id
+                for user_id, count in iteritems(self.user_to_num_current_syncs)
                 if count > 0
             ]
         else:
@@ -281,12 +288,14 @@ class SynchrotronServer(HomeServer):
                     events.register_servlets(self, resource)
                     InitialSyncRestServlet(self).register(resource)
                     RoomInitialSyncRestServlet(self).register(resource)
-                    resources.update({
-                        "/_matrix/client/r0": resource,
-                        "/_matrix/client/unstable": resource,
-                        "/_matrix/client/v2_alpha": resource,
-                        "/_matrix/client/api/v1": resource,
-                    })
+                    resources.update(
+                        {
+                            "/_matrix/client/r0": resource,
+                            "/_matrix/client/unstable": resource,
+                            "/_matrix/client/v2_alpha": resource,
+                            "/_matrix/client/api/v1": resource,
+                        }
+                    )
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -299,7 +308,7 @@ class SynchrotronServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse synchrotron now listening on port %d", port)
@@ -313,18 +322,19 @@ class SynchrotronServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -382,40 +392,36 @@ class SyncReplicationHandler(ReplicationClientHandler):
                     )
             elif stream_name == "push_rules":
                 self.notifier.on_new_event(
-                    "push_rules_key", token, users=[row.user_id for row in rows],
+                    "push_rules_key", token, users=[row.user_id for row in rows]
                 )
-            elif stream_name in ("account_data", "tag_account_data",):
+            elif stream_name in ("account_data", "tag_account_data"):
                 self.notifier.on_new_event(
-                    "account_data_key", token, users=[row.user_id for row in rows],
+                    "account_data_key", token, users=[row.user_id for row in rows]
                 )
             elif stream_name == "receipts":
                 self.notifier.on_new_event(
-                    "receipt_key", token, rooms=[row.room_id for row in rows],
+                    "receipt_key", token, rooms=[row.room_id for row in rows]
                 )
             elif stream_name == "typing":
                 self.typing_handler.process_replication_rows(token, rows)
                 self.notifier.on_new_event(
-                    "typing_key", token, rooms=[row.room_id for row in rows],
+                    "typing_key", token, rooms=[row.room_id for row in rows]
                 )
             elif stream_name == "to_device":
                 entities = [row.entity for row in rows if row.entity.startswith("@")]
                 if entities:
-                    self.notifier.on_new_event(
-                        "to_device_key", token, users=entities,
-                    )
+                    self.notifier.on_new_event("to_device_key", token, users=entities)
             elif stream_name == "device_lists":
                 all_room_ids = set()
                 for row in rows:
                     room_ids = yield self.store.get_rooms_for_user(row.user_id)
                     all_room_ids.update(room_ids)
-                self.notifier.on_new_event(
-                    "device_list_key", token, rooms=all_room_ids,
-                )
+                self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
             elif stream_name == "presence":
                 yield self.presence_handler.process_replication_rows(token, rows)
             elif stream_name == "receipts":
                 self.notifier.on_new_event(
-                    "groups_key", token, users=[row.user_id for row in rows],
+                    "groups_key", token, users=[row.user_id for row in rows]
                 )
         except Exception:
             logger.exception("Error processing replication")
@@ -423,9 +429,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse synchrotron", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse synchrotron", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -453,6 +457,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-synchrotron", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 355f5aa71d..2d9d2e1bbc 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -66,14 +66,16 @@ class UserDirectorySlaveStore(
 
         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
-            db_conn, "current_state_delta_stream",
+            db_conn,
+            "current_state_delta_stream",
             entity_column="room_id",
             stream_column="stream_id",
             max_value=events_max,  # As we share the stream id with events token
             limit=1000,
         )
         self._curr_state_delta_stream_cache = StreamChangeCache(
-            "_curr_state_delta_stream_cache", min_curr_state_delta_id,
+            "_curr_state_delta_stream_cache",
+            min_curr_state_delta_id,
             prefilled_cache=curr_state_delta_prefill,
         )
 
@@ -110,12 +112,14 @@ class UserDirectoryServer(HomeServer):
                 elif name == "client":
                     resource = JsonResource(self, canonical_json=False)
                     user_directory.register_servlets(self, resource)
-                    resources.update({
-                        "/_matrix/client/r0": resource,
-                        "/_matrix/client/unstable": resource,
-                        "/_matrix/client/v2_alpha": resource,
-                        "/_matrix/client/api/v1": resource,
-                    })
+                    resources.update(
+                        {
+                            "/_matrix/client/r0": resource,
+                            "/_matrix/client/unstable": resource,
+                            "/_matrix/client/v2_alpha": resource,
+                            "/_matrix/client/api/v1": resource,
+                        }
+                    )
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -128,7 +132,7 @@ class UserDirectoryServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse user_dir now listening on port %d", port)
@@ -142,18 +146,19 @@ class UserDirectoryServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -186,9 +191,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse user directory", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse user directory", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -227,6 +230,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-user-dir", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])