summary refs log tree commit diff
path: root/synapse/streams/config.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-08-26 18:57:46 +0100
committerErik Johnston <erik@matrix.org>2014-08-26 18:57:46 +0100
commit3a2a5b959cb1f56b26af32e1ad4c1db424279eb7 (patch)
tree5f5b515cdaa586f1f7f92337471dfdc2e0d4683d /synapse/streams/config.py
parentMerge branch 'develop' of github.com:matrix-org/synapse into stream_refactor (diff)
downloadsynapse-3a2a5b959cb1f56b26af32e1ad4c1db424279eb7.tar.xz
WIP: Completely change how event streaming and pagination work. This reflects the change in the underlying storage model.
Diffstat (limited to '')
-rw-r--r--synapse/streams/config.py (renamed from synapse/api/streams/__init__.py)59
1 files changed, 12 insertions, 47 deletions
diff --git a/synapse/api/streams/__init__.py b/synapse/streams/config.py

index 0ba4783ea2..b6ffbab1e7 100644 --- a/synapse/api/streams/__init__.py +++ b/synapse/streams/config.py
@@ -16,21 +16,25 @@ from synapse.api.errors import SynapseError from synapse.types import StreamToken +import logging + + +logger = logging.getLogger(__name__) + class PaginationConfig(object): """A configuration object which stores pagination parameters.""" def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): - self.from_tok = StreamToken(from_tok) if from_tok else None - self.to_tok = StreamToken(to_tok) if to_tok else None + self.from_token = StreamToken.from_string(from_tok) if from_tok else None + self.to_token = StreamToken.from_string(to_tok) if to_tok else None self.direction = 'f' if direction == 'f' else 'b' self.limit = int(limit) @classmethod def from_request(cls, request, raise_invalid_params=True): params = { - "from_tok": "END", "direction": 'f', } @@ -48,9 +52,14 @@ class PaginationConfig(object): elif raise_invalid_params: raise SynapseError(400, "%s parameter is invalid." % qp) + if "from_tok" in params and params["from_tok"] == "END": + # TODO (erikj): This is for compatibility only. + del params["from_tok"] + try: return PaginationConfig(**params) except: + logger.exception("Failed to create pagination config") raise SynapseError(400, "Invalid request.") def __str__(self): @@ -60,48 +69,4 @@ class PaginationConfig(object): ) % (self.from_tok, self.to_tok, self.direction, self.limit) -class PaginationStream(object): - - """ An interface for streaming data as chunks. """ - - TOK_END = "END" - - def get_chunk(self, config=None): - """ Return the next chunk in the stream. - - Args: - config (PaginationConfig): The config to aid which chunk to get. - Returns: - A dict containing the new start token "start", the new end token - "end" and the data "chunk" as a list. - """ - raise NotImplementedError() - - -class StreamData(object): - - """ An interface for obtaining streaming data from a table. """ - - def __init__(self, hs): - self.hs = hs - self.store = hs.get_datastore() - - def get_rows(self, user_id, from_pkey, to_pkey, limit, direction): - """ Get event stream data between the specified pkeys. - - Args: - user_id : The user's ID - from_pkey : The starting pkey. - to_pkey : The end pkey. May be -1 to mean "latest". - limit: The max number of results to return. - Returns: - A tuple containing the list of event stream data and the last pkey. - """ - raise NotImplementedError() - - def max_token(self): - """ Get the latest currently-valid token. - Returns: - The latest token.""" - raise NotImplementedError()