summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-07-08 20:30:23 +0100
committerGitHub <noreply@github.com>2024-07-08 20:30:23 +0100
commit8cdd2d214e9bbf8995ff4920140804ebcea6497e (patch)
treee4eb4b9dfc4419f74704bbc3023c38fe426dd2b3
parentAdd `rooms.bump_stamp` to Sliding Sync `/sync` for easier client-side sorting... (diff)
downloadsynapse-8cdd2d214e9bbf8995ff4920140804ebcea6497e.tar.xz
Fix bug in sliding sync when using old DB. (#17398)
We don't necessarily have `instance_name` for old events (those persisted
before we supported multiple event persisters). We treat those as if the
`instance_name` was "master".

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
-rw-r--r--changelog.d/17398.bugfix1
-rw-r--r--synapse/storage/_base.py6
-rw-r--r--synapse/storage/databases/main/cache.py10
-rw-r--r--synapse/storage/databases/main/events_worker.py3
-rw-r--r--synapse/storage/databases/main/roommember.py67
-rw-r--r--synapse/storage/databases/main/stream.py33
-rw-r--r--tests/handlers/test_sync.py1
-rw-r--r--tests/replication/storage/test_events.py124
8 files changed, 33 insertions, 212 deletions
diff --git a/changelog.d/17398.bugfix b/changelog.d/17398.bugfix
new file mode 100644
index 0000000000..7931c431ef
--- /dev/null
+++ b/changelog.d/17398.bugfix
@@ -0,0 +1 @@
+Fix bug in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint when using an old database.
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index b127289d8d..881888fa93 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -119,9 +119,6 @@ class SQLBaseStore(metaclass=ABCMeta):
             self._attempt_to_invalidate_cache(
                 "get_user_in_room_with_profile", (room_id, user_id)
             )
-            self._attempt_to_invalidate_cache(
-                "get_rooms_for_user_with_stream_ordering", (user_id,)
-            )
             self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))
 
         # Purge other caches based on room state.
@@ -148,9 +145,6 @@ class SQLBaseStore(metaclass=ABCMeta):
         self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
         self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
         self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
-        self._attempt_to_invalidate_cache(
-            "get_rooms_for_user_with_stream_ordering", None
-        )
         self._attempt_to_invalidate_cache("get_rooms_for_user", None)
         self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
 
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index c6787faea0..2d6b75e47e 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -268,16 +268,12 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token)  # type: ignore[attr-defined]
 
             if data.type == EventTypes.Member:
-                self.get_rooms_for_user_with_stream_ordering.invalidate(  # type: ignore[attr-defined]
-                    (data.state_key,)
-                )
                 self.get_rooms_for_user.invalidate((data.state_key,))  # type: ignore[attr-defined]
         elif row.type == EventsStreamAllStateRow.TypeId:
             assert isinstance(data, EventsStreamAllStateRow)
             # Similar to the above, but the entire caches are invalidated. This is
             # unfortunate for the membership caches, but should recover quickly.
             self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token)  # type: ignore[attr-defined]
-            self.get_rooms_for_user_with_stream_ordering.invalidate_all()  # type: ignore[attr-defined]
             self.get_rooms_for_user.invalidate_all()  # type: ignore[attr-defined]
         else:
             raise Exception("Unknown events stream row type %s" % (row.type,))
@@ -334,9 +330,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             self._attempt_to_invalidate_cache(
                 "get_invited_rooms_for_local_user", (state_key,)
             )
