diff options
-rw-r--r-- | synapse/handlers/sliding_sync.py | 114 | ||||
-rw-r--r-- | synapse/rest/client/sync.py | 15 | ||||
-rw-r--r-- | synapse/server_notices/server_notices_manager.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/cache.py | 3 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 24 | ||||
-rw-r--r-- | synapse/storage/databases/main/room.py | 24 | ||||
-rw-r--r-- | synapse/storage/databases/main/roommember.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/stream.py | 145 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/85/07_sliding_sync.sql | 24 | ||||
-rw-r--r-- | tests/rest/client/test_account.py | 2 | ||||
-rw-r--r-- | tests/storage/test_event_chain.py | 1 | ||||
-rw-r--r-- | tests/storage/test_roommember.py | 2 |
12 files changed, 317 insertions, 45 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index c0e5192f00..4f8e6a0f8d 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -444,6 +444,7 @@ class SlidingSyncHandler: return result + @trace async def current_sync_for_user( self, sync_config: SlidingSyncConfig, @@ -504,9 +505,14 @@ class SlidingSyncHandler: for list_key, list_config in sync_config.lists.items(): # Apply filters filtered_sync_room_map = sync_room_map - if list_config.filters is not None: + + if list_config.filters: + filtered_sync_room_map = await self.filter_rooms( - sync_config.user, sync_room_map, list_config.filters, to_token + sync_config.user, + filtered_sync_room_map, + list_config.filters, + to_token, ) # Sort the list @@ -616,6 +622,30 @@ class SlidingSyncHandler: else: relevant_room_map[room_id] = room_sync_config + # Filter out rooms that haven't received updates and we've sent down + # previously. + if from_token: + rooms_should_send = set() + for room_id in relevant_room_map: + status = await self.connection_store.have_sent_room( + sync_config, + from_token.connection_position, + room_id, + ) + if status.status != HaveSentRoomFlag.LIVE: + rooms_should_send.add(room_id) + + # TODO: Also check current state delta stream + rooms_that_have_updates = ( + self.store._events_stream_cache.get_entities_changed( + relevant_room_map, from_token.stream_token.room_key.stream + ) + ) + rooms_should_send.update(rooms_that_have_updates) + relevant_room_map = { + r: c for r, c in relevant_room_map.items() if r in rooms_should_send + } + # Fetch room data rooms: Dict[str, SlidingSyncResult.RoomResult] = {} @@ -726,13 +756,12 @@ class SlidingSyncHandler: # First grab a current snapshot rooms for the user # (also handles forgotten rooms) - room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is( - user_id=user_id, - # We want to fetch any kind of membership (joined and left rooms) in order - # to get the `event_pos` of the latest room membership event for the - # user. - membership_list=Membership.LIST, - excluded_rooms=self.rooms_to_exclude_globally, + room_for_user_list = ( + await self.store.get_rooms_for_local_user_where_membership_is( + user_id=user_id, + membership_list=Membership.LIST, + excluded_rooms=self.rooms_to_exclude_globally, + ) ) # If the user has never joined any rooms before, we can just return an empty list @@ -1143,6 +1172,7 @@ class SlidingSyncHandler: # return None + @trace async def filter_rooms( self, user: UserID, @@ -1266,6 +1296,7 @@ class SlidingSyncHandler: # Assemble a new sync room map but only with the `filtered_room_id_set` return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} + @trace async def sort_rooms( self, sync_room_map: Dict[str, _RoomMembershipForUser], @@ -1509,6 +1540,10 @@ class SlidingSyncHandler: room_membership_for_user_at_to_token.event_pos.to_room_stream_token() ) + fiddled_timeline_limit = room_sync_config.timeline_limit + # if to_bound: + # fiddled_timeline_limit = max(fiddled_timeline_limit, 10) + timeline_events, new_room_key = await self.store.paginate_room_events( room_id=room_id, # The bounds are reversed so we can paginate backwards @@ -1519,7 +1554,7 @@ class SlidingSyncHandler: direction=Direction.BACKWARDS, # We add one so we can determine if there are enough events to saturate # the limit or not (see `limited`) - limit=room_sync_config.timeline_limit + 1, + limit=fiddled_timeline_limit + 1, event_filter=None, ) @@ -1530,11 +1565,11 @@ class SlidingSyncHandler: # Determine our `limited` status based on the timeline. We do this before # filtering the events so we can accurately determine if there is more to # paginate even if we filter out some/all events. - if len(timeline_events) > room_sync_config.timeline_limit: + if len(timeline_events) > fiddled_timeline_limit: limited = True # Get rid of that extra "+ 1" event because we only used it to determine # if we hit the limit or not - timeline_events = timeline_events[-room_sync_config.timeline_limit :] + timeline_events = timeline_events[-fiddled_timeline_limit:] assert timeline_events[0].internal_metadata.stream_ordering new_room_key = RoomStreamToken( stream=timeline_events[0].internal_metadata.stream_ordering - 1 @@ -1826,24 +1861,36 @@ class SlidingSyncHandler: ) # Figure out the last bump event in the room - last_bump_event_result = ( - await self.store.get_last_event_pos_in_room_before_stream_ordering( - room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES + bump_stamp = None + if timeline_events: + for e in reversed(timeline_events): + if ( + e.type in DEFAULT_BUMP_EVENT_TYPES + and e.internal_metadata.stream_ordering > 0 + ): + bump_stamp = e.internal_metadata.stream_ordering + break + + if bump_stamp is None: + # By default, just choose the membership event position + bump_stamp = room_membership_for_user_at_to_token.event_pos.stream + + last_bump_event_result = ( + await self.store.get_last_event_pos_in_room_before_stream_ordering( + room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES + ) ) - ) - # By default, just choose the membership event position - bump_stamp = room_membership_for_user_at_to_token.event_pos.stream - # But if we found a bump event, use that instead - if last_bump_event_result is not None: - _, new_bump_event_pos = last_bump_event_result + # But if we found a bump event, use that instead + if last_bump_event_result is not None: + _, new_bump_event_pos = last_bump_event_result - # If we've just joined a remote room, then the last bump event may - # have been backfilled (and so have a negative stream ordering). - # These negative stream orderings can't sensibly be compared, so - # instead we use the membership event position. - if new_bump_event_pos.stream > 0: - bump_stamp = new_bump_event_pos.stream + # If we've just joined a remote room, then the last bump event may + # have been backfilled (and so have a negative stream ordering). + # These negative stream orderings can't sensibly be compared, so + # instead we use the membership event position. + if new_bump_event_pos.stream > 0: + bump_stamp = new_bump_event_pos.stream return SlidingSyncResult.RoomResult( name=room_name, @@ -1978,12 +2025,13 @@ class SlidingSyncHandler: up_to_stream_id=since_stream_id, ) - logger.debug( - "Deleted %d to-device messages up to %d for %s", - deleted, - since_stream_id, - user_id, - ) + if deleted: + logger.debug( + "Deleted %d to-device messages up to %d for %s", + deleted, + since_stream_id, + user_id, + ) messages, stream_id = await self.store.get_messages_for_device( user_id=user_id, diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index bf3ac8d483..fcb054b970 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -49,10 +49,13 @@ from synapse.http.servlet import ( parse_and_validate_json_object_from_request, parse_boolean, parse_integer, + parse_json_object_from_request, + parse_json_value_from_request, parse_string, + validate_json_object, ) from synapse.http.site import SynapseRequest -from synapse.logging.opentracing import trace_with_opname +from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname from synapse.rest.admin.experimental_features import ExperimentalFeature from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken from synapse.types.rest.client import SlidingSyncBody @@ -896,8 +899,14 @@ class SlidingSyncRestServlet(RestServlet): # maybe some filters like sync v2 where they are built up once and referenced # by filter ID. For now, we will just prototype with always passing everything # in. - body = parse_and_validate_json_object_from_request(request, SlidingSyncBody) + content = parse_json_object_from_request(request, allow_empty_body=False) + body = validate_json_object(content, SlidingSyncBody) logger.info("Sliding sync request: %r", body) + logger.info("Sliding sync json: %r", content) + log_kv({"request_body": body}) + + if body.lists: + set_tag("sliding_sync.lists", True) sync_config = SlidingSyncConfig( user=user, @@ -985,7 +994,7 @@ class SlidingSyncRestServlet(RestServlet): serialized_rooms: Dict[str, JsonDict] = {} for room_id, room_result in rooms.items(): serialized_rooms[room_id] = { - "bump_stamp": room_result.bump_stamp, + "bump_stamp": abs(room_result.bump_stamp), "joined_count": room_result.joined_count, "invited_count": room_result.invited_count, "notification_count": room_result.notification_count, diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py index 001a290e87..e9cdc628d5 100644 --- a/synapse/server_notices/server_notices_manager.py +++ b/synapse/server_notices/server_notices_manager.py @@ -114,7 +114,7 @@ class ServerNoticesManager: return None rooms = await self._store.get_rooms_for_local_user_where_membership_is( - user_id, [Membership.INVITE, Membership.JOIN] + user_id, (Membership.INVITE, Membership.JOIN) ) for room in rooms: # it's worth noting that there is an asymmetry here in that we @@ -262,7 +262,7 @@ class ServerNoticesManager: # Check whether the user has already joined or been invited to this room. If # that's the case, there is no need to re-invite them. joined_rooms = await self._store.get_rooms_for_local_user_where_membership_is( - user_id, [Membership.INVITE, Membership.JOIN] + user_id, (Membership.INVITE, Membership.JOIN) ) for room in joined_rooms: if room.room_id == room_id: diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 26b8e1a172..8c2c0c5ab0 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -309,6 +309,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore): if not backfilled: self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined] + self._attempt_to_invalidate_cache( + "get_max_stream_ordering_in_room", (room_id,) + ) if redacts: self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined] diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1f7acdb859..0c7c2f9306 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -551,7 +551,7 @@ class PersistEventsStore: # From this point onwards the events are only events that we haven't # seen before. - self._store_event_txn(txn, events_and_contexts=events_and_contexts) + self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts) if new_forward_extremities: self._update_forward_extremities_txn( @@ -1555,6 +1555,7 @@ class PersistEventsStore: def _store_event_txn( self, txn: LoggingTransaction, + room_id: str, events_and_contexts: Collection[Tuple[EventBase, EventContext]], ) -> None: """Insert new events into the event, event_json, redaction and @@ -1629,6 +1630,27 @@ class PersistEventsStore: ], ) + # Update the `sliding_sync_room_metadata` with the latest + # (non-backfilled, ie positive) stream ordering. + # + # We know this list is sorted and non-empty, so we just take the last + # one event. + max_stream_ordering: int + for e, _ in events_and_contexts: + assert e.internal_metadata.stream_ordering is not None + max_stream_ordering = e.internal_metadata.stream_ordering + + if max_stream_ordering > 0: + self.db_pool.simple_upsert_txn( + txn, + table="sliding_sync_room_metadata", + keyvalues={"room_id": room_id}, + values={ + "instance_name": self._instance_name, + "last_stream_ordering": max_stream_ordering, + }, + ) + # If we're persisting an unredacted event we go and ensure # that we mark any redactions that reference this event as # requiring censoring. diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 80a4bf95f2..498a136543 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -228,6 +228,30 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): return row return bool(row[0]), bool(row[1]) + @cached(max_entries=10000) + async def get_room_type(self, room_id: str) -> Optional[str]: + # TODO: Upsert room_stats_state on room creation / initial join. + return await self.db_pool.simple_select_one_onecol( + table="room_stats_state", + keyvalues={"room_id": room_id}, + retcol="room_type", + allow_none=True, + desc="get_room_type", + ) + + @cachedList(cached_method_name="get_room_type", list_name="room_ids") + async def bulk_get_room_type( + self, room_ids: StrCollection + ) -> Mapping[str, Optional[str]]: + rows = await self.db_pool.simple_select_many_batch( + table="room_stats_state", + column="room_id", + iterable=room_ids, + retcols=("room_id", "room_type"), + desc="bulk_get_room_type", + ) + return dict(rows) + async def get_room_with_stats(self, room_id: str) -> Optional[RoomStats]: """Retrieve room with statistics. diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 640ab123f0..2e0e6afac5 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -385,7 +385,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): """ return await self.get_rooms_for_local_user_where_membership_is( - user_id, [Membership.INVITE] + user_id, (Membership.INVITE,) ) async def get_knocked_at_rooms_for_local_user( @@ -401,7 +401,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): """ return await self.get_rooms_for_local_user_where_membership_is( - user_id, [Membership.KNOCK] + user_id, (Membership.KNOCK,) ) async def get_invite_for_local_user_in_room( diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index b034361aec..395a1f46af 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -50,6 +50,7 @@ from typing import ( Dict, Iterable, List, + Mapping, Optional, Set, Tuple, @@ -78,8 +79,13 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection -from synapse.util.caches.descriptors import cached +from synapse.types import ( + JsonDict, + PersistedEventPosition, + RoomStreamToken, + StrCollection, +) +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter @@ -611,6 +617,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): self._stream_order_on_start = self.get_room_max_stream_ordering() self._min_stream_order_on_start = self.get_room_min_stream_ordering() + database.updates.register_background_update_handler( + "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update + ) + def get_room_max_stream_ordering(self) -> int: """Get the stream_ordering of regular events that we have committed up to @@ -1186,6 +1196,52 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return None + @cachedList( + cached_method_name="get_max_stream_ordering_in_room", + list_name="room_ids", + ) + async def get_max_stream_ordering_in_rooms( + self, room_ids: StrCollection + ) -> Mapping[str, Optional[PersistedEventPosition]]: + """Get the positions for the latest event in a room. + + A batched version of `get_max_stream_ordering_in_room`. + """ + rows = await self.db_pool.simple_select_many_batch( + table="sliding_sync_room_metadata", + column="room_id", + iterable=room_ids, + retcols=("room_id", "instance_name", "last_stream_ordering"), + desc="get_max_stream_ordering_in_rooms", + ) + + return { + room_id: PersistedEventPosition(instance_name, stream) + for room_id, instance_name, stream in rows + } + + @cached(max_entries=10000) + async def get_max_stream_ordering_in_room( + self, + room_id: str, + ) -> Optional[PersistedEventPosition]: + """Get the position for the latest event in a room. + + Note: this may be after the current token for the room stream on this + process (e.g. due to replication lag) + """ + row = await self.db_pool.simple_select_one( + table="sliding_sync_room_metadata", + retcols=("instance_name", "last_stream_ordering"), + keyvalues={"room_id": room_id}, + allow_none=True, + desc="get_max_stream_ordering_in_room", + ) + if not row: + return None + + return PersistedEventPosition(instance_name=row[0], stream=row[1]) + async def get_last_event_pos_in_room_before_stream_ordering( self, room_id: str, @@ -2104,3 +2160,88 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return RoomStreamToken(stream=last_position.stream - 1) return None + + async def _sliding_sync_room_metadata_bg_update( + self, progress: JsonDict, batch_size: int + ) -> int: + """Background update to fill out 'sliding_sync_room_metadata' table""" + previous_room = progress.get("previous_room", "") + + def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int: + # Both these queries are just getting the most recent + # instance_name/stream ordering for the next N rooms. + if isinstance(self.database_engine, PostgresEngine): + sql = """ + SELECT room_id, instance_name, stream_ordering FROM rooms AS r, + LATERAL ( + SELECT instance_name, stream_ordering + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ) e + WHERE r.room_id > ? + ORDER BY r.room_id ASC + LIMIT ? + """ + else: + sql = """ + SELECT + room_id, + ( + SELECT instance_name + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ), + ( + SELECT stream_ordering + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ) + FROM rooms AS r + WHERE r.room_id > ? + ORDER BY r.room_id ASC + LIMIT ? + """ + + txn.execute(sql, (previous_room, batch_size)) + rows = txn.fetchall() + if not rows: + return 0 + + self.db_pool.simple_upsert_many_txn( + txn, + table="sliding_sync_room_metadata", + key_names=("room_id",), + key_values=[(room_id,) for room_id, _, _ in rows], + value_names=( + "instance_name", + "last_stream_ordering", + ), + value_values=[ + ( + instance_name or "master", + stream, + ) + for _, instance_name, stream in rows + ], + ) + + self.db_pool.updates._background_update_progress_txn( + txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]} + ) + + return len(rows) + + rows = await self.db_pool.runInteraction( + "_sliding_sync_room_metadata_bg_update", + _sliding_sync_room_metadata_bg_update_txn, + ) + + if rows == 0: + await self.db_pool.updates._end_background_update( + "sliding_sync_room_metadata" + ) + + return rows diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql new file mode 100644 index 0000000000..e8bc33ff40 --- /dev/null +++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql @@ -0,0 +1,24 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- <https://www.gnu.org/licenses/agpl-3.0.html>. + +-- A table that maps from room ID to metadata useful for sliding sync. +CREATE TABLE sliding_sync_room_metadata ( + room_id TEXT NOT NULL PRIMARY KEY, + + -- The instance_name / stream ordering of the last event in the room. + instance_name TEXT NOT NULL, + last_stream_ordering BIGINT NOT NULL +); + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8507, 'sliding_sync_room_metadata', '{}'); diff --git a/tests/rest/client/test_account.py b/tests/rest/client/test_account.py index a85ea994de..77caab2489 100644 --- a/tests/rest/client/test_account.py +++ b/tests/rest/client/test_account.py @@ -535,7 +535,7 @@ class DeactivateTestCase(unittest.HomeserverTestCase): # Check that the membership of @invitee:test in the room is now "leave". memberships = self.get_success( store.get_rooms_for_local_user_where_membership_is( - invitee_id, [Membership.LEAVE] + invitee_id, (Membership.LEAVE,) ) ) self.assertEqual(len(memberships), 1, memberships) diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index c4e216c308..037bbca1ba 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -440,6 +440,7 @@ class EventChainStoreTestCase(HomeserverTestCase): assert persist_events_store is not None persist_events_store._store_event_txn( txn, + events[0].room_id, [ (e, EventContext(self.hs.get_storage_controllers(), {})) for e in events diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py index 418b556108..e2f19e25e3 100644 --- a/tests/storage/test_roommember.py +++ b/tests/storage/test_roommember.py @@ -68,7 +68,7 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase): rooms_for_user = self.get_success( self.store.get_rooms_for_local_user_where_membership_is( - self.u_alice, [Membership.JOIN] + self.u_alice, (Membership.JOIN,) ) ) |