diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index dd981c597e..1ce2091b46 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -153,7 +153,7 @@ class AdminHandler(BaseHandler):
if not events:
break
- from_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
+ from_key = events[-1].internal_metadata.after
events = await filter_events_for_client(self.storage, user_id, events)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 4149520d6c..b9d9098104 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -29,7 +29,6 @@ from synapse.api.errors import (
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import (
- RoomStreamToken,
StreamToken,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
@@ -113,8 +112,7 @@ class DeviceWorkerHandler(BaseHandler):
set_tag("user_id", user_id)
set_tag("from_token", from_token)
- now_room_id = self.store.get_room_max_stream_ordering()
- now_room_key = RoomStreamToken(None, now_room_id)
+ now_room_key = self.store.get_room_max_token()
room_ids = await self.store.get_rooms_for_user(user_id)
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 8cd7eb22a3..43f15435de 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -325,7 +325,8 @@ class InitialSyncHandler(BaseHandler):
if limit is None:
limit = 10
- stream_token = await self.store.get_stream_token_for_event(member_event_id)
+ leave_position = await self.store.get_position_for_event(member_event_id)
+ stream_token = leave_position.to_room_stream_token()
messages, token = await self.store.get_recent_events_for_room(
room_id, limit=limit, end_token=stream_token
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index a0b3bdb5e0..d6779a4b44 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -25,7 +25,7 @@ from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
-from synapse.types import Requester, RoomStreamToken
+from synapse.types import Requester
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
@@ -373,10 +373,9 @@ class PaginationHandler:
# case "JOIN" would have been returned.
assert member_event_id
- leave_token_str = await self.store.get_topological_token_for_event(
+ leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
- leave_token = RoomStreamToken.parse(leave_token_str)
assert leave_token.topological is not None
if leave_token.topological < curr_topo:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 11bf146bed..836b3f381a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1134,14 +1134,14 @@ class RoomEventSource:
events[:] = events[:limit]
if events:
- end_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
+ end_key = events[-1].internal_metadata.after
else:
end_key = to_key
return (events, end_key)
def get_current_key(self) -> RoomStreamToken:
- return RoomStreamToken(None, self.store.get_room_max_stream_ordering())
+ return self.store.get_room_max_token()
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index e948efef2e..bfe2583002 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -519,7 +519,7 @@ class SyncHandler:
if len(recents) > timeline_limit:
limited = True
recents = recents[-timeline_limit:]
- room_key = RoomStreamToken.parse(recents[0].internal_metadata.before)
+ room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace("room_key", room_key)
@@ -1595,16 +1595,24 @@ class SyncHandler:
if leave_events:
leave_event = leave_events[-1]
- leave_stream_token = await self.store.get_stream_token_for_event(
+ leave_position = await self.store.get_position_for_event(
leave_event.event_id
)
- leave_token = since_token.copy_and_replace(
- "room_key", leave_stream_token
- )
- if since_token and since_token.is_after(leave_token):
+ # If the leave event happened before the since token then we
+ # bail.
+ if since_token and not leave_position.persisted_after(
+ since_token.room_key
+ ):
continue
+ # We can safely convert the position of the leave event into a
+ # stream token as it'll only be used in the context of this
+ # room. (c.f. the docstring of `to_room_stream_token`).
+ leave_token = since_token.copy_and_replace(
+ "room_key", leave_position.to_room_stream_token()
+ )
+
# If this is an out of band message, like a remote invite
# rejection, we include it in the recents batch. Otherwise, we
# let _load_filtered_recents handle fetching the correct
|