diff options
Diffstat (limited to 'synapse/api/streams')
-rw-r--r-- | synapse/api/streams/__init__.py | 19 | ||||
-rw-r--r-- | synapse/api/streams/event.py | 92 |
2 files changed, 64 insertions, 47 deletions
diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py index 989e63f9ec..44f4cc6078 100644 --- a/synapse/api/streams/__init__.py +++ b/synapse/api/streams/__init__.py @@ -20,23 +20,23 @@ class PaginationConfig(object): """A configuration object which stores pagination parameters.""" - def __init__(self, from_tok=None, to_tok=None, limit=0): + def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): self.from_tok = from_tok self.to_tok = to_tok + self.direction = direction self.limit = limit @classmethod def from_request(cls, request, raise_invalid_params=True): params = { - "from_tok": PaginationStream.TOK_START, - "to_tok": PaginationStream.TOK_END, - "limit": 0 + "direction": 'f', } query_param_mappings = [ # 3-tuple of qp_key, attribute, rules ("from", "from_tok", lambda x: type(x) == str), ("to", "to_tok", lambda x: type(x) == str), - ("limit", "limit", lambda x: x.isdigit()) + ("limit", "limit", lambda x: x.isdigit()), + ("dir", "direction", lambda x: x == 'f' or x == 'b'), ] for qp, attr, is_valid in query_param_mappings: @@ -48,12 +48,17 @@ class PaginationConfig(object): return PaginationConfig(**params) + def __str__(self): + return ( + "<PaginationConfig from_tok=%s, to_tok=%s, " + "direction=%s, limit=%s>" + ) % (self.from_tok, self.to_tok, self.direction, self.limit) + class PaginationStream(object): """ An interface for streaming data as chunks. """ - TOK_START = "START" TOK_END = "END" def get_chunk(self, config=None): @@ -76,7 +81,7 @@ class StreamData(object): self.hs = hs self.store = hs.get_datastore() - def get_rows(self, user_id, from_pkey, to_pkey, limit): + def get_rows(self, user_id, from_pkey, to_pkey, limit, direction): """ Get event stream data between the specified pkeys. Args: diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py index 414b05be30..a5c8b2b31f 100644 --- a/synapse/api/streams/event.py +++ b/synapse/api/streams/event.py @@ -38,8 +38,8 @@ class EventsStreamData(StreamData): self.with_feedback = feedback @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - data, latest_ver = yield self.store.get_room_events_stream( + def get_rows(self, user_id, from_key, to_key, limit, direction): + data, latest_ver = yield self.store.get_room_events( user_id=user_id, from_key=from_key, to_key=to_key, @@ -70,6 +70,15 @@ class EventStream(PaginationStream): pagination_config.from_tok) pagination_config.to_tok = yield self.fix_token( pagination_config.to_tok) + + if ( + not pagination_config.to_tok + and pagination_config.direction == 'f' + ): + pagination_config.to_tok = yield self.get_current_max_token() + + logger.debug("pagination_config: %s", pagination_config) + defer.returnValue(pagination_config) @defer.inlineCallbacks @@ -81,39 +90,42 @@ class EventStream(PaginationStream): Returns: The fixed-up token, which may == token. """ - # replace TOK_START and TOK_END with 0_0_0 or -1_-1_-1 depending. - replacements = [ - (PaginationStream.TOK_START, "0"), - (PaginationStream.TOK_END, "-1") - ] - for magic_token, key in replacements: - if magic_token == token: - token = EventStream.SEPARATOR.join( - [key] * len(self.stream_data) - ) - - # replace -1 values with an actual pkey - token_segments = self._split_token(token) - for i, tok in enumerate(token_segments): - if tok == -1: - # add 1 to the max token because results are EXCLUSIVE from the - # latest version. - token_segments[i] = 1 + (yield self.stream_data[i].max_token()) - defer.returnValue(EventStream.SEPARATOR.join( - str(x) for x in token_segments - )) + if token == PaginationStream.TOK_END: + new_token = yield self.get_current_max_token() + + logger.debug("fix_token: From %s to %s", token, new_token) + + token = new_token + + defer.returnValue(token) @defer.inlineCallbacks - def get_chunk(self, config=None): + def get_current_max_token(self): + new_token_parts = [] + for s in self.stream_data: + mx = yield s.max_token() + new_token_parts.append(str(mx)) + + new_token = EventStream.SEPARATOR.join(new_token_parts) + + logger.debug("get_current_max_token: %s", new_token) + + defer.returnValue(new_token) + + @defer.inlineCallbacks + def get_chunk(self, config): # no support for limit on >1 streams, makes no sense. if config.limit and len(self.stream_data) > 1: raise EventStreamError( 400, "Limit not supported on multiplexed streams." ) - (chunk_data, next_tok) = yield self._get_chunk_data(config.from_tok, - config.to_tok, - config.limit) + chunk_data, next_tok = yield self._get_chunk_data( + config.from_tok, + config.to_tok, + config.limit, + config.direction, + ) defer.returnValue({ "chunk": chunk_data, @@ -122,7 +134,7 @@ class EventStream(PaginationStream): }) @defer.inlineCallbacks - def _get_chunk_data(self, from_tok, to_tok, limit): + def _get_chunk_data(self, from_tok, to_tok, limit, direction): """ Get event data between the two tokens. Tokens are SEPARATOR separated values representing pkey values of @@ -140,11 +152,12 @@ class EventStream(PaginationStream): EventStreamError if something went wrong. """ # sanity check - if (from_tok.count(EventStream.SEPARATOR) != - to_tok.count(EventStream.SEPARATOR) or - (from_tok.count(EventStream.SEPARATOR) + 1) != - len(self.stream_data)): - raise EventStreamError(400, "Token lengths don't match.") + if to_tok is not None: + if (from_tok.count(EventStream.SEPARATOR) != + to_tok.count(EventStream.SEPARATOR) or + (from_tok.count(EventStream.SEPARATOR) + 1) != + len(self.stream_data)): + raise EventStreamError(400, "Token lengths don't match.") chunk = [] next_ver = [] @@ -158,7 +171,7 @@ class EventStream(PaginationStream): continue (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( - self.user_id, from_pkey, to_pkey, limit + self.user_id, from_pkey, to_pkey, limit, direction, ) chunk.extend([ @@ -177,9 +190,8 @@ class EventStream(PaginationStream): Returns: A list of ints. """ - segments = token.split(EventStream.SEPARATOR) - try: - int_segments = [int(x) for x in segments] - except ValueError: - raise EventStreamError(400, "Bad token: %s" % token) - return int_segments + if token: + segments = token.split(EventStream.SEPARATOR) + else: + segments = [None] * len(self.stream_data) + return segments |