diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 674dd4fb54..77aafa492e 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -210,7 +210,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
)
# Blow away caches (supported room versions can only change due to a restart).
- self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
self.store.get_rooms_for_user.invalidate_all()
self.store._get_event_cache.clear()
self.store._event_ref.clear()
diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py
index a56f1e2d5d..1afe523d02 100644
--- a/tests/replication/storage/test_events.py
+++ b/tests/replication/storage/test_events.py
@@ -30,19 +30,16 @@ from synapse.api.constants import ReceiptTypes
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
-from synapse.handlers.room import RoomEventSource
from synapse.server import HomeServer
from synapse.storage.databases.main.event_push_actions import (
NotifCounts,
RoomNotifCounts,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
-from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
+from synapse.storage.roommember import RoomsForUser
from synapse.types import PersistedEventPosition
from synapse.util import Clock
-from tests.server import FakeTransport
-
from ._base import BaseWorkerStoreTestCase
USER_ID = "@feeling:test"
@@ -221,125 +218,6 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
),
)
- def test_get_rooms_for_user_with_stream_ordering(self) -> None:
- """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated
- by rows in the events stream
- """
- self.persist(type="m.room.create", key="", creator=USER_ID)
- self.persist(type="m.room.member", key=USER_ID, membership="join")
- self.replicate()
- self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
-
- j2 = self.persist(
- type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
- )
- assert j2.internal_metadata.instance_name is not None
- assert j2.internal_metadata.stream_ordering is not None
- self.replicate()
-
- expected_pos = PersistedEventPosition(
- j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
- )
- self.check(
- "get_rooms_for_user_with_stream_ordering",
- (USER_ID_2,),
- {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
- )
-
- def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
- self,
- ) -> None:
- """Check that current_state invalidation happens correctly with multiple events
- in the persistence batch.
-
- This test attempts to reproduce a race condition between the event persistence
- loop and a worker-based Sync handler.
-
- The problem occurred when the master persisted several events in one batch. It
- only updates the current_state at the end of each batch, so the obvious thing
- to do is then to issue a current_state_delta stream update corresponding to the
- last stream_id in the batch.
-
- However, that raises the possibility that a worker will see the replication
- notification for a join event before the current_state caches are invalidated.
-
- The test involves:
- * creating a join and a message event for a user, and persisting them in the
- same batch
-
- * controlling the replication stream so that updates are sent gradually
-
- * between each bunch of replication updates, check that we see a consistent
- snapshot of the state.
- """
- self.persist(type="m.room.create", key="", creator=USER_ID)
- self.persist(type="m.room.member", key=USER_ID, membership="join")
- self.replicate()
- self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
-
- # limit the replication rate
- repl_transport = self._server_transport
- assert isinstance(repl_transport, FakeTransport)
- repl_transport.autoflush = False
-
- # build the join and message events and persist them in the same batch.
- logger.info("----- build test events ------")
- j2, j2ctx = self.build_event(
- type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
- )
- msg, msgctx = self.build_event()
- self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
- self.replicate()
- assert j2.internal_metadata.instance_name is not None
- assert j2.internal_metadata.stream_ordering is not None
-
- event_source = RoomEventSource(self.hs)
- event_source.store = self.worker_store
- current_token = event_source.get_current_key()
-
- # gradually stream out the replication
- while repl_transport.buffer:
- logger.info("------ flush ------")
- repl_transport.flush(30)
- self.pump(0)
-
- prev_token = current_token
- current_token = event_source.get_current_key()
-
- # attempt to replicate the behaviour of the sync handler.
- #
- # First, we get a list of the rooms we are joined to
- joined_rooms = self.get_success(
- self.worker_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
- )
-
- # Then, we get a list of the events since the last sync
- membership_changes = self.get_success(
- self.worker_store.get_membership_changes_for_user(
- USER_ID_2, prev_token, current_token
- )
- )
-
- logger.info(
- "%s->%s: joined_rooms=%r membership_changes=%r",
- prev_token,
- current_token,
- joined_rooms,
- membership_changes,
- )
-
- # the membership change is only any use to us if the room is in the
- # joined_rooms list.
- if membership_changes:
- expected_pos = PersistedEventPosition(
- j2.internal_metadata.instance_name,
- j2.internal_metadata.stream_ordering,
- )
- self.assertEqual(
- joined_rooms,
- {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
- )
-
event_id = 0
def persist(self, backfill: bool = False, **kwargs: Any) -> EventBase:
|