summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/handlers/room.py5
-rw-r--r--synapse/storage/events.py9
-rw-r--r--synapse/storage/schema/full_schemas/16/im.sql5
-rw-r--r--synapse/storage/stream.py223
-rw-r--r--synapse/types.py41
6 files changed, 212 insertions, 73 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 81cff0870e..c350c93c7e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -235,7 +235,7 @@ class MessageHandler(BaseHandler):
                     room_id, max_topo
                 )
 
-            events, next_key = yield self.store.paginate_room_events(
+            events, next_key, extremities = yield self.store.paginate_room_events(
                 room_id=room_id,
                 from_key=source_config.from_key,
                 to_key=source_config.to_key,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index b5850db42f..870dbd3799 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -514,7 +514,8 @@ class RoomEventSource(object):
             events = list(room_events)
             events.extend(e for evs, _ in room_to_events.values() for e in evs)
 
-            events.sort(key=lambda e: e.internal_metadata.order)
+            # Order by the stream ordering of the events.
+            events.sort(key=lambda e: e.internal_metadata.stream_ordering)
 
             if limit:
                 events[:] = events[:limit]
@@ -534,7 +535,7 @@ class RoomEventSource(object):
 
     @defer.inlineCallbacks
     def get_pagination_rows(self, user, config, key):
-        events, next_key = yield self.store.paginate_room_events(
+        events, next_key, _ = yield self.store.paginate_room_events(
             room_id=key,
             from_key=config.from_key,
             to_key=config.to_key,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5194c4a48d..d38f65b4e6 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1020,7 +1020,7 @@ class EventsStore(EventsWorkerStore):
                     }
                 )
 
-                chunk_id, _ = self._insert_into_chunk_txn(
+                chunk_id, topo = self._insert_into_chunk_txn(
                     txn, event.room_id, event.event_id,
                     [eid for eid, _ in event.prev_events],
                 )
@@ -1032,6 +1032,7 @@ class EventsStore(EventsWorkerStore):
                     updatevalues={
                         "outlier": False,
                         "chunk_id": chunk_id,
+                        "topological_ordering": topo,
                     },
                 )
 
@@ -1117,9 +1118,9 @@ class EventsStore(EventsWorkerStore):
 
         for event, _ in events_and_contexts:
             if event.internal_metadata.is_outlier():
-                chunk_id, _topo = None, 0
+                chunk_id, topo = None, 0
             else:
-                chunk_id, _topo = self._insert_into_chunk_txn(
+                chunk_id, topo = self._insert_into_chunk_txn(
                     txn, event.room_id, event.event_id,
                     [eid for eid, _ in event.prev_events],
                 )
@@ -1130,7 +1131,7 @@ class EventsStore(EventsWorkerStore):
                 values={
                     "stream_ordering": event.internal_metadata.stream_ordering,
                     "chunk_id": chunk_id,
-                    "topological_ordering": event.depth,
+                    "topological_ordering": topo,
                     "depth": event.depth,
                     "event_id": event.event_id,
                     "room_id": event.room_id,
diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql
index ba5346806e..4faf8562ad 100644
--- a/synapse/storage/schema/full_schemas/16/im.sql
+++ b/synapse/storage/schema/full_schemas/16/im.sql
@@ -14,7 +14,12 @@
  */
 
 CREATE TABLE IF NOT EXISTS events(
+    -- Defines an ordering used to stream new events to clients. Events
+    -- fetched via backfill have negative values.
     stream_ordering INTEGER PRIMARY KEY,
+    -- Defines a topological ordering of events within a chunk
+    -- (The concept of a chunk was added in later schemas, this used to
+    -- be set to the same value as the `depth` field in an event)
     topological_ordering BIGINT NOT NULL,
     event_id TEXT NOT NULL,
     type TEXT NOT NULL,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index fb463c525a..0d32a3a498 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -41,6 +41,7 @@ from synapse.storage.events import EventsWorkerStore
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
 from synapse.storage.engines import PostgresEngine
 
 import abc
@@ -62,24 +63,25 @@ _TOPOLOGICAL_TOKEN = "topological"
 
 # Used as return values for pagination APIs
 _EventDictReturn = namedtuple("_EventDictReturn", (
-    "event_id", "topological_ordering", "stream_ordering",
+    "event_id", "chunk_id", "topological_ordering", "stream_ordering",
 ))
 
 
 def lower_bound(token, engine, inclusive=False):
     inclusive = "=" if inclusive else ""
-    if token.topological is None:
+    if token.chunk is None:
         return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
     else:
         if isinstance(engine, PostgresEngine):
             # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
             # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
             # use the later form when running against postgres.
-            return "((%d,%d) <%s (%s,%s))" % (
-                token.topological, token.stream, inclusive,
+            return "(chunk_id = %d AND (%d,%d) <%s (%s,%s))" % (
+                token.chunk, token.topological, token.stream, inclusive,
                 "topological_ordering", "stream_ordering",
             )
-        return "(%d < %s OR (%d = %s AND %d <%s %s))" % (
+        return "(chunk_id = %d AND (%d < %s OR (%d = %s AND %d <%s %s)))" % (
+            token.chunk,
             token.topological, "topological_ordering",
             token.topological, "topological_ordering",
             token.stream, inclusive, "stream_ordering",
@@ -88,18 +90,19 @@ def lower_bound(token, engine, inclusive=False):
 
 def upper_bound(token, engine, inclusive=True):
     inclusive = "=" if inclusive else ""
-    if token.topological is None:
+    if token.chunk is None:
         return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering")
     else:
         if isinstance(engine, PostgresEngine):
             # Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well
             # as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we
             # use the later form when running against postgres.
-            return "((%d,%d) >%s (%s,%s))" % (
-                token.topological, token.stream, inclusive,
+            return "(chunk_id = %d AND (%d,%d) >%s (%s,%s))" % (
+                token.chunk, token.topological, token.stream, inclusive,
                 "topological_ordering", "stream_ordering",
             )
-        return "(%d > %s OR (%d = %s AND %d >%s %s))" % (
+        return "(chunk_id = %d AND (%d > %s OR (%d = %s AND %d >%s %s)))" % (
+            token.chunk,
             token.topological, "topological_ordering",
             token.topological, "topological_ordering",
             token.stream, inclusive, "stream_ordering",
@@ -275,7 +278,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             ) % (order,)
             txn.execute(sql, (room_id, from_id, to_id, limit))
 
-            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
+            rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
             return rows
 
         rows = yield self.runInteraction("get_room_events_stream_for_room", f)
@@ -325,7 +328,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             )
             txn.execute(sql, (user_id, from_id, to_id,))
 
-            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
+            rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
 
             return rows
 
@@ -392,7 +395,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         end_token = RoomStreamToken.parse(end_token)
 
-        rows, token = yield self.runInteraction(
+        rows, token, _ = yield self.runInteraction(
             "get_recent_event_ids_for_room", self._paginate_room_events_txn,
             room_id, from_token=end_token, limit=limit,
         )
@@ -437,15 +440,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         `room_id` causes it to return the current room specific topological
         token.
         """
-        token = yield self.get_room_max_stream_ordering()
         if room_id is None:
-            defer.returnValue("s%d" % (token,))
+            token = yield self.get_room_max_stream_ordering()
+            defer.returnValue(str(RoomStreamToken(None, None, token)))
         else:
-            topo = yield self.runInteraction(
-                "_get_max_topological_txn", self._get_max_topological_txn,
+            token = yield self.runInteraction(
+                "get_room_events_max_id", self._get_topological_token_for_room_txn,
                 room_id,
             )
-            defer.returnValue("t%d-%d" % (topo, token))
+            if not token:
+                raise Exception("Server not in room")
+            defer.returnValue(str(token))
 
     def get_stream_token_for_event(self, event_id):
         """The stream token for an event
@@ -460,7 +465,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             table="events",
             keyvalues={"event_id": event_id},
             retcol="stream_ordering",
-        ).addCallback(lambda row: "s%d" % (row,))
+        ).addCallback(lambda row: str(RoomStreamToken(None, None, row)))
 
     def get_topological_token_for_event(self, event_id):
         """The stream token for an event
@@ -469,16 +474,34 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         Raises:
             StoreError if the event wasn't in the database.
         Returns:
-            A deferred "t%d-%d" topological token.
+            A deferred topological token.
         """
         return self._simple_select_one(
             table="events",
             keyvalues={"event_id": event_id},
-            retcols=("stream_ordering", "topological_ordering"),
+            retcols=("stream_ordering", "topological_ordering", "chunk_id"),
             desc="get_topological_token_for_event",
-        ).addCallback(lambda row: "t%d-%d" % (
-            row["topological_ordering"], row["stream_ordering"],)
-        )
+        ).addCallback(lambda row: str(RoomStreamToken(
+            row["chunk_id"],
+            row["topological_ordering"],
+            row["stream_ordering"],
+        )))
+
+    def _get_topological_token_for_room_txn(self, txn, room_id):
+        sql = """
+            SELECT chunk_id, topological_ordering, stream_ordering
+            FROM events
+            NATURAL JOIN event_forward_extremities
+            WHERE room_id = ?
+            ORDER BY stream_ordering DESC
+            LIMIT 1
+        """
+        txn.execute(sql, (room_id,))
+        row = txn.fetchone()
+        if row:
+            c, t, s = row
+            return RoomStreamToken(c, t, s)
+        return None
 
     def get_max_topological_token(self, room_id, stream_key):
         sql = (
@@ -515,18 +538,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 null topological_ordering.
         """
         for event, row in zip(events, rows):
+            chunk = row.chunk_id
+            topo = row.topological_ordering
             stream = row.stream_ordering
-            if topo_order and row.topological_ordering:
-                topo = row.topological_ordering
-            else:
-                topo = None
+
             internal = event.internal_metadata
-            internal.before = str(RoomStreamToken(topo, stream - 1))
-            internal.after = str(RoomStreamToken(topo, stream))
-            internal.order = (
-                int(topo) if topo else 0,
-                int(stream),
-            )
+
+            internal.stream_ordering = stream
+
+            if topo_order:
+                internal.before = str(RoomStreamToken(chunk, topo, stream - 1))
+                internal.after = str(RoomStreamToken(chunk, topo, stream))
+            else:
+                internal.before = str(RoomStreamToken(None, None, stream - 1))
+                internal.after = str(RoomStreamToken(None, None, stream))
 
     @defer.inlineCallbacks
     def get_events_around(self, room_id, event_id, before_limit, after_limit):
@@ -586,27 +611,29 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 "event_id": event_id,
                 "room_id": room_id,
             },
-            retcols=["stream_ordering", "topological_ordering"],
+            retcols=["stream_ordering", "topological_ordering", "chunk_id"],
         )
 
         # Paginating backwards includes the event at the token, but paginating
         # forward doesn't.
         before_token = RoomStreamToken(
-            results["topological_ordering"] - 1,
-            results["stream_ordering"],
+            results["chunk_id"],
+            results["topological_ordering"],
+            results["stream_ordering"] - 1,
         )
 
         after_token = RoomStreamToken(
+            results["chunk_id"],
             results["topological_ordering"],
             results["stream_ordering"],
         )
 
-        rows, start_token = self._paginate_room_events_txn(
+        rows, start_token, _ = self._paginate_room_events_txn(
             txn, room_id, before_token, direction='b', limit=before_limit,
         )
         events_before = [r.event_id for r in rows]
 
-        rows, end_token = self._paginate_room_events_txn(
+        rows, end_token, _ = self._paginate_room_events_txn(
             txn, room_id, after_token, direction='f', limit=after_limit,
         )
         events_after = [r.event_id for r in rows]
@@ -689,12 +716,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 those that match the filter.
 
         Returns:
-            Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
-            as a list of _EventDictReturn and a token that points to the end
-            of the result set.
+            Deferred[tuple[list[_EventDictReturn], str, list[int]]: Returns
+            the results as a list of _EventDictReturn, a token that points to
+            the end of the result set, and a list of chunks iterated over.
         """
 
-        assert int(limit) >= 0
+        limit = int(limit)  # Sometimes we are passed a string from somewhere
+        assert limit >= 0
+
+        # There are two modes of fetching events: by stream order or by
+        # topological order. This is determined by whether the from_token is a
+        # stream or topological token. If stream then we can simply do a select
+        # ordered by stream_ordering column. If topological, then we need to
+        # fetch events from one chunk at a time until we hit the limit.
 
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
@@ -725,10 +759,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             bounds += " AND " + filter_clause
             args.extend(filter_args)
 
-        args.append(int(limit))
+        args.append(limit)
 
         sql = (
-            "SELECT event_id, topological_ordering, stream_ordering"
+            "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
             " FROM events"
             " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
@@ -740,9 +774,65 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         txn.execute(sql, args)
 
-        rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+        rows = [_EventDictReturn(*row) for row in txn]
+
+        # If we are paginating topologically and we haven't hit the limit on
+        # number of events then we need to fetch events from the previous or
+        # next chunk.
+
+        iterated_chunks = []
+
+        chunk_id = None
+        if from_token.chunk:  # FIXME: may be topological but no chunk.
+            if rows:
+                chunk_id = rows[-1].chunk_id
+                iterated_chunks = [r.chunk_id for r in rows]
+            else:
+                chunk_id = from_token.chunk
+                iterated_chunks = [chunk_id]
+
+        table = ChunkDBOrderedListStore(
+            txn, room_id, self.clock,
+        )
+
+        if filter_clause:
+            filter_clause = "AND " + filter_clause
+
+        sql = (
+            "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
+            " FROM events"
+            " WHERE outlier = ? AND room_id = ? %(filter_clause)s"
+            " ORDER BY topological_ordering %(order)s,"
+            " stream_ordering %(order)s LIMIT ?"
+        ) % {
+            "filter_clause": filter_clause,
+            "order": order,
+        }
+
+        args = [False, room_id] + filter_args + [limit]
+
+        while chunk_id and (limit <= 0 or len(rows) < limit):
+            if chunk_id not in iterated_chunks:
+                iterated_chunks.append(chunk_id)
+
+            if direction == 'b':
+                chunk_id = table.get_prev(chunk_id)
+            else:
+                chunk_id = table.get_next(chunk_id)
+
+            if chunk_id is None:
+                break
+
+            txn.execute(sql, args)
+            new_rows = [_EventDictReturn(*row) for row in txn]
+
+            rows.extend(new_rows)
+
+        # We may have inserted more rows than necessary in the loop above
+        rows = rows[:limit]
 
         if rows:
+            chunk = rows[-1].chunk_id
             topo = rows[-1].topological_ordering
             toke = rows[-1].stream_ordering
             if direction == 'b':
@@ -752,12 +842,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 # when we are going backwards so we subtract one from the
                 # stream part.
                 toke -= 1
-            next_token = RoomStreamToken(topo, toke)
+            next_token = RoomStreamToken(chunk, topo, toke)
         else:
             # TODO (erikj): We should work out what to do here instead.
             next_token = to_token if to_token else from_token
 
-        return rows, str(next_token),
+        return rows, str(next_token), iterated_chunks,
 
     @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
@@ -777,18 +867,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 those that match the filter.
 
         Returns:
-            tuple[list[dict], str]: Returns the results as a list of dicts and
-            a token that points to the end of the result set. The dicts have
-            the keys "event_id", "topological_ordering" and "stream_orderign".
+            tuple[list[dict], str, list[str]]: Returns the results as a list of
+            dicts, a token that points to the end of the result set, and a list
+            of backwards extremities. The dicts have the keys "event_id",
+            "topological_ordering" and "stream_ordering".
         """
 
         from_key = RoomStreamToken.parse(from_key)
         if to_key:
             to_key = RoomStreamToken.parse(to_key)
 
-        rows, token = yield self.runInteraction(
-            "paginate_room_events", self._paginate_room_events_txn,
-            room_id, from_key, to_key, direction, limit, event_filter,
+        def _do_paginate_room_events(txn):
+            rows, token, chunks = self._paginate_room_events_txn(
+                txn, room_id, from_key, to_key, direction, limit, event_filter,
+            )
+
+            # We now fetch the extremities by fetching the extremities for
+            # each chunk we iterated over.
+            extremities = []
+            seen = set()
+            for chunk_id in chunks:
+                if chunk_id in seen:
+                    continue
+                seen.add(chunk_id)
+
+                event_ids = self._simple_select_onecol_txn(
+                    txn,
+                    table="chunk_backwards_extremities",
+                    keyvalues={"chunk_id": chunk_id},
+                    retcol="event_id"
+                )
+
+                extremities.extend(e for e in event_ids if e not in extremities)
+
+            return rows, token, extremities
+
+        rows, token, extremities = yield self.runInteraction(
+            "paginate_room_events", _do_paginate_room_events,
         )
 
         events = yield self._get_events(
@@ -798,7 +913,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         self._set_before_and_after(events, rows)
 
-        defer.returnValue((events, token))
+        defer.returnValue((events, token, extremities))
 
 
 class StreamStore(StreamWorkerStore):
diff --git a/synapse/types.py b/synapse/types.py
index cc7c182a78..1ff71aa4e2 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -306,7 +306,7 @@ StreamToken.START = StreamToken(
 )
 
 
-class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
+class RoomStreamToken(namedtuple("_StreamToken", ("chunk", "topological", "stream"))):
     """Tokens are positions between events. The token "s1" comes after event 1.
 
             s0    s1
@@ -319,14 +319,18 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
     When traversing the live event stream events are ordered by when they
     arrived at the homeserver.
 
-    When traversing historic events the events are ordered by their depth in
-    the event graph "topological_ordering" and then by when they arrived at the
-    homeserver "stream_ordering".
+    When traversing historic events the events are ordered by the topological
+    ordering of the room graph. This is done using event chunks and the
+    `topological_ordering` column.
 
-    Live tokens start with an "s" followed by the "stream_ordering" id of the
-    event it comes after. Historic tokens start with a "t" followed by the
-    "topological_ordering" id of the event it comes after, followed by "-",
-    followed by the "stream_ordering" id of the event it comes after.
+    Live tokens start with an 's' and include the stream_ordering of the event
+    it comes after. Historic tokens start with a 'c' and include the chunk ID,
+    topological ordering and stream ordering of the event it comes after.
+
+    (In previous versions, when chunks were not implemented, the historic tokens
+    started with 't' and included the topological and stream ordering. These
+    tokens can be roughly converted to the new format by looking up the chunk
+    and topological ordering of the event with the same stream ordering).
     """
     __slots__ = []
 
@@ -334,10 +338,19 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
     def parse(cls, string):
         try:
             if string[0] == 's':
-                return cls(topological=None, stream=int(string[1:]))
-            if string[0] == 't':
+                return cls(chunk=None, topological=None, stream=int(string[1:]))
+            if string[0] == 't':  # For backwards compat with older tokens.
                 parts = string[1:].split('-', 1)
-                return cls(topological=int(parts[0]), stream=int(parts[1]))
+                return cls(chunk=None, topological=int(parts[0]), stream=int(parts[1]))
+            if string[0] == 'c':
+                # We use '~' as both stream ordering and topological ordering
+                # can be negative, so we can't use '-'
+                parts = string[1:].split('~', 2)
+                return cls(
+                    chunk=int(parts[0]),
+                    topological=int(parts[1]),
+                    stream=int(parts[2]),
+                )
         except Exception:
             pass
         raise SynapseError(400, "Invalid token %r" % (string,))
@@ -346,12 +359,16 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
     def parse_stream_token(cls, string):
         try:
             if string[0] == 's':
-                return cls(topological=None, stream=int(string[1:]))
+                return cls(chunk=None, topological=None, stream=int(string[1:]))
         except Exception:
             pass
         raise SynapseError(400, "Invalid token %r" % (string,))
 
     def __str__(self):
+        if self.chunk is not None:
+            # We use '~' as both stream ordering and topological ordering
+            # can be negative, so we can't use '-'
+            return "c%d~%d~%d" % (self.chunk, self.topological, self.stream)
         if self.topological is not None:
             return "t%d-%d" % (self.topological, self.stream)
         else: