summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/app/__init__.py4
-rw-r--r--synapse/app/_base.py77
-rw-r--r--synapse/app/appservice.py29
-rw-r--r--synapse/app/client_reader.py33
-rw-r--r--synapse/app/event_creator.py39
-rw-r--r--synapse/app/federation_reader.py38
-rw-r--r--synapse/app/federation_sender.py46
-rw-r--r--synapse/app/frontend_proxy.py69
-rwxr-xr-xsynapse/app/homeserver.py163
-rw-r--r--synapse/app/media_repository.py37
-rw-r--r--synapse/app/pusher.py60
-rw-r--r--synapse/app/synchrotron.py90
-rw-r--r--synapse/app/user_dir.py45
-rw-r--r--synapse/appservice/__init__.py39
-rw-r--r--synapse/appservice/api.py66
-rw-r--r--synapse/appservice/scheduler.py50
16 files changed, 429 insertions, 456 deletions
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index f56f5fcc13..d877c77834 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -43,7 +43,7 @@ def check_bind_error(e, address, bind_addresses):
         address (str): Address on which binding was attempted.
         bind_addresses (list): Addresses on which the service listens.
     """
-    if address == '0.0.0.0' and '::' in bind_addresses:
-        logger.warn('Failed to listen on 0.0.0.0, continuing because listening on [::]')
+    if address == "0.0.0.0" and "::" in bind_addresses:
+        logger.warn("Failed to listen on 0.0.0.0, continuing because listening on [::]")
     else:
         raise e
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 8cc990399f..df4c2d4c97 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -75,14 +75,14 @@ def start_worker_reactor(appname, config):
 
 
 def start_reactor(
-        appname,
-        soft_file_limit,
-        gc_thresholds,
-        pid_file,
-        daemonize,
-        cpu_affinity,
-        print_pidfile,
-        logger,
+    appname,
+    soft_file_limit,
+    gc_thresholds,
+    pid_file,
+    daemonize,
+    cpu_affinity,
+    print_pidfile,
+    logger,
 ):
     """ Run the reactor in the main process
 
