diff options
-rw-r--r-- | synapse/handlers/message.py | 22 | ||||
-rw-r--r-- | synapse/storage/stream.py | 76 |
2 files changed, 81 insertions, 17 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c350c93c7e..ca9c48cabc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -211,29 +211,17 @@ class MessageHandler(BaseHandler): ) if source_config.direction == 'b': - # if we're going backwards, we might need to backfill. This - # requires that we have a topo token. - if room_token.topological: - max_topo = room_token.topological - else: - max_topo = yield self.store.get_max_topological_token( - room_id, room_token.stream - ) - if membership == Membership.LEAVE: # If they have left the room then clamp the token to be before # they left the room, to save the effort of loading from the # database. + leave_token = yield self.store.get_topological_token_for_event( - member_event_id + member_event_id, + ) + source_config.from_key = yield self.store.clamp_token_before( + room_id, source_config.from_key, leave_token, ) - leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < max_topo: - source_config.from_key = str(leave_token) - - yield self.hs.get_handlers().federation_handler.maybe_backfill( - room_id, max_topo - ) events, next_key, extremities = yield self.store.paginate_room_events( room_id=room_id, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 0d32a3a498..00da329924 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -915,6 +915,82 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue((events, token, extremities)) + def clamp_token_before(self, room_id, token_str, clamp_to_token_str): + """For a given room returns the given token if its before + clamp_to, otherwise returns clamp_to. + + Args: + room_id (str) + token_str (str) + clamp_to_token_str(str): Must be topological token + + Returns: + Deferred[str] + """ + + token = RoomStreamToken.parse(token_str) + clamp_to_token = RoomStreamToken.parse(clamp_to_token_str) + + def clamp_token_before_txn(txn, token): + # If we're given a stream ordering, convert to topological token + if not token.chunk: + row = self._simple_select_one_txn( + txn, + table="events", + keyvalues={ + "stream_ordering": token.stream, + }, + retcols=("chunk_id", "topological_ordering", "stream_ordering",), + ) + token = RoomStreamToken(*row) + + # If both tokens have chunk_ids, we can use that. + if token.chunk and clamp_to_token.chunk: + if token.chunk == clamp_to_token.chunk: + if token.topological < clamp_to_token.topological: + return token_str + else: + return clamp_to_token_str + + table = ChunkDBOrderedListStore( + txn, room_id, self.clock, + ) + + if table.is_before(token.chunk, clamp_to_token.chunk): + return token_str + else: + return clamp_to_token_str + + # Ok, so we're dealing with events that haven't been chunked yet, + # lets just cheat and fallback to depth. + + token_depth = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "stream_ordering": token.stream, + }, + retcol="depth", + ) + + clamp_depth = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "stream_ordering": clamp_to_token.stream, + }, + retcol="depth", + ) + + if token_depth < clamp_depth: + return token_str + else: + return clamp_to_token_str + + return self.runInteraction( + "clamp_token_before", clamp_token_before_txn, token + ) + class StreamStore(StreamWorkerStore): def get_room_max_stream_ordering(self): |