diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 57aab55259..319464b1fa 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -34,11 +34,11 @@ what sort order was used:
- topological tokems: "t%d-%d", where the integers map to the topological
and stream ordering columns respectively.
"""
-import abc
+
import logging
-from collections import namedtuple
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple
+import attr
from frozendict import frozendict
from twisted.internet import defer
@@ -49,6 +49,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
+ LoggingDatabaseConnection,
LoggingTransaction,
make_in_list_sql_clause,
)
@@ -73,9 +74,11 @@ _TOPOLOGICAL_TOKEN = "topological"
# Used as return values for pagination APIs
-_EventDictReturn = namedtuple(
- "_EventDictReturn", ("event_id", "topological_ordering", "stream_ordering")
-)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _EventDictReturn:
+ event_id: str
+ topological_ordering: Optional[int]
+ stream_ordering: int
def generate_pagination_where_clause(
@@ -333,13 +336,13 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
return " AND ".join(clauses), args
-class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
- """This is an abstract base class where subclasses must implement
- `get_room_max_stream_ordering` and `get_room_min_stream_ordering`
- which can be called in the initializer.
- """
-
- def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
+class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
super().__init__(database, db_conn, hs)
self._instance_name = hs.get_instance_name()
@@ -371,13 +374,22 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
self._stream_order_on_start = self.get_room_max_stream_ordering()
- @abc.abstractmethod
def get_room_max_stream_ordering(self) -> int:
- raise NotImplementedError()
+ """Get the stream_ordering of regular events that we have committed up to
+
+ Returns the maximum stream id such that all stream ids less than or
+ equal to it have been successfully persisted.
+ """
+ return self._stream_id_gen.get_current_token()
- @abc.abstractmethod
def get_room_min_stream_ordering(self) -> int:
- raise NotImplementedError()
+ """Get the stream_ordering of backfilled events that we have committed up to
+
+ Backfilled events use *negative* stream orderings, so this returns the
+ minimum negative stream id such that all stream ids greater than or
+ equal to it have been successfully persisted.
+ """
+ return self._backfill_id_gen.get_current_token()
def get_room_max_token(self) -> RoomStreamToken:
"""Get a `RoomStreamToken` that marks the current maximum persisted
@@ -819,7 +831,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
for event, row in zip(events, rows):
stream = row.stream_ordering
if topo_order and row.topological_ordering:
- topo = row.topological_ordering
+ topo: Optional[int] = row.topological_ordering
else:
topo = None
internal = event.internal_metadata
@@ -1343,11 +1355,3 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
retcol="instance_name",
desc="get_name_from_instance_id",
)
-
-
-class StreamStore(StreamWorkerStore):
- def get_room_max_stream_ordering(self) -> int:
- return self._stream_id_gen.get_current_token()
-
- def get_room_min_stream_ordering(self) -> int:
- return self._backfill_id_gen.get_current_token()
|