@@ -149,10 +149,10 @@ def start_reactor(
 def quit_with_error(error_string):
     message_lines = error_string.split("\n")
     line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
-    sys.stderr.write("*" * line_length + '\n')
+    sys.stderr.write("*" * line_length + "\n")
     for line in message_lines:
         sys.stderr.write(" %s\n" % (line.rstrip(),))
-    sys.stderr.write("*" * line_length + '\n')
+    sys.stderr.write("*" * line_length + "\n")
     sys.exit(1)
 
 
@@ -178,14 +178,7 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
     r = []
     for address in bind_addresses:
         try:
-            r.append(
-                reactor.listenTCP(
-                    port,
-                    factory,
-                    backlog,
-                    address
-                )
-            )
+            r.append(reactor.listenTCP(port, factory, backlog, address))
         except error.CannotListenError as e:
             check_bind_error(e, address, bind_addresses)
 
@@ -205,13 +198,7 @@ def listen_ssl(
     for address in bind_addresses:
         try:
             r.append(
-                reactor.listenSSL(
-                    port,
-                    factory,
-                    context_factory,
-                    backlog,
-                    address
-                )
+                reactor.listenSSL(port, factory, context_factory, backlog, address)
             )
         except error.CannotListenError as e:
             check_bind_error(e, address, bind_addresses)
@@ -243,15 +230,13 @@ def refresh_certificate(hs):
             if isinstance(i.factory, TLSMemoryBIOFactory):
                 addr = i.getHost()
                 logger.info(
-                    "Replacing TLS context factory on [%s]:%i", addr.host, addr.port,
+                    "Replacing TLS context factory on [%s]:%i", addr.host, addr.port
                 )
                 # We want to replace TLS factories with a new one, with the new
                 # TLS configuration. We do this by reaching in and pulling out
                 # the wrappedFactory, and then re-wrapping it.
                 i.factory = TLSMemoryBIOFactory(
-                    hs.tls_server_context_factory,
-                    False,
-                    i.factory.wrappedFactory
+                    hs.tls_server_context_factory, False, i.factory.wrappedFactory
                 )
         logger.info("Context factories updated.")
 
@@ -267,6 +252,7 @@ def start(hs, listeners=None):
     try:
         # Set up the SIGHUP machinery.
         if hasattr(signal, "SIGHUP"):
+
             def handle_sighup(*args, **kwargs):
                 for i in _sighup_callbacks:
                     i(hs)
@@ -302,10 +288,8 @@ def setup_sentry(hs):
         return
 
     import sentry_sdk
-    sentry_sdk.init(
-        dsn=hs.config.sentry_dsn,
-        release=get_version_string(synapse),
-    )
+
+    sentry_sdk.init(dsn=hs.config.sentry_dsn, release=get_version_string(synapse))
 
     # We set some default tags that give some context to this instance
     with sentry_sdk.configure_scope() as scope:
@@ -326,7 +310,7 @@ def install_dns_limiter(reactor, max_dns_requests_in_flight=100):
     many DNS queries at once
     """
     new_resolver = _LimitedHostnameResolver(
-        reactor.nameResolver, max_dns_requests_in_flight,
+        reactor.nameResolver, max_dns_requests_in_flight
     )
 
     reactor.installNameResolver(new_resolver)
@@ -339,11 +323,17 @@ class _LimitedHostnameResolver(object):
     def __init__(self, resolver, max_dns_requests_in_flight):
         self._resolver = resolver
         self._limiter = Linearizer(
-            name="dns_client_limiter", max_count=max_dns_requests_in_flight,
+            name="dns_client_limiter", max_count=max_dns_requests_in_flight
         )
 
-    def resolveHostName(self, resolutionReceiver, hostName, portNumber=0,
-                        addressTypes=None, transportSemantics='TCP'):
+    def resolveHostName(
+        self,
+        resolutionReceiver,
+        hostName,
+        portNumber=0,
+        addressTypes=None,
+        transportSemantics="TCP",
+    ):
         # We need this function to return `resolutionReceiver` so we do all the
         # actual logic involving deferreds in a separate function.
 
@@ -363,8 +353,14 @@ class _LimitedHostnameResolver(object):
         return resolutionReceiver
 
     @defer.inlineCallbacks
-    def _resolve(self, resolutionReceiver, hostName, portNumber=0,
-                 addressTypes=None, transportSemantics='TCP'):
+    def _resolve(
+        self,
+        resolutionReceiver,
+        hostName,
+        portNumber=0,
+        addressTypes=None,
+        transportSemantics="TCP",
+    ):
 
         with (yield self._limiter.queue(())):
             # resolveHostName doesn't return a Deferred, so we need to hook into
@@ -374,8 +370,7 @@ class _LimitedHostnameResolver(object):
             receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred)
 
             self._resolver.resolveHostName(
-                receiver, hostName, portNumber,
-                addressTypes, transportSemantics,
+                receiver, hostName, portNumber, addressTypes, transportSemantics
             )
 
             yield deferred
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 33107f56d1..9120bdb143 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -44,7 +44,9 @@ logger = logging.getLogger("synapse.app.appservice")
 
 
 class AppserviceSlaveStore(
-    DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore,
+    DirectoryStore,
+    SlavedEventStore,
+    SlavedApplicationServiceStore,
     SlavedRegistrationStore,
 ):
     pass
@@ -74,7 +76,7 @@ class AppserviceServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse appservice now listening on port %d", port)
@@ -88,18 +90,19 @@ class AppserviceServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -132,9 +135,7 @@ class ASReplicationHandler(ReplicationClientHandler):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse appservice", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse appservice", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -173,6 +174,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-appservice", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index a16e037f32..90bc79cdda 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -37,6 +37,7 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.groups import SlavedGroupServerStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.profile import SlavedProfileStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
@@ -52,6 +53,7 @@ from synapse.rest.client.v1.room import (
     PublicRoomListRestServlet,
     RoomEventContextServlet,
     RoomMemberListRestServlet,
+    RoomMessageListRestServlet,
     RoomStateRestServlet,
 )
 from synapse.rest.client.v1.voip import VoipRestServlet
@@ -74,6 +76,7 @@ class ClientReaderSlavedStore(
     SlavedDeviceStore,
     SlavedReceiptsStore,
     SlavedPushRuleStore,
+    SlavedGroupServerStore,
     SlavedAccountDataStore,
     SlavedEventStore,
     SlavedKeyStore,
@@ -109,6 +112,7 @@ class ClientReaderServer(HomeServer):
                     JoinedRoomMemberListRestServlet(self).register(resource)
                     RoomStateRestServlet(self).register(resource)
                     RoomEventContextServlet(self).register(resource)
+                    RoomMessageListRestServlet(self).register(resource)
                     RegisterRestServlet(self).register(resource)
                     LoginRestServlet(self).register(resource)
                     ThreepidRestServlet(self).register(resource)
@@ -118,9 +122,7 @@ class ClientReaderServer(HomeServer):
                     PushRuleRestServlet(self).register(resource)
                     VersionsRestServlet().register(resource)
 
-                    resources.update({
-                        "/_matrix/client": resource,
-                    })
+                    resources.update({"/_matrix/client": resource})
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -133,7 +135,7 @@ class ClientReaderServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse client reader now listening on port %d", port)
@@ -147,18 +149,19 @@ class ClientReaderServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -170,9 +173,7 @@ class ClientReaderServer(HomeServer):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse client reader", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse client reader", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -199,6 +200,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-client-reader", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index b8e5196152..ff522e4499 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -109,12 +109,14 @@ class EventCreatorServer(HomeServer):
                     ProfileAvatarURLRestServlet(self).register(resource)
                     ProfileDisplaynameRestServlet(self).register(resource)
                     ProfileRestServlet(self).register(resource)
-                    resources.update({
-                        "/_matrix/client/r0": resource,
-                        "/_matrix/client/unstable": resource,
-                        "/_matrix/client/v2_alpha": resource,
-                        "/_matrix/client/api/v1": resource,
-                    })
+                    resources.update(
+                        {
+                            "/_matrix/client/r0": resource,
+                            "/_matrix/client/unstable": resource,
+                            "/_matrix/client/v2_alpha": resource,
+                            "/_matrix/client/api/v1": resource,
+                        }
+                    )
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -127,7 +129,7 @@ class EventCreatorServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse event creator now listening on port %d", port)
@@ -141,18 +143,19 @@ class EventCreatorServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -164,9 +167,7 @@ class EventCreatorServer(HomeServer):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse event creator", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse event creator", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -198,6 +199,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-event-creator", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 7da79dc827..9421420930 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -86,19 +86,18 @@ class FederationReaderServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "federation":
-                    resources.update({
-                        FEDERATION_PREFIX: TransportLayerServer(self),
-                    })
+                    resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
                 if name == "openid" and "federation" not in res["names"]:
                     # Only load the openid resource separately if federation resource
                     # is not specified since federation resource includes openid
                     # resource.
-                    resources.update({
-                        FEDERATION_PREFIX: TransportLayerServer(
-                            self,
-                            servlet_groups=["openid"],
-                        ),
-                    })
+                    resources.update(
+                        {
+                            FEDERATION_PREFIX: TransportLayerServer(
+                                self, servlet_groups=["openid"]
+                            )
+                        }
+                    )
 
                 if name in ["keys", "federation"]:
                     resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
@@ -115,7 +114,7 @@ class FederationReaderServer(HomeServer):
                 root_resource,
                 self.version_string,
             ),
-            reactor=self.get_reactor()
+            reactor=self.get_reactor(),
         )
 
         logger.info("Synapse federation reader now listening on port %d", port)
@@ -129,18 +128,19 @@ class FederationReaderServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -181,6 +181,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-federation-reader", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 1d43f2b075..969be58d0b 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -52,8 +52,13 @@ logger = logging.getLogger("synapse.app.federation_sender")
 
 
 class FederationSenderSlaveStore(
-    SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
-    SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
+    SlavedDeviceInboxStore,
+    SlavedTransactionStore,
+    SlavedReceiptsStore,
+    SlavedEventStore,
+    SlavedRegistrationStore,
+    SlavedDeviceStore,
+    SlavedPresenceStore,
 ):
     def __init__(self, db_conn, hs):
         super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
@@ -65,10 +70,7 @@ class FederationSenderSlaveStore(
         self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
 
     def _get_federation_out_pos(self, db_conn):
-        sql = (
-            "SELECT stream_id FROM federation_stream_position"
-            " WHERE type = ?"
-        )
+        sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?"
         sql = self.database_engine.convert_param_style(sql)
 
         txn = db_conn.cursor()
@@ -103,7 +105,7 @@ class FederationSenderServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse federation_sender now listening on port %d", port)
@@ -117,18 +119,19 @@ class FederationSenderServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -151,7 +154,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
         self.send_handler.process_replication_rows(stream_name, token, rows)
 
     def get_streams_to_replicate(self):
-        args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate()
+        args = super(
+            FederationSenderReplicationHandler, self
+        ).get_streams_to_replicate()
         args.update(self.send_handler.stream_positions())
         return args
 
@@ -203,6 +208,7 @@ class FederationSenderHandler(object):
     """Processes the replication stream and forwards the appropriate entries
     to the federation sender.
     """
+
     def __init__(self, hs, replication_client):
         self.store = hs.get_datastore()
         self._is_mine_id = hs.is_mine_id
@@ -241,7 +247,7 @@ class FederationSenderHandler(object):
         # ... and when new receipts happen
         elif stream_name == ReceiptsStream.NAME:
             run_as_background_process(
-                "process_receipts_for_federation", self._on_new_receipts, rows,
+                "process_receipts_for_federation", self._on_new_receipts, rows
             )
 
     @defer.inlineCallbacks
@@ -278,12 +284,14 @@ class FederationSenderHandler(object):
 
                     # We ACK this token over replication so that the master can drop
                     # its in memory queues
-                    self.replication_client.send_federation_ack(self.federation_position)
+                    self.replication_client.send_federation_ack(
+                        self.federation_position
+                    )
                     self._last_ack = self.federation_position
         except Exception:
             logger.exception("Error updating federation stream position")
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 6504da5278..2fd7d57ebf 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -62,14 +62,11 @@ class PresenceStatusStubServlet(RestServlet):
         # Pass through the auth headers, if any, in case the access token
         # is there.
         auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
-        headers = {
-            "Authorization": auth_headers,
-        }
+        headers = {"Authorization": auth_headers}
 
         try:
             result = yield self.http_client.get_json(
-                self.main_uri + request.uri.decode('ascii'),
-                headers=headers,
+                self.main_uri + request.uri.decode("ascii"), headers=headers
             )
         except HttpResponseException as e:
             raise e.to_synapse_error()
@@ -105,18 +102,19 @@ class KeyUploadServlet(RestServlet):
         if device_id is not None:
             # passing the device_id here is deprecated; however, we allow it
             # for now for compatibility with older clients.
-            if (requester.device_id is not None and
-                    device_id != requester.device_id):
-                logger.warning("Client uploading keys for a different device "
-                               "(logged in as %s, uploading for %s)",
-                               requester.device_id, device_id)
+            if requester.device_id is not None and device_id != requester.device_id:
+                logger.warning(
+                    "Client uploading keys for a different device "
+                    "(logged in as %s, uploading for %s)",
+                    requester.device_id,
+                    device_id,
+                )
         else:
             device_id = requester.device_id
 
         if device_id is None:
             raise SynapseError(
-                400,
-                "To upload keys, you must pass device_id when authenticating"
+                400, "To upload keys, you must pass device_id when authenticating"
             )
 
         if body:
@@ -124,13 +122,9 @@ class KeyUploadServlet(RestServlet):
             # Pass through the auth headers, if any, in case the access token
             # is there.
             auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
-            headers = {
-                "Authorization": auth_headers,
-            }
+            headers = {"Authorization": auth_headers}
             result = yield self.http_client.post_json_get_json(
-                self.main_uri + request.uri.decode('ascii'),
-                body,
-                headers=headers,
+                self.main_uri + request.uri.decode("ascii"), body, headers=headers
             )
 
             defer.returnValue((200, result))
@@ -171,12 +165,14 @@ class FrontendProxyServer(HomeServer):
                     if not self.config.use_presence:
                         PresenceStatusStubServlet(self).register(resource)
 
-                    resources.update({
-                        "/_matrix/client/r0": resource,
-                        "/_matrix/client/unstable": resource,
-                        "/_matrix/client/v2_alpha": resource,
-                        "/_matrix/client/api/v1": resource,
-                    })
+                    resources.update(
+                        {
+                            "/_matrix/client/r0": resource,
+                            "/_matrix/client/unstable": resource,
+                            "/_matrix/client/v2_alpha": resource,
+                            "/_matrix/client/api/v1": resource,
+                        }
+                    )
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -190,7 +186,7 @@ class FrontendProxyServer(HomeServer):
                 root_resource,
                 self.version_string,
             ),
-            reactor=self.get_reactor()
+            reactor=self.get_reactor(),
         )
 
         logger.info("Synapse client reader now listening on port %d", port)
@@ -204,18 +200,19 @@ class FrontendProxyServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -227,9 +224,7 @@ class FrontendProxyServer(HomeServer):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse frontend proxy", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse frontend proxy", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -258,6 +253,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-frontend-proxy", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index b27b12e73d..d19c7c7d71 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -101,13 +101,12 @@ class SynapseHomeServer(HomeServer):
                     # Skip loading openid resource if federation is defined
                     # since federation resource will include openid
                     continue
-                resources.update(self._configure_named_resource(
-                    name, res.get("compress", False),
-                ))
+                resources.update(
+                    self._configure_named_resource(name, res.get("compress", False))
+                )
 
         additional_resources = listener_config.get("additional_resources", {})
-        logger.debug("Configuring additional resources: %r",
-                     additional_resources)
+        logger.debug("Configuring additional resources: %r", additional_resources)
         module_api = ModuleApi(self, self.get_auth_handler())
         for path, resmodule in additional_resources.items():
             handler_cls, config = load_module(resmodule)
@@ -174,59 +173,67 @@ class SynapseHomeServer(HomeServer):
             if compress:
                 client_resource = gz_wrap(client_resource)
 
-            resources.update({
-                "/_matrix/client/api/v1": client_resource,
-                "/_matrix/client/r0": client_resource,
-                "/_matrix/client/unstable": client_resource,
-                "/_matrix/client/v2_alpha": client_resource,
-                "/_matrix/client/versions": client_resource,
-                "/.well-known/matrix/client": WellKnownResource(self),
-                "/_synapse/admin": AdminRestResource(self),
-            })
+            resources.update(
+                {
+                    "/_matrix/client/api/v1": client_resource,
+                    "/_matrix/client/r0": client_resource,
+                    "/_matrix/client/unstable": client_resource,
+                    "/_matrix/client/v2_alpha": client_resource,
+                    "/_matrix/client/versions": client_resource,
+                    "/.well-known/matrix/client": WellKnownResource(self),
+                    "/_synapse/admin": AdminRestResource(self),
+                }
+            )
 
             if self.get_config().saml2_enabled:
                 from synapse.rest.saml2 import SAML2Resource
+
                 resources["/_matrix/saml2"] = SAML2Resource(self)
 
         if name == "consent":
             from synapse.rest.consent.consent_resource import ConsentResource
+
             consent_resource = ConsentResource(self)
             if compress:
                 consent_resource = gz_wrap(consent_resource)
-            resources.update({
-                "/_matrix/consent": consent_resource,
-            })
+            resources.update({"/_matrix/consent": consent_resource})
 
         if name == "federation":
-            resources.update({
-                FEDERATION_PREFIX: TransportLayerServer(self),
-            })
+            resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
 
         if name == "openid":
-            resources.update({
-                FEDERATION_PREFIX: TransportLayerServer(self, servlet_groups=["openid"]),
-            })
+            resources.update(
+                {
+                    FEDERATION_PREFIX: TransportLayerServer(
+                        self, servlet_groups=["openid"]
+                    )
+                }
+            )
 
         if name in ["static", "client"]:
-            resources.update({
-                STATIC_PREFIX: File(
-                    os.path.join(os.path.dirname(synapse.__file__), "static")
-                ),
-            })
+            resources.update(
+                {
+                    STATIC_PREFIX: File(
+                        os.path.join(os.path.dirname(synapse.__file__), "static")
+                    )
+                }
+            )
 
         if name in ["media", "federation", "client"]:
             if self.get_config().enable_media_repo:
                 media_repo = self.get_media_repository_resource()
-                resources.update({
-                    MEDIA_PREFIX: media_repo,
-                    LEGACY_MEDIA_PREFIX: media_repo,
-                    CONTENT_REPO_PREFIX: ContentRepoResource(
-                        self, self.config.uploads_path
-                    ),
-                })
+                resources.update(
+                    {
+                        MEDIA_PREFIX: media_repo,
+                        LEGACY_MEDIA_PREFIX: media_repo,
+                        CONTENT_REPO_PREFIX: ContentRepoResource(
+                            self, self.config.uploads_path
+                        ),
+                    }
+                )
             elif name == "media":
                 raise ConfigError(
-                    "'media' resource conflicts with enable_media_repo=False",
+                    "'media' resource conflicts with enable_media_repo=False"
                 )
 
         if name in ["keys", "federation"]:
@@ -257,18 +264,14 @@ class SynapseHomeServer(HomeServer):
 
         for listener in listeners:
             if listener["type"] == "http":
-                self._listening_services.extend(
-                    self._listener_http(config, listener)
-                )
+                self._listening_services.extend(self._listener_http(config, listener))
             elif listener["type"] == "manhole":
                 listen_tcp(
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "replication":
                 services = listen_tcp(
@@ -277,16 +280,17 @@ class SynapseHomeServer(HomeServer):
                     ReplicationStreamProtocolFactory(self),
                 )
                 for s in services:
-                    reactor.addSystemEventTrigger(
-                        "before", "shutdown", s.stopListening,
-                    )
+                    reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -312,7 +316,7 @@ current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
 max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
 registered_reserved_users_mau_gauge = Gauge(
     "synapse_admin_mau:registered_reserved_users",
-    "Registered users with reserved threepids"
+    "Registered users with reserved threepids",
 )
 
 
@@ -327,8 +331,7 @@ def setup(config_options):
     """
     try:
         config = HomeServerConfig.load_or_generate_config(
-            "Synapse Homeserver",
-            config_options,
+            "Synapse Homeserver", config_options
         )
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
@@ -339,10 +342,7 @@ def setup(config_options):
         # generating config files and shouldn't try to continue.
         sys.exit(0)
 
-    synapse.config.logger.setup_logging(
-        config,
-        use_worker_options=False
-    )
+    synapse.config.logger.setup_logging(config, use_worker_options=False)
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
@@ -357,7 +357,7 @@ def setup(config_options):
         database_engine=database_engine,
     )
 
-    logger.info("Preparing database: %s...", config.database_config['name'])
+    logger.info("Preparing database: %s...", config.database_config["name"])
 
     try:
         with hs.get_db_conn(run_new_connection=False) as db_conn:
@@ -375,7 +375,7 @@ def setup(config_options):
         )
         sys.exit(1)
 
