summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/storage/chunk_ordered_table.py474
-rw-r--r--synapse/storage/events.py2
-rw-r--r--synapse/storage/schema/delta/49/event_chunks.py47
-rw-r--r--synapse/storage/stream.py2
-rw-r--r--tests/storage/test_chunk_linearizer_table.py153
-rw-r--r--tests/util/test_katriel_bodlaender.py26
6 files changed, 568 insertions, 136 deletions
diff --git a/synapse/storage/chunk_ordered_table.py b/synapse/storage/chunk_ordered_table.py
index 79d0ca44ec..5e552ef138 100644
--- a/synapse/storage/chunk_ordered_table.py
+++ b/synapse/storage/chunk_ordered_table.py
@@ -16,9 +16,11 @@
 import math
 import logging
 
+from fractions import Fraction
+
 from synapse.storage._base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine
 from synapse.util.katriel_bodlaender import OrderedListStore
-from synapse.util.metrics import Measure
 
 import synapse.metrics
 
@@ -59,11 +61,12 @@ class ChunkDBOrderedListStore(OrderedListStore):
     re-instantiated in each transaction, so all state needs to be stored
     in the database.
 
-    Internally the ordering is implemented using floats, and the average is
-    taken when a node is inserted between other nodes. To avoid precision
-    errors a minimum difference between sucessive orderings is attempted to be
-    kept; whenever the difference is too small we attempt to rebalance. See
-    the `_rebalance` function for implementation details.
+    Internally the ordering is implemented using a linked list and assigning
+    each chunk a fraction. `get_next` and `get_prev` are implemented via linked
+    lists, and comparisons implemented using the fractions. When inserting
+    chunks fractions are picked such that their denominator is the smallest
+    possible. However, if the denominators grow too big then a rebalancing has
+    to take place to reduce the denominators; see `_rebalance` for details.
 
     Note that OrderedListStore orders nodes such that source of an edge
     comes before the target. This is counter intuitive when edges represent
@@ -80,23 +83,24 @@ class ChunkDBOrderedListStore(OrderedListStore):
         txn
         room_id (str)
         clock
-        rebalance_digits (int): When a rebalance is triggered we rebalance
-            in a range around the node, where the bounds are rounded to this
-            number of digits.
-        min_difference (int): A rebalance is triggered when the difference
-            between two successive orderings is less than the reciprocal of
-            this.
+        database_engine
+        rebalance_max_denominator (int): When a rebalance is triggered we
+            replace existing orders with those that have a denominator smaller
+            or equal to this
+        max_denominator (int): A rebalance is triggered when a node has an
+            ordering with a denominator greater than this
     """
     def __init__(self,
-                 txn, room_id, clock,
-                 rebalance_digits=3,
-                 min_difference=1000000):
+                 txn, room_id, clock, database_engine,
+                 rebalance_max_denominator=100,
+                 max_denominator=100000):
         self.txn = txn
         self.room_id = room_id
         self.clock = clock
+        self.database_engine = database_engine
 
-        self.rebalance_digits = rebalance_digits
-        self.min_difference = 1. / min_difference
+        self.rebalance_md = rebalance_max_denominator
+        self.max_denominator = max_denominator
 
     def is_before(self, a, b):
         """Implements OrderedListStore"""
@@ -104,16 +108,13 @@ class ChunkDBOrderedListStore(OrderedListStore):
 
     def get_prev(self, node_id):
         """Implements OrderedListStore"""
-        order = self._get_order(node_id)
 
         sql = """
             SELECT chunk_id FROM chunk_linearized
-            WHERE ordering < ? AND room_id = ?
-            ORDER BY ordering DESC
-            LIMIT 1
+            WHERE next_chunk_id = ?
         """
 
-        self.txn.execute(sql, (order, self.room_id,))
+        self.txn.execute(sql, (node_id,))
 
         row = self.txn.fetchone()
         if row:
@@ -122,16 +123,13 @@ class ChunkDBOrderedListStore(OrderedListStore):
 
     def get_next(self, node_id):
         """Implements OrderedListStore"""
-        order = self._get_order(node_id)
 
         sql = """
