diff options
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/__init__.py | 4 | ||||
-rw-r--r-- | synapse/app/_base.py | 107 | ||||
-rw-r--r-- | synapse/app/appservice.py | 29 | ||||
-rw-r--r-- | synapse/app/client_reader.py | 35 | ||||
-rw-r--r-- | synapse/app/event_creator.py | 39 | ||||
-rw-r--r-- | synapse/app/federation_reader.py | 38 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 46 | ||||
-rw-r--r-- | synapse/app/frontend_proxy.py | 80 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 165 | ||||
-rw-r--r-- | synapse/app/media_repository.py | 37 | ||||
-rw-r--r-- | synapse/app/pusher.py | 60 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 90 | ||||
-rw-r--r-- | synapse/app/user_dir.py | 45 |
13 files changed, 376 insertions, 399 deletions
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py index f56f5fcc13..d877c77834 100644 --- a/synapse/app/__init__.py +++ b/synapse/app/__init__.py @@ -43,7 +43,7 @@ def check_bind_error(e, address, bind_addresses): address (str): Address on which binding was attempted. bind_addresses (list): Addresses on which the service listens. """ - if address == '0.0.0.0' and '::' in bind_addresses: - logger.warn('Failed to listen on 0.0.0.0, continuing because listening on [::]') + if address == "0.0.0.0" and "::" in bind_addresses: + logger.warn("Failed to listen on 0.0.0.0, continuing because listening on [::]") else: raise e diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 08199a5e8d..d50a9840d4 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -19,7 +19,6 @@ import signal import sys import traceback -import psutil from daemonize import Daemonize from twisted.internet import defer, error, reactor @@ -68,21 +67,13 @@ def start_worker_reactor(appname, config): gc_thresholds=config.gc_thresholds, pid_file=config.worker_pid_file, daemonize=config.worker_daemonize, - cpu_affinity=config.worker_cpu_affinity, print_pidfile=config.print_pidfile, logger=logger, ) def start_reactor( - appname, - soft_file_limit, - gc_thresholds, - pid_file, - daemonize, - cpu_affinity, - print_pidfile, - logger, + appname, soft_file_limit, gc_thresholds, pid_file, daemonize, print_pidfile, logger ): """ Run the reactor in the main process @@ -95,7 +86,6 @@ def start_reactor( gc_thresholds: pid_file (str): name of pid file to write to if daemonize is True daemonize (bool): true to run the reactor in a background process - cpu_affinity (int|None): cpu affinity mask print_pidfile (bool): whether to print the pid file, if daemonize is True logger (logging.Logger): logger instance to pass to Daemonize """ @@ -109,20 +99,6 @@ def start_reactor( # between the sentinel and `run` logcontexts. with PreserveLoggingContext(): logger.info("Running") - if cpu_affinity is not None: - # Turn the bitmask into bits, reverse it so we go from 0 up - mask_to_bits = bin(cpu_affinity)[2:][::-1] - - cpus = [] - cpu_num = 0 - - for i in mask_to_bits: - if i == "1": - cpus.append(cpu_num) - cpu_num += 1 - - p = psutil.Process() - p.cpu_affinity(cpus) change_resource_limit(soft_file_limit) if gc_thresholds: @@ -149,10 +125,10 @@ def start_reactor( def quit_with_error(error_string): message_lines = error_string.split("\n") line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2 - sys.stderr.write("*" * line_length + '\n') + sys.stderr.write("*" * line_length + "\n") for line in message_lines: sys.stderr.write(" %s\n" % (line.rstrip(),)) - sys.stderr.write("*" * line_length + '\n') + sys.stderr.write("*" * line_length + "\n") sys.exit(1) @@ -178,14 +154,7 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): r = [] for address in bind_addresses: try: - r.append( - reactor.listenTCP( - port, - factory, - backlog, - address - ) - ) + r.append(reactor.listenTCP(port, factory, backlog, address)) except error.CannotListenError as e: check_bind_error(e, address, bind_addresses) @@ -205,13 +174,7 @@ def listen_ssl( for address in bind_addresses: try: r.append( - reactor.listenSSL( - port, - factory, - context_factory, - backlog, - address - ) + reactor.listenSSL(port, factory, context_factory, backlog, address) ) except error.CannotListenError as e: check_bind_error(e, address, bind_addresses) @@ -243,15 +206,13 @@ def refresh_certificate(hs): if isinstance(i.factory, TLSMemoryBIOFactory): addr = i.getHost() logger.info( - "Replacing TLS context factory on [%s]:%i", addr.host, addr.port, + "Replacing TLS context factory on [%s]:%i", addr.host, addr.port ) # We want to replace TLS factories with a new one, with the new # TLS configuration. We do this by reaching in and pulling out # the wrappedFactory, and then re-wrapping it. i.factory = TLSMemoryBIOFactory( - hs.tls_server_context_factory, - False, - i.factory.wrappedFactory + hs.tls_server_context_factory, False, i.factory.wrappedFactory ) logger.info("Context factories updated.") @@ -267,6 +228,7 @@ def start(hs, listeners=None): try: # Set up the SIGHUP machinery. if hasattr(signal, "SIGHUP"): + def handle_sighup(*args, **kwargs): for i in _sighup_callbacks: i(hs) @@ -302,10 +264,8 @@ def setup_sentry(hs): return import sentry_sdk - sentry_sdk.init( - dsn=hs.config.sentry_dsn, - release=get_version_string(synapse), - ) + + sentry_sdk.init(dsn=hs.config.sentry_dsn, release=get_version_string(synapse)) # We set some default tags that give some context to this instance with sentry_sdk.configure_scope() as scope: @@ -326,7 +286,7 @@ def install_dns_limiter(reactor, max_dns_requests_in_flight=100): many DNS queries at once """ new_resolver = _LimitedHostnameResolver( - reactor.nameResolver, max_dns_requests_in_flight, + reactor.nameResolver, max_dns_requests_in_flight ) reactor.installNameResolver(new_resolver) @@ -339,26 +299,44 @@ class _LimitedHostnameResolver(object): def __init__(self, resolver, max_dns_requests_in_flight): self._resolver = resolver self._limiter = Linearizer( - name="dns_client_limiter", max_count=max_dns_requests_in_flight, + name="dns_client_limiter", max_count=max_dns_requests_in_flight ) - def resolveHostName(self, resolutionReceiver, hostName, portNumber=0, - addressTypes=None, transportSemantics='TCP'): - # Note this is happening deep within the reactor, so we don't need to - # worry about log contexts. - + def resolveHostName( + self, + resolutionReceiver, + hostName, + portNumber=0, + addressTypes=None, + transportSemantics="TCP", + ): # We need this function to return `resolutionReceiver` so we do all the # actual logic involving deferreds in a separate function. - self._resolve( - resolutionReceiver, hostName, portNumber, - addressTypes, transportSemantics, - ) + + # even though this is happening within the depths of twisted, we need to drop + # our logcontext before starting _resolve, otherwise: (a) _resolve will drop + # the logcontext if it returns an incomplete deferred; (b) _resolve will + # call the resolutionReceiver *with* a logcontext, which it won't be expecting. + with PreserveLoggingContext(): + self._resolve( + resolutionReceiver, + hostName, + portNumber, + addressTypes, + transportSemantics, + ) return resolutionReceiver @defer.inlineCallbacks - def _resolve(self, resolutionReceiver, hostName, portNumber=0, - addressTypes=None, transportSemantics='TCP'): + def _resolve( + self, + resolutionReceiver, + hostName, + portNumber=0, + addressTypes=None, + transportSemantics="TCP", + ): with (yield self._limiter.queue(())): # resolveHostName doesn't return a Deferred, so we need to hook into @@ -368,8 +346,7 @@ class _LimitedHostnameResolver(object): receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred) self._resolver.resolveHostName( - receiver, hostName, portNumber, - addressTypes, transportSemantics, + receiver, hostName, portNumber, addressTypes, transportSemantics ) yield deferred diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 33107f56d1..9120bdb143 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -44,7 +44,9 @@ logger = logging.getLogger("synapse.app.appservice") class AppserviceSlaveStore( - DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore, + DirectoryStore, + SlavedEventStore, + SlavedApplicationServiceStore, SlavedRegistrationStore, ): pass @@ -74,7 +76,7 @@ class AppserviceServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse appservice now listening on port %d", port) @@ -88,18 +90,19 @@ class AppserviceServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -132,9 +135,7 @@ class ASReplicationHandler(ReplicationClientHandler): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse appservice", config_options - ) + config = HomeServerConfig.load_config("Synapse appservice", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -173,6 +174,6 @@ def start(config_options): _base.start_worker_reactor("synapse-appservice", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 864f1eac48..90bc79cdda 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -37,7 +37,9 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.profile import SlavedProfileStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore @@ -51,6 +53,7 @@ from synapse.rest.client.v1.room import ( PublicRoomListRestServlet, RoomEventContextServlet, RoomMemberListRestServlet, + RoomMessageListRestServlet, RoomStateRestServlet, ) from synapse.rest.client.v1.voip import VoipRestServlet @@ -73,6 +76,7 @@ class ClientReaderSlavedStore( SlavedDeviceStore, SlavedReceiptsStore, SlavedPushRuleStore, + SlavedGroupServerStore, SlavedAccountDataStore, SlavedEventStore, SlavedKeyStore, @@ -81,6 +85,7 @@ class ClientReaderSlavedStore( SlavedApplicationServiceStore, SlavedRegistrationStore, SlavedTransactionStore, + SlavedProfileStore, SlavedClientIpStore, BaseSlavedStore, ): @@ -107,6 +112,7 @@ class ClientReaderServer(HomeServer): JoinedRoomMemberListRestServlet(self).register(resource) RoomStateRestServlet(self).register(resource) RoomEventContextServlet(self).register(resource) + RoomMessageListRestServlet(self).register(resource) RegisterRestServlet(self).register(resource) LoginRestServlet(self).register(resource) ThreepidRestServlet(self).register(resource) @@ -116,9 +122,7 @@ class ClientReaderServer(HomeServer): PushRuleRestServlet(self).register(resource) VersionsRestServlet().register(resource) - resources.update({ - "/_matrix/client": resource, - }) + resources.update({"/_matrix/client": resource}) root_resource = create_resource_tree(resources, NoResource()) @@ -131,7 +135,7 @@ class ClientReaderServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse client reader now listening on port %d", port) @@ -145,18 +149,19 @@ class ClientReaderServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -168,9 +173,7 @@ class ClientReaderServer(HomeServer): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse client reader", config_options - ) + config = HomeServerConfig.load_config("Synapse client reader", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -197,6 +200,6 @@ def start(config_options): _base.start_worker_reactor("synapse-client-reader", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index b8e5196152..ff522e4499 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -109,12 +109,14 @@ class EventCreatorServer(HomeServer): ProfileAvatarURLRestServlet(self).register(resource) ProfileDisplaynameRestServlet(self).register(resource) ProfileRestServlet(self).register(resource) - resources.update({ - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - }) + resources.update( + { + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + } + ) root_resource = create_resource_tree(resources, NoResource()) @@ -127,7 +129,7 @@ class EventCreatorServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse event creator now listening on port %d", port) @@ -141,18 +143,19 @@ class EventCreatorServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -164,9 +167,7 @@ class EventCreatorServer(HomeServer): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse event creator", config_options - ) + config = HomeServerConfig.load_config("Synapse event creator", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -198,6 +199,6 @@ def start(config_options): _base.start_worker_reactor("synapse-event-creator", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 7da79dc827..9421420930 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -86,19 +86,18 @@ class FederationReaderServer(HomeServer): if name == "metrics": resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "federation": - resources.update({ - FEDERATION_PREFIX: TransportLayerServer(self), - }) + resources.update({FEDERATION_PREFIX: TransportLayerServer(self)}) if name == "openid" and "federation" not in res["names"]: # Only load the openid resource separately if federation resource # is not specified since federation resource includes openid # resource. - resources.update({ - FEDERATION_PREFIX: TransportLayerServer( - self, - servlet_groups=["openid"], - ), - }) + resources.update( + { + FEDERATION_PREFIX: TransportLayerServer( + self, servlet_groups=["openid"] + ) + } + ) if name in ["keys", "federation"]: resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self) @@ -115,7 +114,7 @@ class FederationReaderServer(HomeServer): root_resource, self.version_string, ), - reactor=self.get_reactor() + reactor=self.get_reactor(), ) logger.info("Synapse federation reader now listening on port %d", port) @@ -129,18 +128,19 @@ class FederationReaderServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -181,6 +181,6 @@ def start(config_options): _base.start_worker_reactor("synapse-federation-reader", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 1d43f2b075..969be58d0b 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -52,8 +52,13 @@ logger = logging.getLogger("synapse.app.federation_sender") class FederationSenderSlaveStore( - SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore, - SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore, + SlavedDeviceInboxStore, + SlavedTransactionStore, + SlavedReceiptsStore, + SlavedEventStore, + SlavedRegistrationStore, + SlavedDeviceStore, + SlavedPresenceStore, ): def __init__(self, db_conn, hs): super(FederationSenderSlaveStore, self).__init__(db_conn, hs) @@ -65,10 +70,7 @@ class FederationSenderSlaveStore( self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) def _get_federation_out_pos(self, db_conn): - sql = ( - "SELECT stream_id FROM federation_stream_position" - " WHERE type = ?" - ) + sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?" sql = self.database_engine.convert_param_style(sql) txn = db_conn.cursor() @@ -103,7 +105,7 @@ class FederationSenderServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse federation_sender now listening on port %d", port) @@ -117,18 +119,19 @@ class FederationSenderServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -151,7 +154,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler): self.send_handler.process_replication_rows(stream_name, token, rows) def get_streams_to_replicate(self): - args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate() + args = super( + FederationSenderReplicationHandler, self + ).get_streams_to_replicate() args.update(self.send_handler.stream_positions()) return args @@ -203,6 +208,7 @@ class FederationSenderHandler(object): """Processes the replication stream and forwards the appropriate entries to the federation sender. """ + def __init__(self, hs, replication_client): self.store = hs.get_datastore() self._is_mine_id = hs.is_mine_id @@ -241,7 +247,7 @@ class FederationSenderHandler(object): # ... and when new receipts happen elif stream_name == ReceiptsStream.NAME: run_as_background_process( - "process_receipts_for_federation", self._on_new_receipts, rows, + "process_receipts_for_federation", self._on_new_receipts, rows ) @defer.inlineCallbacks @@ -278,12 +284,14 @@ class FederationSenderHandler(object): # We ACK this token over replication so that the master can drop # its in memory queues - self.replication_client.send_federation_ack(self.federation_position) + self.replication_client.send_federation_ack( + self.federation_position + ) self._last_ack = self.federation_position except Exception: logger.exception("Error updating federation stream position") -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 8479fee738..2fd7d57ebf 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -37,8 +37,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns -from synapse.rest.client.v2_alpha._base import client_v2_patterns +from synapse.rest.client.v2_alpha._base import client_patterns from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree @@ -49,11 +48,11 @@ from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse.app.frontend_proxy") -class PresenceStatusStubServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status") +class PresenceStatusStubServlet(RestServlet): + PATTERNS = client_patterns("/presence/(?P<user_id>[^/]*)/status") def __init__(self, hs): - super(PresenceStatusStubServlet, self).__init__(hs) + super(PresenceStatusStubServlet, self).__init__() self.http_client = hs.get_simple_http_client() self.auth = hs.get_auth() self.main_uri = hs.config.worker_main_http_uri @@ -63,14 +62,11 @@ class PresenceStatusStubServlet(ClientV1RestServlet): # Pass through the auth headers, if any, in case the access token # is there. auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) - headers = { - "Authorization": auth_headers, - } + headers = {"Authorization": auth_headers} try: result = yield self.http_client.get_json( - self.main_uri + request.uri.decode('ascii'), - headers=headers, + self.main_uri + request.uri.decode("ascii"), headers=headers ) except HttpResponseException as e: raise e.to_synapse_error() @@ -84,7 +80,7 @@ class PresenceStatusStubServlet(ClientV1RestServlet): class KeyUploadServlet(RestServlet): - PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$") + PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$") def __init__(self, hs): """ @@ -106,18 +102,19 @@ class KeyUploadServlet(RestServlet): if device_id is not None: # passing the device_id here is deprecated; however, we allow it # for now for compatibility with older clients. - if (requester.device_id is not None and - device_id != requester.device_id): - logger.warning("Client uploading keys for a different device " - "(logged in as %s, uploading for %s)", - requester.device_id, device_id) + if requester.device_id is not None and device_id != requester.device_id: + logger.warning( + "Client uploading keys for a different device " + "(logged in as %s, uploading for %s)", + requester.device_id, + device_id, + ) else: device_id = requester.device_id if device_id is None: raise SynapseError( - 400, - "To upload keys, you must pass device_id when authenticating" + 400, "To upload keys, you must pass device_id when authenticating" ) if body: @@ -125,13 +122,9 @@ class KeyUploadServlet(RestServlet): # Pass through the auth headers, if any, in case the access token # is there. auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", []) - headers = { - "Authorization": auth_headers, - } + headers = {"Authorization": auth_headers} result = yield self.http_client.post_json_get_json( - self.main_uri + request.uri.decode('ascii'), - body, - headers=headers, + self.main_uri + request.uri.decode("ascii"), body, headers=headers ) defer.returnValue((200, result)) @@ -172,12 +165,14 @@ class FrontendProxyServer(HomeServer): if not self.config.use_presence: PresenceStatusStubServlet(self).register(resource) - resources.update({ - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - }) + resources.update( + { + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + } + ) root_resource = create_resource_tree(resources, NoResource()) @@ -191,7 +186,7 @@ class FrontendProxyServer(HomeServer): root_resource, self.version_string, ), - reactor=self.get_reactor() + reactor=self.get_reactor(), ) logger.info("Synapse client reader now listening on port %d", port) @@ -205,18 +200,19 @@ class FrontendProxyServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -228,9 +224,7 @@ class FrontendProxyServer(HomeServer): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse frontend proxy", config_options - ) + config = HomeServerConfig.load_config("Synapse frontend proxy", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -259,6 +253,6 @@ def start(config_options): _base.start_worker_reactor("synapse-frontend-proxy", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 1045d28949..49da105cf6 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -101,13 +101,12 @@ class SynapseHomeServer(HomeServer): # Skip loading openid resource if federation is defined # since federation resource will include openid continue - resources.update(self._configure_named_resource( - name, res.get("compress", False), - )) + resources.update( + self._configure_named_resource(name, res.get("compress", False)) + ) additional_resources = listener_config.get("additional_resources", {}) - logger.debug("Configuring additional resources: %r", - additional_resources) + logger.debug("Configuring additional resources: %r", additional_resources) module_api = ModuleApi(self, self.get_auth_handler()) for path, resmodule in additional_resources.items(): handler_cls, config = load_module(resmodule) @@ -174,59 +173,67 @@ class SynapseHomeServer(HomeServer): if compress: client_resource = gz_wrap(client_resource) - resources.update({ - "/_matrix/client/api/v1": client_resource, - "/_matrix/client/r0": client_resource, - "/_matrix/client/unstable": client_resource, - "/_matrix/client/v2_alpha": client_resource, - "/_matrix/client/versions": client_resource, - "/.well-known/matrix/client": WellKnownResource(self), - "/_synapse/admin": AdminRestResource(self), - }) + resources.update( + { + "/_matrix/client/api/v1": client_resource, + "/_matrix/client/r0": client_resource, + "/_matrix/client/unstable": client_resource, + "/_matrix/client/v2_alpha": client_resource, + "/_matrix/client/versions": client_resource, + "/.well-known/matrix/client": WellKnownResource(self), + "/_synapse/admin": AdminRestResource(self), + } + ) if self.get_config().saml2_enabled: from synapse.rest.saml2 import SAML2Resource + resources["/_matrix/saml2"] = SAML2Resource(self) if name == "consent": from synapse.rest.consent.consent_resource import ConsentResource + consent_resource = ConsentResource(self) if compress: consent_resource = gz_wrap(consent_resource) - resources.update({ - "/_matrix/consent": consent_resource, - }) + resources.update({"/_matrix/consent": consent_resource}) if name == "federation": - resources.update({ - FEDERATION_PREFIX: TransportLayerServer(self), - }) + resources.update({FEDERATION_PREFIX: TransportLayerServer(self)}) if name == "openid": - resources.update({ - FEDERATION_PREFIX: TransportLayerServer(self, servlet_groups=["openid"]), - }) + resources.update( + { + FEDERATION_PREFIX: TransportLayerServer( + self, servlet_groups=["openid"] + ) + } + ) if name in ["static", "client"]: - resources.update({ - STATIC_PREFIX: File( - os.path.join(os.path.dirname(synapse.__file__), "static") - ), - }) + resources.update( + { + STATIC_PREFIX: File( + os.path.join(os.path.dirname(synapse.__file__), "static") + ) + } + ) if name in ["media", "federation", "client"]: if self.get_config().enable_media_repo: media_repo = self.get_media_repository_resource() - resources.update({ - MEDIA_PREFIX: media_repo, - LEGACY_MEDIA_PREFIX: media_repo, - CONTENT_REPO_PREFIX: ContentRepoResource( - self, self.config.uploads_path - ), - }) + resources.update( + { + MEDIA_PREFIX: media_repo, + LEGACY_MEDIA_PREFIX: media_repo, + CONTENT_REPO_PREFIX: ContentRepoResource( + self, self.config.uploads_path + ), + } + ) elif name == "media": raise ConfigError( - "'media' resource conflicts with enable_media_repo=False", + "'media' resource conflicts with enable_media_repo=False" ) if name in ["keys", "federation"]: @@ -257,18 +264,14 @@ class SynapseHomeServer(HomeServer): for listener in listeners: if listener["type"] == "http": - self._listening_services.extend( - self._listener_http(config, listener) - ) + self._listening_services.extend(self._listener_http(config, listener)) elif listener["type"] == "manhole": listen_tcp( listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "replication": services = listen_tcp( @@ -277,16 +280,17 @@ class SynapseHomeServer(HomeServer): ReplicationStreamProtocolFactory(self), ) for s in services: - reactor.addSystemEventTrigger( - "before", "shutdown", s.stopListening, - ) + reactor.addSystemEventTrigger("before", "shutdown", s.stopListening) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -312,7 +316,7 @@ current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU") max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit") registered_reserved_users_mau_gauge = Gauge( "synapse_admin_mau:registered_reserved_users", - "Registered users with reserved threepids" + "Registered users with reserved threepids", ) @@ -327,8 +331,7 @@ def setup(config_options): """ try: config = HomeServerConfig.load_or_generate_config( - "Synapse Homeserver", - config_options, + "Synapse Homeserver", config_options ) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") @@ -339,10 +342,7 @@ def setup(config_options): # generating config files and shouldn't try to continue. sys.exit(0) - synapse.config.logger.setup_logging( - config, - use_worker_options=False - ) + synapse.config.logger.setup_logging(config, use_worker_options=False) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -357,7 +357,7 @@ def setup(config_options): database_engine=database_engine, ) - logger.info("Preparing database: %s...", config.database_config['name']) + logger.info("Preparing database: %s...", config.database_config["name"]) try: with hs.get_db_conn(run_new_connection=False) as db_conn: @@ -375,7 +375,7 @@ def setup(config_options): ) sys.exit(1) - logger.info("Database prepared in %s.", config.database_config['name']) + logger.info("Database prepared in %s.", config.database_config["name"]) hs.setup() hs.setup_master() @@ -391,9 +391,7 @@ def setup(config_options): acme = hs.get_acme_handler() # Check how long the certificate is active for. - cert_days_remaining = hs.config.is_disk_cert_valid( - allow_self_signed=False - ) + cert_days_remaining = hs.config.is_disk_cert_valid(allow_self_signed=False) # We want to reprovision if cert_days_remaining is None (meaning no # certificate exists), or the days remaining number it returns @@ -401,8 +399,8 @@ def setup(config_options): provision = False if ( - cert_days_remaining is None or - cert_days_remaining < hs.config.acme_reprovision_threshold + cert_days_remaining is None + or cert_days_remaining < hs.config.acme_reprovision_threshold ): provision = True @@ -433,10 +431,7 @@ def setup(config_options): yield do_acme() # Check if it needs to be reprovisioned every day. - hs.get_clock().looping_call( - reprovision_acme, - 24 * 60 * 60 * 1000 - ) + hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000) _base.start(hs, config.listeners) @@ -463,6 +458,7 @@ class SynapseService(service.Service): A twisted Service class that will start synapse. Used to run synapse via twistd and a .tac. """ + def __init__(self, config): self.config = config @@ -479,6 +475,7 @@ class SynapseService(service.Service): def run(hs): PROFILE_SYNAPSE = False if PROFILE_SYNAPSE: + def profile(func): from cProfile import Profile from threading import current_thread @@ -489,13 +486,14 @@ def run(hs): func(*args, **kargs) profile.disable() ident = current_thread().ident - profile.dump_stats("/tmp/%s.%s.%i.pstat" % ( - hs.hostname, func.__name__, ident - )) + profile.dump_stats( + "/tmp/%s.%s.%i.pstat" % (hs.hostname, func.__name__, ident) + ) return profiled from twisted.python.threadpool import ThreadPool + ThreadPool._worker = profile(ThreadPool._worker) reactor.run = profile(reactor.run) @@ -540,7 +538,10 @@ def run(hs): stats["total_room_count"] = room_count stats["daily_active_users"] = yield hs.get_datastore().count_daily_users() - stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms() + stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users() + stats[ + "daily_active_rooms" + ] = yield hs.get_datastore().count_daily_active_rooms() stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() r30_results = yield hs.get_datastore().count_r30_users() @@ -564,8 +565,7 @@ def run(hs): logger.info("Reporting stats to matrix.org: %s" % (stats,)) try: yield hs.get_simple_http_client().put_json( - "https://matrix.org/report-usage-stats/push", - stats + "https://matrix.org/report-usage-stats/push", stats ) except Exception as e: logger.warn("Error reporting stats: %s", e) @@ -580,14 +580,11 @@ def run(hs): logger.info("report_stats can use psutil") stats_process.append(process) except (AttributeError): - logger.warning( - "Unable to read memory/cpu stats. Disabling reporting." - ) + logger.warning("Unable to read memory/cpu stats. Disabling reporting.") def generate_user_daily_visit_stats(): return run_as_background_process( - "generate_user_daily_visits", - hs.get_datastore().generate_user_daily_visits, + "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits ) # Rather than update on per session basis, batch up the requests. @@ -598,9 +595,9 @@ def run(hs): # monthly active user limiting functionality def reap_monthly_active_users(): return run_as_background_process( - "reap_monthly_active_users", - hs.get_datastore().reap_monthly_active_users, + "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users ) + clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) reap_monthly_active_users() @@ -618,8 +615,7 @@ def run(hs): def start_generate_monthly_active_users(): return run_as_background_process( - "generate_monthly_active_users", - generate_monthly_active_users, + "generate_monthly_active_users", generate_monthly_active_users ) start_generate_monthly_active_users() @@ -645,7 +641,6 @@ def run(hs): gc_thresholds=hs.config.gc_thresholds, pid_file=hs.config.pid_file, daemonize=hs.config.daemonize, - cpu_affinity=hs.config.cpu_affinity, print_pidfile=hs.config.print_pidfile, logger=logger, ) @@ -659,5 +654,5 @@ def main(): run(hs) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index d4cc4e9443..cf0e2036c3 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -72,13 +72,15 @@ class MediaRepositoryServer(HomeServer): resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "media": media_repo = self.get_media_repository_resource() - resources.update({ - MEDIA_PREFIX: media_repo, - LEGACY_MEDIA_PREFIX: media_repo, - CONTENT_REPO_PREFIX: ContentRepoResource( - self, self.config.uploads_path - ), - }) + resources.update( + { + MEDIA_PREFIX: media_repo, + LEGACY_MEDIA_PREFIX: media_repo, + CONTENT_REPO_PREFIX: ContentRepoResource( + self, self.config.uploads_path + ), + } + ) root_resource = create_resource_tree(resources, NoResource()) @@ -91,7 +93,7 @@ class MediaRepositoryServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse media repository now listening on port %d", port) @@ -105,18 +107,19 @@ class MediaRepositoryServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -164,6 +167,6 @@ def start(config_options): _base.start_worker_reactor("synapse-media-repository", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index cbf0d67f51..df29ea5ecb 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -46,36 +46,27 @@ logger = logging.getLogger("synapse.app.pusher") class PusherSlaveStore( - SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, - SlavedAccountDataStore + SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedAccountDataStore ): - update_pusher_last_stream_ordering_and_success = ( - __func__(DataStore.update_pusher_last_stream_ordering_and_success) + update_pusher_last_stream_ordering_and_success = __func__( + DataStore.update_pusher_last_stream_ordering_and_success ) - update_pusher_failing_since = ( - __func__(DataStore.update_pusher_failing_since) - ) + update_pusher_failing_since = __func__(DataStore.update_pusher_failing_since) - update_pusher_last_stream_ordering = ( - __func__(DataStore.update_pusher_last_stream_ordering) + update_pusher_last_stream_ordering = __func__( + DataStore.update_pusher_last_stream_ordering ) - get_throttle_params_by_room = ( - __func__(DataStore.get_throttle_params_by_room) - ) + get_throttle_params_by_room = __func__(DataStore.get_throttle_params_by_room) - set_throttle_params = ( - __func__(DataStore.set_throttle_params) - ) + set_throttle_params = __func__(DataStore.set_throttle_params) - get_time_of_last_push_action_before = ( - __func__(DataStore.get_time_of_last_push_action_before) + get_time_of_last_push_action_before = __func__( + DataStore.get_time_of_last_push_action_before ) - get_profile_displayname = ( - __func__(DataStore.get_profile_displayname) - ) + get_profile_displayname = __func__(DataStore.get_profile_displayname) class PusherServer(HomeServer): @@ -105,7 +96,7 @@ class PusherServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse pusher now listening on port %d", port) @@ -119,18 +110,19 @@ class PusherServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -161,9 +153,7 @@ class PusherReplicationHandler(ReplicationClientHandler): else: yield self.start_pusher(row.user_id, row.app_id, row.pushkey) elif stream_name == "events": - yield self.pusher_pool.on_new_notifications( - token, token, - ) + yield self.pusher_pool.on_new_notifications(token, token) elif stream_name == "receipts": yield self.pusher_pool.on_new_receipts( token, token, set(row.room_id for row in rows) @@ -188,9 +178,7 @@ class PusherReplicationHandler(ReplicationClientHandler): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse pusher", config_options - ) + config = HomeServerConfig.load_config("Synapse pusher", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -234,6 +222,6 @@ def start(config_options): _base.start_worker_reactor("synapse-pusher", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): ps = start(sys.argv[1:]) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 5388def28a..858949910d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -98,10 +98,7 @@ class SynchrotronPresence(object): self.notifier = hs.get_notifier() active_presence = self.store.take_presence_startup_info() - self.user_to_current_state = { - state.user_id: state - for state in active_presence - } + self.user_to_current_state = {state.user_id: state for state in active_presence} # user_id -> last_sync_ms. Lists the users that have stopped syncing # but we haven't notified the master of that yet @@ -196,17 +193,26 @@ class SynchrotronPresence(object): room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=users_to_states.keys() + "presence_key", + stream_id, + rooms=room_ids_to_states.keys(), + users=users_to_states.keys(), ) @defer.inlineCallbacks def process_replication_rows(self, token, rows): - states = [UserPresenceState( - row.user_id, row.state, row.last_active_ts, - row.last_federation_update_ts, row.last_user_sync_ts, row.status_msg, - row.currently_active - ) for row in rows] + states = [ + UserPresenceState( + row.user_id, + row.state, + row.last_active_ts, + row.last_federation_update_ts, + row.last_user_sync_ts, + row.status_msg, + row.currently_active, + ) + for row in rows + ] for state in states: self.user_to_current_state[state.user_id] = state @@ -217,7 +223,8 @@ class SynchrotronPresence(object): def get_currently_syncing_users(self): if self.hs.config.use_presence: return [ - user_id for user_id, count in iteritems(self.user_to_num_current_syncs) + user_id + for user_id, count in iteritems(self.user_to_num_current_syncs) if count > 0 ] else: @@ -281,12 +288,14 @@ class SynchrotronServer(HomeServer): events.register_servlets(self, resource) InitialSyncRestServlet(self).register(resource) RoomInitialSyncRestServlet(self).register(resource) - resources.update({ - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - }) + resources.update( + { + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + } + ) root_resource = create_resource_tree(resources, NoResource()) @@ -299,7 +308,7 @@ class SynchrotronServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse synchrotron now listening on port %d", port) @@ -313,18 +322,19 @@ class SynchrotronServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -382,40 +392,36 @@ class SyncReplicationHandler(ReplicationClientHandler): ) elif stream_name == "push_rules": self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows], + "push_rules_key", token, users=[row.user_id for row in rows] ) - elif stream_name in ("account_data", "tag_account_data",): + elif stream_name in ("account_data", "tag_account_data"): self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows], + "account_data_key", token, users=[row.user_id for row in rows] ) elif stream_name == "receipts": self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows], + "receipt_key", token, rooms=[row.room_id for row in rows] ) elif stream_name == "typing": self.typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows], + "typing_key", token, rooms=[row.room_id for row in rows] ) elif stream_name == "to_device": entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: - self.notifier.on_new_event( - "to_device_key", token, users=entities, - ) + self.notifier.on_new_event("to_device_key", token, users=entities) elif stream_name == "device_lists": all_room_ids = set() for row in rows: room_ids = yield self.store.get_rooms_for_user(row.user_id) all_room_ids.update(room_ids) - self.notifier.on_new_event( - "device_list_key", token, rooms=all_room_ids, - ) + self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) elif stream_name == "presence": yield self.presence_handler.process_replication_rows(token, rows) elif stream_name == "receipts": self.notifier.on_new_event( - "groups_key", token, users=[row.user_id for row in rows], + "groups_key", token, users=[row.user_id for row in rows] ) except Exception: logger.exception("Error processing replication") @@ -423,9 +429,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse synchrotron", config_options - ) + config = HomeServerConfig.load_config("Synapse synchrotron", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -453,6 +457,6 @@ def start(config_options): _base.start_worker_reactor("synapse-synchrotron", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 355f5aa71d..2d9d2e1bbc 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -66,14 +66,16 @@ class UserDirectorySlaveStore( events_max = self._stream_id_gen.get_current_token() curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( - db_conn, "current_state_delta_stream", + db_conn, + "current_state_delta_stream", entity_column="room_id", stream_column="stream_id", max_value=events_max, # As we share the stream id with events token limit=1000, ) self._curr_state_delta_stream_cache = StreamChangeCache( - "_curr_state_delta_stream_cache", min_curr_state_delta_id, + "_curr_state_delta_stream_cache", + min_curr_state_delta_id, prefilled_cache=curr_state_delta_prefill, ) @@ -110,12 +112,14 @@ class UserDirectoryServer(HomeServer): elif name == "client": resource = JsonResource(self, canonical_json=False) user_directory.register_servlets(self, resource) - resources.update({ - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - }) + resources.update( + { + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + } + ) root_resource = create_resource_tree(resources, NoResource()) @@ -128,7 +132,7 @@ class UserDirectoryServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse user_dir now listening on port %d", port) @@ -142,18 +146,19 @@ class UserDirectoryServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -186,9 +191,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse user directory", config_options - ) + config = HomeServerConfig.load_config("Synapse user directory", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -227,6 +230,6 @@ def start(config_options): _base.start_worker_reactor("synapse-user-dir", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) |