diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 280d4ad605..b03fc67f71 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -37,11 +37,9 @@ from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.api.constants import EventTypes
-from synapse.api.errors import SynapseError
+from synapse.types import RoomStreamToken
from synapse.util.logutils import log_function
-from collections import namedtuple
-
import logging
@@ -55,76 +53,26 @@ _STREAM_TOKEN = "stream"
_TOPOLOGICAL_TOKEN = "topological"
-class _StreamToken(namedtuple("_StreamToken", "topological stream")):
- """Tokens are positions between events. The token "s1" comes after event 1.
-
- s0 s1
- | |
- [0] V [1] V [2]
-
- Tokens can either be a point in the live event stream or a cursor going
- through historic events.
-
- When traversing the live event stream events are ordered by when they
- arrived at the homeserver.
-
- When traversing historic events the events are ordered by their depth in
- the event graph "topological_ordering" and then by when they arrived at the
- homeserver "stream_ordering".
-
- Live tokens start with an "s" followed by the "stream_ordering" id of the
- event it comes after. Historic tokens start with a "t" followed by the
- "topological_ordering" id of the event it comes after, follewed by "-",
- followed by the "stream_ordering" id of the event it comes after.
- """
- __slots__ = []
-
- @classmethod
- def parse(cls, string):
- try:
- if string[0] == 's':
- return cls(topological=None, stream=int(string[1:]))
- if string[0] == 't':
- parts = string[1:].split('-', 1)
- return cls(topological=int(parts[0]), stream=int(parts[1]))
- except:
- pass
- raise SynapseError(400, "Invalid token %r" % (string,))
-
- @classmethod
- def parse_stream_token(cls, string):
- try:
- if string[0] == 's':
- return cls(topological=None, stream=int(string[1:]))
- except:
- pass
- raise SynapseError(400, "Invalid token %r" % (string,))
-
- def __str__(self):
- if self.topological is not None:
- return "t%d-%d" % (self.topological, self.stream)
- else:
- return "s%d" % (self.stream,)
+def lower_bound(token):
+ if token.topological is None:
+ return "(%d < %s)" % (token.stream, "stream_ordering")
+ else:
+ return "(%d < %s OR (%d = %s AND %d < %s))" % (
+ token.topological, "topological_ordering",
+ token.topological, "topological_ordering",
+ token.stream, "stream_ordering",
+ )
- def lower_bound(self):
- if self.topological is None:
- return "(%d < %s)" % (self.stream, "stream_ordering")
- else:
- return "(%d < %s OR (%d = %s AND %d < %s))" % (
- self.topological, "topological_ordering",
- self.topological, "topological_ordering",
- self.stream, "stream_ordering",
- )
- def upper_bound(self):
- if self.topological is None:
- return "(%d >= %s)" % (self.stream, "stream_ordering")
- else:
- return "(%d > %s OR (%d = %s AND %d >= %s))" % (
- self.topological, "topological_ordering",
- self.topological, "topological_ordering",
- self.stream, "stream_ordering",
- )
+def upper_bound(token):
+ if token.topological is None:
+ return "(%d >= %s)" % (token.stream, "stream_ordering")
+ else:
+ return "(%d > %s OR (%d = %s AND %d >= %s))" % (
+ token.topological, "topological_ordering",
+ token.topological, "topological_ordering",
+ token.stream, "stream_ordering",
+ )
class StreamStore(SQLBaseStore):
@@ -139,8 +87,8 @@ class StreamStore(SQLBaseStore):
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering.
- from_id = _StreamToken.parse_stream_token(from_key)
- to_id = _StreamToken.parse_stream_token(to_key)
+ from_id = RoomStreamToken.parse_stream_token(from_key)
+ to_id = RoomStreamToken.parse_stream_token(to_key)
if from_key == to_key:
defer.returnValue(([], to_key))
@@ -234,8 +182,8 @@ class StreamStore(SQLBaseStore):
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering.
- from_id = _StreamToken.parse_stream_token(from_key)
- to_id = _StreamToken.parse_stream_token(to_key)
+ from_id = RoomStreamToken.parse_stream_token(from_key)
+ to_id = RoomStreamToken.parse_stream_token(to_key)
if from_key == to_key:
return defer.succeed(([], to_key))
@@ -288,17 +236,17 @@ class StreamStore(SQLBaseStore):
args = [False, room_id]
if direction == 'b':
order = "DESC"
- bounds = _StreamToken.parse(from_key).upper_bound()
+ bounds = upper_bound(RoomStreamToken.parse(from_key))
if to_key:
bounds = "%s AND %s" % (
- bounds, _StreamToken.parse(to_key).lower_bound()
+ bounds, lower_bound(RoomStreamToken.parse(to_key))
)
else:
order = "ASC"
- bounds = _StreamToken.parse(from_key).lower_bound()
+ bounds = lower_bound(RoomStreamToken.parse(from_key))
if to_key:
bounds = "%s AND %s" % (
- bounds, _StreamToken.parse(to_key).upper_bound()
+ bounds, upper_bound(RoomStreamToken.parse(to_key))
)
if int(limit) > 0:
@@ -333,7 +281,7 @@ class StreamStore(SQLBaseStore):
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
- next_token = str(_StreamToken(topo, toke))
+ next_token = str(RoomStreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
@@ -354,7 +302,7 @@ class StreamStore(SQLBaseStore):
with_feedback=False, from_token=None):
# TODO (erikj): Handle compressed feedback
- end_token = _StreamToken.parse_stream_token(end_token)
+ end_token = RoomStreamToken.parse_stream_token(end_token)
if from_token is None:
sql = (
@@ -365,7 +313,7 @@ class StreamStore(SQLBaseStore):
" LIMIT ?"
)
else:
- from_token = _StreamToken.parse_stream_token(from_token)
+ from_token = RoomStreamToken.parse_stream_token(from_token)
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
@@ -395,7 +343,7 @@ class StreamStore(SQLBaseStore):
# stream part.
topo = rows[0]["topological_ordering"]
toke = rows[0]["stream_ordering"] - 1
- start_token = str(_StreamToken(topo, toke))
+ start_token = str(RoomStreamToken(topo, toke))
token = (start_token, str(end_token))
else:
@@ -439,5 +387,5 @@ class StreamStore(SQLBaseStore):
stream = row["stream_ordering"]
topo = event.depth
internal = event.internal_metadata
- internal.before = str(_StreamToken(topo, stream - 1))
- internal.after = str(_StreamToken(topo, stream))
+ internal.before = str(RoomStreamToken(topo, stream - 1))
+ internal.after = str(RoomStreamToken(topo, stream))
diff --git a/synapse/types.py b/synapse/types.py
index f6a1b0bbcf..0f16867d75 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -121,4 +121,56 @@ class StreamToken(
return StreamToken(**d)
+class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
+ """Tokens are positions between events. The token "s1" comes after event 1.
+
+ s0 s1
+ | |
+ [0] V [1] V [2]
+
+ Tokens can either be a point in the live event stream or a cursor going
+ through historic events.
+
+ When traversing the live event stream events are ordered by when they
+ arrived at the homeserver.
+
+ When traversing historic events the events are ordered by their depth in
+ the event graph "topological_ordering" and then by when they arrived at the
+ homeserver "stream_ordering".
+
+ Live tokens start with an "s" followed by the "stream_ordering" id of the
+ event it comes after. Historic tokens start with a "t" followed by the
+ "topological_ordering" id of the event it comes after, follewed by "-",
+ followed by the "stream_ordering" id of the event it comes after.
+ """
+ __slots__ = []
+
+ @classmethod
+ def parse(cls, string):
+ try:
+ if string[0] == 's':
+ return cls(topological=None, stream=int(string[1:]))
+ if string[0] == 't':
+ parts = string[1:].split('-', 1)
+ return cls(topological=int(parts[0]), stream=int(parts[1]))
+ except:
+ pass
+ raise SynapseError(400, "Invalid token %r" % (string,))
+
+ @classmethod
+ def parse_stream_token(cls, string):
+ try:
+ if string[0] == 's':
+ return cls(topological=None, stream=int(string[1:]))
+ except:
+ pass
+ raise SynapseError(400, "Invalid token %r" % (string,))
+
+ def __str__(self):
+ if self.topological is not None:
+ return "t%d-%d" % (self.topological, self.stream)
+ else:
+ return "s%d" % (self.stream,)
+
+
ClientInfo = namedtuple("ClientInfo", ("device_id", "token_id"))
|