From 5859a5c569c03f3b7c578fe4dbf2274e37af03bb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Oct 2019 07:42:26 +0200 Subject: Fix presence timeouts when synchrotron restarts. (#6212) * Fix presence timeouts when synchrotron restarts. Handling timeouts would fail if there was an external process that had timed out, e.g. a synchrotron restarting. This was due to a couple of variable name typoes. Fixes #3715. --- tests/handlers/test_presence.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) (limited to 'tests/handlers/test_presence.py') diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index f70c6e7d65..d4293b4312 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -22,6 +22,7 @@ from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.events import room_version_to_event_format from synapse.events.builder import EventBuilder from synapse.handlers.presence import ( + EXTERNAL_PROCESS_EXPIRY, FEDERATION_PING_INTERVAL, FEDERATION_TIMEOUT, IDLE_TIMER, @@ -413,6 +414,44 @@ class PresenceTimeoutTestCase(unittest.TestCase): self.assertEquals(state, new_state) +class PresenceHandlerTestCase(unittest.HomeserverTestCase): + def prepare(self, reactor, clock, hs): + self.presence_handler = hs.get_presence_handler() + self.clock = hs.get_clock() + + def test_external_process_timeout(self): + """Test that if an external process doesn't update the records for a while + we time out their syncing users presence. + """ + process_id = 1 + user_id = "@test:server" + + # Notify handler that a user is now syncing. + self.get_success( + self.presence_handler.update_external_syncs_row( + process_id, user_id, True, self.clock.time_msec() + ) + ) + + # Check that if we wait a while without telling the handler the user has + # stopped syncing that their presence state doesn't get timed out. + self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2) + + state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, PresenceState.ONLINE) + + # Check that if the external process timeout fires, then the syncing + # user gets timed out + self.reactor.advance(EXTERNAL_PROCESS_EXPIRY) + + state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, PresenceState.OFFLINE) + + class PresenceJoinTestCase(unittest.HomeserverTestCase): """Tests remote servers get told about presence of users in the room when they join and when new local users join. -- cgit 1.5.1 From ef6bdafb29abe14cb40f6b83a46e70d82cd3e041 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 29 Jan 2020 17:55:48 +0000 Subject: Store the room version in EventBuilder --- synapse/events/builder.py | 12 +++++++----- tests/handlers/test_presence.py | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) (limited to 'tests/handlers/test_presence.py') diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 3997751337..291fb38a26 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -23,6 +23,7 @@ from synapse.api.room_versions import ( KNOWN_EVENT_FORMAT_VERSIONS, KNOWN_ROOM_VERSIONS, EventFormatVersions, + RoomVersion, ) from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.types import EventID @@ -40,7 +41,7 @@ class EventBuilder(object): content/unsigned/internal_metadata fields are still mutable) Attributes: - format_version (int): Event format version + room_version: Version of the target room room_id (str) type (str) sender (str) @@ -63,7 +64,7 @@ class EventBuilder(object): _hostname = attr.ib() _signing_key = attr.ib() - format_version = attr.ib() + room_version = attr.ib(type=RoomVersion) room_id = attr.ib() type = attr.ib() @@ -108,7 +109,8 @@ class EventBuilder(object): ) auth_ids = yield self._auth.compute_auth_events(self, state_ids) - if self.format_version == EventFormatVersions.V1: + format_version = self.room_version.event_format + if format_version == EventFormatVersions.V1: auth_events = yield self._store.add_event_hashes(auth_ids) prev_events = yield self._store.add_event_hashes(prev_event_ids) else: @@ -148,7 +150,7 @@ class EventBuilder(object): clock=self._clock, hostname=self._hostname, signing_key=self._signing_key, - format_version=self.format_version, + format_version=format_version, event_dict=event_dict, internal_metadata_dict=self.internal_metadata.get_dict(), ) @@ -201,7 +203,7 @@ class EventBuilderFactory(object): clock=self.clock, hostname=self.hostname, signing_key=self.signing_key, - format_version=room_version.event_format, + room_version=room_version, type=key_values["type"], state_key=key_values.get("state_key"), room_id=key_values["room_id"], diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index d4293b4312..69914428e2 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -19,7 +19,7 @@ from mock import Mock, call from signedjson.key import generate_signing_key from synapse.api.constants import EventTypes, Membership, PresenceState -from synapse.events import room_version_to_event_format +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events.builder import EventBuilder from synapse.handlers.presence import ( EXTERNAL_PROCESS_EXPIRY, @@ -597,7 +597,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): clock=self.clock, hostname=hostname, signing_key=self.random_signing_key, - format_version=room_version_to_event_format(room_version), + room_version=KNOWN_ROOM_VERSIONS[room_version], room_id=room_id, type=EventTypes.Member, sender=user_id, -- cgit 1.5.1 From d7bf793cc1e3f5268285286341835ac54753eff6 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 31 Jan 2020 10:06:21 +0000 Subject: s/get_room_version/get_room_version_id/ ... to make way for a forthcoming get_room_version which returns a RoomVersion object. --- synapse/federation/federation_client.py | 10 +++++----- synapse/federation/federation_server.py | 16 ++++++++-------- synapse/handlers/federation.py | 18 +++++++++--------- synapse/handlers/message.py | 8 +++++--- synapse/handlers/pagination.py | 2 +- synapse/handlers/room.py | 2 +- synapse/state/__init__.py | 2 +- synapse/storage/data_stores/main/state.py | 2 +- synapse/storage/persist_events.py | 2 +- tests/handlers/test_presence.py | 2 +- tests/test_state.py | 2 +- tests/unittest.py | 4 +++- 12 files changed, 37 insertions(+), 33 deletions(-) (limited to 'tests/handlers/test_presence.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d57e8ca7a2..4ac3d81cba 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -198,7 +198,7 @@ class FederationClient(FederationBase): logger.debug("backfill transaction_data=%r", transaction_data) - room_version = yield self.store.get_room_version(room_id) + room_version = yield self.store.get_room_version_id(room_id) format_ver = room_version_to_event_format(room_version) pdus = [ @@ -336,7 +336,7 @@ class FederationClient(FederationBase): def get_event_auth(self, destination, room_id, event_id): res = yield self.transport_layer.get_event_auth(destination, room_id, event_id) - room_version = yield self.store.get_room_version(room_id) + room_version = yield self.store.get_room_version_id(room_id) format_ver = room_version_to_event_format(room_version) auth_chain = [ @@ -649,7 +649,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def send_invite(self, destination, room_id, event_id, pdu): - room_version = yield self.store.get_room_version(room_id) + room_version = yield self.store.get_room_version_id(room_id) content = yield self._do_send_invite(destination, pdu, room_version) @@ -657,7 +657,7 @@ class FederationClient(FederationBase): logger.debug("Got response to send_invite: %s", pdu_dict) - room_version = yield self.store.get_room_version(room_id) + room_version = yield self.store.get_room_version_id(room_id) format_ver = room_version_to_event_format(room_version) pdu = event_from_pdu_json(pdu_dict, format_ver) @@ -859,7 +859,7 @@ class FederationClient(FederationBase): timeout=timeout, ) - room_version = yield self.store.get_room_version(room_id) + room_version = yield self.store.get_room_version_id(room_id) format_ver = room_version_to_event_format(room_version) events = [ diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9562faa3ee..a4c97ed458 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -234,7 +234,7 @@ class FederationServer(FederationBase): continue try: - room_version = await self.store.get_room_version(room_id) + room_version = await self.store.get_room_version_id(room_id) except NotFoundError: logger.info("Ignoring PDU for unknown room_id: %s", room_id) continue @@ -334,7 +334,7 @@ class FederationServer(FederationBase): ) ) - room_version = await self.store.get_room_version(room_id) + room_version = await self.store.get_room_version_id(room_id) resp["room_version"] = room_version return 200, resp @@ -385,7 +385,7 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) - room_version = await self.store.get_room_version(room_id) + room_version = await self.store.get_room_version_id(room_id) if room_version not in supported_versions: logger.warning( "Room version %s not in %s", room_version, supported_versions @@ -417,7 +417,7 @@ class FederationServer(FederationBase): async def on_send_join_request(self, origin, content, room_id): logger.debug("on_send_join_request: content: %s", content) - room_version = await self.store.get_room_version(room_id) + room_version = await self.store.get_room_version_id(room_id) format_ver = room_version_to_event_format(room_version) pdu = event_from_pdu_json(content, format_ver) @@ -440,7 +440,7 @@ class FederationServer(FederationBase): await self.check_server_matches_acl(origin_host, room_id) pdu = await self.handler.on_make_leave_request(origin, room_id, user_id) - room_version = await self.store.get_room_version(room_id) + room_version = await self.store.get_room_version_id(room_id) time_now = self._clock.time_msec() return {"event": pdu.get_pdu_json(time_now), "room_version": room_version} @@ -448,7 +448,7 @@ class FederationServer(FederationBase): async def on_send_leave_request(self, origin, content, room_id): logger.debug("on_send_leave_request: content: %s", content) - room_version = await self.store.get_room_version(room_id) + room_version = await self.store.get_room_version_id(room_id) format_ver = room_version_to_event_format(room_version) pdu = event_from_pdu_json(content, format_ver) @@ -495,7 +495,7 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) - room_version = await self.store.get_room_version(room_id) + room_version = await self.store.get_room_version_id(room_id) format_ver = room_version_to_event_format(room_version) auth_chain = [ @@ -664,7 +664,7 @@ class FederationServer(FederationBase): logger.info("Accepting join PDU %s from %s", pdu.event_id, origin) # We've already checked that we know the room version by this point - room_version = await self.store.get_room_version(pdu.room_id) + room_version = await self.store.get_room_version_id(pdu.room_id) # Check signature. try: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 01372f6d47..30c720f093 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -388,7 +388,7 @@ class FederationHandler(BaseHandler): for x in remote_state: event_map[x.event_id] = x - room_version = await self.store.get_room_version(room_id) + room_version = await self.store.get_room_version_id(room_id) state_map = await resolve_events_with_store( room_id, room_version, @@ -1110,7 +1110,7 @@ class FederationHandler(BaseHandler): Logs a warning if we can't find the given event. """ - room_version = await self.store.get_room_version(room_id) + room_version = await self.store.get_room_version_id(room_id) event_infos = [] @@ -1373,7 +1373,7 @@ class FederationHandler(BaseHandler): event_content = {"membership": Membership.JOIN} - room_version = yield self.store.get_room_version(room_id) + room_version = yield self.store.get_room_version_id(room_id) builder = self.event_builder_factory.new( room_version, @@ -1607,7 +1607,7 @@ class FederationHandler(BaseHandler): ) raise SynapseError(403, "User not from origin", Codes.FORBIDDEN) - room_version = yield self.store.get_room_version(room_id) + room_version = yield self.store.get_room_version_id(room_id) builder = self.event_builder_factory.new( room_version, { @@ -2055,7 +2055,7 @@ class FederationHandler(BaseHandler): do_soft_fail_check = False if do_soft_fail_check: - room_version = yield self.store.get_room_version(event.room_id) + room_version = yield self.store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] # Calculate the "current state". @@ -2191,7 +2191,7 @@ class FederationHandler(BaseHandler): Returns: defer.Deferred[EventContext]: updated context object """ - room_version = yield self.store.get_room_version(event.room_id) + room_version = yield self.store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] try: @@ -2363,7 +2363,7 @@ class FederationHandler(BaseHandler): remote_auth_events.update({(d.type, d.state_key): d for d in different_events}) remote_state = remote_auth_events.values() - room_version = yield self.store.get_room_version(event.room_id) + room_version = yield self.store.get_room_version_id(event.room_id) new_state = yield self.state_handler.resolve_events( room_version, (local_state, remote_state), event ) @@ -2587,7 +2587,7 @@ class FederationHandler(BaseHandler): } if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): - room_version = yield self.store.get_room_version(room_id) + room_version = yield self.store.get_room_version_id(room_id) builder = self.event_builder_factory.new(room_version, event_dict) EventValidator().validate_builder(builder) @@ -2650,7 +2650,7 @@ class FederationHandler(BaseHandler): Returns: Deferred: resolves (to None) """ - room_version = yield self.store.get_room_version(room_id) + room_version = yield self.store.get_room_version_id(room_id) # NB: event_dict has a particular specced format we might need to fudge # if we change event formats too much. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 9a0f661b9b..bdf16c84d3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -459,7 +459,9 @@ class EventCreationHandler(object): room_version = event_dict["content"]["room_version"] else: try: - room_version = yield self.store.get_room_version(event_dict["room_id"]) + room_version = yield self.store.get_room_version_id( + event_dict["room_id"] + ) except NotFoundError: raise AuthError(403, "Unknown room") @@ -788,7 +790,7 @@ class EventCreationHandler(object): ): room_version = event.content.get("room_version", RoomVersions.V1.identifier) else: - room_version = yield self.store.get_room_version(event.room_id) + room_version = yield self.store.get_room_version_id(event.room_id) event_allowed = yield self.third_party_event_rules.check_event_allowed( event, context @@ -963,7 +965,7 @@ class EventCreationHandler(object): auth_events = yield self.store.get_events(auth_events_ids) auth_events = {(e.type, e.state_key): e for e in auth_events.values()} - room_version = yield self.store.get_room_version(event.room_id) + room_version = yield self.store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] if event_auth.check_redaction( diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 71d76202c9..caf841a643 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -281,7 +281,7 @@ class PaginationHandler(object): """Purge the given room from the database""" with (await self.pagination_lock.write(room_id)): # check we know about the room - await self.store.get_room_version(room_id) + await self.store.get_room_version_id(room_id) # first check that we have no users in this room joined = await defer.maybeDeferred( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a95b45d791..1382399557 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -178,7 +178,7 @@ class RoomCreationHandler(BaseHandler): }, token_id=requester.access_token_id, ) - old_room_version = yield self.store.get_room_version(old_room_id) + old_room_version = yield self.store.get_room_version_id(old_room_id) yield self.auth.check_from_context( old_room_version, tombstone_event, tombstone_context ) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index cacd0c0c2b..fdd6bef6b4 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -394,7 +394,7 @@ class StateHandler(object): delta_ids=delta_ids, ) - room_version = yield self.store.get_room_version(room_id) + room_version = yield self.store.get_room_version_id(room_id) result = yield self._state_resolution_handler.resolve_state_groups( room_id, diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py index 4167f83c9b..6700942523 100644 --- a/synapse/storage/data_stores/main/state.py +++ b/synapse/storage/data_stores/main/state.py @@ -62,7 +62,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): super(StateGroupWorkerStore, self).__init__(database, db_conn, hs) @cached(max_entries=10000) - async def get_room_version(self, room_id: str) -> str: + async def get_room_version_id(self, room_id: str) -> str: """Get the room_version of a given room Raises: diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 86166fd4c1..af3fd67ab9 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -661,7 +661,7 @@ class EventsPersistenceStorage(object): break if not room_version: - room_version = await self.main_store.get_room_version(room_id) + room_version = await self.main_store.get_room_version_id(room_id) logger.debug("calling resolve_state_groups from preserve_events") res = await self._state_resolution_handler.resolve_state_groups( diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index d4293b4312..e92e090c3c 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -588,7 +588,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): hostname = get_domain_from_id(user_id) - room_version = self.get_success(self.store.get_room_version(room_id)) + room_version = self.get_success(self.store.get_room_version_id(room_id)) builder = EventBuilder( state=self.state, diff --git a/tests/test_state.py b/tests/test_state.py index e0aae06be4..1e4449fa1c 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -119,7 +119,7 @@ class StateGroupStore(object): def register_event_id_state_group(self, event_id, state_group): self._event_to_state_group[event_id] = state_group - def get_room_version(self, room_id): + def get_room_version_id(self, room_id): return RoomVersions.V1.identifier diff --git a/tests/unittest.py b/tests/unittest.py index b56e249386..98bf27d39c 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -589,7 +589,9 @@ class HomeserverTestCase(TestCase): event_builder_factory = self.hs.get_event_builder_factory() event_creation_handler = self.hs.get_event_creation_handler() - room_version = self.get_success(self.hs.get_datastore().get_room_version(room)) + room_version = self.get_success( + self.hs.get_datastore().get_room_version_id(room) + ) builder = event_builder_factory.for_room_version( KNOWN_ROOM_VERSIONS[room_version], -- cgit 1.5.1 From 509e381afa8c656e72f5fef3d651a9819794174a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 21 Feb 2020 07:15:07 -0500 Subject: Clarify list/set/dict/tuple comprehensions and enforce via flake8 (#6957) Ensure good comprehension hygiene using flake8-comprehensions. --- CONTRIBUTING.md | 2 +- changelog.d/6957.misc | 1 + docs/code_style.md | 2 +- scripts-dev/convert_server_keys.py | 2 +- synapse/app/_base.py | 2 +- synapse/app/federation_sender.py | 4 +-- synapse/app/pusher.py | 2 +- synapse/config/server.py | 4 +-- synapse/config/tls.py | 2 +- synapse/crypto/keyring.py | 6 ++-- synapse/federation/send_queue.py | 4 +-- synapse/groups/groups_server.py | 2 +- synapse/handlers/device.py | 2 +- synapse/handlers/directory.py | 4 +-- synapse/handlers/federation.py | 18 +++++----- synapse/handlers/presence.py | 6 ++-- synapse/handlers/receipts.py | 2 +- synapse/handlers/room.py | 2 +- synapse/handlers/search.py | 8 ++--- synapse/handlers/sync.py | 22 ++++++------ synapse/handlers/typing.py | 4 +-- synapse/logging/utils.py | 2 +- synapse/metrics/__init__.py | 2 +- synapse/metrics/background_process_metrics.py | 4 +-- synapse/push/bulk_push_rule_evaluator.py | 8 ++--- synapse/push/emailpusher.py | 2 +- synapse/push/mailer.py | 20 +++++------ synapse/push/pusherpool.py | 2 +- synapse/rest/admin/_base.py | 4 +-- synapse/rest/client/v1/push_rule.py | 6 ++-- synapse/rest/client/v1/pusher.py | 4 +-- synapse/rest/client/v2_alpha/sync.py | 2 +- synapse/rest/key/v2/remote_key_resource.py | 2 +- synapse/rest/media/v1/_base.py | 40 ++++++++++------------ synapse/state/v1.py | 10 +++--- synapse/state/v2.py | 8 ++--- synapse/storage/_base.py | 2 +- synapse/storage/background_updates.py | 2 +- synapse/storage/data_stores/main/appservice.py | 14 ++++---- synapse/storage/data_stores/main/client_ips.py | 4 +-- synapse/storage/data_stores/main/devices.py | 13 ++++--- .../storage/data_stores/main/event_federation.py | 2 +- synapse/storage/data_stores/main/events.py | 8 ++--- .../storage/data_stores/main/events_bg_updates.py | 2 +- synapse/storage/data_stores/main/events_worker.py | 6 ++-- synapse/storage/data_stores/main/push_rule.py | 8 ++--- synapse/storage/data_stores/main/receipts.py | 4 +-- synapse/storage/data_stores/main/roommember.py | 4 +-- synapse/storage/data_stores/main/state.py | 8 ++--- synapse/storage/data_stores/main/stream.py | 8 ++--- .../storage/data_stores/main/user_erasure_store.py | 4 +-- synapse/storage/data_stores/state/store.py | 4 +-- synapse/storage/database.py | 4 +-- synapse/storage/persist_events.py | 8 ++--- synapse/storage/prepare_database.py | 6 ++-- synapse/util/frozenutils.py | 2 +- synapse/visibility.py | 4 +-- tests/config/test_generate.py | 2 +- tests/federation/test_federation_server.py | 2 +- tests/handlers/test_presence.py | 4 +-- tests/handlers/test_typing.py | 6 ++-- tests/handlers/test_user_directory.py | 12 +++---- tests/push/test_email.py | 6 ++-- tests/push/test_http.py | 8 ++--- tests/rest/client/v2_alpha/test_sync.py | 28 ++++++++------- tests/storage/test__base.py | 4 +-- tests/storage/test_appservice.py | 36 +++++++++---------- tests/storage/test_cleanup_extrems.py | 10 +++--- tests/storage/test_event_metrics.py | 36 +++++++++---------- tests/storage/test_state.py | 2 +- tests/test_state.py | 18 +++------- tests/util/test_stream_change_cache.py | 18 +++------- tox.ini | 1 + 73 files changed, 251 insertions(+), 276 deletions(-) create mode 100644 changelog.d/6957.misc (limited to 'tests/handlers/test_presence.py') diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4b01b6ac8c..253a0ca648 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -60,7 +60,7 @@ python 3.6 and to install each tool: ``` # Install the dependencies -pip install -U black flake8 isort +pip install -U black flake8 flake8-comprehensions isort # Run the linter script ./scripts-dev/lint.sh diff --git a/changelog.d/6957.misc b/changelog.d/6957.misc new file mode 100644 index 0000000000..4f98030110 --- /dev/null +++ b/changelog.d/6957.misc @@ -0,0 +1 @@ +Use flake8-comprehensions to enforce good hygiene of list/set/dict comprehensions. diff --git a/docs/code_style.md b/docs/code_style.md index 71aecd41f7..6ef6f80290 100644 --- a/docs/code_style.md +++ b/docs/code_style.md @@ -30,7 +30,7 @@ The necessary tools are detailed below. Install `flake8` with: - pip install --upgrade flake8 + pip install --upgrade flake8 flake8-comprehensions Check all application and test code with: diff --git a/scripts-dev/convert_server_keys.py b/scripts-dev/convert_server_keys.py index 179be61c30..06b4c1e2ff 100644 --- a/scripts-dev/convert_server_keys.py +++ b/scripts-dev/convert_server_keys.py @@ -103,7 +103,7 @@ def main(): yaml.safe_dump(result, sys.stdout, default_flow_style=False) - rows = list(row for server, json in result.items() for row in rows_v2(server, json)) + rows = [row for server, json in result.items() for row in rows_v2(server, json)] cursor = connection.cursor() cursor.executemany( diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 109b1e2fb5..9ffd23c6df 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -141,7 +141,7 @@ def start_reactor( def quit_with_error(error_string): message_lines = error_string.split("\n") - line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2 + line_length = max(len(l) for l in message_lines if len(l) < 80) + 2 sys.stderr.write("*" * line_length + "\n") for line in message_lines: sys.stderr.write(" %s\n" % (line.rstrip(),)) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 63a91f1177..b7fcf80ddc 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -262,7 +262,7 @@ class FederationSenderHandler(object): # ... as well as device updates and messages elif stream_name == DeviceListsStream.NAME: - hosts = set(row.destination for row in rows) + hosts = {row.destination for row in rows} for host in hosts: self.federation_sender.send_device_messages(host) @@ -270,7 +270,7 @@ class FederationSenderHandler(object): # The to_device stream includes stuff to be pushed to both local # clients and remote servers, so we ignore entities that start with # '@' (since they'll be local users rather than destinations). - hosts = set(row.entity for row in rows if not row.entity.startswith("@")) + hosts = {row.entity for row in rows if not row.entity.startswith("@")} for host in hosts: self.federation_sender.send_device_messages(host) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index e46b6ac598..84e9f8d5e2 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -158,7 +158,7 @@ class PusherReplicationHandler(ReplicationClientHandler): yield self.pusher_pool.on_new_notifications(token, token) elif stream_name == "receipts": yield self.pusher_pool.on_new_receipts( - token, token, set(row.room_id for row in rows) + token, token, {row.room_id for row in rows} ) except Exception: logger.exception("Error poking pushers") diff --git a/synapse/config/server.py b/synapse/config/server.py index 0ec1b0fadd..7525765fee 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -1066,12 +1066,12 @@ KNOWN_RESOURCES = ( def _check_resource_config(listeners): - resource_names = set( + resource_names = { res_name for listener in listeners for res in listener.get("resources", []) for res_name in res.get("names", []) - ) + } for resource in resource_names: if resource not in KNOWN_RESOURCES: diff --git a/synapse/config/tls.py b/synapse/config/tls.py index 97a12d51f6..a65538562b 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -260,7 +260,7 @@ class TlsConfig(Config): crypto.FILETYPE_ASN1, self.tls_certificate ) sha256_fingerprint = encode_base64(sha256(x509_certificate_bytes).digest()) - sha256_fingerprints = set(f["sha256"] for f in self.tls_fingerprints) + sha256_fingerprints = {f["sha256"] for f in self.tls_fingerprints} if sha256_fingerprint not in sha256_fingerprints: self.tls_fingerprints.append({"sha256": sha256_fingerprint}) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 6fe5a6a26a..983f0ead8c 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -326,9 +326,7 @@ class Keyring(object): verify_requests (list[VerifyJsonRequest]): list of verify requests """ - remaining_requests = set( - (rq for rq in verify_requests if not rq.key_ready.called) - ) + remaining_requests = {rq for rq in verify_requests if not rq.key_ready.called} @defer.inlineCallbacks def do_iterations(): @@ -396,7 +394,7 @@ class Keyring(object): results = yield fetcher.get_keys(missing_keys) - completed = list() + completed = [] for verify_request in remaining_requests: server_name = verify_request.server_name diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 001bb304ae..876fb0e245 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -129,9 +129,9 @@ class FederationRemoteSendQueue(object): for key in keys[:i]: del self.presence_changed[key] - user_ids = set( + user_ids = { user_id for uids in self.presence_changed.values() for user_id in uids - ) + } keys = self.presence_destinations.keys() i = self.presence_destinations.bisect_left(position_to_delete) diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index c106abae21..4f0dc0a209 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -608,7 +608,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler): user_results = yield self.store.get_users_in_group( group_id, include_private=True ) - if user_id in [user_result["user_id"] for user_result in user_results]: + if user_id in (user_result["user_id"] for user_result in user_results): raise SynapseError(400, "User already in group") content = { diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 50cea3f378..a514c30714 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -742,6 +742,6 @@ class DeviceListUpdater(object): # We clobber the seen updates since we've re-synced from a given # point. - self._seen_updates[user_id] = set([stream_id]) + self._seen_updates[user_id] = {stream_id} defer.returnValue(result) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 921d887b24..0b23ca919a 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -72,7 +72,7 @@ class DirectoryHandler(BaseHandler): # TODO(erikj): Check if there is a current association. if not servers: users = yield self.state.get_current_users_in_room(room_id) - servers = set(get_domain_from_id(u) for u in users) + servers = {get_domain_from_id(u) for u in users} if not servers: raise SynapseError(400, "Failed to get server list") @@ -255,7 +255,7 @@ class DirectoryHandler(BaseHandler): ) users = yield self.state.get_current_users_in_room(room_id) - extra_servers = set(get_domain_from_id(u) for u in users) + extra_servers = {get_domain_from_id(u) for u in users} servers = set(extra_servers) | set(servers) # If this server is in the list of servers, return it first. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index eb20ef4aec..a689065f89 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -659,11 +659,11 @@ class FederationHandler(BaseHandler): # this can happen if a remote server claims that the state or # auth_events at an event in room A are actually events in room B - bad_events = list( + bad_events = [ (event_id, event.room_id) for event_id, event in fetched_events.items() if event.room_id != room_id - ) + ] for bad_event_id, bad_room_id in bad_events: # This is a bogus situation, but since we may only discover it a long time @@ -856,7 +856,7 @@ class FederationHandler(BaseHandler): # Don't bother processing events we already have. seen_events = await self.store.have_events_in_timeline( - set(e.event_id for e in events) + {e.event_id for e in events} ) events = [e for e in events if e.event_id not in seen_events] @@ -866,7 +866,7 @@ class FederationHandler(BaseHandler): event_map = {e.event_id: e for e in events} - event_ids = set(e.event_id for e in events) + event_ids = {e.event_id for e in events} # build a list of events whose prev_events weren't in the batch. # (XXX: this will include events whose prev_events we already have; that doesn't @@ -892,13 +892,13 @@ class FederationHandler(BaseHandler): state_events.update({s.event_id: s for s in state}) events_to_state[e_id] = state - required_auth = set( + required_auth = { a_id for event in events + list(state_events.values()) + list(auth_events.values()) for a_id in event.auth_event_ids() - ) + } auth_events.update( {e_id: event_map[e_id] for e_id in required_auth if e_id in event_map} ) @@ -1247,7 +1247,7 @@ class FederationHandler(BaseHandler): async def on_event_auth(self, event_id: str) -> List[EventBase]: event = await self.store.get_event(event_id) auth = await self.store.get_auth_chain( - [auth_id for auth_id in event.auth_event_ids()], include_given=True + list(event.auth_event_ids()), include_given=True ) return list(auth) @@ -2152,7 +2152,7 @@ class FederationHandler(BaseHandler): # Now get the current auth_chain for the event. local_auth_chain = await self.store.get_auth_chain( - [auth_id for auth_id in event.auth_event_ids()], include_given=True + list(event.auth_event_ids()), include_given=True ) # TODO: Check if we would now reject event_id. If so we need to tell @@ -2654,7 +2654,7 @@ class FederationHandler(BaseHandler): member_handler = self.hs.get_room_member_handler() yield member_handler.send_membership_event(None, event, context) else: - destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id)) + destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)} yield self.federation_client.forward_third_party_invite( destinations, room_id, event_dict ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 202aa9294f..0d6cf2b008 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -313,7 +313,7 @@ class PresenceHandler(object): notified_presence_counter.inc(len(to_notify)) yield self._persist_and_notify(list(to_notify.values())) - self.unpersisted_users_changes |= set(s.user_id for s in new_states) + self.unpersisted_users_changes |= {s.user_id for s in new_states} self.unpersisted_users_changes -= set(to_notify.keys()) to_federation_ping = { @@ -698,7 +698,7 @@ class PresenceHandler(object): updates = yield self.current_state_for_users(target_user_ids) updates = list(updates.values()) - for user_id in set(target_user_ids) - set(u.user_id for u in updates): + for user_id in set(target_user_ids) - {u.user_id for u in updates}: updates.append(UserPresenceState.default(user_id)) now = self.clock.time_msec() @@ -886,7 +886,7 @@ class PresenceHandler(object): hosts = yield self.state.get_current_hosts_in_room(room_id) # Filter out ourselves. - hosts = set(host for host in hosts if host != self.server_name) + hosts = {host for host in hosts if host != self.server_name} self.federation.send_presence_to_destinations( states=[state], destinations=hosts diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 9283c039e3..8bc100db42 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -94,7 +94,7 @@ class ReceiptsHandler(BaseHandler): # no new receipts return False - affected_room_ids = list(set([r.room_id for r in receipts])) + affected_room_ids = list({r.room_id for r in receipts}) self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids) # Note that the min here shouldn't be relied upon to be accurate. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 76e8f61b74..8ee870f0bb 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -355,7 +355,7 @@ class RoomCreationHandler(BaseHandler): # If so, mark the new room as non-federatable as well creation_content["m.federate"] = False - initial_state = dict() + initial_state = {} # Replicate relevant room events types_to_copy = ( diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 110097eab9..ec1542d416 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -184,7 +184,7 @@ class SearchHandler(BaseHandler): membership_list=[Membership.JOIN], # membership_list=[Membership.JOIN, Membership.LEAVE, Membership.Ban], ) - room_ids = set(r.room_id for r in rooms) + room_ids = {r.room_id for r in rooms} # If doing a subset of all rooms seearch, check if any of the rooms # are from an upgraded room, and search their contents as well @@ -374,12 +374,12 @@ class SearchHandler(BaseHandler): ).to_string() if include_profile: - senders = set( + senders = { ev.sender for ev in itertools.chain( res["events_before"], [event], res["events_after"] ) - ) + } if res["events_after"]: last_event_id = res["events_after"][-1].event_id @@ -421,7 +421,7 @@ class SearchHandler(BaseHandler): state_results = {} if include_state: - rooms = set(e.room_id for e in allowed_events) + rooms = {e.room_id for e in allowed_events} for room_id in rooms: state = yield self.state_handler.get_current_state(room_id) state_results[room_id] = list(state.values()) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4324bc702e..669dbc8a48 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -682,11 +682,9 @@ class SyncHandler(object): # FIXME: order by stream ordering rather than as returned by SQL if joined_user_ids or invited_user_ids: - summary["m.heroes"] = sorted( - [user_id for user_id in (joined_user_ids + invited_user_ids)] - )[0:5] + summary["m.heroes"] = sorted(joined_user_ids + invited_user_ids)[0:5] else: - summary["m.heroes"] = sorted([user_id for user_id in gone_user_ids])[0:5] + summary["m.heroes"] = sorted(gone_user_ids)[0:5] if not sync_config.filter_collection.lazy_load_members(): return summary @@ -697,9 +695,9 @@ class SyncHandler(object): # track which members the client should already know about via LL: # Ones which are already in state... - existing_members = set( + existing_members = { user_id for (typ, user_id) in state.keys() if typ == EventTypes.Member - ) + } # ...or ones which are in the timeline... for ev in batch.events: @@ -773,10 +771,10 @@ class SyncHandler(object): # We only request state for the members needed to display the # timeline: - members_to_fetch = set( + members_to_fetch = { event.sender # FIXME: we also care about invite targets etc. for event in batch.events - ) + } if full_state: # always make sure we LL ourselves so we know we're in the room @@ -1993,10 +1991,10 @@ def _calculate_state( ) } - c_ids = set(e for e in itervalues(current)) - ts_ids = set(e for e in itervalues(timeline_start)) - p_ids = set(e for e in itervalues(previous)) - tc_ids = set(e for e in itervalues(timeline_contains)) + c_ids = set(itervalues(current)) + ts_ids = set(itervalues(timeline_start)) + p_ids = set(itervalues(previous)) + tc_ids = set(itervalues(timeline_contains)) # If we are lazyloading room members, we explicitly add the membership events # for the senders in the timeline into the state block returned by /sync, diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 5406618431..391bceb0c4 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -198,7 +198,7 @@ class TypingHandler(object): now=now, obj=member, then=now + FEDERATION_PING_INTERVAL ) - for domain in set(get_domain_from_id(u) for u in users): + for domain in {get_domain_from_id(u) for u in users}: if domain != self.server_name: logger.debug("sending typing update to %s", domain) self.federation.build_and_send_edu( @@ -231,7 +231,7 @@ class TypingHandler(object): return users = yield self.state.get_current_users_in_room(room_id) - domains = set(get_domain_from_id(u) for u in users) + domains = {get_domain_from_id(u) for u in users} if self.server_name in domains: logger.info("Got typing update from %s: %r", user_id, content) diff --git a/synapse/logging/utils.py b/synapse/logging/utils.py index 6073fc2725..0c2527bd86 100644 --- a/synapse/logging/utils.py +++ b/synapse/logging/utils.py @@ -148,7 +148,7 @@ def trace_function(f): pathname=pathname, lineno=lineno, msg=msg, - args=tuple(), + args=(), exc_info=None, ) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 0b45e1f52a..0dba997a23 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -240,7 +240,7 @@ class BucketCollector(object): res.append(["+Inf", sum(data.values())]) metric = HistogramMetricFamily( - self.name, "", buckets=res, sum_value=sum([x * y for x, y in data.items()]) + self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items()) ) yield metric diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index c53d2a0d40..b65bcd8806 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -80,13 +80,13 @@ _background_process_db_sched_duration = Counter( # map from description to a counter, so that we can name our logcontexts # incrementally. (It actually duplicates _background_process_start_count, but # it's much simpler to do so than to try to combine them.) -_background_process_counts = dict() # type: dict[str, int] +_background_process_counts = {} # type: dict[str, int] # map from description to the currently running background processes. # # it's kept as a dict of sets rather than a big set so that we can keep track # of process descriptions that no longer have any active processes. -_background_processes = dict() # type: dict[str, set[_BackgroundProcess]] +_background_processes = {} # type: dict[str, set[_BackgroundProcess]] # A lock that covers the above dicts _bg_metrics_lock = threading.Lock() diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 7d9f5a38d9..433ca2f416 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -400,11 +400,11 @@ class RulesForRoom(object): if logger.isEnabledFor(logging.DEBUG): logger.debug("Found members %r: %r", self.room_id, members.values()) - interested_in_user_ids = set( + interested_in_user_ids = { user_id for user_id, membership in itervalues(members) if membership == Membership.JOIN - ) + } logger.debug("Joined: %r", interested_in_user_ids) @@ -412,9 +412,9 @@ class RulesForRoom(object): interested_in_user_ids, on_invalidate=self.invalidate_all_cb ) - user_ids = set( + user_ids = { uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher - ) + } logger.debug("With pushers: %r", user_ids) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 8c818a86bf..ba4551d619 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -204,7 +204,7 @@ class EmailPusher(object): yield self.send_notification(unprocessed, reason) yield self.save_last_stream_ordering_and_success( - max([ea["stream_ordering"] for ea in unprocessed]) + max(ea["stream_ordering"] for ea in unprocessed) ) # we update the throttle on all the possible unprocessed push actions diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index b13b646bfd..4ccaf178ce 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -526,12 +526,10 @@ class Mailer(object): # If the room doesn't have a name, say who the messages # are from explicitly to avoid, "messages in the Bob room" sender_ids = list( - set( - [ - notif_events[n["event_id"]].sender - for n in notifs_by_room[room_id] - ] - ) + { + notif_events[n["event_id"]].sender + for n in notifs_by_room[room_id] + } ) member_events = yield self.store.get_events( @@ -558,12 +556,10 @@ class Mailer(object): # If the reason room doesn't have a name, say who the messages # are from explicitly to avoid, "messages in the Bob room" sender_ids = list( - set( - [ - notif_events[n["event_id"]].sender - for n in notifs_by_room[reason["room_id"]] - ] - ) + { + notif_events[n["event_id"]].sender + for n in notifs_by_room[reason["room_id"]] + } ) member_events = yield self.store.get_events( diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index b9dca5bc63..01789a9fb4 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -191,7 +191,7 @@ class PusherPool: min_stream_id - 1, max_stream_id ) # This returns a tuple, user_id is at index 3 - users_affected = set([r[3] for r in updated_receipts]) + users_affected = {r[3] for r in updated_receipts} for u in users_affected: if u in self.pushers: diff --git a/synapse/rest/admin/_base.py b/synapse/rest/admin/_base.py index 459482eb6d..a96f75ce26 100644 --- a/synapse/rest/admin/_base.py +++ b/synapse/rest/admin/_base.py @@ -29,7 +29,7 @@ def historical_admin_path_patterns(path_regex): Note that this should only be used for existing endpoints: new ones should just register for the /_synapse/admin path. """ - return list( + return [ re.compile(prefix + path_regex) for prefix in ( "^/_synapse/admin/v1", @@ -37,7 +37,7 @@ def historical_admin_path_patterns(path_regex): "^/_matrix/client/unstable/admin", "^/_matrix/client/r0/admin", ) - ) + ] def admin_patterns(path_regex: str): diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 4f74600239..9fd4908136 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -49,7 +49,7 @@ class PushRuleRestServlet(RestServlet): if self._is_worker: raise Exception("Cannot handle PUT /push_rules on worker") - spec = _rule_spec_from_path([x for x in path.split("/")]) + spec = _rule_spec_from_path(path.split("/")) try: priority_class = _priority_class_from_spec(spec) except InvalidRuleException as e: @@ -110,7 +110,7 @@ class PushRuleRestServlet(RestServlet): if self._is_worker: raise Exception("Cannot handle DELETE /push_rules on worker") - spec = _rule_spec_from_path([x for x in path.split("/")]) + spec = _rule_spec_from_path(path.split("/")) requester = await self.auth.get_user_by_req(request) user_id = requester.user.to_string() @@ -138,7 +138,7 @@ class PushRuleRestServlet(RestServlet): rules = format_push_rules_for_user(requester.user, rules) - path = [x for x in path.split("/")][1:] + path = path.split("/")[1:] if path == []: # we're a reference impl: pedantry is our job. diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index 6f6b7aed6e..550a2f1b44 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -54,9 +54,9 @@ class PushersRestServlet(RestServlet): pushers = await self.hs.get_datastore().get_pushers_by_user_id(user.to_string()) - filtered_pushers = list( + filtered_pushers = [ {k: v for k, v in p.items() if k in ALLOWED_KEYS} for p in pushers - ) + ] return 200, {"pushers": filtered_pushers} diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index d8292ce29f..8fa68dd37f 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -72,7 +72,7 @@ class SyncRestServlet(RestServlet): """ PATTERNS = client_patterns("/sync$") - ALLOWED_PRESENCE = set(["online", "offline", "unavailable"]) + ALLOWED_PRESENCE = {"online", "offline", "unavailable"} def __init__(self, hs): super(SyncRestServlet, self).__init__() diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index 9d6813a047..4b6d030a57 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -149,7 +149,7 @@ class RemoteKey(DirectServeResource): time_now_ms = self.clock.time_msec() - cache_misses = dict() # type: Dict[str, Set[str]] + cache_misses = {} # type: Dict[str, Set[str]] for (server_name, key_id, from_server), results in cached.items(): results = [(result["ts_added_ms"], result) for result in results] diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index 65bbf00073..ba28dd089d 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -135,27 +135,25 @@ def add_file_headers(request, media_type, file_size, upload_name): # separators as defined in RFC2616. SP and HT are handled separately. # see _can_encode_filename_as_token. -_FILENAME_SEPARATOR_CHARS = set( - ( - "(", - ")", - "<", - ">", - "@", - ",", - ";", - ":", - "\\", - '"', - "/", - "[", - "]", - "?", - "=", - "{", - "}", - ) -) +_FILENAME_SEPARATOR_CHARS = { + "(", + ")", + "<", + ">", + "@", + ",", + ";", + ":", + "\\", + '"', + "/", + "[", + "]", + "?", + "=", + "{", + "}", +} def _can_encode_filename_as_token(x): diff --git a/synapse/state/v1.py b/synapse/state/v1.py index 24b7c0faef..9bf98d06f2 100644 --- a/synapse/state/v1.py +++ b/synapse/state/v1.py @@ -69,9 +69,9 @@ def resolve_events_with_store( unconflicted_state, conflicted_state = _seperate(state_sets) - needed_events = set( + needed_events = { event_id for event_ids in itervalues(conflicted_state) for event_id in event_ids - ) + } needed_event_count = len(needed_events) if event_map is not None: needed_events -= set(iterkeys(event_map)) @@ -261,11 +261,11 @@ def _resolve_state_events(conflicted_state, auth_events): def _resolve_auth_events(events, auth_events): - reverse = [i for i in reversed(_ordered_events(events))] + reverse = list(reversed(_ordered_events(events))) - auth_keys = set( + auth_keys = { key for event in events for key in event_auth.auth_types_for_event(event) - ) + } new_auth_events = {} for key in auth_keys: diff --git a/synapse/state/v2.py b/synapse/state/v2.py index 75fe58305a..0ffe6d8c14 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -105,7 +105,7 @@ def resolve_events_with_store( % (room_id, event.event_id, event.room_id,) ) - full_conflicted_set = set(eid for eid in full_conflicted_set if eid in event_map) + full_conflicted_set = {eid for eid in full_conflicted_set if eid in event_map} logger.debug("%d full_conflicted_set entries", len(full_conflicted_set)) @@ -233,7 +233,7 @@ def _get_auth_chain_difference(state_sets, event_map, state_res_store): auth_sets = [] for state_set in state_sets: - auth_ids = set( + auth_ids = { eid for key, eid in iteritems(state_set) if ( @@ -246,7 +246,7 @@ def _get_auth_chain_difference(state_sets, event_map, state_res_store): ) ) and eid not in common - ) + } auth_chain = yield state_res_store.get_auth_chain(auth_ids, common) auth_ids.update(auth_chain) @@ -275,7 +275,7 @@ def _seperate(state_sets): conflicted_state = {} for key in set(itertools.chain.from_iterable(state_sets)): - event_ids = set(state_set.get(key) for state_set in state_sets) + event_ids = {state_set.get(key) for state_set in state_sets} if len(event_ids) == 1: unconflicted_state[key] = event_ids.pop() else: diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index da3b99f93d..13de5f1f62 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -56,7 +56,7 @@ class SQLBaseStore(metaclass=ABCMeta): members_changed (iterable[str]): The user_ids of members that have changed """ - for host in set(get_domain_from_id(u) for u in members_changed): + for host in {get_domain_from_id(u) for u in members_changed}: self._attempt_to_invalidate_cache("is_host_joined", (room_id, host)) self._attempt_to_invalidate_cache("was_host_joined", (room_id, host)) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index bd547f35cf..eb1a7e5002 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -189,7 +189,7 @@ class BackgroundUpdater(object): keyvalues=None, retcols=("update_name", "depends_on"), ) - in_flight = set(update["update_name"] for update in updates) + in_flight = {update["update_name"] for update in updates} for update in updates: if update["depends_on"] not in in_flight: self._background_update_queue.append(update["update_name"]) diff --git a/synapse/storage/data_stores/main/appservice.py b/synapse/storage/data_stores/main/appservice.py index b2f39649fd..efbc06c796 100644 --- a/synapse/storage/data_stores/main/appservice.py +++ b/synapse/storage/data_stores/main/appservice.py @@ -135,7 +135,7 @@ class ApplicationServiceTransactionWorkerStore( may be empty. """ results = yield self.db.simple_select_list( - "application_services_state", dict(state=state), ["as_id"] + "application_services_state", {"state": state}, ["as_id"] ) # NB: This assumes this class is linked with ApplicationServiceStore as_list = self.get_app_services() @@ -158,7 +158,7 @@ class ApplicationServiceTransactionWorkerStore( """ result = yield self.db.simple_select_one( "application_services_state", - dict(as_id=service.id), + {"as_id": service.id}, ["state"], allow_none=True, desc="get_appservice_state", @@ -177,7 +177,7 @@ class ApplicationServiceTransactionWorkerStore( A Deferred which resolves when the state was set successfully. """ return self.db.simple_upsert( - "application_services_state", dict(as_id=service.id), dict(state=state) + "application_services_state", {"as_id": service.id}, {"state": state} ) def create_appservice_txn(self, service, events): @@ -253,13 +253,15 @@ class ApplicationServiceTransactionWorkerStore( self.db.simple_upsert_txn( txn, "application_services_state", - dict(as_id=service.id), - dict(last_txn=txn_id), + {"as_id": service.id}, + {"last_txn": txn_id}, ) # Delete txn self.db.simple_delete_txn( - txn, "application_services_txns", dict(txn_id=txn_id, as_id=service.id) + txn, + "application_services_txns", + {"txn_id": txn_id, "as_id": service.id}, ) return self.db.runInteraction( diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py index 13f4c9c72e..e1ccb27142 100644 --- a/synapse/storage/data_stores/main/client_ips.py +++ b/synapse/storage/data_stores/main/client_ips.py @@ -530,7 +530,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): ((row["access_token"], row["ip"]), (row["user_agent"], row["last_seen"])) for row in rows ) - return list( + return [ { "access_token": access_token, "ip": ip, @@ -538,7 +538,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): "last_seen": last_seen, } for (access_token, ip), (user_agent, last_seen) in iteritems(results) - ) + ] @wrap_as_background_process("prune_old_user_ips") async def _prune_old_user_ips(self): diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index b7617efb80..d55733a4cd 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -137,7 +137,7 @@ class DeviceWorkerStore(SQLBaseStore): # get the cross-signing keys of the users in the list, so that we can # determine which of the device changes were cross-signing keys - users = set(r[0] for r in updates) + users = {r[0] for r in updates} master_key_by_user = {} self_signing_key_by_user = {} for user in users: @@ -446,7 +446,7 @@ class DeviceWorkerStore(SQLBaseStore): a set of user_ids and results_map is a mapping of user_id -> device_id -> device_info """ - user_ids = set(user_id for user_id, _ in query_list) + user_ids = {user_id for user_id, _ in query_list} user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids)) # We go and check if any of the users need to have their device lists @@ -454,10 +454,9 @@ class DeviceWorkerStore(SQLBaseStore): users_needing_resync = yield self.get_user_ids_requiring_device_list_resync( user_ids ) - user_ids_in_cache = ( - set(user_id for user_id, stream_id in user_map.items() if stream_id) - - users_needing_resync - ) + user_ids_in_cache = { + user_id for user_id, stream_id in user_map.items() if stream_id + } - users_needing_resync user_ids_not_in_cache = user_ids - user_ids_in_cache results = {} @@ -604,7 +603,7 @@ class DeviceWorkerStore(SQLBaseStore): rows = yield self.db.execute( "get_users_whose_signatures_changed", None, sql, user_id, from_key ) - return set(user for row in rows for user in json.loads(row[0])) + return {user for row in rows for user in json.loads(row[0])} else: return set() diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py index 750ec1b70d..49a7b8b433 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py @@ -426,7 +426,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas query, (room_id, event_id, False, limit - len(event_results)) ) - new_results = set(t[0] for t in txn) - seen_events + new_results = {t[0] for t in txn} - seen_events new_front |= new_results seen_events |= new_results diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index c9d0d68c3a..8ae23df00a 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -145,7 +145,7 @@ class EventsStore( return txn.fetchall() res = yield self.db.runInteraction("read_forward_extremities", fetch) - self._current_forward_extremities_amount = c_counter(list(x[0] for x in res)) + self._current_forward_extremities_amount = c_counter([x[0] for x in res]) @_retry_on_integrity_error @defer.inlineCallbacks @@ -598,11 +598,11 @@ class EventsStore( # We find out which membership events we may have deleted # and which we have added, then we invlidate the caches for all # those users. - members_changed = set( + members_changed = { state_key for ev_type, state_key in itertools.chain(to_delete, to_insert) if ev_type == EventTypes.Member - ) + } for member in members_changed: txn.call_after( @@ -1615,7 +1615,7 @@ class EventsStore( """ ) - referenced_state_groups = set(sg for sg, in txn) + referenced_state_groups = {sg for sg, in txn} logger.info( "[purge] found %i referenced state groups", len(referenced_state_groups) ) diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py index 5177b71016..f54c8b1ee0 100644 --- a/synapse/storage/data_stores/main/events_bg_updates.py +++ b/synapse/storage/data_stores/main/events_bg_updates.py @@ -402,7 +402,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): keyvalues={}, retcols=("room_id",), ) - room_ids = set(row["room_id"] for row in rows) + room_ids = {row["room_id"] for row in rows} for room_id in room_ids: txn.call_after( self.get_latest_event_ids_in_room.invalidate, (room_id,) diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 7251e819f5..47a3a26072 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -494,9 +494,9 @@ class EventsWorkerStore(SQLBaseStore): """ with Measure(self._clock, "_fetch_event_list"): try: - events_to_fetch = set( + events_to_fetch = { event_id for events, _ in event_list for event_id in events - ) + } row_dict = self.db.new_transaction( conn, "do_fetch", [], [], self._fetch_event_rows, events_to_fetch @@ -804,7 +804,7 @@ class EventsWorkerStore(SQLBaseStore): desc="have_events_in_timeline", ) - return set(r["event_id"] for r in rows) + return {r["event_id"] for r in rows} @defer.inlineCallbacks def have_seen_events(self, event_ids): diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py index e2673ae073..62ac88d9f2 100644 --- a/synapse/storage/data_stores/main/push_rule.py +++ b/synapse/storage/data_stores/main/push_rule.py @@ -276,21 +276,21 @@ class PushRulesWorkerStore( # We ignore app service users for now. This is so that we don't fill # up the `get_if_users_have_pushers` cache with AS entries that we # know don't have pushers, nor even read receipts. - local_users_in_room = set( + local_users_in_room = { u for u in users_in_room if self.hs.is_mine_id(u) and not self.get_if_app_services_interested_in_user(u) - ) + } # users in the room who have pushers need to get push rules run because # that's how their pushers work if_users_with_pushers = yield self.get_if_users_have_pushers( local_users_in_room, on_invalidate=cache_context.invalidate ) - user_ids = set( + user_ids = { uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher - ) + } users_with_receipts = yield self.get_users_with_read_receipts_in_room( room_id, on_invalidate=cache_context.invalidate diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index 96e54d145e..0d932a0672 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -58,7 +58,7 @@ class ReceiptsWorkerStore(SQLBaseStore): @cachedInlineCallbacks() def get_users_with_read_receipts_in_room(self, room_id): receipts = yield self.get_receipts_for_room(room_id, "m.read") - return set(r["user_id"] for r in receipts) + return {r["user_id"] for r in receipts} @cached(num_args=2) def get_receipts_for_room(self, room_id, receipt_type): @@ -283,7 +283,7 @@ class ReceiptsWorkerStore(SQLBaseStore): args.append(limit) txn.execute(sql, args) - return list(r[0:5] + (json.loads(r[5]),) for r in txn) + return [r[0:5] + (json.loads(r[5]),) for r in txn] return self.db.runInteraction( "get_all_updated_receipts", get_all_updated_receipts_txn diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py index d5ced05701..d5bd0cb5cf 100644 --- a/synapse/storage/data_stores/main/roommember.py +++ b/synapse/storage/data_stores/main/roommember.py @@ -465,7 +465,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): txn.execute(sql % (clause,), args) - return set(row[0] for row in txn) + return {row[0] for row in txn} return await self.db.runInteraction( "get_users_server_still_shares_room_with", @@ -826,7 +826,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): GROUP BY room_id, user_id; """ txn.execute(sql, (user_id,)) - return set(row[0] for row in txn if row[1] == 0) + return {row[0] for row in txn if row[1] == 0} return self.db.runInteraction( "get_forgotten_rooms_for_user", _get_forgotten_rooms_for_user_txn diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py index 3d34103e67..3a3b9a8e72 100644 --- a/synapse/storage/data_stores/main/state.py +++ b/synapse/storage/data_stores/main/state.py @@ -321,7 +321,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): desc="get_referenced_state_groups", ) - return set(row["state_group"] for row in rows) + return {row["state_group"] for row in rows} class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): @@ -367,7 +367,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): """ txn.execute(sql, (last_room_id, batch_size)) - room_ids = list(row[0] for row in txn) + room_ids = [row[0] for row in txn] if not room_ids: return True, set() @@ -384,7 +384,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): txn.execute(sql, (last_room_id, room_ids[-1], "%:" + self.server_name)) - joined_room_ids = set(row[0] for row in txn) + joined_room_ids = {row[0] for row in txn} left_rooms = set(room_ids) - joined_room_ids @@ -404,7 +404,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): retcols=("state_key",), ) - potentially_left_users = set(row["state_key"] for row in rows) + potentially_left_users = {row["state_key"] for row in rows} # Now lets actually delete the rooms from the DB. self.db.simple_delete_many_txn( diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 056b25b13a..ada5cce6c2 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -346,11 +346,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): from_key (str): The room_key portion of a StreamToken """ from_key = RoomStreamToken.parse_stream_token(from_key).stream - return set( + return { room_id for room_id in room_ids if self._events_stream_cache.has_entity_changed(room_id, from_key) - ) + } @defer.inlineCallbacks def get_room_events_stream_for_room( @@ -679,11 +679,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) events_before = yield self.get_events_as_list( - [e for e in results["before"]["event_ids"]], get_prev_content=True + list(results["before"]["event_ids"]), get_prev_content=True ) events_after = yield self.get_events_as_list( - [e for e in results["after"]["event_ids"]], get_prev_content=True + list(results["after"]["event_ids"]), get_prev_content=True ) return { diff --git a/synapse/storage/data_stores/main/user_erasure_store.py b/synapse/storage/data_stores/main/user_erasure_store.py index af8025bc17..ec6b8a4ffd 100644 --- a/synapse/storage/data_stores/main/user_erasure_store.py +++ b/synapse/storage/data_stores/main/user_erasure_store.py @@ -63,9 +63,9 @@ class UserErasureWorkerStore(SQLBaseStore): retcols=("user_id",), desc="are_users_erased", ) - erased_users = set(row["user_id"] for row in rows) + erased_users = {row["user_id"] for row in rows} - res = dict((u, u in erased_users) for u in user_ids) + res = {u: u in erased_users for u in user_ids} return res diff --git a/synapse/storage/data_stores/state/store.py b/synapse/storage/data_stores/state/store.py index c4ee9b7ccb..57a5267663 100644 --- a/synapse/storage/data_stores/state/store.py +++ b/synapse/storage/data_stores/state/store.py @@ -520,11 +520,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): retcols=("state_group",), ) - remaining_state_groups = set( + remaining_state_groups = { row["state_group"] for row in rows if row["state_group"] not in state_groups_to_delete - ) + } logger.info( "[purge] de-delta-ing %i remaining state groups", diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 6dcb5c04da..1953614401 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -554,8 +554,8 @@ class Database(object): Returns: A list of dicts where the key is the column header. """ - col_headers = list(intern(str(column[0])) for column in cursor.description) - results = list(dict(zip(col_headers, row)) for row in cursor) + col_headers = [intern(str(column[0])) for column in cursor.description] + results = [dict(zip(col_headers, row)) for row in cursor] return results def execute(self, desc, decoder, query, *args): diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index b950550f23..0f9ac1cf09 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -602,14 +602,14 @@ class EventsPersistenceStorage(object): event_id_to_state_group.update(event_to_groups) # State groups of old_latest_event_ids - old_state_groups = set( + old_state_groups = { event_id_to_state_group[evid] for evid in old_latest_event_ids - ) + } # State groups of new_latest_event_ids - new_state_groups = set( + new_state_groups = { event_id_to_state_group[evid] for evid in new_latest_event_ids - ) + } # If they old and new groups are the same then we don't need to do # anything. diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index c285ef52a0..fc69c32a0a 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -345,9 +345,9 @@ def _upgrade_existing_database( "Could not open delta dir for version %d: %s" % (v, directory) ) - duplicates = set( + duplicates = { file_name for file_name, count in file_name_counter.items() if count > 1 - ) + } if duplicates: # We don't support using the same file name in the same delta version. raise PrepareDatabaseException( @@ -454,7 +454,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams) ), (modname,), ) - applied_deltas = set(d for d, in cur) + applied_deltas = {d for d, in cur} for (name, stream) in names_and_streams: if name in applied_deltas: continue diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index 635b897d6c..f2ccd5e7c6 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -30,7 +30,7 @@ def freeze(o): return o try: - return tuple([freeze(i) for i in o]) + return tuple(freeze(i) for i in o) except TypeError: pass diff --git a/synapse/visibility.py b/synapse/visibility.py index d0abd8f04f..e60d9756b7 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -75,7 +75,7 @@ def filter_events_for_client( """ # Filter out events that have been soft failed so that we don't relay them # to clients. - events = list(e for e in events if not e.internal_metadata.is_soft_failed()) + events = [e for e in events if not e.internal_metadata.is_soft_failed()] types = ((EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, user_id)) event_id_to_state = yield storage.state.get_state_for_events( @@ -97,7 +97,7 @@ def filter_events_for_client( erased_senders = yield storage.main.are_users_erased((e.sender for e in events)) if apply_retention_policies: - room_ids = set(e.room_id for e in events) + room_ids = {e.room_id for e in events} retention_policies = {} for room_id in room_ids: diff --git a/tests/config/test_generate.py b/tests/config/test_generate.py index 2684e662de..463855ecc8 100644 --- a/tests/config/test_generate.py +++ b/tests/config/test_generate.py @@ -48,7 +48,7 @@ class ConfigGenerationTestCase(unittest.TestCase): ) self.assertSetEqual( - set(["homeserver.yaml", "lemurs.win.log.config", "lemurs.win.signing.key"]), + {"homeserver.yaml", "lemurs.win.log.config", "lemurs.win.signing.key"}, set(os.listdir(self.dir)), ) diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py index e7d8699040..296dc887be 100644 --- a/tests/federation/test_federation_server.py +++ b/tests/federation/test_federation_server.py @@ -83,7 +83,7 @@ class StateQueryTests(unittest.FederatingHomeserverTestCase): ) ) - self.assertEqual(members, set(["@user:other.example.com", u1])) + self.assertEqual(members, {"@user:other.example.com", u1}) self.assertEqual(len(channel.json_body["pdus"]), 6) def test_needs_to_be_in_room(self): diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index c171038df8..64915bafcd 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -338,7 +338,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, syncing_user_ids=set([user_id]), now=now + state, is_mine=True, syncing_user_ids={user_id}, now=now ) self.assertIsNotNone(new_state) @@ -579,7 +579,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): ) self.assertEqual(expected_state.state, PresenceState.ONLINE) self.federation_sender.send_presence_to_destinations.assert_called_once_with( - destinations=set(("server2", "server3")), states=[expected_state] + destinations={"server2", "server3"}, states=[expected_state] ) def _add_new_user(self, room_id, user_id): diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 140cc0a3c2..07b204666e 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -129,12 +129,12 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): hs.get_auth().check_user_in_room = check_user_in_room def get_joined_hosts_for_room(room_id): - return set(member.domain for member in self.room_members) + return {member.domain for member in self.room_members} self.datastore.get_joined_hosts_for_room = get_joined_hosts_for_room def get_current_users_in_room(room_id): - return set(str(u) for u in self.room_members) + return {str(u) for u in self.room_members} hs.get_state_handler().get_current_users_in_room = get_current_users_in_room @@ -257,7 +257,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): member = RoomMember(ROOM_ID, U_APPLE.to_string()) self.handler._member_typing_until[member] = 1002000 - self.handler._room_typing[ROOM_ID] = set([U_APPLE.to_string()]) + self.handler._room_typing[ROOM_ID] = {U_APPLE.to_string()} self.assertEquals(self.event_source.get_current_key(), 0) diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index 0a4765fff4..7b92bdbc47 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -114,7 +114,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): public_users = self.get_users_in_public_rooms() self.assertEqual( - self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)]) + self._compress_shared(shares_private), {(u1, u2, room), (u2, u1, room)} ) self.assertEqual(public_users, []) @@ -169,7 +169,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): public_users = self.get_users_in_public_rooms() self.assertEqual( - self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)]) + self._compress_shared(shares_private), {(u1, u2, room), (u2, u1, room)} ) self.assertEqual(public_users, []) @@ -226,7 +226,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): public_users = self.get_users_in_public_rooms() self.assertEqual( - self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)]) + self._compress_shared(shares_private), {(u1, u2, room), (u2, u1, room)} ) self.assertEqual(public_users, []) @@ -358,12 +358,12 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): public_users = self.get_users_in_public_rooms() # User 1 and User 2 are in the same public room - self.assertEqual(set(public_users), set([(u1, room), (u2, room)])) + self.assertEqual(set(public_users), {(u1, room), (u2, room)}) # User 1 and User 3 share private rooms self.assertEqual( self._compress_shared(shares_private), - set([(u1, u3, private_room), (u3, u1, private_room)]), + {(u1, u3, private_room), (u3, u1, private_room)}, ) def test_initial_share_all_users(self): @@ -398,7 +398,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): # No users share rooms self.assertEqual(public_users, []) - self.assertEqual(self._compress_shared(shares_private), set([])) + self.assertEqual(self._compress_shared(shares_private), set()) # Despite not sharing a room, search_all_users means we get a search # result. diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 80187406bc..83032cc9ea 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -163,7 +163,7 @@ class EmailPusherTests(HomeserverTestCase): # Get the stream ordering before it gets sent pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": self.user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) @@ -174,7 +174,7 @@ class EmailPusherTests(HomeserverTestCase): # It hasn't succeeded yet, so the stream ordering shouldn't have moved pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": self.user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) @@ -192,7 +192,7 @@ class EmailPusherTests(HomeserverTestCase): # The stream ordering has increased pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": self.user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) diff --git a/tests/push/test_http.py b/tests/push/test_http.py index fe3441f081..baf9c785f4 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -102,7 +102,7 @@ class HTTPPusherTests(HomeserverTestCase): # Get the stream ordering before it gets sent pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) @@ -113,7 +113,7 @@ class HTTPPusherTests(HomeserverTestCase): # It hasn't succeeded yet, so the stream ordering shouldn't have moved pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) @@ -132,7 +132,7 @@ class HTTPPusherTests(HomeserverTestCase): # The stream ordering has increased pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) @@ -152,7 +152,7 @@ class HTTPPusherTests(HomeserverTestCase): # The stream ordering has increased, again pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 9c13a13786..fa3a3ec1bd 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -40,16 +40,14 @@ class FilterTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200) self.assertTrue( - set( - [ - "next_batch", - "rooms", - "presence", - "account_data", - "to_device", - "device_lists", - ] - ).issubset(set(channel.json_body.keys())) + { + "next_batch", + "rooms", + "presence", + "account_data", + "to_device", + "device_lists", + }.issubset(set(channel.json_body.keys())) ) def test_sync_presence_disabled(self): @@ -63,9 +61,13 @@ class FilterTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200) self.assertTrue( - set( - ["next_batch", "rooms", "account_data", "to_device", "device_lists"] - ).issubset(set(channel.json_body.keys())) + { + "next_batch", + "rooms", + "account_data", + "to_device", + "device_lists", + }.issubset(set(channel.json_body.keys())) ) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index d491ea2924..e37260a820 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -373,7 +373,7 @@ class UpsertManyTests(unittest.HomeserverTestCase): ) self.assertEqual( set(self._dump_to_tuple(res)), - set([(1, "user1", "hello"), (2, "user2", "there")]), + {(1, "user1", "hello"), (2, "user2", "there")}, ) # Update only user2 @@ -400,5 +400,5 @@ class UpsertManyTests(unittest.HomeserverTestCase): ) self.assertEqual( set(self._dump_to_tuple(res)), - set([(1, "user1", "hello"), (2, "user2", "bleb")]), + {(1, "user1", "hello"), (2, "user2", "bleb")}, ) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index fd52512696..31710949a8 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -69,14 +69,14 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): pass def _add_appservice(self, as_token, id, url, hs_token, sender): - as_yaml = dict( - url=url, - as_token=as_token, - hs_token=hs_token, - id=id, - sender_localpart=sender, - namespaces={}, - ) + as_yaml = { + "url": url, + "as_token": as_token, + "hs_token": hs_token, + "id": id, + "sender_localpart": sender, + "namespaces": {}, + } # use the token as the filename with open(as_token, "w") as outfile: outfile.write(yaml.dump(as_yaml)) @@ -135,14 +135,14 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): ) def _add_service(self, url, as_token, id): - as_yaml = dict( - url=url, - as_token=as_token, - hs_token="something", - id=id, - sender_localpart="a_sender", - namespaces={}, - ) + as_yaml = { + "url": url, + "as_token": as_token, + "hs_token": "something", + "id": id, + "sender_localpart": "a_sender", + "namespaces": {}, + } # use the token as the filename with open(as_token, "w") as outfile: outfile.write(yaml.dump(as_yaml)) @@ -384,8 +384,8 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): ) self.assertEquals(2, len(services)) self.assertEquals( - set([self.as_list[2]["id"], self.as_list[0]["id"]]), - set([services[0].id, services[1].id]), + {self.as_list[2]["id"], self.as_list[0]["id"]}, + {services[0].id, services[1].id}, ) diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py index 029ac26454..0e04b2cf92 100644 --- a/tests/storage/test_cleanup_extrems.py +++ b/tests/storage/test_cleanup_extrems.py @@ -134,7 +134,7 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase): latest_event_ids = self.get_success( self.store.get_latest_event_ids_in_room(self.room_id) ) - self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b))) + self.assertEqual(set(latest_event_ids), {event_id_a, event_id_b}) # Run the background update and check it did the right thing self.run_background_update() @@ -172,7 +172,7 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase): latest_event_ids = self.get_success( self.store.get_latest_event_ids_in_room(self.room_id) ) - self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b))) + self.assertEqual(set(latest_event_ids), {event_id_a, event_id_b}) # Run the background update and check it did the right thing self.run_background_update() @@ -227,9 +227,7 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase): latest_event_ids = self.get_success( self.store.get_latest_event_ids_in_room(self.room_id) ) - self.assertEqual( - set(latest_event_ids), set((event_id_a, event_id_b, event_id_c)) - ) + self.assertEqual(set(latest_event_ids), {event_id_a, event_id_b, event_id_c}) # Run the background update and check it did the right thing self.run_background_update() @@ -237,7 +235,7 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase): latest_event_ids = self.get_success( self.store.get_latest_event_ids_in_room(self.room_id) ) - self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c])) + self.assertEqual(set(latest_event_ids), {event_id_b, event_id_c}) class CleanupExtremDummyEventsTestCase(HomeserverTestCase): diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py index f26ff57a18..a7b7fd36d3 100644 --- a/tests/storage/test_event_metrics.py +++ b/tests/storage/test_event_metrics.py @@ -59,24 +59,22 @@ class ExtremStatisticsTestCase(HomeserverTestCase): ) ) - expected = set( - [ - b'synapse_forward_extremities_bucket{le="1.0"} 0.0', - b'synapse_forward_extremities_bucket{le="2.0"} 2.0', - b'synapse_forward_extremities_bucket{le="3.0"} 2.0', - b'synapse_forward_extremities_bucket{le="5.0"} 2.0', - b'synapse_forward_extremities_bucket{le="7.0"} 3.0', - b'synapse_forward_extremities_bucket{le="10.0"} 3.0', - b'synapse_forward_extremities_bucket{le="15.0"} 3.0', - b'synapse_forward_extremities_bucket{le="20.0"} 3.0', - b'synapse_forward_extremities_bucket{le="50.0"} 3.0', - b'synapse_forward_extremities_bucket{le="100.0"} 3.0', - b'synapse_forward_extremities_bucket{le="200.0"} 3.0', - b'synapse_forward_extremities_bucket{le="500.0"} 3.0', - b'synapse_forward_extremities_bucket{le="+Inf"} 3.0', - b"synapse_forward_extremities_count 3.0", - b"synapse_forward_extremities_sum 10.0", - ] - ) + expected = { + b'synapse_forward_extremities_bucket{le="1.0"} 0.0', + b'synapse_forward_extremities_bucket{le="2.0"} 2.0', + b'synapse_forward_extremities_bucket{le="3.0"} 2.0', + b'synapse_forward_extremities_bucket{le="5.0"} 2.0', + b'synapse_forward_extremities_bucket{le="7.0"} 3.0', + b'synapse_forward_extremities_bucket{le="10.0"} 3.0', + b'synapse_forward_extremities_bucket{le="15.0"} 3.0', + b'synapse_forward_extremities_bucket{le="20.0"} 3.0', + b'synapse_forward_extremities_bucket{le="50.0"} 3.0', + b'synapse_forward_extremities_bucket{le="100.0"} 3.0', + b'synapse_forward_extremities_bucket{le="200.0"} 3.0', + b'synapse_forward_extremities_bucket{le="500.0"} 3.0', + b'synapse_forward_extremities_bucket{le="+Inf"} 3.0', + b"synapse_forward_extremities_count 3.0", + b"synapse_forward_extremities_sum 10.0", + } self.assertEqual(items, expected) diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index 04d58fbf24..0b88308ff4 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -394,7 +394,7 @@ class StateStoreTestCase(tests.unittest.TestCase): ) = self.state_datastore._state_group_cache.get(group) self.assertEqual(is_all, False) - self.assertEqual(known_absent, set([(e1.type, e1.state_key)])) + self.assertEqual(known_absent, {(e1.type, e1.state_key)}) self.assertDictEqual(state_dict_ids, {(e1.type, e1.state_key): e1.event_id}) ############################################ diff --git a/tests/test_state.py b/tests/test_state.py index d1578fe581..66f22f6813 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -254,9 +254,7 @@ class StateTestCase(unittest.TestCase): ctx_d = context_store["D"] prev_state_ids = yield ctx_d.get_prev_state_ids() - self.assertSetEqual( - {"START", "A", "C"}, {e_id for e_id in prev_state_ids.values()} - ) + self.assertSetEqual({"START", "A", "C"}, set(prev_state_ids.values())) self.assertEqual(ctx_c.state_group, ctx_d.state_group_before_event) self.assertEqual(ctx_d.state_group_before_event, ctx_d.state_group) @@ -313,9 +311,7 @@ class StateTestCase(unittest.TestCase): ctx_e = context_store["E"] prev_state_ids = yield ctx_e.get_prev_state_ids() - self.assertSetEqual( - {"START", "A", "B", "C"}, {e for e in prev_state_ids.values()} - ) + self.assertSetEqual({"START", "A", "B", "C"}, set(prev_state_ids.values())) self.assertEqual(ctx_c.state_group, ctx_e.state_group_before_event) self.assertEqual(ctx_e.state_group_before_event, ctx_e.state_group) @@ -388,9 +384,7 @@ class StateTestCase(unittest.TestCase): ctx_d = context_store["D"] prev_state_ids = yield ctx_d.get_prev_state_ids() - self.assertSetEqual( - {"A1", "A2", "A3", "A5", "B"}, {e for e in prev_state_ids.values()} - ) + self.assertSetEqual({"A1", "A2", "A3", "A5", "B"}, set(prev_state_ids.values())) self.assertEqual(ctx_b.state_group, ctx_d.state_group_before_event) self.assertEqual(ctx_d.state_group_before_event, ctx_d.state_group) @@ -482,7 +476,7 @@ class StateTestCase(unittest.TestCase): current_state_ids = yield context.get_current_state_ids() self.assertEqual( - set([e.event_id for e in old_state]), set(current_state_ids.values()) + {e.event_id for e in old_state}, set(current_state_ids.values()) ) self.assertEqual(group_name, context.state_group) @@ -513,9 +507,7 @@ class StateTestCase(unittest.TestCase): prev_state_ids = yield context.get_prev_state_ids() - self.assertEqual( - set([e.event_id for e in old_state]), set(prev_state_ids.values()) - ) + self.assertEqual({e.event_id for e in old_state}, set(prev_state_ids.values())) self.assertIsNotNone(context.state_group) diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index f2be63706b..72a9de5370 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -67,7 +67,7 @@ class StreamChangeCacheTests(unittest.TestCase): # If we update an existing entity, it keeps the two existing entities cache.entity_has_changed("bar@baz.net", 5) self.assertEqual( - set(["bar@baz.net", "user@elsewhere.org"]), set(cache._entity_to_key) + {"bar@baz.net", "user@elsewhere.org"}, set(cache._entity_to_key) ) def test_get_all_entities_changed(self): @@ -137,7 +137,7 @@ class StreamChangeCacheTests(unittest.TestCase): cache.get_entities_changed( ["user@foo.com", "bar@baz.net", "user@elsewhere.org"], stream_pos=2 ), - set(["bar@baz.net", "user@elsewhere.org"]), + {"bar@baz.net", "user@elsewhere.org"}, ) # Query all the entries mid-way through the stream, but include one @@ -153,7 +153,7 @@ class StreamChangeCacheTests(unittest.TestCase): ], stream_pos=2, ), - set(["bar@baz.net", "user@elsewhere.org"]), + {"bar@baz.net", "user@elsewhere.org"}, ) # Query all the entries, but before the first known point. We will get @@ -168,21 +168,13 @@ class StreamChangeCacheTests(unittest.TestCase): ], stream_pos=0, ), - set( - [ - "user@foo.com", - "bar@baz.net", - "user@elsewhere.org", - "not@here.website", - ] - ), + {"user@foo.com", "bar@baz.net", "user@elsewhere.org", "not@here.website"}, ) # Query a subset of the entries mid-way through the stream. We should # only get back the subset. self.assertEqual( - cache.get_entities_changed(["bar@baz.net"], stream_pos=2), - set(["bar@baz.net"]), + cache.get_entities_changed(["bar@baz.net"], stream_pos=2), {"bar@baz.net"}, ) def test_max_pos(self): diff --git a/tox.ini b/tox.ini index b9132a3177..b715ea0bff 100644 --- a/tox.ini +++ b/tox.ini @@ -123,6 +123,7 @@ skip_install = True basepython = python3.6 deps = flake8 + flake8-comprehensions black==19.10b0 # We pin so that our tests don't start failing on new releases of black. commands = python -m black --check --diff . -- cgit 1.5.1 From 1f773eec912e4908ab60f7823f5c0a024261af4d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Feb 2020 15:33:26 +0000 Subject: Port PresenceHandler to async/await (#6991) --- changelog.d/6991.misc | 1 + synapse/handlers/message.py | 5 +- synapse/handlers/presence.py | 192 ++++++++++++++++-------------------- synapse/replication/tcp/resource.py | 6 +- synapse/server.pyi | 5 + tests/handlers/test_presence.py | 18 ++-- tox.ini | 1 + 7 files changed, 113 insertions(+), 115 deletions(-) create mode 100644 changelog.d/6991.misc (limited to 'tests/handlers/test_presence.py') diff --git a/changelog.d/6991.misc b/changelog.d/6991.misc new file mode 100644 index 0000000000..5130f4e8af --- /dev/null +++ b/changelog.d/6991.misc @@ -0,0 +1 @@ +Port `synapse.handlers.presence` to async/await. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d6be280952..a0103addd3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1016,11 +1016,10 @@ class EventCreationHandler(object): # matters as sometimes presence code can take a while. run_in_background(self._bump_active_time, requester.user) - @defer.inlineCallbacks - def _bump_active_time(self, user): + async def _bump_active_time(self, user): try: presence = self.hs.get_presence_handler() - yield presence.bump_presence_active_time(user) + await presence.bump_presence_active_time(user) except Exception: logger.exception("Error bumping presence active time") diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 0d6cf2b008..5526015ddb 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -24,11 +24,12 @@ The methods that define policy are: import logging from contextlib import contextmanager -from typing import Dict, Set +from typing import Dict, List, Set from six import iteritems, itervalues from prometheus_client import Counter +from typing_extensions import ContextManager from twisted.internet import defer @@ -42,10 +43,14 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.presence import UserPresenceState from synapse.types import UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cached from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer +MYPY = False +if MYPY: + import synapse.server + logger = logging.getLogger(__name__) @@ -97,7 +102,6 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER class PresenceHandler(object): def __init__(self, hs: "synapse.server.HomeServer"): self.hs = hs - self.is_mine = hs.is_mine self.is_mine_id = hs.is_mine_id self.server_name = hs.hostname self.clock = hs.get_clock() @@ -150,7 +154,7 @@ class PresenceHandler(object): # Set of users who have presence in the `user_to_current_state` that # have not yet been persisted - self.unpersisted_users_changes = set() + self.unpersisted_users_changes = set() # type: Set[str] hs.get_reactor().addSystemEventTrigger( "before", @@ -160,12 +164,11 @@ class PresenceHandler(object): self._on_shutdown, ) - self.serial_to_user = {} self._next_serial = 1 # Keeps track of the number of *ongoing* syncs on this process. While # this is non zero a user will never go offline. - self.user_to_num_current_syncs = {} + self.user_to_num_current_syncs = {} # type: Dict[str, int] # Keeps track of the number of *ongoing* syncs on other processes. # While any sync is ongoing on another process the user will never @@ -213,8 +216,7 @@ class PresenceHandler(object): self._event_pos = self.store.get_current_events_token() self._event_processing = False - @defer.inlineCallbacks - def _on_shutdown(self): + async def _on_shutdown(self): """Gets called when shutting down. This lets us persist any updates that we haven't yet persisted, e.g. updates that only changes some internal timers. This allows changes to persist across startup without having to @@ -235,7 +237,7 @@ class PresenceHandler(object): if self.unpersisted_users_changes: - yield self.store.update_presence( + await self.store.update_presence( [ self.user_to_current_state[user_id] for user_id in self.unpersisted_users_changes @@ -243,8 +245,7 @@ class PresenceHandler(object): ) logger.info("Finished _on_shutdown") - @defer.inlineCallbacks - def _persist_unpersisted_changes(self): + async def _persist_unpersisted_changes(self): """We periodically persist the unpersisted changes, as otherwise they may stack up and slow down shutdown times. """ @@ -253,12 +254,11 @@ class PresenceHandler(object): if unpersisted: logger.info("Persisting %d unpersisted presence updates", len(unpersisted)) - yield self.store.update_presence( + await self.store.update_presence( [self.user_to_current_state[user_id] for user_id in unpersisted] ) - @defer.inlineCallbacks - def _update_states(self, new_states): + async def _update_states(self, new_states): """Updates presence of users. Sets the appropriate timeouts. Pokes the notifier and federation if and only if the changed presence state should be sent to clients/servers. @@ -267,7 +267,7 @@ class PresenceHandler(object): with Measure(self.clock, "presence_update_states"): - # NOTE: We purposefully don't yield between now and when we've + # NOTE: We purposefully don't await between now and when we've # calculated what we want to do with the new states, to avoid races. to_notify = {} # Changes we want to notify everyone about @@ -311,7 +311,7 @@ class PresenceHandler(object): if to_notify: notified_presence_counter.inc(len(to_notify)) - yield self._persist_and_notify(list(to_notify.values())) + await self._persist_and_notify(list(to_notify.values())) self.unpersisted_users_changes |= {s.user_id for s in new_states} self.unpersisted_users_changes -= set(to_notify.keys()) @@ -326,7 +326,7 @@ class PresenceHandler(object): self._push_to_remotes(to_federation_ping.values()) - def _handle_timeouts(self): + async def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as appropriate. """ @@ -368,10 +368,9 @@ class PresenceHandler(object): now=now, ) - return self._update_states(changes) + return await self._update_states(changes) - @defer.inlineCallbacks - def bump_presence_active_time(self, user): + async def bump_presence_active_time(self, user): """We've seen the user do something that indicates they're interacting with the app. """ @@ -383,16 +382,17 @@ class PresenceHandler(object): bump_active_time_counter.inc() - prev_state = yield self.current_state_for_user(user_id) + prev_state = await self.current_state_for_user(user_id) new_fields = {"last_active_ts": self.clock.time_msec()} if prev_state.state == PresenceState.UNAVAILABLE: new_fields["state"] = PresenceState.ONLINE - yield self._update_states([prev_state.copy_and_replace(**new_fields)]) + await self._update_states([prev_state.copy_and_replace(**new_fields)]) - @defer.inlineCallbacks - def user_syncing(self, user_id, affect_presence=True): + async def user_syncing( + self, user_id: str, affect_presence: bool = True + ) -> ContextManager[None]: """Returns a context manager that should surround any stream requests from the user. @@ -415,11 +415,11 @@ class PresenceHandler(object): curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 - prev_state = yield self.current_state_for_user(user_id) + prev_state = await self.current_state_for_user(user_id) if prev_state.state == PresenceState.OFFLINE: # If they're currently offline then bring them online, otherwise # just update the last sync times. - yield self._update_states( + await self._update_states( [ prev_state.copy_and_replace( state=PresenceState.ONLINE, @@ -429,7 +429,7 @@ class PresenceHandler(object): ] ) else: - yield self._update_states( + await self._update_states( [ prev_state.copy_and_replace( last_user_sync_ts=self.clock.time_msec() @@ -437,13 +437,12 @@ class PresenceHandler(object): ] ) - @defer.inlineCallbacks - def _end(): + async def _end(): try: self.user_to_num_current_syncs[user_id] -= 1 - prev_state = yield self.current_state_for_user(user_id) - yield self._update_states( + prev_state = await self.current_state_for_user(user_id) + await self._update_states( [ prev_state.copy_and_replace( last_user_sync_ts=self.clock.time_msec() @@ -480,8 +479,7 @@ class PresenceHandler(object): else: return set() - @defer.inlineCallbacks - def update_external_syncs_row( + async def update_external_syncs_row( self, process_id, user_id, is_syncing, sync_time_msec ): """Update the syncing users for an external process as a delta. @@ -494,8 +492,8 @@ class PresenceHandler(object): is_syncing (bool): Whether or not the user is now syncing sync_time_msec(int): Time in ms when the user was last syncing """ - with (yield self.external_sync_linearizer.queue(process_id)): - prev_state = yield self.current_state_for_user(user_id) + with (await self.external_sync_linearizer.queue(process_id)): + prev_state = await self.current_state_for_user(user_id) process_presence = self.external_process_to_current_syncs.setdefault( process_id, set() @@ -525,25 +523,24 @@ class PresenceHandler(object): process_presence.discard(user_id) if updates: - yield self._update_states(updates) + await self._update_states(updates) self.external_process_last_updated_ms[process_id] = self.clock.time_msec() - @defer.inlineCallbacks - def update_external_syncs_clear(self, process_id): + async def update_external_syncs_clear(self, process_id): """Marks all users that had been marked as syncing by a given process as offline. Used when the process has stopped/disappeared. """ - with (yield self.external_sync_linearizer.queue(process_id)): + with (await self.external_sync_linearizer.queue(process_id)): process_presence = self.external_process_to_current_syncs.pop( process_id, set() ) - prev_states = yield self.current_state_for_users(process_presence) + prev_states = await self.current_state_for_users(process_presence) time_now_ms = self.clock.time_msec() - yield self._update_states( + await self._update_states( [ prev_state.copy_and_replace(last_user_sync_ts=time_now_ms) for prev_state in itervalues(prev_states) @@ -551,15 +548,13 @@ class PresenceHandler(object): ) self.external_process_last_updated_ms.pop(process_id, None) - @defer.inlineCallbacks - def current_state_for_user(self, user_id): + async def current_state_for_user(self, user_id): """Get the current presence state for a user. """ - res = yield self.current_state_for_users([user_id]) + res = await self.current_state_for_users([user_id]) return res[user_id] - @defer.inlineCallbacks - def current_state_for_users(self, user_ids): + async def current_state_for_users(self, user_ids): """Get the current presence state for multiple users. Returns: @@ -574,7 +569,7 @@ class PresenceHandler(object): if missing: # There are things not in our in memory cache. Lets pull them out of # the database. - res = yield self.store.get_presence_for_users(missing) + res = await self.store.get_presence_for_users(missing) states.update(res) missing = [user_id for user_id, state in iteritems(states) if not state] @@ -587,14 +582,13 @@ class PresenceHandler(object): return states - @defer.inlineCallbacks - def _persist_and_notify(self, states): + async def _persist_and_notify(self, states): """Persist states in the database, poke the notifier and send to interested remote servers """ - stream_id, max_token = yield self.store.update_presence(states) + stream_id, max_token = await self.store.update_presence(states) - parties = yield get_interested_parties(self.store, states) + parties = await get_interested_parties(self.store, states) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( @@ -606,9 +600,8 @@ class PresenceHandler(object): self._push_to_remotes(states) - @defer.inlineCallbacks - def notify_for_states(self, state, stream_id): - parties = yield get_interested_parties(self.store, [state]) + async def notify_for_states(self, state, stream_id): + parties = await get_interested_parties(self.store, [state]) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( @@ -626,8 +619,7 @@ class PresenceHandler(object): """ self.federation.send_presence(states) - @defer.inlineCallbacks - def incoming_presence(self, origin, content): + async def incoming_presence(self, origin, content): """Called when we receive a `m.presence` EDU from a remote server. """ now = self.clock.time_msec() @@ -670,21 +662,19 @@ class PresenceHandler(object): new_fields["status_msg"] = push.get("status_msg", None) new_fields["currently_active"] = push.get("currently_active", False) - prev_state = yield self.current_state_for_user(user_id) + prev_state = await self.current_state_for_user(user_id) updates.append(prev_state.copy_and_replace(**new_fields)) if updates: federation_presence_counter.inc(len(updates)) - yield self._update_states(updates) + await self._update_states(updates) - @defer.inlineCallbacks - def get_state(self, target_user, as_event=False): - results = yield self.get_states([target_user.to_string()], as_event=as_event) + async def get_state(self, target_user, as_event=False): + results = await self.get_states([target_user.to_string()], as_event=as_event) return results[0] - @defer.inlineCallbacks - def get_states(self, target_user_ids, as_event=False): + async def get_states(self, target_user_ids, as_event=False): """Get the presence state for users. Args: @@ -695,7 +685,7 @@ class PresenceHandler(object): list """ - updates = yield self.current_state_for_users(target_user_ids) + updates = await self.current_state_for_users(target_user_ids) updates = list(updates.values()) for user_id in set(target_user_ids) - {u.user_id for u in updates}: @@ -713,8 +703,7 @@ class PresenceHandler(object): else: return updates - @defer.inlineCallbacks - def set_state(self, target_user, state, ignore_status_msg=False): + async def set_state(self, target_user, state, ignore_status_msg=False): """Set the presence state of the user. """ status_msg = state.get("status_msg", None) @@ -730,7 +719,7 @@ class PresenceHandler(object): user_id = target_user.to_string() - prev_state = yield self.current_state_for_user(user_id) + prev_state = await self.current_state_for_user(user_id) new_fields = {"state": presence} @@ -741,16 +730,15 @@ class PresenceHandler(object): if presence == PresenceState.ONLINE: new_fields["last_active_ts"] = self.clock.time_msec() - yield self._update_states([prev_state.copy_and_replace(**new_fields)]) + await self._update_states([prev_state.copy_and_replace(**new_fields)]) - @defer.inlineCallbacks - def is_visible(self, observed_user, observer_user): + async def is_visible(self, observed_user, observer_user): """Returns whether a user can see another user's presence. """ - observer_room_ids = yield self.store.get_rooms_for_user( + observer_room_ids = await self.store.get_rooms_for_user( observer_user.to_string() ) - observed_room_ids = yield self.store.get_rooms_for_user( + observed_room_ids = await self.store.get_rooms_for_user( observed_user.to_string() ) @@ -759,8 +747,7 @@ class PresenceHandler(object): return False - @defer.inlineCallbacks - def get_all_presence_updates(self, last_id, current_id): + async def get_all_presence_updates(self, last_id, current_id): """ Gets a list of presence update rows from between the given stream ids. Each row has: @@ -775,7 +762,7 @@ class PresenceHandler(object): """ # TODO(markjh): replicate the unpersisted changes. # This could use the in-memory stores for recent changes. - rows = yield self.store.get_all_presence_updates(last_id, current_id) + rows = await self.store.get_all_presence_updates(last_id, current_id) return rows def notify_new_event(self): @@ -786,20 +773,18 @@ class PresenceHandler(object): if self._event_processing: return - @defer.inlineCallbacks - def _process_presence(): + async def _process_presence(): assert not self._event_processing self._event_processing = True try: - yield self._unsafe_process() + await self._unsafe_process() finally: self._event_processing = False run_as_background_process("presence.notify_new_event", _process_presence) - @defer.inlineCallbacks - def _unsafe_process(self): + async def _unsafe_process(self): # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "presence_delta"): @@ -812,10 +797,10 @@ class PresenceHandler(object): self._event_pos, room_max_stream_ordering, ) - max_pos, deltas = yield self.store.get_current_state_deltas( + max_pos, deltas = await self.store.get_current_state_deltas( self._event_pos, room_max_stream_ordering ) - yield self._handle_state_delta(deltas) + await self._handle_state_delta(deltas) self._event_pos = max_pos @@ -824,8 +809,7 @@ class PresenceHandler(object): max_pos ) - @defer.inlineCallbacks - def _handle_state_delta(self, deltas): + async def _handle_state_delta(self, deltas): """Process current state deltas to find new joins that need to be handled. """ @@ -846,13 +830,13 @@ class PresenceHandler(object): # joins. continue - event = yield self.store.get_event(event_id, allow_none=True) + event = await self.store.get_event(event_id, allow_none=True) if not event or event.content.get("membership") != Membership.JOIN: # We only care about joins continue if prev_event_id: - prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + prev_event = await self.store.get_event(prev_event_id, allow_none=True) if ( prev_event and prev_event.content.get("membership") == Membership.JOIN @@ -860,10 +844,9 @@ class PresenceHandler(object): # Ignore changes to join events. continue - yield self._on_user_joined_room(room_id, state_key) + await self._on_user_joined_room(room_id, state_key) - @defer.inlineCallbacks - def _on_user_joined_room(self, room_id, user_id): + async def _on_user_joined_room(self, room_id, user_id): """Called when we detect a user joining the room via the current state delta stream. @@ -882,8 +865,8 @@ class PresenceHandler(object): # TODO: We should be able to filter the hosts down to those that # haven't previously seen the user - state = yield self.current_state_for_user(user_id) - hosts = yield self.state.get_current_hosts_in_room(room_id) + state = await self.current_state_for_user(user_id) + hosts = await self.state.get_current_hosts_in_room(room_id) # Filter out ourselves. hosts = {host for host in hosts if host != self.server_name} @@ -903,10 +886,10 @@ class PresenceHandler(object): # TODO: Check that this is actually a new server joining the # room. - user_ids = yield self.state.get_current_users_in_room(room_id) + user_ids = await self.state.get_current_users_in_room(room_id) user_ids = list(filter(self.is_mine_id, user_ids)) - states = yield self.current_state_for_users(user_ids) + states = await self.current_state_for_users(user_ids) # Filter out old presence, i.e. offline presence states where # the user hasn't been active for a week. We can change this @@ -996,9 +979,8 @@ class PresenceEventSource(object): self.store = hs.get_datastore() self.state = hs.get_state_handler() - @defer.inlineCallbacks @log_function - def get_new_events( + async def get_new_events( self, user, from_key, @@ -1045,7 +1027,7 @@ class PresenceEventSource(object): presence = self.get_presence_handler() stream_change_cache = self.store.presence_stream_cache - users_interested_in = yield self._get_interested_in(user, explicit_room_id) + users_interested_in = await self._get_interested_in(user, explicit_room_id) user_ids_changed = set() changed = None @@ -1071,7 +1053,7 @@ class PresenceEventSource(object): else: user_ids_changed = users_interested_in - updates = yield presence.current_state_for_users(user_ids_changed) + updates = await presence.current_state_for_users(user_ids_changed) if include_offline: return (list(updates.values()), max_token) @@ -1084,11 +1066,11 @@ class PresenceEventSource(object): def get_current_key(self): return self.store.get_current_presence_token() - def get_pagination_rows(self, user, pagination_config, key): - return self.get_new_events(user, from_key=None, include_offline=False) + async def get_pagination_rows(self, user, pagination_config, key): + return await self.get_new_events(user, from_key=None, include_offline=False) - @cachedInlineCallbacks(num_args=2, cache_context=True) - def _get_interested_in(self, user, explicit_room_id, cache_context): + @cached(num_args=2, cache_context=True) + async def _get_interested_in(self, user, explicit_room_id, cache_context): """Returns the set of users that the given user should see presence updates for """ @@ -1096,13 +1078,13 @@ class PresenceEventSource(object): users_interested_in = set() users_interested_in.add(user_id) # So that we receive our own presence - users_who_share_room = yield self.store.get_users_who_share_room_with_user( + users_who_share_room = await self.store.get_users_who_share_room_with_user( user_id, on_invalidate=cache_context.invalidate ) users_interested_in.update(users_who_share_room) if explicit_room_id: - user_ids = yield self.store.get_users_in_room( + user_ids = await self.store.get_users_in_room( explicit_room_id, on_invalidate=cache_context.invalidate ) users_interested_in.update(user_ids) @@ -1277,8 +1259,8 @@ def get_interested_parties(store, states): 2-tuple: `(room_ids_to_states, users_to_states)`, with each item being a dict of `entity_name` -> `[UserPresenceState]` """ - room_ids_to_states = {} - users_to_states = {} + room_ids_to_states = {} # type: Dict[str, List[UserPresenceState]] + users_to_states = {} # type: Dict[str, List[UserPresenceState]] for state in states: room_ids = yield store.get_rooms_for_user(state.user_id) for room_id in room_ids: diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index ce60ae2e07..ce9d1fae12 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -323,7 +323,11 @@ class ReplicationStreamer(object): # We need to tell the presence handler that the connection has been # lost so that it can handle any ongoing syncs on that connection. - self.presence_handler.update_external_syncs_clear(connection.conn_id) + run_as_background_process( + "update_external_syncs_clear", + self.presence_handler.update_external_syncs_clear, + connection.conn_id, + ) def _batch_updates(updates): diff --git a/synapse/server.pyi b/synapse/server.pyi index 40eabfe5d9..3844f0e12f 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -3,6 +3,7 @@ import twisted.internet import synapse.api.auth import synapse.config.homeserver import synapse.crypto.keyring +import synapse.federation.federation_server import synapse.federation.sender import synapse.federation.transport.client import synapse.handlers @@ -107,5 +108,9 @@ class HomeServer(object): self, ) -> synapse.replication.tcp.client.ReplicationClientHandler: pass + def get_federation_registry( + self, + ) -> synapse.federation.federation_server.FederationHandlerRegistry: + pass def is_mine_id(self, domain_id: str) -> bool: pass diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 64915bafcd..05ea40a7de 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -494,8 +494,10 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): self.helper.join(room_id, "@test2:server") # Mark test2 as online, test will be offline with a last_active of 0 - self.presence_handler.set_state( - UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE} + self.get_success( + self.presence_handler.set_state( + UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE} + ) ) self.reactor.pump([0]) # Wait for presence updates to be handled @@ -543,14 +545,18 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): room_id = self.helper.create_room_as(self.user_id) # Mark test as online - self.presence_handler.set_state( - UserID.from_string("@test:server"), {"presence": PresenceState.ONLINE} + self.get_success( + self.presence_handler.set_state( + UserID.from_string("@test:server"), {"presence": PresenceState.ONLINE} + ) ) # Mark test2 as online, test will be offline with a last_active of 0. # Note we don't join them to the room yet - self.presence_handler.set_state( - UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE} + self.get_success( + self.presence_handler.set_state( + UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE} + ) ) # Add servers to the room diff --git a/tox.ini b/tox.ini index b715ea0bff..4ccfde01b5 100644 --- a/tox.ini +++ b/tox.ini @@ -183,6 +183,7 @@ commands = mypy \ synapse/events/spamcheck.py \ synapse/federation/sender \ synapse/federation/transport \ + synapse/handlers/presence.py \ synapse/handlers/sync.py \ synapse/handlers/ui_auth \ synapse/logging/ \ -- cgit 1.5.1