summary refs log tree commit diff
path: root/synapse/storage/databases/main/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r--synapse/storage/databases/main/stream.py34
1 files changed, 15 insertions, 19 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index b0642ca69f..319464b1fa 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -34,7 +34,7 @@ what sort order was used:
     - topological tokems: "t%d-%d", where the integers map to the topological
       and stream ordering columns respectively.
 """
-import abc
+
 import logging
 from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple
 
@@ -336,12 +336,7 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
     return " AND ".join(clauses), args
 
 
-class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
-    """This is an abstract base class where subclasses must implement
-    `get_room_max_stream_ordering` and `get_room_min_stream_ordering`
-    which can be called in the initializer.
-    """
-
+class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -379,13 +374,22 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
 
         self._stream_order_on_start = self.get_room_max_stream_ordering()
 
-    @abc.abstractmethod
     def get_room_max_stream_ordering(self) -> int:
-        raise NotImplementedError()
+        """Get the stream_ordering of regular events that we have committed up to
+
+        Returns the maximum stream id such that all stream ids less than or
+        equal to it have been successfully persisted.
+        """
+        return self._stream_id_gen.get_current_token()
 
-    @abc.abstractmethod
     def get_room_min_stream_ordering(self) -> int:
-        raise NotImplementedError()
+        """Get the stream_ordering of backfilled events that we have committed up to
+
+        Backfilled events use *negative* stream orderings, so this returns the
+        minimum negative stream id such that all stream ids greater than or
+        equal to it have been successfully persisted.
+        """
+        return self._backfill_id_gen.get_current_token()
 
     def get_room_max_token(self) -> RoomStreamToken:
         """Get a `RoomStreamToken` that marks the current maximum persisted
@@ -1351,11 +1355,3 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
             retcol="instance_name",
             desc="get_name_from_instance_id",
         )
-
-
-class StreamStore(StreamWorkerStore):
-    def get_room_max_stream_ordering(self) -> int:
-        return self._stream_id_gen.get_current_token()
-
-    def get_room_min_stream_ordering(self) -> int:
-        return self._backfill_id_gen.get_current_token()