-            self._attempt_to_invalidate_cache(
-                "get_rooms_for_user_with_stream_ordering", (state_key,)
-            )
             self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))
 
             self._attempt_to_invalidate_cache(
@@ -399,9 +392,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         self._attempt_to_invalidate_cache("get_thread_id", None)
         self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
         self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
-        self._attempt_to_invalidate_cache(
-            "get_rooms_for_user_with_stream_ordering", None
-        )
         self._attempt_to_invalidate_cache("get_rooms_for_user", None)
         self._attempt_to_invalidate_cache("did_forget", None)
         self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a5acea8c3b..4d4877c4c3 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1457,7 +1457,8 @@ class EventsWorkerStore(SQLBaseStore):
                 event_dict[event_id] = _EventRow(
                     event_id=event_id,
                     stream_ordering=row[1],
-                    instance_name=row[2],
+                    # If instance_name is null we default to "master"
+                    instance_name=row[2] or "master",
                     internal_metadata=row[3],
                     json=row[4],
                     format_version=row[5],
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index d8b54dc4e3..5d2fd08495 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -50,12 +50,7 @@ from synapse.storage.database import (
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import Sqlite3Engine
-from synapse.storage.roommember import (
-    GetRoomsForUserWithStreamOrdering,
-    MemberSummary,
-    ProfileInfo,
-    RoomsForUser,
-)
+from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser
 from synapse.types import (
     JsonDict,
     PersistedEventPosition,
@@ -494,7 +489,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
                 sender=sender,
                 membership=membership,
                 event_id=event_id,
-                event_pos=PersistedEventPosition(instance_name, stream_ordering),
+                event_pos=PersistedEventPosition(
+                    # If instance_name is null we default to "master"
+                    instance_name or "master",
+                    stream_ordering,
+                ),
                 room_version_id=room_version,
             )
             for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
@@ -606,53 +605,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
 
         return results
 
-    @cached(max_entries=500000, iterable=True)
-    async def get_rooms_for_user_with_stream_ordering(
-        self, user_id: str
-    ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
-        """Returns a set of room_ids the user is currently joined to.
-
-        If a remote user only returns rooms this server is currently
-        participating in.
-
-        Args:
-            user_id
-
-        Returns:
-            Returns the rooms the user is in currently, along with the stream
-            ordering of the most recent join for that user and room, along with
-            the room version of the room.
-        """
-        return await self.db_pool.runInteraction(
-            "get_rooms_for_user_with_stream_ordering",
-            self._get_rooms_for_user_with_stream_ordering_txn,
-            user_id,
-        )
-
-    def _get_rooms_for_user_with_stream_ordering_txn(
-        self, txn: LoggingTransaction, user_id: str
-    ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
-        # We use `current_state_events` here and not `local_current_membership`
-        # as a) this gets called with remote users and b) this only gets called
-        # for rooms the server is participating in.
-        sql = """
-            SELECT room_id, e.instance_name, e.stream_ordering
-            FROM current_state_events AS c
-            INNER JOIN events AS e USING (room_id, event_id)
-            WHERE
-                c.type = 'm.room.member'
-                AND c.state_key = ?
-                AND c.membership = ?
-        """
-
-        txn.execute(sql, (user_id, Membership.JOIN))
-        return frozenset(
-            GetRoomsForUserWithStreamOrdering(
-                room_id, PersistedEventPosition(instance, stream_id)
-            )
-            for room_id, instance, stream_id in txn
-        )
-
     async def get_users_server_still_shares_room_with(
         self, user_ids: Collection[str]
     ) -> Set[str]:
@@ -701,13 +653,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         If a remote user only returns rooms this server is currently
         participating in.
         """
-        rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(
-            (user_id,),
-            None,
-            update_metrics=False,
-        )
-        if rooms:
-            return frozenset(r.room_id for r in rooms)
 
         room_ids = await self.db_pool.simple_select_onecol(
             table="current_state_events",
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index be81025355..e74e0d2e91 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -371,7 +371,7 @@ def _make_generic_sql_bound(
 def _filter_results(
     lower_token: Optional[RoomStreamToken],
     upper_token: Optional[RoomStreamToken],
-    instance_name: str,
+    instance_name: Optional[str],
     topological_ordering: int,
     stream_ordering: int,
 ) -> bool:
@@ -384,8 +384,14 @@ def _filter_results(
     position maps, which we handle by fetching more than necessary from the DB
     and then filtering (rather than attempting to construct a complicated SQL
     query).
+
+    The `instance_name` arg is optional to handle historic rows, and is
+    interpreted as if it was "master".
     """
 
+    if instance_name is None:
+        instance_name = "master"
+
     event_historical_tuple = (
         topological_ordering,
         stream_ordering,
@@ -420,7 +426,7 @@ def _filter_results(
 def _filter_results_by_stream(
     lower_token: Optional[RoomStreamToken],
     upper_token: Optional[RoomStreamToken],
-    instance_name: str,
+    instance_name: Optional[str],
     stream_ordering: int,
 ) -> bool:
     """
@@ -436,7 +442,14 @@ def _filter_results_by_stream(
     position maps, which we handle by fetching more than necessary from the DB
     and then filtering (rather than attempting to construct a complicated SQL
     query).
+
+    The `instance_name` arg is optional to handle historic rows, and is
+    interpreted as if it was "master".
     """
+
+    if instance_name is None:
+        instance_name = "master"
+
     if lower_token:
         assert lower_token.topological is None
 
@@ -912,7 +925,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 prev_sender,
             ) in txn:
                 assert room_id is not None
-                assert instance_name is not None
                 assert stream_ordering is not None
 
                 if _filter_results_by_stream(
@@ -936,7 +948,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                         # Event
                         event_id=event_id,
                         event_pos=PersistedEventPosition(
-                            instance_name=instance_name,
+                            # If instance_name is null we default to "master"
+                            instance_name=instance_name or "master",
                             stream=stream_ordering,
                         ),
                         # When `s.event_id = null`, we won't be able to get respective
@@ -952,13 +965,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                         prev_event_id=prev_event_id,
                         prev_event_pos=(
                             PersistedEventPosition(
-                                instance_name=prev_instance_name,
+                                # If instance_name is null we default to "master"
+                                instance_name=prev_instance_name or "master",
                                 stream=prev_stream_ordering,
                             )
-                            if (
-                                prev_instance_name is not None
-                                and prev_stream_ordering is not None
-                            )
+                            if (prev_stream_ordering is not None)
                             else None
                         ),
                         prev_membership=prev_membership,
@@ -1270,7 +1281,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                     stream_ordering=stream_ordering,
                 ):
                     return event_id, PersistedEventPosition(
-                        instance_name, stream_ordering
+                        # If instance_name is null we default to "master"
+                        instance_name or "master",
+                        stream_ordering,
                     )
 
             return None
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 674dd4fb54..77aafa492e 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -210,7 +210,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
             )
 
         # Blow away caches (supported room versions can only change due to a restart).
-        self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
         self.store.get_rooms_for_user.invalidate_all()
         self.store._get_event_cache.clear()
         self.store._event_ref.clear()
diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py
index a56f1e2d5d..1afe523d02 100644
--- a/tests/replication/storage/test_events.py
+++ b/tests/replication/storage/test_events.py
@@ -30,19 +30,16 @@ from synapse.api.constants import ReceiptTypes
 from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase, make_event_from_dict
 from synapse.events.snapshot import EventContext
-from synapse.handlers.room import RoomEventSource
 from synapse.server import HomeServer
 from synapse.storage.databases.main.event_push_actions import (
     NotifCounts,
     RoomNotifCounts,
 )
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
-from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
+from synapse.storage.roommember import RoomsForUser
 from synapse.types import PersistedEventPosition
 from synapse.util import Clock
 
-from tests.server import FakeTransport
-
 from ._base import BaseWorkerStoreTestCase
 
 USER_ID = "@feeling:test"
@@ -221,125 +218,6 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
             ),
         )
 
-    def test_get_rooms_for_user_with_stream_ordering(self) -> None:
-        """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated
-        by rows in the events stream
-        """
-        self.persist(type="m.room.create", key="", creator=USER_ID)
-        self.persist(type="m.room.member", key=USER_ID, membership="join")
-        self.replicate()
-        self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
-
-        j2 = self.persist(
-            type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
-        )
-        assert j2.internal_metadata.instance_name is not None
-        assert j2.internal_metadata.stream_ordering is not None
-        self.replicate()
-
-        expected_pos = PersistedEventPosition(
-            j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
-        )
-        self.check(
-            "get_rooms_for_user_with_stream_ordering",
-            (USER_ID_2,),
-            {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
-        )
-
-    def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
-        self,
-    ) -> None:
-        """Check that current_state invalidation happens correctly with multiple events
-        in the persistence batch.
-
-        This test attempts to reproduce a race condition between the event persistence
-        loop and a worker-based Sync handler.
-
-        The problem occurred when the master persisted several events in one batch. It
-        only updates the current_state at the end of each batch, so the obvious thing
-        to do is then to issue a current_state_delta stream update corresponding to the
-        last stream_id in the batch.
-
-        However, that raises the possibility that a worker will see the replication
-        notification for a join event before the current_state caches are invalidated.
-
-        The test involves:
-         * creating a join and a message event for a user, and persisting them in the
-           same batch
-
-         * controlling the replication stream so that updates are sent gradually
-
-         * between each bunch of replication updates, check that we see a consistent
-           snapshot of the state.
-        """
-        self.persist(type="m.room.create", key="", creator=USER_ID)
-        self.persist(type="m.room.member", key=USER_ID, membership="join")
-        self.replicate()
-        self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
-
-        # limit the replication rate
-        repl_transport = self._server_transport
-        assert isinstance(repl_transport, FakeTransport)
-        repl_transport.autoflush = False
-
-        # build the join and message events and persist them in the same batch.
-        logger.info("----- build test events ------")
-        j2, j2ctx = self.build_event(
-            type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
-        )
-        msg, msgctx = self.build_event()
-        self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
-        self.replicate()
-        assert j2.internal_metadata.instance_name is not None
-        assert j2.internal_metadata.stream_ordering is not None
-
-        event_source = RoomEventSource(self.hs)
-        event_source.store = self.worker_store
-        current_token = event_source.get_current_key()
-
-        # gradually stream out the replication
-        while repl_transport.buffer:
-            logger.info("------ flush ------")
-            repl_transport.flush(30)
-            self.pump(0)
-
-            prev_token = current_token
-            current_token = event_source.get_current_key()
-
-            # attempt to replicate the behaviour of the sync handler.
-            #
-            # First, we get a list of the rooms we are joined to
-            joined_rooms = self.get_success(
-                self.worker_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
-            )
-
-            # Then, we get a list of the events since the last sync
-            membership_changes = self.get_success(
-                self.worker_store.get_membership_changes_for_user(
-                    USER_ID_2, prev_token, current_token
-                )
-            )
-
-            logger.info(
-                "%s->%s: joined_rooms=%r membership_changes=%r",
-                prev_token,
-                current_token,
-                joined_rooms,
-                membership_changes,
-            )
-
-            # the membership change is only any use to us if the room is in the
-            # joined_rooms list.
-            if membership_changes:
-                expected_pos = PersistedEventPosition(
-                    j2.internal_metadata.instance_name,
-                    j2.internal_metadata.stream_ordering,
-                )
-                self.assertEqual(
-                    joined_rooms,
-                    {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
-                )
-
     event_id = 0
 
     def persist(self, backfill: bool = False, **kwargs: Any) -> EventBase: