summary refs log tree commit diff
diff options
Diffstat (limited to '')
4 files changed, 353 insertions, 2 deletions
diff --git a/changelog.d/7281.misc b/changelog.d/7281.misc
new file mode 100644
index 0000000000..86ad511e19
--- /dev/null
+++ b/changelog.d/7281.misc
@@ -0,0 +1 @@
+Add MultiWriterIdGenerator to support multiple concurrent writers of streams.
diff --git a/synapse/storage/util/ b/synapse/storage/util/
index 9d851beaa5..86d04ea9ac 100644
--- a/synapse/storage/util/
+++ b/synapse/storage/util/
@@ -16,6 +16,11 @@
 import contextlib
 import threading
 from collections import deque
+from typing import Dict, Set, Tuple
+from typing_extensions import Deque
+from import Database, LoggingTransaction
 class IdGenerator(object):
@@ -87,7 +92,7 @@ class StreamIdGenerator(object):
             self._current = (max if step > 0 else min)(
                 self._current, _load_current_id(db_conn, table, column, step)
-        self._unfinished_ids = deque()
+        self._unfinished_ids = deque()  # type: Deque[int]
     def get_next(self):
@@ -163,7 +168,7 @@ class ChainedIdGenerator(object):
         self.chained_generator = chained_generator
         self._lock = threading.Lock()
         self._current_max = _load_current_id(db_conn, table, column)
-        self._unfinished_ids = deque()
+        self._unfinished_ids = deque()  # type: Deque[Tuple[int, int]]
     def get_next(self):
@@ -198,3 +203,163 @@ class ChainedIdGenerator(object):
                 return stream_id - 1, chained_id
             return self._current_max, self.chained_generator.get_current_token()
+class MultiWriterIdGenerator:
+    """An ID generator that tracks a stream that can have multiple writers.
+    Uses a Postgres sequence to coordinate ID assignment, but positions of other
+    writers will only get updated when `advance` is called (by replication).
+    Note: Only works with Postgres.
+    Args:
+        db_conn
+        db
+        instance_name: The name of this instance.
+        table: Database table associated with stream.
+        instance_column: Column that stores the row's writer's instance name
+        id_column: Column that stores the stream ID.
+        sequence_name: The name of the postgres sequence used to generate new
+            IDs.
+    """
+    def __init__(
+        self,
+        db_conn,
+        db: Database,
+        instance_name: str,
+        table: str,
+        instance_column: str,
+        id_column: str,
+        sequence_name: str,
+    ):
+        self._db = db
+        self._instance_name = instance_name
+        self._sequence_name = sequence_name
+        # We lock as some functions may be called from DB threads.
+        self._lock = threading.Lock()
+        self._current_positions = self._load_current_ids(
+            db_conn, table, instance_column, id_column
+        )
+        # Set of local IDs that we're still processing. The current position
+        # should be less than the minimum of this set (if not empty).
+        self._unfinished_ids = set()  # type: Set[int]
+    def _load_current_ids(
+        self, db_conn, table: str, instance_column: str, id_column: str
+    ) -> Dict[str, int]:
+        sql = """
+            SELECT %(instance)s, MAX(%(id)s) FROM %(table)s
+            GROUP BY %(instance)s
+        """ % {
+            "instance": instance_column,
+            "id": id_column,
+            "table": table,
+        }
+        cur = db_conn.cursor()
+        cur.execute(sql)
+        # `cur` is an iterable over returned rows, which are 2-tuples.
+        current_positions = dict(cur)
+        cur.close()
+        return current_positions
+    def _load_next_id_txn(self, txn):
+        txn.execute("SELECT nextval(?)", (self._sequence_name,))
+        (next_id,) = txn.fetchone()
+        return next_id
+    async def get_next(self):
+        """
+        Usage:
+            with await stream_id_gen.get_next() as stream_id:
+                # ... persist event ...
+        """
+        next_id = await self._db.runInteraction("_load_next_id", self._load_next_id_txn)
+        # Assert the fetched ID is actually greater than what we currently
+        # believe the ID to be. If not, then the sequence and table have got
+        # out of sync somehow.
+        assert self.get_current_token() < next_id
+        with self._lock:
+            self._unfinished_ids.add(next_id)
+        @contextlib.contextmanager
+        def manager():
+            try:
+                yield next_id
+            finally:
+                self._mark_id_as_finished(next_id)
+        return manager()
+    def get_next_txn(self, txn: LoggingTransaction):
+        """
+        Usage:
+            stream_id = stream_id_gen.get_next(txn)
+            # ... persist event ...
+        """
+        next_id = self._load_next_id_txn(txn)
+        with self._lock:
+            self._unfinished_ids.add(next_id)
+        txn.call_after(self._mark_id_as_finished, next_id)
+        txn.call_on_exception(self._mark_id_as_finished, next_id)
+        return next_id
+    def _mark_id_as_finished(self, next_id: int):
+        """The ID has finished being processed so we should advance the
+        current poistion if possible.
+        """
+        with self._lock:
+            self._unfinished_ids.discard(next_id)
+            # Figure out if its safe to advance the position by checking there
+            # aren't any lower allocated IDs that are yet to finish.
+            if all(c > next_id for c in self._unfinished_ids):
+                curr = self._current_positions.get(self._instance_name, 0)
+                self._current_positions[self._instance_name] = max(curr, next_id)
+    def get_current_token(self, instance_name: str = None) -> int:
+        """Gets the current position of a named writer (defaults to current
+        instance).
+        Returns 0 if we don't have a position for the named writer (likely due
+        to it being a new writer).
+        """
+        if instance_name is None:
+            instance_name = self._instance_name
+        with self._lock:
+            return self._current_positions.get(instance_name, 0)
+    def get_positions(self) -> Dict[str, int]:
+        """Get a copy of the current positon map.
+        """
+        with self._lock:
+            return dict(self._current_positions)
+    def advance(self, instance_name: str, new_id: int):
+        """Advance the postion of the named writer to the given ID, if greater
+        than existing entry.
+        """
+        with self._lock:
+            self._current_positions[instance_name] = max(
+                new_id, self._current_positions.get(instance_name, 0)
+            )
diff --git a/tests/storage/ b/tests/storage/
new file mode 100644
index 0000000000..55e9ecf264
--- /dev/null
+++ b/tests/storage/
@@ -0,0 +1,184 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Foundation C.I.C.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from import Database
+from import MultiWriterIdGenerator
+from tests.unittest import HomeserverTestCase
+from tests.utils import USE_POSTGRES_FOR_TESTS
+class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
+        skip = "Requires Postgres"
+    def prepare(self, reactor, clock, hs):
+ = hs.get_datastore()
+        self.db =  # type: Database
+        self.get_success(self.db.runInteraction("_setup_db", self._setup_db))
+    def _setup_db(self, txn):
+        txn.execute("CREATE SEQUENCE foobar_seq")
+        txn.execute(
+            """
+            CREATE TABLE foobar (
+                stream_id BIGINT NOT NULL,
+                instance_name TEXT NOT NULL,
+                data TEXT
+            );
+            """
+        )
+    def _create_id_generator(self, instance_name="master") -> MultiWriterIdGenerator:
+        def _create(conn):
+            return MultiWriterIdGenerator(
+                conn,
+                self.db,
+                instance_name=instance_name,
+                table="foobar",
+                instance_column="instance_name",
+                id_column="stream_id",
+                sequence_name="foobar_seq",
+            )
+        return self.get_success(self.db.runWithConnection(_create))
+    def _insert_rows(self, instance_name: str, number: int):
+        def _insert(txn):
+            for _ in range(number):
+                txn.execute(
+                    "INSERT INTO foobar VALUES (nextval('foobar_seq'), ?)",
+                    (instance_name,),
+                )
+        self.get_success(self.db.runInteraction("test_single_instance", _insert))
+    def test_empty(self):
+        """Test an ID generator against an empty database gives sensible
+        current positions.
+        """
+        id_gen = self._create_id_generator()
+        # The table is empty so we expect an empty map for positions
+        self.assertEqual(id_gen.get_positions(), {})
+    def test_single_instance(self):
+        """Test that reads and writes from a single process are handled
+        correctly.
+        """
+        # Prefill table with 7 rows written by 'master'
+        self._insert_rows("master", 7)
+        id_gen = self._create_id_generator()
+        self.assertEqual(id_gen.get_positions(), {"master": 7})
+        self.assertEqual(id_gen.get_current_token("master"), 7)
+        # Try allocating a new ID gen and check that we only see position
+        # advanced after we leave the context manager.
+        async def _get_next_async():
+            with await id_gen.get_next() as stream_id:
+                self.assertEqual(stream_id, 8)
+                self.assertEqual(id_gen.get_positions(), {"master": 7})
+                self.assertEqual(id_gen.get_current_token("master"), 7)
+        self.get_success(_get_next_async())
+        self.assertEqual(id_gen.get_positions(), {"master": 8})
+        self.assertEqual(id_gen.get_current_token("master"), 8)
+    def test_multi_instance(self):
+        """Test that reads and writes from multiple processes are handled
+        correctly.
+        """
+        self._insert_rows("first", 3)
+        self._insert_rows("second", 4)
+        first_id_gen = self._create_id_generator("first")
+        second_id_gen = self._create_id_generator("second")
+        self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 7})
+        self.assertEqual(first_id_gen.get_current_token("first"), 3)
+        self.assertEqual(first_id_gen.get_current_token("second"), 7)
+        # Try allocating a new ID gen and check that we only see position
+        # advanced after we leave the context manager.
+        async def _get_next_async():
+            with await first_id_gen.get_next() as stream_id:
+                self.assertEqual(stream_id, 8)
+                self.assertEqual(
+                    first_id_gen.get_positions(), {"first": 3, "second": 7}
+                )
+        self.get_success(_get_next_async())
+        self.assertEqual(first_id_gen.get_positions(), {"first": 8, "second": 7})
+        # However the ID gen on the second instance won't have seen the update
+        self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 7})
+        # ... but calling `get_next` on the second instance should give a unique
+        # stream ID
+        async def _get_next_async():
+            with await second_id_gen.get_next() as stream_id:
+                self.assertEqual(stream_id, 9)
+                self.assertEqual(
+                    second_id_gen.get_positions(), {"first": 3, "second": 7}
+                )
+        self.get_success(_get_next_async())
+        self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 9})
+        # If the second ID gen gets told about the first, it correctly updates
+        second_id_gen.advance("first", 8)
+        self.assertEqual(second_id_gen.get_positions(), {"first": 8, "second": 9})
+    def test_get_next_txn(self):
+        """Test that the `get_next_txn` function works correctly.
+        """
+        # Prefill table with 7 rows written by 'master'
+        self._insert_rows("master", 7)
+        id_gen = self._create_id_generator()
+        self.assertEqual(id_gen.get_positions(), {"master": 7})
+        self.assertEqual(id_gen.get_current_token("master"), 7)
+        # Try allocating a new ID gen and check that we only see position
+        # advanced after we leave the context manager.
+        def _get_next_txn(txn):
+            stream_id = id_gen.get_next_txn(txn)
+            self.assertEqual(stream_id, 8)
+            self.assertEqual(id_gen.get_positions(), {"master": 7})
+            self.assertEqual(id_gen.get_current_token("master"), 7)
+        self.get_success(self.db.runInteraction("test", _get_next_txn))
+        self.assertEqual(id_gen.get_positions(), {"master": 8})
+        self.assertEqual(id_gen.get_current_token("master"), 8)
diff --git a/tox.ini b/tox.ini
index eccc44e436..4a1509c51b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -203,6 +203,7 @@ commands = mypy \
             synapse/storage/data_stores/main/ \
             synapse/storage/ \
             synapse/storage/engines \
+            synapse/storage/util \
             synapse/streams \
             synapse/util/caches/ \
             tests/replication/tcp/streams \