diff options
Diffstat (limited to 'synapse/api/streams/event.py')
-rw-r--r-- | synapse/api/streams/event.py | 92 |
1 files changed, 52 insertions, 40 deletions
diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py index 414b05be30..a5c8b2b31f 100644 --- a/synapse/api/streams/event.py +++ b/synapse/api/streams/event.py @@ -38,8 +38,8 @@ class EventsStreamData(StreamData): self.with_feedback = feedback @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - data, latest_ver = yield self.store.get_room_events_stream( + def get_rows(self, user_id, from_key, to_key, limit, direction): + data, latest_ver = yield self.store.get_room_events( user_id=user_id, from_key=from_key, to_key=to_key, @@ -70,6 +70,15 @@ class EventStream(PaginationStream): pagination_config.from_tok) pagination_config.to_tok = yield self.fix_token( pagination_config.to_tok) + + if ( + not pagination_config.to_tok + and pagination_config.direction == 'f' + ): + pagination_config.to_tok = yield self.get_current_max_token() + + logger.debug("pagination_config: %s", pagination_config) + defer.returnValue(pagination_config) @defer.inlineCallbacks @@ -81,39 +90,42 @@ class EventStream(PaginationStream): Returns: The fixed-up token, which may == token. """ - # replace TOK_START and TOK_END with 0_0_0 or -1_-1_-1 depending. - replacements = [ - (PaginationStream.TOK_START, "0"), - (PaginationStream.TOK_END, "-1") - ] - for magic_token, key in replacements: - if magic_token == token: - token = EventStream.SEPARATOR.join( - [key] * len(self.stream_data) - ) - - # replace -1 values with an actual pkey - token_segments = self._split_token(token) - for i, tok in enumerate(token_segments): - if tok == -1: - # add 1 to the max token because results are EXCLUSIVE from the - # latest version. - token_segments[i] = 1 + (yield self.stream_data[i].max_token()) - defer.returnValue(EventStream.SEPARATOR.join( - str(x) for x in token_segments - )) + if token == PaginationStream.TOK_END: + new_token = yield self.get_current_max_token() + + logger.debug("fix_token: From %s to %s", token, new_token) + + token = new_token + + defer.returnValue(token) @defer.inlineCallbacks - def get_chunk(self, config=None): + def get_current_max_token(self): + new_token_parts = [] + for s in self.stream_data: + mx = yield s.max_token() + new_token_parts.append(str(mx)) + + new_token = EventStream.SEPARATOR.join(new_token_parts) + + logger.debug("get_current_max_token: %s", new_token) + + defer.returnValue(new_token) + + @defer.inlineCallbacks + def get_chunk(self, config): # no support for limit on >1 streams, makes no sense. if config.limit and len(self.stream_data) > 1: raise EventStreamError( 400, "Limit not supported on multiplexed streams." ) - (chunk_data, next_tok) = yield self._get_chunk_data(config.from_tok, - config.to_tok, - config.limit) + chunk_data, next_tok = yield self._get_chunk_data( + config.from_tok, + config.to_tok, + config.limit, + config.direction, + ) defer.returnValue({ "chunk": chunk_data, @@ -122,7 +134,7 @@ class EventStream(PaginationStream): }) @defer.inlineCallbacks - def _get_chunk_data(self, from_tok, to_tok, limit): + def _get_chunk_data(self, from_tok, to_tok, limit, direction): """ Get event data between the two tokens. Tokens are SEPARATOR separated values representing pkey values of @@ -140,11 +152,12 @@ class EventStream(PaginationStream): EventStreamError if something went wrong. """ # sanity check - if (from_tok.count(EventStream.SEPARATOR) != - to_tok.count(EventStream.SEPARATOR) or - (from_tok.count(EventStream.SEPARATOR) + 1) != - len(self.stream_data)): - raise EventStreamError(400, "Token lengths don't match.") + if to_tok is not None: + if (from_tok.count(EventStream.SEPARATOR) != + to_tok.count(EventStream.SEPARATOR) or + (from_tok.count(EventStream.SEPARATOR) + 1) != + len(self.stream_data)): + raise EventStreamError(400, "Token lengths don't match.") chunk = [] next_ver = [] @@ -158,7 +171,7 @@ class EventStream(PaginationStream): continue (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( - self.user_id, from_pkey, to_pkey, limit + self.user_id, from_pkey, to_pkey, limit, direction, ) chunk.extend([ @@ -177,9 +190,8 @@ class EventStream(PaginationStream): Returns: A list of ints. """ - segments = token.split(EventStream.SEPARATOR) - try: - int_segments = [int(x) for x in segments] - except ValueError: - raise EventStreamError(400, "Bad token: %s" % token) - return int_segments + if token: + segments = token.split(EventStream.SEPARATOR) + else: + segments = [None] * len(self.stream_data) + return segments |