-    logger.info("Database prepared in %s.", config.database_config['name'])
+    logger.info("Database prepared in %s.", config.database_config["name"])
 
     hs.setup()
     hs.setup_master()
@@ -391,9 +391,7 @@ def setup(config_options):
         acme = hs.get_acme_handler()
 
         # Check how long the certificate is active for.
-        cert_days_remaining = hs.config.is_disk_cert_valid(
-            allow_self_signed=False
-        )
+        cert_days_remaining = hs.config.is_disk_cert_valid(allow_self_signed=False)
 
         # We want to reprovision if cert_days_remaining is None (meaning no
         # certificate exists), or the days remaining number it returns
@@ -401,8 +399,8 @@ def setup(config_options):
         provision = False
 
         if (
-            cert_days_remaining is None or
-            cert_days_remaining < hs.config.acme_reprovision_threshold
+            cert_days_remaining is None
+            or cert_days_remaining < hs.config.acme_reprovision_threshold
         ):
             provision = True
 
@@ -433,10 +431,7 @@ def setup(config_options):
                 yield do_acme()
 
                 # Check if it needs to be reprovisioned every day.
-                hs.get_clock().looping_call(
-                    reprovision_acme,
-                    24 * 60 * 60 * 1000
-                )
+                hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
 
             _base.start(hs, config.listeners)
 
