diff options
-rw-r--r-- | synapse/handlers/sliding_sync.py | 34 | ||||
-rw-r--r-- | synapse/storage/databases/main/stream.py | 25 |
2 files changed, 28 insertions, 31 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 818b13621c..0d4d764483 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1006,36 +1006,10 @@ class SlidingSyncHandler: A sorted list of room IDs by `stream_ordering` along with membership information. """ - # Assemble a map of room ID to the `stream_ordering` of the last activity that the - # user should see in the room (<= `to_token`) - last_activity_in_room_map: Dict[str, int] = {} - for room_id, room_for_user in sync_room_map.items(): - # If they are fully-joined to the room, let's find the latest activity - # at/before the `to_token`. - if room_for_user.membership == Membership.JOIN: - last_event_result = ( - await self.store.get_last_event_pos_in_room_before_stream_ordering( - room_id, to_token.room_key - ) - ) - - # If the room has no events at/before the `to_token`, this is probably a - # mistake in the code that generates the `sync_room_map` since that should - # only give us rooms that the user had membership in during the token range. - assert last_event_result is not None - - _, event_pos = last_event_result - - last_activity_in_room_map[room_id] = event_pos.stream - else: - # Otherwise, if the user has left/been invited/knocked/been banned from - # a room, they shouldn't see anything past that point. - # - # FIXME: It's possible that people should see beyond this point in - # invited/knocked cases if for example the room has - # `invite`/`world_readable` history visibility, see - # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 - last_activity_in_room_map[room_id] = room_for_user.event_pos.stream + # XXX: FIXUP + last_activity_in_room_map = await self.store.rough_get_last_pos( + sync_room_map.keys() + ) return sorted( sync_room_map.values(), diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index e74e0d2e91..dccae56608 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -78,7 +78,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.types import PersistedEventPosition, RoomStreamToken +from synapse.types import PersistedEventPosition, RoomStreamToken, StrSequence from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable @@ -1185,6 +1185,29 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return None + + async def rough_get_last_pos(self, room_ids: StrSequence) -> Dict[str, int]: + def rough_get_last_pos_Txn( + txn: LoggingTransaction, + ) -> Dict[str, int]: + clause, args = make_in_list_sql_clause(self.database_engine, "room_id", room_ids) + sql = f""" + SELECT room_id, MAX(stream_ordering) FROM events + WHERE {clause} + GROUP BY room_id + """ + + txn.execute(sql, (args,)) + + return { + room_id: stream_ordering for room_id, stream_ordering in txn + } + + return await self.db_pool.runInteraction( + "rough_get_last_pos", + rough_get_last_pos_Txn, + ) + async def get_last_event_pos_in_room_before_stream_ordering( self, room_id: str, |