-            SELECT chunk_id FROM chunk_linearized
-            WHERE ordering > ? AND room_id = ?
-            ORDER BY ordering ASC
-            LIMIT 1
+            SELECT next_chunk_id FROM chunk_linearized
+            WHERE chunk_id = ?
         """
 
-        self.txn.execute(sql, (order, self.room_id,))
+        self.txn.execute(sql, (node_id,))
 
         row = self.txn.fetchone()
         if row:
@@ -144,27 +142,26 @@ class ChunkDBOrderedListStore(OrderedListStore):
         rebalance = False  # Set to true if we need to trigger a rebalance
 
         if target_id:
-            target_order = self._get_order(target_id)
             before_id = self.get_prev(target_id)
-
             if before_id:
-                before_order = self._get_order(before_id)
-                new_order = (target_order + before_order) / 2.
-
-                rebalance = math.fabs(target_order - before_order) < self.min_difference
+                new_order = self._insert_between(node_id, before_id, target_id)
             else:
-                new_order = math.floor(target_order) - 1
+                new_order = self._insert_at_start(node_id, target_id)
         else:
             # If target_id is None then we insert at the end.
             self.txn.execute("""
-                SELECT COALESCE(MAX(ordering), 0) + 1
+                SELECT chunk_id
                 FROM chunk_linearized
-                WHERE room_id = ?
+                WHERE room_id = ? AND next_chunk_id is NULL
             """, (self.room_id,))
 
-            new_order, = self.txn.fetchone()
+            row = self.txn.fetchone()
+            if row:
+                new_order = self._insert_at_end(node_id, row[0])
+            else:
+                new_order = self._insert_first(node_id)
 
-        self._insert(node_id, new_order)
+        rebalance = new_order.denominator > self.max_denominator
 
         if rebalance:
             self._rebalance(node_id)
@@ -174,64 +171,204 @@ class ChunkDBOrderedListStore(OrderedListStore):
 
         rebalance = False  # Set to true if we need to trigger a rebalance
 
+        next_chunk_id = None
         if target_id:
-            target_order = self._get_order(target_id)
-            after_id = self.get_next(target_id)
-            if after_id:
-                after_order = self._get_order(after_id)
-                new_order = (target_order + after_order) / 2.
-
-                rebalance = math.fabs(target_order - after_order) < self.min_difference
+            next_chunk_id = self.get_next(target_id)
+            if next_chunk_id:
+                new_order = self._insert_between(node_id, target_id, next_chunk_id)
             else:
-                new_order = math.ceil(target_order) + 1
+                new_order = self._insert_at_end(node_id, target_id)
         else:
             # If target_id is None then we insert at the start.
             self.txn.execute("""
-                SELECT COALESCE(MIN(ordering), 0) - 1
+                SELECT chunk_id
                 FROM chunk_linearized