@@ -463,6 +458,7 @@ class SynapseService(service.Service):
     A twisted Service class that will start synapse. Used to run synapse
     via twistd and a .tac.
     """
+
     def __init__(self, config):
         self.config = config
 
@@ -479,6 +475,7 @@ class SynapseService(service.Service):
 def run(hs):
     PROFILE_SYNAPSE = False
     if PROFILE_SYNAPSE:
+
         def profile(func):
             from cProfile import Profile
             from threading import current_thread
@@ -489,13 +486,14 @@ def run(hs):
                 func(*args, **kargs)
                 profile.disable()
                 ident = current_thread().ident
-                profile.dump_stats("/tmp/%s.%s.%i.pstat" % (
-                    hs.hostname, func.__name__, ident
-                ))
+                profile.dump_stats(
+                    "/tmp/%s.%s.%i.pstat" % (hs.hostname, func.__name__, ident)
+                )
 
             return profiled
 
         from twisted.python.threadpool import ThreadPool
+
         ThreadPool._worker = profile(ThreadPool._worker)
         reactor.run = profile(reactor.run)
 
@@ -541,7 +539,9 @@ def run(hs):
 
         stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
         stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users()
-        stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
+        stats[
+            "daily_active_rooms"
+        ] = yield hs.get_datastore().count_daily_active_rooms()
         stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
 
         r30_results = yield hs.get_datastore().count_r30_users()
@@ -565,8 +565,7 @@ def run(hs):
         logger.info("Reporting stats to matrix.org: %s" % (stats,))
         try:
             yield hs.get_simple_http_client().put_json(
-                "https://matrix.org/report-usage-stats/push",
-                stats
+                "https://matrix.org/report-usage-stats/push", stats
             )
         except Exception as e:
             logger.warn("Error reporting stats: %s", e)
@@ -581,14 +580,11 @@ def run(hs):
             logger.info("report_stats can use psutil")
             stats_process.append(process)
         except (AttributeError):
-            logger.warning(
-                "Unable to read memory/cpu stats. Disabling reporting."
-            )
+            logger.warning("Unable to read memory/cpu stats. Disabling reporting.")
 
     def generate_user_daily_visit_stats():
         return run_as_background_process(
-            "generate_user_daily_visits",
-            hs.get_datastore().generate_user_daily_visits,
+            "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits
         )
 
     # Rather than update on per session basis, batch up the requests.
@@ -599,9 +595,9 @@ def run(hs):
     # monthly active user limiting functionality
     def reap_monthly_active_users():
         return run_as_background_process(
-            "reap_monthly_active_users",
-            hs.get_datastore().reap_monthly_active_users,
+            "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users
         )
+
     clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
     reap_monthly_active_users()
 
@@ -619,8 +615,7 @@ def run(hs):
 
     def start_generate_monthly_active_users():
         return run_as_background_process(
-            "generate_monthly_active_users",
-            generate_monthly_active_users,
+            "generate_monthly_active_users", generate_monthly_active_users
         )
 
     start_generate_monthly_active_users()
@@ -660,5 +655,5 @@ def main():
         run(hs)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index d4cc4e9443..cf0e2036c3 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -72,13 +72,15 @@ class MediaRepositoryServer(HomeServer):
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "media":
                     media_repo = self.get_media_repository_resource()
-                    resources.update({
-                        MEDIA_PREFIX: media_repo,
-                        LEGACY_MEDIA_PREFIX: media_repo,
-                        CONTENT_REPO_PREFIX: ContentRepoResource(
-                            self, self.config.uploads_path
-                        ),
-                    })
+                    resources.update(
+                        {
+                            MEDIA_PREFIX: media_repo,
+                            LEGACY_MEDIA_PREFIX: media_repo,
+                            CONTENT_REPO_PREFIX: ContentRepoResource(
+                                self, self.config.uploads_path
+                            ),
+                        }
+                    )
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -91,7 +93,7 @@ class MediaRepositoryServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse media repository now listening on port %d", port)
@@ -105,18 +107,19 @@ class MediaRepositoryServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -164,6 +167,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-media-repository", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index cbf0d67f51..df29ea5ecb 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -46,36 +46,27 @@ logger = logging.getLogger("synapse.app.pusher")
 
 
 class PusherSlaveStore(
-    SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
-    SlavedAccountDataStore
+    SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedAccountDataStore
 ):
-    update_pusher_last_stream_ordering_and_success = (
-        __func__(DataStore.update_pusher_last_stream_ordering_and_success)
+    update_pusher_last_stream_ordering_and_success = __func__(
+        DataStore.update_pusher_last_stream_ordering_and_success
     )
 
-    update_pusher_failing_since = (
-        __func__(DataStore.update_pusher_failing_since)
-    )
+    update_pusher_failing_since = __func__(DataStore.update_pusher_failing_since)
 
-    update_pusher_last_stream_ordering = (
-        __func__(DataStore.update_pusher_last_stream_ordering)
+    update_pusher_last_stream_ordering = __func__(
+        DataStore.update_pusher_last_stream_ordering
     )
 
-    get_throttle_params_by_room = (
-        __func__(DataStore.get_throttle_params_by_room)
-    )
+    get_throttle_params_by_room = __func__(DataStore.get_throttle_params_by_room)
 
-    set_throttle_params = (
-        __func__(DataStore.set_throttle_params)
-    )
+    set_throttle_params = __func__(DataStore.set_throttle_params)
 
-    get_time_of_last_push_action_before = (
-        __func__(DataStore.get_time_of_last_push_action_before)
+    get_time_of_last_push_action_before = __func__(
+        DataStore.get_time_of_last_push_action_before
     )
 
-    get_profile_displayname = (
-        __func__(DataStore.get_profile_displayname)
-    )
+    get_profile_displayname = __func__(DataStore.get_profile_displayname)
 
 
 class PusherServer(HomeServer):
@@ -105,7 +96,7 @@ class PusherServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse pusher now listening on port %d", port)
@@ -119,18 +110,19 @@ class PusherServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -161,9 +153,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
                     else:
                         yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
             elif stream_name == "events":
-                yield self.pusher_pool.on_new_notifications(
-                    token, token,
-                )
+                yield self.pusher_pool.on_new_notifications(token, token)
             elif stream_name == "receipts":
                 yield self.pusher_pool.on_new_receipts(
                     token, token, set(row.room_id for row in rows)
@@ -188,9 +178,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse pusher", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse pusher", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -234,6 +222,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-pusher", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         ps = start(sys.argv[1:])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 5388def28a..858949910d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -98,10 +98,7 @@ class SynchrotronPresence(object):
         self.notifier = hs.get_notifier()
 
         active_presence = self.store.take_presence_startup_info()
-        self.user_to_current_state = {
-            state.user_id: state
-            for state in active_presence
-        }
+        self.user_to_current_state = {state.user_id: state for state in active_presence}
 
         # user_id -> last_sync_ms. Lists the users that have stopped syncing
         # but we haven't notified the master of that yet
@@ -196,17 +193,26 @@ class SynchrotronPresence(object):
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
-            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=users_to_states.keys()
+            "presence_key",
+            stream_id,
+            rooms=room_ids_to_states.keys(),
+            users=users_to_states.keys(),
         )
 
     @defer.inlineCallbacks
     def process_replication_rows(self, token, rows):
-        states = [UserPresenceState(
-            row.user_id, row.state, row.last_active_ts,
-            row.last_federation_update_ts, row.last_user_sync_ts, row.status_msg,
-            row.currently_active
-        ) for row in rows]
+        states = [
+            UserPresenceState(
+                row.user_id,
+                row.state,
+                row.last_active_ts,
+                row.last_federation_update_ts,
+                row.last_user_sync_ts,
+                row.status_msg,
+                row.currently_active,
+            )
+            for row in rows
+        ]
 
         for state in states:
             self.user_to_current_state[state.user_id] = state
@@ -217,7 +223,8 @@ class SynchrotronPresence(object):
     def get_currently_syncing_users(self):
         if self.hs.config.use_presence:
             return [
-                user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
+                user_id
+                for user_id, count in iteritems(self.user_to_num_current_syncs)
                 if count > 0
             ]
         else:
@@ -281,12 +288,14 @@ class SynchrotronServer(HomeServer):
                     events.register_servlets(self, resource)
                     InitialSyncRestServlet(self).register(resource)
                     RoomInitialSyncRestServlet(self).register(resource)
-                    resources.update({
-                        "/_matrix/client/r0": resource,
-                        "/_matrix/client/unstable": resource,
-                        "/_matrix/client/v2_alpha": resource,
-                        "/_matrix/client/api/v1": resource,
-                    })
+                    resources.update(
+                        {
+                            "/_matrix/client/r0": resource,
+                            "/_matrix/client/unstable": resource,
+                            "/_matrix/client/v2_alpha": resource,
+                            "/_matrix/client/api/v1": resource,
+                        }
+                    )
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -299,7 +308,7 @@ class SynchrotronServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse synchrotron now listening on port %d", port)
@@ -313,18 +322,19 @@ class SynchrotronServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -382,40 +392,36 @@ class SyncReplicationHandler(ReplicationClientHandler):
                     )
             elif stream_name == "push_rules":
                 self.notifier.on_new_event(
-                    "push_rules_key", token, users=[row.user_id for row in rows],
+                    "push_rules_key", token, users=[row.user_id for row in rows]
                 )
-            elif stream_name in ("account_data", "tag_account_data",):
+            elif stream_name in ("account_data", "tag_account_data"):
                 self.notifier.on_new_event(
-                    "account_data_key", token, users=[row.user_id for row in rows],
+                    "account_data_key", token, users=[row.user_id for row in rows]
                 )
             elif stream_name == "receipts":
                 self.notifier.on_new_event(
-                    "receipt_key", token, rooms=[row.room_id for row in rows],
+                    "receipt_key", token, rooms=[row.room_id for row in rows]
                 )
             elif stream_name == "typing":
                 self.typing_handler.process_replication_rows(token, rows)
                 self.notifier.on_new_event(
-                    "typing_key", token, rooms=[row.room_id for row in rows],
+                    "typing_key", token, rooms=[row.room_id for row in rows]
                 )
             elif stream_name == "to_device":
                 entities = [row.entity for row in rows if row.entity.startswith("@")]
                 if entities:
-                    self.notifier.on_new_event(
-                        "to_device_key", token, users=entities,
-                    )
+                    self.notifier.on_new_event("to_device_key", token, users=entities)
             elif stream_name == "device_lists":
                 all_room_ids = set()
                 for row in rows:
                     room_ids = yield self.store.get_rooms_for_user(row.user_id)
                     all_room_ids.update(room_ids)
-                self.notifier.on_new_event(
-                    "device_list_key", token, rooms=all_room_ids,
-                )
+                self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
             elif stream_name == "presence":
                 yield self.presence_handler.process_replication_rows(token, rows)
             elif stream_name == "receipts":
                 self.notifier.on_new_event(
-                    "groups_key", token, users=[row.user_id for row in rows],
+                    "groups_key", token, users=[row.user_id for row in rows]
                 )
         except Exception:
             logger.exception("Error processing replication")
@@ -423,9 +429,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse synchrotron", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse synchrotron", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -453,6 +457,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-synchrotron", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 355f5aa71d..2d9d2e1bbc 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -66,14 +66,16 @@ class UserDirectorySlaveStore(
 
         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
-            db_conn, "current_state_delta_stream",
+            db_conn,
+            "current_state_delta_stream",
             entity_column="room_id",
             stream_column="stream_id",
             max_value=events_max,  # As we share the stream id with events token
             limit=1000,
         )
         self._curr_state_delta_stream_cache = StreamChangeCache(
-            "_curr_state_delta_stream_cache", min_curr_state_delta_id,
+            "_curr_state_delta_stream_cache",
+            min_curr_state_delta_id,
             prefilled_cache=curr_state_delta_prefill,
         )
 
@@ -110,12 +112,14 @@ class UserDirectoryServer(HomeServer):
                 elif name == "client":
                     resource = JsonResource(self, canonical_json=False)
                     user_directory.register_servlets(self, resource)
-                    resources.update({
-                        "/_matrix/client/r0": resource,
-                        "/_matrix/client/unstable": resource,
-                        "/_matrix/client/v2_alpha": resource,
-                        "/_matrix/client/api/v1": resource,
-                    })
+                    resources.update(
+                        {
+                            "/_matrix/client/r0": resource,
+                            "/_matrix/client/unstable": resource,
+                            "/_matrix/client/v2_alpha": resource,
+                            "/_matrix/client/api/v1": resource,
+                        }
+                    )
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -128,7 +132,7 @@ class UserDirectoryServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse user_dir now listening on port %d", port)
@@ -142,18 +146,19 @@ class UserDirectoryServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -186,9 +191,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse user directory", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse user directory", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -227,6 +230,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-user-dir", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 57ed8a3ca2..b26a31dd54 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -48,9 +48,7 @@ class AppServiceTransaction(object):
             A Deferred which resolves to True if the transaction was sent.
         """
         return as_api.push_bulk(
-            service=self.service,
-            events=self.events,
-            txn_id=self.id
+            service=self.service, events=self.events, txn_id=self.id
         )
 
     def complete(self, store):
