diff options
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r-- | synapse/storage/databases/main/stream.py | 34 |
1 files changed, 15 insertions, 19 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index b0642ca69f..319464b1fa 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -34,7 +34,7 @@ what sort order was used: - topological tokems: "t%d-%d", where the integers map to the topological and stream ordering columns respectively. """ -import abc + import logging from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple @@ -336,12 +336,7 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]: return " AND ".join(clauses), args -class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): - """This is an abstract base class where subclasses must implement - `get_room_max_stream_ordering` and `get_room_min_stream_ordering` - which can be called in the initializer. - """ - +class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def __init__( self, database: DatabasePool, @@ -379,13 +374,22 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): self._stream_order_on_start = self.get_room_max_stream_ordering() - @abc.abstractmethod def get_room_max_stream_ordering(self) -> int: - raise NotImplementedError() + """Get the stream_ordering of regular events that we have committed up to + + Returns the maximum stream id such that all stream ids less than or + equal to it have been successfully persisted. + """ + return self._stream_id_gen.get_current_token() - @abc.abstractmethod def get_room_min_stream_ordering(self) -> int: - raise NotImplementedError() + """Get the stream_ordering of backfilled events that we have committed up to + + Backfilled events use *negative* stream orderings, so this returns the + minimum negative stream id such that all stream ids greater than or + equal to it have been successfully persisted. + """ + return self._backfill_id_gen.get_current_token() def get_room_max_token(self) -> RoomStreamToken: """Get a `RoomStreamToken` that marks the current maximum persisted @@ -1351,11 +1355,3 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): retcol="instance_name", desc="get_name_from_instance_id", ) - - -class StreamStore(StreamWorkerStore): - def get_room_max_stream_ordering(self) -> int: - return self._stream_id_gen.get_current_token() - - def get_room_min_stream_ordering(self) -> int: - return self._backfill_id_gen.get_current_token() |