+                NATURAL JOIN chunk_linearized_first
                 WHERE room_id = ?
             """, (self.room_id,))
 
-            new_order, = self.txn.fetchone()
+            row = self.txn.fetchone()
+            if row:
+                new_order = self._insert_at_start(node_id, row[0])
+            else:
+                new_order = self._insert_first(node_id)
 
-        self._insert(node_id, new_order)
+        rebalance = new_order.denominator > self.max_denominator
 
         if rebalance:
             self._rebalance(node_id)
 
+    def _insert_between(self, node_id, left_id, right_id):
+        """Inserts node between given existing nodes.
+        """
+
+        left_order = self._get_order(left_id)
+        right_order = self._get_order(right_id)
+
+        assert left_order < right_order
+
+        new_order = get_fraction_in_range(left_order, right_order)
+
+        SQLBaseStore._simple_update_one_txn(
+            self.txn,
+            table="chunk_linearized",
+            keyvalues={"chunk_id": left_id},
+            updatevalues={"next_chunk_id": node_id},
+        )
+
+        SQLBaseStore._simple_insert_txn(
+            self.txn,
+            table="chunk_linearized",
+            values={
+                "chunk_id": node_id,
+                "room_id": self.room_id,
+                "next_chunk_id": right_id,
+                "numerator": int(new_order.numerator),
+                "denominator": int(new_order.denominator),
+            }
+        )
+
+        return new_order
+
+    def _insert_at_end(self, node_id, last_id):
+        """Inserts node at the end using existing last node.
+        """
+
+        last_order = self._get_order(last_id)
+        new_order = Fraction(int(math.ceil(last_order)) + 1, 1)
+
+        SQLBaseStore._simple_update_one_txn(
+            self.txn,
+            table="chunk_linearized",
+            keyvalues={"chunk_id": last_id},
+            updatevalues={"next_chunk_id": node_id},
+        )
+
+        SQLBaseStore._simple_insert_txn(
+            self.txn,
+            table="chunk_linearized",
+            values={
+                "chunk_id": node_id,
+                "room_id": self.room_id,
+                "next_chunk_id": None,
+                "numerator": int(new_order.numerator),
+                "denominator": int(new_order.denominator),
+            }
+        )
+
+        return new_order
+
+    def _insert_at_start(self, node_id, first_id):
+        """Inserts node at the start using existing first node.
+        """
+
+        first_order = self._get_order(first_id)
+        new_order = get_fraction_in_range(0, first_order)
+
+        SQLBaseStore._simple_update_one_txn(
+            self.txn,
+            table="chunk_linearized_first",
+            keyvalues={"room_id": self.room_id},
+            updatevalues={"chunk_id": node_id},
+        )
+
+        SQLBaseStore._simple_insert_txn(
+            self.txn,
+            table="chunk_linearized",
+            values={
+                "chunk_id": node_id,
+                "room_id": self.room_id,
+                "next_chunk_id": first_id,
+                "numerator": int(new_order.numerator),
+                "denominator": int(new_order.denominator),
+            }
+        )
+
+        return new_order
+
+    def _insert_first(self, node_id):
+        """Inserts the first node for this room.
+        """
+
+        SQLBaseStore._simple_insert_txn(
+            self.txn,
+            table="chunk_linearized_first",
+            values={
+                "room_id": self.room_id,
+                "chunk_id": node_id,
+            },
+        )
+
+        SQLBaseStore._simple_insert_txn(
+            self.txn,
+            table="chunk_linearized",
+            values={
+                "chunk_id": node_id,
+                "room_id": self.room_id,
+                "next_chunk_id": None,
+                "numerator": 1,
+                "denominator": 1,
+            }
+        )
+
+        return Fraction(1, 1)
+
     def get_nodes_with_edges_to(self, node_id):
         """Implements OrderedListStore"""
 
         # Note that we use the inverse relation here
         sql = """
-            SELECT l.ordering, l.chunk_id FROM chunk_graph AS g
+            SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g
             INNER JOIN chunk_linearized AS l ON g.prev_id = l.chunk_id
             WHERE g.chunk_id = ?
         """
         self.txn.execute(sql, (node_id,))
-        return self.txn.fetchall()
+        return [(Fraction(n, d), c) for c, n, d in self.txn]
 
     def get_nodes_with_edges_from(self, node_id):
         """Implements OrderedListStore"""
 
         # Note that we use the inverse relation here
         sql = """
-            SELECT l.ordering, l.chunk_id FROM chunk_graph AS g
+            SELECT  l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g
             INNER JOIN chunk_linearized AS l ON g.chunk_id = l.chunk_id
             WHERE g.prev_id = ?
         """
         self.txn.execute(sql, (node_id,))
-        return self.txn.fetchall()
+        return [(Fraction(n, d), c) for c, n, d in self.txn]
 
     def _delete_ordering(self, node_id):
         """Implements OrderedListStore"""
 
+        next_chunk_id = SQLBaseStore._simple_select_one_onecol_txn(
+            self.txn,
+            table="chunk_linearized",
+            keyvalues={
+                "chunk_id": node_id,
+            },
+            retcol="next_chunk_id",
+        )
+
         SQLBaseStore._simple_delete_txn(
             self.txn,
             table="chunk_linearized",
             keyvalues={"chunk_id": node_id},
         )
 
+        sql = """
+            UPDATE chunk_linearized SET next_chunk_id = ?
+            WHERE next_chunk_id = ?
+        """
+
+        self.txn.execute(sql, (next_chunk_id, node_id,))
+
+        sql = """
+            UPDATE chunk_linearized_first SET chunk_id = ?
+            WHERE chunk_id = ?
+        """
+
+        self.txn.execute(sql, (next_chunk_id, node_id,))
+
     def _add_edge_to_graph(self, source_id, target_id):
         """Implements OrderedListStore"""
 
@@ -242,78 +379,199 @@ class ChunkDBOrderedListStore(OrderedListStore):
             values={"chunk_id": target_id, "prev_id": source_id}
         )
 
-    def _insert(self, node_id, order):
-        """Inserts the node with the given ordering.
-        """
-        SQLBaseStore._simple_insert_txn(
-            self.txn,
-            table="chunk_linearized",
-            values={
-                "chunk_id": node_id,
-                "room_id": self.room_id,
-                "ordering": order,
-            }
-        )
-
     def _get_order(self, node_id):
         """Get the ordering of the given node.
         """
 
-        return SQLBaseStore._simple_select_one_onecol_txn(
+        row = SQLBaseStore._simple_select_one_txn(
             self.txn,
             table="chunk_linearized",
             keyvalues={"chunk_id": node_id},
-            retcol="ordering"
+            retcols=("numerator", "denominator",),
         )
+        return Fraction(row["numerator"], row["denominator"])
 
     def _rebalance(self, node_id):
         """Rebalances the list around the given node to ensure that the