@@ -64,10 +62,7 @@ class AppServiceTransaction(object):
         Returns:
             A Deferred which resolves to True if the transaction was completed.
         """
-        return store.complete_appservice_txn(
-            service=self.service,
-            txn_id=self.id
-        )
+        return store.complete_appservice_txn(service=self.service, txn_id=self.id)
 
 
 class ApplicationService(object):
@@ -76,6 +71,7 @@ class ApplicationService(object):
 
     Provides methods to check if this service is "interested" in events.
     """
+
     NS_USERS = "users"
     NS_ALIASES = "aliases"
     NS_ROOMS = "rooms"
@@ -84,9 +80,19 @@ class ApplicationService(object):
     # values.
     NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
 
-    def __init__(self, token, hostname, url=None, namespaces=None, hs_token=None,
-                 sender=None, id=None, protocols=None, rate_limited=True,
-                 ip_range_whitelist=None):
+    def __init__(
+        self,
+        token,
+        hostname,
+        url=None,
+        namespaces=None,
+        hs_token=None,
+        sender=None,
+        id=None,
+        protocols=None,
+        rate_limited=True,
+        ip_range_whitelist=None,
+    ):
         self.token = token
         self.url = url
         self.hs_token = hs_token
@@ -128,9 +134,7 @@ class ApplicationService(object):
                 if not isinstance(regex_obj, dict):
                     raise ValueError("Expected dict regex for ns '%s'" % ns)
                 if not isinstance(regex_obj.get("exclusive"), bool):
-                    raise ValueError(
-                        "Expected bool for 'exclusive' in ns '%s'" % ns
-                    )
+                    raise ValueError("Expected bool for 'exclusive' in ns '%s'" % ns)
                 group_id = regex_obj.get("group_id")
                 if group_id:
                     if not isinstance(group_id, str):
@@ -153,9 +157,7 @@ class ApplicationService(object):
                 if isinstance(regex, string_types):
                     regex_obj["regex"] = re.compile(regex)  # Pre-compile regex
                 else:
-                    raise ValueError(
-                        "Expected string for 'regex' in ns '%s'" % ns
-                    )
+                    raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
         return namespaces
 
     def _matches_regex(self, test_string, namespace_key):
@@ -178,8 +180,9 @@ class ApplicationService(object):
         if self.is_interested_in_user(event.sender):
             defer.returnValue(True)
         # also check m.room.member state key
-        if (event.type == EventTypes.Member and
-                self.is_interested_in_user(event.state_key)):
+        if event.type == EventTypes.Member and self.is_interested_in_user(
+            event.state_key
+        ):
             defer.returnValue(True)
 
         if not store:
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 9ccc5a80fc..571881775b 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -32,19 +32,17 @@ logger = logging.getLogger(__name__)
 sent_transactions_counter = Counter(
     "synapse_appservice_api_sent_transactions",
     "Number of /transactions/ requests sent",
-    ["service"]
+    ["service"],
 )
 
 failed_transactions_counter = Counter(
     "synapse_appservice_api_failed_transactions",
     "Number of /transactions/ requests that failed to send",
-    ["service"]
+    ["service"],
 )
 
 sent_events_counter = Counter(
-    "synapse_appservice_api_sent_events",
-    "Number of events sent to the AS",
-    ["service"]
+    "synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
 )
 
 HOUR_IN_MS = 60 * 60 * 1000
