From bd6dc17221741d4ceae05ae769a70696ae939336 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 15 Jun 2020 07:03:36 -0400 Subject: Replace iteritems/itervalues/iterkeys with native versions. (#7692) --- synapse/federation/send_queue.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'synapse/federation/send_queue.py') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 52f4f54215..6bbd762681 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -33,8 +33,6 @@ import logging from collections import namedtuple from typing import Dict, List, Tuple, Type -from six import iteritems - from sortedcontainers import SortedDict from twisted.internet import defer @@ -327,7 +325,7 @@ class FederationRemoteSendQueue(object): # stream position. keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]} - for ((destination, edu_key), pos) in iteritems(keyed_edus): + for ((destination, edu_key), pos) in keyed_edus.items(): rows.append( ( pos, @@ -530,10 +528,10 @@ def process_rows_for_federation(transaction_queue, rows): states=[state], destinations=destinations ) - for destination, edu_map in iteritems(buff.keyed_edus): + for destination, edu_map in buff.keyed_edus.items(): for key, edu in edu_map.items(): transaction_queue.send_edu(edu, key) - for destination, edu_list in iteritems(buff.edus): + for destination, edu_list in buff.edus.items(): for edu in edu_list: transaction_queue.send_edu(edu, None) -- cgit 1.5.1 From 38e1fac8861f12b707609da06008695a05aaf21c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 9 Jul 2020 09:52:58 -0400 Subject: Fix some spelling mistakes / typos. (#7811) --- changelog.d/7811.misc | 1 + synapse/api/auth.py | 2 +- synapse/config/emailconfig.py | 2 +- synapse/federation/federation_client.py | 4 ++-- synapse/federation/federation_server.py | 6 +++--- synapse/federation/send_queue.py | 2 +- synapse/federation/sender/per_destination_queue.py | 4 ++-- synapse/federation/transport/client.py | 2 +- synapse/federation/transport/server.py | 4 ++-- synapse/notifier.py | 2 +- synapse/replication/http/_base.py | 4 ++-- synapse/replication/tcp/__init__.py | 2 +- synapse/replication/tcp/commands.py | 2 +- synapse/replication/tcp/protocol.py | 2 +- synapse/replication/tcp/redis.py | 2 +- synapse/replication/tcp/streams/events.py | 2 +- synapse/streams/config.py | 4 ++-- synapse/streams/events.py | 2 +- synapse/util/__init__.py | 2 +- synapse/util/async_helpers.py | 2 +- synapse/util/caches/descriptors.py | 2 +- synapse/util/distributor.py | 2 +- synapse/util/patch_inline_callbacks.py | 2 +- synapse/util/retryutils.py | 4 ++-- synapse/visibility.py | 4 ++-- tests/crypto/test_keyring.py | 2 +- tests/rest/client/test_retention.py | 2 +- tests/rest/client/v1/test_presence.py | 2 +- tests/rest/client/v2_alpha/test_relations.py | 2 +- tests/test_mau.py | 2 +- tests/util/test_logcontext.py | 4 ++-- 31 files changed, 41 insertions(+), 40 deletions(-) create mode 100644 changelog.d/7811.misc (limited to 'synapse/federation/send_queue.py') diff --git a/changelog.d/7811.misc b/changelog.d/7811.misc new file mode 100644 index 0000000000..d907bba4df --- /dev/null +++ b/changelog.d/7811.misc @@ -0,0 +1 @@ +Fix various spelling errors in comments and log lines. 
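(Editor's note on the first patch above: the iteritems change is a mechanical Python 3 cleanup in which every `six.iteritems(d)` call becomes a native `d.items()` view. A rough standalone sketch of the pattern follows; the queue contents are invented for illustration and not taken from the Synapse tree.)

from typing import Dict, Tuple

# Hypothetical mapping of (destination, edu_key) -> stream position,
# mirroring the shape of keyed_edus in send_queue.py above.
keyed_edus: Dict[Tuple[str, str], int] = {("remote.example.com", "m.typing"): 42}

# Python 2 era, via six:   for k, pos in iteritems(keyed_edus): ...
# Python 3 native version; .items() returns a view, so no copy is made:
for (destination, edu_key), pos in keyed_edus.items():
    print(destination, edu_key, pos)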
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index cb22508f4d..40dc62ef6c 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -537,7 +537,7 @@ class Auth(object): # Currently we ignore the `for_verification` flag even though there are # some situations where we can drop particular auth events when adding # to the event's `auth_events` (e.g. joins pointing to previous joins - # when room is publically joinable). Dropping event IDs has the + # when room is publicly joinable). Dropping event IDs has the # advantage that the auth chain for the room grows slower, but we use # the auth chain in state resolution v2 to order events, which means # care must be taken if dropping events to ensure that it doesn't diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index df08bcd1bc..b1dc7ad502 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -72,7 +72,7 @@ class EmailConfig(Config): template_dir = email_config.get("template_dir") # we need an absolute path, because we change directory after starting (and - # we don't yet know what auxilliary templates like mail.css we will need). + # we don't yet know what auxiliary templates like mail.css we will need). # (Note that loading as package_resources with jinja.PackageLoader doesn't # work for the same reason.) if not template_dir: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 07d41ec03f..a37cc9cb4a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -245,7 +245,7 @@ class FederationClient(FederationBase): event_id: event to fetch room_version: version of the room outlier: Indicates whether the PDU is an `outlier`, i.e. if - it's from an arbitary point in the context as opposed to part + it's from an arbitrary point in the context as opposed to part of the current block of PDUs. Defaults to `False` timeout: How long to try (in ms) each destination for before moving to the next destination. None indicates no timeout. @@ -351,7 +351,7 @@ class FederationClient(FederationBase): outlier: bool = False, include_none: bool = False, ) -> List[EventBase]: - """Takes a list of PDUs and checks the signatures and hashs of each + """Takes a list of PDUs and checks the signatures and hashes of each one. If a PDU fails its signature check then we check if we have it in the database and if not then request if from the originating server of that PDU. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e704cf2f44..86051decd4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -717,7 +717,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool: # server name is a literal IP allow_ip_literals = acl_event.content.get("allow_ip_literals", True) if not isinstance(allow_ip_literals, bool): - logger.warning("Ignorning non-bool allow_ip_literals flag") + logger.warning("Ignoring non-bool allow_ip_literals flag") allow_ip_literals = True if not allow_ip_literals: # check for ipv6 literals. These start with '['. 
@@ -731,7 +731,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool: # next, check the deny list deny = acl_event.content.get("deny", []) if not isinstance(deny, (list, tuple)): - logger.warning("Ignorning non-list deny ACL %s", deny) + logger.warning("Ignoring non-list deny ACL %s", deny) deny = [] for e in deny: if _acl_entry_matches(server_name, e): @@ -741,7 +741,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool: # then the allow list. allow = acl_event.content.get("allow", []) if not isinstance(allow, (list, tuple)): - logger.warning("Ignorning non-list allow ACL %s", allow) + logger.warning("Ignoring non-list allow ACL %s", allow) allow = [] for e in allow: if _acl_entry_matches(server_name, e): diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 6bbd762681..860b03f7b9 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -359,7 +359,7 @@ class BaseFederationRow(object): Specifies how to identify, serialize and deserialize the different types. """ - TypeId = "" # Unique string that ids the type. Must be overriden in sub classes. + TypeId = "" # Unique string that ids the type. Must be overridden in sub classes. @staticmethod def from_data(data): diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 4e698981a4..12966e239b 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -119,7 +119,7 @@ class PerDestinationQueue(object): ) def send_pdu(self, pdu: EventBase, order: int) -> None: - """Add a PDU to the queue, and start the transmission loop if neccessary + """Add a PDU to the queue, and start the transmission loop if necessary Args: pdu: pdu to send @@ -129,7 +129,7 @@ class PerDestinationQueue(object): self.attempt_new_transaction() def send_presence(self, states: Iterable[UserPresenceState]) -> None: - """Add presence updates to the queue. Start the transmission loop if neccessary. + """Add presence updates to the queue. Start the transmission loop if necessary. Args: states: presence to send diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 9f99311419..cfdf23d366 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -746,7 +746,7 @@ class TransportLayerClient(object): def remove_user_from_group( self, destination, group_id, requester_user_id, user_id, content ): - """Remove a user fron a group + """Remove a user from a group """ path = _create_v1_path("/groups/%s/users/%s/remove", group_id, user_id) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index bfb7831a02..d1bac318e7 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -109,7 +109,7 @@ class Authenticator(object): self.server_name = hs.hostname self.store = hs.get_datastore() self.federation_domain_whitelist = hs.config.federation_domain_whitelist - self.notifer = hs.get_notifier() + self.notifier = hs.get_notifier() self.replication_client = None if hs.config.worker.worker_app: @@ -175,7 +175,7 @@ class Authenticator(object): await self.store.set_destination_retry_timings(origin, None, 0, 0) # Inform the relevant places that the remote server is back up. 
- self.notifer.notify_remote_server_up(origin) + self.notifier.notify_remote_server_up(origin) if self.replication_client: # If we're on a worker we try and inform master about this. The # replication client doesn't hook into the notifier to avoid diff --git a/synapse/notifier.py b/synapse/notifier.py index 87c120a59c..bd41f77852 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -83,7 +83,7 @@ class _NotifierUserStream(object): self.current_token = current_token # The last token for which we should wake up any streams that have a - # token that comes before it. This gets updated everytime we get poked. + # token that comes before it. This gets updated every time we get poked. # We start it at the current token since if we get any streams # that have a token from before we have no idea whether they should be # woken up or not, so lets just wake them up. diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 0843d28d4b..fb0dd04f88 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -92,11 +92,11 @@ class ReplicationEndpoint(object): # assert here that sub classes don't try and use the name. assert ( "instance_name" not in self.PATH_ARGS - ), "`instance_name` is a reserved paramater name" + ), "`instance_name` is a reserved parameter name" assert ( "instance_name" not in signature(self.__class__._serialize_payload).parameters - ), "`instance_name` is a reserved paramater name" + ), "`instance_name` is a reserved parameter name" assert self.METHOD in ("PUT", "POST", "GET") diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py index 523a1358d4..1b8718b11d 100644 --- a/synapse/replication/tcp/__init__.py +++ b/synapse/replication/tcp/__init__.py @@ -25,7 +25,7 @@ Structure of the module: * command.py - the definitions of all the valid commands * protocol.py - the TCP protocol classes * resource.py - handles streaming stream updates to replications - * streams/ - the definitons of all the valid streams + * streams/ - the definitions of all the valid streams The general interaction of the classes are: diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 0f453ff0a8..ccc7f1f0d1 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -47,7 +47,7 @@ class Command(metaclass=abc.ABCMeta): @abc.abstractmethod def to_line(self) -> str: - """Serialises the comamnd for the wire. Does not include the command + """Serialises the command for the wire. Does not include the command prefix. """ diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 4198eece71..ca47f5cc88 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -317,7 +317,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): def _queue_command(self, cmd): """Queue the command until the connection is ready to write to again. 
""" - logger.debug("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd) + logger.debug("[%s] Queueing as conn %r, cmd: %r", self.id(), self.state, cmd) self.pending_commands.append(cmd) if len(self.pending_commands) > self.max_line_buffer: diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index e776b63183..0a7e7f67be 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -177,7 +177,7 @@ class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory): Args: hs outbound_redis_connection: A connection to redis that will be used to - send outbound commands (this is seperate to the redis connection + send outbound commands (this is separate to the redis connection used to subscribe). """ diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index bdddb62ad6..1c2a4cce7f 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -62,7 +62,7 @@ class BaseEventsStreamRow(object): Specifies how to identify, serialize and deserialize the different types. """ - # Unique string that ids the type. Must be overriden in sub classes. + # Unique string that ids the type. Must be overridden in sub classes. TypeId = None # type: str @classmethod diff --git a/synapse/streams/config.py b/synapse/streams/config.py index cd56cd91ed..ca7c16ff65 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -68,13 +68,13 @@ class PaginationConfig(object): elif from_tok: from_tok = StreamToken.from_string(from_tok) except Exception: - raise SynapseError(400, "'from' paramater is invalid") + raise SynapseError(400, "'from' parameter is invalid") try: if to_tok: to_tok = StreamToken.from_string(to_tok) except Exception: - raise SynapseError(400, "'to' paramater is invalid") + raise SynapseError(400, "'to' parameter is invalid") limit = parse_integer(request, "limit", default=default_limit) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index fcd2aaa9c9..5d3eddcfdc 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -68,7 +68,7 @@ class EventSources(object): The returned token does not have the current values for fields other than `room`, since they are not used during pagination. - Retuns: + Returns: Deferred[StreamToken] """ token = StreamToken( diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 60f0de70f7..c63256d3bd 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -55,7 +55,7 @@ class Clock(object): return self._reactor.seconds() def time_msec(self): - """Returns the current system time in miliseconds since epoch.""" + """Returns the current system time in milliseconds since epoch.""" return int(self.time() * 1000) def looping_call(self, f, msec, *args, **kwargs): diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 65abf0846e..f562770922 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -352,7 +352,7 @@ class ReadWriteLock(object): # resolved when they release the lock). # # Read: We know its safe to acquire a read lock when the latest writer has - # been resolved. The new reader is appeneded to the list of latest readers. + # been resolved. The new reader is appended to the list of latest readers. # # Write: We know its safe to acquire the write lock when both the latest # writers and readers have been resolved. 
The new writer replaces the latest diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 64f35fc288..9b09c08b89 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -516,7 +516,7 @@ class CacheListDescriptor(_CacheDescriptorBase): """ Args: orig (function) - cached_method_name (str): The name of the chached method. + cached_method_name (str): The name of the cached method. list_name (str): Name of the argument which is the bulk lookup list num_args (int): number of positional arguments (excluding ``self``, but including list_name) to use as cache keys. Defaults to all diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 45af8d3eeb..da20523b70 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -39,7 +39,7 @@ class Distributor(object): Signals are named simply by strings. TODO(paul): It would be nice to give signals stronger object identities, - so we can attach metadata, docstrings, detect typoes, etc... But this + so we can attach metadata, docstrings, detect typos, etc... But this model will do for today. """ diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index 2605f3c65b..54c046b6e1 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -192,7 +192,7 @@ def _check_yield_points(f: Callable, changes: List[str]): result = yield d except Exception: # this will fish an earlier Failure out of the stack where possible, and - # thus is preferable to passing in an exeception to the Failure + # thus is preferable to passing in an exception to the Failure # constructor, since it results in less stack-mangling. result = Failure() diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index af69587196..8794317caa 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -22,7 +22,7 @@ from synapse.api.errors import CodeMessageException logger = logging.getLogger(__name__) -# the intial backoff, after the first transaction fails +# the initial backoff, after the first transaction fails MIN_RETRY_INTERVAL = 10 * 60 * 1000 # how much we multiply the backoff by after each subsequent fail @@ -174,7 +174,7 @@ class RetryDestinationLimiter(object): # has been decommissioned. # If we get a 401, then we should probably back off since they # won't accept our requests for at least a while. - # 429 is us being aggresively rate limited, so lets rate limit + # 429 is us being aggressively rate limited, so lets rate limit # ourselves. if exc_val.code == 404 and self.backoff_on_404: valid_err_code = False diff --git a/synapse/visibility.py b/synapse/visibility.py index 3dfd4af26c..0f042c5696 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -319,7 +319,7 @@ def filter_events_for_server( return True # Lets check to see if all the events have a history visibility - # of "shared" or "world_readable". If thats the case then we don't + # of "shared" or "world_readable". If that's the case then we don't # need to check membership (as we know the server is in the room). event_to_state_ids = yield storage.state.get_state_ids_for_events( frozenset(e.event_id for e in events), @@ -335,7 +335,7 @@ def filter_events_for_server( visibility_ids.add(hist) # If we failed to find any history visibility events then the default - # is "shared" visiblity. + # is "shared" visibility. 
if not visibility_ids: all_open = True else: diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 70c8e72303..f9ce609923 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -192,7 +192,7 @@ class KeyringTestCase(unittest.HomeserverTestCase): d = _verify_json_for_server(kr, "server9", {}, 0, "test unsigned") self.failureResultOf(d, SynapseError) - # should suceed on a signed object + # should succeed on a signed object d = _verify_json_for_server(kr, "server9", json1, 500, "test signed") # self.assertFalse(d.called) self.get_success(d) diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py index 95475bb651..e54ffea150 100644 --- a/tests/rest/client/test_retention.py +++ b/tests/rest/client/test_retention.py @@ -126,7 +126,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): events.append(self.get_success(store.get_event(valid_event_id))) - # Advance the time by anothe 2 days. After this, the first event should be + # Advance the time by another 2 days. After this, the first event should be # outdated but not the second one. self.reactor.advance(one_day_ms * 2 / 1000) diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 0fdff79aa7..3c66255dac 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -60,7 +60,7 @@ class PresenceTestCase(unittest.HomeserverTestCase): def test_put_presence_disabled(self): """ - PUT to the status endpoint with use_presence disbled will NOT call + PUT to the status endpoint with use_presence disabled will NOT call set_state on the presence handler. """ self.hs.config.use_presence = False diff --git a/tests/rest/client/v2_alpha/test_relations.py b/tests/rest/client/v2_alpha/test_relations.py index fd641a7c2f..99c9f4e928 100644 --- a/tests/rest/client/v2_alpha/test_relations.py +++ b/tests/rest/client/v2_alpha/test_relations.py @@ -99,7 +99,7 @@ class RelationsTestCase(unittest.HomeserverTestCase): self.assertEquals(400, channel.code, channel.json_body) def test_basic_paginate_relations(self): - """Tests that calling pagination API corectly the latest relations. + """Tests that calling pagination API correctly the latest relations. """ channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction") self.assertEquals(200, channel.code, channel.json_body) diff --git a/tests/test_mau.py b/tests/test_mau.py index 49667ed7f4..654a6fa42d 100644 --- a/tests/test_mau.py +++ b/tests/test_mau.py @@ -166,7 +166,7 @@ class TestMauLimit(unittest.HomeserverTestCase): self.do_sync_for_user(token5) self.do_sync_for_user(token6) - # But old user cant + # But old user can't with self.assertRaises(SynapseError) as cm: self.do_sync_for_user(token1) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 95301c013c..58ee918f65 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -124,7 +124,7 @@ class LoggingContextTestCase(unittest.TestCase): @defer.inlineCallbacks def test_make_deferred_yieldable(self): - # a function which retuns an incomplete deferred, but doesn't follow + # a function which returns an incomplete deferred, but doesn't follow # the synapse rules. 
def blocking_function(): d = defer.Deferred() @@ -183,7 +183,7 @@ class LoggingContextTestCase(unittest.TestCase): @defer.inlineCallbacks def test_make_deferred_yieldable_with_await(self): - # an async function which retuns an incomplete coroutine, but doesn't + # an async function which returns an incomplete coroutine, but doesn't # follow the synapse rules. async def blocking_function(): -- cgit 1.5.1 From f299441cc67f31dcd47b8fdfda4a218bee9df9ba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 10 Jul 2020 18:26:36 +0100 Subject: Add ability to shard the federation sender (#7798) --- changelog.d/7798.feature | 1 + docs/sample_config.yaml | 65 ++--- synapse/app/generic_worker.py | 59 ++--- synapse/config/federation.py | 129 ++++++++++ synapse/config/homeserver.py | 3 + synapse/config/server.py | 66 ----- synapse/federation/send_queue.py | 14 +- synapse/federation/sender/__init__.py | 48 +++- synapse/federation/sender/per_destination_queue.py | 22 ++ synapse/replication/tcp/commands.py | 10 +- synapse/replication/tcp/handler.py | 4 +- .../delta/58/10federation_pos_instance_name.sql | 22 ++ synapse/storage/data_stores/main/stream.py | 97 ++++++- tests/replication/test_federation_ack.py | 1 + tests/replication/test_federation_sender_shard.py | 286 +++++++++++++++++++++ 15 files changed, 670 insertions(+), 157 deletions(-) create mode 100644 changelog.d/7798.feature create mode 100644 synapse/config/federation.py create mode 100644 synapse/storage/data_stores/main/schema/delta/58/10federation_pos_instance_name.sql create mode 100644 tests/replication/test_federation_sender_shard.py (limited to 'synapse/federation/send_queue.py') diff --git a/changelog.d/7798.feature b/changelog.d/7798.feature new file mode 100644 index 0000000000..56ffaf0d4a --- /dev/null +++ b/changelog.d/7798.feature @@ -0,0 +1 @@ +Add experimental support for running multiple federation sender processes. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 164a104045..1a2d9fb153 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -118,38 +118,6 @@ pid_file: DATADIR/homeserver.pid # #enable_search: false -# Restrict federation to the following whitelist of domains. -# N.B. we recommend also firewalling your federation listener to limit -# inbound federation traffic as early as possible, rather than relying -# purely on this application-layer restriction. If not specified, the -# default is to whitelist everything. -# -#federation_domain_whitelist: -# - lon.example.com -# - nyc.example.com -# - syd.example.com - -# Prevent federation requests from being sent to the following -# blacklist IP address CIDR ranges. If this option is not specified, or -# specified with an empty list, no ip range blacklist will be enforced. -# -# As of Synapse v1.4.0 this option also affects any outbound requests to identity -# servers provided by user input. -# -# (0.0.0.0 and :: are always blacklisted, whether or not they are explicitly -# listed here, since they correspond to unroutable addresses.) -# -federation_ip_range_blacklist: - - '127.0.0.0/8' - - '10.0.0.0/8' - - '172.16.0.0/12' - - '192.168.0.0/16' - - '100.64.0.0/10' - - '169.254.0.0/16' - - '::1/128' - - 'fe80::/64' - - 'fc00::/7' - # List of ports that Synapse should listen on, their purpose and their # configuration. # @@ -608,6 +576,39 @@ acme: +# Restrict federation to the following whitelist of domains. +# N.B. 
we recommend also firewalling your federation listener to limit +# inbound federation traffic as early as possible, rather than relying +# purely on this application-layer restriction. If not specified, the +# default is to whitelist everything. +# +#federation_domain_whitelist: +# - lon.example.com +# - nyc.example.com +# - syd.example.com + +# Prevent federation requests from being sent to the following +# blacklist IP address CIDR ranges. If this option is not specified, or +# specified with an empty list, no ip range blacklist will be enforced. +# +# As of Synapse v1.4.0 this option also affects any outbound requests to identity +# servers provided by user input. +# +# (0.0.0.0 and :: are always blacklisted, whether or not they are explicitly +# listed here, since they correspond to unroutable addresses.) +# +federation_ip_range_blacklist: + - '127.0.0.0/8' + - '10.0.0.0/8' + - '172.16.0.0/12' + - '192.168.0.0/16' + - '100.64.0.0/10' + - '169.254.0.0/16' + - '::1/128' + - 'fe80::/64' + - 'fc00::/7' + + ## Caching ## # Caching can be configured through the following options. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index f6792d9fc8..e90695f026 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -511,25 +511,7 @@ class GenericWorkerSlavedStore( SearchWorkerStore, BaseSlavedStore, ): - def __init__(self, database, db_conn, hs): - super(GenericWorkerSlavedStore, self).__init__(database, db_conn, hs) - - # We pull out the current federation stream position now so that we - # always have a known value for the federation position in memory so - # that we don't have to bounce via a deferred once when we start the - # replication streams. - self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) - - def _get_federation_out_pos(self, db_conn): - sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?" - sql = self.database_engine.convert_param_style(sql) - - txn = db_conn.cursor() - txn.execute(sql, ("federation",)) - rows = txn.fetchall() - txn.close() - - return rows[0][0] if rows else -1 + pass class GenericWorkerServer(HomeServer): @@ -812,19 +794,11 @@ class FederationSenderHandler(object): self.federation_sender = hs.get_federation_sender() self._hs = hs - # if the worker is restarted, we want to pick up where we left off in - # the replication stream, so load the position from the database. - # - # XXX is this actually worthwhile? Whenever the master is restarted, we'll - # drop some rows anyway (which is mostly fine because we're only dropping - # typing and presence notifications). If the replication stream is - # unreliable, why do we do all this hoop-jumping to store the position in the - # database? See also https://github.com/matrix-org/synapse/issues/7535. - # - self.federation_position = self.store.federation_out_pos_startup + # Stores the latest position in the federation stream we've gotten up + # to. This is always set before we use it. 
+ self.federation_position = None self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") - self._last_ack = self.federation_position def on_start(self): # There may be some events that are persisted but haven't been sent, @@ -932,7 +906,6 @@ class FederationSenderHandler(object): # We ACK this token over replication so that the master can drop # its in memory queues self._hs.get_tcp_replication().send_federation_ack(current_position) - self._last_ack = current_position except Exception: logger.exception("Error updating federation stream position") @@ -960,7 +933,7 @@ def start(config_options): ) if config.worker_app == "synapse.app.appservice": - if config.notify_appservices: + if config.appservice.notify_appservices: sys.stderr.write( "\nThe appservices must be disabled in the main synapse process" "\nbefore they can be run in a separate worker." @@ -970,13 +943,13 @@ def start(config_options): sys.exit(1) # Force the appservice to start since they will be disabled in the main config - config.notify_appservices = True + config.appservice.notify_appservices = True else: # For other worker types we force this to off. - config.notify_appservices = False + config.appservice.notify_appservices = False if config.worker_app == "synapse.app.pusher": - if config.start_pushers: + if config.server.start_pushers: sys.stderr.write( "\nThe pushers must be disabled in the main synapse process" "\nbefore they can be run in a separate worker." @@ -986,13 +959,13 @@ def start(config_options): sys.exit(1) # Force the pushers to start since they will be disabled in the main config - config.start_pushers = True + config.server.start_pushers = True else: # For other worker types we force this to off. - config.start_pushers = False + config.server.start_pushers = False if config.worker_app == "synapse.app.user_dir": - if config.update_user_directory: + if config.server.update_user_directory: sys.stderr.write( "\nThe update_user_directory must be disabled in the main synapse process" "\nbefore they can be run in a separate worker." @@ -1002,13 +975,13 @@ def start(config_options): sys.exit(1) # Force the pushers to start since they will be disabled in the main config - config.update_user_directory = True + config.server.update_user_directory = True else: # For other worker types we force this to off. - config.update_user_directory = False + config.server.update_user_directory = False if config.worker_app == "synapse.app.federation_sender": - if config.send_federation: + if config.federation.send_federation: sys.stderr.write( "\nThe send_federation must be disabled in the main synapse process" "\nbefore they can be run in a separate worker." @@ -1018,10 +991,10 @@ def start(config_options): sys.exit(1) # Force the pushers to start since they will be disabled in the main config - config.send_federation = True + config.federation.send_federation = True else: # For other worker types we force this to off. - config.send_federation = False + config.federation.send_federation = False synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/config/federation.py b/synapse/config/federation.py new file mode 100644 index 0000000000..7782ab4c9d --- /dev/null +++ b/synapse/config/federation.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hashlib import sha256 +from typing import List, Optional + +import attr +from netaddr import IPSet + +from ._base import Config, ConfigError + + +@attr.s +class ShardedFederationSendingConfig: + """Algorithm for choosing which federation sender instance is responsible + for which destination host. + """ + + instances = attr.ib(type=List[str]) + + def should_send_to(self, instance_name: str, destination: str) -> bool: + """Whether this instance is responsible for sending transactions for + the given host. + """ + + # If multiple federation senders are not defined we always return true. + if not self.instances or len(self.instances) == 1: + return True + + # We shard by taking the hash, modulo it by the number of federation + # senders and then checking whether this instance matches the instance + # at that index. + # + # (Technically this introduces some bias and is not entirely uniform, but + # since the hash is so large the bias is ridiculously small). + dest_hash = sha256(destination.encode("utf8")).digest() + dest_int = int.from_bytes(dest_hash, byteorder="little") + remainder = dest_int % (len(self.instances)) + return self.instances[remainder] == instance_name + + +class FederationConfig(Config): + section = "federation" + + def read_config(self, config, **kwargs): + # Whether to send federation traffic out in this process. This only + # applies to some federation traffic, and so shouldn't be used to + # "disable" federation + self.send_federation = config.get("send_federation", True) + + federation_sender_instances = config.get("federation_sender_instances") or [] + self.federation_shard_config = ShardedFederationSendingConfig( + federation_sender_instances + ) + + # FIXME: federation_domain_whitelist needs sytests + self.federation_domain_whitelist = None # type: Optional[dict] + federation_domain_whitelist = config.get("federation_domain_whitelist", None) + + if federation_domain_whitelist is not None: + # turn the whitelist into a hash for speed of lookup + self.federation_domain_whitelist = {} + + for domain in federation_domain_whitelist: + self.federation_domain_whitelist[domain] = True + + self.federation_ip_range_blacklist = config.get( + "federation_ip_range_blacklist", [] + ) + + # Attempt to create an IPSet from the given ranges + try: + self.federation_ip_range_blacklist = IPSet( + self.federation_ip_range_blacklist + ) + + # Always blacklist 0.0.0.0, :: + self.federation_ip_range_blacklist.update(["0.0.0.0", "::"]) + except Exception as e: + raise ConfigError( + "Invalid range(s) provided in federation_ip_range_blacklist: %s" % e + ) + + def generate_config_section(self, config_dir_path, server_name, **kwargs): + return """\ + # Restrict federation to the following whitelist of domains. + # N.B. we recommend also firewalling your federation listener to limit + # inbound federation traffic as early as possible, rather than relying + # purely on this application-layer restriction. If not specified, the + # default is to whitelist everything.
+ # + #federation_domain_whitelist: + # - lon.example.com + # - nyc.example.com + # - syd.example.com + + # Prevent federation requests from being sent to the following + # blacklist IP address CIDR ranges. If this option is not specified, or + # specified with an empty list, no ip range blacklist will be enforced. + # + # As of Synapse v1.4.0 this option also affects any outbound requests to identity + # servers provided by user input. + # + # (0.0.0.0 and :: are always blacklisted, whether or not they are explicitly + # listed here, since they correspond to unroutable addresses.) + # + federation_ip_range_blacklist: + - '127.0.0.0/8' + - '10.0.0.0/8' + - '172.16.0.0/12' + - '192.168.0.0/16' + - '100.64.0.0/10' + - '169.254.0.0/16' + - '::1/128' + - 'fe80::/64' + - 'fc00::/7' + """ diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 264c274c52..8e93d31394 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -23,6 +23,7 @@ from .cas import CasConfig from .consent_config import ConsentConfig from .database import DatabaseConfig from .emailconfig import EmailConfig +from .federation import FederationConfig from .groups import GroupsConfig from .jwt_config import JWTConfig from .key import KeyConfig @@ -57,6 +58,7 @@ class HomeServerConfig(RootConfig): config_classes = [ ServerConfig, TlsConfig, + FederationConfig, CacheConfig, DatabaseConfig, LoggingConfig, @@ -90,4 +92,5 @@ class HomeServerConfig(RootConfig): ThirdPartyRulesConfig, TracerConfig, RedisConfig, + FederationConfig, ] diff --git a/synapse/config/server.py b/synapse/config/server.py index 8204664883..b6afa642ca 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -23,7 +23,6 @@ from typing import Any, Dict, Iterable, List, Optional import attr import yaml -from netaddr import IPSet from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.http.endpoint import parse_and_validate_server_name @@ -136,11 +135,6 @@ class ServerConfig(Config): self.use_frozen_dicts = config.get("use_frozen_dicts", False) self.public_baseurl = config.get("public_baseurl") - # Whether to send federation traffic out in this process. This only - # applies to some federation traffic, and so shouldn't be used to - # "disable" federation - self.send_federation = config.get("send_federation", True) - # Whether to enable user presence. 
self.use_presence = config.get("use_presence", True) @@ -263,34 +257,6 @@ class ServerConfig(Config): # due to resource constraints self.admin_contact = config.get("admin_contact", None) - # FIXME: federation_domain_whitelist needs sytests - self.federation_domain_whitelist = None # type: Optional[dict] - federation_domain_whitelist = config.get("federation_domain_whitelist", None) - - if federation_domain_whitelist is not None: - # turn the whitelist into a hash for speed of lookup - self.federation_domain_whitelist = {} - - for domain in federation_domain_whitelist: - self.federation_domain_whitelist[domain] = True - - self.federation_ip_range_blacklist = config.get( - "federation_ip_range_blacklist", [] - ) - - # Attempt to create an IPSet from the given ranges - try: - self.federation_ip_range_blacklist = IPSet( - self.federation_ip_range_blacklist - ) - - # Always blacklist 0.0.0.0, :: - self.federation_ip_range_blacklist.update(["0.0.0.0", "::"]) - except Exception as e: - raise ConfigError( - "Invalid range(s) provided in federation_ip_range_blacklist: %s" % e - ) - if self.public_baseurl is not None: if self.public_baseurl[-1] != "/": self.public_baseurl += "/" @@ -743,38 +709,6 @@ class ServerConfig(Config): # #enable_search: false - # Restrict federation to the following whitelist of domains. - # N.B. we recommend also firewalling your federation listener to limit - # inbound federation traffic as early as possible, rather than relying - # purely on this application-layer restriction. If not specified, the - # default is to whitelist everything. - # - #federation_domain_whitelist: - # - lon.example.com - # - nyc.example.com - # - syd.example.com - - # Prevent federation requests from being sent to the following - # blacklist IP address CIDR ranges. If this option is not specified, or - # specified with an empty list, no ip range blacklist will be enforced. - # - # As of Synapse v1.4.0 this option also affects any outbound requests to identity - # servers provided by user input. - # - # (0.0.0.0 and :: are always blacklisted, whether or not they are explicitly - # listed here, since they correspond to unroutable addresses.) - # - federation_ip_range_blacklist: - - '127.0.0.0/8' - - '10.0.0.0/8' - - '172.16.0.0/12' - - '192.168.0.0/16' - - '100.64.0.0/10' - - '169.254.0.0/16' - - '::1/128' - - 'fe80::/64' - - 'fc00::/7' - # List of ports that Synapse should listen on, their purpose and their # configuration. # diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 860b03f7b9..4fc9ff92e5 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -55,6 +55,11 @@ class FederationRemoteSendQueue(object): self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id + # We may have multiple federation sender instances, so we need to track + # their positions separately. + self._sender_instances = hs.config.federation.federation_shard_config.instances + self._sender_positions = {} + # Pending presence map user_id -> UserPresenceState self.presence_map = {} # type: Dict[str, UserPresenceState] @@ -261,7 +266,14 @@ class FederationRemoteSendQueue(object): def get_current_token(self): return self.pos - 1 - def federation_ack(self, token): + def federation_ack(self, instance_name, token): + if self._sender_instances: + # If we have configured multiple federation sender instances we need + # to track their positions separately, and only clear the queue up + # to the token all instances have acked. 
+ self._sender_positions[instance_name] = token + token = min(self._sender_positions.values()) + self._clear_queue_before_pos(token) async def get_replication_rows( diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 464d7a41de..4b63a0755f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -69,6 +69,9 @@ class FederationSender(object): self._transaction_manager = TransactionManager(hs) + self._instance_name = hs.get_instance_name() + self._federation_shard_config = hs.config.federation.federation_shard_config + # map from destination to PerDestinationQueue self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] @@ -191,7 +194,13 @@ class FederationSender(object): ) return - destinations = set(destinations) + destinations = { + d + for d in destinations + if self._federation_shard_config.should_send_to( + self._instance_name, d + ) + } if send_on_behalf_of is not None: # If we are sending the event on behalf of another server @@ -322,7 +331,12 @@ class FederationSender(object): # Work out which remote servers should be poked and poke them. domains = yield self.state.get_current_hosts_in_room(room_id) - domains = [d for d in domains if d != self.server_name] + domains = [ + d + for d in domains + if d != self.server_name + and self._federation_shard_config.should_send_to(self._instance_name, d) + ] if not domains: return @@ -427,6 +441,10 @@ class FederationSender(object): for destination in destinations: if destination == self.server_name: continue + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + continue self._get_per_destination_queue(destination).send_presence(states) @measure_func("txnqueue._process_presence") @@ -441,6 +459,12 @@ class FederationSender(object): for destination in destinations: if destination == self.server_name: continue + + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + continue + self._get_per_destination_queue(destination).send_presence(states) def build_and_send_edu( @@ -462,6 +486,11 @@ class FederationSender(object): logger.info("Not sending EDU to ourselves") return + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + return + edu = Edu( origin=self.server_name, destination=destination, @@ -478,6 +507,11 @@ class FederationSender(object): edu: edu to send key: clobbering key for this edu """ + if not self._federation_shard_config.should_send_to( + self._instance_name, edu.destination + ): + return + queue = self._get_per_destination_queue(edu.destination) if key: queue.send_keyed_edu(edu, key) @@ -489,6 +523,11 @@ class FederationSender(object): logger.warning("Not sending device update to ourselves") return + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + return + self._get_per_destination_queue(destination).attempt_new_transaction() def wake_destination(self, destination: str): @@ -502,6 +541,11 @@ class FederationSender(object): logger.warning("Not waking up ourselves") return + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + return + self._get_per_destination_queue(destination).attempt_new_transaction() @staticmethod diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 12966e239b..6402136e8a 100644 --- 
a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -74,6 +74,20 @@ class PerDestinationQueue(object): self._clock = hs.get_clock() self._store = hs.get_datastore() self._transaction_manager = transaction_manager + self._instance_name = hs.get_instance_name() + self._federation_shard_config = hs.config.federation.federation_shard_config + + self._should_send_on_this_instance = True + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + # We don't raise an exception here to avoid taking out any other + # processing. We have a guard in `attempt_new_transaction` that + # ensures we don't start sending stuff. + logger.error( + "Create a per destination queue for %s on wrong worker", destination, + ) + self._should_send_on_this_instance = False self._destination = destination self.transmission_loop_running = False @@ -180,6 +194,14 @@ class PerDestinationQueue(object): logger.debug("TX [%s] Transaction already in progress", self._destination) return + if not self._should_send_on_this_instance: + # We don't raise an exception here to avoid taking out any other + # processing. + logger.error( + "Trying to start a transaction to %s on wrong worker", self._destination + ) + return + logger.debug("TX [%s] Starting transaction loop", self._destination) run_as_background_process( diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index ccc7f1f0d1..f33801f883 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -293,20 +293,22 @@ class FederationAckCommand(Command): Format:: - FEDERATION_ACK <token> + FEDERATION_ACK <instance_name> <token> """ NAME = "FEDERATION_ACK" - def __init__(self, token): + def __init__(self, instance_name, token): + self.instance_name = instance_name self.token = token @classmethod def from_line(cls, line): - return cls(int(line)) + instance_name, token = line.split(" ") + return cls(instance_name, int(token)) def to_line(self): - return str(self.token) + return "%s %s" % (self.instance_name, self.token) class RemovePusherCommand(Command): diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 55b3b79008..80f5df60f9 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -238,7 +238,7 @@ class ReplicationCommandHandler: federation_ack_counter.inc() if self._federation_sender: - self._federation_sender.federation_ack(cmd.token) + self._federation_sender.federation_ack(cmd.instance_name, cmd.token) async def on_REMOVE_PUSHER( self, conn: AbstractConnection, cmd: RemovePusherCommand @@ -527,7 +527,7 @@ class ReplicationCommandHandler: """Ack data for the federation stream. This allows the master to drop data stored purely in memory.
""" - self.send_command(FederationAckCommand(token)) + self.send_command(FederationAckCommand(self._instance_name, token)) def send_user_sync( self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int diff --git a/synapse/storage/data_stores/main/schema/delta/58/10federation_pos_instance_name.sql b/synapse/storage/data_stores/main/schema/delta/58/10federation_pos_instance_name.sql new file mode 100644 index 0000000000..1cc2633aad --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/10federation_pos_instance_name.sql @@ -0,0 +1,22 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- We need to store the stream positions by instance in a sharded config world. +-- +-- We default to master as we want the column to be NOT NULL and we correctly +-- reset the instance name to match the config each time we start up. +ALTER TABLE federation_stream_position ADD COLUMN instance_name TEXT NOT NULL DEFAULT 'master'; + +CREATE UNIQUE INDEX federation_stream_position_instance ON federation_stream_position(type, instance_name); diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 379d758b5d..5e32c7aa1e 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -45,7 +45,7 @@ from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.storage._base import SQLBaseStore from synapse.storage.data_stores.main.events_worker import EventsWorkerStore -from synapse.storage.database import Database +from synapse.storage.database import Database, make_in_list_sql_clause from synapse.storage.engines import PostgresEngine from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -253,6 +253,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def __init__(self, database: Database, db_conn, hs): super(StreamWorkerStore, self).__init__(database, db_conn, hs) + self._instance_name = hs.get_instance_name() + self._send_federation = hs.should_send_federation() + self._federation_shard_config = hs.config.federation.federation_shard_config + + # If we're a process that sends federation we may need to reset the + # `federation_stream_position` table to match the current sharding + # config. We don't do this now as otherwise two processes could conflict + # during startup which would cause one to die. 
+ self._need_to_reset_federation_stream_positions = self._send_federation + events_max = self.get_room_max_stream_ordering() event_cache_prefill, min_event_val = self.db.get_cache_dict( db_conn, @@ -793,22 +803,95 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return upper_bound, events - def get_federation_out_pos(self, typ): - return self.db.simple_select_one_onecol( + async def get_federation_out_pos(self, typ: str) -> int: + if self._need_to_reset_federation_stream_positions: + await self.db.runInteraction( + "_reset_federation_positions_txn", self._reset_federation_positions_txn + ) + self._need_to_reset_federation_stream_positions = False + + return await self.db.simple_select_one_onecol( table="federation_stream_position", retcol="stream_id", - keyvalues={"type": typ}, + keyvalues={"type": typ, "instance_name": self._instance_name}, desc="get_federation_out_pos", ) - def update_federation_out_pos(self, typ, stream_id): - return self.db.simple_update_one( + async def update_federation_out_pos(self, typ, stream_id): + if self._need_to_reset_federation_stream_positions: + await self.db.runInteraction( + "_reset_federation_positions_txn", self._reset_federation_positions_txn + ) + self._need_to_reset_federation_stream_positions = False + + return await self.db.simple_update_one( table="federation_stream_position", - keyvalues={"type": typ}, + keyvalues={"type": typ, "instance_name": self._instance_name}, updatevalues={"stream_id": stream_id}, desc="update_federation_out_pos", ) + def _reset_federation_positions_txn(self, txn): + """Fiddles with the `federation_stream_position` table to make it match + the configured federation sender instances during start up. + """ + + # The federation sender instances may have changed, so we need to + # massage the `federation_stream_position` table to have a row per type + # per instance sending federation. If there is a mismatch we update the + # table with the correct rows using the *minimum* stream ID seen. This + # may result in resending of events/EDUs to remote servers, but that is + # preferable to dropping them. + + if not self._send_federation: + return + + # Pull out the configured instances. If we don't have a shard config then + # we assume that we're the only instance sending. 
+ configured_instances = self._federation_shard_config.instances + if not configured_instances: + configured_instances = [self._instance_name] + elif self._instance_name not in configured_instances: + return + + instances_in_table = self.db.simple_select_onecol_txn( + txn, + table="federation_stream_position", + keyvalues={}, + retcol="instance_name", + ) + + if set(instances_in_table) == set(configured_instances): + # Nothing to do + return + + sql = """ + SELECT type, MIN(stream_id) FROM federation_stream_position + GROUP BY type + """ + txn.execute(sql) + min_positions = dict(txn) # Map from type -> min position + + # Ensure we do actually have some values here + assert set(min_positions) == {"federation", "events"} + + sql = """ + DELETE FROM federation_stream_position + WHERE NOT (%s) + """ + clause, args = make_in_list_sql_clause( + txn.database_engine, "instance_name", configured_instances + ) + txn.execute(sql % (clause,), args) + + for typ, stream_id in min_positions.items(): + self.db.simple_upsert_txn( + txn, + table="federation_stream_position", + keyvalues={"type": typ, "instance_name": self._instance_name}, + values={"stream_id": stream_id}, + ) + def has_room_changed_since(self, room_id, stream_id): return self._events_stream_cache.has_entity_changed(room_id, stream_id) diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py index 5448d9f0dc..23be1167a3 100644 --- a/tests/replication/test_federation_ack.py +++ b/tests/replication/test_federation_ack.py @@ -32,6 +32,7 @@ class FederationAckTestCase(HomeserverTestCase): def make_homeserver(self, reactor, clock): hs = self.setup_test_homeserver(homeserverToUse=GenericWorkerServer) + return hs def test_federation_ack_sent(self): diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py new file mode 100644 index 0000000000..519a2dc510 --- /dev/null +++ b/tests/replication/test_federation_sender_shard.py @@ -0,0 +1,286 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import logging + +from mock import Mock + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, Membership +from synapse.app.generic_worker import GenericWorkerServer +from synapse.events.builder import EventBuilderFactory +from synapse.replication.http import streams +from synapse.replication.tcp.handler import ReplicationCommandHandler +from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol +from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory +from synapse.rest.admin import register_servlets_for_client_rest_resource +from synapse.rest.client.v1 import login, room +from synapse.types import UserID + +from tests import unittest +from tests.server import FakeTransport + +logger = logging.getLogger(__name__) + + +class BaseStreamTestCase(unittest.HomeserverTestCase): + """Base class for tests of the replication streams""" + + servlets = [ + streams.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + # build a replication server + self.server_factory = ReplicationStreamProtocolFactory(hs) + self.streamer = hs.get_replication_streamer() + + store = hs.get_datastore() + self.database = store.db + + self.reactor.lookups["testserv"] = "1.2.3.4" + + def default_config(self): + conf = super().default_config() + conf["send_federation"] = False + return conf + + def make_worker_hs(self, extra_config={}): + config = self._get_worker_hs_config() + config.update(extra_config) + + mock_federation_client = Mock(spec=["put_json"]) + mock_federation_client.put_json.side_effect = lambda *_, **__: defer.succeed({}) + + worker_hs = self.setup_test_homeserver( + http_client=mock_federation_client, + homeserverToUse=GenericWorkerServer, + config=config, + reactor=self.reactor, + ) + + store = worker_hs.get_datastore() + store.db._db_pool = self.database._db_pool + + repl_handler = ReplicationCommandHandler(worker_hs) + client = ClientReplicationStreamProtocol( + worker_hs, "client", "test", self.clock, repl_handler, + ) + server = self.server_factory.buildProtocol(None) + + client_transport = FakeTransport(server, self.reactor) + client.makeConnection(client_transport) + + server_transport = FakeTransport(client, self.reactor) + server.makeConnection(server_transport) + + return worker_hs + + def _get_worker_hs_config(self) -> dict: + config = self.default_config() + config["worker_app"] = "synapse.app.federation_sender" + config["worker_replication_host"] = "testserv" + config["worker_replication_http_port"] = "8765" + return config + + def replicate(self): + """Tell the master side of replication that something has happened, and then + wait for the replication to occur. 
+ """ + self.streamer.on_notifier_poke() + self.pump() + + def create_room_with_remote_server(self, user, token, remote_server="other_server"): + room = self.helper.create_room_as(user, tok=token) + store = self.hs.get_datastore() + federation = self.hs.get_handlers().federation_handler + + prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room)) + room_version = self.get_success(store.get_room_version(room)) + + factory = EventBuilderFactory(self.hs) + factory.hostname = remote_server + + user_id = UserID("user", remote_server).to_string() + + event_dict = { + "type": EventTypes.Member, + "state_key": user_id, + "content": {"membership": Membership.JOIN}, + "sender": user_id, + "room_id": room, + } + + builder = factory.for_room_version(room_version, event_dict) + join_event = self.get_success(builder.build(prev_event_ids)) + + self.get_success(federation.on_send_join_request(remote_server, join_event)) + self.replicate() + + return room + + +class FederationSenderTestCase(BaseStreamTestCase): + servlets = [ + login.register_servlets, + register_servlets_for_client_rest_resource, + room.register_servlets, + ] + + def test_send_event_single_sender(self): + """Test that using a single federation sender worker correctly sends a + new event. + """ + worker_hs = self.make_worker_hs({"send_federation": True}) + mock_client = worker_hs.get_http_client() + + user = self.register_user("user", "pass") + token = self.login("user", "pass") + + room = self.create_room_with_remote_server(user, token) + + mock_client.put_json.reset_mock() + + self.create_and_send_event(room, UserID.from_string(user)) + self.replicate() + + # Assert that the event was sent out over federation. + mock_client.put_json.assert_called() + self.assertEqual(mock_client.put_json.call_args[0][0], "other_server") + self.assertTrue(mock_client.put_json.call_args[1]["data"].get("pdus")) + + def test_send_event_sharded(self): + """Test that using two federation sender workers correctly sends + new events. 
+ """ + worker1 = self.make_worker_hs( + { + "send_federation": True, + "worker_name": "sender1", + "federation_sender_instances": ["sender1", "sender2"], + } + ) + mock_client1 = worker1.get_http_client() + + worker2 = self.make_worker_hs( + { + "send_federation": True, + "worker_name": "sender2", + "federation_sender_instances": ["sender1", "sender2"], + } + ) + mock_client2 = worker2.get_http_client() + + user = self.register_user("user2", "pass") + token = self.login("user2", "pass") + + sent_on_1 = False + sent_on_2 = False + for i in range(20): + server_name = "other_server_%d" % (i,) + room = self.create_room_with_remote_server(user, token, server_name) + mock_client1.reset_mock() + mock_client2.reset_mock() + + self.create_and_send_event(room, UserID.from_string(user)) + self.replicate() + + if mock_client1.put_json.called: + sent_on_1 = True + mock_client2.put_json.assert_not_called() + self.assertEqual(mock_client1.put_json.call_args[0][0], server_name) + self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("pdus")) + elif mock_client2.put_json.called: + sent_on_2 = True + mock_client1.put_json.assert_not_called() + self.assertEqual(mock_client2.put_json.call_args[0][0], server_name) + self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("pdus")) + else: + raise AssertionError( + "Expected send transaction from one or the other sender" + ) + + if sent_on_1 and sent_on_2: + break + + self.assertTrue(sent_on_1) + self.assertTrue(sent_on_2) + + def test_send_typing_sharded(self): + """Test that using two federation sender workers correctly sends + new typing EDUs. + """ + worker1 = self.make_worker_hs( + { + "send_federation": True, + "worker_name": "sender1", + "federation_sender_instances": ["sender1", "sender2"], + } + ) + mock_client1 = worker1.get_http_client() + + worker2 = self.make_worker_hs( + { + "send_federation": True, + "worker_name": "sender2", + "federation_sender_instances": ["sender1", "sender2"], + } + ) + mock_client2 = worker2.get_http_client() + + user = self.register_user("user3", "pass") + token = self.login("user3", "pass") + + typing_handler = self.hs.get_typing_handler() + + sent_on_1 = False + sent_on_2 = False + for i in range(20): + server_name = "other_server_%d" % (i,) + room = self.create_room_with_remote_server(user, token, server_name) + mock_client1.reset_mock() + mock_client2.reset_mock() + + self.get_success( + typing_handler.started_typing( + target_user=UserID.from_string(user), + auth_user=UserID.from_string(user), + room_id=room, + timeout=20000, + ) + ) + + self.replicate() + + if mock_client1.put_json.called: + sent_on_1 = True + mock_client2.put_json.assert_not_called() + self.assertEqual(mock_client1.put_json.call_args[0][0], server_name) + self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("edus")) + elif mock_client2.put_json.called: + sent_on_2 = True + mock_client1.put_json.assert_not_called() + self.assertEqual(mock_client2.put_json.call_args[0][0], server_name) + self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("edus")) + else: + raise AssertionError( + "Expected send transaction from one or the other sender" + ) + + if sent_on_1 and sent_on_2: + break + + self.assertTrue(sent_on_1) + self.assertTrue(sent_on_2) -- cgit 1.5.1 From 2c1b9d676322fad8cb57c92f97f81393bcfcbe56 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 29 Jul 2020 23:22:13 +0100 Subject: Update worker docs with recent enhancements (#7969) --- changelog.d/7969.doc | 1 + docs/sample_config.yaml | 54 +++ 
docs/synctl_workers.md | 32 ++
 docs/workers.md | 459 +++++++++++----------
 synapse/app/generic_worker.py | 6 +-
 synapse/config/federation.py | 12 +-
 synapse/config/homeserver.py | 2 +-
 synapse/config/logger.py | 2 +-
 synapse/config/redis.py | 23 +-
 synapse/config/workers.py | 49 ++-
 synapse/federation/send_queue.py | 2 +-
 synapse/federation/sender/__init__.py | 2 +-
 synapse/federation/sender/per_destination_queue.py | 2 +-
 synapse/storage/data_stores/main/stream.py | 2 +-
 14 files changed, 413 insertions(+), 235 deletions(-)
 create mode 100644 changelog.d/7969.doc
 create mode 100644 docs/synctl_workers.md
(limited to 'synapse/federation/send_queue.py')

diff --git a/changelog.d/7969.doc b/changelog.d/7969.doc
new file mode 100644
index 0000000000..68d2ed5fad
--- /dev/null
+++ b/changelog.d/7969.doc
@@ -0,0 +1 @@
+Update worker docs with latest enhancements.
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 3227294e0b..b21e36bb6d 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -2398,3 +2398,57 @@ opentracing:
 #
 # logging:
 # false
+
+
+## Workers ##
+
+# Disables sending of outbound federation transactions on the main process.
+# Uncomment if using a federation sender worker.
+#
+#send_federation: false
+
+# It is possible to run multiple federation sender workers, in which case the
+# work is balanced across them.
+#
+# This configuration must be shared between all federation sender workers, and if
+# changed all federation sender workers must be stopped at the same time and then
+# started, to ensure that all instances are running with the same config (otherwise
+# events may be dropped).
+#
+#federation_sender_instances:
+#  - federation_sender1
+
+# When using workers this should be a map from `worker_name` to the
+# HTTP replication listener of the worker, if configured.
+#
+#instance_map:
+#  worker1:
+#    host: localhost
+#    port: 8034
+
+# Experimental: When using workers you can define which workers should
+# handle event persistence and typing notifications. Any worker
+# specified here must also be in the `instance_map`.
+#
+#stream_writers:
+#  events: worker1
+#  typing: worker1
+
+
+# Configuration for Redis when using workers. This *must* be enabled when
+# using workers (unless using old style direct TCP configuration).
+#
+redis:
+  # Uncomment the below to enable Redis support.
+  #
+  #enabled: true
+
+  # Optional host and port to use to connect to redis. Defaults to
+  # localhost and 6379
+  #
+  #host: localhost
+  #port: 6379
+
+  # Optional password if configured on the Redis instance
+  #
+  #password:
diff --git a/docs/synctl_workers.md b/docs/synctl_workers.md
new file mode 100644
index 0000000000..8da4a31852
--- /dev/null
+++ b/docs/synctl_workers.md
@@ -0,0 +1,32 @@
+### Using synctl with workers
+
+If you want to use `synctl` to manage your synapse processes, you will need to
+create an additional configuration file for the main synapse process. That
+configuration should look like this:
+
+```yaml
+worker_app: synapse.app.homeserver
+```
+
+Additionally, each worker app must be configured with the name of a "pid file",
+to which it will write its process ID when it starts.
For example, for a +synchrotron, you might write: + +```yaml +worker_pid_file: /home/matrix/synapse/worker1.pid +``` + +Finally, to actually run your worker-based synapse, you must pass synctl the `-a` +commandline option to tell it to operate on all the worker configurations found +in the given directory, e.g.: + + synctl -a $CONFIG/workers start + +Currently one should always restart all workers when restarting or upgrading +synapse, unless you explicitly know it's safe not to. For instance, restarting +synapse without restarting all the synchrotrons may result in broken typing +notifications. + +To manipulate a specific worker, you pass the -w option to synctl: + + synctl -w $CONFIG/workers/worker1.yaml restart diff --git a/docs/workers.md b/docs/workers.md index f4cbbc0400..38bd758e57 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -16,69 +16,106 @@ workers only work with PostgreSQL-based Synapse deployments. SQLite should only be used for demo purposes and any admin considering workers should already be running PostgreSQL. -## Master/worker communication +## Main process/worker communication -The workers communicate with the master process via a Synapse-specific protocol -called 'replication' (analogous to MySQL- or Postgres-style database -replication) which feeds a stream of relevant data from the master to the -workers so they can be kept in sync with the master process and database state. +The processes communicate with each other via a Synapse-specific protocol called +'replication' (analogous to MySQL- or Postgres-style database replication) which +feeds streams of newly written data between processes so they can be kept in +sync with the database state. -Additionally, workers may make HTTP requests to the master, to send information -in the other direction. Typically this is used for operations which need to -wait for a reply - such as sending an event. +Additionally, processes may make HTTP requests to each other. Typically this is +used for operations which need to wait for a reply - such as sending an event. -## Configuration +As of Synapse v1.13.0, it is possible to configure Synapse to send replication +via a [Redis pub/sub channel](https://redis.io/topics/pubsub), and is now the +recommended way of configuring replication. This is an alternative to the old +direct TCP connections to the main process: rather than all the workers +connecting to the main process, all the workers and the main process connect to +Redis, which relays replication commands between processes. This can give a +significant cpu saving on the main process and will be a prerequisite for +upcoming performance improvements. + +(See the [Architectural diagram](#architectural-diagram) section at the end for +a visualisation of what this looks like) + + +## Setting up workers + +A Redis server is required to manage the communication between the processes. +(The older direct TCP connections are now deprecated.) The Redis server +should be installed following the normal procedure for your distribution (e.g. +`apt install redis-server` on Debian). It is safe to use an existing Redis +deployment if you have one. + +Once installed, check that Redis is running and accessible from the host running +Synapse, for example by executing `echo PING | nc -q1 localhost 6379` and seeing +a response of `+PONG`. + +The appropriate dependencies must also be installed for Synapse. 
If using a
+virtualenv, these can be installed with:
+
+```sh
+pip install matrix-synapse[redis]
+```
+
+Note that these dependencies are included when synapse is installed with `pip
+install matrix-synapse[all]`. They are also included in the debian packages from
+`matrix.org` and in the docker images at
+https://hub.docker.com/r/matrixdotorg/synapse/.
 
 To make effective use of the workers, you will need to configure an HTTP
 reverse-proxy such as nginx or haproxy, which will direct incoming requests to
-the correct worker, or to the main synapse instance. Note that this includes
-requests made to the federation port. See [reverse_proxy.md](reverse_proxy.md)
+the correct worker, or to the main synapse instance. See [reverse_proxy.md](reverse_proxy.md)
 for information on setting up a reverse proxy.
 
-To enable workers, you need to add *two* replication listeners to the
-main Synapse configuration file (`homeserver.yaml`). For example:
+To enable workers you should create a configuration file for each worker
+process. Each worker configuration file inherits the configuration of the shared
+homeserver configuration file. You can then override configuration specific to
+that worker, e.g. the HTTP listener that it provides (if any); logging
+configuration; etc. You should minimise the number of overrides though to
+maintain a usable config.
+
+Next you need to add both a HTTP replication listener and redis config to the
+shared Synapse configuration file (`homeserver.yaml`). For example:
 
 ```yaml
+# extend the existing `listeners` section. This defines the ports that the
+# main process will listen on.
 listeners:
-  # The TCP replication port
-  - port: 9092
-    bind_address: '127.0.0.1'
-    type: replication
   # The HTTP replication port
   - port: 9093
     bind_address: '127.0.0.1'
     type: http
     resources:
      - names: [replication]
+
+redis:
+  enabled: true
 ```
 
-Under **no circumstances** should these replication API listeners be exposed to
-the public internet; they have no authentication and are unencrypted.
+See the sample config for the full documentation of each option.
 
-You should then create a set of configs for the various worker processes. Each
-worker configuration file inherits the configuration of the main homeserver
-configuration file. You can then override configuration specific to that
-worker, e.g. the HTTP listener that it provides (if any); logging
-configuration; etc. You should minimise the number of overrides though to
-maintain a usable config.
+Under **no circumstances** should the replication listener be exposed to the
+public internet; it has no authentication and is unencrypted.
 
 In the config file for each worker, you must specify the type of worker
-application (`worker_app`). The currently available worker applications are
-listed below. You must also specify the replication endpoints that it should
-talk to on the main synapse process. `worker_replication_host` should specify
-the host of the main synapse, `worker_replication_port` should point to the TCP
-replication listener port and `worker_replication_http_port` should point to
-the HTTP replication port.
+application (`worker_app`), and you should specify a unique name for the worker
+(`worker_name`). The currently available worker applications are listed below.
+You must also specify the HTTP replication endpoint that it should talk to on
+the main synapse process. `worker_replication_host` should specify the host of
+the main synapse and `worker_replication_http_port` should point to the HTTP
+replication port.
If the worker will handle HTTP requests then the +`worker_listeners` option should be set with a `http` listener, in the same way +as the `listeners` option in the shared config. For example: ```yaml -worker_app: synapse.app.synchrotron +worker_app: synapse.app.generic_worker +worker_name: worker1 -# The replication listener on the synapse to talk to. +# The replication listener on the main synapse process. worker_replication_host: 127.0.0.1 -worker_replication_port: 9092 worker_replication_http_port: 9093 worker_listeners: @@ -87,13 +124,14 @@ worker_listeners: resources: - names: - client + - federation -worker_log_config: /home/matrix/synapse/config/synchrotron_log_config.yaml +worker_log_config: /home/matrix/synapse/config/worker1_log_config.yaml ``` -...is a full configuration for a synchrotron worker instance, which will expose a -plain HTTP `/sync` endpoint on port 8083 separately from the `/sync` endpoint provided -by the main synapse. +...is a full configuration for a generic worker instance, which will expose a +plain HTTP endpoint on port 8083 separately serving various endpoints, e.g. +`/sync`, which are listed below. Obviously you should configure your reverse-proxy to route the relevant endpoints to the worker (`localhost:8083` in the above example). @@ -102,127 +140,24 @@ Finally, you need to start your worker processes. This can be done with either `synctl` or your distribution's preferred service manager such as `systemd`. We recommend the use of `systemd` where available: for information on setting up `systemd` to start synapse workers, see -[systemd-with-workers](systemd-with-workers). To use `synctl`, see below. +[systemd-with-workers](systemd-with-workers). To use `synctl`, see +[synctl_workers.md](synctl_workers.md). -### **Experimental** support for replication over redis - -As of Synapse v1.13.0, it is possible to configure Synapse to send replication -via a [Redis pub/sub channel](https://redis.io/topics/pubsub). This is an -alternative to direct TCP connections to the master: rather than all the -workers connecting to the master, all the workers and the master connect to -Redis, which relays replication commands between processes. This can give a -significant cpu saving on the master and will be a prerequisite for upcoming -performance improvements. - -Note that this support is currently experimental; you may experience lost -messages and similar problems! It is strongly recommended that admins setting -up workers for the first time use direct TCP replication as above. - -To configure Synapse to use Redis: - -1. Install Redis following the normal procedure for your distribution - for - example, on Debian, `apt install redis-server`. (It is safe to use an - existing Redis deployment if you have one: we use a pub/sub stream named - according to the `server_name` of your synapse server.) -2. Check Redis is running and accessible: you should be able to `echo PING | nc -q1 - localhost 6379` and get a response of `+PONG`. -3. Install the python prerequisites. If you installed synapse into a - virtualenv, this can be done with: - ```sh - pip install matrix-synapse[redis] - ``` - The debian packages from matrix.org already include the required - dependencies. -4. Add config to the shared configuration (`homeserver.yaml`): - ```yaml - redis: - enabled: true - ``` - Optional parameters which can go alongside `enabled` are `host`, `port`, - `password`. Normally none of these are required. -5. Restart master and all workers. 
- -Once redis replication is in use, `worker_replication_port` is redundant and -can be removed from the worker configuration files. Similarly, the -configuration for the `listener` for the TCP replication port can be removed -from the main configuration file. Note that the HTTP replication port is -still required. - -### Using synctl - -If you want to use `synctl` to manage your synapse processes, you will need to -create an an additional configuration file for the master synapse process. That -configuration should look like this: - -```yaml -worker_app: synapse.app.homeserver -``` - -Additionally, each worker app must be configured with the name of a "pid file", -to which it will write its process ID when it starts. For example, for a -synchrotron, you might write: - -```yaml -worker_pid_file: /home/matrix/synapse/synchrotron.pid -``` - -Finally, to actually run your worker-based synapse, you must pass synctl the `-a` -commandline option to tell it to operate on all the worker configurations found -in the given directory, e.g.: - - synctl -a $CONFIG/workers start - -Currently one should always restart all workers when restarting or upgrading -synapse, unless you explicitly know it's safe not to. For instance, restarting -synapse without restarting all the synchrotrons may result in broken typing -notifications. - -To manipulate a specific worker, you pass the -w option to synctl: - - synctl -w $CONFIG/workers/synchrotron.yaml restart ## Available worker applications -### `synapse.app.pusher` - -Handles sending push notifications to sygnal and email. Doesn't handle any -REST endpoints itself, but you should set `start_pushers: False` in the -shared configuration file to stop the main synapse sending these notifications. - -Note this worker cannot be load-balanced: only one instance should be active. - -### `synapse.app.synchrotron` +### `synapse.app.generic_worker` -The synchrotron handles `sync` requests from clients. In particular, it can -handle REST endpoints matching the following regular expressions: +This worker can handle API requests matching the following regular +expressions: + # Sync requests ^/_matrix/client/(v2_alpha|r0)/sync$ ^/_matrix/client/(api/v1|v2_alpha|r0)/events$ ^/_matrix/client/(api/v1|r0)/initialSync$ ^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$ -The above endpoints should all be routed to the synchrotron worker by the -reverse-proxy configuration. - -It is possible to run multiple instances of the synchrotron to scale -horizontally. In this case the reverse-proxy should be configured to -load-balance across the instances, though it will be more efficient if all -requests from a particular user are routed to a single instance. Extracting -a userid from the access token is currently left as an exercise for the reader. - -### `synapse.app.appservice` - -Handles sending output traffic to Application Services. Doesn't handle any -REST endpoints itself, but you should set `notify_appservices: False` in the -shared configuration file to stop the main synapse sending these notifications. - -Note this worker cannot be load-balanced: only one instance should be active. - -### `synapse.app.federation_reader` - -Handles a subset of federation endpoints. 
In particular, it can handle REST
-endpoints matching the following regular expressions:
-
+    # Federation requests
     ^/_matrix/federation/v1/event/
     ^/_matrix/federation/v1/state/
     ^/_matrix/federation/v1/state_ids/
@@ -242,40 +177,145 @@ endpoints matching the following regular expressions:
     ^/_matrix/federation/v1/event_auth/
     ^/_matrix/federation/v1/exchange_third_party_invite/
     ^/_matrix/federation/v1/user/devices/
-    ^/_matrix/federation/v1/send/
     ^/_matrix/federation/v1/get_groups_publicised$
     ^/_matrix/key/v2/query
 
+    # Inbound federation transaction request
+    ^/_matrix/federation/v1/send/
+
+    # Client API requests
+    ^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$
+    ^/_matrix/client/(api/v1|r0|unstable)/account/3pid$
+    ^/_matrix/client/(api/v1|r0|unstable)/keys/query$
+    ^/_matrix/client/(api/v1|r0|unstable)/keys/changes$
+    ^/_matrix/client/versions$
+    ^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$
+    ^/_matrix/client/(api/v1|r0|unstable)/joined_groups$
+    ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups$
+    ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups/
+
+    # Registration/login requests
+    ^/_matrix/client/(api/v1|r0|unstable)/login$
+    ^/_matrix/client/(r0|unstable)/register$
+    ^/_matrix/client/(r0|unstable)/auth/.*/fallback/web$
+
+    # Event sending requests
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state/
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
+    ^/_matrix/client/(api/v1|r0|unstable)/join/
+    ^/_matrix/client/(api/v1|r0|unstable)/profile/
+
+
 Additionally, the following REST endpoints can be handled for GET requests:
 
     ^/_matrix/federation/v1/groups/
 
-The above endpoints should all be routed to the federation_reader worker by the
-reverse-proxy configuration.
+Pagination requests can also be handled, but all requests for a given
+room must be routed to the same instance. Additionally, care must be taken to
+ensure that the purge history admin API is not used while pagination requests
+for the room are in flight:
+
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/messages$
+
+Note that a HTTP listener with `client` and `federation` resources must be
+configured in the `worker_listeners` option in the worker config.
+
+
+#### Load balancing
+
+It is possible to run multiple instances of this worker app, with incoming requests
+being load-balanced between them by the reverse-proxy. However, different endpoints
+have different characteristics and so admins
+may wish to run multiple groups of workers handling different endpoints so that
+load balancing can be done in different ways.
+
+For `/sync` and `/initialSync` requests it will be more efficient if all
+requests from a particular user are routed to a single instance. Extracting a
+user ID from the access token or `Authorization` header is currently left as an
+exercise for the reader. Admins may additionally wish to separate out `/sync`
+requests that have a `since` query parameter from those that don't (and
+`/initialSync`): requests without a `since` parameter are "initial sync"
+requests, which happen when a user logs in on a new device and can be *very*
+resource intensive, so isolating them will stop them from interfering with
+other users' ongoing syncs.
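As a rough illustration of the user-affinity routing described above, here is a
minimal sketch of a consistent-hash helper that a routing layer could use once
it has extracted a stable key for the request. This is not part of Synapse; the
helper name and the choice of key are assumptions for the example:

```python
import hashlib

def pick_sync_worker(routing_key: str, workers: list) -> str:
    """Consistently map a stable key (e.g. a user ID) to one worker, so
    that repeated requests with the same key hit the same instance."""
    digest = hashlib.sha256(routing_key.encode("utf-8")).digest()
    return workers[int.from_bytes(digest[:8], "big") % len(workers)]

# Hypothetical pool of generic workers serving /sync.
sync_workers = ["sync1:8083", "sync2:8083", "sync3:8083"]
print(pick_sync_worker("@alice:example.com", sync_workers))
```

The same idea applies to the other affinity suggestions below (balancing
`/send/` by source IP, or event sending by room ID): any deterministic function
of a stable key keeps a family of requests pinned to one cache-warm worker.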
+
+Federation and client requests can be balanced via simple round robin.
 
-The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single
-instance.
+The inbound federation transaction request `^/_matrix/federation/v1/send/`
+should be balanced by source IP so that transactions from the same remote server
+go to the same process.
 
-Note that `federation` must be added to the listener resources in the worker config:
+Registration/login requests can be handled separately purely to help ensure that
+unexpected load doesn't affect new logins and sign ups.
+
+Finally, event sending requests can be balanced by the room ID in the URI (or
+the full URI, or even just round robin); the room ID is the path component after
+`/rooms/`. If there is a large bridge connected that is sending or may send lots
+of events, then a dedicated set of workers can be provisioned to limit the
+effects of bursts of events from that bridge on events sent by normal users.
+
+#### Stream writers
+
+Additionally, there is *experimental* support for moving writing of specific
+streams (such as events) off of the main process to a particular worker. (This
+is only supported with Redis-based replication.)
+
+Currently supported streams are `events` and `typing`.
+
+To enable this, the worker must have a HTTP replication listener configured,
+have a `worker_name` and be listed in the `instance_map` config. For example to
+move event persistence off to a dedicated worker, the shared configuration would
+include:
 
 ```yaml
-worker_app: synapse.app.federation_reader
-...
-worker_listeners:
- - type: http
-   port:
-   resources:
-     - names:
-       - federation
+instance_map:
+  event_persister1:
+    host: localhost
+    port: 8034
+
+stream_writers:
+  events: event_persister1
 ```
 
+
+### `synapse.app.pusher`
+
+Handles sending push notifications to sygnal and email. Doesn't handle any
+REST endpoints itself, but you should set `start_pushers: False` in the
+shared configuration file to stop the main synapse sending push notifications.
+
+Note this worker cannot be load-balanced: only one instance should be active.
+
+### `synapse.app.appservice`
+
+Handles sending output traffic to Application Services. Doesn't handle any
+REST endpoints itself, but you should set `notify_appservices: False` in the
+shared configuration file to stop the main synapse sending appservice notifications.
+
+Note this worker cannot be load-balanced: only one instance should be active.
+
+
 ### `synapse.app.federation_sender`
 
 Handles sending federation traffic to other servers. Doesn't handle any REST
 endpoints itself, but you should set `send_federation: False` in the shared
 configuration file to stop the main synapse sending this traffic.
 
-Note this worker cannot be load-balanced: only one instance should be active.
+If running multiple federation senders then you must list each
+instance in the `federation_sender_instances` option by its `worker_name`.
+All instances must be stopped and started when adding or removing instances.
+For example:
+
+```yaml
+federation_sender_instances:
+  - federation_sender1
+  - federation_sender2
+```
 
 ### `synapse.app.media_repository`
 
@@ -314,46 +354,6 @@ and you must configure a single instance to run the background tasks, e.g.:
 
     media_instance_running_background_jobs: "media-repository-1"
 ```
 
-### `synapse.app.client_reader`
-
-Handles client API endpoints.
It can handle REST endpoints matching the
-following regular expressions:
-
-    ^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
-    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$
-    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$
-    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$
-    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$
-    ^/_matrix/client/(api/v1|r0|unstable)/login$
-    ^/_matrix/client/(api/v1|r0|unstable)/account/3pid$
-    ^/_matrix/client/(api/v1|r0|unstable)/keys/query$
-    ^/_matrix/client/(api/v1|r0|unstable)/keys/changes$
-    ^/_matrix/client/versions$
-    ^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$
-    ^/_matrix/client/(api/v1|r0|unstable)/joined_groups$
-    ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups$
-    ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups/
-
-Additionally, the following REST endpoints can be handled for GET requests:
-
-    ^/_matrix/client/(api/v1|r0|unstable)/pushrules/.*$
-    ^/_matrix/client/(api/v1|r0|unstable)/groups/.*$
-    ^/_matrix/client/(api/v1|r0|unstable)/user/[^/]*/account_data/
-    ^/_matrix/client/(api/v1|r0|unstable)/user/[^/]*/rooms/[^/]*/account_data/
-
-Additionally, the following REST endpoints can be handled, but all requests must
-be routed to the same instance:
-
-    ^/_matrix/client/(r0|unstable)/register$
-    ^/_matrix/client/(r0|unstable)/auth/.*/fallback/web$
-
-Pagination requests can also be handled, but all requests with the same path
-room must be routed to the same instance. Additionally, care must be taken to
-ensure that the purge history admin API is not used while pagination requests
-for the room are in flight:
-
-    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/messages$
-
 ### `synapse.app.user_dir`
 
 Handles searches in the user directory. It can handle REST endpoints matching
@@ -388,15 +388,48 @@ file. For example:
 
     worker_main_http_uri: http://127.0.0.1:8008
 
-### `synapse.app.event_creator`
+### Historical apps
 
-Handles some event creation. It can handle REST endpoints matching:
+*Note:* Historically there used to be more apps; however, they have been
+amalgamated into a single `synapse.app.generic_worker` app. The remaining apps
+are ones that do specific processing unrelated to requests, e.g. the `pusher`
+that handles sending out push notifications for new events. The intention is for
+all these to be folded into the `generic_worker` app and to use config to define
+which processes handle the various processing tasks such as push notifications.
 
-    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
-    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state/
-    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
-    ^/_matrix/client/(api/v1|r0|unstable)/join/
-    ^/_matrix/client/(api/v1|r0|unstable)/profile/
-
-It will create events locally and then send them on to the main synapse
-instance to be persisted and handled.
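To make the sharding behaviour of multiple federation senders (configured above
via `federation_sender_instances`) concrete, here is a minimal sketch of
deterministic destination assignment. The hash-modulo scheme shown is an
assumption for illustration, not a statement of Synapse's exact implementation:

```python
from hashlib import sha256
from typing import List

def instance_for_destination(destination: str, instances: List[str]) -> str:
    """Deterministically assign a remote server name to one sender instance."""
    if not instances:
        return "master"  # no sharding configured: the main process sends
    dest_int = int.from_bytes(sha256(destination.encode("utf8")).digest(), "little")
    return instances[dest_int % len(instances)]

def should_handle(instance_name: str, destination: str, instances: List[str]) -> bool:
    """Each sender only acts on destinations that map to its own name."""
    return instance_for_destination(destination, instances) == instance_name

print(should_handle("sender1", "matrix.org", ["sender1", "sender2"]))
```

This is also why every sender must share an identical
`federation_sender_instances` list: if two instances computed different
assignments, a destination could be handled twice, or not at all.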
+## Architectural diagram + +The following shows an example setup using Redis and a reverse proxy: + +``` + Clients & Federation + | + v + +-----------+ + | | + | Reverse | + | Proxy | + | | + +-----------+ + | | | + | | | HTTP requests + +-------------------+ | +-----------+ + | +---+ | + | | | + v v v ++--------------+ +--------------+ +--------------+ +--------------+ +| Main | | Generic | | Generic | | Event | +| Process | | Worker 1 | | Worker 2 | | Persister | ++--------------+ +--------------+ +--------------+ +--------------+ + ^ ^ | ^ | | ^ | ^ ^ + | | | | | | | | | | + | | | | | HTTP | | | | | + | +----------+<--|---|---------+ | | | | + | | +-------------|-->+----------+ | + | | | | + | | | | + v v v v +==================================================================== + Redis pub/sub channel +``` diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index ec0dbddb8c..5841454c9a 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -940,7 +940,7 @@ def start(config_options): config.server.update_user_directory = False if config.worker_app == "synapse.app.federation_sender": - if config.federation.send_federation: + if config.worker.send_federation: sys.stderr.write( "\nThe send_federation must be disabled in the main synapse process" "\nbefore they can be run in a separate worker." @@ -950,10 +950,10 @@ def start(config_options): sys.exit(1) # Force the pushers to start since they will be disabled in the main config - config.federation.send_federation = True + config.worker.send_federation = True else: # For other worker types we force this to off. - config.federation.send_federation = False + config.worker.send_federation = False synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/config/federation.py b/synapse/config/federation.py index 82ff9664de..2c77d8f85b 100644 --- a/synapse/config/federation.py +++ b/synapse/config/federation.py @@ -17,23 +17,13 @@ from typing import Optional from netaddr import IPSet -from ._base import Config, ConfigError, ShardedWorkerHandlingConfig +from ._base import Config, ConfigError class FederationConfig(Config): section = "federation" def read_config(self, config, **kwargs): - # Whether to send federation traffic out in this process. 
This only - # applies to some federation traffic, and so shouldn't be used to - # "disable" federation - self.send_federation = config.get("send_federation", True) - - federation_sender_instances = config.get("federation_sender_instances") or [] - self.federation_shard_config = ShardedWorkerHandlingConfig( - federation_sender_instances - ) - # FIXME: federation_domain_whitelist needs sytests self.federation_domain_whitelist = None # type: Optional[dict] federation_domain_whitelist = config.get("federation_domain_whitelist", None) diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 8e93d31394..556e291495 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -78,7 +78,6 @@ class HomeServerConfig(RootConfig): JWTConfig, PasswordConfig, EmailConfig, - WorkerConfig, PasswordAuthProviderConfig, PushConfig, SpamCheckerConfig, @@ -91,6 +90,7 @@ class HomeServerConfig(RootConfig): RoomDirectoryConfig, ThirdPartyRulesConfig, TracerConfig, + WorkerConfig, RedisConfig, FederationConfig, ] diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 49f6c32beb..dd775a97e8 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -214,7 +214,7 @@ def setup_logging( Set up the logging subsystem. Args: - config (LoggingConfig | synapse.config.workers.WorkerConfig): + config (LoggingConfig | synapse.config.worker.WorkerConfig): configuration data use_worker_options (bool): True to use the 'worker_log_config' option diff --git a/synapse/config/redis.py b/synapse/config/redis.py index d5d3ca1c9e..1373302335 100644 --- a/synapse/config/redis.py +++ b/synapse/config/redis.py @@ -21,7 +21,7 @@ class RedisConfig(Config): section = "redis" def read_config(self, config, **kwargs): - redis_config = config.get("redis", {}) + redis_config = config.get("redis") or {} self.redis_enabled = redis_config.get("enabled", False) if not self.redis_enabled: @@ -32,3 +32,24 @@ class RedisConfig(Config): self.redis_host = redis_config.get("host", "localhost") self.redis_port = redis_config.get("port", 6379) self.redis_password = redis_config.get("password") + + def generate_config_section(self, config_dir_path, server_name, **kwargs): + return """\ + # Configuration for Redis when using workers. This *must* be enabled when + # using workers (unless using old style direct TCP configuration). + # + redis: + # Uncomment the below to enable Redis support. + # + #enabled: true + + # Optional host and port to use to connect to redis. Defaults to + # localhost and 6379 + # + #host: localhost + #port: 6379 + + # Optional password if configured on the Redis instance + # + #password: + """ diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 2574cd3aa1..c784a71508 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -15,7 +15,7 @@ import attr -from ._base import Config, ConfigError +from ._base import Config, ConfigError, ShardedWorkerHandlingConfig from .server import ListenerConfig, parse_listener_def @@ -85,6 +85,16 @@ class WorkerConfig(Config): ) ) + # Whether to send federation traffic out in this process. 
This only + # applies to some federation traffic, and so shouldn't be used to + # "disable" federation + self.send_federation = config.get("send_federation", True) + + federation_sender_instances = config.get("federation_sender_instances") or [] + self.federation_shard_config = ShardedWorkerHandlingConfig( + federation_sender_instances + ) + # A map from instance name to host/port of their HTTP replication endpoint. instance_map = config.get("instance_map") or {} self.instance_map = { @@ -105,6 +115,43 @@ class WorkerConfig(Config): % (instance, stream) ) + def generate_config_section(self, config_dir_path, server_name, **kwargs): + return """\ + ## Workers ## + + # Disables sending of outbound federation transactions on the main process. + # Uncomment if using a federation sender worker. + # + #send_federation: false + + # It is possible to run multiple federation sender workers, in which case the + # work is balanced across them. + # + # This configuration must be shared between all federation sender workers, and if + # changed all federation sender workers must be stopped at the same time and then + # started, to ensure that all instances are running with the same config (otherwise + # events may be dropped). + # + #federation_sender_instances: + # - federation_sender1 + + # When using workers this should be a map from `worker_name` to the + # HTTP replication listener of the worker, if configured. + # + #instance_map: + # worker1: + # host: localhost + # port: 8034 + + # Experimental: When using workers you can define which workers should + # handle event persistence and typing notifications. Any worker + # specified here must also be in the `instance_map`. + # + #stream_writers: + # events: worker1 + # typing: worker1 + """ + def read_arguments(self, args): # We support a bunch of command line arguments that override options in # the config. A lot of these options have a worker_* prefix when running diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 4fc9ff92e5..2b0ab2dcbf 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -57,7 +57,7 @@ class FederationRemoteSendQueue(object): # We may have multiple federation sender instances, so we need to track # their positions separately. 
- self._sender_instances = hs.config.federation.federation_shard_config.instances + self._sender_instances = hs.config.worker.federation_shard_config.instances self._sender_positions = {} # Pending presence map user_id -> UserPresenceState diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index ba4ddd2370..6ae6522f87 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -70,7 +70,7 @@ class FederationSender(object): self._transaction_manager = TransactionManager(hs) self._instance_name = hs.get_instance_name() - self._federation_shard_config = hs.config.federation.federation_shard_config + self._federation_shard_config = hs.config.worker.federation_shard_config # map from destination to PerDestinationQueue self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 3436741783..dd150f89a6 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -75,7 +75,7 @@ class PerDestinationQueue(object): self._store = hs.get_datastore() self._transaction_manager = transaction_manager self._instance_name = hs.get_instance_name() - self._federation_shard_config = hs.config.federation.federation_shard_config + self._federation_shard_config = hs.config.worker.federation_shard_config self._should_send_on_this_instance = True if not self._federation_shard_config.should_handle( diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 5e32c7aa1e..10d39b3699 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -255,7 +255,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): self._instance_name = hs.get_instance_name() self._send_federation = hs.should_send_federation() - self._federation_shard_config = hs.config.federation.federation_shard_config + self._federation_shard_config = hs.config.worker.federation_shard_config # If we're a process that sends federation we may need to reset the # `federation_stream_position` table to match the current sharding -- cgit 1.5.1 From ad6190c9252aafd37cd8c229b70853bfc4ef0e64 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 17 Aug 2020 07:24:46 -0400 Subject: Convert stream database to async/await. (#8074) --- changelog.d/8074.misc | 1 + synapse/api/filtering.py | 2 +- synapse/api/presence.py | 69 ++++ synapse/federation/send_queue.py | 2 +- synapse/federation/sender/__init__.py | 2 +- synapse/federation/sender/per_destination_queue.py | 2 +- synapse/handlers/presence.py | 2 +- synapse/storage/databases/main/presence.py | 2 +- synapse/storage/databases/main/stream.py | 387 +++++++++++---------- synapse/storage/presence.py | 69 ---- tests/handlers/test_presence.py | 2 +- tests/storage/test_purge.py | 49 +-- 12 files changed, 293 insertions(+), 296 deletions(-) create mode 100644 changelog.d/8074.misc create mode 100644 synapse/api/presence.py delete mode 100644 synapse/storage/presence.py (limited to 'synapse/federation/send_queue.py') diff --git a/changelog.d/8074.misc b/changelog.d/8074.misc new file mode 100644 index 0000000000..dfe4c03171 --- /dev/null +++ b/changelog.d/8074.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. 
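The commit below applies Synapse's recurring migration pattern: replacing
Twisted's generator-based `@defer.inlineCallbacks` coroutines with native
`async`/`await`. A minimal before/after sketch (the `store.fetch` method is
invented for illustration):

```python
from twisted.internet import defer

# Before: a generator-based coroutine; callers receive a Deferred.
@defer.inlineCallbacks
def get_value_old(store, key):
    row = yield store.fetch(key)  # fetch returns a Deferred
    return row["value"]

# After: a native coroutine; callers `await` it directly, or wrap it in
# defer.ensureDeferred where a plain Deferred is still required.
async def get_value_new(store, key):
    row = await store.fetch(key)  # modern Twisted Deferreds are awaitable
    return row["value"]
```

Note that the diffs below also drop `yield` in front of calls that were never
asynchronous (such as the stream-change-cache lookups), since wrapping
synchronous results in Deferreds only added overhead.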
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 7393d6cb74..a8937d2595 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -23,7 +23,7 @@ from jsonschema import FormatChecker from synapse.api.constants import EventContentFields from synapse.api.errors import SynapseError -from synapse.storage.presence import UserPresenceState +from synapse.api.presence import UserPresenceState from synapse.types import RoomID, UserID FILTER_SCHEMA = { diff --git a/synapse/api/presence.py b/synapse/api/presence.py new file mode 100644 index 0000000000..18a462f0ee --- /dev/null +++ b/synapse/api/presence.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import namedtuple + +from synapse.api.constants import PresenceState + + +class UserPresenceState( + namedtuple( + "UserPresenceState", + ( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + ) +): + """Represents the current presence state of the user. + + user_id (str) + last_active (int): Time in msec that the user last interacted with server. + last_federation_update (int): Time in msec since either a) we sent a presence + update to other servers or b) we received a presence update, depending + on if is a local user or not. + last_user_sync (int): Time in msec that the user last *completed* a sync + (or event stream). + status_msg (str): User set status message. + """ + + def as_dict(self): + return dict(self._asdict()) + + @staticmethod + def from_dict(d): + return UserPresenceState(**d) + + def copy_and_replace(self, **kwargs): + return self._replace(**kwargs) + + @classmethod + def default(cls, user_id): + """Returns a default presence state. 
+ """ + return cls( + user_id=user_id, + state=PresenceState.OFFLINE, + last_active_ts=0, + last_federation_update_ts=0, + last_user_sync_ts=0, + status_msg=None, + currently_active=False, + ) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 2b0ab2dcbf..4d65d4aeea 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -37,8 +37,8 @@ from sortedcontainers import SortedDict from twisted.internet import defer +from synapse.api.presence import UserPresenceState from synapse.metrics import LaterGauge -from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure from .units import Edu diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 94cc63001e..e53b6ac456 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -22,6 +22,7 @@ from twisted.internet import defer import synapse import synapse.metrics +from synapse.api.presence import UserPresenceState from synapse.events import EventBase from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager @@ -39,7 +40,6 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage.presence import UserPresenceState from synapse.types import ReadReceipt from synapse.util.metrics import Measure, measure_func diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 8cbc23d901..c09ffcaf4c 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -24,12 +24,12 @@ from synapse.api.errors import ( HttpResponseException, RequestSendFailed, ) +from synapse.api.presence import UserPresenceState from synapse.events import EventBase from synapse.federation.units import Edu from synapse.handlers.presence import format_user_presence_state from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage.presence import UserPresenceState from synapse.types import ReadReceipt from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 5387b3724f..24e1940ee5 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -33,13 +33,13 @@ from typing_extensions import ContextManager import synapse.metrics from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError +from synapse.api.presence import UserPresenceState from synapse.logging.context import run_in_background from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.state import StateHandler from synapse.storage.databases.main import DataStore -from synapse.storage.presence import UserPresenceState from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches.descriptors import cached diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 9f691e5792..4e3ec02d14 100644 --- 
a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -15,8 +15,8 @@
 
 from typing import List, Tuple
 
+from synapse.api.presence import UserPresenceState
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
-from synapse.storage.presence import UserPresenceState
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.iterutils import batch_iter
 
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index aaf225894e..8ccfb8fc46 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -39,15 +39,17 @@ what sort order was used:
 import abc
 import logging
 from collections import namedtuple
-from typing import Optional
+from typing import Dict, Iterable, List, Optional, Tuple
 
 from twisted.internet import defer
 
+from synapse.api.filtering import Filter
+from synapse.events import EventBase
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool, make_in_list_sql_clause
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
@@ -68,8 +70,12 @@ _EventDictReturn = namedtuple(
 
 
 def generate_pagination_where_clause(
-    direction, column_names, from_token, to_token, engine
-):
+    direction: str,
+    column_names: Tuple[str, str],
+    from_token: Optional[Tuple[int, int]],
+    to_token: Optional[Tuple[int, int]],
+    engine: BaseDatabaseEngine,
+) -> str:
     """Creates an SQL expression to bound the columns by the pagination
     tokens.
 
@@ -90,21 +96,19 @@ def generate_pagination_where_clause(
     token, but include those that match the to token.
 
     Args:
-        direction (str): Whether we're paginating backwards("b") or
-            forwards ("f").
-        column_names (tuple[str, str]): The column names to bound. Must *not*
-            be user defined as these get inserted directly into the SQL
-            statement without escapes.
-        from_token (tuple[int, int]|None): The start point for the pagination.
-            This is an exclusive minimum bound if direction is "f", and an
-            inclusive maximum bound if direction is "b".
-        to_token (tuple[int, int]|None): The endpoint point for the pagination.
-            This is an inclusive maximum bound if direction is "f", and an
-            exclusive minimum bound if direction is "b".
+        direction: Whether we're paginating backwards ("b") or forwards ("f").
+        column_names: The column names to bound. Must *not* be user defined as
+            these get inserted directly into the SQL statement without escapes.
+        from_token: The start point for the pagination. This is an exclusive
+            minimum bound if direction is "f", and an inclusive maximum bound if
+            direction is "b".
+        to_token: The end point for the pagination. This is an inclusive
+            maximum bound if direction is "f", and an exclusive minimum bound if
+            direction is "b".
        engine: The database engine to generate the clauses for
 
     Returns:
-        str: The sql expression
+        The sql expression
     """
     assert direction in ("b", "f")
@@ -132,7 +136,12 @@ def generate_pagination_where_clause(
     return " AND ".join(where_clause)
 
 
-def _make_generic_sql_bound(bound, column_names, values, engine):
+def _make_generic_sql_bound(
+    bound: str,
+    column_names: Tuple[str, str],
+    values: Tuple[Optional[int], int],
+    engine: BaseDatabaseEngine,
+) -> str:
     """Create an SQL expression that bounds the given column names by the
     values, e.g. create the equivalent of `(1, 2) < (col1, col2)`.
 
@@ -142,18 +151,18 @@ def _make_generic_sql_bound(
     out manually.
 
     Args:
-        bound (str): The comparison operator to use. One of ">", "<", ">=",
+        bound: The comparison operator to use. One of ">", "<", ">=",
             "<=", where the values are on the left and columns on the right.
-        names (tuple[str, str]): The column names. Must *not* be user defined
+        column_names: The column names. Must *not* be user defined
             as these get inserted directly into the SQL statement without
             escapes.
-        values (tuple[int|None, int]): The values to bound the columns by. If
+        values: The values to bound the columns by. If
            the first value is None then only creates a bound on the second
            column.
        engine: The database engine to generate the SQL for
 
     Returns:
-        str
+        The SQL statement
     """
 
     assert bound in (">", "<", ">=", "<=")
@@ -193,7 +202,7 @@ def _make_generic_sql_bound(
     )
 
 
-def filter_to_clause(event_filter):
+def filter_to_clause(event_filter: Filter) -> Tuple[str, List[str]]:
     # NB: This may create SQL clauses that don't optimise well (and we don't
     # have indices on all possible clauses). E.g. it may create
     # "room_id == X AND room_id != X", which postgres doesn't optimise.
@@ -291,34 +300,35 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     def get_room_min_stream_ordering(self):
         raise NotImplementedError()
 
-    @defer.inlineCallbacks
-    def get_room_events_stream_for_rooms(
-        self, room_ids, from_key, to_key, limit=0, order="DESC"
-    ):
+    async def get_room_events_stream_for_rooms(
+        self,
+        room_ids: Iterable[str],
+        from_key: str,
+        to_key: str,
+        limit: int = 0,
+        order: str = "DESC",
+    ) -> Dict[str, Tuple[List[EventBase], str]]:
         """Get new room events in stream ordering since `from_key`.
 
         Args:
-            room_id (str)
-            from_key (str): Token from which no events are returned before
-            to_key (str): Token from which no events are returned after. (This
+            room_ids
+            from_key: Token from which no events are returned before
+            to_key: Token from which no events are returned after. (This
                 is typically the current stream token)
-            limit (int): Maximum number of events to return
-            order (str): Either "DESC" or "ASC". Determines which events are
+            limit: Maximum number of events to return
+            order: Either "DESC" or "ASC". Determines which events are
                 returned when the result is limited. If "DESC" then the most
                 recent `limit` events are returned, otherwise returns the
                 oldest `limit` events.
 
         Returns:
-            Deferred[dict[str,tuple[list[FrozenEvent], str]]]
-                A map from room id to a tuple containing:
-                    - list of recent events in the room
-                    - stream ordering key for the start of the chunk of events returned.
+            A map from room id to a tuple containing:
+                - list of recent events in the room
+                - stream ordering key for the start of the chunk of events returned.
""" from_id = RoomStreamToken.parse_stream_token(from_key).stream - room_ids = yield self._events_stream_cache.get_entities_changed( - room_ids, from_id - ) + room_ids = self._events_stream_cache.get_entities_changed(room_ids, from_id) if not room_ids: return {} @@ -326,7 +336,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): results = {} room_ids = list(room_ids) for rm_ids in (room_ids[i : i + 20] for i in range(0, len(room_ids), 20)): - res = yield make_deferred_yieldable( + res = await make_deferred_yieldable( defer.gatherResults( [ run_in_background( @@ -361,28 +371,31 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if self._events_stream_cache.has_entity_changed(room_id, from_key) } - @defer.inlineCallbacks - def get_room_events_stream_for_room( - self, room_id, from_key, to_key, limit=0, order="DESC" - ): + async def get_room_events_stream_for_room( + self, + room_id: str, + from_key: str, + to_key: str, + limit: int = 0, + order: str = "DESC", + ) -> Tuple[List[EventBase], str]: """Get new room events in stream ordering since `from_key`. Args: - room_id (str) - from_key (str): Token from which no events are returned before - to_key (str): Token from which no events are returned after. (This + room_id + from_key: Token from which no events are returned before + to_key: Token from which no events are returned after. (This is typically the current stream token) - limit (int): Maximum number of events to return - order (str): Either "DESC" or "ASC". Determines which events are + limit: Maximum number of events to return + order: Either "DESC" or "ASC". Determines which events are returned when the result is limited. If "DESC" then the most recent `limit` events are returned, otherwise returns the oldest `limit` events. Returns: - Deferred[tuple[list[FrozenEvent], str]]: Returns the list of - events (in ascending order) and the token from the start of - the chunk of events returned. + The list of events (in ascending order) and the token from the start + of the chunk of events returned. 
""" if from_key == to_key: return [], from_key @@ -390,9 +403,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): from_id = RoomStreamToken.parse_stream_token(from_key).stream to_id = RoomStreamToken.parse_stream_token(to_key).stream - has_changed = yield self._events_stream_cache.has_entity_changed( - room_id, from_id - ) + has_changed = self._events_stream_cache.has_entity_changed(room_id, from_id) if not has_changed: return [], from_key @@ -410,9 +421,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] return rows - rows = yield self.db_pool.runInteraction("get_room_events_stream_for_room", f) + rows = await self.db_pool.runInteraction("get_room_events_stream_for_room", f) - ret = yield self.get_events_as_list( + ret = await self.get_events_as_list( [r.event_id for r in rows], get_prev_content=True ) @@ -430,8 +441,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return ret, key - @defer.inlineCallbacks - def get_membership_changes_for_user(self, user_id, from_key, to_key): + async def get_membership_changes_for_user(self, user_id, from_key, to_key): from_id = RoomStreamToken.parse_stream_token(from_key).stream to_id = RoomStreamToken.parse_stream_token(to_key).stream @@ -460,9 +470,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return rows - rows = yield self.db_pool.runInteraction("get_membership_changes_for_user", f) + rows = await self.db_pool.runInteraction("get_membership_changes_for_user", f) - ret = yield self.get_events_as_list( + ret = await self.get_events_as_list( [r.event_id for r in rows], get_prev_content=True ) @@ -470,27 +480,26 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return ret - @defer.inlineCallbacks - def get_recent_events_for_room(self, room_id, limit, end_token): + async def get_recent_events_for_room( + self, room_id: str, limit: int, end_token: str + ) -> Tuple[List[EventBase], str]: """Get the most recent events in the room in topological ordering. Args: - room_id (str) - limit (int) - end_token (str): The stream token representing now. + room_id + limit + end_token: The stream token representing now. Returns: - Deferred[tuple[list[FrozenEvent], str]]: Returns a list of - events and a token pointing to the start of the returned - events. - The events returned are in ascending order. + A list of events and a token pointing to the start of the returned + events. The events returned are in ascending order. """ - rows, token = yield self.get_recent_event_ids_for_room( + rows, token = await self.get_recent_event_ids_for_room( room_id, limit, end_token ) - events = yield self.get_events_as_list( + events = await self.get_events_as_list( [r.event_id for r in rows], get_prev_content=True ) @@ -498,20 +507,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return (events, token) - @defer.inlineCallbacks - def get_recent_event_ids_for_room(self, room_id, limit, end_token): + async def get_recent_event_ids_for_room( + self, room_id: str, limit: int, end_token: str + ) -> Tuple[List[_EventDictReturn], str]: """Get the most recent events in the room in topological ordering. Args: - room_id (str) - limit (int) - end_token (str): The stream token representing now. + room_id + limit + end_token: The stream token representing now. Returns: - Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of - _EventDictReturn and a token pointing to the start of the returned - events. - The events returned are in ascending order. 
+ A list of _EventDictReturn and a token pointing to the start of the + returned events. The events returned are in ascending order. """ # Allow a zero limit here, and no-op. if limit == 0: @@ -519,7 +527,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): end_token = RoomStreamToken.parse(end_token) - rows, token = yield self.db_pool.runInteraction( + rows, token = await self.db_pool.runInteraction( "get_recent_event_ids_for_room", self._paginate_room_events_txn, room_id, @@ -532,12 +540,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return rows, token - def get_room_event_before_stream_ordering(self, room_id, stream_ordering): + def get_room_event_before_stream_ordering(self, room_id: str, stream_ordering: int): """Gets details of the first event in a room at or before a stream ordering Args: - room_id (str): - stream_ordering (int): + room_id: + stream_ordering: Returns: Deferred[(int, int, str)]: @@ -574,55 +582,56 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) return "t%d-%d" % (topo, token) - def get_stream_token_for_event(self, event_id): + async def get_stream_token_for_event(self, event_id: str) -> str: """The stream token for an event Args: - event_id(str): The id of the event to look up a stream token for. + event_id: The id of the event to look up a stream token for. Raises: StoreError if the event wasn't in the database. Returns: - A deferred "s%d" stream token. + A "s%d" stream token. """ - return self.db_pool.simple_select_one_onecol( + row = await self.db_pool.simple_select_one_onecol( table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering" - ).addCallback(lambda row: "s%d" % (row,)) + ) + return "s%d" % (row,) - def get_topological_token_for_event(self, event_id): + async def get_topological_token_for_event(self, event_id: str) -> str: """The stream token for an event Args: - event_id(str): The id of the event to look up a stream token for. + event_id: The id of the event to look up a stream token for. Raises: StoreError if the event wasn't in the database. Returns: - A deferred "t%d-%d" topological token. + A "t%d-%d" topological token. """ - return self.db_pool.simple_select_one( + row = await self.db_pool.simple_select_one( table="events", keyvalues={"event_id": event_id}, retcols=("stream_ordering", "topological_ordering"), desc="get_topological_token_for_event", - ).addCallback( - lambda row: "t%d-%d" % (row["topological_ordering"], row["stream_ordering"]) ) + return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"]) - def get_max_topological_token(self, room_id, stream_key): + async def get_max_topological_token(self, room_id: str, stream_key: int) -> int: """Get the max topological token in a room before the given stream ordering. Args: - room_id (str) - stream_key (int) + room_id + stream_key Returns: - Deferred[int] + The maximum topological token. """ sql = ( "SELECT coalesce(max(topological_ordering), 0) FROM events" " WHERE room_id = ? AND stream_ordering < ?" 
) - return self.db_pool.execute( + row = await self.db_pool.execute( "get_max_topological_token", None, sql, room_id, stream_key - ).addCallback(lambda r: r[0][0] if r else 0) + ) + return row[0][0] if row else 0 def _get_max_topological_txn(self, txn, room_id): txn.execute( @@ -634,16 +643,18 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return rows[0][0] if rows else 0 @staticmethod - def _set_before_and_after(events, rows, topo_order=True): + def _set_before_and_after( + events: List[EventBase], rows: List[_EventDictReturn], topo_order: bool = True + ): """Inserts ordering information to events' internal metadata from the DB rows. Args: - events (list[FrozenEvent]) - rows (list[_EventDictReturn]) - topo_order (bool): Whether the events were ordered topologically - or by stream ordering. If true then all rows should have a non - null topological_ordering. + events + rows + topo_order: Whether the events were ordered topologically or by stream + ordering. If true then all rows should have a non null + topological_ordering. """ for event, row in zip(events, rows): stream = row.stream_ordering @@ -656,25 +667,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): internal.after = str(RoomStreamToken(topo, stream)) internal.order = (int(topo) if topo else 0, int(stream)) - @defer.inlineCallbacks - def get_events_around( - self, room_id, event_id, before_limit, after_limit, event_filter=None - ): + async def get_events_around( + self, + room_id: str, + event_id: str, + before_limit: int, + after_limit: int, + event_filter: Optional[Filter] = None, + ) -> dict: """Retrieve events and pagination tokens around a given event in a room. - - Args: - room_id (str) - event_id (str) - before_limit (int) - after_limit (int) - event_filter (Filter|None) - - Returns: - dict """ - results = yield self.db_pool.runInteraction( + results = await self.db_pool.runInteraction( "get_events_around", self._get_events_around_txn, room_id, @@ -684,11 +689,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): event_filter, ) - events_before = yield self.get_events_as_list( + events_before = await self.get_events_as_list( list(results["before"]["event_ids"]), get_prev_content=True ) - events_after = yield self.get_events_as_list( + events_after = await self.get_events_as_list( list(results["after"]["event_ids"]), get_prev_content=True ) @@ -700,17 +705,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): } def _get_events_around_txn( - self, txn, room_id, event_id, before_limit, after_limit, event_filter - ): + self, + txn, + room_id: str, + event_id: str, + before_limit: int, + after_limit: int, + event_filter: Optional[Filter], + ) -> dict: """Retrieves event_ids and pagination tokens around a given event in a room. Args: - room_id (str) - event_id (str) - before_limit (int) - after_limit (int) - event_filter (Filter|None) + room_id + event_id + before_limit + after_limit + event_filter Returns: dict @@ -758,22 +769,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "after": {"event_ids": events_after, "token": end_token}, } - @defer.inlineCallbacks - def get_all_new_events_stream(self, from_id, current_id, limit): + async def get_all_new_events_stream( + self, from_id: int, current_id: int, limit: int + ) -> Tuple[int, List[EventBase]]: """Get all new events Returns all events with from_id < stream_ordering <= current_id. 
Args: - from_id (int): the stream_ordering of the last event we processed - current_id (int): the stream_ordering of the most recently processed event - limit (int): the maximum number of events to return + from_id: the stream_ordering of the last event we processed + current_id: the stream_ordering of the most recently processed event + limit: the maximum number of events to return Returns: - Deferred[Tuple[int, list[FrozenEvent]]]: A tuple of (next_id, events), where - `next_id` is the next value to pass as `from_id` (it will either be the - stream_ordering of the last returned event, or, if fewer than `limit` events - were found, `current_id`. + A tuple of (next_id, events), where `next_id` is the next value to + pass as `from_id` (it will either be the stream_ordering of the + last returned event, or, if fewer than `limit` events were found, + the `current_id`). """ def get_all_new_events_stream_txn(txn): @@ -795,11 +807,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return upper_bound, [row[1] for row in rows] - upper_bound, event_ids = yield self.db_pool.runInteraction( + upper_bound, event_ids = await self.db_pool.runInteraction( "get_all_new_events_stream", get_all_new_events_stream_txn ) - events = yield self.get_events_as_list(event_ids) + events = await self.get_events_as_list(event_ids) return upper_bound, events @@ -817,21 +829,21 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): desc="get_federation_out_pos", ) - async def update_federation_out_pos(self, typ, stream_id): + async def update_federation_out_pos(self, typ: str, stream_id: int) -> None: if self._need_to_reset_federation_stream_positions: await self.db_pool.runInteraction( "_reset_federation_positions_txn", self._reset_federation_positions_txn ) self._need_to_reset_federation_stream_positions = False - return await self.db_pool.simple_update_one( + await self.db_pool.simple_update_one( table="federation_stream_position", keyvalues={"type": typ, "instance_name": self._instance_name}, updatevalues={"stream_id": stream_id}, desc="update_federation_out_pos", ) - def _reset_federation_positions_txn(self, txn): + def _reset_federation_positions_txn(self, txn) -> None: """Fiddles with the `federation_stream_position` table to make it match the configured federation sender instances during start up. """ @@ -892,39 +904,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): values={"stream_id": stream_id}, ) - def has_room_changed_since(self, room_id, stream_id): + def has_room_changed_since(self, room_id: str, stream_id: int) -> bool: return self._events_stream_cache.has_entity_changed(room_id, stream_id) def _paginate_room_events_txn( self, txn, - room_id, - from_token, - to_token=None, - direction="b", - limit=-1, - event_filter=None, - ): + room_id: str, + from_token: RoomStreamToken, + to_token: Optional[RoomStreamToken] = None, + direction: str = "b", + limit: int = -1, + event_filter: Optional[Filter] = None, + ) -> Tuple[List[_EventDictReturn], str]: """Returns list of events before or after a given token. Args: txn - room_id (str) - from_token (RoomStreamToken): The token used to stream from - to_token (RoomStreamToken|None): A token which if given limits the - results to only those before - direction(char): Either 'b' or 'f' to indicate whether we are - paginating forwards or backwards from `from_key`. - limit (int): The maximum number of events to return. 
- event_filter (Filter|None): If provided filters the events to + room_id + from_token: The token used to stream from + to_token: A token which if given limits the results to only those before + direction: Either 'b' or 'f' to indicate whether we are paginating + forwards or backwards from `from_key`. + limit: The maximum number of events to return. + event_filter: If provided filters the events to those that match the filter. Returns: - Deferred[tuple[list[_EventDictReturn], str]]: Returns the results - as a list of _EventDictReturn and a token that points to the end - of the result set. If no events are returned then the end of the - stream has been reached (i.e. there are no events between - `from_token` and `to_token`), or `limit` is zero. + A list of _EventDictReturn and a token that points to the end of the + result set. If no events are returned then the end of the stream has + been reached (i.e. there are no events between `from_token` and + `to_token`), or `limit` is zero. """ assert int(limit) >= 0 @@ -1008,35 +1018,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return rows, str(next_token) - @defer.inlineCallbacks - def paginate_room_events( - self, room_id, from_key, to_key=None, direction="b", limit=-1, event_filter=None - ): + async def paginate_room_events( + self, + room_id: str, + from_key: str, + to_key: Optional[str] = None, + direction: str = "b", + limit: int = -1, + event_filter: Optional[Filter] = None, + ) -> Tuple[List[EventBase], str]: """Returns list of events before or after a given token. Args: - room_id (str) - from_key (str): The token used to stream from - to_key (str|None): A token which if given limits the results to - only those before - direction(char): Either 'b' or 'f' to indicate whether we are - paginating forwards or backwards from `from_key`. - limit (int): The maximum number of events to return. - event_filter (Filter|None): If provided filters the events to - those that match the filter. + room_id + from_key: The token used to stream from + to_key: A token which if given limits the results to only those before + direction: Either 'b' or 'f' to indicate whether we are paginating + forwards or backwards from `from_key`. + limit: The maximum number of events to return. + event_filter: If provided filters the events to those that match the filter. Returns: - tuple[list[FrozenEvent], str]: Returns the results as a list of - events and a token that points to the end of the result set. If no - events are returned then the end of the stream has been reached - (i.e. there are no events between `from_key` and `to_key`). + The results as a list of events and a token that points to the end + of the result set. If no events are returned then the end of the + stream has been reached (i.e. there are no events between `from_key` + and `to_key`). 
""" from_key = RoomStreamToken.parse(from_key) if to_key: to_key = RoomStreamToken.parse(to_key) - rows, token = yield self.db_pool.runInteraction( + rows, token = await self.db_pool.runInteraction( "paginate_room_events", self._paginate_room_events_txn, room_id, @@ -1047,7 +1060,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): event_filter, ) - events = yield self.get_events_as_list( + events = await self.get_events_as_list( [r.event_id for r in rows], get_prev_content=True ) @@ -1057,8 +1070,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): class StreamStore(StreamWorkerStore): - def get_room_max_stream_ordering(self): + def get_room_max_stream_ordering(self) -> int: return self._stream_id_gen.get_current_token() - def get_room_min_stream_ordering(self): + def get_room_min_stream_ordering(self) -> int: return self._backfill_id_gen.get_current_token() diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py deleted file mode 100644 index 18a462f0ee..0000000000 --- a/synapse/storage/presence.py +++ /dev/null @@ -1,69 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from collections import namedtuple - -from synapse.api.constants import PresenceState - - -class UserPresenceState( - namedtuple( - "UserPresenceState", - ( - "user_id", - "state", - "last_active_ts", - "last_federation_update_ts", - "last_user_sync_ts", - "status_msg", - "currently_active", - ), - ) -): - """Represents the current presence state of the user. - - user_id (str) - last_active (int): Time in msec that the user last interacted with server. - last_federation_update (int): Time in msec since either a) we sent a presence - update to other servers or b) we received a presence update, depending - on if is a local user or not. - last_user_sync (int): Time in msec that the user last *completed* a sync - (or event stream). - status_msg (str): User set status message. - """ - - def as_dict(self): - return dict(self._asdict()) - - @staticmethod - def from_dict(d): - return UserPresenceState(**d) - - def copy_and_replace(self, **kwargs): - return self._replace(**kwargs) - - @classmethod - def default(cls, user_id): - """Returns a default presence state. 
- """ - return cls( - user_id=user_id, - state=PresenceState.OFFLINE, - last_active_ts=0, - last_federation_update_ts=0, - last_user_sync_ts=0, - status_msg=None, - currently_active=False, - ) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 05ea40a7de..306dcfe944 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -19,6 +19,7 @@ from mock import Mock, call from signedjson.key import generate_signing_key from synapse.api.constants import EventTypes, Membership, PresenceState +from synapse.api.presence import UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events.builder import EventBuilder from synapse.handlers.presence import ( @@ -32,7 +33,6 @@ from synapse.handlers.presence import ( handle_update, ) from synapse.rest.client.v1 import room -from synapse.storage.presence import UserPresenceState from synapse.types import UserID, get_domain_from_id from tests import unittest diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index a6012c973d..918387733b 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -15,6 +15,7 @@ from twisted.internet import defer +from synapse.api.errors import NotFoundError from synapse.rest.client.v1 import room from tests.unittest import HomeserverTestCase @@ -46,30 +47,19 @@ class PurgeTests(HomeserverTestCase): storage = self.hs.get_storage() # Get the topological token - event = store.get_topological_token_for_event(last["event_id"]) - self.pump() - event = self.successResultOf(event) - - # Purge everything before this topological token - purge = defer.ensureDeferred( - storage.purge_events.purge_history(self.room_id, event, True) + event = self.get_success( + store.get_topological_token_for_event(last["event_id"]) ) - self.pump() - self.assertEqual(self.successResultOf(purge), None) - # Try and get the events - get_first = store.get_event(first["event_id"]) - get_second = store.get_event(second["event_id"]) - get_third = store.get_event(third["event_id"]) - get_last = store.get_event(last["event_id"]) - self.pump() + # Purge everything before this topological token + self.get_success(storage.purge_events.purge_history(self.room_id, event, True)) # 1-3 should fail and last will succeed, meaning that 1-3 are deleted # and last is not. 
- self.failureResultOf(get_first) - self.failureResultOf(get_second) - self.failureResultOf(get_third) - self.successResultOf(get_last) + self.get_failure(store.get_event(first["event_id"]), NotFoundError) + self.get_failure(store.get_event(second["event_id"]), NotFoundError) + self.get_failure(store.get_event(third["event_id"]), NotFoundError) + self.get_success(store.get_event(last["event_id"])) def test_purge_wont_delete_extrems(self): """ @@ -84,9 +74,9 @@ class PurgeTests(HomeserverTestCase): storage = self.hs.get_datastore() # Set the topological token higher than it should be - event = storage.get_topological_token_for_event(last["event_id"]) - self.pump() - event = self.successResultOf(event) + event = self.get_success( + storage.get_topological_token_for_event(last["event_id"]) + ) event = "t{}-{}".format( *list(map(lambda x: x + 1, map(int, event[1:].split("-")))) ) @@ -98,14 +88,7 @@ class PurgeTests(HomeserverTestCase): self.assertIn("greater than forward", f.value.args[0]) # Try and get the events - get_first = storage.get_event(first["event_id"]) - get_second = storage.get_event(second["event_id"]) - get_third = storage.get_event(third["event_id"]) - get_last = storage.get_event(last["event_id"]) - self.pump() - - # Nothing is deleted. - self.successResultOf(get_first) - self.successResultOf(get_second) - self.successResultOf(get_third) - self.successResultOf(get_last) + self.get_success(storage.get_event(first["event_id"])) + self.get_success(storage.get_event(second["event_id"])) + self.get_success(storage.get_event(third["event_id"])) + self.get_success(storage.get_event(last["event_id"])) -- cgit 1.5.1 From c619253db80c8d1c606dc40756dd3c9e3a55a9fb Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 4 Sep 2020 06:54:56 -0400 Subject: Stop sub-classing object (#8249) --- changelog.d/8249.misc | 1 + contrib/cmdclient/http.py | 6 ++--- contrib/experiments/cursesio.py | 2 +- contrib/experiments/test_messaging.py | 4 ++-- scripts-dev/hash_history.py | 2 +- synapse/api/auth.py | 2 +- synapse/api/auth_blocking.py | 2 +- synapse/api/constants.py | 26 +++++++++++----------- synapse/api/errors.py | 2 +- synapse/api/filtering.py | 6 ++--- synapse/api/ratelimiting.py | 2 +- synapse/api/room_versions.py | 10 ++++----- synapse/api/urls.py | 2 +- synapse/app/_base.py | 4 ++-- synapse/app/generic_worker.py | 2 +- synapse/appservice/__init__.py | 6 ++--- synapse/appservice/scheduler.py | 8 +++---- synapse/config/_base.py | 4 ++-- synapse/config/cache.py | 2 +- synapse/config/key.py | 2 +- synapse/config/metrics.py | 2 +- synapse/config/ratelimiting.py | 4 ++-- synapse/config/room.py | 2 +- synapse/config/room_directory.py | 2 +- synapse/config/server.py | 2 +- synapse/crypto/context_factory.py | 8 +++---- synapse/crypto/keyring.py | 8 +++---- synapse/events/__init__.py | 2 +- synapse/events/builder.py | 4 ++-- synapse/events/spamcheck.py | 2 +- synapse/events/third_party_rules.py | 2 +- synapse/events/utils.py | 2 +- synapse/events/validator.py | 2 +- synapse/federation/federation_base.py | 2 +- synapse/federation/federation_server.py | 2 +- synapse/federation/persistence.py | 2 +- synapse/federation/send_queue.py | 4 ++-- synapse/federation/sender/__init__.py | 2 +- synapse/federation/sender/per_destination_queue.py | 2 +- synapse/federation/sender/transaction_manager.py | 2 +- synapse/federation/transport/client.py | 2 +- synapse/federation/transport/server.py | 4 ++-- synapse/groups/attestations.py | 4 ++-- synapse/groups/groups_server.py | 2 +- synapse/handlers/__init__.py | 2 +- 
synapse/handlers/_base.py | 2 +- synapse/handlers/account_data.py | 2 +- synapse/handlers/account_validity.py | 2 +- synapse/handlers/acme.py | 2 +- synapse/handlers/acme_issuing_service.py | 2 +- synapse/handlers/admin.py | 2 +- synapse/handlers/appservice.py | 2 +- synapse/handlers/auth.py | 2 +- synapse/handlers/device.py | 2 +- synapse/handlers/devicemessage.py | 2 +- synapse/handlers/e2e_keys.py | 4 ++-- synapse/handlers/e2e_room_keys.py | 2 +- synapse/handlers/groups_local.py | 2 +- synapse/handlers/message.py | 4 ++-- synapse/handlers/pagination.py | 4 ++-- synapse/handlers/password_policy.py | 2 +- synapse/handlers/presence.py | 2 +- synapse/handlers/receipts.py | 2 +- synapse/handlers/room.py | 6 ++--- synapse/handlers/room_member.py | 2 +- synapse/handlers/saml_handler.py | 4 ++-- synapse/handlers/state_deltas.py | 2 +- synapse/handlers/sync.py | 4 ++-- synapse/handlers/typing.py | 2 +- synapse/http/client.py | 8 +++---- synapse/http/connectproxyclient.py | 2 +- synapse/http/federation/matrix_federation_agent.py | 6 ++--- synapse/http/federation/srv_resolver.py | 4 ++-- synapse/http/federation/well_known_resolver.py | 4 ++-- synapse/http/matrixfederationclient.py | 6 ++--- synapse/http/request_metrics.py | 2 +- synapse/http/server.py | 2 +- synapse/http/servlet.py | 2 +- synapse/logging/_structured.py | 6 ++--- synapse/logging/_terse_json.py | 4 ++-- synapse/logging/context.py | 6 ++--- synapse/logging/opentracing.py | 2 +- synapse/metrics/__init__.py | 16 ++++++------- synapse/metrics/background_process_metrics.py | 4 ++-- synapse/module_api/__init__.py | 2 +- synapse/notifier.py | 6 ++--- synapse/push/action_generator.py | 2 +- synapse/push/bulk_push_rule_evaluator.py | 4 ++-- synapse/push/emailpusher.py | 2 +- synapse/push/httppusher.py | 2 +- synapse/push/mailer.py | 2 +- synapse/push/push_rule_evaluator.py | 2 +- synapse/push/pusher.py | 2 +- synapse/replication/http/_base.py | 2 +- .../slave/storage/_slaved_id_tracker.py | 2 +- synapse/replication/tcp/protocol.py | 2 +- synapse/replication/tcp/resource.py | 2 +- synapse/replication/tcp/streams/_base.py | 2 +- synapse/replication/tcp/streams/events.py | 4 ++-- synapse/rest/client/transactions.py | 2 +- synapse/rest/client/v2_alpha/register.py | 2 +- synapse/rest/media/v1/_base.py | 4 ++-- synapse/rest/media/v1/filepath.py | 2 +- synapse/rest/media/v1/media_repository.py | 2 +- synapse/rest/media/v1/media_storage.py | 2 +- synapse/rest/media/v1/thumbnailer.py | 2 +- synapse/rest/well_known.py | 2 +- synapse/secrets.py | 2 +- synapse/server_notices/consent_server_notices.py | 2 +- .../resource_limits_server_notices.py | 2 +- synapse/server_notices/server_notices_manager.py | 2 +- synapse/server_notices/server_notices_sender.py | 2 +- .../server_notices/worker_server_notices_sender.py | 2 +- synapse/spam_checker_api/__init__.py | 2 +- synapse/state/__init__.py | 8 +++---- synapse/storage/__init__.py | 2 +- synapse/storage/background_updates.py | 4 ++-- synapse/storage/database.py | 4 ++-- synapse/storage/databases/__init__.py | 2 +- synapse/storage/databases/main/roommember.py | 2 +- synapse/storage/keys.py | 2 +- synapse/storage/persist_events.py | 4 ++-- synapse/storage/prepare_database.py | 2 +- synapse/storage/purge_events.py | 2 +- synapse/storage/relations.py | 6 ++--- synapse/storage/state.py | 4 ++-- synapse/storage/util/id_generators.py | 4 ++-- synapse/streams/config.py | 4 ++-- synapse/streams/events.py | 2 +- synapse/types.py | 2 +- synapse/util/__init__.py | 2 +- synapse/util/async_helpers.py | 8 +++---- 
synapse/util/caches/__init__.py | 2 +- synapse/util/caches/descriptors.py | 8 +++---- synapse/util/caches/dictionary_cache.py | 4 ++-- synapse/util/caches/expiringcache.py | 4 ++-- synapse/util/caches/lrucache.py | 4 ++-- synapse/util/caches/response_cache.py | 2 +- synapse/util/caches/treecache.py | 4 ++-- synapse/util/caches/ttlcache.py | 4 ++-- synapse/util/distributor.py | 4 ++-- synapse/util/file_consumer.py | 2 +- synapse/util/jsonobject.py | 2 +- synapse/util/metrics.py | 2 +- synapse/util/ratelimitutils.py | 4 ++-- synapse/util/retryutils.py | 2 +- synapse/util/wheel_timer.py | 4 ++-- tests/api/test_auth.py | 2 +- tests/crypto/test_keyring.py | 2 +- tests/federation/transport/test_server.py | 2 +- tests/handlers/test_auth.py | 2 +- tests/handlers/test_profile.py | 2 +- tests/http/__init__.py | 2 +- .../federation/test_matrix_federation_agent.py | 2 +- tests/logging/test_structured.py | 4 ++-- tests/push/test_email.py | 2 +- tests/rest/client/third_party_rules.py | 2 +- tests/rest/client/v1/utils.py | 2 +- tests/rest/media/v1/test_url_preview.py | 6 ++--- tests/server.py | 6 ++--- tests/state/test_v2.py | 4 ++-- tests/storage/test__base.py | 18 +++++++-------- tests/test_state.py | 4 ++-- tests/test_visibility.py | 2 +- tests/unittest.py | 2 +- tests/util/caches/test_descriptors.py | 20 ++++++++--------- tests/util/test_file_consumer.py | 4 ++-- tests/utils.py | 6 ++--- 168 files changed, 293 insertions(+), 292 deletions(-) create mode 100644 changelog.d/8249.misc (limited to 'synapse/federation/send_queue.py') diff --git a/changelog.d/8249.misc b/changelog.d/8249.misc new file mode 100644 index 0000000000..6a42e8a4e6 --- /dev/null +++ b/changelog.d/8249.misc @@ -0,0 +1 @@ +Stop sub-classing from object. diff --git a/contrib/cmdclient/http.py b/contrib/cmdclient/http.py index e2534ee584..cd3260b27d 100644 --- a/contrib/cmdclient/http.py +++ b/contrib/cmdclient/http.py @@ -24,7 +24,7 @@ from twisted.web.client import Agent, readBody from twisted.web.http_headers import Headers -class HttpClient(object): +class HttpClient: """ Interface for talking json over http """ @@ -169,7 +169,7 @@ class TwistedHttpClient(HttpClient): return d -class _RawProducer(object): +class _RawProducer: def __init__(self, data): self.data = data self.body = data @@ -186,7 +186,7 @@ class _RawProducer(object): pass -class _JsonProducer(object): +class _JsonProducer: """ Used by the twisted http client to create the HTTP body from json """ diff --git a/contrib/experiments/cursesio.py b/contrib/experiments/cursesio.py index ffefe3bb39..15a22c3a0e 100644 --- a/contrib/experiments/cursesio.py +++ b/contrib/experiments/cursesio.py @@ -141,7 +141,7 @@ class CursesStdIO: curses.endwin() -class Callback(object): +class Callback: def __init__(self, stdio): self.stdio = stdio diff --git a/contrib/experiments/test_messaging.py b/contrib/experiments/test_messaging.py index a84ec4ecae..d4c35ff2fc 100644 --- a/contrib/experiments/test_messaging.py +++ b/contrib/experiments/test_messaging.py @@ -55,7 +55,7 @@ def excpetion_errback(failure): logging.exception(failure) -class InputOutput(object): +class InputOutput: """ This is responsible for basic I/O so that a user can interact with the example app. """ @@ -132,7 +132,7 @@ class IOLoggerHandler(logging.Handler): self.io.print_log(msg) -class Room(object): +class Room: """ Used to store (in memory) the current membership state of a room, and which home servers we should send PDUs associated with the room to. 
""" diff --git a/scripts-dev/hash_history.py b/scripts-dev/hash_history.py index bf3862a386..89acb52e6a 100644 --- a/scripts-dev/hash_history.py +++ b/scripts-dev/hash_history.py @@ -15,7 +15,7 @@ from synapse.storage.pdu import PduStore from synapse.storage.signatures import SignatureStore -class Store(object): +class Store: _get_pdu_tuples = PduStore.__dict__["_get_pdu_tuples"] _get_pdu_content_hashes_txn = SignatureStore.__dict__["_get_pdu_content_hashes_txn"] _get_prev_pdu_hashes_txn = SignatureStore.__dict__["_get_prev_pdu_hashes_txn"] diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 7aab764360..75388643ee 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -58,7 +58,7 @@ class _InvalidMacaroonException(Exception): pass -class Auth(object): +class Auth: """ FIXME: This class contains a mix of functions for authenticating users of our client-server API and authenticating events added to room graphs. diff --git a/synapse/api/auth_blocking.py b/synapse/api/auth_blocking.py index 49093bf181..d8fafd7cb8 100644 --- a/synapse/api/auth_blocking.py +++ b/synapse/api/auth_blocking.py @@ -22,7 +22,7 @@ from synapse.config.server import is_threepid_reserved logger = logging.getLogger(__name__) -class AuthBlocking(object): +class AuthBlocking: def __init__(self, hs): self.store = hs.get_datastore() diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 6a6d32c302..46013cde15 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -28,7 +28,7 @@ MAX_ALIAS_LENGTH = 255 MAX_USERID_LENGTH = 255 -class Membership(object): +class Membership: """Represents the membership states of a user in a room.""" @@ -40,7 +40,7 @@ class Membership(object): LIST = (INVITE, JOIN, KNOCK, LEAVE, BAN) -class PresenceState(object): +class PresenceState: """Represents the presence state of a user.""" OFFLINE = "offline" @@ -48,14 +48,14 @@ class PresenceState(object): ONLINE = "online" -class JoinRules(object): +class JoinRules: PUBLIC = "public" KNOCK = "knock" INVITE = "invite" PRIVATE = "private" -class LoginType(object): +class LoginType: PASSWORD = "m.login.password" EMAIL_IDENTITY = "m.login.email.identity" MSISDN = "m.login.msisdn" @@ -65,7 +65,7 @@ class LoginType(object): DUMMY = "m.login.dummy" -class EventTypes(object): +class EventTypes: Member = "m.room.member" Create = "m.room.create" Tombstone = "m.room.tombstone" @@ -96,17 +96,17 @@ class EventTypes(object): Presence = "m.presence" -class RejectedReason(object): +class RejectedReason: AUTH_ERROR = "auth_error" -class RoomCreationPreset(object): +class RoomCreationPreset: PRIVATE_CHAT = "private_chat" PUBLIC_CHAT = "public_chat" TRUSTED_PRIVATE_CHAT = "trusted_private_chat" -class ThirdPartyEntityKind(object): +class ThirdPartyEntityKind: USER = "user" LOCATION = "location" @@ -115,7 +115,7 @@ ServerNoticeMsgType = "m.server_notice" ServerNoticeLimitReached = "m.server_notice.usage_limit_reached" -class UserTypes(object): +class UserTypes: """Allows for user type specific behaviour. With the benefit of hindsight 'admin' and 'guest' users should also be UserTypes. Normal users are type None """ @@ -125,7 +125,7 @@ class UserTypes(object): ALL_USER_TYPES = (SUPPORT, BOT) -class RelationTypes(object): +class RelationTypes: """The types of relations known to this server. 
""" @@ -134,14 +134,14 @@ class RelationTypes(object): REFERENCE = "m.reference" -class LimitBlockingTypes(object): +class LimitBlockingTypes: """Reasons that a server may be blocked""" MONTHLY_ACTIVE_USER = "monthly_active_user" HS_DISABLED = "hs_disabled" -class EventContentFields(object): +class EventContentFields: """Fields found in events' content, regardless of type.""" # Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326 @@ -152,6 +152,6 @@ class EventContentFields(object): SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after" -class RoomEncryptionAlgorithms(object): +class RoomEncryptionAlgorithms: MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2" DEFAULT = MEGOLM_V1_AES_SHA2 diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 4888c0ec4d..94a9e58eae 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -31,7 +31,7 @@ if typing.TYPE_CHECKING: logger = logging.getLogger(__name__) -class Codes(object): +class Codes: UNRECOGNIZED = "M_UNRECOGNIZED" UNAUTHORIZED = "M_UNAUTHORIZED" FORBIDDEN = "M_FORBIDDEN" diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index a8937d2595..2a2c9e6f13 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -130,7 +130,7 @@ def matrix_user_id_validator(user_id_str): return UserID.from_string(user_id_str) -class Filtering(object): +class Filtering: def __init__(self, hs): super(Filtering, self).__init__() self.store = hs.get_datastore() @@ -168,7 +168,7 @@ class Filtering(object): raise SynapseError(400, str(e)) -class FilterCollection(object): +class FilterCollection: def __init__(self, filter_json): self._filter_json = filter_json @@ -249,7 +249,7 @@ class FilterCollection(object): ) -class Filter(object): +class Filter: def __init__(self, filter_json): self.filter_json = filter_json diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py index e62ae50ac2..5d9d5a228f 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py @@ -21,7 +21,7 @@ from synapse.types import Requester from synapse.util import Clock -class Ratelimiter(object): +class Ratelimiter: """ Ratelimit actions marked by arbitrary keys. diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index d7baf2bc39..f3ecbf36b6 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -18,7 +18,7 @@ from typing import Dict import attr -class EventFormatVersions(object): +class EventFormatVersions: """This is an internal enum for tracking the version of the event format, independently from the room version. 
""" @@ -35,20 +35,20 @@ KNOWN_EVENT_FORMAT_VERSIONS = { } -class StateResolutionVersions(object): +class StateResolutionVersions: """Enum to identify the state resolution algorithms""" V1 = 1 # room v1 state res V2 = 2 # MSC1442 state res: room v2 and later -class RoomDisposition(object): +class RoomDisposition: STABLE = "stable" UNSTABLE = "unstable" @attr.s(slots=True, frozen=True) -class RoomVersion(object): +class RoomVersion: """An object which describes the unique attributes of a room version.""" identifier = attr.ib() # str; the identifier for this version @@ -69,7 +69,7 @@ class RoomVersion(object): limit_notifications_power_levels = attr.ib(type=bool) -class RoomVersions(object): +class RoomVersions: V1 = RoomVersion( "1", RoomDisposition.STABLE, diff --git a/synapse/api/urls.py b/synapse/api/urls.py index bd03ebca5a..bbfccf955e 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -33,7 +33,7 @@ MEDIA_PREFIX = "/_matrix/media/r0" LEGACY_MEDIA_PREFIX = "/_matrix/media/v1" -class ConsentURIBuilder(object): +class ConsentURIBuilder: def __init__(self, hs_config): """ Args: diff --git a/synapse/app/_base.py b/synapse/app/_base.py index a43dc5b2c9..fb476ddaf5 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -349,7 +349,7 @@ def install_dns_limiter(reactor, max_dns_requests_in_flight=100): reactor.installNameResolver(new_resolver) -class _LimitedHostnameResolver(object): +class _LimitedHostnameResolver: """Wraps a IHostnameResolver, limiting the number of in-flight DNS lookups. """ @@ -409,7 +409,7 @@ class _LimitedHostnameResolver(object): yield deferred -class _DeferredResolutionReceiver(object): +class _DeferredResolutionReceiver: """Wraps a IResolutionReceiver and simply resolves the given deferred when resolution is complete """ diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 739b013d4c..f985810e88 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -745,7 +745,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): self.send_handler.wake_destination(server) -class FederationSenderHandler(object): +class FederationSenderHandler: """Processes the fedration replication stream This class is only instantiate on the worker responsible for sending outbound diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 69a7182ef4..13ec1f71a6 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -27,12 +27,12 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class ApplicationServiceState(object): +class ApplicationServiceState: DOWN = "down" UP = "up" -class AppServiceTransaction(object): +class AppServiceTransaction: """Represents an application service transaction.""" def __init__(self, service, id, events): @@ -64,7 +64,7 @@ class AppServiceTransaction(object): await store.complete_appservice_txn(service=self.service, txn_id=self.id) -class ApplicationService(object): +class ApplicationService: """Defines an application service. This definition is mostly what is provided to the /register AS API. 
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index d5204b1314..8eb8c6f51c 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -57,7 +57,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process logger = logging.getLogger(__name__) -class ApplicationServiceScheduler(object): +class ApplicationServiceScheduler: """ Public facing API for this module. Does the required DI to tie the components together. This also serves as the "event_pool", which in this case is a simple array. @@ -86,7 +86,7 @@ class ApplicationServiceScheduler(object): self.queuer.enqueue(service, event) -class _ServiceQueuer(object): +class _ServiceQueuer: """Queue of events waiting to be sent to appservices. Groups events into transactions per-appservice, and sends them on to the @@ -133,7 +133,7 @@ class _ServiceQueuer(object): self.requests_in_flight.discard(service.id) -class _TransactionController(object): +class _TransactionController: """Transaction manager. Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer @@ -209,7 +209,7 @@ class _TransactionController(object): return state == ApplicationServiceState.UP or state is None -class _Recoverer(object): +class _Recoverer: """Manages retries and backoff for a DOWN appservice. We have one of these for each appservice which is currently considered DOWN. diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 1417487427..ad5ab6ad62 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -88,7 +88,7 @@ def path_exists(file_path): return False -class Config(object): +class Config: """ A configuration section, containing configuration keys and values. @@ -283,7 +283,7 @@ def _create_mxc_to_http_filter(public_baseurl: str) -> Callable: return mxc_to_http_filter -class RootConfig(object): +class RootConfig: """ Holder of an application's configuration. diff --git a/synapse/config/cache.py b/synapse/config/cache.py index aff5b21ab2..8e03f14005 100644 --- a/synapse/config/cache.py +++ b/synapse/config/cache.py @@ -33,7 +33,7 @@ _DEFAULT_FACTOR_SIZE = 0.5 _DEFAULT_EVENT_CACHE_SIZE = "10K" -class CacheProperties(object): +class CacheProperties: def __init__(self): # The default factor size for all caches self.default_factor_size = float( diff --git a/synapse/config/key.py b/synapse/config/key.py index b529ea5da0..de964dff13 100644 --- a/synapse/config/key.py +++ b/synapse/config/key.py @@ -82,7 +82,7 @@ logger = logging.getLogger(__name__) @attr.s -class TrustedKeyServer(object): +class TrustedKeyServer: # string: name of the server. 
server_name = attr.ib() diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py index 6aad0d37c0..dfd27e1523 100644 --- a/synapse/config/metrics.py +++ b/synapse/config/metrics.py @@ -22,7 +22,7 @@ from ._base import Config, ConfigError @attr.s -class MetricsFlags(object): +class MetricsFlags: known_servers = attr.ib(default=False, validator=attr.validators.instance_of(bool)) @classmethod diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index b2c78ac40c..14b8836197 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -17,7 +17,7 @@ from typing import Dict from ._base import Config -class RateLimitConfig(object): +class RateLimitConfig: def __init__( self, config: Dict[str, float], @@ -27,7 +27,7 @@ class RateLimitConfig(object): self.burst_count = config.get("burst_count", defaults["burst_count"]) -class FederationRateLimitConfig(object): +class FederationRateLimitConfig: _items_and_default = { "window_size": 1000, "sleep_limit": 10, diff --git a/synapse/config/room.py b/synapse/config/room.py index 52cf0b62fc..692d7a1936 100644 --- a/synapse/config/room.py +++ b/synapse/config/room.py @@ -22,7 +22,7 @@ from ._base import Config, ConfigError logger = logging.Logger(__name__) -class RoomDefaultEncryptionTypes(object): +class RoomDefaultEncryptionTypes: """Possible values for the encryption_enabled_by_default_for_room_type config option""" ALL = "all" diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py index 7ac7699676..6de1f9d103 100644 --- a/synapse/config/room_directory.py +++ b/synapse/config/room_directory.py @@ -149,7 +149,7 @@ class RoomDirectoryConfig(Config): return False -class _RoomDirectoryRule(object): +class _RoomDirectoryRule: """Helper class to test whether a room directory action is allowed, like creating an alias or publishing a room. """ diff --git a/synapse/config/server.py b/synapse/config/server.py index 526a90b26a..e85c6a0840 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -424,7 +424,7 @@ class ServerConfig(Config): self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None)) @attr.s - class LimitRemoteRoomsConfig(object): + class LimitRemoteRoomsConfig: enabled = attr.ib( validator=attr.validators.instance_of(bool), default=False ) diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py index 777c0f00b1..2b03f5ac76 100644 --- a/synapse/crypto/context_factory.py +++ b/synapse/crypto/context_factory.py @@ -83,7 +83,7 @@ class ServerContextFactory(ContextFactory): @implementer(IPolicyForHTTPS) -class FederationPolicyForHTTPS(object): +class FederationPolicyForHTTPS: """Factory for Twisted SSLClientConnectionCreators that are used to make connections to remote servers for federation. @@ -152,7 +152,7 @@ class FederationPolicyForHTTPS(object): @implementer(IPolicyForHTTPS) -class RegularPolicyForHTTPS(object): +class RegularPolicyForHTTPS: """Factory for Twisted SSLClientConnectionCreators that are used to make connections to remote servers, for other than federation. @@ -189,7 +189,7 @@ def _context_info_cb(ssl_connection, where, ret): @implementer(IOpenSSLClientConnectionCreator) -class SSLClientConnectionCreator(object): +class SSLClientConnectionCreator: """Creates openssl connection objects for client connections. 
Replaces twisted.internet.ssl.ClientTLSOptions @@ -214,7 +214,7 @@ class SSLClientConnectionCreator(object): return connection -class ConnectionVerifier(object): +class ConnectionVerifier: """Set the SNI, and do cert verification This is a thing which is attached to the TLSMemoryBIOProtocol, and is called by diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 81c4b430b2..32c31b1cd1 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -57,7 +57,7 @@ logger = logging.getLogger(__name__) @attr.s(slots=True, cmp=False) -class VerifyJsonRequest(object): +class VerifyJsonRequest: """ A request to verify a JSON object. @@ -96,7 +96,7 @@ class KeyLookupError(ValueError): pass -class Keyring(object): +class Keyring: def __init__(self, hs, key_fetchers=None): self.clock = hs.get_clock() @@ -420,7 +420,7 @@ class Keyring(object): remaining_requests.difference_update(completed) -class KeyFetcher(object): +class KeyFetcher: async def get_keys(self, keys_to_fetch): """ Args: @@ -456,7 +456,7 @@ class StoreKeyFetcher(KeyFetcher): return keys -class BaseV2KeyFetcher(object): +class BaseV2KeyFetcher: def __init__(self, hs): self.store = hs.get_datastore() self.config = hs.get_config() diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 62ea44fa49..bf800a3852 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -96,7 +96,7 @@ class DefaultDictProperty(DictProperty): return instance._dict.get(self.key, self.default) -class _EventInternalMetadata(object): +class _EventInternalMetadata: __slots__ = ["_dict"] def __init__(self, internal_metadata_dict: JsonDict): diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 7878cd7044..b6c47be646 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -36,7 +36,7 @@ from synapse.util.stringutils import random_string @attr.s(slots=True, cmp=False, frozen=True) -class EventBuilder(object): +class EventBuilder: """A format independent event builder used to build up the event content before signing the event. @@ -164,7 +164,7 @@ class EventBuilder(object): ) -class EventBuilderFactory(object): +class EventBuilderFactory: def __init__(self, hs): self.clock = hs.get_clock() self.hostname = hs.hostname diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index a7cddac974..b0fc859a47 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -25,7 +25,7 @@ if MYPY: import synapse.server -class SpamChecker(object): +class SpamChecker: def __init__(self, hs: "synapse.server.HomeServer"): self.spam_checkers = [] # type: List[Any] diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py index 2956a64234..9d5310851c 100644 --- a/synapse/events/third_party_rules.py +++ b/synapse/events/third_party_rules.py @@ -18,7 +18,7 @@ from synapse.events.snapshot import EventContext from synapse.types import Requester -class ThirdPartyEventRules(object): +class ThirdPartyEventRules: """Allows server admins to provide a Python module implementing an extra set of rules to apply when processing events. diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 2d42e268c6..32c73d3413 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -322,7 +322,7 @@ def serialize_event( return d -class EventClientSerializer(object): +class EventClientSerializer: """Serializes events that are to be sent to clients. 
This is used for bundling extra information with any events to be sent to diff --git a/synapse/events/validator.py b/synapse/events/validator.py index 5ce3874fba..9df35b54ba 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py @@ -20,7 +20,7 @@ from synapse.events.utils import validate_canonicaljson from synapse.types import EventID, RoomID, UserID -class EventValidator(object): +class EventValidator: def validate_new(self, event, config): """Validates the event has roughly the right format diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 420df2385f..38aa47963f 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -39,7 +39,7 @@ from synapse.types import JsonDict, get_domain_from_id logger = logging.getLogger(__name__) -class FederationBase(object): +class FederationBase: def __init__(self, hs): self.hs = hs diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 630f571cd4..218df884b0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -785,7 +785,7 @@ def _acl_entry_matches(server_name: str, acl_entry: str) -> Match: return regex.match(server_name) -class FederationHandlerRegistry(object): +class FederationHandlerRegistry: """Allows classes to register themselves as handlers for a given EDU or query type for incoming federation traffic. """ diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index de1fe7da38..079e2b2fe0 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -29,7 +29,7 @@ from synapse.types import JsonDict logger = logging.getLogger(__name__) -class TransactionActions(object): +class TransactionActions: """ Defines persistence actions that relate to handling Transactions. """ diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 4d65d4aeea..8e46957d15 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -46,7 +46,7 @@ from .units import Edu logger = logging.getLogger(__name__) -class FederationRemoteSendQueue(object): +class FederationRemoteSendQueue: """A drop in replacement for FederationSender""" def __init__(self, hs): @@ -365,7 +365,7 @@ class FederationRemoteSendQueue(object): ) -class BaseFederationRow(object): +class BaseFederationRow: """Base class for rows to be sent in the federation stream. Specifies how to identify, serialize and deserialize the different types. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 5276c1734f..552519e82c 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -56,7 +56,7 @@ sent_pdus_destination_dist_total = Counter( ) -class FederationSender(object): +class FederationSender: def __init__(self, hs: "synapse.server.HomeServer"): self.hs = hs self.server_name = hs.hostname diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index f1534d431d..defc228c23 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -53,7 +53,7 @@ sent_edus_by_type = Counter( ) -class PerDestinationQueue(object): +class PerDestinationQueue: """ Manages the per-destination transmission queues. 
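A change this mechanical across roughly 170 files is usually applied with a one-off script rather than by hand. The following is a hypothetical sketch of such a codemod — not the tool actually used for this commit — and it only handles the single-base `class Foo(object):` form, leaving multi-base class lists alone:

    import re
    import sys
    from pathlib import Path

    # Rewrite ``class Foo(object):`` to ``class Foo:`` in every .py file
    # under the given directory.
    PATTERN = re.compile(r"^(\s*class\s+\w+)\(object\)(\s*:)", re.MULTILINE)

    def strip_object_base(path: Path) -> bool:
        source = path.read_text()
        rewritten = PATTERN.sub(r"\1\2", source)
        if rewritten == source:
            return False
        path.write_text(rewritten)
        return True

    if __name__ == "__main__":
        root = Path(sys.argv[1])
        changed = [p for p in root.rglob("*.py") if strip_object_base(p)]
        print("rewrote %d files" % len(changed))

After running something like this, a review pass is still needed for cases the regex cannot see, such as classes whose base list spans multiple lines.
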
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 0ebc70d57d..c84072ab73 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -35,7 +35,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class TransactionManager(object): +class TransactionManager: """Helper class which handles building and sending transactions shared between PerDestinationQueue objects diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 9ea821dbb2..17a10f622e 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -30,7 +30,7 @@ from synapse.logging.utils import log_function logger = logging.getLogger(__name__) -class TransportLayerClient(object): +class TransportLayerClient: """Sends federation HTTP requests to other servers""" def __init__(self, hs): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 5e111aa902..9325e0f857 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -100,7 +100,7 @@ class NoAuthenticationError(AuthenticationError): pass -class Authenticator(object): +class Authenticator: def __init__(self, hs: HomeServer): self._clock = hs.get_clock() self.keyring = hs.get_keyring() @@ -228,7 +228,7 @@ def _parse_auth_header(header_bytes): ) -class BaseFederationServlet(object): +class BaseFederationServlet: """Abstract base class for federation servlet classes. The servlet object should have a PATH attribute which takes the form of a regexp to diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index e674bf44a2..a86b3debc5 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -60,7 +60,7 @@ DEFAULT_ATTESTATION_JITTER = (0.9, 1.3) UPDATE_ATTESTATION_TIME_MS = 1 * 24 * 60 * 60 * 1000 -class GroupAttestationSigning(object): +class GroupAttestationSigning: """Creates and verifies group attestations. """ @@ -124,7 +124,7 @@ class GroupAttestationSigning(object): ) -class GroupAttestionRenewer(object): +class GroupAttestionRenewer: """Responsible for sending and receiving attestation updates. """ diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 8cb922ddc7..1dd20ee4e1 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__) # TODO: Flairs -class GroupsServerWorkerHandler(object): +class GroupsServerWorkerHandler: def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 2dd183018a..286f0054be 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -20,7 +20,7 @@ from .identity import IdentityHandler from .search import SearchHandler -class Handlers(object): +class Handlers: """ Deprecated. A collection of handlers. diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index ba2bf99800..0206320e96 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -25,7 +25,7 @@ from synapse.types import UserID logger = logging.getLogger(__name__) -class BaseHandler(object): +class BaseHandler: """ Common base class for the event handlers. 
""" diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index a8d3fbc6de..9112a0ab86 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -14,7 +14,7 @@ # limitations under the License. -class AccountDataEventSource(object): +class AccountDataEventSource: def __init__(self, hs): self.store = hs.get_datastore() diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py index b865bf5b48..4caf6d591a 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py @@ -29,7 +29,7 @@ from synapse.util import stringutils logger = logging.getLogger(__name__) -class AccountValidityHandler(object): +class AccountValidityHandler: def __init__(self, hs): self.hs = hs self.config = hs.config diff --git a/synapse/handlers/acme.py b/synapse/handlers/acme.py index 7666d3abcd..8476256a59 100644 --- a/synapse/handlers/acme.py +++ b/synapse/handlers/acme.py @@ -34,7 +34,7 @@ solutions, please read https://github.com/matrix-org/synapse/blob/master/docs/AC --------------------------------------------------------------------------------""" -class AcmeHandler(object): +class AcmeHandler: def __init__(self, hs): self.hs = hs self.reactor = hs.get_reactor() diff --git a/synapse/handlers/acme_issuing_service.py b/synapse/handlers/acme_issuing_service.py index e1d4224e74..69650ff221 100644 --- a/synapse/handlers/acme_issuing_service.py +++ b/synapse/handlers/acme_issuing_service.py @@ -78,7 +78,7 @@ def create_issuing_service(reactor, acme_url, account_key_file, well_known_resou @attr.s @implementer(ICertificateStore) -class ErsatzStore(object): +class ErsatzStore: """ A store that only stores in memory. """ diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 506bb2b275..918d0e037c 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -197,7 +197,7 @@ class AdminHandler(BaseHandler): return writer.finished() -class ExfiltrationWriter(object): +class ExfiltrationWriter: """Interface used to specify how to write exported data. 
""" diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index c9044a5019..9d4e87dad6 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -34,7 +34,7 @@ logger = logging.getLogger(__name__) events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "") -class ApplicationServicesHandler(object): +class ApplicationServicesHandler: def __init__(self, hs): self.store = hs.get_datastore() self.is_mine_id = hs.is_mine_id diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index f0b0a4d76a..90189869cc 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1236,7 +1236,7 @@ class AuthHandler(BaseHandler): @attr.s -class MacaroonGenerator(object): +class MacaroonGenerator: hs = attr.ib() diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ee4666337a..643d71a710 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -497,7 +497,7 @@ def _update_device_from_client_ips(device, client_ips): device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")}) -class DeviceListUpdater(object): +class DeviceListUpdater: "Handles incoming device list updates from federation and updates the DB" def __init__(self, hs, device_handler): diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index dcb4c82244..64ef7f63ab 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -31,7 +31,7 @@ from synapse.util.stringutils import random_string logger = logging.getLogger(__name__) -class DeviceMessageHandler(object): +class DeviceMessageHandler: def __init__(self, hs): """ Args: diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index dfd1c78549..d629c7c16c 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -43,7 +43,7 @@ from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) -class E2eKeysHandler(object): +class E2eKeysHandler: def __init__(self, hs): self.store = hs.get_datastore() self.federation = hs.get_federation_client() @@ -1212,7 +1212,7 @@ class SignatureListItem: signature = attr.ib() -class SigningKeyEduUpdater(object): +class SigningKeyEduUpdater: """Handles incoming signing key updates from federation and updates the DB""" def __init__(self, hs, e2e_keys_handler): diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 0bb983dc28..f01b090772 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -29,7 +29,7 @@ from synapse.util.async_helpers import Linearizer logger = logging.getLogger(__name__) -class E2eRoomKeysHandler(object): +class E2eRoomKeysHandler: """ Implements an optional realtime backup mechanism for encrypted E2E megolm room keys. 
This gives a way for users to store and recover their megolm keys if they lose all diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 0e2656ccb3..44df567983 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -52,7 +52,7 @@ def _create_rerouter(func_name): return f -class GroupsLocalWorkerHandler(object): +class GroupsLocalWorkerHandler: def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 72bb638167..8a7b4916cd 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -64,7 +64,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class MessageHandler(object): +class MessageHandler: """Contains some read only APIs to get state about a room """ @@ -361,7 +361,7 @@ class MessageHandler(object): _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000 -class EventCreationHandler(object): +class EventCreationHandler: def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 63d7edff87..34ed0e2921 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -37,7 +37,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class PurgeStatus(object): +class PurgeStatus: """Object tracking the status of a purge request This class contains information on the progress of a purge request, for @@ -65,7 +65,7 @@ class PurgeStatus(object): return {"status": PurgeStatus.STATUS_TEXT[self.status]} -class PaginationHandler(object): +class PaginationHandler: """Handles pagination and purge history requests. These are in the same handler due to the fact we need to block clients diff --git a/synapse/handlers/password_policy.py b/synapse/handlers/password_policy.py index d06b110269..88e2f87200 100644 --- a/synapse/handlers/password_policy.py +++ b/synapse/handlers/password_policy.py @@ -22,7 +22,7 @@ from synapse.api.errors import Codes, PasswordRefusedError logger = logging.getLogger(__name__) -class PasswordPolicyHandler(object): +class PasswordPolicyHandler: def __init__(self, hs): self.policy = hs.config.password_policy self.enabled = hs.config.password_policy_enabled diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1846068150..91a3aec1cc 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1010,7 +1010,7 @@ def format_user_presence_state(state, now, include_user_id=True): return content -class PresenceEventSource(object): +class PresenceEventSource: def __init__(self, hs): # We can't call get_presence_handler here because there's a cycle: # diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index f922d8a545..2cc6c2eb68 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -123,7 +123,7 @@ class ReceiptsHandler(BaseHandler): await self.federation.send_read_receipt(receipt) -class ReceiptEventSource(object): +class ReceiptEventSource: def __init__(self, hs): self.store = hs.get_datastore() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 9d5b1828df..a29305f655 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -974,7 +974,7 @@ class RoomCreationHandler(BaseHandler): raise StoreError(500, "Couldn't generate a room ID.") -class RoomContextHandler(object): +class RoomContextHandler: def __init__(self, hs: "HomeServer"): self.hs = hs self.store 
= hs.get_datastore() @@ -1084,7 +1084,7 @@ class RoomContextHandler(object): return results -class RoomEventSource(object): +class RoomEventSource: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() @@ -1146,7 +1146,7 @@ class RoomEventSource(object): return self.store.get_room_events_max_id(room_id) -class RoomShutdownHandler(object): +class RoomShutdownHandler: DEFAULT_MESSAGE = ( "Sharing illegal content on this server is not permitted and rooms in" diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index a7962b0ada..32b7e323fa 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -51,7 +51,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class RoomMemberHandler(object): +class RoomMemberHandler: # TODO(paul): This handler currently contains a messy conflation of # low-level API that works on UserID objects and so on, and REST-level # API that takes ID strings and returns pagination chunks. These concerns diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py index b426199aa6..66b063f991 100644 --- a/synapse/handlers/saml_handler.py +++ b/synapse/handlers/saml_handler.py @@ -360,12 +360,12 @@ MXID_MAPPER_MAP = { @attr.s -class SamlConfig(object): +class SamlConfig: mxid_source_attribute = attr.ib() mxid_mapper = attr.ib() -class DefaultSamlMappingProvider(object): +class DefaultSamlMappingProvider: __version__ = "0.0.1" def __init__(self, parsed_config: SamlConfig, module_api: ModuleApi): diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py index 8590c1eff4..7a4ae0727a 100644 --- a/synapse/handlers/state_deltas.py +++ b/synapse/handlers/state_deltas.py @@ -18,7 +18,7 @@ import logging logger = logging.getLogger(__name__) -class StateDeltasHandler(object): +class StateDeltasHandler: def __init__(self, hs): self.store = hs.get_datastore() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8728403e62..e2ddb628ff 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -246,7 +246,7 @@ class SyncResult: __bool__ = __nonzero__ # python3 -class SyncHandler(object): +class SyncHandler: def __init__(self, hs: "HomeServer"): self.hs_config = hs.config self.store = hs.get_datastore() @@ -2075,7 +2075,7 @@ class SyncResultBuilder: @attr.s -class RoomSyncResultBuilder(object): +class RoomSyncResultBuilder: """Stores information needed to create either a `JoinedSyncResult` or `ArchivedSyncResult`. diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 1d828bd7be..3cbfc2d780 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -412,7 +412,7 @@ class TypingWriterHandler(FollowerTypingHandler): raise Exception("Typing writer instance got typing info over replication") -class TypingNotificationEventSource(object): +class TypingNotificationEventSource: def __init__(self, hs): self.hs = hs self.clock = hs.get_clock() diff --git a/synapse/http/client.py b/synapse/http/client.py index dad01a8e56..13fcab3378 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -86,7 +86,7 @@ def _make_scheduler(reactor): return _scheduler -class IPBlacklistingResolver(object): +class IPBlacklistingResolver: """ A proxy for reactor.nameResolver which only produces non-blacklisted IP addresses, preventing DNS rebinding attacks on URL preview. 
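The SyncResult context above keeps the "__bool__ = __nonzero__" alias, a separate Python 2/3 compatibility idiom from the one this series removes: __nonzero__ was the Python 2 truthiness hook, and aliasing it to __bool__ lets one definition serve both interpreters. A standalone sketch of the idiom:

# Define truthiness once under the Python 2 name, then alias it to
# __bool__, which is the only hook Python 3 consults.
class Result:
    def __init__(self, events):
        self.events = events

    def __nonzero__(self):
        return bool(self.events)

    __bool__ = __nonzero__  # python3

assert not Result([])
assert Result(["event"])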
@@ -133,7 +133,7 @@ class IPBlacklistingResolver(object): r.resolutionComplete() @provider(IResolutionReceiver) - class EndpointReceiver(object): + class EndpointReceiver: @staticmethod def resolutionBegan(resolutionInProgress): pass @@ -192,7 +192,7 @@ class BlacklistingAgentWrapper(Agent): ) -class SimpleHttpClient(object): +class SimpleHttpClient: """ A simple, no-frills HTTP client with methods that wrap up common ways of using HTTP in Matrix @@ -244,7 +244,7 @@ class SimpleHttpClient(object): ) @implementer(IReactorPluggableNameResolver) - class Reactor(object): + class Reactor: def __getattr__(_self, attr): if attr == "nameResolver": return nameResolver diff --git a/synapse/http/connectproxyclient.py b/synapse/http/connectproxyclient.py index be7b2ceb8e..856e28454f 100644 --- a/synapse/http/connectproxyclient.py +++ b/synapse/http/connectproxyclient.py @@ -31,7 +31,7 @@ class ProxyConnectError(ConnectError): @implementer(IStreamClientEndpoint) -class HTTPConnectProxyEndpoint(object): +class HTTPConnectProxyEndpoint: """An Endpoint implementation which will send a CONNECT request to an http proxy Wraps an existing HostnameEndpoint for the proxy. diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 782d39d4ca..83d6196d4a 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -36,7 +36,7 @@ logger = logging.getLogger(__name__) @implementer(IAgent) -class MatrixFederationAgent(object): +class MatrixFederationAgent: """An Agent-like thing which provides a `request` method which correctly handles resolving matrix server names when using matrix://. Handles standard https URIs as normal. @@ -175,7 +175,7 @@ class MatrixFederationAgent(object): @implementer(IAgentEndpointFactory) -class MatrixHostnameEndpointFactory(object): +class MatrixHostnameEndpointFactory: """Factory for MatrixHostnameEndpoint for parsing to an Agent. """ @@ -198,7 +198,7 @@ class MatrixHostnameEndpointFactory(object): @implementer(IStreamClientEndpoint) -class MatrixHostnameEndpoint(object): +class MatrixHostnameEndpoint: """An endpoint that resolves matrix:// URLs using Matrix server name resolution (i.e. via SRV). Does not check for well-known delegation. diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py index 2ede90a9b1..d9620032d2 100644 --- a/synapse/http/federation/srv_resolver.py +++ b/synapse/http/federation/srv_resolver.py @@ -33,7 +33,7 @@ SERVER_CACHE = {} @attr.s(slots=True, frozen=True) -class Server(object): +class Server: """ Our record of an individual server which can be tried to reach a destination. @@ -96,7 +96,7 @@ def _sort_server_list(server_list): return results -class SrvResolver(object): +class SrvResolver: """Interface to the dns client to do SRV lookups, with result caching. 
The default resolver in twisted.names doesn't do any caching (it has a CacheResolver, diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index cdb6bec56e..e6f067ca29 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -71,11 +71,11 @@ _had_valid_well_known_cache = TTLCache("had-valid-well-known") @attr.s(slots=True, frozen=True) -class WellKnownLookupResult(object): +class WellKnownLookupResult: delegated_server = attr.ib() -class WellKnownResolver(object): +class WellKnownResolver: """Handles well-known lookups for matrix servers. """ diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 738be43f46..775fad3be4 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -76,7 +76,7 @@ _next_id = 1 @attr.s(frozen=True) -class MatrixFederationRequest(object): +class MatrixFederationRequest: method = attr.ib() """HTTP method :type: str @@ -203,7 +203,7 @@ async def _handle_json_response( return body -class MatrixFederationHttpClient(object): +class MatrixFederationHttpClient: """HTTP client used to talk to other homeservers over the federation protocol. Send client certificates and signs requests. @@ -226,7 +226,7 @@ class MatrixFederationHttpClient(object): ) @implementer(IReactorPluggableNameResolver) - class Reactor(object): + class Reactor: def __getattr__(_self, attr): if attr == "nameResolver": return nameResolver diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index b58ae3d9db..cd94e789e8 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -145,7 +145,7 @@ LaterGauge( ) -class RequestMetrics(object): +class RequestMetrics: def start(self, time_sec, name, method): self.start = time_sec self.start_context = current_context() diff --git a/synapse/http/server.py b/synapse/http/server.py index 8d791bd2ca..996a31a9ec 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -174,7 +174,7 @@ def wrap_async_request_handler(h): return preserve_fn(wrapped_async_request_handler) -class HttpServer(object): +class HttpServer: """ Interface for registering callbacks on a HTTP server """ diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 53acba56cb..fd90ba7828 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -256,7 +256,7 @@ def assert_params_in_dict(body, required): raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM) -class RestServlet(object): +class RestServlet: """ A Synapse REST Servlet. diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py index 7372450b45..144506c8f2 100644 --- a/synapse/logging/_structured.py +++ b/synapse/logging/_structured.py @@ -55,7 +55,7 @@ def stdlib_log_level_to_twisted(level: str) -> LogLevel: @attr.s @implementer(ILogObserver) -class LogContextObserver(object): +class LogContextObserver: """ An ILogObserver which adds Synapse-specific log context information. 
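WellKnownLookupResult and MatrixFederationRequest above are attrs classes, and attr.s decorates a bare class exactly as it decorated an explicit object subclass. A self-contained sketch of the frozen variant they use (assumes the attrs package is installed; the field name mirrors the diff, the rest is illustrative):

import attr

@attr.s(slots=True, frozen=True)
class WellKnownLookupResult:
    delegated_server = attr.ib()

result = WellKnownLookupResult(delegated_server=b"example.com")
try:
    result.delegated_server = b"other.example"
except attr.exceptions.FrozenInstanceError:
    pass  # mutation raises, as frozen=True promises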
@@ -169,7 +169,7 @@ class OutputPipeType(Values): @attr.s -class DrainConfiguration(object): +class DrainConfiguration: name = attr.ib() type = attr.ib() location = attr.ib() @@ -177,7 +177,7 @@ class DrainConfiguration(object): @attr.s -class NetworkJSONTerseOptions(object): +class NetworkJSONTerseOptions: maximum_buffer = attr.ib(type=int) diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index c0b9384189..1b8916cfa2 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -152,7 +152,7 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb @attr.s @implementer(IPushProducer) -class LogProducer(object): +class LogProducer: """ An IPushProducer that writes logs from its buffer to its transport when it is resumed. @@ -190,7 +190,7 @@ class LogProducer(object): @attr.s @implementer(ILogObserver) -class TerseJSONToTCPLogObserver(object): +class TerseJSONToTCPLogObserver: """ An IObserver that writes JSON logs to a TCP target. diff --git a/synapse/logging/context.py b/synapse/logging/context.py index cbeeb870cb..22598e02d2 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -74,7 +74,7 @@ except Exception: get_thread_id = threading.get_ident -class ContextResourceUsage(object): +class ContextResourceUsage: """Object for tracking the resources used by a log context Attributes: @@ -179,7 +179,7 @@ class ContextResourceUsage(object): LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"] -class _Sentinel(object): +class _Sentinel: """Sentinel to represent the root context""" __slots__ = ["previous_context", "finished", "request", "scope", "tag"] @@ -226,7 +226,7 @@ class _Sentinel(object): SENTINEL_CONTEXT = _Sentinel() -class LoggingContext(object): +class LoggingContext: """Additional context for log formatting. Contexts are scoped within a "with" block. diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index d39ac62168..7df0aa197d 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -185,7 +185,7 @@ if TYPE_CHECKING: # Helper class -class _DummyTagNames(object): +class _DummyTagNames: """wrapper of opentracings tags. We need to have them if we want to reference them without opentracing around. Clearly they should never actually show up in a trace. `set_tags` overwrites diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 6035672698..2643380d9e 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -51,7 +51,7 @@ all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollec HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") -class RegistryProxy(object): +class RegistryProxy: @staticmethod def collect(): for metric in REGISTRY.collect(): @@ -60,7 +60,7 @@ class RegistryProxy(object): @attr.s(hash=True) -class LaterGauge(object): +class LaterGauge: name = attr.ib(type=str) desc = attr.ib(type=str) @@ -100,7 +100,7 @@ class LaterGauge(object): all_gauges[self.name] = self -class InFlightGauge(object): +class InFlightGauge: """Tracks number of things (e.g. requests, Measure blocks, etc) in flight at any given time. @@ -206,7 +206,7 @@ class InFlightGauge(object): @attr.s(hash=True) -class BucketCollector(object): +class BucketCollector: """ Like a Histogram, but allows buckets to be point-in-time instead of incrementally added to. 
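_Sentinel above combines two details worth noting: it is now a plain class, and it declares __slots__, which keeps the singleton tiny and rejects stray attributes. Dropping the object base does not change slots behaviour, as this illustrative sketch shows:

class Sentinel:
    __slots__ = ["name"]

    def __init__(self, name):
        self.name = name

SENTINEL = Sentinel("sentinel")
assert SENTINEL.name == "sentinel"
try:
    SENTINEL.extra = 1
except AttributeError:
    pass  # a slots-only class has no __dict__, so this is rejected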
@@ -269,7 +269,7 @@ class BucketCollector(object): # -class CPUMetrics(object): +class CPUMetrics: def __init__(self): ticks_per_sec = 100 try: @@ -329,7 +329,7 @@ gc_time = Histogram( ) -class GCCounts(object): +class GCCounts: def collect(self): cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) for n, m in enumerate(gc.get_count()): @@ -347,7 +347,7 @@ if not running_on_pypy: # -class PyPyGCStats(object): +class PyPyGCStats: def collect(self): # @stats is a pretty-printer object with __str__() returning a nice table, @@ -482,7 +482,7 @@ build_info.labels( last_ticked = time.time() -class ReactorLastSeenMetric(object): +class ReactorLastSeenMetric: def collect(self): cm = GaugeMetricFamily( "python_twisted_reactor_last_seen", diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 4cd7932e5b..5b73463504 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -105,7 +105,7 @@ _background_processes_active_since_last_scrape = set() # type: Set[_BackgroundP _bg_metrics_lock = threading.Lock() -class _Collector(object): +class _Collector: """A custom metrics collector for the background process metrics. Ensures that all of the metrics are up-to-date with any in-flight processes @@ -140,7 +140,7 @@ class _Collector(object): REGISTRY.register(_Collector()) -class _BackgroundProcess(object): +class _BackgroundProcess: def __init__(self, desc, ctx): self.desc = desc self._context = ctx diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index ae0e359a77..fcbd5378c4 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -31,7 +31,7 @@ __all__ = ["errors", "make_deferred_yieldable", "run_in_background", "ModuleApi" logger = logging.getLogger(__name__) -class ModuleApi(object): +class ModuleApi: """A proxy object that gets passed to various plugin modules so they can register new users etc if necessary. """ diff --git a/synapse/notifier.py b/synapse/notifier.py index dfb096e589..b7f4041306 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -68,7 +68,7 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int: return n -class _NotificationListener(object): +class _NotificationListener: """ This represents a single client connection to the events stream. The events stream handler will have yielded to the deferred, so to notify the handler it is sufficient to resolve the deferred. @@ -80,7 +80,7 @@ class _NotificationListener(object): self.deferred = deferred -class _NotifierUserStream(object): +class _NotifierUserStream: """This represents a user connected to the event stream. It tracks the most recent stream token for that user. At a given point a user may have a number of streams listening for @@ -168,7 +168,7 @@ class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))): __bool__ = __nonzero__ # python3 -class Notifier(object): +class Notifier: """ This class is responsible for notifying any listeners when there are new events available for it. 
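The metrics hunks show Synapse's hand-rolled Prometheus collectors: a plain class exposing a collect() method that yields metric families. A runnable sketch of the same shape as GCCounts above, assuming the prometheus_client package is installed:

import gc

from prometheus_client.core import GaugeMetricFamily

class GCCountsCollector:
    """Yields one gauge sample per GC generation, like GCCounts above."""

    def collect(self):
        cm = GaugeMetricFamily(
            "python_gc_counts", "GC object counts", labels=["gen"]
        )
        for n, m in enumerate(gc.get_count()):
            cm.add_metric([str(n)], m)
        yield cm

samples = list(GCCountsCollector().collect())
assert samples[0].name == "python_gc_counts"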
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 0d23142653..fabc9ba126 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -22,7 +22,7 @@ from .bulk_push_rule_evaluator import BulkPushRuleEvaluator logger = logging.getLogger(__name__) -class ActionGenerator(object): +class ActionGenerator: def __init__(self, hs): self.hs = hs self.clock = hs.get_clock() diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index e7fa02b78b..1bb8e346b9 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -95,7 +95,7 @@ def _should_count_as_unread(event: EventBase, context: EventContext) -> bool: return False -class BulkPushRuleEvaluator(object): +class BulkPushRuleEvaluator: """Calculates the outcome of push rules for an event for all users in the room at once. """ @@ -263,7 +263,7 @@ def _condition_checker(evaluator, conditions, uid, display_name, cache): return True -class RulesForRoom(object): +class RulesForRoom: """Caches push rules for users in a room. This efficiently handles users joining/leaving the room by not invalidating diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 568c13eaea..b7ea4438e0 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -45,7 +45,7 @@ THROTTLE_RESET_AFTER_MS = 12 * 60 * 60 * 1000 INCLUDE_ALL_UNREAD_NOTIFS = False -class EmailPusher(object): +class EmailPusher: """ A pusher that sends email notifications about events (approximately) when they happen. diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 4c469efb20..f21fa9b659 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -49,7 +49,7 @@ http_badges_failed_counter = Counter( ) -class HttpPusher(object): +class HttpPusher: INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes MAX_BACKOFF_SEC = 60 * 60 diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index c38e037281..6c57854018 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -92,7 +92,7 @@ ALLOWED_ATTRS = { # ALLOWED_SCHEMES = ["http", "https", "ftp", "mailto"] -class Mailer(object): +class Mailer: def __init__(self, hs, app_name, template_html, template_text): self.hs = hs self.template_html = template_html diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 2d79ada189..709ace01e5 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -105,7 +105,7 @@ def tweaks_for_actions(actions: List[Union[str, Dict]]) -> Dict[str, Any]: return tweaks -class PushRuleEvaluatorForEvent(object): +class PushRuleEvaluatorForEvent: def __init__( self, event: EventBase, diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py index f626797133..2a52e226e3 100644 --- a/synapse/push/pusher.py +++ b/synapse/push/pusher.py @@ -23,7 +23,7 @@ from .httppusher import HttpPusher logger = logging.getLogger(__name__) -class PusherFactory(object): +class PusherFactory: def __init__(self, hs): self.hs = hs self.config = hs.config diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 6a28c2db9d..ba16f22c91 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -33,7 +33,7 @@ from synapse.util.stringutils import random_string logger = logging.getLogger(__name__) -class ReplicationEndpoint(object): +class ReplicationEndpoint: """Helper base 
class for defining new replication HTTP endpoints. This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..` diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py index 047f2c50f7..eb74903d68 100644 --- a/synapse/replication/slave/storage/_slaved_id_tracker.py +++ b/synapse/replication/slave/storage/_slaved_id_tracker.py @@ -16,7 +16,7 @@ from synapse.storage.util.id_generators import _load_current_id -class SlavedIdTracker(object): +class SlavedIdTracker: def __init__(self, db_conn, table, column, extra_tables=[], step=1): self.step = step self._current = _load_current_id(db_conn, table, column, step) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 0350923898..0b0d204e64 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -113,7 +113,7 @@ PING_TIMEOUT_MULTIPLIER = 5 PING_TIMEOUT_MS = PING_TIME * PING_TIMEOUT_MULTIPLIER -class ConnectionStates(object): +class ConnectionStates: CONNECTING = "connecting" ESTABLISHED = "established" PAUSED = "paused" diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 41569305df..04d894fb3d 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -58,7 +58,7 @@ class ReplicationStreamProtocolFactory(Factory): ) -class ReplicationStreamer(object): +class ReplicationStreamer: """Handles replication connections. This needs to be poked when new replication data may be available. When new diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 8c3caf30c9..682d47f402 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -79,7 +79,7 @@ StreamUpdateResult = Tuple[List[Tuple[Token, StreamRow]], Token, bool] UpdateFunction = Callable[[str, Token, Token, int], Awaitable[StreamUpdateResult]] -class Stream(object): +class Stream: """Base class for the streams. Provides a `get_updates()` function that returns new updates since the last diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 16c63ff4ec..f929fc3954 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -49,14 +49,14 @@ data part are: @attr.s(slots=True, frozen=True) -class EventsStreamRow(object): +class EventsStreamRow: """A parsed row from the events replication stream""" type = attr.ib() # str: the TypeId of one of the *EventsStreamRows data = attr.ib() # BaseEventsStreamRow -class BaseEventsStreamRow(object): +class BaseEventsStreamRow: """Base class for rows to be sent in the events stream. Specifies how to identify, serialize and deserialize the different types. 
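Stream's docstring above describes the replication contract: callers hold a token and ask for everything between that token and the current position. A toy in-memory version of the contract (names and shapes are illustrative, not Synapse's actual stream API):

class InMemoryStream:
    """Hands out (token, row) updates newer than the caller's token."""

    def __init__(self):
        self._updates = []  # list of (token, row), tokens ascending

    def append(self, row):
        token = len(self._updates) + 1
        self._updates.append((token, row))
        return token

    def get_updates(self, from_token, upto_token):
        return [
            (t, r) for t, r in self._updates if from_token < t <= upto_token
        ]

stream = InMemoryStream()
stream.append("row-a")
current = stream.append("row-b")
assert stream.get_updates(0, current) == [(1, "row-a"), (2, "row-b")]
assert stream.get_updates(1, current) == [(2, "row-b")]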
diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 6da71dc46f..7be5c0fb88 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) CLEANUP_PERIOD_MS = 1000 * 60 * 30 # 30 mins -class HttpTransactionCache(object): +class HttpTransactionCache: def __init__(self, hs): self.hs = hs self.auth = self.hs.get_auth() diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 51372cdb5e..b6b90a8b30 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -658,7 +658,7 @@ class RegisterRestServlet(RestServlet): (object) params: registration parameters, from which we pull device_id, initial_device_name and inhibit_login Returns: - (object) dictionary for response from /register + dictionary for response from /register """ result = {"user_id": user_id, "home_server": self.hs.hostname} if not params.get("inhibit_login", False): diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index 20ddb9550b..6568e61829 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -235,7 +235,7 @@ async def respond_with_responder( finish_request(request) -class Responder(object): +class Responder: """Represents a response that can be streamed to the requester. Responder is a context manager which *must* be used, so that any resources @@ -260,7 +260,7 @@ class Responder(object): pass -class FileInfo(object): +class FileInfo: """Details about a requested/uploaded file. Attributes: diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index e25c382c9c..d2826374a7 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -33,7 +33,7 @@ def _wrap_in_base_path(func): return _wrapped -class MediaFilePaths(object): +class MediaFilePaths: """Describes where files are stored on disk. Most of the functions have a `*_rel` variant which returns a file path that diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 6fb4039e98..9a1b7779f7 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -62,7 +62,7 @@ logger = logging.getLogger(__name__) UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000 -class MediaRepository(object): +class MediaRepository: def __init__(self, hs): self.hs = hs self.auth = hs.get_auth() diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index ab1fa705bf..3a352b5631 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -34,7 +34,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class MediaStorage(object): +class MediaStorage: """Responsible for storing/fetching files from local sources. 
Args: diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py index 7126997134..d681bf7bf0 100644 --- a/synapse/rest/media/v1/thumbnailer.py +++ b/synapse/rest/media/v1/thumbnailer.py @@ -31,7 +31,7 @@ EXIF_TRANSPOSE_MAPPINGS = { } -class Thumbnailer(object): +class Thumbnailer: FORMATS = {"image/jpeg": "JPEG", "image/png": "PNG"} diff --git a/synapse/rest/well_known.py b/synapse/rest/well_known.py index e15e13b756..f591cc6c5c 100644 --- a/synapse/rest/well_known.py +++ b/synapse/rest/well_known.py @@ -23,7 +23,7 @@ from synapse.util import json_encoder logger = logging.getLogger(__name__) -class WellKnownBuilder(object): +class WellKnownBuilder: """Utility to construct the well-known response Args: diff --git a/synapse/secrets.py b/synapse/secrets.py index ff86950a54..fb6d90a3b7 100644 --- a/synapse/secrets.py +++ b/synapse/secrets.py @@ -37,7 +37,7 @@ else: import binascii import os - class Secrets(object): + class Secrets: def token_bytes(self, nbytes=32): return os.urandom(nbytes) diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py index 089cfef0b3..3673e7f47e 100644 --- a/synapse/server_notices/consent_server_notices.py +++ b/synapse/server_notices/consent_server_notices.py @@ -23,7 +23,7 @@ from synapse.types import get_localpart_from_id logger = logging.getLogger(__name__) -class ConsentServerNotices(object): +class ConsentServerNotices: """Keeps track of whether we need to send users server_notices about privacy policy consent, and sends one if we do. """ diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py index c2faef6eab..2258d306d9 100644 --- a/synapse/server_notices/resource_limits_server_notices.py +++ b/synapse/server_notices/resource_limits_server_notices.py @@ -27,7 +27,7 @@ from synapse.server_notices.server_notices_manager import SERVER_NOTICE_ROOM_TAG logger = logging.getLogger(__name__) -class ResourceLimitsServerNotices(object): +class ResourceLimitsServerNotices: """ Keeps track of whether the server has reached it's resource limit and ensures that the client is kept up to date. """ diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py index ed96aa8571..0422d4c7ce 100644 --- a/synapse/server_notices/server_notices_manager.py +++ b/synapse/server_notices/server_notices_manager.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) SERVER_NOTICE_ROOM_TAG = "m.server_notice" -class ServerNoticesManager(object): +class ServerNoticesManager: def __init__(self, hs): """ diff --git a/synapse/server_notices/server_notices_sender.py b/synapse/server_notices/server_notices_sender.py index a754f75db4..6870b67ca0 100644 --- a/synapse/server_notices/server_notices_sender.py +++ b/synapse/server_notices/server_notices_sender.py @@ -20,7 +20,7 @@ from synapse.server_notices.resource_limits_server_notices import ( ) -class ServerNoticesSender(object): +class ServerNoticesSender: """A centralised place which sends server notices automatically when Certain Events take place """ diff --git a/synapse/server_notices/worker_server_notices_sender.py b/synapse/server_notices/worker_server_notices_sender.py index e9390b19da..9273e61895 100644 --- a/synapse/server_notices/worker_server_notices_sender.py +++ b/synapse/server_notices/worker_server_notices_sender.py @@ -14,7 +14,7 @@ # limitations under the License. 
-class WorkerServerNoticesSender(object): +class WorkerServerNoticesSender: """Stub impl of ServerNoticesSender which does nothing""" def __init__(self, hs): diff --git a/synapse/spam_checker_api/__init__.py b/synapse/spam_checker_api/__init__.py index 9be92e2565..395ac5ab02 100644 --- a/synapse/spam_checker_api/__init__.py +++ b/synapse/spam_checker_api/__init__.py @@ -36,7 +36,7 @@ class RegistrationBehaviour(Enum): DENY = "deny" -class SpamCheckerApi(object): +class SpamCheckerApi: """A proxy object that gets passed to spam checkers so they can get access to rooms and other relevant information. """ diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 9bf2ec368f..c7e3015b5d 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -77,7 +77,7 @@ def _gen_state_id(): return s -class _StateCacheEntry(object): +class _StateCacheEntry: __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"] def __init__( @@ -113,7 +113,7 @@ class _StateCacheEntry(object): return len(self.state) -class StateHandler(object): +class StateHandler: """Fetches bits of state from the stores, and does state resolution where necessary """ @@ -462,7 +462,7 @@ class StateHandler(object): return {key: state_map[ev_id] for key, ev_id in new_state.items()} -class StateResolutionHandler(object): +class StateResolutionHandler: """Responsible for doing state conflict resolution. Note that the storage layer depends on this handler, so all functions must @@ -679,7 +679,7 @@ def resolve_events_with_store( @attr.s -class StateResolutionStore(object): +class StateResolutionStore: """Interface that allows state resolution algorithms to access the database in well defined way. diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 5ef3853559..8e5d78f6f7 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -37,7 +37,7 @@ from synapse.storage.state import StateGroupStorage __all__ = ["DataStores", "DataStore"] -class Storage(object): +class Storage: """The high level interfaces for talking to various storage layers. """ diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 67a89cd51a..810721ebe9 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -24,7 +24,7 @@ from . import engines logger = logging.getLogger(__name__) -class BackgroundUpdatePerformance(object): +class BackgroundUpdatePerformance: """Tracks the how long a background update is taking to update its items""" def __init__(self, name): @@ -71,7 +71,7 @@ class BackgroundUpdatePerformance(object): return float(self.total_item_count) / float(self.total_duration_ms) -class BackgroundUpdater(object): +class BackgroundUpdater: """ Background updates are updates to the database that run in the background. Each update processes a batch of data at once. We attempt to limit the impact of each update by monitoring how long each batch takes to diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 78ca6d8346..8be943f589 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -248,7 +248,7 @@ class LoggingTransaction: self.txn.close() -class PerformanceCounters(object): +class PerformanceCounters: def __init__(self): self.current_counters = {} self.previous_counters = {} @@ -286,7 +286,7 @@ class PerformanceCounters(object): R = TypeVar("R") -class DatabasePool(object): +class DatabasePool: """Wraps a single physical database and connection pool. 
A single database may be used by multiple data stores. diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index 0ac854aee2..7f08bd8285 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -24,7 +24,7 @@ from synapse.storage.prepare_database import prepare_database logger = logging.getLogger(__name__) -class Databases(object): +class Databases: """The various databases. These are low level interfaces to physical databases. diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index c46f5cd524..91a8b43da3 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -999,7 +999,7 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): await self.db_pool.runInteraction("forget_membership", f) -class _JoinedHostsCache(object): +class _JoinedHostsCache: """Cache for joined hosts in a room that is optimised to handle updates via state deltas. """ diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 4769b21529..afd10f7bae 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -22,6 +22,6 @@ logger = logging.getLogger(__name__) @attr.s(slots=True, frozen=True) -class FetchKeyResult(object): +class FetchKeyResult: verify_key = attr.ib() # VerifyKey: the key itself valid_until_ts = attr.ib() # int: how long we can use this key for diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index f15b95e633..dbaeef91dd 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -69,7 +69,7 @@ stale_forward_extremities_counter = Histogram( ) -class _EventPeristenceQueue(object): +class _EventPeristenceQueue: """Queues up events so that they can be persisted in bulk with only one concurrent transaction per room. """ @@ -172,7 +172,7 @@ class _EventPeristenceQueue(object): pass -class EventsPersistenceStorage(object): +class EventsPersistenceStorage: """High level interface for handling persisting newly received events. Takes care of batching up events by room, and calculating the necessary diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 1c5f305132..964d8d9eb8 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -569,7 +569,7 @@ def _get_or_create_schema_state(txn, database_engine): @attr.s() -class _DirectoryListing(object): +class _DirectoryListing: """Helper class to store schema file name and the absolute path to it. diff --git a/synapse/storage/purge_events.py b/synapse/storage/purge_events.py index 79d9f06e2e..bfa0a9fd06 100644 --- a/synapse/storage/purge_events.py +++ b/synapse/storage/purge_events.py @@ -20,7 +20,7 @@ from typing import Set logger = logging.getLogger(__name__) -class PurgeEventsStorage(object): +class PurgeEventsStorage: """High level interface for purging rooms and event history. """ diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py index d471ec9860..d30e3f11e7 100644 --- a/synapse/storage/relations.py +++ b/synapse/storage/relations.py @@ -23,7 +23,7 @@ logger = logging.getLogger(__name__) @attr.s -class PaginationChunk(object): +class PaginationChunk: """Returned by relation pagination APIs. 
Attributes: @@ -51,7 +51,7 @@ class PaginationChunk(object): @attr.s(frozen=True, slots=True) -class RelationPaginationToken(object): +class RelationPaginationToken: """Pagination token for relation pagination API. As the results are in topological order, we can use the @@ -82,7 +82,7 @@ class RelationPaginationToken(object): @attr.s(frozen=True, slots=True) -class AggregationPaginationToken(object): +class AggregationPaginationToken: """Pagination token for relation aggregation pagination API. As the results are order by count and then MAX(stream_ordering) of the diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 96a1b59d64..8f68d968f0 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -29,7 +29,7 @@ T = TypeVar("T") @attr.s(slots=True) -class StateFilter(object): +class StateFilter: """A filter used when querying for state. Attributes: @@ -326,7 +326,7 @@ class StateFilter(object): return member_filter, non_member_filter -class StateGroupStorage(object): +class StateGroupStorage: """High level interface to fetching state for event. """ diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 9f3d23f0a5..76bc3afdfa 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -25,7 +25,7 @@ from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.util.sequence import PostgresSequenceGenerator -class IdGenerator(object): +class IdGenerator: def __init__(self, db_conn, table, column): self._lock = threading.Lock() self._next_id = _load_current_id(db_conn, table, column) @@ -59,7 +59,7 @@ def _load_current_id(db_conn, table, column, step=1): return (max if step > 0 else min)(current_id, step) -class StreamIdGenerator(object): +class StreamIdGenerator: """Used to generate new stream ids when persisting events while keeping track of which transactions have been completed. 
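IdGenerator above pairs a threading.Lock with a counter loaded from the database; the lock is what makes handing out the next id safe under concurrency. A self-contained sketch of that pattern (the database load is replaced by a plain starting value, and get_next here is illustrative):

import threading

class IdGenerator:
    def __init__(self, current_id):
        self._lock = threading.Lock()
        self._next_id = current_id

    def get_next(self):
        # Increment under the lock so two threads never share an id.
        with self._lock:
            self._next_id += 1
            return self._next_id

gen = IdGenerator(0)
assert [gen.get_next() for _ in range(3)] == [1, 2, 3]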
diff --git a/synapse/streams/config.py b/synapse/streams/config.py index ca7c16ff65..d97dc4d101 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) MAX_LIMIT = 1000 -class SourcePaginationConfig(object): +class SourcePaginationConfig: """A configuration object which stores pagination parameters for a specific event source.""" @@ -45,7 +45,7 @@ class SourcePaginationConfig(object): ) -class PaginationConfig(object): +class PaginationConfig: """A configuration object which stores pagination parameters.""" diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 7ab46f42bf..92fd5d489f 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -23,7 +23,7 @@ from synapse.handlers.typing import TypingNotificationEventSource from synapse.types import StreamToken -class EventSources(object): +class EventSources: SOURCE_TYPES = { "room": RoomEventSource, "presence": PresenceEventSource, diff --git a/synapse/types.py b/synapse/types.py index f8b9b03850..f7de48f148 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -529,7 +529,7 @@ class ThirdPartyInstanceID( @attr.s(slots=True) -class ReadReceipt(object): +class ReadReceipt: """Information about a read-receipt""" room_id = attr.ib() diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index b2a22dbd5c..3ad4b28fc7 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -46,7 +46,7 @@ def unwrapFirstError(failure): @attr.s -class Clock(object): +class Clock: """ A Clock wraps a Twisted reactor and provides utilities on top of it. diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index dfefbd996d..bb57e27beb 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -36,7 +36,7 @@ from synapse.util import Clock, unwrapFirstError logger = logging.getLogger(__name__) -class ObservableDeferred(object): +class ObservableDeferred: """Wraps a deferred object so that we can add observer deferreds. These observer deferreds do not affect the callback chain of the original deferred. @@ -188,7 +188,7 @@ def yieldable_gather_results(func, iter, *args, **kwargs): ).addErrback(unwrapFirstError) -class Linearizer(object): +class Linearizer: """Limits concurrent access to resources based on a key. Useful to ensure only a few things happen at a time on a given resource. @@ -338,7 +338,7 @@ class Linearizer(object): return new_defer -class ReadWriteLock(object): +class ReadWriteLock: """An async read write lock. Example: @@ -502,7 +502,7 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None): @attr.s(slots=True, frozen=True) -class DoneAwaitable(object): +class DoneAwaitable: """Simple awaitable that returns the provided value. 
""" diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index dd356bf156..237f588658 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -43,7 +43,7 @@ response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["n @attr.s -class CacheMetric(object): +class CacheMetric: _cache = attr.ib() _cache_type = attr.ib(type=str) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 825810eb16..98b34f2223 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -64,7 +64,7 @@ cache_pending_metric = Gauge( _CacheSentinel = object() -class CacheEntry(object): +class CacheEntry: __slots__ = ["deferred", "callbacks", "invalidated"] def __init__(self, deferred, callbacks): @@ -80,7 +80,7 @@ class CacheEntry(object): self.callbacks.clear() -class Cache(object): +class Cache: __slots__ = ( "cache", "name", @@ -288,7 +288,7 @@ class Cache(object): self._pending_deferred_cache.clear() -class _CacheDescriptorBase(object): +class _CacheDescriptorBase: def __init__(self, orig: _CachedFunction, num_args, cache_context=False): self.orig = orig @@ -705,7 +705,7 @@ def cachedList( Example: - class Example(object): + class Example: @cached(num_args=2) def do_something(self, first_arg): ... diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index 6834e6f3ae..8592b93689 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -40,7 +40,7 @@ class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "known_absent", "va return len(self.value) -class DictionaryCache(object): +class DictionaryCache: """Caches key -> dictionary lookups, supporting caching partial dicts, i.e. fetching a subset of dictionary keys for a particular key. """ @@ -53,7 +53,7 @@ class DictionaryCache(object): self.thread = None # caches_by_name[name] = self.cache - class Sentinel(object): + class Sentinel: __slots__ = [] self.sentinel = Sentinel() diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 89a3420f92..e15f7ee698 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) SENTINEL = object() -class ExpiringCache(object): +class ExpiringCache: def __init__( self, cache_name, @@ -190,7 +190,7 @@ class ExpiringCache(object): return False -class _CacheEntry(object): +class _CacheEntry: __slots__ = ["time", "value"] def __init__(self, time, value): diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index df4ea5901d..4bc1a67b58 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -30,7 +30,7 @@ def enumerate_leaves(node, depth): yield m -class _Node(object): +class _Node: __slots__ = ["prev_node", "next_node", "key", "value", "callbacks"] def __init__(self, prev_node, next_node, key, value, callbacks=set()): @@ -41,7 +41,7 @@ class _Node(object): self.callbacks = callbacks -class LruCache(object): +class LruCache: """ Least-recently-used cache. 
Supports del_multi only if cache_type=TreeCache diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index a6c60888e5..df1a721add 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -23,7 +23,7 @@ from synapse.util.caches import register_cache logger = logging.getLogger(__name__) -class ResponseCache(object): +class ResponseCache: """ This caches a deferred response. Until the deferred completes it will be returned from the cache. This means that if the client retries the request diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index ecd9948e79..eb4d98f683 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -3,7 +3,7 @@ from typing import Dict SENTINEL = object() -class TreeCache(object): +class TreeCache: """ Tree-based backing store for LruCache. Allows subtrees of data to be deleted efficiently. @@ -89,7 +89,7 @@ def iterate_tree_cache_entry(d): yield d -class _Entry(object): +class _Entry: __slots__ = ["value"] def __init__(self, value): diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py index 6437aa907e..3e180cafd3 100644 --- a/synapse/util/caches/ttlcache.py +++ b/synapse/util/caches/ttlcache.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) SENTINEL = object() -class TTLCache(object): +class TTLCache: """A key/value cache implementation where each entry has its own TTL""" def __init__(self, cache_name, timer=time.time): @@ -154,7 +154,7 @@ class TTLCache(object): @attr.s(frozen=True, slots=True) -class _CacheEntry(object): +class _CacheEntry: """TTLCache entry""" # expiry_time is the first attribute, so that entries are sorted by expiry. diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 22a857a306..a750261e77 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -34,7 +34,7 @@ def user_joined_room(distributor, user, room_id): distributor.fire("user_joined_room", user=user, room_id=room_id) -class Distributor(object): +class Distributor: """A central dispatch point for loosely-connected pieces of code to register, observe, and fire signals. @@ -103,7 +103,7 @@ def maybeAwaitableDeferred(f, *args, **kw): return succeed(result) -class Signal(object): +class Signal: """A Signal is a dispatch point that stores a list of callables as observers of it. diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 6a3f6177b1..733f5e26e6 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -20,7 +20,7 @@ from twisted.internet import threads from synapse.logging.context import make_deferred_yieldable, run_in_background -class BackgroundFileConsumer(object): +class BackgroundFileConsumer: """A consumer that writes to a file like object. Supports both push and pull producers diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py index 6dce03dd3a..50516926f3 100644 --- a/synapse/util/jsonobject.py +++ b/synapse/util/jsonobject.py @@ -14,7 +14,7 @@ # limitations under the License. -class JsonEncodedObject(object): +class JsonEncodedObject: """ A common base class for defining protocol units that are represented as JSON. 
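TTLCache above takes an injectable timer, which is the detail that makes per-entry expiry testable. A simplified sketch of the idea (the real class also keeps entries ordered by expiry so it can evict eagerly; this version only checks on read):

import time

class SimpleTTLCache:
    def __init__(self, timer=time.time):
        self._timer = timer
        self._data = {}  # key -> (value, expiry_time)

    def set(self, key, value, ttl):
        self._data[key] = (value, self._timer() + ttl)

    def get(self, key, default=None):
        entry = self._data.get(key)
        if entry is None:
            return default
        value, expiry = entry
        if self._timer() >= expiry:
            del self._data[key]  # expired: drop the entry and miss
            return default
        return value

# Injecting a fake timer makes expiry deterministic in tests.
now = [100.0]
cache = SimpleTTLCache(timer=lambda: now[0])
cache.set("key", "value", ttl=30)
assert cache.get("key") == "value"
now[0] = 131.0
assert cache.get("key") is None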
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 13775b43f9..6e57c1ee72 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -93,7 +93,7 @@ def measure_func(name: Optional[str] = None) -> Callable[[T], T]: return wrapper -class Measure(object): +class Measure: __slots__ = [ "clock", "name", diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index e5efdfcd02..70d11e1ec3 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -29,7 +29,7 @@ from synapse.logging.context import ( logger = logging.getLogger(__name__) -class FederationRateLimiter(object): +class FederationRateLimiter: def __init__(self, clock, config): """ Args: @@ -60,7 +60,7 @@ class FederationRateLimiter(object): return self.ratelimiters[host].ratelimit() -class _PerHostRatelimiter(object): +class _PerHostRatelimiter: def __init__(self, clock, config): """ Args: diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 919988d3bc..79869aaa44 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -114,7 +114,7 @@ async def get_retry_limiter(destination, clock, store, ignore_backoff=False, **k ) -class RetryDestinationLimiter(object): +class RetryDestinationLimiter: def __init__( self, destination, diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py index 023beb5ede..be3b22469d 100644 --- a/synapse/util/wheel_timer.py +++ b/synapse/util/wheel_timer.py @@ -14,7 +14,7 @@ # limitations under the License. -class _Entry(object): +class _Entry: __slots__ = ["end_key", "queue"] def __init__(self, end_key): @@ -22,7 +22,7 @@ class _Entry(object): self.queue = [] -class WheelTimer(object): +class WheelTimer: """Stores arbitrary objects that will be returned after their timers have expired. 
""" diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index 5d45689c8c..8ab56ec94c 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -36,7 +36,7 @@ from tests import unittest from tests.utils import mock_getRawHeaders, setup_test_homeserver -class TestHandlers(object): +class TestHandlers: def __init__(self, hs): self.auth_handler = synapse.handlers.auth.AuthHandler(hs) diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index d264653e74..2e6e7abf1f 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -43,7 +43,7 @@ from tests import unittest from tests.test_utils import make_awaitable -class MockPerspectiveServer(object): +class MockPerspectiveServer: def __init__(self): self.server_name = "mock_server" self.key = signedjson.key.generate_signing_key(0) diff --git a/tests/federation/transport/test_server.py b/tests/federation/transport/test_server.py index 27d83bb7d9..72e22d655f 100644 --- a/tests/federation/transport/test_server.py +++ b/tests/federation/transport/test_server.py @@ -26,7 +26,7 @@ from tests.unittest import override_config class RoomDirectoryFederationTests(unittest.HomeserverTestCase): def prepare(self, reactor, clock, homeserver): - class Authenticator(object): + class Authenticator: def authenticate_request(self, request, content): return defer.succeed("otherserver.nottld") diff --git a/tests/handlers/test_auth.py b/tests/handlers/test_auth.py index 4b3fb018b1..c7efd3822d 100644 --- a/tests/handlers/test_auth.py +++ b/tests/handlers/test_auth.py @@ -28,7 +28,7 @@ from tests.test_utils import make_awaitable from tests.utils import setup_test_homeserver -class AuthHandlers(object): +class AuthHandlers: def __init__(self, hs): self.auth_handler = AuthHandler(hs) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 60ebc95f3e..8e95e53d9e 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -28,7 +28,7 @@ from tests.test_utils import make_awaitable from tests.utils import setup_test_homeserver -class ProfileHandlers(object): +class ProfileHandlers: def __init__(self, hs): self.profile_handler = MasterProfileHandler(hs) diff --git a/tests/http/__init__.py b/tests/http/__init__.py index 2096ba3c91..5d41443293 100644 --- a/tests/http/__init__.py +++ b/tests/http/__init__.py @@ -133,7 +133,7 @@ def create_test_cert_file(sanlist): @implementer(IOpenSSLServerConnectionCreator) -class TestServerTLSConnectionFactory(object): +class TestServerTLSConnectionFactory: """An SSL connection creator which returns connections which present a certificate signed by our test CA.""" diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index eb78ab412a..8b5ad4574f 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -1264,7 +1264,7 @@ def _log_request(request): @implementer(IPolicyForHTTPS) -class TrustingTLSPolicyForHTTPS(object): +class TrustingTLSPolicyForHTTPS: """An IPolicyForHTTPS which checks that the certificate belongs to the right server, but doesn't check the certificate chain.""" diff --git a/tests/logging/test_structured.py b/tests/logging/test_structured.py index 451d05c0f0..d36f5f426c 100644 --- a/tests/logging/test_structured.py +++ b/tests/logging/test_structured.py @@ -29,12 +29,12 @@ from synapse.logging.context import LoggingContext from tests.unittest import DEBUG, HomeserverTestCase -class 
diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index 227b0d32d0..3224568640 100644
--- a/tests/push/test_email.py
+++ b/tests/push/test_email.py
@@ -27,7 +27,7 @@ from tests.unittest import HomeserverTestCase
 
 
 @attr.s
-class _User(object):
+class _User:
     "Helper wrapper for user ID and access token"
     id = attr.ib()
     token = attr.ib()
diff --git a/tests/rest/client/third_party_rules.py b/tests/rest/client/third_party_rules.py
index 7167fc56b6..8c24add530 100644
--- a/tests/rest/client/third_party_rules.py
+++ b/tests/rest/client/third_party_rules.py
@@ -19,7 +19,7 @@ from synapse.rest.client.v1 import login, room
 from tests import unittest
 
 
-class ThirdPartyRulesTestModule(object):
+class ThirdPartyRulesTestModule:
     def __init__(self, config):
         pass
 
diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py
index e66c9a4c4c..afaf9f7b85 100644
--- a/tests/rest/client/v1/utils.py
+++ b/tests/rest/client/v1/utils.py
@@ -30,7 +30,7 @@ from tests.server import make_request, render
 
 
 @attr.s
-class RestHelper(object):
+class RestHelper:
     """Contains extra helper functions to quickly and clearly perform a given
     REST action, which isn't the focus of the test.
     """
diff --git a/tests/rest/media/v1/test_url_preview.py b/tests/rest/media/v1/test_url_preview.py
index 74765a582b..c00a7b9114 100644
--- a/tests/rest/media/v1/test_url_preview.py
+++ b/tests/rest/media/v1/test_url_preview.py
@@ -32,7 +32,7 @@ from tests.server import FakeTransport
 
 
 @attr.s
-class FakeResponse(object):
+class FakeResponse:
     version = attr.ib()
     code = attr.ib()
     phrase = attr.ib()
@@ -43,7 +43,7 @@ class FakeResponse(object):
     @property
     def request(self):
         @attr.s
-        class FakeTransport(object):
+        class FakeTransport:
             absoluteURI = self.absoluteURI
 
         return FakeTransport()
@@ -111,7 +111,7 @@ class URLPreviewTests(unittest.HomeserverTestCase):
 
         self.lookups = {}
 
-        class Resolver(object):
+        class Resolver:
             def resolveHostName(
                 _self,
                 resolutionReceiver,
diff --git a/tests/server.py b/tests/server.py
index b6e0b14e78..48e45c6c8b 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -35,7 +35,7 @@ class TimedOutException(Exception):
 
 
 @attr.s
-class FakeChannel(object):
+class FakeChannel:
     """
     A fake Twisted Web Channel (the part that interfaces with the wire).
 
@@ -242,7 +242,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
         lookups = self.lookups = {}
 
         @implementer(IResolverSimple)
-        class FakeResolver(object):
+        class FakeResolver:
             def getHostByName(self, name, timeout=None):
                 if name not in lookups:
                     return fail(DNSLookupError("OH NO: unknown %s" % (name,)))
@@ -371,7 +371,7 @@ def get_clock():
 
 
 @attr.s(cmp=False)
-class FakeTransport(object):
+class FakeTransport:
     """
     A twisted.internet.interfaces.ITransport implementation which sends all its data
     straight into an IProtocol object: it exists to connect two IProtocols together.
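Several of the helpers in this patch (_User, RestHelper, FakeResponse, FakeChannel, FakeTransport) are attrs classes. attr.s builds the class from its attr.ib() fields, so the generated __init__ and __repr__ are unaffected by dropping the explicit object base. A self-contained sketch of the same pattern (the names here are illustrative, not from the patch):

import attr


@attr.s
class FakeUser:
    "Helper wrapper for user ID and access token"
    id = attr.ib()
    token = attr.ib()


user = FakeUser(id="@alice:test", token="syt_example")
print(user)  # FakeUser(id='@alice:test', token='syt_example')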
diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py
index f2955a9c69..ad9bbef9d2 100644
--- a/tests/state/test_v2.py
+++ b/tests/state/test_v2.py
@@ -49,7 +49,7 @@ class FakeClock:
         return defer.succeed(None)
 
 
-class FakeEvent(object):
+class FakeEvent:
     """A fake event we use as a convenience.
 
     NOTE: Again as a convenience we use "node_ids" rather than event_ids to
@@ -595,7 +595,7 @@ def pairwise(iterable):
 
 
 @attr.s
-class TestStateResolutionStore(object):
+class TestStateResolutionStore:
     event_map = attr.ib()
 
     def get_events(self, event_ids, allow_rejected=False):
diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py
index 319e2c2325..f5afed017c 100644
--- a/tests/storage/test__base.py
+++ b/tests/storage/test__base.py
@@ -99,7 +99,7 @@ class CacheTestCase(unittest.HomeserverTestCase):
 class CacheDecoratorTestCase(unittest.HomeserverTestCase):
     @defer.inlineCallbacks
     def test_passthrough(self):
-        class A(object):
+        class A:
             @cached()
             def func(self, key):
                 return key
@@ -113,7 +113,7 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase):
     def test_hit(self):
         callcount = [0]
 
-        class A(object):
+        class A:
             @cached()
             def func(self, key):
                 callcount[0] += 1
@@ -131,7 +131,7 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase):
     def test_invalidate(self):
         callcount = [0]
 
-        class A(object):
+        class A:
             @cached()
             def func(self, key):
                 callcount[0] += 1
@@ -149,7 +149,7 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase):
         self.assertEquals(callcount[0], 2)
 
     def test_invalidate_missing(self):
-        class A(object):
+        class A:
             @cached()
             def func(self, key):
                 return key
@@ -160,7 +160,7 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase):
     def test_max_entries(self):
         callcount = [0]
 
-        class A(object):
+        class A:
             @cached(max_entries=10)
             def func(self, key):
                 callcount[0] += 1
@@ -187,7 +187,7 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase):
 
         d = defer.succeed(123)
 
-        class A(object):
+        class A:
             @cached()
             def func(self, key):
                 callcount[0] += 1
@@ -205,7 +205,7 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase):
         callcount = [0]
         callcount2 = [0]
 
-        class A(object):
+        class A:
             @cached()
             def func(self, key):
                 callcount[0] += 1
@@ -238,7 +238,7 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase):
         callcount = [0]
         callcount2 = [0]
 
-        class A(object):
+        class A:
             @cached(max_entries=2)
             def func(self, key):
                 callcount[0] += 1
@@ -275,7 +275,7 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase):
         callcount = [0]
         callcount2 = [0]
 
-        class A(object):
+        class A:
             @cached()
             def func(self, key):
                 callcount[0] += 1
diff --git a/tests/test_state.py b/tests/test_state.py
index 56ba0fecf5..2d58467932 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -71,7 +71,7 @@ def create_event(
     return event
 
 
-class StateGroupStore(object):
+class StateGroupStore:
     def __init__(self):
         self._event_to_state_group = {}
         self._group_to_state = {}
@@ -129,7 +129,7 @@ class DictObj(dict):
         self.__dict__ = self
 
 
-class Graph(object):
+class Graph:
     def __init__(self, nodes, edges):
         events = {}
         clobbered = set(events.keys())
diff --git a/tests/test_visibility.py b/tests/test_visibility.py
index 4a4483ba12..510b630114 100644
--- a/tests/test_visibility.py
+++ b/tests/test_visibility.py
@@ -294,7 +294,7 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase):
 test_large_room.skip = "Disabled by default because it's slow"
 
 
-class _TestStore(object):
+class _TestStore:
     """Implements a few methods of the DataStore, so that we can test
     filter_events_for_server
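The CacheDecoratorTestCase hunks above exercise Synapse's @cached descriptor, asserting that the wrapped function body runs only on a cache miss. A generic memoization sketch of the behaviour being tested (Synapse's real decorator additionally handles Deferreds, invalidation callbacks and max_entries):

import functools


def cached(fn):
    """Very small stand-in for Synapse's @cached: memoise on the key."""
    cache = {}

    @functools.wraps(fn)
    def wrapper(self, key):
        if key not in cache:
            cache[key] = fn(self, key)
        return cache[key]

    return wrapper


callcount = [0]


class A:
    @cached
    def func(self, key):
        callcount[0] += 1
        return key


a = A()
assert a.func("foo") == "foo"
assert a.func("foo") == "foo"
assert callcount[0] == 1  # the second call was served from the cache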
diff --git a/tests/unittest.py b/tests/unittest.py
index 7b80999a74..3cb55a7e96 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -614,7 +614,7 @@ class FederatingHomeserverTestCase(HomeserverTestCase):
     """
 
     def prepare(self, reactor, clock, homeserver):
-        class Authenticator(object):
+        class Authenticator:
             def authenticate_request(self, request, content):
                 return succeed("other.example.com")
 
diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py
index 0363735d4f..677e925477 100644
--- a/tests/util/caches/test_descriptors.py
+++ b/tests/util/caches/test_descriptors.py
@@ -88,7 +88,7 @@ class CacheTestCase(unittest.TestCase):
 class DescriptorTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def test_cache(self):
-        class Cls(object):
+        class Cls:
             def __init__(self):
                 self.mock = mock.Mock()
 
@@ -122,7 +122,7 @@ class DescriptorTestCase(unittest.TestCase):
     def test_cache_num_args(self):
         """Only the first num_args arguments should matter to the cache"""
 
-        class Cls(object):
+        class Cls:
             def __init__(self):
                 self.mock = mock.Mock()
 
@@ -156,7 +156,7 @@ class DescriptorTestCase(unittest.TestCase):
         """If the wrapped function throws synchronously, things should continue to
         work
         """
-        class Cls(object):
+        class Cls:
             @cached()
             def fn(self, arg1):
                 raise SynapseError(100, "mai spoon iz too big!!1")
@@ -180,7 +180,7 @@ class DescriptorTestCase(unittest.TestCase):
 
         complete_lookup = defer.Deferred()
 
-        class Cls(object):
+        class Cls:
             @descriptors.cached()
             def fn(self, arg1):
                 @defer.inlineCallbacks
@@ -223,7 +223,7 @@ class DescriptorTestCase(unittest.TestCase):
         """Check that the cache sets and restores logcontexts correctly when
         the lookup function throws an exception"""
 
-        class Cls(object):
+        class Cls:
             @descriptors.cached()
             def fn(self, arg1):
                 @defer.inlineCallbacks
@@ -263,7 +263,7 @@ class DescriptorTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_cache_default_args(self):
-        class Cls(object):
+        class Cls:
             def __init__(self):
                 self.mock = mock.Mock()
 
@@ -300,7 +300,7 @@ class DescriptorTestCase(unittest.TestCase):
         obj.mock.assert_not_called()
 
     def test_cache_iterable(self):
-        class Cls(object):
+        class Cls:
             def __init__(self):
                 self.mock = mock.Mock()
 
@@ -336,7 +336,7 @@ class DescriptorTestCase(unittest.TestCase):
         """If the wrapped function throws synchronously, things should continue to
         work
         """
-        class Cls(object):
+        class Cls:
             @descriptors.cached(iterable=True)
             def fn(self, arg1):
                 raise SynapseError(100, "mai spoon iz too big!!1")
@@ -358,7 +358,7 @@ class DescriptorTestCase(unittest.TestCase):
 class CachedListDescriptorTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def test_cache(self):
-        class Cls(object):
+        class Cls:
             def __init__(self):
                 self.mock = mock.Mock()
 
@@ -408,7 +408,7 @@ class CachedListDescriptorTestCase(unittest.TestCase):
     def test_invalidate(self):
         """Make sure that invalidation callbacks are called."""
 
-        class Cls(object):
+        class Cls:
             def __init__(self):
                 self.mock = mock.Mock()
 
diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py
index 8d6627ec33..2012263184 100644
--- a/tests/util/test_file_consumer.py
+++ b/tests/util/test_file_consumer.py
@@ -112,7 +112,7 @@ class FileConsumerTests(unittest.TestCase):
         self.assertTrue(string_file.closed)
 
 
-class DummyPullProducer(object):
+class DummyPullProducer:
    def __init__(self):
        self.consumer = None
        self.deferred = defer.Deferred()
@@ -134,7 +134,7 @@ class DummyPullProducer(object):
        return d
 
 
-class BlockingStringWrite(object):
+class BlockingStringWrite:
    def __init__(self):
        self.buffer = ""
        self.closed = False
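test_cache_num_args above pins down that only the first num_args arguments contribute to the cache key. A hand-rolled sketch of that idea (the real descriptor machinery in synapse.util.caches.descriptors is considerably more involved):

import functools


def cached_num_args(num_args):
    """Cache on only the first num_args positional arguments."""

    def decorator(fn):
        cache = {}

        @functools.wraps(fn)
        def wrapper(self, *args):
            key = args[:num_args]
            if key not in cache:
                cache[key] = fn(self, *args)
            return cache[key]

        return wrapper

    return decorator


class Cls:
    def __init__(self):
        self.calls = 0

    @cached_num_args(1)
    def fn(self, arg1, arg2):
        self.calls += 1
        return arg1 + arg2


obj = Cls()
assert obj.fn(1, 2) == 3
assert obj.fn(1, 99) == 3  # arg2 is ignored by the cache key
assert obj.calls == 1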
diff --git a/tests/utils.py b/tests/utils.py
index a61cbdef44..4673872f88 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -472,7 +472,7 @@ class MockHttpResource(HttpServer):
         self.callbacks.append((method, path_pattern, callback))
 
 
-class MockKey(object):
+class MockKey:
     alg = "mock_alg"
     version = "mock_version"
     signature = b"\x9a\x87$"
@@ -491,7 +491,7 @@ class MockKey(object):
         return b""
 
 
-class MockClock(object):
+class MockClock:
     now = 1000
 
     def __init__(self):
@@ -568,7 +568,7 @@ def _format_call(args, kwargs):
     )
 
 
-class DeferredMockCallable(object):
+class DeferredMockCallable:
     """A callable instance that stores a set of pending call expectations and
     return values for them. It allows a unit test to assert that the given set of
     function calls are eventually made, by awaiting on them to be called.
-- 
cgit 1.5.1