summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2018-05-18 13:54:34 +0100
committerGitHub <noreply@github.com>2018-05-18 13:54:34 +0100
commit0a325e538594109bf7e57e92cf13478a958022b7 (patch)
tree655677152302db60dc9b62e22591f9d30c027b99
parentMerge pull request #3212 from matrix-org/erikj/epa_stream (diff)
parentComments (diff)
downloadsynapse-0a325e538594109bf7e57e92cf13478a958022b7.tar.xz
Merge pull request #3226 from matrix-org/erikj/chunk_base
Begin adding implementing room chunks
-rw-r--r--CHANGES.rst10
-rw-r--r--synapse/storage/chunk_ordered_table.py319
-rw-r--r--synapse/storage/events.py9
-rw-r--r--synapse/storage/schema/delta/49/event_chunks.sql49
-rw-r--r--synapse/util/katriel_bodlaender.py337
-rw-r--r--tests/storage/test_chunk_linearizer_table.py185
-rw-r--r--tests/util/test_katriel_bodlaender.py58
7 files changed, 966 insertions, 1 deletions
diff --git a/CHANGES.rst b/CHANGES.rst
index b769b0f046..ea532c6cca 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,5 +1,13 @@
+Changes in <unreleased>
+=======================
+
+This release adds an index to the events table. This means that on first
+startup there will be an inceased amount of IO until the index is created, and
+an increase in disk usage.
+
+
 Changes in synapse v0.29.0 (2018-05-16)
-===========================================
+=======================================
 
 
 Changes in synapse v0.29.0-rc1 (2018-05-14)
