 synapse/storage/stream.py | 108
 synapse/types.py          |  19
 2 files changed, 84 insertions(+), 43 deletions(-)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index fb463c525a..387120090a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -62,24 +62,25 @@ _TOPOLOGICAL_TOKEN = "topological"
 
 # Used as return values for pagination APIs
 _EventDictReturn = namedtuple("_EventDictReturn", (
-    "event_id", "topological_ordering", "stream_ordering",
+    "event_id", "chunk_id", "topological_ordering", "stream_ordering",
 ))
 
 
 def lower_bound(token, engine, inclusive=False):
     inclusive = "=" if inclusive else ""
-    if token.topological is None:
+    if token.chunk is None:
         return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
     else:
         if isinstance(engine, PostgresEngine):
             # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
             # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
             # use the latter form when running against postgres.
-            return "((%d,%d) <%s (%s,%s))" % (
-                token.topological, token.stream, inclusive,
+            return "(chunk_id = %d AND (%d,%d) <%s (%s,%s))" % (
+                token.chunk, token.topological, token.stream, inclusive,
                 "topological_ordering", "stream_ordering",
             )
-        return "(%d < %s OR (%d = %s AND %d <%s %s))" % (
+        return "(chunk_id = %d AND (%d < %s OR (%d = %s AND %d <%s %s)))" % (
+            token.chunk,
             token.topological, "topological_ordering",
             token.topological, "topological_ordering",
             token.stream, inclusive, "stream_ordering",
@@ -88,18 +89,19 @@ def lower_bound(token, engine, inclusive=False):
 
 def upper_bound(token, engine, inclusive=True):
     inclusive = "=" if inclusive else ""
-    if token.topological is None:
+    if token.chunk is None:
         return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering")
     else:
         if isinstance(engine, PostgresEngine):
             # Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well
             # as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we
             # use the latter form when running against postgres.
-            return "((%d,%d) >%s (%s,%s))" % (
-                token.topological, token.stream, inclusive,
+            return "(chunk_id = %d AND (%d,%d) >%s (%s,%s))" % (
+                token.chunk, token.topological, token.stream, inclusive,
                 "topological_ordering", "stream_ordering",
             )
-        return "(%d > %s OR (%d = %s AND %d >%s %s))" % (
+        return "(chunk_id = %d AND (%d > %s OR (%d = %s AND %d >%s %s)))" % (
+            token.chunk,
             token.topological, "topological_ordering",
             token.topological, "topological_ordering",
             token.stream, inclusive, "stream_ordering",
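
For illustration only (hypothetical token values, not part of the patch): with a token of chunk=3, topological=10 and stream=42, the non-Postgres branch of the new lower_bound would produce a clause along these lines, scoping the row comparison to a single chunk before the usual topological/stream ordering applies:

    (chunk_id = 3 AND (10 < topological_ordering
        OR (10 = topological_ordering AND 42 < stream_ordering)))

The Postgres branch keeps the multicolumn-index-friendly tuple comparison, but likewise prefixes it with the chunk_id equality check.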
@@ -275,7 +277,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             ) % (order,)
             txn.execute(sql, (room_id, from_id, to_id, limit))
 
-            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
+            rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
             return rows
 
         rows = yield self.runInteraction("get_room_events_stream_for_room", f)
@@ -325,7 +327,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             )
             txn.execute(sql, (user_id, from_id, to_id,))
 
-            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
+            rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
 
             return rows
 
@@ -437,15 +439,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         `room_id` causes it to return the current room specific topological
         token.
         """
-        token = yield self.get_room_max_stream_ordering()
         if room_id is None:
-            defer.returnValue("s%d" % (token,))
+            token = yield self.get_room_max_stream_ordering()
+            defer.returnValue(str(RoomStreamToken(None, None, token)))
         else:
-            topo = yield self.runInteraction(
-                "_get_max_topological_txn", self._get_max_topological_txn,
+            token = yield self.runInteraction(
+                "get_room_events_max_id", self._get_topological_token_for_room_txn,
                 room_id,
             )
-            defer.returnValue("t%d-%d" % (topo, token))
+            if not token:
+                raise Exception("Server not in room")
+            defer.returnValue(str(token))
 
     def get_stream_token_for_event(self, event_id):
         """The stream token for an event
@@ -460,7 +464,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             table="events",
             keyvalues={"event_id": event_id},
             retcol="stream_ordering",
-        ).addCallback(lambda row: "s%d" % (row,))
+        ).addCallback(lambda row: str(RoomStreamToken(None, None, row)))
 
     def get_topological_token_for_event(self, event_id):
         """The stream token for an event
@@ -469,16 +473,34 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         Raises:
             StoreError if the event wasn't in the database.
         Returns:
-            A deferred "t%d-%d" topological token.
+            A deferred topological token.
         """
         return self._simple_select_one(
             table="events",
             keyvalues={"event_id": event_id},
-            retcols=("stream_ordering", "topological_ordering"),
+            retcols=("stream_ordering", "topological_ordering", "chunk_id"),
             desc="get_topological_token_for_event",
-        ).addCallback(lambda row: "t%d-%d" % (
-            row["topological_ordering"], row["stream_ordering"],)
-        )
+        ).addCallback(lambda row: str(RoomStreamToken(
+            row["chunk_id"],
+            row["topological_ordering"],
+            row["stream_ordering"],
+        )))
+
+    def _get_topological_token_for_room_txn(self, txn, room_id):
+        sql = """
+            SELECT chunk_id, topological_ordering, stream_ordering
+            FROM events
+            NATURAL JOIN event_forward_extremities
+            WHERE room_id = ?
+            ORDER BY stream_ordering DESC
+            LIMIT 1
+        """
+        txn.execute(sql, (room_id,))
+        row = txn.fetchone()
+        if row:
+            c, t, s = row
+            return RoomStreamToken(c, t, s)
+        return None
 
     def get_max_topological_token(self, room_id, stream_key):
         sql = (
@@ -515,18 +537,25 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 null topological_ordering.
         """
         for event, row in zip(events, rows):
+            chunk = row.chunk_id
+            topo = row.topological_ordering
             stream = row.stream_ordering
-            if topo_order and row.topological_ordering:
-                topo = row.topological_ordering
-            else:
-                topo = None
+
             internal = event.internal_metadata
-            internal.before = str(RoomStreamToken(topo, stream - 1))
-            internal.after = str(RoomStreamToken(topo, stream))
-            internal.order = (
-                int(topo) if topo else 0,
-                int(stream),
-            )
+            if topo_order and chunk:
+                internal.before = str(RoomStreamToken(chunk, topo, stream - 1))
+                internal.after = str(RoomStreamToken(chunk, topo, stream))
+                internal.order = (
+                    int(chunk) if chunk else 0,
+                    int(topo) if topo else 0,
+                    int(stream),
+                )
+            else:
+                internal.before = str(RoomStreamToken(None, None, stream - 1))
+                internal.after = str(RoomStreamToken(None, None, stream))
+                internal.order = (
+                    0, 0, int(stream),
+                )
 
     @defer.inlineCallbacks
     def get_events_around(self, room_id, event_id, before_limit, after_limit):
@@ -586,17 +615,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 "event_id": event_id,
                 "room_id": room_id,
             },
-            retcols=["stream_ordering", "topological_ordering"],
+            retcols=["stream_ordering", "topological_ordering", "chunk_id"],
         )
 
         # Paginating backwards includes the event at the token, but paginating
         # forward doesn't.
         before_token = RoomStreamToken(
-            results["topological_ordering"] - 1,
-            results["stream_ordering"],
+            results["chunk_id"],
+            results["topological_ordering"],
+            results["stream_ordering"] - 1,
         )
 
         after_token = RoomStreamToken(
+            results["chunk_id"],
             results["topological_ordering"],
             results["stream_ordering"],
         )
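
A quick sketch of the new token shapes in get_events_around (hypothetical values, not from the patch): for an event stored with chunk_id=7, topological_ordering=12 and stream_ordering=100, the code above would build

    before_token = RoomStreamToken(7, 12, 99)    # rendered as "c7~12~99"
    after_token  = RoomStreamToken(7, 12, 100)   # rendered as "c7~12~100"

Note that the "exclusive" adjustment for backwards pagination has moved from the topological part (previously topological_ordering - 1) to the stream part (now stream_ordering - 1).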
@@ -728,7 +759,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         args.append(int(limit))
 
         sql = (
-            "SELECT event_id, topological_ordering, stream_ordering"
+            "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
             " FROM events"
             " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
@@ -743,6 +774,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
-        rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+        rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
 
         if rows:
+            chunk = rows[-1].chunk_id
             topo = rows[-1].topological_ordering
             toke = rows[-1].stream_ordering
             if direction == 'b':
@@ -752,12 +784,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 # when we are going backwards so we subtract one from the
                 # stream part.
                 toke -= 1
-            next_token = RoomStreamToken(topo, toke)
+            next_token = RoomStreamToken(chunk, topo, toke)
         else:
             # TODO (erikj): We should work out what to do here instead.
             next_token = to_token if to_token else from_token
 
-        return rows, str(next_token),
+        return rows, str(next_token), iterated_chunks,
 
     @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
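
A brief sketch of the pagination bookkeeping above (hypothetical values, not from the patch): if the final row returned is _EventDictReturn("$ev:example.org", 2, 57, 301) and the direction is 'b', the stream part is decremented and the next token becomes

    next_token = RoomStreamToken(2, 57, 300)   # rendered as "c2~57~300"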
diff --git a/synapse/types.py b/synapse/types.py
index cc7c182a78..27f204ce79 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -306,7 +306,7 @@ StreamToken.START = StreamToken(
 )
 
 
-class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
+class RoomStreamToken(namedtuple("_StreamToken", "chunk topological stream")):
     """Tokens are positions between events. The token "s1" comes after event 1.
 
             s0    s1
@@ -334,10 +334,17 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
     def parse(cls, string):
         try:
             if string[0] == 's':
-                return cls(topological=None, stream=int(string[1:]))
-            if string[0] == 't':
+                return cls(chunk=None, topological=None, stream=int(string[1:]))
+            if string[0] == 't':  # For backwards compat with older tokens.
                 parts = string[1:].split('-', 1)
-                return cls(topological=int(parts[0]), stream=int(parts[1]))
+                return cls(chunk=None, topological=int(parts[0]), stream=int(parts[1]))
+            if string[0] == 'c':
+                parts = string[1:].split('~', 2)
+                return cls(
+                    chunk=int(parts[0]),
+                    topological=int(parts[1]),
+                    stream=int(parts[2]),
+                )
         except Exception:
             pass
         raise SynapseError(400, "Invalid token %r" % (string,))
@@ -346,12 +353,14 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
     def parse_stream_token(cls, string):
         try:
             if string[0] == 's':
-                return cls(topological=None, stream=int(string[1:]))
+                return cls(chunk=None, topological=None, stream=int(string[1:]))
         except Exception:
             pass
         raise SynapseError(400, "Invalid token %r" % (string,))
 
     def __str__(self):
+        if self.chunk is not None:
+            return "c%d~%d~%d" % (self.chunk, self.topological, self.stream)
         if self.topological is not None:
             return "t%d-%d" % (self.topological, self.stream)
         else:
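
For reference, a round trip through the updated parser (hypothetical values; the final lines assume the pre-existing stream-only "s%d" branch that follows this excerpt):

    RoomStreamToken.parse("c5~23~17")     # RoomStreamToken(chunk=5, topological=23, stream=17)
    str(RoomStreamToken(5, 23, 17))       # "c5~23~17"
    str(RoomStreamToken.parse("t23-17"))  # "t23-17", legacy topological tokens still round-trip
    str(RoomStreamToken.parse("s99"))     # "s99", stream-only tokens are untouched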