-        ordering floats don't get too small.
+        ordering denominators aren't too big.
+
+        This is done by starting at the given chunk and generating new orders
+        based on a Farey sequence of order `self.rebalance_md` for all
+        subsequent chunks that have an order less than that of the ordering
+        generated by the Farey sequence.
+
+        For example say we have chunks (and orders): A (23/90),  B (24/91) and
+        C (2/3), and we have rebalance_md set to 5, a rebalancing would produce:
+
+            A: 23/90 -> 1/3
+            B: 24/91 -> 2/5
+            C: 2/3  (no change)
 
-        This works by finding a range that includes the given node, and
-        recalculating the ordering floats such that they're equidistant in
-        that range.
+        Since the farey sequence is 1/5, 1/4, 1/3, 2/5, 1/2, ... and 1/3 is the
+        smallest term greater than 23/90.
+
+        Note that we've extended Farey Sequence to be infinite by repeating the
+        sequence with an added integer. For example sequence with order 3:
+
+            0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ...
         """
 
         logger.info("Rebalancing room %s, chunk %s", self.room_id, node_id)
 
-        with Measure(self.clock, "chunk_rebalance"):
-            # We pick the interval to try and minimise the number of decimal
-            # places, i.e. we round to nearest float with `rebalance_digits` and
-            # use that as one side of the interval
-            order = self._get_order(node_id)
-            a = round(order, self.rebalance_digits)
-            min_order = a - 10 ** -self.rebalance_digits
-            max_order = a + 10 ** -self.rebalance_digits
-
-            # Now we get all the nodes in the range. We add the minimum difference
-            # to the bounds to ensure that we don't accidentally move a node to be
-            # within the minimum difference of a node outside the range.
-            sql = """
-                SELECT chunk_id FROM chunk_linearized
-                WHERE ordering >= ? AND ordering <= ? AND room_id = ?
-            """
-            self.txn.execute(sql, (
-                min_order - self.min_difference,
-                max_order + self.min_difference,
-                self.room_id,
-            ))
-
-            chunk_ids = [c for c, in self.txn]
+        old_order = self._get_order(node_id)
+
+        a, b, c, d = find_farey_terms(old_order, self.rebalance_md)
+        assert old_order < Fraction(a, b)
+        assert b + d > self.rebalance_md
+
+        # Since we can easily produce farey sequence terms with an iterative
+        # algorithm, we can use WITH RECURSIVE to do so. This is less clear
+        # than doing it in python, but saves us being killed by the RTT to the
+        # DB if we need to rebalance a large number of nodes.
+        with_sql = """
+            WITH RECURSIVE chunks (chunk_id, next, n, a, b, c, d) AS (
+                    SELECT chunk_id, next_chunk_id, ?, ?, ?, ?, ?
+                    FROM chunk_linearized WHERE chunk_id = ?
+                UNION ALL
+                    SELECT n.chunk_id, n.next_chunk_id, n,
+                    c, d, ((n + b) / d) * c - a, ((n + b) / d) * d - b
+                    FROM chunks AS c
+                    INNER JOIN chunk_linearized AS l ON l.chunk_id = c.chunk_id
+                    INNER JOIN chunk_linearized AS n ON n.chunk_id = l.next_chunk_id
+                    WHERE c * 1.0 / d > n.numerator * 1.0 / n.denominator
+            )
+        """
 
-            sql = """
+        # Annoyingly, postgres 9.4 doesn't support the standard SQL subquery
+        # syntax for updates.
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = with_sql + """
+                UPDATE chunk_linearized AS l
+                SET numerator = a, denominator = b
+                FROM chunks AS c
+                WHERE c.chunk_id = l.chunk_id
+            """
+        else:
+            sql = with_sql + """
                 UPDATE chunk_linearized
-                SET ordering = ?
-                WHERE chunk_id = ?
+                SET (numerator, denominator) = (
+                    SELECT a, b FROM chunks
+                    WHERE chunks.chunk_id = chunk_linearized.chunk_id
+                )
+                WHERE chunk_id in (SELECT chunk_id FROM chunks)
             """
 
-            step = (max_order - min_order) / len(chunk_ids)
-            self.txn.executemany(
-                sql,
-                (
-                    ((idx * step + min_order), chunk_id)
-                    for idx, chunk_id in enumerate(chunk_ids)
-                )
-            )
+        self.txn.execute(sql, (
+            self.rebalance_md, a, b, c, d, node_id
+        ))
+
+        logger.info("Rebalanced %d chunks in room %s", self.txn.rowcount, self.room_id)
+
+        rebalance_counter.inc()
+
+
+def get_fraction_in_range(min_frac, max_frac):
+    """Gets a fraction in between the given numbers.
+
+    Uses Stern-Brocot tree to generate the fraction with the smallest
+    denominator.
+
+    See https://en.wikipedia.org/wiki/Stern%E2%80%93Brocot_tree
+
+    Args:
+        min_frac (numbers.Rational)
+        max_frac (numbers.Rational)
+
+    Returns:
+        numbers.Rational
+    """
+
+    assert 0 <= min_frac < max_frac
+
+    # If the determinant is 1 then the fraction with smallest numerator and
+    # denominator in the range is the mediant, so we don't have to use the
+    # stern brocot tree to search for it.
+    determinant = (
+        min_frac.denominator * max_frac.numerator
+        - min_frac.numerator * max_frac.denominator
+    )
+
+    if determinant == 1:
+        return Fraction(
+            min_frac.numerator + max_frac.numerator,
+            min_frac.denominator + max_frac.denominator,
+        )
+
+    # This works by tracking two fractions a/b and c/d and repeatedly replacing
+    # one of them with their mediant, depending on if the mediant is smaller
+    # or greater than the specified range.
+    a, b, c, d = 0, 1, 1, 0
+
+    while True:
+        f = Fraction(a + c, b + d)
+
+        if f <= min_frac:
+            a, b, c, d = a + c, b + d, c, d
+        elif min_frac < f < max_frac:
+            return f
+        else:
+            a, b, c, d = a, b, a + c, b + d
+
+
+def find_farey_terms(min_frac, max_denom):
+    """Find the smallest pair of fractions that are part of the Farey sequence
+    of order `max_denom` (the ordered sequence of all fraction with denominator
+    less than or equal to max_denom).
+
+    This is useful as it can be fed into a simple iterative algorithm to
+    generate subsequent entries in the sequence.
+
+    A pair of fractions a/b, c/d are neighbours in the sequence of order
+    max(b, d) if and only if their determinant is one, i.e. bc - ad = 1. Note
+    that the next order sequence is generate by taking the mediants of the
+    previous order, so a/b and c/d are neighbours in all sequences with orders
+    between max(b, d) and b + d.
+
+    We can therefore use the Stern-Brocot tree to find the closest pair of
+    fractions to min_frac such that b + d is strictly greater than max_denom,
+    since all neighbouring fractions in Stern-Brocot satisfy the necessary
+    determinant property.
+
+    Note that we've extended Farey Sequence to be infinite by repeating the
+    sequence with an added integer. For example sequence with order 3:
+
+        0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ...
+
+    See https://en.wikipedia.org/wiki/Farey_sequence
+
+    Args:
+        min_frac (numbers.Rational)
+        max_frac (int)
+
+    Returns:
+        tuple[int, int, int, int]
+    """
+
+    a, b, c, d = 0, 1, 1, 0
+
+    while True:
+        cur_frac = Fraction(a + c, b + d)
+
+        if b + d > max_denom:
+            break
+
+        if cur_frac <= min_frac:
+            a, b, c, d = a + c, b + d, c, d
+        elif min_frac < cur_frac:
+            a, b, c, d = a, b, a + c, b + d
+
+    # a/b may be smaller than min_frac, so we run the algorithm to generate
+    # next Farey sequence terms until a/b is strictly greater than min_frac
+    while Fraction(a, b) <= min_frac:
+        k = int((max_denom + b) / d)
+        a, b, c, d = c, d, k * c - a, k * d - b
+
+    assert min_frac < Fraction(a, b) < Fraction(c, d)
+    assert b * c - a * d == 1
 