diff --git a/synapse/storage/chunk_ordered_table.py b/synapse/storage/chunk_ordered_table.py
new file mode 100644
index 0000000000..79d0ca44ec
--- /dev/null
+++ b/synapse/storage/chunk_ordered_table.py
@@ -0,0 +1,319 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import math
+import logging
+
+from synapse.storage._base import SQLBaseStore
+from synapse.util.katriel_bodlaender import OrderedListStore
+from synapse.util.metrics import Measure
+
+import synapse.metrics
+
+metrics = synapse.metrics.get_metrics_for(__name__)
+rebalance_counter = metrics.register_counter("rebalances")
+
+
+logger = logging.getLogger(__name__)
+
+
+class ChunkDBOrderedListStore(OrderedListStore):
+    """Used as the list store for room chunks, efficiently maintaining them in
+    topological order on updates.
+
+    A room chunk is a connected portion of the room events DAG. Chunks are
+    constructed so that they have the additional property that for all events in
+    the chunk, either all of their prev_events are in that chunk or none of them
+    are. This ensures that no event that is subsequently received needs to be
+    inserted into the middle of a chunk, since it cannot both reference an event
+    in the chunk and be referenced by an event in the chunk (assuming no
+    cycles).
+
+    As such the set of chunks in a room inherits a DAG, i.e. if an event in one
+    chunk references an event in a second chunk, then we say that the first
+    chunk references the second, and thus forming a DAG. (This means that chunks
+    start off disconnected until an event is received that connects the two
+    chunks.)
+
+    We can therefore end up with multiple chunks in a room when the server
+    misses some events, e.g. due to the server being offline for a time.
+
+    The server may only have a subset of all events in a room, in which case
+    its possible for the server to have chunks that are unconnected from each
+    other. The ordering between unconnected chunks is arbitrary.
+
+    The class is designed for use inside transactions and so takes a
+    transaction object in the constructor. This means that it needs to be
+    re-instantiated in each transaction, so all state needs to be stored
+    in the database.
+
+    Internally the ordering is implemented using floats, and the average is
+    taken when a node is inserted between other nodes. To avoid precision
+    errors a minimum difference between sucessive orderings is attempted to be
+    kept; whenever the difference is too small we attempt to rebalance. See
+    the `_rebalance` function for implementation details.
+
+    Note that OrderedListStore orders nodes such that source of an edge
+    comes before the target. This is counter intuitive when edges represent
+    causality, so for the purposes of ordering algorithm we invert the edge
+    directions, i.e. if chunk A has a prev chunk of B then we say that the
+    edge is from B to A. This ensures that newer chunks get inserted at the
+    end (rather than the start).
+
+    Note: Calls to `add_node` and `add_edge` cannot overlap for the same room,
+    and so callers should perform some form of per-room locking when using
+    this class.
+
+    Args:
+        txn
+        room_id (str)
+        clock
+        rebalance_digits (int): When a rebalance is triggered we rebalance
+            in a range around the node, where the bounds are rounded to this
+            number of digits.
+        min_difference (int): A rebalance is triggered when the difference
+            between two successive orderings is less than the reciprocal of
+            this.
+    """
+    def __init__(self,
+                 txn, room_id, clock,
+                 rebalance_digits=3,
+                 min_difference=1000000):
+        self.txn = txn
+        self.room_id = room_id
+        self.clock = clock
+
+        self.rebalance_digits = rebalance_digits
+        self.min_difference = 1. / min_difference
+
+    def is_before(self, a, b):
+        """Implements OrderedListStore"""
+        return self._get_order(a) < self._get_order(b)
+
+    def get_prev(self, node_id):
+        """Implements OrderedListStore"""
+        order = self._get_order(node_id)
+
+        sql = """
+            SELECT chunk_id FROM chunk_linearized
+            WHERE ordering < ? AND room_id = ?
+            ORDER BY ordering DESC
+            LIMIT 1
+        """
+
+        self.txn.execute(sql, (order, self.room_id,))
+
+        row = self.txn.fetchone()
+        if row:
+            return row[0]
+        return None
+
+    def get_next(self, node_id):
+        """Implements OrderedListStore"""
+        order = self._get_order(node_id)
+
+        sql = """
+            SELECT chunk_id FROM chunk_linearized
+            WHERE ordering > ? AND room_id = ?
+            ORDER BY ordering ASC
+            LIMIT 1
+        """
+
+        self.txn.execute(sql, (order, self.room_id,))
+
+        row = self.txn.fetchone()
+        if row:
+            return row[0]
+        return None
+
+    def _insert_before(self, node_id, target_id):
+        """Implements OrderedListStore"""
+
+        rebalance = False  # Set to true if we need to trigger a rebalance
+
+        if target_id:
+            target_order = self._get_order(target_id)
+            before_id = self.get_prev(target_id)
+
+            if before_id:
+                before_order = self._get_order(before_id)
+                new_order = (target_order + before_order) / 2.
+
+                rebalance = math.fabs(target_order - before_order) < self.min_difference
+            else:
+                new_order = math.floor(target_order) - 1
+        else:
+            # If target_id is None then we insert at the end.
+            self.txn.execute("""
+                SELECT COALESCE(MAX(ordering), 0) + 1
+                FROM chunk_linearized
+                WHERE room_id = ?
+            """, (self.room_id,))
+
+            new_order, = self.txn.fetchone()
+
+        self._insert(node_id, new_order)
+
+        if rebalance:
+            self._rebalance(node_id)
+
+    def _insert_after(self, node_id, target_id):
+        """Implements OrderedListStore"""
+
+        rebalance = False  # Set to true if we need to trigger a rebalance
+
+        if target_id:
+            target_order = self._get_order(target_id)
+            after_id = self.get_next(target_id)
+            if after_id:
+                after_order = self._get_order(after_id)
+                new_order = (target_order + after_order) / 2.
+
+                rebalance = math.fabs(target_order - after_order) < self.min_difference
+            else:
+                new_order = math.ceil(target_order) + 1
+        else:
+            # If target_id is None then we insert at the start.
+            self.txn.execute("""
+                SELECT COALESCE(MIN(ordering), 0) - 1
+                FROM chunk_linearized
+                WHERE room_id = ?
+            """, (self.room_id,))
+
+            new_order, = self.txn.fetchone()
+
+        self._insert(node_id, new_order)
+
+        if rebalance:
+            self._rebalance(node_id)
+
+    def get_nodes_with_edges_to(self, node_id):
+        """Implements OrderedListStore"""
+
+        # Note that we use the inverse relation here
+        sql = """
+            SELECT l.ordering, l.chunk_id FROM chunk_graph AS g
+            INNER JOIN chunk_linearized AS l ON g.prev_id = l.chunk_id
+            WHERE g.chunk_id = ?
+        """
+        self.txn.execute(sql, (node_id,))
+        return self.txn.fetchall()
+
+    def get_nodes_with_edges_from(self, node_id):
+        """Implements OrderedListStore"""
+
+        # Note that we use the inverse relation here
+        sql = """
+            SELECT l.ordering, l.chunk_id FROM chunk_graph AS g
+            INNER JOIN chunk_linearized AS l ON g.chunk_id = l.chunk_id
+            WHERE g.prev_id = ?
+        """
+        self.txn.execute(sql, (node_id,))
+        return self.txn.fetchall()
+
+    def _delete_ordering(self, node_id):
+        """Implements OrderedListStore"""
+
+        SQLBaseStore._simple_delete_txn(
+            self.txn,
+            table="chunk_linearized",
+            keyvalues={"chunk_id": node_id},
+        )
+
+    def _add_edge_to_graph(self, source_id, target_id):
+        """Implements OrderedListStore"""
+
+        # Note that we use the inverse relation
+        SQLBaseStore._simple_insert_txn(
+            self.txn,
+            table="chunk_graph",
+            values={"chunk_id": target_id, "prev_id": source_id}
+        )
+
+    def _insert(self, node_id, order):
+        """Inserts the node with the given ordering.
+        """
+        SQLBaseStore._simple_insert_txn(
+            self.txn,
+            table="chunk_linearized",
+            values={
+                "chunk_id": node_id,
+                "room_id": self.room_id,
+                "ordering": order,
+            }
+        )
+
+    def _get_order(self, node_id):
+        """Get the ordering of the given node.
+        """
+
+        return SQLBaseStore._simple_select_one_onecol_txn(
+            self.txn,
+            table="chunk_linearized",
+            keyvalues={"chunk_id": node_id},
+            retcol="ordering"
+        )
+
+    def _rebalance(self, node_id):
+        """Rebalances the list around the given node to ensure that the
+        ordering floats don't get too small.
+
+        This works by finding a range that includes the given node, and
+        recalculating the ordering floats such that they're equidistant in
+        that range.
+        """
+
+        logger.info("Rebalancing room %s, chunk %s", self.room_id, node_id)
+
+        with Measure(self.clock, "chunk_rebalance"):
+            # We pick the interval to try and minimise the number of decimal
+            # places, i.e. we round to nearest float with `rebalance_digits` and
+            # use that as one side of the interval
+            order = self._get_order(node_id)
+            a = round(order, self.rebalance_digits)
+            min_order = a - 10 ** -self.rebalance_digits
+            max_order = a + 10 ** -self.rebalance_digits
+
+            # Now we get all the nodes in the range. We add the minimum difference
+            # to the bounds to ensure that we don't accidentally move a node to be
+            # within the minimum difference of a node outside the range.
+            sql = """
+                SELECT chunk_id FROM chunk_linearized
+                WHERE ordering >= ? AND ordering <= ? AND room_id = ?
+            """
+            self.txn.execute(sql, (
+                min_order - self.min_difference,
+                max_order + self.min_difference,
+                self.room_id,
+            ))
+
+            chunk_ids = [c for c, in self.txn]
+
+            sql = """
+                UPDATE chunk_linearized
+                SET ordering = ?
+                WHERE chunk_id = ?
+            """
+
+            step = (max_order - min_order) / len(chunk_ids)
+            self.txn.executemany(
+                sql,
+                (
+                    ((idx * step + min_order), chunk_id)
+                    for idx, chunk_id in enumerate(chunk_ids)
+                )
+            )
+
+            rebalance_counter.inc()
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 05cde96afc..70b9041eee 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -232,6 +232,15 @@ class EventsStore(EventsWorkerStore):
             psql_only=True,
         )
 
