summary refs log tree commit diff
path: root/tests/storage/test_stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/storage/test_stream.py')
-rw-r--r--tests/storage/test_stream.py269
1 files changed, 267 insertions, 2 deletions
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
index 2029cd9c68..ee34baf46f 100644
--- a/tests/storage/test_stream.py
+++ b/tests/storage/test_stream.py
@@ -19,7 +19,10 @@
 #
 #
 
-from typing import List
+import logging
+from typing import List, Tuple
+
+from immutabledict import immutabledict
 
 from twisted.test.proto_helpers import MemoryReactor
 
@@ -28,11 +31,13 @@ from synapse.api.filtering import Filter
 from synapse.rest import admin
 from synapse.rest.client import login, room
 from synapse.server import HomeServer
-from synapse.types import JsonDict
+from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken
 from synapse.util import Clock
 
 from tests.unittest import HomeserverTestCase
 
+logger = logging.getLogger(__name__)
+
 
 class PaginationTestCase(HomeserverTestCase):
     """
@@ -268,3 +273,263 @@ class PaginationTestCase(HomeserverTestCase):
         }
         chunk = self._filter_messages(filter)
         self.assertEqual(chunk, [self.event_id_1, self.event_id_2, self.event_id_none])
+
+
+class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
+    """
+    Test `get_last_event_in_room_before_stream_ordering(...)`
+    """
+
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+        self.event_sources = hs.get_event_sources()
+
+    def _update_persisted_instance_name_for_event(
+        self, event_id: str, instance_name: str
+    ) -> None:
+        """
+        Update the `instance_name` that persisted the the event in the database.
+        """
+        return self.get_success(
+            self.store.db_pool.simple_update_one(
+                "events",
+                keyvalues={"event_id": event_id},
+                updatevalues={"instance_name": instance_name},
+            )
+        )
+
+    def _send_event_on_instance(
+        self, instance_name: str, room_id: str, access_token: str
+    ) -> Tuple[JsonDict, PersistedEventPosition]:
+        """
+        Send an event in a room and mimic that it was persisted by a specific
+        instance/worker.
+        """
+        event_response = self.helper.send(
+            room_id, f"{instance_name} message", tok=access_token
+        )
+
+        self._update_persisted_instance_name_for_event(
+            event_response["event_id"], instance_name
+        )
+
+        event_pos = self.get_success(
+            self.store.get_position_for_event(event_response["event_id"])
+        )
+
+        return event_response, event_pos
+
+    def test_before_room_created(self) -> None:
+        """
+        Test that no event is returned if we are using a token before the room was even created
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        before_room_token = self.event_sources.get_current_token()
+
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+        last_event = self.get_success(
+            self.store.get_last_event_in_room_before_stream_ordering(
+                room_id=room_id,
+                end_token=before_room_token.room_key,
+            )
+        )
+
+        self.assertIsNone(last_event)
+
+    def test_after_room_created(self) -> None:
+        """
+        Test that an event is returned if we are using a token after the room was created
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+        after_room_token = self.event_sources.get_current_token()
+
+        last_event = self.get_success(
+            self.store.get_last_event_in_room_before_stream_ordering(
+                room_id=room_id,
+                end_token=after_room_token.room_key,
+            )
+        )
+
+        self.assertIsNotNone(last_event)
+
+    def test_activity_in_other_rooms(self) -> None:
+        """
+        Test to make sure that the last event in the room is returned even if the
+        `stream_ordering` has advanced from activity in other rooms.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
+        # Create another room to advance the stream_ordering
+        self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+        after_room_token = self.event_sources.get_current_token()
+
+        last_event = self.get_success(
+            self.store.get_last_event_in_room_before_stream_ordering(
+                room_id=room_id1,
+                end_token=after_room_token.room_key,
+            )
+        )
+
+        # Make sure it's the event we expect (which also means we know it's from the
+        # correct room)
+        self.assertEqual(last_event, event_response["event_id"])
+
+    def test_activity_after_token_has_no_effect(self) -> None:
+        """
+        Test to make sure we return the last event before the token even if there is
+        activity after it.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
+
+        after_room_token = self.event_sources.get_current_token()
+
+        # Send some events after the token
+        self.helper.send(room_id1, "after1", tok=user1_tok)
+        self.helper.send(room_id1, "after2", tok=user1_tok)
+
+        last_event = self.get_success(
+            self.store.get_last_event_in_room_before_stream_ordering(
+                room_id=room_id1,
+                end_token=after_room_token.room_key,
+            )
+        )
+
+        # Make sure it's the last event before the token
+        self.assertEqual(last_event, event_response["event_id"])
+
+    def test_last_event_within_sharded_token(self) -> None:
+        """
+        Test to make sure we can find the last event that that is *within* the sharded
+        token (a token that has an `instance_map` and looks like
+        `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). We are specifically testing
+        that we can find an event within the tokens minimum and instance
+        `stream_ordering`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        event_response1, event_pos1 = self._send_event_on_instance(
+            "worker1", room_id1, user1_tok
+        )
+        event_response2, event_pos2 = self._send_event_on_instance(
+            "worker1", room_id1, user1_tok
+        )
+        event_response3, event_pos3 = self._send_event_on_instance(
+            "worker1", room_id1, user1_tok
+        )
+
+        # Create another room to advance the `stream_ordering` on the same worker
+        # so we can sandwich event3 in the middle of the token
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        event_response4, event_pos4 = self._send_event_on_instance(
+            "worker1", room_id2, user1_tok
+        )
+
+        # Assemble a token that encompasses event1 -> event4 on worker1
+        end_token = RoomStreamToken(
+            stream=event_pos2.stream,
+            instance_map=immutabledict({"worker1": event_pos4.stream}),
+        )
+
+        # Send some events after the token
+        self.helper.send(room_id1, "after1", tok=user1_tok)
+        self.helper.send(room_id1, "after2", tok=user1_tok)
+
+        last_event = self.get_success(
+            self.store.get_last_event_in_room_before_stream_ordering(
+                room_id=room_id1,
+                end_token=end_token,
+            )
+        )
+
+        # Should find closest event at/before the token in room1
+        self.assertEqual(
+            last_event,
+            event_response3["event_id"],
+            f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to "
+            + str(
+                {
+                    "event1": event_response1["event_id"],
+                    "event2": event_response2["event_id"],
+                    "event3": event_response3["event_id"],
+                }
+            ),
+        )
+
+    def test_last_event_before_sharded_token(self) -> None:
+        """
+        Test to make sure we can find the last event that is *before* the sharded token
+        (a token that has an `instance_map` and looks like
+        `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        event_response1, event_pos1 = self._send_event_on_instance(
+            "worker1", room_id1, user1_tok
+        )
+        event_response2, event_pos2 = self._send_event_on_instance(
+            "worker1", room_id1, user1_tok
+        )
+
+        # Create another room to advance the `stream_ordering` on the same worker
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        event_response3, event_pos3 = self._send_event_on_instance(
+            "worker1", room_id2, user1_tok
+        )
+        event_response4, event_pos4 = self._send_event_on_instance(
+            "worker1", room_id2, user1_tok
+        )
+
+        # Assemble a token that encompasses event3 -> event4 on worker1
+        end_token = RoomStreamToken(
+            stream=event_pos3.stream,
+            instance_map=immutabledict({"worker1": event_pos4.stream}),
+        )
+
+        # Send some events after the token
+        self.helper.send(room_id1, "after1", tok=user1_tok)
+        self.helper.send(room_id1, "after2", tok=user1_tok)
+
+        last_event = self.get_success(
+            self.store.get_last_event_in_room_before_stream_ordering(
+                room_id=room_id1,
+                end_token=end_token,
+            )
+        )
+
+        # Should find closest event at/before the token in room1
+        self.assertEqual(
+            last_event,
+            event_response2["event_id"],
+            f"We expected {event_response2['event_id']} but saw {last_event} which corresponds to "
+            + str(
+                {
+                    "event1": event_response1["event_id"],
+                    "event2": event_response2["event_id"],
+                }
+            ),
+        )