diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 0d32a3a498..00da329924 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -915,6 +915,82 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue((events, token, extremities))
+ def clamp_token_before(self, room_id, token_str, clamp_to_token_str):
+ """For a given room returns the given token if its before
+ clamp_to, otherwise returns clamp_to.
+
+ Args:
+ room_id (str)
+ token_str (str)
+ clamp_to_token_str(str): Must be topological token
+
+ Returns:
+ Deferred[str]
+ """
+
+ token = RoomStreamToken.parse(token_str)
+ clamp_to_token = RoomStreamToken.parse(clamp_to_token_str)
+
+ def clamp_token_before_txn(txn, token):
+ # If we're given a stream ordering, convert to topological token
+ if not token.chunk:
+ row = self._simple_select_one_txn(
+ txn,
+ table="events",
+ keyvalues={
+ "stream_ordering": token.stream,
+ },
+ retcols=("chunk_id", "topological_ordering", "stream_ordering",),
+ )
+ token = RoomStreamToken(*row)
+
+ # If both tokens have chunk_ids, we can use that.
+ if token.chunk and clamp_to_token.chunk:
+ if token.chunk == clamp_to_token.chunk:
+ if token.topological < clamp_to_token.topological:
+ return token_str
+ else:
+ return clamp_to_token_str
+
+ table = ChunkDBOrderedListStore(
+ txn, room_id, self.clock,
+ )
+
+ if table.is_before(token.chunk, clamp_to_token.chunk):
+ return token_str
+ else:
+ return clamp_to_token_str
+
+ # Ok, so we're dealing with events that haven't been chunked yet,
+ # lets just cheat and fallback to depth.
+
+ token_depth = self._simple_select_one_onecol_txn(
+ txn,
+ table="events",
+ keyvalues={
+ "stream_ordering": token.stream,
+ },
+ retcol="depth",
+ )
+
+ clamp_depth = self._simple_select_one_onecol_txn(
+ txn,
+ table="events",
+ keyvalues={
+ "stream_ordering": clamp_to_token.stream,
+ },
+ retcol="depth",
+ )
+
+ if token_depth < clamp_depth:
+ return token_str
+ else:
+ return clamp_to_token_str
+
+ return self.runInteraction(
+ "clamp_token_before", clamp_token_before_txn, token
+ )
+
class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
|