+        self.register_background_index_update(
+            "events_chunk_index",
+            index_name="events_chunk_index",
+            table="events",
+            columns=["room_id", "chunk_id", "topological_ordering", "stream_ordering"],
+            unique=True,
+            psql_only=True,
+        )
+
         self._event_persist_queue = _EventPeristenceQueue()
 
         self._state_resolution_handler = hs.get_state_resolution_handler()
diff --git a/synapse/storage/schema/delta/49/event_chunks.sql b/synapse/storage/schema/delta/49/event_chunks.sql
new file mode 100644
index 0000000000..6b428b4ef8
--- /dev/null
+++ b/synapse/storage/schema/delta/49/event_chunks.sql
@@ -0,0 +1,49 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN chunk_id BIGINT;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+    ('events_chunk_index', '{}');
+
+-- Stores how chunks of graph relate to each other
+CREATE TABLE chunk_graph (
+    chunk_id BIGINT NOT NULL,
+    prev_id BIGINT NOT NULL
+);
+
+CREATE UNIQUE INDEX chunk_graph_id ON chunk_graph (chunk_id, prev_id);
+CREATE INDEX chunk_graph_prev_id ON chunk_graph (prev_id);
+
+-- The extremities in each chunk. Note that these are pointing to events that
+-- we don't have, rather than boundary between chunks.
+CREATE TABLE chunk_backwards_extremities (
+    chunk_id BIGINT NOT NULL,
+    event_id TEXT NOT NULL
+);
+
+CREATE INDEX chunk_backwards_extremities_id ON chunk_backwards_extremities(chunk_id, event_id);
+CREATE INDEX chunk_backwards_extremities_event_id ON chunk_backwards_extremities(event_id);
+
+-- Maintains an absolute ordering of chunks. Gets updated when we see new
+-- edges between chunks.
+CREATE TABLE chunk_linearized (
+    chunk_id BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    ordering DOUBLE PRECISION NOT NULL
+);
+
+CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id);
+CREATE INDEX chunk_linearized_ordering ON chunk_linearized (room_id, ordering);
diff --git a/synapse/util/katriel_bodlaender.py b/synapse/util/katriel_bodlaender.py
new file mode 100644
index 0000000000..b0eab2b4b0
--- /dev/null
+++ b/synapse/util/katriel_bodlaender.py
@@ -0,0 +1,337 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""This module contains an implementation of the Katriel-Bodlaender algorithm,
+which is used to do online topological ordering of graphs.
+
+Note that the ordering derived from the graph is such that the source node of
+an edge comes before the target node of the edge, i.e. a graph of A -> B -> C
+would produce the ordering [A, B, C].
+
+This ordering is therefore opposite to what one might expect when considering
+the room DAG, as newer messages would be added to the start rather than the
+end.
+
+***The ChunkDBOrderedListStore therefore inverts the direction of edges***
+
+See:
+    A tight analysis of the Katriel–Bodlaender algorithm for online topological
+    ordering
+    Hsiao-Fei Liua and Kun-Mao Chao
+    https://www.sciencedirect.com/science/article/pii/S0304397507006573
+and:
+    Online Topological Ordering
+    Irit Katriel and Hans L. Bodlaender
+    http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.78.7933 )
+"""
+
+from abc import ABCMeta, abstractmethod
+
+
+class OrderedListStore(object):
+    """An abstract base class that is used to store a graph and maintain a
+    topological consistent, total ordering.
+
+    Internally this uses the Katriel-Bodlaender algorithm, which requires the
+    store expose an interface for the total ordering that supports:
+
+        - Insertion of the node into the ordering either immediately before or
+          after another node.
+        - Deletion of the node from the ordering
+        - Comparing the relative ordering of two arbitary nodes
+        - Get the node immediately before or after a given node in the ordering
+
+    It also needs to be able to interact with the graph in the following ways:
+
+        - Query the number of edges from a node in the graph
+        - Query the number of edges into a node in the graph
+        - Add an edge to the graph
+
+
+    Users of subclasses should call `add_node` and `add_edge` whenever editing
+    the graph. The total ordering exposed will remain constant until the next
+    call to one of these methods.
+
+    Note: Calls to `add_node` and `add_edge` cannot overlap, and so callers
+    should perform some form of locking.
+    """
+
+    __metaclass__ = ABCMeta
+
+    def add_node(self, node_id):
+        """Adds a node to the graph.
+
+        Args:
+            node_id (str)
+        """
+        self._insert_before(node_id, None)
+
+    def add_edge(self, source, target):
+        """Adds a new edge to the graph and updates the ordering.
+
+        See module level docs.
+
+        Note that both the source and target nodes must have been inserted into
+        the store (at an arbitrary position) already.
+
+        Args:
+            source (str): The source node of the new edge
+            target (str): The target node of the new edge
+        """
+
+        # The following is the Katriel-Bodlaender algorithm.
+
+        to_s = []
+        from_t = []
+        to_s_neighbours = []
+        from_t_neighbours = []
+        to_s_indegree = 0
+        from_t_outdegree = 0
+        s = source
+        t = target
+
+        while s and t and not self.is_before(s, t):
+            m_s = to_s_indegree
+            m_t = from_t_outdegree
+
+            # These functions return a tuple where the first term is a float
+            # that can be used to order the the list of neighbours.
+            # These are valid until the next write
+            pe_s = self.get_nodes_with_edges_to(s)
+            fe_t = self.get_nodes_with_edges_from(t)
+
+            l_s = len(pe_s)
+            l_t = len(fe_t)
+
+            if m_s + l_s <= m_t + l_t:
+                to_s.append(s)
+                to_s_neighbours.extend(pe_s)
+                to_s_indegree += l_s
+
+                if to_s_neighbours:
+                    to_s_neighbours.sort()
+                    _, s = to_s_neighbours.pop()
+                else:
+                    s = None
+
+            if m_s + l_s >= m_t + l_t:
+                from_t.append(t)
+                from_t_neighbours.extend(fe_t)
+                from_t_outdegree += l_t
+
+                if from_t_neighbours:
+                    from_t_neighbours.sort(reverse=True)
+                    _, t = from_t_neighbours.pop()
+                else:
+                    t = None
+
+        if s is None:
+            s = self.get_prev(target)
+
+        if t is None:
+            t = self.get_next(source)
+
+        while to_s:
+            s1 = to_s.pop()
+            self._delete_ordering(s1)
+            self._insert_after(s1, s)
+            s = s1
+
+        while from_t:
+            t1 = from_t.pop()
+            self._delete_ordering(t1)
+            self._insert_before(t1, t)
+            t = t1
+
+        self._add_edge_to_graph(source, target)
+
+    @abstractmethod
+    def is_before(self, first_node, second_node):
+        """Returns whether the first node is before the second node.
+
+        Args:
+            first_node (str)
+            second_node (str)
+
+        Returns:
+            bool: True if first_node is before second_node
+        """
+        pass
+
+    @abstractmethod
+    def get_prev(self, node_id):
+        """Gets the node immediately before the given node in the topological
+        ordering.
+
+        Args:
+            node_id (str)
+
+        Returns:
+            str|None: A node ID or None if no preceding node exists
+        """
+        pass
+
+    @abstractmethod
+    def get_next(self, node_id):
+        """Gets the node immediately after the given node in the topological
+        ordering.
+
+        Args:
+            node_id (str)
+
+        Returns:
+            str|None: A node ID or None if no proceding node exists
+        """
+        pass
+
+    @abstractmethod
+    def get_nodes_with_edges_to(self, node_id):
+        """Get all nodes with edges to the given node
+
+        Args:
+            node_id (str)
+
+        Returns:
+            list[tuple[float, str]]: Returns a list of tuple of an ordering
+            term and the node ID. The ordering term can be used to sort the
+            returned list.
+            The ordering is valid until subsequent calls to `add_edge`
+            functions
+        """
+        pass
+
+    @abstractmethod
+    def get_nodes_with_edges_from(self, node_id):
+        """Get all nodes with edges from the given node
+
+        Args:
+            node_id (str)
+
+        Returns:
+            list[tuple[float, str]]: Returns a list of tuple of an ordering
+            term and the node ID. The ordering term can be used to sort the
+            returned list.
+            The ordering is valid until subsequent calls to `add_edge`
+            functions
+        """
+        pass
+
+    @abstractmethod
+    def _insert_before(self, node_id, target_id):
+        """Inserts node immediately before target node.
+
+        If target_id is None then the node is inserted at the end of the list
+
+        Args:
+            node_id (str)
+            target_id (str|None)
+        """
+        pass
+
+    @abstractmethod
+    def _insert_after(self, node_id, target_id):
+        """Inserts node immediately after target node.
+
+        If target_id is None then the node is inserted at the start of the list
+
+        Args:
+            node_id (str)
+            target_id (str|None)
+        """
+        pass
+
+    @abstractmethod
+    def _delete_ordering(self, node_id):
+        """Deletes the given node from the ordered list (but not the graph).
+
+        Used when we want to reinsert it into a different position
+
+        Args:
+            node_id (str)
+        """
+        pass
+
+    @abstractmethod
+    def _add_edge_to_graph(self, source_id, target_id):
+        """Adds an edge to the graph from source to target.
+
+        Does not update ordering.
+
+        Args:
+            source_id (str)
+            target_id (str)
+        """
+        pass
+
+
+class InMemoryOrderedListStore(OrderedListStore):
+    """An in memory OrderedListStore
+    """
+
+    def __init__(self):
+        # The ordered list of nodes
+        self.list = []
+
+        # Map from node to set of nodes that it references
+        self.edges_from = {}
+
+        # Map from node to set of nodes that it is referenced by
+        self.edges_to = {}
+
+    def is_before(self, first_node, second_node):
+        return self.list.index(first_node) < self.list.index(second_node)
+
+    def get_prev(self, node_id):
+        idx = self.list.index(node_id) - 1
+        if idx >= 0:
+            return self.list[idx]
+        else:
+            return None
+
+    def get_next(self, node_id):
+        idx = self.list.index(node_id) + 1
+        if idx < len(self.list):
+            return self.list[idx]
+        else:
+            return None
+
+    def _insert_before(self, node_id, target_id):
+        if target_id is not None:
+            idx = self.list.index(target_id)
+            self.list.insert(idx, node_id)
+        else:
+            self.list.append(node_id)
+
+    def _insert_after(self, node_id, target_id):
+        if target_id is not None:
+            idx = self.list.index(target_id) + 1
+            self.list.insert(idx, node_id)
+        else:
+            self.list.insert(0, node_id)
+
+    def _delete_ordering(self, node_id):
+        self.list.remove(node_id)
+
+    def get_nodes_with_edges_to(self, node_id):
+        to_nodes = self.edges_to.get(node_id, [])
+        return [(self.list.index(nid), nid) for nid in to_nodes]
+
+    def get_nodes_with_edges_from(self, node_id):
+        from_nodes = self.edges_from.get(node_id, [])
+        return [(self.list.index(nid), nid) for nid in from_nodes]
+
+    def _add_edge_to_graph(self, source_id, target_id):
+        self.edges_from.setdefault(source_id, set()).add(target_id)
+        self.edges_to.setdefault(target_id, set()).add(source_id)
diff --git a/tests/storage/test_chunk_linearizer_table.py b/tests/storage/test_chunk_linearizer_table.py
new file mode 100644
index 0000000000..beb1ac9a42
--- /dev/null
+++ b/tests/storage/test_chunk_linearizer_table.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+import random
+import tests.unittest
+import tests.utils
+
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
+
+
+class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
+    """Tests to ensure that the ordering and rebalancing functions of
+    ChunkDBOrderedListStore work as expected.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(ChunkLinearizerStoreTestCase, self).__init__(*args, **kwargs)
+
+    @defer.inlineCallbacks
+    def setUp(self):
+        hs = yield tests.utils.setup_test_homeserver()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def test_simple_insert_fetch(self):
+        room_id = "foo_room1"
+
+        def test_txn(txn):
+            table = ChunkDBOrderedListStore(
+                txn, room_id, self.clock, 1, 100,
+            )
+
+            table.add_node("A")
+            table._insert_after("B", "A")
+            table._insert_before("C", "A")
+
+            sql = """
+                SELECT chunk_id FROM chunk_linearized
+                WHERE room_id = ?
+                ORDER BY ordering ASC
+            """
+            txn.execute(sql, (room_id,))
+
+            ordered = [r for r, in txn]
+
+            self.assertEqual(["C", "A", "B"], ordered)
+
+        yield self.store.runInteraction("test", test_txn)
+
+    @defer.inlineCallbacks
+    def test_many_insert_fetch(self):
+        room_id = "foo_room2"
+
+        def test_txn(txn):
+            table = ChunkDBOrderedListStore(
+                txn, room_id, self.clock, 1, 20,
+            )
+
+            nodes = [(i, "node_%d" % (i,)) for i in xrange(1, 1000)]
+            expected = [n for _, n in nodes]
+
+            already_inserted = []
+
+            random.shuffle(nodes)
+            while nodes:
+                i, node_id = nodes.pop()
+                if not already_inserted:
+                    table.add_node(node_id)
+                else:
+                    for j, target_id in already_inserted:
+                        if j > i:
+                            break
+
+                    if j < i:
+                        table._insert_after(node_id, target_id)
+                    else:
+                        table._insert_before(node_id, target_id)
+
+                already_inserted.append((i, node_id))
+                already_inserted.sort()
+
+            sql = """
+                SELECT chunk_id FROM chunk_linearized
+                WHERE room_id = ?
+                ORDER BY ordering ASC
+            """
+            txn.execute(sql, (room_id,))
+
+            ordered = [r for r, in txn]
+
+            self.assertEqual(expected, ordered)
+
+        yield self.store.runInteraction("test", test_txn)
+
+    @defer.inlineCallbacks
+    def test_prepend_and_append(self):
+        room_id = "foo_room3"
+
+        def test_txn(txn):
+            table = ChunkDBOrderedListStore(
+                txn, room_id, self.clock, 1, 20,
+            )
+
+            table.add_node("a")
+
+            expected = ["a"]
+
+            for i in xrange(1, 1000):
+                node_id = "node_id_before_%d" % i
+                table._insert_before(node_id, expected[0])
+                expected.insert(0, node_id)
+
+            for i in xrange(1, 1000):
+                node_id = "node_id_after_%d" % i
+                table._insert_after(node_id, expected[-1])
+                expected.append(node_id)
+
+            sql = """
+                SELECT chunk_id FROM chunk_linearized
+                WHERE room_id = ?
+                ORDER BY ordering ASC
+            """
+            txn.execute(sql, (room_id,))
+
+            ordered = [r for r, in txn]
+
+            self.assertEqual(expected, ordered)
+
+        yield self.store.runInteraction("test", test_txn)
+
+    @defer.inlineCallbacks
+    def test_worst_case(self):
+        room_id = "foo_room3"
+
+        def test_txn(txn):
+            table = ChunkDBOrderedListStore(
+                txn, room_id, self.clock, 1, 100,
+            )
+
+            table.add_node("a")
+
+            prev_node = "a"
+
+            expected_prefix = ["a"]
+            expected_suffix = []
+
+            for i in xrange(1, 100):
+                node_id = "node_id_%d" % i
+                if i % 2 == 0:
+                    table._insert_before(node_id, prev_node)
+                    expected_prefix.append(node_id)
+                else:
+                    table._insert_after(node_id, prev_node)
+                    expected_suffix.append(node_id)
+                prev_node = node_id
+
+            sql = """
+                SELECT chunk_id FROM chunk_linearized
+                WHERE room_id = ?
+                ORDER BY ordering ASC
+            """
+            txn.execute(sql, (room_id,))
+
+            ordered = [r for r, in txn]
+
+            expected = expected_prefix + list(reversed(expected_suffix))
+
+            self.assertEqual(expected, ordered)
+
+        yield self.store.runInteraction("test", test_txn)
diff --git a/tests/util/test_katriel_bodlaender.py b/tests/util/test_katriel_bodlaender.py
new file mode 100644
index 0000000000..5768408604
--- /dev/null
+++ b/tests/util/test_katriel_bodlaender.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.util.katriel_bodlaender import InMemoryOrderedListStore
+
+from tests import unittest
+
+
+class KatrielBodlaenderTests(unittest.TestCase):
+    def test_simple_graph(self):
+        store = InMemoryOrderedListStore()
+
+        nodes = [
+            "node_1",
+            "node_2",
+            "node_3",
+            "node_4",
+        ]
+
+        for node in nodes:
+            store.add_node(node)
+
+        store.add_edge("node_2", "node_3")
+        store.add_edge("node_1", "node_2")
+        store.add_edge("node_3", "node_4")
+
+        self.assertEqual(nodes, store.list)
+
+    def test_reverse_graph(self):
+        store = InMemoryOrderedListStore()
+
+        nodes = [
+            "node_1",
+            "node_2",
+            "node_3",
+            "node_4",
+        ]
+
+        for node in nodes:
+            store.add_node(node)
+
+        store.add_edge("node_3", "node_2")
+        store.add_edge("node_2", "node_1")
+        store.add_edge("node_4", "node_3")
+
+        self.assertEqual(list(reversed(nodes)), store.list)