summary refs log tree commit diff
path: root/synapse/storage/util
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-05-30 12:07:32 +0100
committerGitHub <noreply@github.com>2024-05-30 11:07:32 +0000
commitd16910ca021320f0fa09c6cf82a802ee97e22a0c (patch)
treeeea35c87fecdccec394c8afee0ad23182d4c783d /synapse/storage/util
parentClean out invalid destinations from outbox (#17242) (diff)
downloadsynapse-d16910ca021320f0fa09c6cf82a802ee97e22a0c.tar.xz
Replaces all usages of `StreamIdGenerator` with `MultiWriterIdGenerator` (#17229)
Replaces all usages of `StreamIdGenerator` with `MultiWriterIdGenerator`, which is safer.
Diffstat (limited to 'synapse/storage/util')
-rw-r--r--synapse/storage/util/id_generators.py158
1 files changed, 0 insertions, 158 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 0cf5851ad7..59c8e05c39 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -23,15 +23,12 @@ import abc
 import heapq
 import logging
 import threading
-from collections import OrderedDict
-from contextlib import contextmanager
 from types import TracebackType
 from typing import (
     TYPE_CHECKING,
     AsyncContextManager,
     ContextManager,
     Dict,
-    Generator,
     Generic,
     Iterable,
     List,
@@ -179,161 +176,6 @@ class AbstractStreamIdGenerator(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
 
-class StreamIdGenerator(AbstractStreamIdGenerator):
-    """Generates and tracks stream IDs for a stream with a single writer.
-
-    This class must only be used when the current Synapse process is the sole
-    writer for a stream.
-
-    Args:
-        db_conn(connection):  A database connection to use to fetch the
-            initial value of the generator from.
-        table(str): A database table to read the initial value of the id
-            generator from.
-        column(str): The column of the database table to read the initial
-            value from the id generator from.
-        extra_tables(list): List of pairs of database tables and columns to
-            use to source the initial value of the generator from. The value
-            with the largest magnitude is used.
-        step(int): which direction the stream ids grow in. +1 to grow
-            upwards, -1 to grow downwards.
-
-    Usage:
-        async with stream_id_gen.get_next() as stream_id:
-            # ... persist event ...
-    """
-
-    def __init__(
-        self,
-        db_conn: LoggingDatabaseConnection,
-        notifier: "ReplicationNotifier",
-        table: str,
-        column: str,
-        extra_tables: Iterable[Tuple[str, str]] = (),
-        step: int = 1,
-        is_writer: bool = True,
-    ) -> None:
-        assert step != 0
-        self._lock = threading.Lock()
-        self._step: int = step
-        self._current: int = _load_current_id(db_conn, table, column, step)
-        self._is_writer = is_writer
-        for table, column in extra_tables:
-            self._current = (max if step > 0 else min)(
-                self._current, _load_current_id(db_conn, table, column, step)
-            )
-
-        # We use this as an ordered set, as we want to efficiently append items,
-        # remove items and get the first item. Since we insert IDs in order, the
-        # insertion ordering will ensure its in the correct ordering.
-        #
-        # The key and values are the same, but we never look at the values.
-        self._unfinished_ids: OrderedDict[int, int] = OrderedDict()
-
-        self._notifier = notifier
-
-    def advance(self, instance_name: str, new_id: int) -> None:
-        # Advance should never be called on a writer instance, only over replication
-        if self._is_writer:
-            raise Exception("Replication is not supported by writer StreamIdGenerator")
-
-        self._current = (max if self._step > 0 else min)(self._current, new_id)
-
-    def get_next(self) -> AsyncContextManager[int]:
-        with self._lock:
-            self._current += self._step
-            next_id = self._current
-
-            self._unfinished_ids[next_id] = next_id
-
-        @contextmanager
-        def manager() -> Generator[int, None, None]:
-            try:
-                yield next_id
-            finally:
-                with self._lock:
-                    self._unfinished_ids.pop(next_id)
-
-                self._notifier.notify_replication()
-
-        return _AsyncCtxManagerWrapper(manager())
-
-    def get_next_mult(self, n: int) -> AsyncContextManager[Sequence[int]]:
-        with self._lock:
-            next_ids = range(
-                self._current + self._step,
-                self._current + self._step * (n + 1),
-                self._step,
-            )
-            self._current += n * self._step
-
-            for next_id in next_ids:
-                self._unfinished_ids[next_id] = next_id
-
-        @contextmanager
-        def manager() -> Generator[Sequence[int], None, None]:
-            try:
-                yield next_ids
-            finally:
-                with self._lock:
-                    for next_id in next_ids:
-                        self._unfinished_ids.pop(next_id)
-
-                self._notifier.notify_replication()
-
-        return _AsyncCtxManagerWrapper(manager())
-
-    def get_next_txn(self, txn: LoggingTransaction) -> int:
-        """
-        Retrieve the next stream ID from within a database transaction.
-
-        Clean-up functions will be called when the transaction finishes.
-
-        Args:
-            txn: The database transaction object.
-
-        Returns:
-            The next stream ID.
-        """
-        if not self._is_writer:
-            raise Exception("Tried to allocate stream ID on non-writer")
-
-        # Get the next stream ID.
-        with self._lock:
-            self._current += self._step
-            next_id = self._current
-
-            self._unfinished_ids[next_id] = next_id
-
-        def clear_unfinished_id(id_to_clear: int) -> None:
-            """A function to mark processing this ID as finished"""
-            with self._lock:
-                self._unfinished_ids.pop(id_to_clear)
-
-        # Mark this ID as finished once the database transaction itself finishes.
-        txn.call_after(clear_unfinished_id, next_id)
-        txn.call_on_exception(clear_unfinished_id, next_id)
-
-        # Return the new ID.
-        return next_id
-
-    def get_current_token(self) -> int:
-        if not self._is_writer:
-            return self._current
-
-        with self._lock:
-            if self._unfinished_ids:
-                return next(iter(self._unfinished_ids)) - self._step
-
-            return self._current
-
-    def get_current_token_for_writer(self, instance_name: str) -> int:
-        return self.get_current_token()
-
-    def get_minimal_local_current_token(self) -> int:
-        return self.get_current_token()
-
-
 class MultiWriterIdGenerator(AbstractStreamIdGenerator):
     """Generates and tracks stream IDs for a stream with multiple writers.