summary refs log tree commit diff
path: root/tests/storage
diff options
context:
space:
mode:
Diffstat (limited to 'tests/storage')
-rw-r--r--tests/storage/test_id_generators.py112
1 files changed, 106 insertions, 6 deletions
diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py
index cc0612cf65..3e2fd4da01 100644
--- a/tests/storage/test_id_generators.py
+++ b/tests/storage/test_id_generators.py
@@ -51,9 +51,7 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
                 self.db_pool,
                 stream_name="test_stream",
                 instance_name=instance_name,
-                table="foobar",
-                instance_column="instance_name",
-                id_column="stream_id",
+                tables=[("foobar", "instance_name", "stream_id")],
                 sequence_name="foobar_seq",
                 writers=writers,
             )
@@ -487,9 +485,7 @@ class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase):
                 self.db_pool,
                 stream_name="test_stream",
                 instance_name=instance_name,
-                table="foobar",
-                instance_column="instance_name",
-                id_column="stream_id",
+                tables=[("foobar", "instance_name", "stream_id")],
                 sequence_name="foobar_seq",
                 writers=writers,
                 positive=False,
@@ -579,3 +575,107 @@ class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase):
         self.assertEqual(id_gen_2.get_positions(), {"first": -1, "second": -2})
         self.assertEqual(id_gen_1.get_persisted_upto_position(), -2)
         self.assertEqual(id_gen_2.get_persisted_upto_position(), -2)
+
+
+class MultiTableMultiWriterIdGeneratorTestCase(HomeserverTestCase):
+    if not USE_POSTGRES_FOR_TESTS:
+        skip = "Requires Postgres"
+
+    def prepare(self, reactor, clock, hs):
+        self.store = hs.get_datastore()
+        self.db_pool = self.store.db_pool  # type: DatabasePool
+
+        self.get_success(self.db_pool.runInteraction("_setup_db", self._setup_db))
+
+    def _setup_db(self, txn):
+        txn.execute("CREATE SEQUENCE foobar_seq")
+        txn.execute(
+            """
+            CREATE TABLE foobar1 (
+                stream_id BIGINT NOT NULL,
+                instance_name TEXT NOT NULL,
+                data TEXT
+            );
+            """
+        )
+
+        txn.execute(
+            """
+            CREATE TABLE foobar2 (
+                stream_id BIGINT NOT NULL,
+                instance_name TEXT NOT NULL,
+                data TEXT
+            );
+            """
+        )
+
+    def _create_id_generator(
+        self, instance_name="master", writers=["master"]
+    ) -> MultiWriterIdGenerator:
+        def _create(conn):
+            return MultiWriterIdGenerator(
+                conn,
+                self.db_pool,
+                stream_name="test_stream",
+                instance_name=instance_name,
+                tables=[
+                    ("foobar1", "instance_name", "stream_id"),
+                    ("foobar2", "instance_name", "stream_id"),
+                ],
+                sequence_name="foobar_seq",
+                writers=writers,
+            )
+
+        return self.get_success_or_raise(self.db_pool.runWithConnection(_create))
+
+    def _insert_rows(
+        self,
+        table: str,
+        instance_name: str,
+        number: int,
+        update_stream_table: bool = True,
+    ):
+        """Insert N rows as the given instance, inserting with stream IDs pulled
+        from the postgres sequence.
+        """
+
+        def _insert(txn):
+            for _ in range(number):
+                txn.execute(
+                    "INSERT INTO %s VALUES (nextval('foobar_seq'), ?)" % (table,),
+                    (instance_name,),
+                )
+                if update_stream_table:
+                    txn.execute(
+                        """
+                        INSERT INTO stream_positions VALUES ('test_stream', ?,  lastval())
+                        ON CONFLICT (stream_name, instance_name) DO UPDATE SET stream_id = lastval()
+                        """,
+                        (instance_name,),
+                    )
+
+        self.get_success(self.db_pool.runInteraction("_insert_rows", _insert))
+
+    def test_load_existing_stream(self):
+        """Test creating ID gens with multiple tables that have rows from after
+        the position in `stream_positions` table.
+        """
+        self._insert_rows("foobar1", "first", 3)
+        self._insert_rows("foobar2", "second", 3)
+        self._insert_rows("foobar2", "second", 1, update_stream_table=False)
+
+        first_id_gen = self._create_id_generator("first", writers=["first", "second"])
+        second_id_gen = self._create_id_generator("second", writers=["first", "second"])
+
+        # The first ID gen will notice that it can advance its token to 7 as it
+        # has no in progress writes...
+        self.assertEqual(first_id_gen.get_positions(), {"first": 7, "second": 6})
+        self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 7)
+        self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 6)
+        self.assertEqual(first_id_gen.get_persisted_upto_position(), 7)
+
+        # ... but the second ID gen doesn't know that.
+        self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 7})
+        self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3)
+        self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7)
+        self.assertEqual(first_id_gen.get_persisted_upto_position(), 7)