diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/_base.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/cache.py | 10 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 3 | ||||
-rw-r--r-- | synapse/storage/databases/main/roommember.py | 67 | ||||
-rw-r--r-- | synapse/storage/databases/main/stream.py | 33 |
5 files changed, 31 insertions, 88 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b127289d8d..881888fa93 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -119,9 +119,6 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache( "get_user_in_room_with_profile", (room_id, user_id) ) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", (user_id,) - ) self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,)) # Purge other caches based on room state. @@ -148,9 +145,6 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None) self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", None - ) self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index c6787faea0..2d6b75e47e 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -268,16 +268,12 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] if data.type == EventTypes.Member: - self.get_rooms_for_user_with_stream_ordering.invalidate( # type: ignore[attr-defined] - (data.state_key,) - ) self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined] elif row.type == EventsStreamAllStateRow.TypeId: assert isinstance(data, EventsStreamAllStateRow) # Similar to the above, but the entire caches are invalidated. This is # unfortunate for the membership caches, but should recover quickly. self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] - self.get_rooms_for_user_with_stream_ordering.invalidate_all() # type: ignore[attr-defined] self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined] else: raise Exception("Unknown events stream row type %s" % (row.type,)) @@ -334,9 +330,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._attempt_to_invalidate_cache( "get_invited_rooms_for_local_user", (state_key,) ) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", (state_key,) - ) self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,)) self._attempt_to_invalidate_cache( @@ -399,9 +392,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._attempt_to_invalidate_cache("get_thread_id", None) self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None) self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", None - ) self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("did_forget", None) self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index a5acea8c3b..4d4877c4c3 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1457,7 +1457,8 @@ class EventsWorkerStore(SQLBaseStore): event_dict[event_id] = _EventRow( event_id=event_id, stream_ordering=row[1], - instance_name=row[2], + # If instance_name is null we default to "master" + instance_name=row[2] or "master", internal_metadata=row[3], json=row[4], format_version=row[5], diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index d8b54dc4e3..5d2fd08495 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -50,12 +50,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import Sqlite3Engine -from synapse.storage.roommember import ( - GetRoomsForUserWithStreamOrdering, - MemberSummary, - ProfileInfo, - RoomsForUser, -) +from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser from synapse.types import ( JsonDict, PersistedEventPosition, @@ -494,7 +489,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): sender=sender, membership=membership, event_id=event_id, - event_pos=PersistedEventPosition(instance_name, stream_ordering), + event_pos=PersistedEventPosition( + # If instance_name is null we default to "master" + instance_name or "master", + stream_ordering, + ), room_version_id=room_version, ) for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn @@ -606,53 +605,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): return results - @cached(max_entries=500000, iterable=True) - async def get_rooms_for_user_with_stream_ordering( - self, user_id: str - ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: - """Returns a set of room_ids the user is currently joined to. - - If a remote user only returns rooms this server is currently - participating in. - - Args: - user_id - - Returns: - Returns the rooms the user is in currently, along with the stream - ordering of the most recent join for that user and room, along with - the room version of the room. - """ - return await self.db_pool.runInteraction( - "get_rooms_for_user_with_stream_ordering", - self._get_rooms_for_user_with_stream_ordering_txn, - user_id, - ) - - def _get_rooms_for_user_with_stream_ordering_txn( - self, txn: LoggingTransaction, user_id: str - ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: - # We use `current_state_events` here and not `local_current_membership` - # as a) this gets called with remote users and b) this only gets called - # for rooms the server is participating in. - sql = """ - SELECT room_id, e.instance_name, e.stream_ordering - FROM current_state_events AS c - INNER JOIN events AS e USING (room_id, event_id) - WHERE - c.type = 'm.room.member' - AND c.state_key = ? - AND c.membership = ? - """ - - txn.execute(sql, (user_id, Membership.JOIN)) - return frozenset( - GetRoomsForUserWithStreamOrdering( - room_id, PersistedEventPosition(instance, stream_id) - ) - for room_id, instance, stream_id in txn - ) - async def get_users_server_still_shares_room_with( self, user_ids: Collection[str] ) -> Set[str]: @@ -701,13 +653,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): If a remote user only returns rooms this server is currently participating in. """ - rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate( - (user_id,), - None, - update_metrics=False, - ) - if rooms: - return frozenset(r.room_id for r in rooms) room_ids = await self.db_pool.simple_select_onecol( table="current_state_events", diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index be81025355..e74e0d2e91 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -371,7 +371,7 @@ def _make_generic_sql_bound( def _filter_results( lower_token: Optional[RoomStreamToken], upper_token: Optional[RoomStreamToken], - instance_name: str, + instance_name: Optional[str], topological_ordering: int, stream_ordering: int, ) -> bool: @@ -384,8 +384,14 @@ def _filter_results( position maps, which we handle by fetching more than necessary from the DB and then filtering (rather than attempting to construct a complicated SQL query). + + The `instance_name` arg is optional to handle historic rows, and is + interpreted as if it was "master". """ + if instance_name is None: + instance_name = "master" + event_historical_tuple = ( topological_ordering, stream_ordering, @@ -420,7 +426,7 @@ def _filter_results( def _filter_results_by_stream( lower_token: Optional[RoomStreamToken], upper_token: Optional[RoomStreamToken], - instance_name: str, + instance_name: Optional[str], stream_ordering: int, ) -> bool: """ @@ -436,7 +442,14 @@ def _filter_results_by_stream( position maps, which we handle by fetching more than necessary from the DB and then filtering (rather than attempting to construct a complicated SQL query). + + The `instance_name` arg is optional to handle historic rows, and is + interpreted as if it was "master". """ + + if instance_name is None: + instance_name = "master" + if lower_token: assert lower_token.topological is None @@ -912,7 +925,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): prev_sender, ) in txn: assert room_id is not None - assert instance_name is not None assert stream_ordering is not None if _filter_results_by_stream( @@ -936,7 +948,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # Event event_id=event_id, event_pos=PersistedEventPosition( - instance_name=instance_name, + # If instance_name is null we default to "master" + instance_name=instance_name or "master", stream=stream_ordering, ), # When `s.event_id = null`, we won't be able to get respective @@ -952,13 +965,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): prev_event_id=prev_event_id, prev_event_pos=( PersistedEventPosition( - instance_name=prev_instance_name, + # If instance_name is null we default to "master" + instance_name=prev_instance_name or "master", stream=prev_stream_ordering, ) - if ( - prev_instance_name is not None - and prev_stream_ordering is not None - ) + if (prev_stream_ordering is not None) else None ), prev_membership=prev_membership, @@ -1270,7 +1281,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): stream_ordering=stream_ordering, ): return event_id, PersistedEventPosition( - instance_name, stream_ordering + # If instance_name is null we default to "master" + instance_name or "master", + stream_ordering, ) return None |