-            rebalance_counter.inc()
+    return a, b, c, d
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 873bc717bb..e76af4ec69 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1446,7 +1446,7 @@ class EventsStore(EventsWorkerStore):
             sibling_events.update(pes)
 
         table = ChunkDBOrderedListStore(
-            txn, room_id, self.clock,
+            txn, room_id, self.clock, self.database_engine,
         )
 
         # If there is only one previous chunk (and that isn't None), then this
diff --git a/synapse/storage/schema/delta/49/event_chunks.py b/synapse/storage/schema/delta/49/event_chunks.py
index 7d8d711600..50040a779c 100644
--- a/synapse/storage/schema/delta/49/event_chunks.py
+++ b/synapse/storage/schema/delta/49/event_chunks.py
@@ -53,11 +53,23 @@ CREATE INDEX chunk_backwards_extremities_event_id ON chunk_backwards_extremities
 CREATE TABLE chunk_linearized (
     chunk_id BIGINT NOT NULL,
     room_id TEXT NOT NULL,
-    ordering DOUBLE PRECISION NOT NULL
+    next_chunk_id BIGINT,  -- The chunk directly after this chunk, or NULL if last chunk
+    numerator BIGINT NOT NULL,
+    denominator BIGINT NOT NULL
 );
 
 CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id);
-CREATE INDEX chunk_linearized_ordering ON chunk_linearized (room_id, ordering);
+CREATE UNIQUE INDEX chunk_linearized_next_id ON chunk_linearized (
+    next_chunk_id, room_id
+);
+
+-- Records the first chunk in a room.
+CREATE TABLE chunk_linearized_first (
+    chunk_id BIGINT NOT NULL,
+    room_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX chunk_linearized_first_id ON chunk_linearized_first (room_id);
 
 INSERT into background_updates (update_name, progress_json)
     VALUES ('event_fields_chunk_id', '{}');
@@ -69,10 +81,6 @@ def run_create(cur, database_engine, *args, **kwargs):
     for statement in get_statements(SQL.splitlines()):
         cur.execute(statement)
 
-    # We now go through and assign chunk IDs for all forward extremities.
-    # Note that we know that extremities can't reference each other, so we
-    # can simply assign each event a new chunk ID with an arbitrary order.
-
     txn = LoggingTransaction(
         cur, "schema_update", database_engine, [], [],
     )
@@ -86,6 +94,7 @@ def run_create(cur, database_engine, *args, **kwargs):
 
     next_chunk_id = 1
     room_to_next_order = {}
+    prev_chunks_by_room = {}
 
     for row in rows:
         chunk_id = next_chunk_id
@@ -101,19 +110,41 @@ def run_create(cur, database_engine, *args, **kwargs):
             updatevalues={"chunk_id": chunk_id},
         )
 
-        ordering = room_to_next_order.get(room_id, 0)
+        ordering = room_to_next_order.get(room_id, 1)
         room_to_next_order[room_id] = ordering + 1
 
+        prev_chunks = prev_chunks_by_room.setdefault(room_id, [])
+
         SQLBaseStore._simple_insert_txn(
             txn,
             table="chunk_linearized",
             values={
                 "chunk_id": chunk_id,
                 "room_id": row["room_id"],
-                "ordering": 0,
+                "numerator": ordering,
+                "denominator": 1,
             },
         )
 
