summary refs log tree commit diff
path: root/tests/replication/slave
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2023-05-16 15:56:38 -0400
committerGitHub <noreply@github.com>2023-05-16 15:56:38 -0400
commit375b0a8a119bb925ca280f050a25a931662fcbb5 (patch)
tree29e8aa545c741d9dc295e3d687b9bc2341852578 /tests/replication/slave
parentRun mypy type checking with the minimum supported Python version (#15602) (diff)
downloadsynapse-375b0a8a119bb925ca280f050a25a931662fcbb5.tar.xz
Update code to refer to "workers". (#15606)
A bunch of comments and variables are out of date and use
obsolete terms.
Diffstat (limited to 'tests/replication/slave')
-rw-r--r--tests/replication/slave/__init__.py13
-rw-r--r--tests/replication/slave/storage/__init__.py13
-rw-r--r--tests/replication/slave/storage/_base.py72
-rw-r--r--tests/replication/slave/storage/test_events.py420
4 files changed, 0 insertions, 518 deletions
diff --git a/tests/replication/slave/__init__.py b/tests/replication/slave/__init__.py
deleted file mode 100644
index f43a360a80..0000000000
--- a/tests/replication/slave/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/tests/replication/slave/storage/__init__.py b/tests/replication/slave/storage/__init__.py
deleted file mode 100644
index f43a360a80..0000000000
--- a/tests/replication/slave/storage/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
deleted file mode 100644
index 4c9b494344..0000000000
--- a/tests/replication/slave/storage/_base.py
+++ /dev/null
@@ -1,72 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import Any, Iterable, Optional
-from unittest.mock import Mock
-
-from twisted.test.proto_helpers import MemoryReactor
-
-from synapse.server import HomeServer
-from synapse.util import Clock
-
-from tests.replication._base import BaseStreamTestCase
-
-
-class BaseSlavedStoreTestCase(BaseStreamTestCase):
-    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
-        return self.setup_test_homeserver(federation_client=Mock())
-
-    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
-        super().prepare(reactor, clock, hs)
-
-        self.reconnect()
-
-        self.master_store = hs.get_datastores().main
-        self.slaved_store = self.worker_hs.get_datastores().main
-        persistence = hs.get_storage_controllers().persistence
-        assert persistence is not None
-        self.persistance = persistence
-
-    def replicate(self) -> None:
-        """Tell the master side of replication that something has happened, and then
-        wait for the replication to occur.
-        """
-        self.streamer.on_notifier_poke()
-        self.pump(0.1)
-
-    def check(
-        self, method: str, args: Iterable[Any], expected_result: Optional[Any] = None
-    ) -> None:
-        master_result = self.get_success(getattr(self.master_store, method)(*args))
-        slaved_result = self.get_success(getattr(self.slaved_store, method)(*args))
-        if expected_result is not None:
-            self.assertEqual(
-                master_result,
-                expected_result,
-                "Expected master result to be %r but was %r"
-                % (expected_result, master_result),
-            )
-            self.assertEqual(
-                slaved_result,
-                expected_result,
-                "Expected slave result to be %r but was %r"
-                % (expected_result, slaved_result),
-            )
-        self.assertEqual(
-            master_result,
-            slaved_result,
-            "Slave result %r does not match master result %r"
-            % (slaved_result, master_result),
-        )
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
deleted file mode 100644
index b2125b1fea..0000000000
--- a/tests/replication/slave/storage/test_events.py
+++ /dev/null
@@ -1,420 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import logging
-from typing import Any, Callable, Iterable, List, Optional, Tuple
-
-from canonicaljson import encode_canonical_json
-from parameterized import parameterized
-
-from twisted.test.proto_helpers import MemoryReactor
-
-from synapse.api.constants import ReceiptTypes
-from synapse.api.room_versions import RoomVersions
-from synapse.events import EventBase, _EventInternalMetadata, make_event_from_dict
-from synapse.events.snapshot import EventContext
-from synapse.handlers.room import RoomEventSource
-from synapse.server import HomeServer
-from synapse.storage.databases.main.event_push_actions import (
-    NotifCounts,
-    RoomNotifCounts,
-)
-from synapse.storage.databases.main.events_worker import EventsWorkerStore
-from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
-from synapse.types import PersistedEventPosition
-from synapse.util import Clock
-
-from tests.server import FakeTransport
-
-from ._base import BaseSlavedStoreTestCase
-
-USER_ID = "@feeling:test"
-USER_ID_2 = "@bright:test"
-OUTLIER = {"outlier": True}
-ROOM_ID = "!room:test"
-
-logger = logging.getLogger(__name__)
-
-
-def dict_equals(self: EventBase, other: EventBase) -> bool:
-    me = encode_canonical_json(self.get_pdu_json())
-    them = encode_canonical_json(other.get_pdu_json())
-    return me == them
-
-
-def patch__eq__(cls: object) -> Callable[[], None]:
-    eq = getattr(cls, "__eq__", None)
-    cls.__eq__ = dict_equals  # type: ignore[assignment]
-
-    def unpatch() -> None:
-        if eq is not None:
-            cls.__eq__ = eq  # type: ignore[assignment]
-
-    return unpatch
-
-
-class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase):
-    STORE_TYPE = EventsWorkerStore
-
-    def setUp(self) -> None:
-        # Patch up the equality operator for events so that we can check
-        # whether lists of events match using assertEqual
-        self.unpatches = [patch__eq__(_EventInternalMetadata), patch__eq__(EventBase)]
-        super().setUp()
-
-    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
-        super().prepare(reactor, clock, hs)
-
-        self.get_success(
-            self.master_store.store_room(
-                ROOM_ID,
-                USER_ID,
-                is_public=False,
-                room_version=RoomVersions.V1,
-            )
-        )
-
-    def tearDown(self) -> None:
-        [unpatch() for unpatch in self.unpatches]
-
-    def test_get_latest_event_ids_in_room(self) -> None:
-        create = self.persist(type="m.room.create", key="", creator=USER_ID)
-        self.replicate()
-        self.check("get_latest_event_ids_in_room", (ROOM_ID,), [create.event_id])
-
-        join = self.persist(
-            type="m.room.member",
-            key=USER_ID,
-            membership="join",
-            prev_events=[(create.event_id, {})],
-        )
-        self.replicate()
-        self.check("get_latest_event_ids_in_room", (ROOM_ID,), [join.event_id])
-
-    def test_redactions(self) -> None:
-        self.persist(type="m.room.create", key="", creator=USER_ID)
-        self.persist(type="m.room.member", key=USER_ID, membership="join")
-
-        msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
-        self.replicate()
-        self.check("get_event", [msg.event_id], msg)
-
-        redaction = self.persist(type="m.room.redaction", redacts=msg.event_id)
-        self.replicate()
-
-        msg_dict = msg.get_dict()
-        msg_dict["content"] = {}
-        msg_dict["unsigned"]["redacted_by"] = redaction.event_id
-        msg_dict["unsigned"]["redacted_because"] = redaction
-        redacted = make_event_from_dict(
-            msg_dict, internal_metadata_dict=msg.internal_metadata.get_dict()
-        )
-        self.check("get_event", [msg.event_id], redacted)
-
-    def test_backfilled_redactions(self) -> None:
-        self.persist(type="m.room.create", key="", creator=USER_ID)
-        self.persist(type="m.room.member", key=USER_ID, membership="join")
-
-        msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
-        self.replicate()
-        self.check("get_event", [msg.event_id], msg)
-
-        redaction = self.persist(
-            type="m.room.redaction", redacts=msg.event_id, backfill=True
-        )
-        self.replicate()
-
-        msg_dict = msg.get_dict()
-        msg_dict["content"] = {}
-        msg_dict["unsigned"]["redacted_by"] = redaction.event_id
-        msg_dict["unsigned"]["redacted_because"] = redaction
-        redacted = make_event_from_dict(
-            msg_dict, internal_metadata_dict=msg.internal_metadata.get_dict()
-        )
-        self.check("get_event", [msg.event_id], redacted)
-
-    def test_invites(self) -> None:
-        self.persist(type="m.room.create", key="", creator=USER_ID)
-        self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
-        event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
-        assert event.internal_metadata.stream_ordering is not None
-
-        self.replicate()
-
-        self.check(
-            "get_invited_rooms_for_local_user",
-            [USER_ID_2],
-            [
-                RoomsForUser(
-                    ROOM_ID,
-                    USER_ID,
-                    "invite",
-                    event.event_id,
-                    event.internal_metadata.stream_ordering,
-                    RoomVersions.V1.identifier,
-                )
-            ],
-        )
-
-    @parameterized.expand([(True,), (False,)])
-    def test_push_actions_for_user(self, send_receipt: bool) -> None:
-        self.persist(type="m.room.create", key="", creator=USER_ID)
-        self.persist(type="m.room.member", key=USER_ID, membership="join")
-        self.persist(
-            type="m.room.member", sender=USER_ID, key=USER_ID_2, membership="join"
-        )
-        event1 = self.persist(type="m.room.message", msgtype="m.text", body="hello")
-        self.replicate()
-
-        if send_receipt:
-            self.get_success(
-                self.master_store.insert_receipt(
-                    ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], None, {}
-                )
-            )
-
-        self.check(
-            "get_unread_event_push_actions_by_room_for_user",
-            [ROOM_ID, USER_ID_2],
-            RoomNotifCounts(
-                NotifCounts(highlight_count=0, unread_count=0, notify_count=0), {}
-            ),
-        )
-
-        self.persist(
-            type="m.room.message",
-            msgtype="m.text",
-            body="world",
-            push_actions=[(USER_ID_2, ["notify"])],
-        )
-        self.replicate()
-        self.check(
-            "get_unread_event_push_actions_by_room_for_user",
-            [ROOM_ID, USER_ID_2],
-            RoomNotifCounts(
-                NotifCounts(highlight_count=0, unread_count=0, notify_count=1), {}
-            ),
-        )
-
-        self.persist(
-            type="m.room.message",
-            msgtype="m.text",
-            body="world",
-            push_actions=[
-                (USER_ID_2, ["notify", {"set_tweak": "highlight", "value": True}])
-            ],
-        )
-        self.replicate()
-        self.check(
-            "get_unread_event_push_actions_by_room_for_user",
-            [ROOM_ID, USER_ID_2],
-            RoomNotifCounts(
-                NotifCounts(highlight_count=1, unread_count=0, notify_count=2), {}
-            ),
-        )
-
-    def test_get_rooms_for_user_with_stream_ordering(self) -> None:
-        """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated
-        by rows in the events stream
-        """
-        self.persist(type="m.room.create", key="", creator=USER_ID)
-        self.persist(type="m.room.member", key=USER_ID, membership="join")
-        self.replicate()
-        self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
-
-        j2 = self.persist(
-            type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
-        )
-        assert j2.internal_metadata.stream_ordering is not None
-        self.replicate()
-
-        expected_pos = PersistedEventPosition(
-            "master", j2.internal_metadata.stream_ordering
-        )
-        self.check(
-            "get_rooms_for_user_with_stream_ordering",
-            (USER_ID_2,),
-            {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
-        )
-
-    def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
-        self,
-    ) -> None:
-        """Check that current_state invalidation happens correctly with multiple events
-        in the persistence batch.
-
-        This test attempts to reproduce a race condition between the event persistence
-        loop and a worker-based Sync handler.
-
-        The problem occurred when the master persisted several events in one batch. It
-        only updates the current_state at the end of each batch, so the obvious thing
-        to do is then to issue a current_state_delta stream update corresponding to the
-        last stream_id in the batch.
-
-        However, that raises the possibility that a worker will see the replication
-        notification for a join event before the current_state caches are invalidated.
-
-        The test involves:
-         * creating a join and a message event for a user, and persisting them in the
-           same batch
-
-         * controlling the replication stream so that updates are sent gradually
-
-         * between each bunch of replication updates, check that we see a consistent
-           snapshot of the state.
-        """
-        self.persist(type="m.room.create", key="", creator=USER_ID)
-        self.persist(type="m.room.member", key=USER_ID, membership="join")
-        self.replicate()
-        self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
-
-        # limit the replication rate
-        repl_transport = self._server_transport
-        assert isinstance(repl_transport, FakeTransport)
-        repl_transport.autoflush = False
-
-        # build the join and message events and persist them in the same batch.
-        logger.info("----- build test events ------")
-        j2, j2ctx = self.build_event(
-            type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
-        )
-        msg, msgctx = self.build_event()
-        self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
-        self.replicate()
-        assert j2.internal_metadata.stream_ordering is not None
-
-        event_source = RoomEventSource(self.hs)
-        event_source.store = self.slaved_store
-        current_token = event_source.get_current_key()
-
-        # gradually stream out the replication
-        while repl_transport.buffer:
-            logger.info("------ flush ------")
-            repl_transport.flush(30)
-            self.pump(0)
-
-            prev_token = current_token
-            current_token = event_source.get_current_key()
-
-            # attempt to replicate the behaviour of the sync handler.
-            #
-            # First, we get a list of the rooms we are joined to
-            joined_rooms = self.get_success(
-                self.slaved_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
-            )
-
-            # Then, we get a list of the events since the last sync
-            membership_changes = self.get_success(
-                self.slaved_store.get_membership_changes_for_user(
-                    USER_ID_2, prev_token, current_token
-                )
-            )
-
-            logger.info(
-                "%s->%s: joined_rooms=%r membership_changes=%r",
-                prev_token,
-                current_token,
-                joined_rooms,
-                membership_changes,
-            )
-
-            # the membership change is only any use to us if the room is in the
-            # joined_rooms list.
-            if membership_changes:
-                expected_pos = PersistedEventPosition(
-                    "master", j2.internal_metadata.stream_ordering
-                )
-                self.assertEqual(
-                    joined_rooms,
-                    {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
-                )
-
-    event_id = 0
-
-    def persist(self, backfill: bool = False, **kwargs: Any) -> EventBase:
-        """
-        Returns:
-            The event that was persisted.
-        """
-        event, context = self.build_event(**kwargs)
-
-        if backfill:
-            self.get_success(
-                self.persistance.persist_events([(event, context)], backfilled=True)
-            )
-        else:
-            self.get_success(self.persistance.persist_event(event, context))
-
-        return event
-
-    def build_event(
-        self,
-        sender: str = USER_ID,
-        room_id: str = ROOM_ID,
-        type: str = "m.room.message",
-        key: Optional[str] = None,
-        internal: Optional[dict] = None,
-        depth: Optional[int] = None,
-        prev_events: Optional[List[Tuple[str, dict]]] = None,
-        auth_events: Optional[List[str]] = None,
-        prev_state: Optional[List[str]] = None,
-        redacts: Optional[str] = None,
-        push_actions: Iterable = frozenset(),
-        **content: object,
-    ) -> Tuple[EventBase, EventContext]:
-        prev_events = prev_events or []
-        auth_events = auth_events or []
-        prev_state = prev_state or []
-
-        if depth is None:
-            depth = self.event_id
-
-        if not prev_events:
-            latest_event_ids = self.get_success(
-                self.master_store.get_latest_event_ids_in_room(room_id)
-            )
-            prev_events = [(ev_id, {}) for ev_id in latest_event_ids]
-
-        event_dict = {
-            "sender": sender,
-            "type": type,
-            "content": content,
-            "event_id": "$%d:blue" % (self.event_id,),
-            "room_id": room_id,
-            "depth": depth,
-            "origin_server_ts": self.event_id,
-            "prev_events": prev_events,
-            "auth_events": auth_events,
-        }
-        if key is not None:
-            event_dict["state_key"] = key
-            event_dict["prev_state"] = prev_state
-
-        if redacts is not None:
-            event_dict["redacts"] = redacts
-
-        event = make_event_from_dict(event_dict, internal_metadata_dict=internal or {})
-
-        self.event_id += 1
-        state_handler = self.hs.get_state_handler()
-        context = self.get_success(state_handler.compute_event_context(event))
-
-        self.get_success(
-            self.master_store.add_push_actions_to_staging(
-                event.event_id,
-                dict(push_actions),
-                False,
-                "main",
-            )
-        )
-        return event, context