diff options
-rw-r--r-- | synapse/api/streams/__init__.py | 14 | ||||
-rw-r--r-- | synapse/types.py | 74 |
2 files changed, 83 insertions, 5 deletions
diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py index d831eafbab..0ba4783ea2 100644 --- a/synapse/api/streams/__init__.py +++ b/synapse/api/streams/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.api.errors import SynapseError +from synapse.types import StreamToken class PaginationConfig(object): @@ -21,10 +22,10 @@ class PaginationConfig(object): """A configuration object which stores pagination parameters.""" def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): - self.from_tok = from_tok - self.to_tok = to_tok - self.direction = direction - self.limit = limit + self.from_tok = StreamToken(from_tok) if from_tok else None + self.to_tok = StreamToken(to_tok) if to_tok else None + self.direction = 'f' if direction == 'f' else 'b' + self.limit = int(limit) @classmethod def from_request(cls, request, raise_invalid_params=True): @@ -47,7 +48,10 @@ class PaginationConfig(object): elif raise_invalid_params: raise SynapseError(400, "%s parameter is invalid." % qp) - return PaginationConfig(**params) + try: + return PaginationConfig(**params) + except: + raise SynapseError(400, "Invalid request.") def __str__(self): return ( diff --git a/synapse/types.py b/synapse/types.py index fd6a3d1d72..baec8a6002 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -92,3 +92,77 @@ class RoomAlias(DomainSpecificString): class RoomID(DomainSpecificString): """Structure representing a room id. """ SIGIL = "!" + + +class StreamToken( + namedtuple( + "Token", + ("events_type", "topological_key", "stream_key", "presence_key") + ) +): + _SEPARATOR = "_" + + _TOPOLOGICAL_PREFIX = "t" + _STREAM_PREFIX = "s" + + _TOPOLOGICAL_SEPERATOR = "-" + + TOPOLOGICAL_TYPE = "topo" + STREAM_TYPE = "stream" + + @classmethod + def from_string(cls, string): + try: + events_part, presence_part = string.split(cls._SEPARATOR) + + presence_key = int(presence_part) + + topo_length = len(cls._TOPOLOGICAL_PREFIX) + stream_length = len(cls._STREAM_PREFIX) + if events_part[:topo_length] == cls._TOPOLOGICAL_PREFIX: + # topological event token + topo_tok = events_part[topo_length:] + topo_key, stream_key = topo_tok.split( + cls._TOPOLOGICAL_SEPERATOR, 1 + ) + + topo_key = int(topo_key) + stream_key = int(stream_key) + + events_type = cls.TOPOLOGICAL_TYPE + elif events_part[:stream_length] == cls._STREAM_PREFIX: + topo_key = None + stream_key = int(events_part[stream_length:]) + + events_type = cls.STREAM_TYPE + else: + raise + + return cls( + events_type=events_type, + topological_key=topo_key, + stream_key=stream_key, + presence_key=presence_key, + ) + except: + raise SynapseError(400, "Invalid Token") + + def to_string(self): + if self.events_type == self.TOPOLOGICAL_TYPE: + return "".join([ + self._TOPOLOGICAL_PREFIX, + str(self.topological_key), + self._TOPOLOGICAL_SEPERATOR, + str(self.stream_key), + self._SEPARATOR, + str(self.presence_key), + ]) + elif self.events_type == self.STREAM_TYPE: + return "".join([ + self._STREAM_PREFIX, + str(self.stream_key), + self._SEPARATOR, + str(self.presence_key), + ]) + + raise RuntimeError("Unrecognized event type: %s", self.events_type) |