+        if prev_chunks:
+            SQLBaseStore._simple_update_one_txn(
+                txn,
+                table="chunk_linearized",
+                keyvalues={"chunk_id": prev_chunks[-1]},
+                updatevalues={"next_chunk_id": chunk_id},
+            )
+        else:
+            SQLBaseStore._simple_insert_txn(
+                txn,
+                table="chunk_linearized_first",
+                values={
+                    "chunk_id": chunk_id,
+                    "room_id": row["room_id"],
+                },
+            )
+
+        prev_chunks.append(chunk_id)
+
 
 def run_upgrade(*args, **kwargs):
     pass
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 0d32a3a498..c5b52a3d60 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -792,7 +792,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 iterated_chunks = [chunk_id]
 
         table = ChunkDBOrderedListStore(
-            txn, room_id, self.clock,
+            txn, room_id, self.clock, self.database_engine,
         )
 
         if filter_clause:
diff --git a/tests/storage/test_chunk_linearizer_table.py b/tests/storage/test_chunk_linearizer_table.py
index beb1ac9a42..ce2883865a 100644
--- a/tests/storage/test_chunk_linearizer_table.py
+++ b/tests/storage/test_chunk_linearizer_table.py
@@ -15,11 +15,16 @@
 
 from twisted.internet import defer
 
+import itertools
 import random
 import tests.unittest
 import tests.utils
 
-from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
+from fractions import Fraction
+
+from synapse.storage.chunk_ordered_table import (
+    ChunkDBOrderedListStore, find_farey_terms, get_fraction_in_range,
+)
 
 
 class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
@@ -42,23 +47,26 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 1, 100,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
             )
 
             table.add_node("A")
             table._insert_after("B", "A")
             table._insert_before("C", "A")
+            table._insert_after("D", "A")
 
             sql = """
-                SELECT chunk_id FROM chunk_linearized
+                SELECT chunk_id, numerator, denominator FROM chunk_linearized
                 WHERE room_id = ?
-                ORDER BY ordering ASC
             """
             txn.execute(sql, (room_id,))
 
-            ordered = [r for r, in txn]
+            ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
+            ordered = [c for _, c in ordered]
 
-            self.assertEqual(["C", "A", "B"], ordered)
+            self.assertEqual(["C", "A", "D", "B"], ordered)
 
         yield self.store.runInteraction("test", test_txn)
 
@@ -68,7 +76,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 1, 20,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
             )
 
             nodes = [(i, "node_%d" % (i,)) for i in xrange(1, 1000)]
@@ -95,13 +105,13 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
                 already_inserted.sort()
 
             sql = """
-                SELECT chunk_id FROM chunk_linearized
+                SELECT chunk_id, numerator, denominator FROM chunk_linearized
                 WHERE room_id = ?
-                ORDER BY ordering ASC
             """
             txn.execute(sql, (room_id,))
 
-            ordered = [r for r, in txn]
+            ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
+            ordered = [c for _, c in ordered]
 
             self.assertEqual(expected, ordered)
 
@@ -113,7 +123,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 1, 20,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 1000,
             )
 
             table.add_node("a")
@@ -131,13 +143,13 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
                 expected.append(node_id)
 
             sql = """
-                SELECT chunk_id FROM chunk_linearized
+                SELECT chunk_id, numerator, denominator FROM chunk_linearized
                 WHERE room_id = ?
-                ORDER BY ordering ASC
             """
             txn.execute(sql, (room_id,))
 
-            ordered = [r for r, in txn]
+            ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
+            ordered = [c for _, c in ordered]
 
             self.assertEqual(expected, ordered)
 
@@ -149,7 +161,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 1, 100,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
             )
 
             table.add_node("a")
@@ -170,16 +184,119 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
                 prev_node = node_id
 
             sql = """
-                SELECT chunk_id FROM chunk_linearized
+                SELECT chunk_id, numerator, denominator FROM chunk_linearized
                 WHERE room_id = ?
-                ORDER BY ordering ASC
             """
             txn.execute(sql, (room_id,))
 
-            ordered = [r for r, in txn]
+            ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
+            ordered = [c for _, c in ordered]
 
             expected = expected_prefix + list(reversed(expected_suffix))
 
             self.assertEqual(expected, ordered)
 
         yield self.store.runInteraction("test", test_txn)
