From c37dad67ab04980ac934554399f52a27e54292ab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Aug 2021 13:54:51 +0100 Subject: Improve event caching code (#10119) Ensure we only load an event from the DB once when the same event is requested multiple times at once. --- synapse/storage/databases/main/roommember.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'synapse/storage/databases/main/roommember.py') diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 68f1b40ea6..e8157ba3d4 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -629,14 +629,12 @@ class RoomMemberWorkerStore(EventsWorkerStore): # We don't update the event cache hit ratio as it completely throws off # the hit ratio counts. After all, we don't populate the cache if we # miss it here - event_map = self._get_events_from_cache( - member_event_ids, allow_rejected=False, update_metrics=False - ) + event_map = self._get_events_from_cache(member_event_ids, update_metrics=False) missing_member_event_ids = [] for event_id in member_event_ids: ev_entry = event_map.get(event_id) - if ev_entry: + if ev_entry and not ev_entry.event.rejected_reason: if ev_entry.event.membership == Membership.JOIN: users_in_room[ev_entry.event.state_key] = ProfileInfo( display_name=ev_entry.event.content.get("displayname", None), -- cgit 1.5.1 From bec01c075829730cf467572e2fcf93e15372b0e9 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 18 Aug 2021 09:22:07 -0400 Subject: Convert room member storage tuples to attrs. (#10629) Instead of using namedtuples. This helps with asserting type hints and code completion. --- changelog.d/10629.misc | 1 + synapse/handlers/initial_sync.py | 2 +- synapse/handlers/sync.py | 18 +++++----- synapse/storage/databases/main/roommember.py | 8 +++-- synapse/storage/databases/main/user_directory.py | 2 +- synapse/storage/roommember.py | 43 ++++++++++++++++-------- tests/replication/slave/storage/test_events.py | 9 +++-- 7 files changed, 54 insertions(+), 29 deletions(-) create mode 100644 changelog.d/10629.misc (limited to 'synapse/storage/databases/main/roommember.py') diff --git a/changelog.d/10629.misc b/changelog.d/10629.misc new file mode 100644 index 0000000000..cca1eb6c57 --- /dev/null +++ b/changelog.d/10629.misc @@ -0,0 +1 @@ +Convert room member storage tuples to `attrs` classes. diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index e1c544a3c9..4e8f7f1d85 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -151,7 +151,7 @@ class InitialSyncHandler(BaseHandler): limit = 10 async def handle_room(event: RoomsForUser): - d = { + d: JsonDict = { "room_id": event.room_id, "membership": event.membership, "visibility": ( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index eba915819e..b7b299961f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -701,7 +701,7 @@ class SyncHandler: name_id = state_ids.get((EventTypes.Name, "")) canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, "")) - summary = {} + summary: JsonDict = {} empty_ms = MemberSummary([], 0) # TODO: only send these when they change. @@ -2076,21 +2076,23 @@ class SyncHandler: # If the membership's stream ordering is after the given stream # ordering, we need to go and work out if the user was in the room # before. - for room_id, event_pos in joined_rooms: - if not event_pos.persisted_after(room_key): - joined_room_ids.add(room_id) + for joined_room in joined_rooms: + if not joined_room.event_pos.persisted_after(room_key): + joined_room_ids.add(joined_room.room_id) continue - logger.info("User joined room after current token: %s", room_id) + logger.info("User joined room after current token: %s", joined_room.room_id) extrems = ( await self.store.get_forward_extremities_for_room_at_stream_ordering( - room_id, event_pos.stream + joined_room.room_id, joined_room.event_pos.stream ) ) - users_in_room = await self.state.get_current_users_in_room(room_id, extrems) + users_in_room = await self.state.get_current_users_in_room( + joined_room.room_id, extrems + ) if user_id in users_in_room: - joined_room_ids.add(room_id) + joined_room_ids.add(joined_room.room_id) return frozenset(joined_room_ids) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index e8157ba3d4..c2f6b9d63d 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -307,7 +307,9 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) @cached() - async def get_invited_rooms_for_local_user(self, user_id: str) -> RoomsForUser: + async def get_invited_rooms_for_local_user( + self, user_id: str + ) -> List[RoomsForUser]: """Get all the rooms the *local* user is invited to. Args: @@ -522,7 +524,9 @@ class RoomMemberWorkerStore(EventsWorkerStore): _get_users_server_still_shares_room_with_txn, ) - async def get_rooms_for_user(self, user_id: str, on_invalidate=None): + async def get_rooms_for_user( + self, user_id: str, on_invalidate=None + ) -> FrozenSet[str]: """Returns a set of room_ids the user is currently joined to. If a remote user only returns rooms this server is currently diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 9d28d69ac7..65dde67ae9 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -365,7 +365,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return False async def update_profile_in_user_dir( - self, user_id: str, display_name: str, avatar_url: str + self, user_id: str, display_name: Optional[str], avatar_url: Optional[str] ) -> None: """ Update or add a user's profile in the user directory. diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index c34fbf21bc..0ff66debdf 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -14,25 +14,40 @@ # limitations under the License. import logging -from collections import namedtuple +from typing import List, Optional, Tuple + +import attr + +from synapse.types import PersistedEventPosition logger = logging.getLogger(__name__) -RoomsForUser = namedtuple( - "RoomsForUser", ("room_id", "sender", "membership", "event_id", "stream_ordering") -) +@attr.s(slots=True, frozen=True, weakref_slot=True, auto_attribs=True) +class RoomsForUser: + room_id: str + sender: str + membership: str + event_id: str + stream_ordering: int + + +@attr.s(slots=True, frozen=True, weakref_slot=True, auto_attribs=True) +class GetRoomsForUserWithStreamOrdering: + room_id: str + event_pos: PersistedEventPosition -GetRoomsForUserWithStreamOrdering = namedtuple( - "GetRoomsForUserWithStreamOrdering", ("room_id", "event_pos") -) +@attr.s(slots=True, frozen=True, weakref_slot=True, auto_attribs=True) +class ProfileInfo: + avatar_url: Optional[str] + display_name: Optional[str] -# We store this using a namedtuple so that we save about 3x space over using a -# dict. -ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name")) -# "members" points to a truncated list of (user_id, event_id) tuples for users of -# a given membership type, suitable for use in calculating heroes for a room. -# "count" points to the total numberr of users of a given membership type. -MemberSummary = namedtuple("MemberSummary", ("members", "count")) +@attr.s(slots=True, frozen=True, weakref_slot=True, auto_attribs=True) +class MemberSummary: + # A truncated list of (user_id, event_id) tuples for users of a given + # membership type, suitable for use in calculating heroes for a room. + members: List[Tuple[str, str]] + # The total number of users of a given membership type. + count: int diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index db80a0bdbd..8d1b0606c4 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -20,7 +20,7 @@ from synapse.api.room_versions import RoomVersions from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict from synapse.handlers.room import RoomEventSource from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.storage.roommember import RoomsForUser +from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser from synapse.types import PersistedEventPosition from tests.server import FakeTransport @@ -216,7 +216,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): self.check( "get_rooms_for_user_with_stream_ordering", (USER_ID_2,), - {(ROOM_ID, expected_pos)}, + {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)}, ) def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(self): @@ -305,7 +305,10 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): expected_pos = PersistedEventPosition( "master", j2.internal_metadata.stream_ordering ) - self.assertEqual(joined_rooms, {(ROOM_ID, expected_pos)}) + self.assertEqual( + joined_rooms, + {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)}, + ) event_id = 0 -- cgit 1.5.1 From 000aa89be63c27092998eca03c97eaead21404cd Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 19 Aug 2021 11:12:55 -0400 Subject: Do not include rooms with an unknown room version in a sync response. (#10644) A user will still see this room if it is in a local cache, but it will not reappear if clearing the cache and reloading. --- changelog.d/10644.bugfix | 1 + mypy.ini | 1 + synapse/handlers/sync.py | 7 +- synapse/storage/databases/main/roommember.py | 8 +- synapse/storage/roommember.py | 1 + tests/handlers/test_sync.py | 137 +++++++++++++++++++++++-- tests/replication/slave/storage/test_events.py | 1 + 7 files changed, 145 insertions(+), 11 deletions(-) create mode 100644 changelog.d/10644.bugfix (limited to 'synapse/storage/databases/main/roommember.py') diff --git a/changelog.d/10644.bugfix b/changelog.d/10644.bugfix new file mode 100644 index 0000000000..d88a81fd82 --- /dev/null +++ b/changelog.d/10644.bugfix @@ -0,0 +1 @@ +Rooms with unsupported room versions are no longer returned via `/sync`. diff --git a/mypy.ini b/mypy.ini index 107f4de76c..90ade37b3f 100644 --- a/mypy.ini +++ b/mypy.ini @@ -90,6 +90,7 @@ files = tests/test_utils, tests/handlers/test_password_providers.py, tests/handlers/test_room_summary.py, + tests/handlers/test_sync.py, tests/rest/client/v1/test_login.py, tests/rest/client/v2_alpha/test_auth.py, tests/util/test_itertools.py, diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b7b299961f..2203c45dcc 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1,5 +1,4 @@ -# Copyright 2015, 2016 OpenMarket Ltd -# Copyright 2018, 2019 New Vector Ltd +# Copyright 2015-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -31,6 +30,7 @@ from prometheus_client import Counter from synapse.api.constants import AccountDataTypes, EventTypes, Membership from synapse.api.filtering import FilterCollection +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase from synapse.logging.context import current_context from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span @@ -1843,6 +1843,9 @@ class SyncHandler: knocked = [] for event in room_list: + if event.room_version_id not in KNOWN_ROOM_VERSIONS: + continue + if event.membership == Membership.JOIN: room_entries.append( RoomSyncResultBuilder( diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index c2f6b9d63d..c58a4b8690 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -386,9 +386,10 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) sql = """ - SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering + SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering, r.room_version FROM local_current_membership AS c INNER JOIN events AS e USING (room_id, event_id) + INNER JOIN rooms AS r USING (room_id) WHERE user_id = ? AND %s @@ -397,7 +398,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) txn.execute(sql, (user_id, *args)) - results = [RoomsForUser(**r) for r in self.db_pool.cursor_to_dict(txn)] + results = [RoomsForUser(*r) for r in txn] return results @@ -447,7 +448,8 @@ class RoomMemberWorkerStore(EventsWorkerStore): Returns: Returns the rooms the user is in currently, along with the stream - ordering of the most recent join for that user and room. + ordering of the most recent join for that user and room, along with + the room version of the room. """ return await self.db_pool.runInteraction( "get_rooms_for_user_with_stream_ordering", diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 9fad67ce48..2500381b7b 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -30,6 +30,7 @@ class RoomsForUser: membership: str event_id: str stream_ordering: int + room_version_id: str @attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 84f05f6c58..339c039914 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -12,9 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Optional + +from synapse.api.constants import EventTypes, JoinRules from synapse.api.errors import Codes, ResourceLimitError from synapse.api.filtering import DEFAULT_FILTER_COLLECTION +from synapse.api.room_versions import RoomVersions from synapse.handlers.sync import SyncConfig +from synapse.rest import admin +from synapse.rest.client import knock, login, room +from synapse.server import HomeServer from synapse.types import UserID, create_requester import tests.unittest @@ -24,8 +31,14 @@ import tests.utils class SyncTestCase(tests.unittest.HomeserverTestCase): """Tests Sync Handler.""" - def prepare(self, reactor, clock, hs): - self.hs = hs + servlets = [ + admin.register_servlets, + knock.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def prepare(self, reactor, clock, hs: HomeServer): self.sync_handler = self.hs.get_sync_handler() self.store = self.hs.get_datastore() @@ -68,12 +81,124 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): ) self.assertEquals(e.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) + def test_unknown_room_version(self): + """ + A room with an unknown room version should not break sync (and should be excluded). + """ + inviter = self.register_user("creator", "pass", admin=True) + inviter_tok = self.login("@creator:test", "pass") + + user = self.register_user("user", "pass") + tok = self.login("user", "pass") + + # Do an initial sync on a different device. + requester = create_requester(user) + initial_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + requester, sync_config=generate_sync_config(user, device_id="dev") + ) + ) + + # Create a room as the user. + joined_room = self.helper.create_room_as(user, tok=tok) + + # Invite the user to the room as someone else. + invite_room = self.helper.create_room_as(inviter, tok=inviter_tok) + self.helper.invite(invite_room, targ=user, tok=inviter_tok) + + knock_room = self.helper.create_room_as( + inviter, room_version=RoomVersions.V7.identifier, tok=inviter_tok + ) + self.helper.send_state( + knock_room, + EventTypes.JoinRules, + {"join_rule": JoinRules.KNOCK}, + tok=inviter_tok, + ) + channel = self.make_request( + "POST", + "/_matrix/client/r0/knock/%s" % (knock_room,), + b"{}", + tok, + ) + self.assertEquals(200, channel.code, channel.result) + + # The rooms should appear in the sync response. + result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + requester, sync_config=generate_sync_config(user) + ) + ) + self.assertIn(joined_room, [r.room_id for r in result.joined]) + self.assertIn(invite_room, [r.room_id for r in result.invited]) + self.assertIn(knock_room, [r.room_id for r in result.knocked]) + + # Test a incremental sync (by providing a since_token). + result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + requester, + sync_config=generate_sync_config(user, device_id="dev"), + since_token=initial_result.next_batch, + ) + ) + self.assertIn(joined_room, [r.room_id for r in result.joined]) + self.assertIn(invite_room, [r.room_id for r in result.invited]) + self.assertIn(knock_room, [r.room_id for r in result.knocked]) + + # Poke the database and update the room version to an unknown one. + for room_id in (joined_room, invite_room, knock_room): + self.get_success( + self.hs.get_datastores().main.db_pool.simple_update( + "rooms", + keyvalues={"room_id": room_id}, + updatevalues={"room_version": "unknown-room-version"}, + desc="updated-room-version", + ) + ) + + # Blow away caches (supported room versions can only change due to a restart). + self.get_success( + self.store.get_rooms_for_user_with_stream_ordering.invalidate_all() + ) + self.store._get_event_cache.clear() + + # The rooms should be excluded from the sync response. + # Get a new request key. + result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + requester, sync_config=generate_sync_config(user) + ) + ) + self.assertNotIn(joined_room, [r.room_id for r in result.joined]) + self.assertNotIn(invite_room, [r.room_id for r in result.invited]) + self.assertNotIn(knock_room, [r.room_id for r in result.knocked]) + + # The rooms should also not be in an incremental sync. + result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + requester, + sync_config=generate_sync_config(user, device_id="dev"), + since_token=initial_result.next_batch, + ) + ) + self.assertNotIn(joined_room, [r.room_id for r in result.joined]) + self.assertNotIn(invite_room, [r.room_id for r in result.invited]) + self.assertNotIn(knock_room, [r.room_id for r in result.knocked]) + + +_request_key = 0 + -def generate_sync_config(user_id: str) -> SyncConfig: +def generate_sync_config( + user_id: str, device_id: Optional[str] = "device_id" +) -> SyncConfig: + """Generate a sync config (with a unique request key).""" + global _request_key + _request_key += 1 return SyncConfig( - user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]), + user=UserID.from_string(user_id), filter_collection=DEFAULT_FILTER_COLLECTION, is_guest=False, - request_key="request_key", - device_id="device_id", + request_key=("request_key", _request_key), + device_id=device_id, ) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 8d1b0606c4..b25a06b427 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -150,6 +150,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): "invite", event.event_id, event.internal_metadata.stream_ordering, + RoomVersions.V1.identifier, ) ], ) -- cgit 1.5.1