summary refs log tree commit diff
path: root/tests/replication/storage/test_events.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-07-08 20:30:23 +0100
committerGitHub <noreply@github.com>2024-07-08 20:30:23 +0100
commit8cdd2d214e9bbf8995ff4920140804ebcea6497e (patch)
treee4eb4b9dfc4419f74704bbc3023c38fe426dd2b3 /tests/replication/storage/test_events.py
parentAdd `rooms.bump_stamp` to Sliding Sync `/sync` for easier client-side sorting... (diff)
downloadsynapse-8cdd2d214e9bbf8995ff4920140804ebcea6497e.tar.xz
Fix bug in sliding sync when using old DB. (#17398)
We don't necessarily have `instance_name` for old events (before we
support multiple event persisters). We treat those as if the
`instance_name` was "master".

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
Diffstat (limited to '')
-rw-r--r--tests/replication/storage/test_events.py124
1 files changed, 1 insertions, 123 deletions
diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py
index a56f1e2d5d..1afe523d02 100644
--- a/tests/replication/storage/test_events.py
+++ b/tests/replication/storage/test_events.py
@@ -30,19 +30,16 @@ from synapse.api.constants import ReceiptTypes
 from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase, make_event_from_dict
 from synapse.events.snapshot import EventContext
-from synapse.handlers.room import RoomEventSource
 from synapse.server import HomeServer
 from synapse.storage.databases.main.event_push_actions import (
     NotifCounts,
     RoomNotifCounts,
 )
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
-from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
+from synapse.storage.roommember import RoomsForUser
 from synapse.types import PersistedEventPosition
 from synapse.util import Clock
 
-from tests.server import FakeTransport
-
 from ._base import BaseWorkerStoreTestCase
 
 USER_ID = "@feeling:test"
@@ -221,125 +218,6 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
             ),
         )
 
-    def test_get_rooms_for_user_with_stream_ordering(self) -> None:
-        """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated
-        by rows in the events stream
-        """
-        self.persist(type="m.room.create", key="", creator=USER_ID)
-        self.persist(type="m.room.member", key=USER_ID, membership="join")
-        self.replicate()
-        self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
-
-        j2 = self.persist(
-            type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
-        )
-        assert j2.internal_metadata.instance_name is not None
-        assert j2.internal_metadata.stream_ordering is not None
-        self.replicate()
-
-        expected_pos = PersistedEventPosition(
-            j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
-        )
-        self.check(
-            "get_rooms_for_user_with_stream_ordering",
-            (USER_ID_2,),
-            {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
-        )
-
-    def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
-        self,
-    ) -> None:
-        """Check that current_state invalidation happens correctly with multiple events
-        in the persistence batch.
-
-        This test attempts to reproduce a race condition between the event persistence
-        loop and a worker-based Sync handler.
-
-        The problem occurred when the master persisted several events in one batch. It
-        only updates the current_state at the end of each batch, so the obvious thing
-        to do is then to issue a current_state_delta stream update corresponding to the
-        last stream_id in the batch.
-
-        However, that raises the possibility that a worker will see the replication
-        notification for a join event before the current_state caches are invalidated.
-
-        The test involves:
-         * creating a join and a message event for a user, and persisting them in the
-           same batch
-
-         * controlling the replication stream so that updates are sent gradually
-
-         * between each bunch of replication updates, check that we see a consistent
-           snapshot of the state.
-        """
-        self.persist(type="m.room.create", key="", creator=USER_ID)
-        self.persist(type="m.room.member", key=USER_ID, membership="join")
-        self.replicate()
-        self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
-
-        # limit the replication rate
-        repl_transport = self._server_transport
-        assert isinstance(repl_transport, FakeTransport)
-        repl_transport.autoflush = False
-
-        # build the join and message events and persist them in the same batch.
-        logger.info("----- build test events ------")
-        j2, j2ctx = self.build_event(
-            type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
-        )
-        msg, msgctx = self.build_event()
-        self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
-        self.replicate()
-        assert j2.internal_metadata.instance_name is not None
-        assert j2.internal_metadata.stream_ordering is not None
-
-        event_source = RoomEventSource(self.hs)
-        event_source.store = self.worker_store
-        current_token = event_source.get_current_key()
-
-        # gradually stream out the replication
-        while repl_transport.buffer:
-            logger.info("------ flush ------")
-            repl_transport.flush(30)
-            self.pump(0)
-
-            prev_token = current_token
-            current_token = event_source.get_current_key()
-
-            # attempt to replicate the behaviour of the sync handler.
-            #
-            # First, we get a list of the rooms we are joined to
-            joined_rooms = self.get_success(
-                self.worker_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
-            )
-
-            # Then, we get a list of the events since the last sync
-            membership_changes = self.get_success(
-                self.worker_store.get_membership_changes_for_user(
-                    USER_ID_2, prev_token, current_token
-                )
-            )
-
-            logger.info(
-                "%s->%s: joined_rooms=%r membership_changes=%r",
-                prev_token,
-                current_token,
-                joined_rooms,
-                membership_changes,
-            )
-
-            # the membership change is only any use to us if the room is in the
-            # joined_rooms list.
-            if membership_changes:
-                expected_pos = PersistedEventPosition(
-                    j2.internal_metadata.instance_name,
-                    j2.internal_metadata.stream_ordering,
-                )
-                self.assertEqual(
-                    joined_rooms,
-                    {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
-                )
-
     event_id = 0
 
     def persist(self, backfill: bool = False, **kwargs: Any) -> EventBase: