diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/app/__init__.py | 4 | ||||
-rw-r--r-- | synapse/app/_base.py | 77 | ||||
-rw-r--r-- | synapse/app/appservice.py | 29 | ||||
-rw-r--r-- | synapse/app/client_reader.py | 33 | ||||
-rw-r--r-- | synapse/app/event_creator.py | 39 | ||||
-rw-r--r-- | synapse/app/federation_reader.py | 38 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 46 | ||||
-rw-r--r-- | synapse/app/frontend_proxy.py | 69 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 163 | ||||
-rw-r--r-- | synapse/app/media_repository.py | 37 | ||||
-rw-r--r-- | synapse/app/pusher.py | 60 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 90 | ||||
-rw-r--r-- | synapse/app/user_dir.py | 45 | ||||
-rw-r--r-- | synapse/appservice/__init__.py | 39 | ||||
-rw-r--r-- | synapse/appservice/api.py | 66 | ||||
-rw-r--r-- | synapse/appservice/scheduler.py | 50 |
16 files changed, 429 insertions, 456 deletions
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py index f56f5fcc13..d877c77834 100644 --- a/synapse/app/__init__.py +++ b/synapse/app/__init__.py @@ -43,7 +43,7 @@ def check_bind_error(e, address, bind_addresses): address (str): Address on which binding was attempted. bind_addresses (list): Addresses on which the service listens. """ - if address == '0.0.0.0' and '::' in bind_addresses: - logger.warn('Failed to listen on 0.0.0.0, continuing because listening on [::]') + if address == "0.0.0.0" and "::" in bind_addresses: + logger.warn("Failed to listen on 0.0.0.0, continuing because listening on [::]") else: raise e diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 8cc990399f..df4c2d4c97 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -75,14 +75,14 @@ def start_worker_reactor(appname, config): def start_reactor( - appname, - soft_file_limit, - gc_thresholds, - pid_file, - daemonize, - cpu_affinity, - print_pidfile, - logger, + appname, + soft_file_limit, + gc_thresholds, + pid_file, + daemonize, + cpu_affinity, + print_pidfile, + logger, ): """ Run the reactor in the main process @@ -149,10 +149,10 @@ def start_reactor( def quit_with_error(error_string): message_lines = error_string.split("\n") line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2 - sys.stderr.write("*" * line_length + '\n') + sys.stderr.write("*" * line_length + "\n") for line in message_lines: sys.stderr.write(" %s\n" % (line.rstrip(),)) - sys.stderr.write("*" * line_length + '\n') + sys.stderr.write("*" * line_length + "\n") sys.exit(1) @@ -178,14 +178,7 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): r = [] for address in bind_addresses: try: - r.append( - reactor.listenTCP( - port, - factory, - backlog, - address - ) - ) + r.append(reactor.listenTCP(port, factory, backlog, address)) except error.CannotListenError as e: check_bind_error(e, address, bind_addresses) @@ -205,13 +198,7 @@ def listen_ssl( for address in bind_addresses: try: r.append( - reactor.listenSSL( - port, - factory, - context_factory, - backlog, - address - ) + reactor.listenSSL(port, factory, context_factory, backlog, address) ) except error.CannotListenError as e: check_bind_error(e, address, bind_addresses) @@ -243,15 +230,13 @@ def refresh_certificate(hs): if isinstance(i.factory, TLSMemoryBIOFactory): addr = i.getHost() logger.info( - "Replacing TLS context factory on [%s]:%i", addr.host, addr.port, + "Replacing TLS context factory on [%s]:%i", addr.host, addr.port ) # We want to replace TLS factories with a new one, with the new # TLS configuration. We do this by reaching in and pulling out # the wrappedFactory, and then re-wrapping it. i.factory = TLSMemoryBIOFactory( - hs.tls_server_context_factory, - False, - i.factory.wrappedFactory + hs.tls_server_context_factory, False, i.factory.wrappedFactory ) logger.info("Context factories updated.") @@ -267,6 +252,7 @@ def start(hs, listeners=None): try: # Set up the SIGHUP machinery. if hasattr(signal, "SIGHUP"): + def handle_sighup(*args, **kwargs): for i in _sighup_callbacks: i(hs) @@ -302,10 +288,8 @@ def setup_sentry(hs): return import sentry_sdk - sentry_sdk.init( - dsn=hs.config.sentry_dsn, - release=get_version_string(synapse), - ) + + sentry_sdk.init(dsn=hs.config.sentry_dsn, release=get_version_string(synapse)) # We set some default tags that give some context to this instance with sentry_sdk.configure_scope() as scope: @@ -326,7 +310,7 @@ def install_dns_limiter(reactor, max_dns_requests_in_flight=100): many DNS queries at once """ new_resolver = _LimitedHostnameResolver( - reactor.nameResolver, max_dns_requests_in_flight, + reactor.nameResolver, max_dns_requests_in_flight ) reactor.installNameResolver(new_resolver) @@ -339,11 +323,17 @@ class _LimitedHostnameResolver(object): def __init__(self, resolver, max_dns_requests_in_flight): self._resolver = resolver self._limiter = Linearizer( - name="dns_client_limiter", max_count=max_dns_requests_in_flight, + name="dns_client_limiter", max_count=max_dns_requests_in_flight ) - def resolveHostName(self, resolutionReceiver, hostName, portNumber=0, - addressTypes=None, transportSemantics='TCP'): + def resolveHostName( + self, + resolutionReceiver, + hostName, + portNumber=0, + addressTypes=None, + transportSemantics="TCP", + ): # We need this function to return `resolutionReceiver` so we do all the # actual logic involving deferreds in a separate function. @@ -363,8 +353,14 @@ class _LimitedHostnameResolver(object): return resolutionReceiver @defer.inlineCallbacks - def _resolve(self, resolutionReceiver, hostName, portNumber=0, - addressTypes=None, transportSemantics='TCP'): + def _resolve( + self, + resolutionReceiver, + hostName, + portNumber=0, + addressTypes=None, + transportSemantics="TCP", + ): with (yield self._limiter.queue(())): # resolveHostName doesn't return a Deferred, so we need to hook into @@ -374,8 +370,7 @@ class _LimitedHostnameResolver(object): receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred) self._resolver.resolveHostName( - receiver, hostName, portNumber, - addressTypes, transportSemantics, + receiver, hostName, portNumber, addressTypes, transportSemantics ) yield deferred diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 33107f56d1..9120bdb143 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -44,7 +44,9 @@ logger = logging.getLogger("synapse.app.appservice") class AppserviceSlaveStore( - DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore, + DirectoryStore, + SlavedEventStore, + SlavedApplicationServiceStore, SlavedRegistrationStore, ): pass @@ -74,7 +76,7 @@ class AppserviceServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse appservice now listening on port %d", port) @@ -88,18 +90,19 @@ class AppserviceServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -132,9 +135,7 @@ class ASReplicationHandler(ReplicationClientHandler): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse appservice", config_options - ) + config = HomeServerConfig.load_config("Synapse appservice", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -173,6 +174,6 @@ def start(config_options): _base.start_worker_reactor("synapse-appservice", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index a16e037f32..90bc79cdda 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -37,6 +37,7 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.profile import SlavedProfileStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore @@ -52,6 +53,7 @@ from synapse.rest.client.v1.room import ( PublicRoomListRestServlet, RoomEventContextServlet, RoomMemberListRestServlet, + RoomMessageListRestServlet, RoomStateRestServlet, ) from synapse.rest.client.v1.voip import VoipRestServlet @@ -74,6 +76,7 @@ class ClientReaderSlavedStore( SlavedDeviceStore, SlavedReceiptsStore, SlavedPushRuleStore, + SlavedGroupServerStore, SlavedAccountDataStore, SlavedEventStore, SlavedKeyStore, @@ -109,6 +112,7 @@ class ClientReaderServer(HomeServer): JoinedRoomMemberListRestServlet(self).register(resource) RoomStateRestServlet(self).register(resource) RoomEventContextServlet(self).register(resource) + RoomMessageListRestServlet(self).register(resource) RegisterRestServlet(self).register(resource) LoginRestServlet(self).register(resource) ThreepidRestServlet(self).register(resource) @@ -118,9 +122,7 @@ class ClientReaderServer(HomeServer): PushRuleRestServlet(self).register(resource) VersionsRestServlet().register(resource) - resources.update({ - "/_matrix/client": resource, - }) + resources.update({"/_matrix/client": resource}) root_resource = create_resource_tree(resources, NoResource()) @@ -133,7 +135,7 @@ class ClientReaderServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse client reader now listening on port %d", port) @@ -147,18 +149,19 @@ class ClientReaderServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -170,9 +173,7 @@ class ClientReaderServer(HomeServer): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse client reader", config_options - ) + config = HomeServerConfig.load_config("Synapse client reader", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -199,6 +200,6 @@ def start(config_options): _base.start_worker_reactor("synapse-client-reader", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index b8e5196152..ff522e4499 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -109,12 +109,14 @@ class EventCreatorServer(HomeServer): ProfileAvatarURLRestServlet(self).register(resource) ProfileDisplaynameRestServlet(self).register(resource) ProfileRestServlet(self).register(resource) - resources.update({ - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - }) + resources.update( + { + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + } + ) root_resource = create_resource_tree(resources, NoResource()) @@ -127,7 +129,7 @@ class EventCreatorServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse event creator now listening on port %d", port) @@ -141,18 +143,19 @@ class EventCreatorServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -164,9 +167,7 @@ class EventCreatorServer(HomeServer): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse event creator", config_options - ) + config = HomeServerConfig.load_config("Synapse event creator", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -198,6 +199,6 @@ def start(config_options): _base.start_worker_reactor("synapse-event-creator", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 7da79dc827..9421420930 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -86,19 +86,18 @@ class FederationReaderServer(HomeServer): if name == "metrics": resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "federation": - resources.update({ - FEDERATION_PREFIX: TransportLayerServer(self), - }) + resources.update({FEDERATION_PREFIX: TransportLayerServer(self)}) if name == "openid" and "federation" not in res["names"]: # Only load the openid resource separately if federation resource # is not specified since federation resource includes openid # resource. - resources.update({ - FEDERATION_PREFIX: TransportLayerServer( - self, - servlet_groups=["openid"], - ), - }) + resources.update( + { + FEDERATION_PREFIX: TransportLayerServer( + self, servlet_groups=["openid"] + ) + } + ) if name in ["keys", "federation"]: resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self) @@ -115,7 +114,7 @@ class FederationReaderServer(HomeServer): root_resource, self.version_string, ), - reactor=self.get_reactor() + reactor=self.get_reactor(), ) logger.info("Synapse federation reader now listening on port %d", port) @@ -129,18 +128,19 @@ class FederationReaderServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -181,6 +181,6 @@ def start(config_options): _base.start_worker_reactor("synapse-federation-reader", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 1d43f2b075..969be58d0b 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -52,8 +52,13 @@ logger = logging.getLogger("synapse.app.federation_sender") class FederationSenderSlaveStore( - SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore, - SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore, + SlavedDeviceInboxStore, + SlavedTransactionStore, + SlavedReceiptsStore, + SlavedEventStore, + SlavedRegistrationStore, + SlavedDeviceStore, + SlavedPresenceStore, ): def __init__(self, db_conn, hs): super(FederationSenderSlaveStore, self).__init__(db_conn, hs) @@ -65,10 +70,7 @@ class FederationSenderSlaveStore( self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) def _get_federation_out_pos(self, db_conn): - sql = ( - "SELECT stream_id FROM federation_stream_position" - " WHERE type = ?" - ) + sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?" sql = self.database_engine.convert_param_style(sql) txn = db_conn.cursor() @@ -103,7 +105,7 @@ class FederationSenderServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse federation_sender now listening on port %d", port) @@ -117,18 +119,19 @@ class FederationSenderServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -151,7 +154,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler): self.send_handler.process_replication_rows(stream_name, token, rows) def get_streams_to_replicate(self): - args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate() + args = super( + FederationSenderReplicationHandler, self + ).get_streams_to_replicate() args.update(self.send_handler.stream_positions()) return args @@ -203,6 +208,7 @@ class FederationSenderHandler(object): """Processes the replication stream and forwards the appropriate entries to the federation sender. """ + def __init__(self, hs, replication_client): self.store = hs.get_datastore() self._is_mine_id = hs.is_mine_id @@ -241,7 +247,7 @@ class FederationSenderHandler(object): # ... and when new receipts happen elif stream_name == ReceiptsStream.NAME: run_as_background_process( - "process_receipts_for_federation", self._on_new_receipts, rows, + "process_receipts_for_federation", self._on_new_receipts, rows ) @defer.inlineCallbacks @@ -278,12 +284,14 @@ class FederationSenderHandler(object): # We ACK this token over replication so that the master can drop # its in memory queues - self.replication_client.send_federation_ack(self.federation_position) + self.replication_client.send_federation_ack( + self.federation_position + ) self._last_ack = self.federation_position except Exception: logger.exception("Error updating federation stream position") -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 6504da5278..2fd7d57ebf 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -62,14 +62,11 @@ class PresenceStatusStubServlet(RestServlet): # Pass through the auth headers, if any, in case the access token # is there. auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) - headers = { - "Authorization": auth_headers, - } + headers = {"Authorization": auth_headers} try: result = yield self.http_client.get_json( - self.main_uri + request.uri.decode('ascii'), - headers=headers, + self.main_uri + request.uri.decode("ascii"), headers=headers ) except HttpResponseException as e: raise e.to_synapse_error() @@ -105,18 +102,19 @@ class KeyUploadServlet(RestServlet): if device_id is not None: # passing the device_id here is deprecated; however, we allow it # for now for compatibility with older clients. - if (requester.device_id is not None and - device_id != requester.device_id): - logger.warning("Client uploading keys for a different device " - "(logged in as %s, uploading for %s)", - requester.device_id, device_id) + if requester.device_id is not None and device_id != requester.device_id: + logger.warning( + "Client uploading keys for a different device " + "(logged in as %s, uploading for %s)", + requester.device_id, + device_id, + ) else: device_id = requester.device_id if device_id is None: raise SynapseError( - 400, - "To upload keys, you must pass device_id when authenticating" + 400, "To upload keys, you must pass device_id when authenticating" ) if body: @@ -124,13 +122,9 @@ class KeyUploadServlet(RestServlet): # Pass through the auth headers, if any, in case the access token # is there. auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", []) - headers = { - "Authorization": auth_headers, - } + headers = {"Authorization": auth_headers} result = yield self.http_client.post_json_get_json( - self.main_uri + request.uri.decode('ascii'), - body, - headers=headers, + self.main_uri + request.uri.decode("ascii"), body, headers=headers ) defer.returnValue((200, result)) @@ -171,12 +165,14 @@ class FrontendProxyServer(HomeServer): if not self.config.use_presence: PresenceStatusStubServlet(self).register(resource) - resources.update({ - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - }) + resources.update( + { + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + } + ) root_resource = create_resource_tree(resources, NoResource()) @@ -190,7 +186,7 @@ class FrontendProxyServer(HomeServer): root_resource, self.version_string, ), - reactor=self.get_reactor() + reactor=self.get_reactor(), ) logger.info("Synapse client reader now listening on port %d", port) @@ -204,18 +200,19 @@ class FrontendProxyServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -227,9 +224,7 @@ class FrontendProxyServer(HomeServer): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse frontend proxy", config_options - ) + config = HomeServerConfig.load_config("Synapse frontend proxy", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -258,6 +253,6 @@ def start(config_options): _base.start_worker_reactor("synapse-frontend-proxy", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index b27b12e73d..d19c7c7d71 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -101,13 +101,12 @@ class SynapseHomeServer(HomeServer): # Skip loading openid resource if federation is defined # since federation resource will include openid continue - resources.update(self._configure_named_resource( - name, res.get("compress", False), - )) + resources.update( + self._configure_named_resource(name, res.get("compress", False)) + ) additional_resources = listener_config.get("additional_resources", {}) - logger.debug("Configuring additional resources: %r", - additional_resources) + logger.debug("Configuring additional resources: %r", additional_resources) module_api = ModuleApi(self, self.get_auth_handler()) for path, resmodule in additional_resources.items(): handler_cls, config = load_module(resmodule) @@ -174,59 +173,67 @@ class SynapseHomeServer(HomeServer): if compress: client_resource = gz_wrap(client_resource) - resources.update({ - "/_matrix/client/api/v1": client_resource, - "/_matrix/client/r0": client_resource, - "/_matrix/client/unstable": client_resource, - "/_matrix/client/v2_alpha": client_resource, - "/_matrix/client/versions": client_resource, - "/.well-known/matrix/client": WellKnownResource(self), - "/_synapse/admin": AdminRestResource(self), - }) + resources.update( + { + "/_matrix/client/api/v1": client_resource, + "/_matrix/client/r0": client_resource, + "/_matrix/client/unstable": client_resource, + "/_matrix/client/v2_alpha": client_resource, + "/_matrix/client/versions": client_resource, + "/.well-known/matrix/client": WellKnownResource(self), + "/_synapse/admin": AdminRestResource(self), + } + ) if self.get_config().saml2_enabled: from synapse.rest.saml2 import SAML2Resource + resources["/_matrix/saml2"] = SAML2Resource(self) if name == "consent": from synapse.rest.consent.consent_resource import ConsentResource + consent_resource = ConsentResource(self) if compress: consent_resource = gz_wrap(consent_resource) - resources.update({ - "/_matrix/consent": consent_resource, - }) + resources.update({"/_matrix/consent": consent_resource}) if name == "federation": - resources.update({ - FEDERATION_PREFIX: TransportLayerServer(self), - }) + resources.update({FEDERATION_PREFIX: TransportLayerServer(self)}) if name == "openid": - resources.update({ - FEDERATION_PREFIX: TransportLayerServer(self, servlet_groups=["openid"]), - }) + resources.update( + { + FEDERATION_PREFIX: TransportLayerServer( + self, servlet_groups=["openid"] + ) + } + ) if name in ["static", "client"]: - resources.update({ - STATIC_PREFIX: File( - os.path.join(os.path.dirname(synapse.__file__), "static") - ), - }) + resources.update( + { + STATIC_PREFIX: File( + os.path.join(os.path.dirname(synapse.__file__), "static") + ) + } + ) if name in ["media", "federation", "client"]: if self.get_config().enable_media_repo: media_repo = self.get_media_repository_resource() - resources.update({ - MEDIA_PREFIX: media_repo, - LEGACY_MEDIA_PREFIX: media_repo, - CONTENT_REPO_PREFIX: ContentRepoResource( - self, self.config.uploads_path - ), - }) + resources.update( + { + MEDIA_PREFIX: media_repo, + LEGACY_MEDIA_PREFIX: media_repo, + CONTENT_REPO_PREFIX: ContentRepoResource( + self, self.config.uploads_path + ), + } + ) elif name == "media": raise ConfigError( - "'media' resource conflicts with enable_media_repo=False", + "'media' resource conflicts with enable_media_repo=False" ) if name in ["keys", "federation"]: @@ -257,18 +264,14 @@ class SynapseHomeServer(HomeServer): for listener in listeners: if listener["type"] == "http": - self._listening_services.extend( - self._listener_http(config, listener) - ) + self._listening_services.extend(self._listener_http(config, listener)) elif listener["type"] == "manhole": listen_tcp( listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "replication": services = listen_tcp( @@ -277,16 +280,17 @@ class SynapseHomeServer(HomeServer): ReplicationStreamProtocolFactory(self), ) for s in services: - reactor.addSystemEventTrigger( - "before", "shutdown", s.stopListening, - ) + reactor.addSystemEventTrigger("before", "shutdown", s.stopListening) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -312,7 +316,7 @@ current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU") max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit") registered_reserved_users_mau_gauge = Gauge( "synapse_admin_mau:registered_reserved_users", - "Registered users with reserved threepids" + "Registered users with reserved threepids", ) @@ -327,8 +331,7 @@ def setup(config_options): """ try: config = HomeServerConfig.load_or_generate_config( - "Synapse Homeserver", - config_options, + "Synapse Homeserver", config_options ) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") @@ -339,10 +342,7 @@ def setup(config_options): # generating config files and shouldn't try to continue. sys.exit(0) - synapse.config.logger.setup_logging( - config, - use_worker_options=False - ) + synapse.config.logger.setup_logging(config, use_worker_options=False) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -357,7 +357,7 @@ def setup(config_options): database_engine=database_engine, ) - logger.info("Preparing database: %s...", config.database_config['name']) + logger.info("Preparing database: %s...", config.database_config["name"]) try: with hs.get_db_conn(run_new_connection=False) as db_conn: @@ -375,7 +375,7 @@ def setup(config_options): ) sys.exit(1) - logger.info("Database prepared in %s.", config.database_config['name']) + logger.info("Database prepared in %s.", config.database_config["name"]) hs.setup() hs.setup_master() @@ -391,9 +391,7 @@ def setup(config_options): acme = hs.get_acme_handler() # Check how long the certificate is active for. - cert_days_remaining = hs.config.is_disk_cert_valid( - allow_self_signed=False - ) + cert_days_remaining = hs.config.is_disk_cert_valid(allow_self_signed=False) # We want to reprovision if cert_days_remaining is None (meaning no # certificate exists), or the days remaining number it returns @@ -401,8 +399,8 @@ def setup(config_options): provision = False if ( - cert_days_remaining is None or - cert_days_remaining < hs.config.acme_reprovision_threshold + cert_days_remaining is None + or cert_days_remaining < hs.config.acme_reprovision_threshold ): provision = True @@ -433,10 +431,7 @@ def setup(config_options): yield do_acme() # Check if it needs to be reprovisioned every day. - hs.get_clock().looping_call( - reprovision_acme, - 24 * 60 * 60 * 1000 - ) + hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000) _base.start(hs, config.listeners) @@ -463,6 +458,7 @@ class SynapseService(service.Service): A twisted Service class that will start synapse. Used to run synapse via twistd and a .tac. """ + def __init__(self, config): self.config = config @@ -479,6 +475,7 @@ class SynapseService(service.Service): def run(hs): PROFILE_SYNAPSE = False if PROFILE_SYNAPSE: + def profile(func): from cProfile import Profile from threading import current_thread @@ -489,13 +486,14 @@ def run(hs): func(*args, **kargs) profile.disable() ident = current_thread().ident - profile.dump_stats("/tmp/%s.%s.%i.pstat" % ( - hs.hostname, func.__name__, ident - )) + profile.dump_stats( + "/tmp/%s.%s.%i.pstat" % (hs.hostname, func.__name__, ident) + ) return profiled from twisted.python.threadpool import ThreadPool + ThreadPool._worker = profile(ThreadPool._worker) reactor.run = profile(reactor.run) @@ -541,7 +539,9 @@ def run(hs): stats["daily_active_users"] = yield hs.get_datastore().count_daily_users() stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users() - stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms() + stats[ + "daily_active_rooms" + ] = yield hs.get_datastore().count_daily_active_rooms() stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() r30_results = yield hs.get_datastore().count_r30_users() @@ -565,8 +565,7 @@ def run(hs): logger.info("Reporting stats to matrix.org: %s" % (stats,)) try: yield hs.get_simple_http_client().put_json( - "https://matrix.org/report-usage-stats/push", - stats + "https://matrix.org/report-usage-stats/push", stats ) except Exception as e: logger.warn("Error reporting stats: %s", e) @@ -581,14 +580,11 @@ def run(hs): logger.info("report_stats can use psutil") stats_process.append(process) except (AttributeError): - logger.warning( - "Unable to read memory/cpu stats. Disabling reporting." - ) + logger.warning("Unable to read memory/cpu stats. Disabling reporting.") def generate_user_daily_visit_stats(): return run_as_background_process( - "generate_user_daily_visits", - hs.get_datastore().generate_user_daily_visits, + "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits ) # Rather than update on per session basis, batch up the requests. @@ -599,9 +595,9 @@ def run(hs): # monthly active user limiting functionality def reap_monthly_active_users(): return run_as_background_process( - "reap_monthly_active_users", - hs.get_datastore().reap_monthly_active_users, + "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users ) + clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) reap_monthly_active_users() @@ -619,8 +615,7 @@ def run(hs): def start_generate_monthly_active_users(): return run_as_background_process( - "generate_monthly_active_users", - generate_monthly_active_users, + "generate_monthly_active_users", generate_monthly_active_users ) start_generate_monthly_active_users() @@ -660,5 +655,5 @@ def main(): run(hs) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index d4cc4e9443..cf0e2036c3 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -72,13 +72,15 @@ class MediaRepositoryServer(HomeServer): resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "media": media_repo = self.get_media_repository_resource() - resources.update({ - MEDIA_PREFIX: media_repo, - LEGACY_MEDIA_PREFIX: media_repo, - CONTENT_REPO_PREFIX: ContentRepoResource( - self, self.config.uploads_path - ), - }) + resources.update( + { + MEDIA_PREFIX: media_repo, + LEGACY_MEDIA_PREFIX: media_repo, + CONTENT_REPO_PREFIX: ContentRepoResource( + self, self.config.uploads_path + ), + } + ) root_resource = create_resource_tree(resources, NoResource()) @@ -91,7 +93,7 @@ class MediaRepositoryServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse media repository now listening on port %d", port) @@ -105,18 +107,19 @@ class MediaRepositoryServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -164,6 +167,6 @@ def start(config_options): _base.start_worker_reactor("synapse-media-repository", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index cbf0d67f51..df29ea5ecb 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -46,36 +46,27 @@ logger = logging.getLogger("synapse.app.pusher") class PusherSlaveStore( - SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, - SlavedAccountDataStore + SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedAccountDataStore ): - update_pusher_last_stream_ordering_and_success = ( - __func__(DataStore.update_pusher_last_stream_ordering_and_success) + update_pusher_last_stream_ordering_and_success = __func__( + DataStore.update_pusher_last_stream_ordering_and_success ) - update_pusher_failing_since = ( - __func__(DataStore.update_pusher_failing_since) - ) + update_pusher_failing_since = __func__(DataStore.update_pusher_failing_since) - update_pusher_last_stream_ordering = ( - __func__(DataStore.update_pusher_last_stream_ordering) + update_pusher_last_stream_ordering = __func__( + DataStore.update_pusher_last_stream_ordering ) - get_throttle_params_by_room = ( - __func__(DataStore.get_throttle_params_by_room) - ) + get_throttle_params_by_room = __func__(DataStore.get_throttle_params_by_room) - set_throttle_params = ( - __func__(DataStore.set_throttle_params) - ) + set_throttle_params = __func__(DataStore.set_throttle_params) - get_time_of_last_push_action_before = ( - __func__(DataStore.get_time_of_last_push_action_before) + get_time_of_last_push_action_before = __func__( + DataStore.get_time_of_last_push_action_before ) - get_profile_displayname = ( - __func__(DataStore.get_profile_displayname) - ) + get_profile_displayname = __func__(DataStore.get_profile_displayname) class PusherServer(HomeServer): @@ -105,7 +96,7 @@ class PusherServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse pusher now listening on port %d", port) @@ -119,18 +110,19 @@ class PusherServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -161,9 +153,7 @@ class PusherReplicationHandler(ReplicationClientHandler): else: yield self.start_pusher(row.user_id, row.app_id, row.pushkey) elif stream_name == "events": - yield self.pusher_pool.on_new_notifications( - token, token, - ) + yield self.pusher_pool.on_new_notifications(token, token) elif stream_name == "receipts": yield self.pusher_pool.on_new_receipts( token, token, set(row.room_id for row in rows) @@ -188,9 +178,7 @@ class PusherReplicationHandler(ReplicationClientHandler): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse pusher", config_options - ) + config = HomeServerConfig.load_config("Synapse pusher", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -234,6 +222,6 @@ def start(config_options): _base.start_worker_reactor("synapse-pusher", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): ps = start(sys.argv[1:]) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 5388def28a..858949910d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -98,10 +98,7 @@ class SynchrotronPresence(object): self.notifier = hs.get_notifier() active_presence = self.store.take_presence_startup_info() - self.user_to_current_state = { - state.user_id: state - for state in active_presence - } + self.user_to_current_state = {state.user_id: state for state in active_presence} # user_id -> last_sync_ms. Lists the users that have stopped syncing # but we haven't notified the master of that yet @@ -196,17 +193,26 @@ class SynchrotronPresence(object): room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=users_to_states.keys() + "presence_key", + stream_id, + rooms=room_ids_to_states.keys(), + users=users_to_states.keys(), ) @defer.inlineCallbacks def process_replication_rows(self, token, rows): - states = [UserPresenceState( - row.user_id, row.state, row.last_active_ts, - row.last_federation_update_ts, row.last_user_sync_ts, row.status_msg, - row.currently_active - ) for row in rows] + states = [ + UserPresenceState( + row.user_id, + row.state, + row.last_active_ts, + row.last_federation_update_ts, + row.last_user_sync_ts, + row.status_msg, + row.currently_active, + ) + for row in rows + ] for state in states: self.user_to_current_state[state.user_id] = state @@ -217,7 +223,8 @@ class SynchrotronPresence(object): def get_currently_syncing_users(self): if self.hs.config.use_presence: return [ - user_id for user_id, count in iteritems(self.user_to_num_current_syncs) + user_id + for user_id, count in iteritems(self.user_to_num_current_syncs) if count > 0 ] else: @@ -281,12 +288,14 @@ class SynchrotronServer(HomeServer): events.register_servlets(self, resource) InitialSyncRestServlet(self).register(resource) RoomInitialSyncRestServlet(self).register(resource) - resources.update({ - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - }) + resources.update( + { + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + } + ) root_resource = create_resource_tree(resources, NoResource()) @@ -299,7 +308,7 @@ class SynchrotronServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse synchrotron now listening on port %d", port) @@ -313,18 +322,19 @@ class SynchrotronServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -382,40 +392,36 @@ class SyncReplicationHandler(ReplicationClientHandler): ) elif stream_name == "push_rules": self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows], + "push_rules_key", token, users=[row.user_id for row in rows] ) - elif stream_name in ("account_data", "tag_account_data",): + elif stream_name in ("account_data", "tag_account_data"): self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows], + "account_data_key", token, users=[row.user_id for row in rows] ) elif stream_name == "receipts": self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows], + "receipt_key", token, rooms=[row.room_id for row in rows] ) elif stream_name == "typing": self.typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows], + "typing_key", token, rooms=[row.room_id for row in rows] ) elif stream_name == "to_device": entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: - self.notifier.on_new_event( - "to_device_key", token, users=entities, - ) + self.notifier.on_new_event("to_device_key", token, users=entities) elif stream_name == "device_lists": all_room_ids = set() for row in rows: room_ids = yield self.store.get_rooms_for_user(row.user_id) all_room_ids.update(room_ids) - self.notifier.on_new_event( - "device_list_key", token, rooms=all_room_ids, - ) + self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) elif stream_name == "presence": yield self.presence_handler.process_replication_rows(token, rows) elif stream_name == "receipts": self.notifier.on_new_event( - "groups_key", token, users=[row.user_id for row in rows], + "groups_key", token, users=[row.user_id for row in rows] ) except Exception: logger.exception("Error processing replication") @@ -423,9 +429,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse synchrotron", config_options - ) + config = HomeServerConfig.load_config("Synapse synchrotron", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -453,6 +457,6 @@ def start(config_options): _base.start_worker_reactor("synapse-synchrotron", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 355f5aa71d..2d9d2e1bbc 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -66,14 +66,16 @@ class UserDirectorySlaveStore( events_max = self._stream_id_gen.get_current_token() curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( - db_conn, "current_state_delta_stream", + db_conn, + "current_state_delta_stream", entity_column="room_id", stream_column="stream_id", max_value=events_max, # As we share the stream id with events token limit=1000, ) self._curr_state_delta_stream_cache = StreamChangeCache( - "_curr_state_delta_stream_cache", min_curr_state_delta_id, + "_curr_state_delta_stream_cache", + min_curr_state_delta_id, prefilled_cache=curr_state_delta_prefill, ) @@ -110,12 +112,14 @@ class UserDirectoryServer(HomeServer): elif name == "client": resource = JsonResource(self, canonical_json=False) user_directory.register_servlets(self, resource) - resources.update({ - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - }) + resources.update( + { + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + } + ) root_resource = create_resource_tree(resources, NoResource()) @@ -128,7 +132,7 @@ class UserDirectoryServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse user_dir now listening on port %d", port) @@ -142,18 +146,19 @@ class UserDirectoryServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -186,9 +191,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse user directory", config_options - ) + config = HomeServerConfig.load_config("Synapse user directory", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -227,6 +230,6 @@ def start(config_options): _base.start_worker_reactor("synapse-user-dir", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 57ed8a3ca2..b26a31dd54 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -48,9 +48,7 @@ class AppServiceTransaction(object): A Deferred which resolves to True if the transaction was sent. """ return as_api.push_bulk( - service=self.service, - events=self.events, - txn_id=self.id + service=self.service, events=self.events, txn_id=self.id ) def complete(self, store): @@ -64,10 +62,7 @@ class AppServiceTransaction(object): Returns: A Deferred which resolves to True if the transaction was completed. """ - return store.complete_appservice_txn( - service=self.service, - txn_id=self.id - ) + return store.complete_appservice_txn(service=self.service, txn_id=self.id) class ApplicationService(object): @@ -76,6 +71,7 @@ class ApplicationService(object): Provides methods to check if this service is "interested" in events. """ + NS_USERS = "users" NS_ALIASES = "aliases" NS_ROOMS = "rooms" @@ -84,9 +80,19 @@ class ApplicationService(object): # values. NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS] - def __init__(self, token, hostname, url=None, namespaces=None, hs_token=None, - sender=None, id=None, protocols=None, rate_limited=True, - ip_range_whitelist=None): + def __init__( + self, + token, + hostname, + url=None, + namespaces=None, + hs_token=None, + sender=None, + id=None, + protocols=None, + rate_limited=True, + ip_range_whitelist=None, + ): self.token = token self.url = url self.hs_token = hs_token @@ -128,9 +134,7 @@ class ApplicationService(object): if not isinstance(regex_obj, dict): raise ValueError("Expected dict regex for ns '%s'" % ns) if not isinstance(regex_obj.get("exclusive"), bool): - raise ValueError( - "Expected bool for 'exclusive' in ns '%s'" % ns - ) + raise ValueError("Expected bool for 'exclusive' in ns '%s'" % ns) group_id = regex_obj.get("group_id") if group_id: if not isinstance(group_id, str): @@ -153,9 +157,7 @@ class ApplicationService(object): if isinstance(regex, string_types): regex_obj["regex"] = re.compile(regex) # Pre-compile regex else: - raise ValueError( - "Expected string for 'regex' in ns '%s'" % ns - ) + raise ValueError("Expected string for 'regex' in ns '%s'" % ns) return namespaces def _matches_regex(self, test_string, namespace_key): @@ -178,8 +180,9 @@ class ApplicationService(object): if self.is_interested_in_user(event.sender): defer.returnValue(True) # also check m.room.member state key - if (event.type == EventTypes.Member and - self.is_interested_in_user(event.state_key)): + if event.type == EventTypes.Member and self.is_interested_in_user( + event.state_key + ): defer.returnValue(True) if not store: diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 9ccc5a80fc..571881775b 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -32,19 +32,17 @@ logger = logging.getLogger(__name__) sent_transactions_counter = Counter( "synapse_appservice_api_sent_transactions", "Number of /transactions/ requests sent", - ["service"] + ["service"], ) failed_transactions_counter = Counter( "synapse_appservice_api_failed_transactions", "Number of /transactions/ requests that failed to send", - ["service"] + ["service"], ) sent_events_counter = Counter( - "synapse_appservice_api_sent_events", - "Number of events sent to the AS", - ["service"] + "synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"] ) HOUR_IN_MS = 60 * 60 * 1000 @@ -92,8 +90,9 @@ class ApplicationServiceApi(SimpleHttpClient): super(ApplicationServiceApi, self).__init__(hs) self.clock = hs.get_clock() - self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta", - timeout_ms=HOUR_IN_MS) + self.protocol_meta_cache = ResponseCache( + hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS + ) @defer.inlineCallbacks def query_user(self, service, user_id): @@ -102,9 +101,7 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/users/%s" % urllib.parse.quote(user_id)) response = None try: - response = yield self.get_json(uri, { - "access_token": service.hs_token - }) + response = yield self.get_json(uri, {"access_token": service.hs_token}) if response is not None: # just an empty json object defer.returnValue(True) except CodeMessageException as e: @@ -123,9 +120,7 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias)) response = None try: - response = yield self.get_json(uri, { - "access_token": service.hs_token - }) + response = yield self.get_json(uri, {"access_token": service.hs_token}) if response is not None: # just an empty json object defer.returnValue(True) except CodeMessageException as e: @@ -144,9 +139,7 @@ class ApplicationServiceApi(SimpleHttpClient): elif kind == ThirdPartyEntityKind.LOCATION: required_field = "alias" else: - raise ValueError( - "Unrecognised 'kind' argument %r to query_3pe()", kind - ) + raise ValueError("Unrecognised 'kind' argument %r to query_3pe()", kind) if service.url is None: defer.returnValue([]) @@ -154,14 +147,13 @@ class ApplicationServiceApi(SimpleHttpClient): service.url, APP_SERVICE_PREFIX, kind, - urllib.parse.quote(protocol) + urllib.parse.quote(protocol), ) try: response = yield self.get_json(uri, fields) if not isinstance(response, list): logger.warning( - "query_3pe to %s returned an invalid response %r", - uri, response + "query_3pe to %s returned an invalid response %r", uri, response ) defer.returnValue([]) @@ -171,8 +163,7 @@ class ApplicationServiceApi(SimpleHttpClient): ret.append(r) else: logger.warning( - "query_3pe to %s returned an invalid result %r", - uri, r + "query_3pe to %s returned an invalid result %r", uri, r ) defer.returnValue(ret) @@ -189,27 +180,27 @@ class ApplicationServiceApi(SimpleHttpClient): uri = "%s%s/thirdparty/protocol/%s" % ( service.url, APP_SERVICE_PREFIX, - urllib.parse.quote(protocol) + urllib.parse.quote(protocol), ) try: info = yield self.get_json(uri, {}) if not _is_valid_3pe_metadata(info): - logger.warning("query_3pe_protocol to %s did not return a" - " valid result", uri) + logger.warning( + "query_3pe_protocol to %s did not return a" " valid result", uri + ) defer.returnValue(None) for instance in info.get("instances", []): network_id = instance.get("network_id", None) if network_id is not None: instance["instance_id"] = ThirdPartyInstanceID( - service.id, network_id, + service.id, network_id ).to_string() defer.returnValue(info) except Exception as ex: - logger.warning("query_3pe_protocol to %s threw exception %s", - uri, ex) + logger.warning("query_3pe_protocol to %s threw exception %s", uri, ex) defer.returnValue(None) key = (service.id, protocol) @@ -223,22 +214,19 @@ class ApplicationServiceApi(SimpleHttpClient): events = self._serialize(events) if txn_id is None: - logger.warning("push_bulk: Missing txn ID sending events to %s", - service.url) + logger.warning( + "push_bulk: Missing txn ID sending events to %s", service.url + ) txn_id = str(0) txn_id = str(txn_id) - uri = service.url + ("/transactions/%s" % - urllib.parse.quote(txn_id)) + uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id)) try: yield self.put_json( uri=uri, - json_body={ - "events": events - }, - args={ - "access_token": service.hs_token - }) + json_body={"events": events}, + args={"access_token": service.hs_token}, + ) sent_transactions_counter.labels(service.id).inc() sent_events_counter.labels(service.id).inc(len(events)) defer.returnValue(True) @@ -252,6 +240,4 @@ class ApplicationServiceApi(SimpleHttpClient): def _serialize(self, events): time_now = self.clock.time_msec() - return [ - serialize_event(e, time_now, as_client_event=True) for e in events - ] + return [serialize_event(e, time_now, as_client_event=True) for e in events] diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 685f15c061..b54bf5411f 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -112,15 +112,14 @@ class _ServiceQueuer(object): return run_as_background_process( - "as-sender-%s" % (service.id, ), - self._send_request, service, + "as-sender-%s" % (service.id,), self._send_request, service ) @defer.inlineCallbacks def _send_request(self, service): # sanity-check: we shouldn't get here if this service already has a sender # running. - assert(service.id not in self.requests_in_flight) + assert service.id not in self.requests_in_flight self.requests_in_flight.add(service.id) try: @@ -137,7 +136,6 @@ class _ServiceQueuer(object): class _TransactionController(object): - def __init__(self, clock, store, as_api, recoverer_fn): self.clock = clock self.store = store @@ -149,10 +147,7 @@ class _TransactionController(object): @defer.inlineCallbacks def send(self, service, events): try: - txn = yield self.store.create_appservice_txn( - service=service, - events=events - ) + txn = yield self.store.create_appservice_txn(service=service, events=events) service_is_up = yield self._is_service_up(service) if service_is_up: sent = yield txn.send(self.as_api) @@ -167,12 +162,12 @@ class _TransactionController(object): @defer.inlineCallbacks def on_recovered(self, recoverer): self.recoverers.remove(recoverer) - logger.info("Successfully recovered application service AS ID %s", - recoverer.service.id) + logger.info( + "Successfully recovered application service AS ID %s", recoverer.service.id + ) logger.info("Remaining active recoverers: %s", len(self.recoverers)) yield self.store.set_appservice_state( - recoverer.service, - ApplicationServiceState.UP + recoverer.service, ApplicationServiceState.UP ) def add_recoverers(self, recoverers): @@ -184,13 +179,10 @@ class _TransactionController(object): @defer.inlineCallbacks def _start_recoverer(self, service): try: - yield self.store.set_appservice_state( - service, - ApplicationServiceState.DOWN - ) + yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN) logger.info( "Application service falling behind. Starting recoverer. AS ID %s", - service.id + service.id, ) recoverer = self.recoverer_fn(service, self.on_recovered) self.add_recoverers([recoverer]) @@ -205,19 +197,16 @@ class _TransactionController(object): class _Recoverer(object): - @staticmethod @defer.inlineCallbacks def start(clock, store, as_api, callback): - services = yield store.get_appservices_by_state( - ApplicationServiceState.DOWN - ) - recoverers = [ - _Recoverer(clock, store, as_api, s, callback) for s in services - ] + services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN) + recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services] for r in recoverers: - logger.info("Starting recoverer for AS ID %s which was marked as " - "DOWN", r.service.id) + logger.info( + "Starting recoverer for AS ID %s which was marked as " "DOWN", + r.service.id, + ) r.recover() defer.returnValue(recoverers) @@ -232,9 +221,9 @@ class _Recoverer(object): def recover(self): def _retry(): run_as_background_process( - "as-recoverer-%s" % (self.service.id,), - self.retry, + "as-recoverer-%s" % (self.service.id,), self.retry ) + self.clock.call_later((2 ** self.backoff_counter), _retry) def _backoff(self): @@ -248,8 +237,9 @@ class _Recoverer(object): try: txn = yield self.store.get_oldest_unsent_txn(self.service) if txn: - logger.info("Retrying transaction %s for AS ID %s", - txn.id, txn.service.id) + logger.info( + "Retrying transaction %s for AS ID %s", txn.id, txn.service.id + ) sent = yield txn.send(self.as_api) if sent: yield txn.complete(self.store) |