summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-09-29 21:48:33 +0100
committerGitHub <noreply@github.com>2020-09-29 21:48:33 +0100
commitea70f1c362dc4bd6c0f8a67e16ed0971fe095e5b (patch)
tree83a37ad1ffa3e1432dc4ca281a0cfec5ed600445 /synapse/handlers
parentUpdate description of server_name config option (#8415) (diff)
downloadsynapse-ea70f1c362dc4bd6c0f8a67e16ed0971fe095e5b.tar.xz
Various clean ups to room stream tokens. (#8423)
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/admin.py2
-rw-r--r--synapse/handlers/device.py4
-rw-r--r--synapse/handlers/initial_sync.py3
-rw-r--r--synapse/handlers/pagination.py5
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/handlers/sync.py20
6 files changed, 22 insertions, 16 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index dd981c597e..1ce2091b46 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -153,7 +153,7 @@ class AdminHandler(BaseHandler):
                 if not events:
                     break
 
-                from_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
+                from_key = events[-1].internal_metadata.after
 
                 events = await filter_events_for_client(self.storage, user_id, events)
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 4149520d6c..b9d9098104 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -29,7 +29,6 @@ from synapse.api.errors import (
 from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import (
-    RoomStreamToken,
     StreamToken,
     get_domain_from_id,
     get_verify_key_from_cross_signing_key,
@@ -113,8 +112,7 @@ class DeviceWorkerHandler(BaseHandler):
 
         set_tag("user_id", user_id)
         set_tag("from_token", from_token)
-        now_room_id = self.store.get_room_max_stream_ordering()
-        now_room_key = RoomStreamToken(None, now_room_id)
+        now_room_key = self.store.get_room_max_token()
 
         room_ids = await self.store.get_rooms_for_user(user_id)
 
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 8cd7eb22a3..43f15435de 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -325,7 +325,8 @@ class InitialSyncHandler(BaseHandler):
         if limit is None:
             limit = 10
 
-        stream_token = await self.store.get_stream_token_for_event(member_event_id)
+        leave_position = await self.store.get_position_for_event(member_event_id)
+        stream_token = leave_position.to_room_stream_token()
 
         messages, token = await self.store.get_recent_events_for_room(
             room_id, limit=limit, end_token=stream_token
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index a0b3bdb5e0..d6779a4b44 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -25,7 +25,7 @@ from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.state import StateFilter
 from synapse.streams.config import PaginationConfig
-from synapse.types import Requester, RoomStreamToken
+from synapse.types import Requester
 from synapse.util.async_helpers import ReadWriteLock
 from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
@@ -373,10 +373,9 @@ class PaginationHandler:
                     # case "JOIN" would have been returned.
                     assert member_event_id
 
-                    leave_token_str = await self.store.get_topological_token_for_event(
+                    leave_token = await self.store.get_topological_token_for_event(
                         member_event_id
                     )
-                    leave_token = RoomStreamToken.parse(leave_token_str)
                     assert leave_token.topological is not None
 
                     if leave_token.topological < curr_topo:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 11bf146bed..836b3f381a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1134,14 +1134,14 @@ class RoomEventSource:
                 events[:] = events[:limit]
 
             if events:
-                end_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
+                end_key = events[-1].internal_metadata.after
             else:
                 end_key = to_key
 
         return (events, end_key)
 
     def get_current_key(self) -> RoomStreamToken:
-        return RoomStreamToken(None, self.store.get_room_max_stream_ordering())
+        return self.store.get_room_max_token()
 
     def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
         return self.store.get_room_events_max_id(room_id)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index e948efef2e..bfe2583002 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -519,7 +519,7 @@ class SyncHandler:
             if len(recents) > timeline_limit:
                 limited = True
                 recents = recents[-timeline_limit:]
-                room_key = RoomStreamToken.parse(recents[0].internal_metadata.before)
+                room_key = recents[0].internal_metadata.before
 
             prev_batch_token = now_token.copy_and_replace("room_key", room_key)
 
@@ -1595,16 +1595,24 @@ class SyncHandler:
 
             if leave_events:
                 leave_event = leave_events[-1]
-                leave_stream_token = await self.store.get_stream_token_for_event(
+                leave_position = await self.store.get_position_for_event(
                     leave_event.event_id
                 )
-                leave_token = since_token.copy_and_replace(
-                    "room_key", leave_stream_token
-                )
 
-                if since_token and since_token.is_after(leave_token):
+                # If the leave event happened before the since token then we
+                # bail.
+                if since_token and not leave_position.persisted_after(
+                    since_token.room_key
+                ):
                     continue
 
+                # We can safely convert the position of the leave event into a
+                # stream token as it'll only be used in the context of this
+                # room. (c.f. the docstring of `to_room_stream_token`).
+                leave_token = since_token.copy_and_replace(
+                    "room_key", leave_position.to_room_stream_token()
+                )
+
                 # If this is an out of band message, like a remote invite
                 # rejection, we include it in the recents batch. Otherwise, we
                 # let _load_filtered_recents handle fetching the correct