summary refs log tree commit diff
path: root/synapse/storage/databases/main/stream.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-09-29 21:48:33 +0100
committerGitHub <noreply@github.com>2020-09-29 21:48:33 +0100
commitea70f1c362dc4bd6c0f8a67e16ed0971fe095e5b (patch)
tree83a37ad1ffa3e1432dc4ca281a0cfec5ed600445 /synapse/storage/databases/main/stream.py
parentUpdate description of server_name config option (#8415) (diff)
downloadsynapse-ea70f1c362dc4bd6c0f8a67e16ed0971fe095e5b.tar.xz
Various clean ups to room stream tokens. (#8423)
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r--synapse/storage/databases/main/stream.py38
1 files changed, 21 insertions, 17 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 92e96468b4..37249f1e3f 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -35,7 +35,6 @@ what sort order was used:
     - topological tokems: "t%d-%d", where the integers map to the topological
       and stream ordering columns respectively.
 """
-
 import abc
 import logging
 from collections import namedtuple
@@ -54,7 +53,7 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
-from synapse.types import Collection, RoomStreamToken
+from synapse.types import Collection, PersistedEventPosition, RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 if TYPE_CHECKING:
@@ -305,6 +304,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
     def get_room_min_stream_ordering(self) -> int:
         raise NotImplementedError()
 
+    def get_room_max_token(self) -> RoomStreamToken:
+        return RoomStreamToken(None, self.get_room_max_stream_ordering())
+
     async def get_room_events_stream_for_rooms(
         self,
         room_ids: Collection[str],
@@ -611,26 +613,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
             allow_none=allow_none,
         )
 
-    async def get_stream_token_for_event(self, event_id: str) -> RoomStreamToken:
-        """The stream token for an event
-        Args:
-            event_id: The id of the event to look up a stream token for.
-        Raises:
-            StoreError if the event wasn't in the database.
-        Returns:
-            A stream token.
+    async def get_position_for_event(self, event_id: str) -> PersistedEventPosition:
+        """Get the persisted position for an event
         """
-        stream_id = await self.get_stream_id_for_event(event_id)
-        return RoomStreamToken(None, stream_id)
+        row = await self.db_pool.simple_select_one(
+            table="events",
+            keyvalues={"event_id": event_id},
+            retcols=("stream_ordering", "instance_name"),
+            desc="get_position_for_event",
+        )
+
+        return PersistedEventPosition(
+            row["instance_name"] or "master", row["stream_ordering"]
+        )
 
-    async def get_topological_token_for_event(self, event_id: str) -> str:
+    async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToken:
         """The stream token for an event
         Args:
             event_id: The id of the event to look up a stream token for.
         Raises:
             StoreError if the event wasn't in the database.
         Returns:
-            A "t%d-%d" topological token.
+            A `RoomStreamToken` topological token.
         """
         row = await self.db_pool.simple_select_one(
             table="events",
@@ -638,7 +642,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
             retcols=("stream_ordering", "topological_ordering"),
             desc="get_topological_token_for_event",
         )
-        return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"])
+        return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])
 
     async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
         """Gets the topological token in a room after or at the given stream
@@ -687,8 +691,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
             else:
                 topo = None
             internal = event.internal_metadata
-            internal.before = str(RoomStreamToken(topo, stream - 1))
-            internal.after = str(RoomStreamToken(topo, stream))
+            internal.before = RoomStreamToken(topo, stream - 1)
+            internal.after = RoomStreamToken(topo, stream)
             internal.order = (int(topo) if topo else 0, int(stream))
 
     async def get_events_around(