summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/api/streams/__init__.py14
-rw-r--r--synapse/types.py74
2 files changed, 83 insertions, 5 deletions
diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py
index d831eafbab..0ba4783ea2 100644
--- a/synapse/api/streams/__init__.py
+++ b/synapse/api/streams/__init__.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from synapse.api.errors import SynapseError
+from synapse.types import StreamToken
 
 
 class PaginationConfig(object):
@@ -21,10 +22,10 @@ class PaginationConfig(object):
     """A configuration object which stores pagination parameters."""
 
     def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0):
-        self.from_tok = from_tok
-        self.to_tok = to_tok
-        self.direction = direction
-        self.limit = limit
+        self.from_tok = StreamToken(from_tok) if from_tok else None
+        self.to_tok = StreamToken(to_tok) if to_tok else None
+        self.direction = 'f' if direction == 'f' else 'b'
+        self.limit = int(limit)
 
     @classmethod
     def from_request(cls, request, raise_invalid_params=True):
@@ -47,7 +48,10 @@ class PaginationConfig(object):
                 elif raise_invalid_params:
                     raise SynapseError(400, "%s parameter is invalid." % qp)
 
-        return PaginationConfig(**params)
+        try:
+            return PaginationConfig(**params)
+        except:
+            raise SynapseError(400, "Invalid request.")
 
     def __str__(self):
         return (
diff --git a/synapse/types.py b/synapse/types.py
index fd6a3d1d72..baec8a6002 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -92,3 +92,77 @@ class RoomAlias(DomainSpecificString):
 class RoomID(DomainSpecificString):
     """Structure representing a room id. """
     SIGIL = "!"
+
+
+class StreamToken(
+    namedtuple(
+        "Token",
+        ("events_type", "topological_key", "stream_key", "presence_key")
+    )
+):
+    _SEPARATOR = "_"
+
+    _TOPOLOGICAL_PREFIX = "t"
+    _STREAM_PREFIX = "s"
+
+    _TOPOLOGICAL_SEPERATOR = "-"
+
+    TOPOLOGICAL_TYPE = "topo"
+    STREAM_TYPE = "stream"
+
+    @classmethod
+    def from_string(cls, string):
+        try:
+            events_part, presence_part = string.split(cls._SEPARATOR)
+
+            presence_key = int(presence_part)
+
+            topo_length = len(cls._TOPOLOGICAL_PREFIX)
+            stream_length = len(cls._STREAM_PREFIX)
+            if events_part[:topo_length] == cls._TOPOLOGICAL_PREFIX:
+                # topological event token
+                topo_tok = events_part[topo_length:]
+                topo_key, stream_key = topo_tok.split(
+                    cls._TOPOLOGICAL_SEPERATOR, 1
+                )
+
+                topo_key = int(topo_key)
+                stream_key = int(stream_key)
+
+                events_type = cls.TOPOLOGICAL_TYPE
+            elif events_part[:stream_length] == cls._STREAM_PREFIX:
+                topo_key = None
+                stream_key = int(events_part[stream_length:])
+
+                events_type = cls.STREAM_TYPE
+            else:
+                raise
+
+            return cls(
+                events_type=events_type,
+                topological_key=topo_key,
+                stream_key=stream_key,
+                presence_key=presence_key,
+            )
+        except:
+            raise SynapseError(400, "Invalid Token")
+
+    def to_string(self):
+        if self.events_type == self.TOPOLOGICAL_TYPE:
+            return "".join([
+                self._TOPOLOGICAL_PREFIX,
+                str(self.topological_key),
+                self._TOPOLOGICAL_SEPERATOR,
+                str(self.stream_key),
+                self._SEPARATOR,
+                str(self.presence_key),
+            ])
+        elif self.events_type == self.STREAM_TYPE:
+            return "".join([
+                self._STREAM_PREFIX,
+                str(self.stream_key),
+                self._SEPARATOR,
+                str(self.presence_key),
+            ])
+
+        raise RuntimeError("Unrecognized event type: %s", self.events_type)