+
+    @defer.inlineCallbacks
+    def test_get_edges_to(self):
+        room_id = "foo_room4"
+
+        def test_txn(txn):
+            table = ChunkDBOrderedListStore(
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
+            )
+
+            table.add_node("A")
+            table._insert_after("B", "A")
+            table._add_edge_to_graph("A", "B")
+            table._insert_before("C", "A")
+            table._add_edge_to_graph("C", "A")
+
+            nodes = table.get_nodes_with_edges_from("A")
+            self.assertEqual([n for _, n in nodes], ["B"])
+
+            nodes = table.get_nodes_with_edges_to("A")
+            self.assertEqual([n for _, n in nodes], ["C"])
+
+        yield self.store.runInteraction("test", test_txn)
+
+    @defer.inlineCallbacks
+    def test_get_next_and_prev(self):
+        room_id = "foo_room5"
+
+        def test_txn(txn):
+            table = ChunkDBOrderedListStore(
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
+            )
+
+            table.add_node("A")
+            table._insert_after("B", "A")
+            table._insert_before("C", "A")
+
+            self.assertEqual(table.get_next("A"), "B")
+            self.assertEqual(table.get_prev("A"), "C")
+
+        yield self.store.runInteraction("test", test_txn)
+
+    def test_find_farey_terms(self):
+        def _test(min_frac, max_denom):
+            """"Calls `find_farey_terms` with given values and checks they
+            are neighbours in the Farey Sequence.
+            """
+
+            a, b, c, d = find_farey_terms(min_frac, max_denom)
+
+            p = Fraction(a, b)
+            q = Fraction(c, d)
+
+            assert min_frac < p < q
+
+            for x, y in _pairwise(_farey_generator(max_denom)):
+                if min_frac < x < y:
+                    self.assertEqual(x, p)
+                    self.assertEqual(y, q)
+                    break
+
+        _test(Fraction(5, 3), 12)
+        _test(Fraction(1, 3), 12)
+        _test(Fraction(1, 2), 9)
+        _test(Fraction(1, 2), 10)
+        _test(Fraction(1, 2), 15)
+
+    def test_get_fraction_in_range(self):
+        def _test(x, y):
+            assert x < get_fraction_in_range(x, y) < y
+
+        _test(Fraction(1, 2), Fraction(2, 3))
+        _test(Fraction(1, 2), Fraction(3, 2))
+        _test(Fraction(5, 203), Fraction(6, 204))
+
+
+def _farey_generator(n):
+    """Generates Farey sequence of order `n`.
+
+    Note that this doesn't terminate.
+
+    Taken from https://en.wikipedia.org/wiki/Farey_sequence#Next_term
+    """
+
+    a, b, c, d = 0, 1, 1, n
+
+    yield Fraction(a, b)
+
+    while True:
+        k = int((n + b) / d)
+        a, b, c, d = c, d, (k * c - a), (k * d - b)
+        yield Fraction(a, b)
+
+
+def _pairwise(iterable):
+    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
+    a, b = itertools.tee(iterable)
+    next(b, None)
+    return itertools.izip(a, b)
diff --git a/tests/util/test_katriel_bodlaender.py b/tests/util/test_katriel_bodlaender.py
index 5768408604..72126bdea9 100644
--- a/tests/util/test_katriel_bodlaender.py
+++ b/tests/util/test_katriel_bodlaender.py
@@ -56,3 +56,29 @@ class KatrielBodlaenderTests(unittest.TestCase):
         store.add_edge("node_4", "node_3")
 
         self.assertEqual(list(reversed(nodes)), store.list)
+
+    def test_divergent_graph(self):
+        store = InMemoryOrderedListStore()
+
+        nodes = [
+            "node_1",
+            "node_2",
+            "node_3",
+            "node_4",
+            "node_5",
+            "node_6",
+        ]
+
+        for node in reversed(nodes):
+            store.add_node(node)
+
+        store.add_edge("node_2", "node_3")
+        store.add_edge("node_2", "node_5")
+        store.add_edge("node_1", "node_2")
+        store.add_edge("node_3", "node_4")
+        store.add_edge("node_1", "node_3")
+        store.add_edge("node_4", "node_5")
+        store.add_edge("node_5", "node_6")
+        store.add_edge("node_4", "node_6")
+
+        self.assertEqual(nodes, store.list)