summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py76
1 files changed, 76 insertions, 0 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py

index 0d32a3a498..00da329924 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py
@@ -915,6 +915,82 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue((events, token, extremities)) + def clamp_token_before(self, room_id, token_str, clamp_to_token_str): + """For a given room returns the given token if its before + clamp_to, otherwise returns clamp_to. + + Args: + room_id (str) + token_str (str) + clamp_to_token_str(str): Must be topological token + + Returns: + Deferred[str] + """ + + token = RoomStreamToken.parse(token_str) + clamp_to_token = RoomStreamToken.parse(clamp_to_token_str) + + def clamp_token_before_txn(txn, token): + # If we're given a stream ordering, convert to topological token + if not token.chunk: + row = self._simple_select_one_txn( + txn, + table="events", + keyvalues={ + "stream_ordering": token.stream, + }, + retcols=("chunk_id", "topological_ordering", "stream_ordering",), + ) + token = RoomStreamToken(*row) + + # If both tokens have chunk_ids, we can use that. + if token.chunk and clamp_to_token.chunk: + if token.chunk == clamp_to_token.chunk: + if token.topological < clamp_to_token.topological: + return token_str + else: + return clamp_to_token_str + + table = ChunkDBOrderedListStore( + txn, room_id, self.clock, + ) + + if table.is_before(token.chunk, clamp_to_token.chunk): + return token_str + else: + return clamp_to_token_str + + # Ok, so we're dealing with events that haven't been chunked yet, + # lets just cheat and fallback to depth. + + token_depth = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "stream_ordering": token.stream, + }, + retcol="depth", + ) + + clamp_depth = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "stream_ordering": clamp_to_token.stream, + }, + retcol="depth", + ) + + if token_depth < clamp_depth: + return token_str + else: + return clamp_to_token_str + + return self.runInteraction( + "clamp_token_before", clamp_token_before_txn, token + ) + class StreamStore(StreamWorkerStore): def get_room_max_stream_ordering(self):