summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/__init__.py4
-rw-r--r--synapse/storage/databases/main/events_worker.py4
-rw-r--r--synapse/storage/databases/main/stream.py34
3 files changed, 17 insertions, 25 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index a594223fc6..f024761ba7 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -68,7 +68,7 @@ from .session import SessionStore
 from .signatures import SignatureStore
 from .state import StateStore
 from .stats import StatsStore
-from .stream import StreamStore
+from .stream import StreamWorkerStore
 from .tags import TagsStore
 from .transactions import TransactionWorkerStore
 from .ui_auth import UIAuthStore
@@ -87,7 +87,7 @@ class DataStore(
     RoomStore,
     RoomBatchStore,
     RegistrationStore,
-    StreamStore,
+    StreamWorkerStore,
     ProfileStore,
     PresenceStore,
     TransactionWorkerStore,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index c7b660ac5a..8d4287045a 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1383,10 +1383,6 @@ class EventsWorkerStore(SQLBaseStore):
 
         return {"v1": complexity_v1}
 
-    def get_current_events_token(self) -> int:
-        """The current maximum token that events have reached"""
-        return self._stream_id_gen.get_current_token()
-
     async def get_all_new_forward_event_rows(
         self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index b0642ca69f..319464b1fa 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -34,7 +34,7 @@ what sort order was used:
     - topological tokems: "t%d-%d", where the integers map to the topological
       and stream ordering columns respectively.
 """
-import abc
+
 import logging
 from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple
 
@@ -336,12 +336,7 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
     return " AND ".join(clauses), args
 
 
-class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
-    """This is an abstract base class where subclasses must implement
-    `get_room_max_stream_ordering` and `get_room_min_stream_ordering`
-    which can be called in the initializer.
-    """
-
+class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -379,13 +374,22 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
 
         self._stream_order_on_start = self.get_room_max_stream_ordering()
 
-    @abc.abstractmethod
     def get_room_max_stream_ordering(self) -> int:
-        raise NotImplementedError()
+        """Get the stream_ordering of regular events that we have committed up to
+
+        Returns the maximum stream id such that all stream ids less than or
+        equal to it have been successfully persisted.
+        """
+        return self._stream_id_gen.get_current_token()
 
-    @abc.abstractmethod
     def get_room_min_stream_ordering(self) -> int:
-        raise NotImplementedError()
+        """Get the stream_ordering of backfilled events that we have committed up to
+
+        Backfilled events use *negative* stream orderings, so this returns the
+        minimum negative stream id such that all stream ids greater than or
+        equal to it have been successfully persisted.
+        """
+        return self._backfill_id_gen.get_current_token()
 
     def get_room_max_token(self) -> RoomStreamToken:
         """Get a `RoomStreamToken` that marks the current maximum persisted
@@ -1351,11 +1355,3 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
             retcol="instance_name",
             desc="get_name_from_instance_id",
         )
-
-
-class StreamStore(StreamWorkerStore):
-    def get_room_max_stream_ordering(self) -> int:
-        return self._stream_id_gen.get_current_token()
-
-    def get_room_min_stream_ordering(self) -> int:
-        return self._backfill_id_gen.get_current_token()