summary refs log tree commit diff
path: root/tests/storage
diff options
context:
space:
mode:
Diffstat (limited to 'tests/storage')
-rw-r--r--tests/storage/test_event_federation.py481
1 files changed, 477 insertions, 4 deletions
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index a6679e1312..85739c464e 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -12,25 +12,38 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Tuple, Union
+import datetime
+from typing import Dict, List, Tuple, Union
 
 import attr
 from parameterized import parameterized
 
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.api.constants import EventTypes
 from synapse.api.room_versions import (
     KNOWN_ROOM_VERSIONS,
     EventFormatVersions,
     RoomVersion,
 )
 from synapse.events import _EventInternalMetadata
-from synapse.util import json_encoder
+from synapse.server import HomeServer
+from synapse.storage.database import LoggingTransaction
+from synapse.types import JsonDict
+from synapse.util import Clock, json_encoder
 
 import tests.unittest
 import tests.utils
 
 
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class _BackfillSetupInfo:
+    room_id: str
+    depth_map: Dict[str, int]
+
+
 class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
-    def prepare(self, reactor, clock, hs):
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.store = hs.get_datastores().main
 
     def test_get_prev_events_for_room(self):
@@ -571,11 +584,471 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         )
         self.assertEqual(count, 1)
 
-        _, event_id = self.get_success(
+        next_staged_event_info = self.get_success(
             self.store.get_next_staged_event_id_for_room(room_id)
         )
+        assert next_staged_event_info
+        _, event_id = next_staged_event_info
         self.assertEqual(event_id, "$fake_event_id_500")
 
+    def _setup_room_for_backfill_tests(self) -> _BackfillSetupInfo:
+        """
+        Sets up a room with various events and backward extremities to test
+        backfill functions against.
+
+        Returns:
+            _BackfillSetupInfo including the `room_id` to test against and
+            `depth_map` of events in the room
+        """
+        room_id = "!backfill-room-test:some-host"
+
+        # The silly graph we use to test grabbing backward extremities,
+        # where the top is the oldest events.
+        #    1 (oldest)
+        #    |
+        #    2 ⹁
+        #    |  \
+        #    |   [b1, b2, b3]
+        #    |   |
+        #    |   A
+        #    |  /
+        #    3 {
+        #    |  \
+        #    |   [b4, b5, b6]
+        #    |   |
+        #    |   B
+        #    |  /
+        #    4 ´
+        #    |
+        #    5 (newest)
+
+        event_graph: Dict[str, List[str]] = {
+            "1": [],
+            "2": ["1"],
+            "3": ["2", "A"],
+            "4": ["3", "B"],
+            "5": ["4"],
+            "A": ["b1", "b2", "b3"],
+            "b1": ["2"],
+            "b2": ["2"],
+            "b3": ["2"],
+            "B": ["b4", "b5", "b6"],
+            "b4": ["3"],
+            "b5": ["3"],
+            "b6": ["3"],
+        }
+
+        depth_map: Dict[str, int] = {
+            "1": 1,
+            "2": 2,
+            "b1": 3,
+            "b2": 3,
+            "b3": 3,
+            "A": 4,
+            "3": 5,
+            "b4": 6,
+            "b5": 6,
+            "b6": 6,
+            "B": 7,
+            "4": 8,
+            "5": 9,
+        }
+
+        # The events we have persisted on our server.
+        # The rest are events in the room but not backfilled tet.
+        our_server_events = {"5", "4", "B", "3", "A"}
+
+        complete_event_dict_map: Dict[str, JsonDict] = {}
+        stream_ordering = 0
+        for (event_id, prev_event_ids) in event_graph.items():
+            depth = depth_map[event_id]
+
+            complete_event_dict_map[event_id] = {
+                "event_id": event_id,
+                "type": "test_regular_type",
+                "room_id": room_id,
+                "sender": "@sender",
+                "prev_event_ids": prev_event_ids,
+                "auth_event_ids": [],
+                "origin_server_ts": stream_ordering,
+                "depth": depth,
+                "stream_ordering": stream_ordering,
+                "content": {"body": "event" + event_id},
+            }
+
+            stream_ordering += 1
+
+        def populate_db(txn: LoggingTransaction):
+            # Insert the room to satisfy the foreign key constraint of
+            # `event_failed_pull_attempts`
+            self.store.db_pool.simple_insert_txn(
+                txn,
+                "rooms",
+                {
+                    "room_id": room_id,
+                    "creator": "room_creator_user_id",
+                    "is_public": True,
+                    "room_version": "6",
+                },
+            )
+
+            # Insert our server events
+            for event_id in our_server_events:
+                event_dict = complete_event_dict_map[event_id]
+
+                self.store.db_pool.simple_insert_txn(
+                    txn,
+                    table="events",
+                    values={
+                        "event_id": event_dict.get("event_id"),
+                        "type": event_dict.get("type"),
+                        "room_id": event_dict.get("room_id"),
+                        "depth": event_dict.get("depth"),
+                        "topological_ordering": event_dict.get("depth"),
+                        "stream_ordering": event_dict.get("stream_ordering"),
+                        "processed": True,
+                        "outlier": False,
+                    },
+                )
+
+            # Insert the event edges
+            for event_id in our_server_events:
+                for prev_event_id in event_graph[event_id]:
+                    self.store.db_pool.simple_insert_txn(
+                        txn,
+                        table="event_edges",
+                        values={
+                            "event_id": event_id,
+                            "prev_event_id": prev_event_id,
+                            "room_id": room_id,
+                        },
+                    )
+
+            # Insert the backward extremities
+            prev_events_of_our_events = {
+                prev_event_id
+                for our_server_event in our_server_events
+                for prev_event_id in complete_event_dict_map[our_server_event][
+                    "prev_event_ids"
+                ]
+            }
+            backward_extremities = prev_events_of_our_events - our_server_events
+            for backward_extremity in backward_extremities:
+                self.store.db_pool.simple_insert_txn(
+                    txn,
+                    table="event_backward_extremities",
+                    values={
+                        "event_id": backward_extremity,
+                        "room_id": room_id,
+                    },
+                )
+
+        self.get_success(
+            self.store.db_pool.runInteraction(
+                "_setup_room_for_backfill_tests_populate_db",
+                populate_db,
+            )
+        )
+
+        return _BackfillSetupInfo(room_id=room_id, depth_map=depth_map)
+
+    def test_get_backfill_points_in_room(self):
+        """
+        Test to make sure we get some backfill points
+        """
+        setup_info = self._setup_room_for_backfill_tests()
+        room_id = setup_info.room_id
+
+        backfill_points = self.get_success(
+            self.store.get_backfill_points_in_room(room_id)
+        )
+        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
+        self.assertListEqual(
+            backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"]
+        )
+
+    def test_get_backfill_points_in_room_excludes_events_we_have_attempted(
+        self,
+    ):
+        """
+        Test to make sure that events we have attempted to backfill (and within
+        backoff timeout duration) do not show up as an event to backfill again.
+        """
+        setup_info = self._setup_room_for_backfill_tests()
+        room_id = setup_info.room_id
+
+        # Record some attempts to backfill these events which will make
+        # `get_backfill_points_in_room` exclude them because we
+        # haven't passed the backoff interval.
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(room_id, "b5", "fake cause")
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(room_id, "b4", "fake cause")
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(room_id, "b3", "fake cause")
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(room_id, "b2", "fake cause")
+        )
+
+        # No time has passed since we attempted to backfill ^
+
+        backfill_points = self.get_success(
+            self.store.get_backfill_points_in_room(room_id)
+        )
+        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
+        # Only the backfill points that we didn't record earlier exist here.
+        self.assertListEqual(backfill_event_ids, ["b6", "2", "b1"])
+
+    def test_get_backfill_points_in_room_attempted_event_retry_after_backoff_duration(
+        self,
+    ):
+        """
+        Test to make sure after we fake attempt to backfill event "b3" many times,
+        we can see retry and see the "b3" again after the backoff timeout duration
+        has exceeded.
+        """
+        setup_info = self._setup_room_for_backfill_tests()
+        room_id = setup_info.room_id
+
+        # Record some attempts to backfill these events which will make
+        # `get_backfill_points_in_room` exclude them because we
+        # haven't passed the backoff interval.
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(room_id, "b3", "fake cause")
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause")
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause")
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause")
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause")
+        )
+
+        # Now advance time by 2 hours and we should only be able to see "b3"
+        # because we have waited long enough for the single attempt (2^1 hours)
+        # but we still shouldn't see "b1" because we haven't waited long enough
+        # for this many attempts. We didn't do anything to "b2" so it should be
+        # visible regardless.
+        self.reactor.advance(datetime.timedelta(hours=2).total_seconds())
+
+        # Make sure that "b1" is not in the list because we've
+        # already attempted many times
+        backfill_points = self.get_success(
+            self.store.get_backfill_points_in_room(room_id)
+        )
+        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
+        self.assertListEqual(backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2"])
+
+        # Now advance time by 20 hours (above 2^4 because we made 4 attemps) and
+        # see if we can now backfill it
+        self.reactor.advance(datetime.timedelta(hours=20).total_seconds())
+
+        # Try again after we advanced enough time and we should see "b3" again
+        backfill_points = self.get_success(
+            self.store.get_backfill_points_in_room(room_id)
+        )
+        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
+        self.assertListEqual(
+            backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"]
+        )
+
+    def _setup_room_for_insertion_backfill_tests(self) -> _BackfillSetupInfo:
+        """
+        Sets up a room with various insertion event backward extremities to test
+        backfill functions against.
+
+        Returns:
+            _BackfillSetupInfo including the `room_id` to test against and
+            `depth_map` of events in the room
+        """
+        room_id = "!backfill-room-test:some-host"
+
+        depth_map: Dict[str, int] = {
+            "1": 1,
+            "2": 2,
+            "insertion_eventA": 3,
+            "3": 4,
+            "insertion_eventB": 5,
+            "4": 6,
+            "5": 7,
+        }
+
+        def populate_db(txn: LoggingTransaction):
+            # Insert the room to satisfy the foreign key constraint of
+            # `event_failed_pull_attempts`
+            self.store.db_pool.simple_insert_txn(
+                txn,
+                "rooms",
+                {
+                    "room_id": room_id,
+                    "creator": "room_creator_user_id",
+                    "is_public": True,
+                    "room_version": "6",
+                },
+            )
+
+            # Insert our server events
+            stream_ordering = 0
+            for event_id, depth in depth_map.items():
+                self.store.db_pool.simple_insert_txn(
+                    txn,
+                    table="events",
+                    values={
+                        "event_id": event_id,
+                        "type": EventTypes.MSC2716_INSERTION
+                        if event_id.startswith("insertion_event")
+                        else "test_regular_type",
+                        "room_id": room_id,
+                        "depth": depth,
+                        "topological_ordering": depth,
+                        "stream_ordering": stream_ordering,
+                        "processed": True,
+                        "outlier": False,
+                    },
+                )
+
+                if event_id.startswith("insertion_event"):
+                    self.store.db_pool.simple_insert_txn(
+                        txn,
+                        table="insertion_event_extremities",
+                        values={
+                            "event_id": event_id,
+                            "room_id": room_id,
+                        },
+                    )
+
+                stream_ordering += 1
+
+        self.get_success(
+            self.store.db_pool.runInteraction(
+                "_setup_room_for_insertion_backfill_tests_populate_db",
+                populate_db,
+            )
+        )
+
+        return _BackfillSetupInfo(room_id=room_id, depth_map=depth_map)
+
+    def test_get_insertion_event_backward_extremities_in_room(self):
+        """
+        Test to make sure insertion event backward extremities are returned.
+        """
+        setup_info = self._setup_room_for_insertion_backfill_tests()
+        room_id = setup_info.room_id
+
+        backfill_points = self.get_success(
+            self.store.get_insertion_event_backward_extremities_in_room(room_id)
+        )
+        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
+        self.assertListEqual(
+            backfill_event_ids, ["insertion_eventB", "insertion_eventA"]
+        )
+
+    def test_get_insertion_event_backward_extremities_in_room_excludes_events_we_have_attempted(
+        self,
+    ):
+        """
+        Test to make sure that insertion events we have attempted to backfill
+        (and within backoff timeout duration) do not show up as an event to
+        backfill again.
+        """
+        setup_info = self._setup_room_for_insertion_backfill_tests()
+        room_id = setup_info.room_id
+
+        # Record some attempts to backfill these events which will make
+        # `get_insertion_event_backward_extremities_in_room` exclude them
+        # because we haven't passed the backoff interval.
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(
+                room_id, "insertion_eventA", "fake cause"
+            )
+        )
+
+        # No time has passed since we attempted to backfill ^
+
+        backfill_points = self.get_success(
+            self.store.get_insertion_event_backward_extremities_in_room(room_id)
+        )
+        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
+        # Only the backfill points that we didn't record earlier exist here.
+        self.assertListEqual(backfill_event_ids, ["insertion_eventB"])
+
+    def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_after_backoff_duration(
+        self,
+    ):
+        """
+        Test to make sure after we fake attempt to backfill event
+        "insertion_eventA" many times, we can see retry and see the
+        "insertion_eventA" again after the backoff timeout duration has
+        exceeded.
+        """
+        setup_info = self._setup_room_for_insertion_backfill_tests()
+        room_id = setup_info.room_id
+
+        # Record some attempts to backfill these events which will make
+        # `get_backfill_points_in_room` exclude them because we
+        # haven't passed the backoff interval.
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(
+                room_id, "insertion_eventB", "fake cause"
+            )
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(
+                room_id, "insertion_eventA", "fake cause"
+            )
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(
+                room_id, "insertion_eventA", "fake cause"
+            )
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(
+                room_id, "insertion_eventA", "fake cause"
+            )
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(
+                room_id, "insertion_eventA", "fake cause"
+            )
+        )
+
+        # Now advance time by 2 hours and we should only be able to see
+        # "insertion_eventB" because we have waited long enough for the single
+        # attempt (2^1 hours) but we still shouldn't see "insertion_eventA"
+        # because we haven't waited long enough for this many attempts.
+        self.reactor.advance(datetime.timedelta(hours=2).total_seconds())
+
+        # Make sure that "insertion_eventA" is not in the list because we've
+        # already attempted many times
+        backfill_points = self.get_success(
+            self.store.get_insertion_event_backward_extremities_in_room(room_id)
+        )
+        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
+        self.assertListEqual(backfill_event_ids, ["insertion_eventB"])
+
+        # Now advance time by 20 hours (above 2^4 because we made 4 attemps) and
+        # see if we can now backfill it
+        self.reactor.advance(datetime.timedelta(hours=20).total_seconds())
+
+        # Try at "insertion_eventA" again after we advanced enough time and we
+        # should see "insertion_eventA" again
+        backfill_points = self.get_success(
+            self.store.get_insertion_event_backward_extremities_in_room(room_id)
+        )
+        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
+        self.assertListEqual(
+            backfill_event_ids, ["insertion_eventB", "insertion_eventA"]
+        )
+
 
 @attr.s
 class FakeEvent: