summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/sliding_sync.py114
-rw-r--r--synapse/rest/client/sync.py15
-rw-r--r--synapse/server_notices/server_notices_manager.py4
-rw-r--r--synapse/storage/databases/main/cache.py3
-rw-r--r--synapse/storage/databases/main/events.py24
-rw-r--r--synapse/storage/databases/main/room.py24
-rw-r--r--synapse/storage/databases/main/roommember.py4
-rw-r--r--synapse/storage/databases/main/stream.py145
-rw-r--r--synapse/storage/schema/main/delta/85/07_sliding_sync.sql24
-rw-r--r--tests/rest/client/test_account.py2
-rw-r--r--tests/storage/test_event_chain.py1
-rw-r--r--tests/storage/test_roommember.py2
12 files changed, 317 insertions, 45 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index c0e5192f00..4f8e6a0f8d 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -444,6 +444,7 @@ class SlidingSyncHandler:
 
         return result
 
+    @trace
     async def current_sync_for_user(
         self,
         sync_config: SlidingSyncConfig,
@@ -504,9 +505,14 @@ class SlidingSyncHandler:
             for list_key, list_config in sync_config.lists.items():
                 # Apply filters
                 filtered_sync_room_map = sync_room_map
-                if list_config.filters is not None:
+
+                if list_config.filters:
+
                     filtered_sync_room_map = await self.filter_rooms(
-                        sync_config.user, sync_room_map, list_config.filters, to_token
+                        sync_config.user,
+                        filtered_sync_room_map,
+                        list_config.filters,
+                        to_token,
                     )
 
                 # Sort the list
@@ -616,6 +622,30 @@ class SlidingSyncHandler:
                 else:
                     relevant_room_map[room_id] = room_sync_config
 
+        # Filter out rooms that we've previously sent down and that haven't
+        # received any updates since.
+        if from_token:
+            rooms_should_send = set()
+            for room_id in relevant_room_map:
+                status = await self.connection_store.have_sent_room(
+                    sync_config,
+                    from_token.connection_position,
+                    room_id,
+                )
+                if status.status != HaveSentRoomFlag.LIVE:
+                    rooms_should_send.add(room_id)
+
+            # TODO: Also check current state delta stream
+            rooms_that_have_updates = (
+                self.store._events_stream_cache.get_entities_changed(
+                    relevant_room_map, from_token.stream_token.room_key.stream
+                )
+            )
+            rooms_should_send.update(rooms_that_have_updates)
+            relevant_room_map = {
+                r: c for r, c in relevant_room_map.items() if r in rooms_should_send
+            }
+
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
 
@@ -726,13 +756,12 @@ class SlidingSyncHandler:
 
         # First grab a current snapshot rooms for the user
         # (also handles forgotten rooms)
-        room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is(
-            user_id=user_id,
-            # We want to fetch any kind of membership (joined and left rooms) in order
-            # to get the `event_pos` of the latest room membership event for the
-            # user.
-            membership_list=Membership.LIST,
-            excluded_rooms=self.rooms_to_exclude_globally,
+        room_for_user_list = (
+            await self.store.get_rooms_for_local_user_where_membership_is(
+                user_id=user_id,
+                membership_list=Membership.LIST,
+                excluded_rooms=self.rooms_to_exclude_globally,
+            )
         )
 
         # If the user has never joined any rooms before, we can just return an empty list
@@ -1143,6 +1172,7 @@ class SlidingSyncHandler:
 
         # return None
 
+    @trace
     async def filter_rooms(
         self,
         user: UserID,
@@ -1266,6 +1296,7 @@ class SlidingSyncHandler:
         # Assemble a new sync room map but only with the `filtered_room_id_set`
         return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
 
+    @trace
     async def sort_rooms(
         self,
         sync_room_map: Dict[str, _RoomMembershipForUser],
@@ -1509,6 +1540,10 @@ class SlidingSyncHandler:
                     room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
                 )
 
+            fiddled_timeline_limit = room_sync_config.timeline_limit
+            # if to_bound:
+            #     fiddled_timeline_limit = max(fiddled_timeline_limit, 10)
+
             timeline_events, new_room_key = await self.store.paginate_room_events(
                 room_id=room_id,
                 # The bounds are reversed so we can paginate backwards
@@ -1519,7 +1554,7 @@ class SlidingSyncHandler:
                 direction=Direction.BACKWARDS,
                 # We add one so we can determine if there are enough events to saturate
                 # the limit or not (see `limited`)
-                limit=room_sync_config.timeline_limit + 1,
+                limit=fiddled_timeline_limit + 1,
                 event_filter=None,
             )
 
@@ -1530,11 +1565,11 @@ class SlidingSyncHandler:
             # Determine our `limited` status based on the timeline. We do this before
             # filtering the events so we can accurately determine if there is more to
             # paginate even if we filter out some/all events.
-            if len(timeline_events) > room_sync_config.timeline_limit:
+            if len(timeline_events) > fiddled_timeline_limit:
                 limited = True
                 # Get rid of that extra "+ 1" event because we only used it to determine
                 # if we hit the limit or not
-                timeline_events = timeline_events[-room_sync_config.timeline_limit :]
+                timeline_events = timeline_events[-fiddled_timeline_limit:]
                 assert timeline_events[0].internal_metadata.stream_ordering
                 new_room_key = RoomStreamToken(
                     stream=timeline_events[0].internal_metadata.stream_ordering - 1
@@ -1826,24 +1861,36 @@ class SlidingSyncHandler:
                 )
 
         # Figure out the last bump event in the room
-        last_bump_event_result = (
-            await self.store.get_last_event_pos_in_room_before_stream_ordering(
-                room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+        bump_stamp = None
+        if timeline_events:
+            for e in reversed(timeline_events):
+                if (
+                    e.type in DEFAULT_BUMP_EVENT_TYPES
+                    and e.internal_metadata.stream_ordering > 0
+                ):
+                    bump_stamp = e.internal_metadata.stream_ordering
+                    break
+
+        if bump_stamp is None:
+            # By default, just choose the membership event position
+            bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
+
+            last_bump_event_result = (
+                await self.store.get_last_event_pos_in_room_before_stream_ordering(
+                    room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+                )
             )
-        )
 
-        # By default, just choose the membership event position
-        bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
-        # But if we found a bump event, use that instead
-        if last_bump_event_result is not None:
-            _, new_bump_event_pos = last_bump_event_result
+            # But if we found a bump event, use that instead
+            if last_bump_event_result is not None:
+                _, new_bump_event_pos = last_bump_event_result
 
-            # If we've just joined a remote room, then the last bump event may
-            # have been backfilled (and so have a negative stream ordering).
-            # These negative stream orderings can't sensibly be compared, so
-            # instead we use the membership event position.
-            if new_bump_event_pos.stream > 0:
-                bump_stamp = new_bump_event_pos.stream
+                # If we've just joined a remote room, then the last bump event may
+                # have been backfilled (and so have a negative stream ordering).
+                # These negative stream orderings can't sensibly be compared, so
+                # instead we use the membership event position.
+                if new_bump_event_pos.stream > 0:
+                    bump_stamp = new_bump_event_pos.stream
 
         return SlidingSyncResult.RoomResult(
             name=room_name,
@@ -1978,12 +2025,13 @@ class SlidingSyncHandler:
                 up_to_stream_id=since_stream_id,
             )
 
-            logger.debug(
-                "Deleted %d to-device messages up to %d for %s",
-                deleted,
-                since_stream_id,
-                user_id,
-            )
+            if deleted:
+                logger.debug(
+                    "Deleted %d to-device messages up to %d for %s",
+                    deleted,
+                    since_stream_id,
+                    user_id,
+                )
 
         messages, stream_id = await self.store.get_messages_for_device(
             user_id=user_id,
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index bf3ac8d483..fcb054b970 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -49,10 +49,13 @@ from synapse.http.servlet import (
     parse_and_validate_json_object_from_request,
     parse_boolean,
     parse_integer,
+    parse_json_object_from_request,
+    parse_json_value_from_request,
     parse_string,
+    validate_json_object,
 )
 from synapse.http.site import SynapseRequest
-from synapse.logging.opentracing import trace_with_opname
+from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname
 from synapse.rest.admin.experimental_features import ExperimentalFeature
 from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken
 from synapse.types.rest.client import SlidingSyncBody
@@ -896,8 +899,14 @@ class SlidingSyncRestServlet(RestServlet):
         # maybe some filters like sync v2  where they are built up once and referenced
         # by filter ID. For now, we will just prototype with always passing everything
         # in.
-        body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
+        content = parse_json_object_from_request(request, allow_empty_body=False)
+        body = validate_json_object(content, SlidingSyncBody)
         logger.info("Sliding sync request: %r", body)
+        logger.info("Sliding sync json: %r", content)
+        log_kv({"request_body": body})
+
+        if body.lists:
+            set_tag("sliding_sync.lists", True)
 
         sync_config = SlidingSyncConfig(
             user=user,
@@ -985,7 +994,7 @@ class SlidingSyncRestServlet(RestServlet):
         serialized_rooms: Dict[str, JsonDict] = {}
         for room_id, room_result in rooms.items():
             serialized_rooms[room_id] = {
-                "bump_stamp": room_result.bump_stamp,
+                "bump_stamp": abs(room_result.bump_stamp),
                 "joined_count": room_result.joined_count,
                 "invited_count": room_result.invited_count,
                 "notification_count": room_result.notification_count,
diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py
index 001a290e87..e9cdc628d5 100644
--- a/synapse/server_notices/server_notices_manager.py
+++ b/synapse/server_notices/server_notices_manager.py
@@ -114,7 +114,7 @@ class ServerNoticesManager:
             return None
 
         rooms = await self._store.get_rooms_for_local_user_where_membership_is(
-            user_id, [Membership.INVITE, Membership.JOIN]
+            user_id, (Membership.INVITE, Membership.JOIN)
         )
         for room in rooms:
             # it's worth noting that there is an asymmetry here in that we
@@ -262,7 +262,7 @@ class ServerNoticesManager:
         # Check whether the user has already joined or been invited to this room. If
         # that's the case, there is no need to re-invite them.
         joined_rooms = await self._store.get_rooms_for_local_user_where_membership_is(
-            user_id, [Membership.INVITE, Membership.JOIN]
+            user_id, (Membership.INVITE, Membership.JOIN)
         )
         for room in joined_rooms:
             if room.room_id == room_id:
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 26b8e1a172..8c2c0c5ab0 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -309,6 +309,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
 
         if not backfilled:
             self._events_stream_cache.entity_has_changed(room_id, stream_ordering)  # type: ignore[attr-defined]
+            self._attempt_to_invalidate_cache(
+                "get_max_stream_ordering_in_room", (room_id,)
+            )
 
         if redacts:
             self._invalidate_local_get_event_cache(redacts)  # type: ignore[attr-defined]
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..0c7c2f9306 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -551,7 +551,7 @@ class PersistEventsStore:
         # From this point onwards the events are only events that we haven't
         # seen before.
 
-        self._store_event_txn(txn, events_and_contexts=events_and_contexts)
+        self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts)
 
         if new_forward_extremities:
             self._update_forward_extremities_txn(
@@ -1555,6 +1555,7 @@ class PersistEventsStore:
     def _store_event_txn(
         self,
         txn: LoggingTransaction,
+        room_id: str,
         events_and_contexts: Collection[Tuple[EventBase, EventContext]],
     ) -> None:
         """Insert new events into the event, event_json, redaction and
@@ -1629,6 +1630,27 @@ class PersistEventsStore:
             ],
         )
 
+        # Update the `sliding_sync_room_metadata` with the latest
+        # (non-backfilled, ie positive) stream ordering.
+        #
+        # We know this list is sorted and non-empty, so we just take the
+        # stream ordering of the last event.
+        max_stream_ordering: int
+        for e, _ in events_and_contexts:
+            assert e.internal_metadata.stream_ordering is not None
+            max_stream_ordering = e.internal_metadata.stream_ordering
+
+        if max_stream_ordering > 0:
+            self.db_pool.simple_upsert_txn(
+                txn,
+                table="sliding_sync_room_metadata",
+                keyvalues={"room_id": room_id},
+                values={
+                    "instance_name": self._instance_name,
+                    "last_stream_ordering": max_stream_ordering,
+                },
+            )
+
         # If we're persisting an unredacted event we go and ensure
         # that we mark any redactions that reference this event as
         # requiring censoring.
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 80a4bf95f2..498a136543 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -228,6 +228,30 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             return row
         return bool(row[0]), bool(row[1])
 
+    @cached(max_entries=10000)
+    async def get_room_type(self, room_id: str) -> Optional[str]:
+        # TODO: Upsert room_stats_state on room creation / initial join.
+        return await self.db_pool.simple_select_one_onecol(
+            table="room_stats_state",
+            keyvalues={"room_id": room_id},
+            retcol="room_type",
+            allow_none=True,
+            desc="get_room_type",
+        )
+
+    @cachedList(cached_method_name="get_room_type", list_name="room_ids")
+    async def bulk_get_room_type(
+        self, room_ids: StrCollection
+    ) -> Mapping[str, Optional[str]]:
+        rows = await self.db_pool.simple_select_many_batch(
+            table="room_stats_state",
+            column="room_id",
+            iterable=room_ids,
+            retcols=("room_id", "room_type"),
+            desc="bulk_get_room_type",
+        )
+        return dict(rows)
+
     async def get_room_with_stats(self, room_id: str) -> Optional[RoomStats]:
         """Retrieve room with statistics.
 
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 640ab123f0..2e0e6afac5 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -385,7 +385,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         """
 
         return await self.get_rooms_for_local_user_where_membership_is(
-            user_id, [Membership.INVITE]
+            user_id, (Membership.INVITE,)
         )
 
     async def get_knocked_at_rooms_for_local_user(
@@ -401,7 +401,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         """
 
         return await self.get_rooms_for_local_user_where_membership_is(
-            user_id, [Membership.KNOCK]
+            user_id, (Membership.KNOCK,)
         )
 
     async def get_invite_for_local_user_in_room(
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index b034361aec..395a1f46af 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -50,6 +50,7 @@ from typing import (
     Dict,
     Iterable,
     List,
+    Mapping,
     Optional,
     Set,
     Tuple,
@@ -78,8 +79,13 @@ from synapse.storage.database import (
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
-from synapse.util.caches.descriptors import cached
+from synapse.types import (
+    JsonDict,
+    PersistedEventPosition,
+    RoomStreamToken,
+    StrCollection,
+)
+from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
@@ -611,6 +617,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         self._stream_order_on_start = self.get_room_max_stream_ordering()
         self._min_stream_order_on_start = self.get_room_min_stream_ordering()
 
+        database.updates.register_background_update_handler(
+            "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update
+        )
+
     def get_room_max_stream_ordering(self) -> int:
         """Get the stream_ordering of regular events that we have committed up to
 
@@ -1186,6 +1196,52 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return None
 
+    @cachedList(
+        cached_method_name="get_max_stream_ordering_in_room",
+        list_name="room_ids",
+    )
+    async def get_max_stream_ordering_in_rooms(
+        self, room_ids: StrCollection
+    ) -> Mapping[str, Optional[PersistedEventPosition]]:
+        """Get the position of the latest event in each room.
+
+        A batched version of `get_max_stream_ordering_in_room`.
+        """
+        rows = await self.db_pool.simple_select_many_batch(
+            table="sliding_sync_room_metadata",
+            column="room_id",
+            iterable=room_ids,
+            retcols=("room_id", "instance_name", "last_stream_ordering"),
+            desc="get_max_stream_ordering_in_rooms",
+        )
+
+        return {
+            room_id: PersistedEventPosition(instance_name, stream)
+            for room_id, instance_name, stream in rows
+        }
+
+    @cached(max_entries=10000)
+    async def get_max_stream_ordering_in_room(
+        self,
+        room_id: str,
+    ) -> Optional[PersistedEventPosition]:
+        """Get the position for the latest event in a room.
+
+        Note: this may be after the current token for the room stream on this
+        process (e.g. due to replication lag)
+        """
+        row = await self.db_pool.simple_select_one(
+            table="sliding_sync_room_metadata",
+            retcols=("instance_name", "last_stream_ordering"),
+            keyvalues={"room_id": room_id},
+            allow_none=True,
+            desc="get_max_stream_ordering_in_room",
+        )
+        if not row:
+            return None
+
+        return PersistedEventPosition(instance_name=row[0], stream=row[1])
+
     async def get_last_event_pos_in_room_before_stream_ordering(
         self,
         room_id: str,
@@ -2104,3 +2160,88 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             return RoomStreamToken(stream=last_position.stream - 1)
 
         return None
+
+    async def _sliding_sync_room_metadata_bg_update(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Background update to fill out 'sliding_sync_room_metadata' table"""
+        previous_room = progress.get("previous_room", "")
+
+        def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int:
+            # Both these queries are just getting the most recent
+            # instance_name/stream ordering for the next N rooms.
+            if isinstance(self.database_engine, PostgresEngine):
+                sql = """
+                    SELECT room_id, instance_name, stream_ordering FROM rooms AS r,
+                    LATERAL (
+                        SELECT instance_name, stream_ordering
+                        FROM events WHERE events.room_id = r.room_id
+                        ORDER BY stream_ordering DESC
+                        LIMIT 1
+                    ) e
+                    WHERE r.room_id > ?
+                    ORDER BY r.room_id ASC
+                    LIMIT ?
+                """
+            else:
+                sql = """
+                    SELECT
+                        room_id,
+                        (
+                            SELECT instance_name
+                            FROM events WHERE events.room_id = r.room_id
+                            ORDER BY stream_ordering DESC
+                            LIMIT 1
+                        ),
+                        (
+                            SELECT stream_ordering
+                            FROM events WHERE events.room_id = r.room_id
+                            ORDER BY stream_ordering DESC
+                            LIMIT 1
+                        )
+                    FROM rooms AS r
+                    WHERE r.room_id > ?
+                    ORDER BY r.room_id ASC
+                    LIMIT ?
+                """
+
+            txn.execute(sql, (previous_room, batch_size))
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            self.db_pool.simple_upsert_many_txn(
+                txn,
+                table="sliding_sync_room_metadata",
+                key_names=("room_id",),
+                key_values=[(room_id,) for room_id, _, _ in rows],
+                value_names=(
+                    "instance_name",
+                    "last_stream_ordering",
+                ),
+                value_values=[
+                    (
+                        instance_name or "master",
+                        stream,
+                    )
+                    for _, instance_name, stream in rows
+                ],
+            )
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]}
+            )
+
+            return len(rows)
+
+        rows = await self.db_pool.runInteraction(
+            "_sliding_sync_room_metadata_bg_update",
+            _sliding_sync_room_metadata_bg_update_txn,
+        )
+
+        if rows == 0:
+            await self.db_pool.updates._end_background_update(
+                "sliding_sync_room_metadata"
+            )
+
+        return rows
diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
new file mode 100644
index 0000000000..e8bc33ff40
--- /dev/null
+++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
@@ -0,0 +1,24 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- A table that maps from room ID to metadata useful for sliding sync.
+CREATE TABLE sliding_sync_room_metadata (
+    room_id TEXT NOT NULL PRIMARY KEY,
+
+    -- The instance_name / stream ordering of the last event in the room.
+    instance_name TEXT NOT NULL,
+    last_stream_ordering BIGINT NOT NULL
+);
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (8507, 'sliding_sync_room_metadata', '{}');
diff --git a/tests/rest/client/test_account.py b/tests/rest/client/test_account.py
index a85ea994de..77caab2489 100644
--- a/tests/rest/client/test_account.py
+++ b/tests/rest/client/test_account.py
@@ -535,7 +535,7 @@ class DeactivateTestCase(unittest.HomeserverTestCase):
         # Check that the membership of @invitee:test in the room is now "leave".
         memberships = self.get_success(
             store.get_rooms_for_local_user_where_membership_is(
-                invitee_id, [Membership.LEAVE]
+                invitee_id, (Membership.LEAVE,)
             )
         )
         self.assertEqual(len(memberships), 1, memberships)
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index c4e216c308..037bbca1ba 100644
--- a/tests/storage/test_event_chain.py
+++ b/tests/storage/test_event_chain.py
@@ -440,6 +440,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
             assert persist_events_store is not None
             persist_events_store._store_event_txn(
                 txn,
+                events[0].room_id,
                 [
                     (e, EventContext(self.hs.get_storage_controllers(), {}))
                     for e in events
diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py
index 418b556108..e2f19e25e3 100644
--- a/tests/storage/test_roommember.py
+++ b/tests/storage/test_roommember.py
@@ -68,7 +68,7 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
 
         rooms_for_user = self.get_success(
             self.store.get_rooms_for_local_user_where_membership_is(
-                self.u_alice, [Membership.JOIN]
+                self.u_alice, (Membership.JOIN,)
             )
         )