@@ -92,8 +90,9 @@ class ApplicationServiceApi(SimpleHttpClient):
         super(ApplicationServiceApi, self).__init__(hs)
         self.clock = hs.get_clock()
 
-        self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
-                                                 timeout_ms=HOUR_IN_MS)
+        self.protocol_meta_cache = ResponseCache(
+            hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS
+        )
 
     @defer.inlineCallbacks
     def query_user(self, service, user_id):
@@ -102,9 +101,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
         response = None
         try:
-            response = yield self.get_json(uri, {
-                "access_token": service.hs_token
-            })
+            response = yield self.get_json(uri, {"access_token": service.hs_token})
             if response is not None:  # just an empty json object
                 defer.returnValue(True)
         except CodeMessageException as e:
@@ -123,9 +120,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
         response = None
         try:
-            response = yield self.get_json(uri, {
-                "access_token": service.hs_token
-            })
+            response = yield self.get_json(uri, {"access_token": service.hs_token})
             if response is not None:  # just an empty json object
                 defer.returnValue(True)
         except CodeMessageException as e:
@@ -144,9 +139,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         elif kind == ThirdPartyEntityKind.LOCATION:
             required_field = "alias"
         else:
-            raise ValueError(
-                "Unrecognised 'kind' argument %r to query_3pe()", kind
-            )
+            raise ValueError("Unrecognised 'kind' argument %r to query_3pe()", kind)
         if service.url is None:
             defer.returnValue([])
 
@@ -154,14 +147,13 @@ class ApplicationServiceApi(SimpleHttpClient):
             service.url,
             APP_SERVICE_PREFIX,
             kind,
-            urllib.parse.quote(protocol)
+            urllib.parse.quote(protocol),
         )
         try:
             response = yield self.get_json(uri, fields)
             if not isinstance(response, list):
                 logger.warning(
-                    "query_3pe to %s returned an invalid response %r",
-                    uri, response
+                    "query_3pe to %s returned an invalid response %r", uri, response
                 )
                 defer.returnValue([])
 
@@ -171,8 +163,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                     ret.append(r)
                 else:
                     logger.warning(
-                        "query_3pe to %s returned an invalid result %r",
-                        uri, r
+                        "query_3pe to %s returned an invalid result %r", uri, r
                     )
 
             defer.returnValue(ret)
