diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 92e96468b4..37249f1e3f 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -35,7 +35,6 @@ what sort order was used:
- topological tokems: "t%d-%d", where the integers map to the topological
and stream ordering columns respectively.
"""
-
import abc
import logging
from collections import namedtuple
@@ -54,7 +53,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
-from synapse.types import Collection, RoomStreamToken
+from synapse.types import Collection, PersistedEventPosition, RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
if TYPE_CHECKING:
@@ -305,6 +304,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
def get_room_min_stream_ordering(self) -> int:
raise NotImplementedError()
+ def get_room_max_token(self) -> RoomStreamToken:
+ return RoomStreamToken(None, self.get_room_max_stream_ordering())
+
async def get_room_events_stream_for_rooms(
self,
room_ids: Collection[str],
@@ -611,26 +613,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
allow_none=allow_none,
)
- async def get_stream_token_for_event(self, event_id: str) -> RoomStreamToken:
- """The stream token for an event
- Args:
- event_id: The id of the event to look up a stream token for.
- Raises:
- StoreError if the event wasn't in the database.
- Returns:
- A stream token.
+ async def get_position_for_event(self, event_id: str) -> PersistedEventPosition:
+ """Get the persisted position for an event
"""
- stream_id = await self.get_stream_id_for_event(event_id)
- return RoomStreamToken(None, stream_id)
+ row = await self.db_pool.simple_select_one(
+ table="events",
+ keyvalues={"event_id": event_id},
+ retcols=("stream_ordering", "instance_name"),
+ desc="get_position_for_event",
+ )
+
+ return PersistedEventPosition(
+ row["instance_name"] or "master", row["stream_ordering"]
+ )
- async def get_topological_token_for_event(self, event_id: str) -> str:
+ async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToken:
"""The stream token for an event
Args:
event_id: The id of the event to look up a stream token for.
Raises:
StoreError if the event wasn't in the database.
Returns:
- A "t%d-%d" topological token.
+ A `RoomStreamToken` topological token.
"""
row = await self.db_pool.simple_select_one(
table="events",
@@ -638,7 +642,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
retcols=("stream_ordering", "topological_ordering"),
desc="get_topological_token_for_event",
)
- return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"])
+ return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])
async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
"""Gets the topological token in a room after or at the given stream
@@ -687,8 +691,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
else:
topo = None
internal = event.internal_metadata
- internal.before = str(RoomStreamToken(topo, stream - 1))
- internal.after = str(RoomStreamToken(topo, stream))
+ internal.before = RoomStreamToken(topo, stream - 1)
+ internal.after = RoomStreamToken(topo, stream)
internal.order = (int(topo) if topo else 0, int(stream))
async def get_events_around(
|