@@ -189,27 +180,27 @@ class ApplicationServiceApi(SimpleHttpClient):
             uri = "%s%s/thirdparty/protocol/%s" % (
                 service.url,
                 APP_SERVICE_PREFIX,
-                urllib.parse.quote(protocol)
+                urllib.parse.quote(protocol),
             )
             try:
                 info = yield self.get_json(uri, {})
 
                 if not _is_valid_3pe_metadata(info):
-                    logger.warning("query_3pe_protocol to %s did not return a"
-                                   " valid result", uri)
+                    logger.warning(
+                        "query_3pe_protocol to %s did not return a" " valid result", uri
+                    )
                     defer.returnValue(None)
 
                 for instance in info.get("instances", []):
                     network_id = instance.get("network_id", None)
                     if network_id is not None:
                         instance["instance_id"] = ThirdPartyInstanceID(
-                            service.id, network_id,
+                            service.id, network_id
                         ).to_string()
 
                 defer.returnValue(info)
             except Exception as ex:
-                logger.warning("query_3pe_protocol to %s threw exception %s",
-                               uri, ex)
+                logger.warning("query_3pe_protocol to %s threw exception %s", uri, ex)
                 defer.returnValue(None)
 
         key = (service.id, protocol)
@@ -223,22 +214,19 @@ class ApplicationServiceApi(SimpleHttpClient):
         events = self._serialize(events)
 
         if txn_id is None:
-            logger.warning("push_bulk: Missing txn ID sending events to %s",
-                           service.url)
+            logger.warning(
+                "push_bulk: Missing txn ID sending events to %s", service.url
+            )
             txn_id = str(0)
         txn_id = str(txn_id)
 
-        uri = service.url + ("/transactions/%s" %
-                             urllib.parse.quote(txn_id))
+        uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
         try:
             yield self.put_json(
                 uri=uri,
-                json_body={
-                    "events": events
-                },
-                args={
-                    "access_token": service.hs_token
-                })
+                json_body={"events": events},
+                args={"access_token": service.hs_token},
+            )
             sent_transactions_counter.labels(service.id).inc()
             sent_events_counter.labels(service.id).inc(len(events))
             defer.returnValue(True)
@@ -252,6 +240,4 @@ class ApplicationServiceApi(SimpleHttpClient):
 
     def _serialize(self, events):
         time_now = self.clock.time_msec()
-        return [
-            serialize_event(e, time_now, as_client_event=True) for e in events
-        ]
+        return [serialize_event(e, time_now, as_client_event=True) for e in events]
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 685f15c061..b54bf5411f 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -112,15 +112,14 @@ class _ServiceQueuer(object):
             return
 
         run_as_background_process(
-            "as-sender-%s" % (service.id, ),
-            self._send_request, service,
+            "as-sender-%s" % (service.id,), self._send_request, service
         )
 
     @defer.inlineCallbacks
     def _send_request(self, service):
         # sanity-check: we shouldn't get here if this service already has a sender
         # running.
-        assert(service.id not in self.requests_in_flight)
+        assert service.id not in self.requests_in_flight
 
         self.requests_in_flight.add(service.id)
         try:
@@ -137,7 +136,6 @@ class _ServiceQueuer(object):
 
 
 class _TransactionController(object):
-
     def __init__(self, clock, store, as_api, recoverer_fn):
         self.clock = clock
         self.store = store
@@ -149,10 +147,7 @@ class _TransactionController(object):
     @defer.inlineCallbacks
     def send(self, service, events):
         try:
-            txn = yield self.store.create_appservice_txn(
-                service=service,
-                events=events
-            )
+            txn = yield self.store.create_appservice_txn(service=service, events=events)
             service_is_up = yield self._is_service_up(service)
             if service_is_up:
                 sent = yield txn.send(self.as_api)
@@ -167,12 +162,12 @@ class _TransactionController(object):
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
         self.recoverers.remove(recoverer)
-        logger.info("Successfully recovered application service AS ID %s",
-                    recoverer.service.id)
+        logger.info(
+            "Successfully recovered application service AS ID %s", recoverer.service.id
+        )
         logger.info("Remaining active recoverers: %s", len(self.recoverers))
         yield self.store.set_appservice_state(
-            recoverer.service,
-            ApplicationServiceState.UP
+            recoverer.service, ApplicationServiceState.UP
         )
 
     def add_recoverers(self, recoverers):
@@ -184,13 +179,10 @@ class _TransactionController(object):
     @defer.inlineCallbacks
     def _start_recoverer(self, service):
         try:
-            yield self.store.set_appservice_state(
-                service,
-                ApplicationServiceState.DOWN
-            )
+            yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
             logger.info(
                 "Application service falling behind. Starting recoverer. AS ID %s",
-                service.id
+                service.id,
             )
             recoverer = self.recoverer_fn(service, self.on_recovered)
             self.add_recoverers([recoverer])
@@ -205,19 +197,16 @@ class _TransactionController(object):
 
 
 class _Recoverer(object):
-
     @staticmethod
     @defer.inlineCallbacks
     def start(clock, store, as_api, callback):
-        services = yield store.get_appservices_by_state(
-            ApplicationServiceState.DOWN
-        )
-        recoverers = [
-            _Recoverer(clock, store, as_api, s, callback) for s in services
-        ]
+        services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
+        recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
         for r in recoverers:
-            logger.info("Starting recoverer for AS ID %s which was marked as "
-                        "DOWN", r.service.id)
+            logger.info(
+                "Starting recoverer for AS ID %s which was marked as " "DOWN",
+                r.service.id,
+            )
             r.recover()
         defer.returnValue(recoverers)
 
@@ -232,9 +221,9 @@ class _Recoverer(object):
     def recover(self):
         def _retry():
             run_as_background_process(
-                "as-recoverer-%s" % (self.service.id,),
-                self.retry,
+                "as-recoverer-%s" % (self.service.id,), self.retry
             )
+
         self.clock.call_later((2 ** self.backoff_counter), _retry)
 
     def _backoff(self):
@@ -248,8 +237,9 @@ class _Recoverer(object):
         try:
             txn = yield self.store.get_oldest_unsent_txn(self.service)
             if txn:
-                logger.info("Retrying transaction %s for AS ID %s",
-                            txn.id, txn.service.id)
+                logger.info(
+                    "Retrying transaction %s for AS ID %s", txn.id, txn.service.id
+                )
                 sent = yield txn.send(self.as_api)
                 if sent:
                     yield txn.complete(self.store)