diff options
-rw-r--r-- | synapse/storage/chunk_ordered_table.py | 474 | ||||
-rw-r--r-- | synapse/storage/events.py | 2 | ||||
-rw-r--r-- | synapse/storage/schema/delta/49/event_chunks.py | 47 | ||||
-rw-r--r-- | synapse/storage/stream.py | 2 | ||||
-rw-r--r-- | tests/storage/test_chunk_linearizer_table.py | 153 | ||||
-rw-r--r-- | tests/util/test_katriel_bodlaender.py | 26 |
6 files changed, 568 insertions, 136 deletions
diff --git a/synapse/storage/chunk_ordered_table.py b/synapse/storage/chunk_ordered_table.py index 79d0ca44ec..5e552ef138 100644 --- a/synapse/storage/chunk_ordered_table.py +++ b/synapse/storage/chunk_ordered_table.py @@ -16,9 +16,11 @@ import math import logging +from fractions import Fraction + from synapse.storage._base import SQLBaseStore +from synapse.storage.engines import PostgresEngine from synapse.util.katriel_bodlaender import OrderedListStore -from synapse.util.metrics import Measure import synapse.metrics @@ -59,11 +61,12 @@ class ChunkDBOrderedListStore(OrderedListStore): re-instantiated in each transaction, so all state needs to be stored in the database. - Internally the ordering is implemented using floats, and the average is - taken when a node is inserted between other nodes. To avoid precision - errors a minimum difference between sucessive orderings is attempted to be - kept; whenever the difference is too small we attempt to rebalance. See - the `_rebalance` function for implementation details. + Internally the ordering is implemented using a linked list and assigning + each chunk a fraction. `get_next` and `get_prev` are implemented via linked + lists, and comparisons implemented using the fractions. When inserting + chunks fractions are picked such that their denominator is the smallest + possible. However, if the denominators grow too big then a rebalancing has + to take place to reduce the denominators; see `_rebalance` for details. Note that OrderedListStore orders nodes such that source of an edge comes before the target. This is counter intuitive when edges represent @@ -80,23 +83,24 @@ class ChunkDBOrderedListStore(OrderedListStore): txn room_id (str) clock - rebalance_digits (int): When a rebalance is triggered we rebalance - in a range around the node, where the bounds are rounded to this - number of digits. - min_difference (int): A rebalance is triggered when the difference - between two successive orderings is less than the reciprocal of - this. + database_engine + rebalance_max_denominator (int): When a rebalance is triggered we + replace existing orders with those that have a denominator smaller + or equal to this + max_denominator (int): A rebalance is triggered when a node has an + ordering with a denominator greater than this """ def __init__(self, - txn, room_id, clock, - rebalance_digits=3, - min_difference=1000000): + txn, room_id, clock, database_engine, + rebalance_max_denominator=100, + max_denominator=100000): self.txn = txn self.room_id = room_id self.clock = clock + self.database_engine = database_engine - self.rebalance_digits = rebalance_digits - self.min_difference = 1. / min_difference + self.rebalance_md = rebalance_max_denominator + self.max_denominator = max_denominator def is_before(self, a, b): """Implements OrderedListStore""" @@ -104,16 +108,13 @@ class ChunkDBOrderedListStore(OrderedListStore): def get_prev(self, node_id): """Implements OrderedListStore""" - order = self._get_order(node_id) sql = """ SELECT chunk_id FROM chunk_linearized - WHERE ordering < ? AND room_id = ? - ORDER BY ordering DESC - LIMIT 1 + WHERE next_chunk_id = ? """ - self.txn.execute(sql, (order, self.room_id,)) + self.txn.execute(sql, (node_id,)) row = self.txn.fetchone() if row: @@ -122,16 +123,13 @@ class ChunkDBOrderedListStore(OrderedListStore): def get_next(self, node_id): """Implements OrderedListStore""" - order = self._get_order(node_id) sql = """ - SELECT chunk_id FROM chunk_linearized - WHERE ordering > ? AND room_id = ? - ORDER BY ordering ASC - LIMIT 1 + SELECT next_chunk_id FROM chunk_linearized + WHERE chunk_id = ? """ - self.txn.execute(sql, (order, self.room_id,)) + self.txn.execute(sql, (node_id,)) row = self.txn.fetchone() if row: @@ -144,27 +142,26 @@ class ChunkDBOrderedListStore(OrderedListStore): rebalance = False # Set to true if we need to trigger a rebalance if target_id: - target_order = self._get_order(target_id) before_id = self.get_prev(target_id) - if before_id: - before_order = self._get_order(before_id) - new_order = (target_order + before_order) / 2. - - rebalance = math.fabs(target_order - before_order) < self.min_difference + new_order = self._insert_between(node_id, before_id, target_id) else: - new_order = math.floor(target_order) - 1 + new_order = self._insert_at_start(node_id, target_id) else: # If target_id is None then we insert at the end. self.txn.execute(""" - SELECT COALESCE(MAX(ordering), 0) + 1 + SELECT chunk_id FROM chunk_linearized - WHERE room_id = ? + WHERE room_id = ? AND next_chunk_id is NULL """, (self.room_id,)) - new_order, = self.txn.fetchone() + row = self.txn.fetchone() + if row: + new_order = self._insert_at_end(node_id, row[0]) + else: + new_order = self._insert_first(node_id) - self._insert(node_id, new_order) + rebalance = new_order.denominator > self.max_denominator if rebalance: self._rebalance(node_id) @@ -174,64 +171,204 @@ class ChunkDBOrderedListStore(OrderedListStore): rebalance = False # Set to true if we need to trigger a rebalance + next_chunk_id = None if target_id: - target_order = self._get_order(target_id) - after_id = self.get_next(target_id) - if after_id: - after_order = self._get_order(after_id) - new_order = (target_order + after_order) / 2. - - rebalance = math.fabs(target_order - after_order) < self.min_difference + next_chunk_id = self.get_next(target_id) + if next_chunk_id: + new_order = self._insert_between(node_id, target_id, next_chunk_id) else: - new_order = math.ceil(target_order) + 1 + new_order = self._insert_at_end(node_id, target_id) else: # If target_id is None then we insert at the start. self.txn.execute(""" - SELECT COALESCE(MIN(ordering), 0) - 1 + SELECT chunk_id FROM chunk_linearized + NATURAL JOIN chunk_linearized_first WHERE room_id = ? """, (self.room_id,)) - new_order, = self.txn.fetchone() + row = self.txn.fetchone() + if row: + new_order = self._insert_at_start(node_id, row[0]) + else: + new_order = self._insert_first(node_id) - self._insert(node_id, new_order) + rebalance = new_order.denominator > self.max_denominator if rebalance: self._rebalance(node_id) + def _insert_between(self, node_id, left_id, right_id): + """Inserts node between given existing nodes. + """ + + left_order = self._get_order(left_id) + right_order = self._get_order(right_id) + + assert left_order < right_order + + new_order = get_fraction_in_range(left_order, right_order) + + SQLBaseStore._simple_update_one_txn( + self.txn, + table="chunk_linearized", + keyvalues={"chunk_id": left_id}, + updatevalues={"next_chunk_id": node_id}, + ) + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized", + values={ + "chunk_id": node_id, + "room_id": self.room_id, + "next_chunk_id": right_id, + "numerator": int(new_order.numerator), + "denominator": int(new_order.denominator), + } + ) + + return new_order + + def _insert_at_end(self, node_id, last_id): + """Inserts node at the end using existing last node. + """ + + last_order = self._get_order(last_id) + new_order = Fraction(int(math.ceil(last_order)) + 1, 1) + + SQLBaseStore._simple_update_one_txn( + self.txn, + table="chunk_linearized", + keyvalues={"chunk_id": last_id}, + updatevalues={"next_chunk_id": node_id}, + ) + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized", + values={ + "chunk_id": node_id, + "room_id": self.room_id, + "next_chunk_id": None, + "numerator": int(new_order.numerator), + "denominator": int(new_order.denominator), + } + ) + + return new_order + + def _insert_at_start(self, node_id, first_id): + """Inserts node at the start using existing first node. + """ + + first_order = self._get_order(first_id) + new_order = get_fraction_in_range(0, first_order) + + SQLBaseStore._simple_update_one_txn( + self.txn, + table="chunk_linearized_first", + keyvalues={"room_id": self.room_id}, + updatevalues={"chunk_id": node_id}, + ) + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized", + values={ + "chunk_id": node_id, + "room_id": self.room_id, + "next_chunk_id": first_id, + "numerator": int(new_order.numerator), + "denominator": int(new_order.denominator), + } + ) + + return new_order + + def _insert_first(self, node_id): + """Inserts the first node for this room. + """ + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized_first", + values={ + "room_id": self.room_id, + "chunk_id": node_id, + }, + ) + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized", + values={ + "chunk_id": node_id, + "room_id": self.room_id, + "next_chunk_id": None, + "numerator": 1, + "denominator": 1, + } + ) + + return Fraction(1, 1) + def get_nodes_with_edges_to(self, node_id): """Implements OrderedListStore""" # Note that we use the inverse relation here sql = """ - SELECT l.ordering, l.chunk_id FROM chunk_graph AS g + SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g INNER JOIN chunk_linearized AS l ON g.prev_id = l.chunk_id WHERE g.chunk_id = ? """ self.txn.execute(sql, (node_id,)) - return self.txn.fetchall() + return [(Fraction(n, d), c) for c, n, d in self.txn] def get_nodes_with_edges_from(self, node_id): """Implements OrderedListStore""" # Note that we use the inverse relation here sql = """ - SELECT l.ordering, l.chunk_id FROM chunk_graph AS g + SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g INNER JOIN chunk_linearized AS l ON g.chunk_id = l.chunk_id WHERE g.prev_id = ? """ self.txn.execute(sql, (node_id,)) - return self.txn.fetchall() + return [(Fraction(n, d), c) for c, n, d in self.txn] def _delete_ordering(self, node_id): """Implements OrderedListStore""" + next_chunk_id = SQLBaseStore._simple_select_one_onecol_txn( + self.txn, + table="chunk_linearized", + keyvalues={ + "chunk_id": node_id, + }, + retcol="next_chunk_id", + ) + SQLBaseStore._simple_delete_txn( self.txn, table="chunk_linearized", keyvalues={"chunk_id": node_id}, ) + sql = """ + UPDATE chunk_linearized SET next_chunk_id = ? + WHERE next_chunk_id = ? + """ + + self.txn.execute(sql, (next_chunk_id, node_id,)) + + sql = """ + UPDATE chunk_linearized_first SET chunk_id = ? + WHERE chunk_id = ? + """ + + self.txn.execute(sql, (next_chunk_id, node_id,)) + def _add_edge_to_graph(self, source_id, target_id): """Implements OrderedListStore""" @@ -242,78 +379,199 @@ class ChunkDBOrderedListStore(OrderedListStore): values={"chunk_id": target_id, "prev_id": source_id} ) - def _insert(self, node_id, order): - """Inserts the node with the given ordering. - """ - SQLBaseStore._simple_insert_txn( - self.txn, - table="chunk_linearized", - values={ - "chunk_id": node_id, - "room_id": self.room_id, - "ordering": order, - } - ) - def _get_order(self, node_id): """Get the ordering of the given node. """ - return SQLBaseStore._simple_select_one_onecol_txn( + row = SQLBaseStore._simple_select_one_txn( self.txn, table="chunk_linearized", keyvalues={"chunk_id": node_id}, - retcol="ordering" + retcols=("numerator", "denominator",), ) + return Fraction(row["numerator"], row["denominator"]) def _rebalance(self, node_id): """Rebalances the list around the given node to ensure that the - ordering floats don't get too small. + ordering denominators aren't too big. + + This is done by starting at the given chunk and generating new orders + based on a Farey sequence of order `self.rebalance_md` for all + subsequent chunks that have an order less than that of the ordering + generated by the Farey sequence. + + For example say we have chunks (and orders): A (23/90), B (24/91) and + C (2/3), and we have rebalance_md set to 5, a rebalancing would produce: + + A: 23/90 -> 1/3 + B: 24/91 -> 2/5 + C: 2/3 (no change) - This works by finding a range that includes the given node, and - recalculating the ordering floats such that they're equidistant in - that range. + Since the farey sequence is 1/5, 1/4, 1/3, 2/5, 1/2, ... and 1/3 is the + smallest term greater than 23/90. + + Note that we've extended Farey Sequence to be infinite by repeating the + sequence with an added integer. For example sequence with order 3: + + 0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ... """ logger.info("Rebalancing room %s, chunk %s", self.room_id, node_id) - with Measure(self.clock, "chunk_rebalance"): - # We pick the interval to try and minimise the number of decimal - # places, i.e. we round to nearest float with `rebalance_digits` and - # use that as one side of the interval - order = self._get_order(node_id) - a = round(order, self.rebalance_digits) - min_order = a - 10 ** -self.rebalance_digits - max_order = a + 10 ** -self.rebalance_digits - - # Now we get all the nodes in the range. We add the minimum difference - # to the bounds to ensure that we don't accidentally move a node to be - # within the minimum difference of a node outside the range. - sql = """ - SELECT chunk_id FROM chunk_linearized - WHERE ordering >= ? AND ordering <= ? AND room_id = ? - """ - self.txn.execute(sql, ( - min_order - self.min_difference, - max_order + self.min_difference, - self.room_id, - )) - - chunk_ids = [c for c, in self.txn] + old_order = self._get_order(node_id) + + a, b, c, d = find_farey_terms(old_order, self.rebalance_md) + assert old_order < Fraction(a, b) + assert b + d > self.rebalance_md + + # Since we can easily produce farey sequence terms with an iterative + # algorithm, we can use WITH RECURSIVE to do so. This is less clear + # than doing it in python, but saves us being killed by the RTT to the + # DB if we need to rebalance a large number of nodes. + with_sql = """ + WITH RECURSIVE chunks (chunk_id, next, n, a, b, c, d) AS ( + SELECT chunk_id, next_chunk_id, ?, ?, ?, ?, ? + FROM chunk_linearized WHERE chunk_id = ? + UNION ALL + SELECT n.chunk_id, n.next_chunk_id, n, + c, d, ((n + b) / d) * c - a, ((n + b) / d) * d - b + FROM chunks AS c + INNER JOIN chunk_linearized AS l ON l.chunk_id = c.chunk_id + INNER JOIN chunk_linearized AS n ON n.chunk_id = l.next_chunk_id + WHERE c * 1.0 / d > n.numerator * 1.0 / n.denominator + ) + """ - sql = """ + # Annoyingly, postgres 9.4 doesn't support the standard SQL subquery + # syntax for updates. + if isinstance(self.database_engine, PostgresEngine): + sql = with_sql + """ + UPDATE chunk_linearized AS l + SET numerator = a, denominator = b + FROM chunks AS c + WHERE c.chunk_id = l.chunk_id + """ + else: + sql = with_sql + """ UPDATE chunk_linearized - SET ordering = ? - WHERE chunk_id = ? + SET (numerator, denominator) = ( + SELECT a, b FROM chunks + WHERE chunks.chunk_id = chunk_linearized.chunk_id + ) + WHERE chunk_id in (SELECT chunk_id FROM chunks) """ - step = (max_order - min_order) / len(chunk_ids) - self.txn.executemany( - sql, - ( - ((idx * step + min_order), chunk_id) - for idx, chunk_id in enumerate(chunk_ids) - ) - ) + self.txn.execute(sql, ( + self.rebalance_md, a, b, c, d, node_id + )) + + logger.info("Rebalanced %d chunks in room %s", self.txn.rowcount, self.room_id) + + rebalance_counter.inc() + + +def get_fraction_in_range(min_frac, max_frac): + """Gets a fraction in between the given numbers. + + Uses Stern-Brocot tree to generate the fraction with the smallest + denominator. + + See https://en.wikipedia.org/wiki/Stern%E2%80%93Brocot_tree + + Args: + min_frac (numbers.Rational) + max_frac (numbers.Rational) + + Returns: + numbers.Rational + """ + + assert 0 <= min_frac < max_frac + + # If the determinant is 1 then the fraction with smallest numerator and + # denominator in the range is the mediant, so we don't have to use the + # stern brocot tree to search for it. + determinant = ( + min_frac.denominator * max_frac.numerator + - min_frac.numerator * max_frac.denominator + ) + + if determinant == 1: + return Fraction( + min_frac.numerator + max_frac.numerator, + min_frac.denominator + max_frac.denominator, + ) + + # This works by tracking two fractions a/b and c/d and repeatedly replacing + # one of them with their mediant, depending on if the mediant is smaller + # or greater than the specified range. + a, b, c, d = 0, 1, 1, 0 + + while True: + f = Fraction(a + c, b + d) + + if f <= min_frac: + a, b, c, d = a + c, b + d, c, d + elif min_frac < f < max_frac: + return f + else: + a, b, c, d = a, b, a + c, b + d + + +def find_farey_terms(min_frac, max_denom): + """Find the smallest pair of fractions that are part of the Farey sequence + of order `max_denom` (the ordered sequence of all fraction with denominator + less than or equal to max_denom). + + This is useful as it can be fed into a simple iterative algorithm to + generate subsequent entries in the sequence. + + A pair of fractions a/b, c/d are neighbours in the sequence of order + max(b, d) if and only if their determinant is one, i.e. bc - ad = 1. Note + that the next order sequence is generate by taking the mediants of the + previous order, so a/b and c/d are neighbours in all sequences with orders + between max(b, d) and b + d. + + We can therefore use the Stern-Brocot tree to find the closest pair of + fractions to min_frac such that b + d is strictly greater than max_denom, + since all neighbouring fractions in Stern-Brocot satisfy the necessary + determinant property. + + Note that we've extended Farey Sequence to be infinite by repeating the + sequence with an added integer. For example sequence with order 3: + + 0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ... + + See https://en.wikipedia.org/wiki/Farey_sequence + + Args: + min_frac (numbers.Rational) + max_frac (int) + + Returns: + tuple[int, int, int, int] + """ + + a, b, c, d = 0, 1, 1, 0 + + while True: + cur_frac = Fraction(a + c, b + d) + + if b + d > max_denom: + break + + if cur_frac <= min_frac: + a, b, c, d = a + c, b + d, c, d + elif min_frac < cur_frac: + a, b, c, d = a, b, a + c, b + d + + # a/b may be smaller than min_frac, so we run the algorithm to generate + # next Farey sequence terms until a/b is strictly greater than min_frac + while Fraction(a, b) <= min_frac: + k = int((max_denom + b) / d) + a, b, c, d = c, d, k * c - a, k * d - b + + assert min_frac < Fraction(a, b) < Fraction(c, d) + assert b * c - a * d == 1 - rebalance_counter.inc() + return a, b, c, d diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 873bc717bb..e76af4ec69 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1446,7 +1446,7 @@ class EventsStore(EventsWorkerStore): sibling_events.update(pes) table = ChunkDBOrderedListStore( - txn, room_id, self.clock, + txn, room_id, self.clock, self.database_engine, ) # If there is only one previous chunk (and that isn't None), then this diff --git a/synapse/storage/schema/delta/49/event_chunks.py b/synapse/storage/schema/delta/49/event_chunks.py index 7d8d711600..50040a779c 100644 --- a/synapse/storage/schema/delta/49/event_chunks.py +++ b/synapse/storage/schema/delta/49/event_chunks.py @@ -53,11 +53,23 @@ CREATE INDEX chunk_backwards_extremities_event_id ON chunk_backwards_extremities CREATE TABLE chunk_linearized ( chunk_id BIGINT NOT NULL, room_id TEXT NOT NULL, - ordering DOUBLE PRECISION NOT NULL + next_chunk_id BIGINT, -- The chunk directly after this chunk, or NULL if last chunk + numerator BIGINT NOT NULL, + denominator BIGINT NOT NULL ); CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id); -CREATE INDEX chunk_linearized_ordering ON chunk_linearized (room_id, ordering); +CREATE UNIQUE INDEX chunk_linearized_next_id ON chunk_linearized ( + next_chunk_id, room_id +); + +-- Records the first chunk in a room. +CREATE TABLE chunk_linearized_first ( + chunk_id BIGINT NOT NULL, + room_id TEXT NOT NULL +); + +CREATE UNIQUE INDEX chunk_linearized_first_id ON chunk_linearized_first (room_id); INSERT into background_updates (update_name, progress_json) VALUES ('event_fields_chunk_id', '{}'); @@ -69,10 +81,6 @@ def run_create(cur, database_engine, *args, **kwargs): for statement in get_statements(SQL.splitlines()): cur.execute(statement) - # We now go through and assign chunk IDs for all forward extremities. - # Note that we know that extremities can't reference each other, so we - # can simply assign each event a new chunk ID with an arbitrary order. - txn = LoggingTransaction( cur, "schema_update", database_engine, [], [], ) @@ -86,6 +94,7 @@ def run_create(cur, database_engine, *args, **kwargs): next_chunk_id = 1 room_to_next_order = {} + prev_chunks_by_room = {} for row in rows: chunk_id = next_chunk_id @@ -101,19 +110,41 @@ def run_create(cur, database_engine, *args, **kwargs): updatevalues={"chunk_id": chunk_id}, ) - ordering = room_to_next_order.get(room_id, 0) + ordering = room_to_next_order.get(room_id, 1) room_to_next_order[room_id] = ordering + 1 + prev_chunks = prev_chunks_by_room.setdefault(room_id, []) + SQLBaseStore._simple_insert_txn( txn, table="chunk_linearized", values={ "chunk_id": chunk_id, "room_id": row["room_id"], - "ordering": 0, + "numerator": ordering, + "denominator": 1, }, ) + if prev_chunks: + SQLBaseStore._simple_update_one_txn( + txn, + table="chunk_linearized", + keyvalues={"chunk_id": prev_chunks[-1]}, + updatevalues={"next_chunk_id": chunk_id}, + ) + else: + SQLBaseStore._simple_insert_txn( + txn, + table="chunk_linearized_first", + values={ + "chunk_id": chunk_id, + "room_id": row["room_id"], + }, + ) + + prev_chunks.append(chunk_id) + def run_upgrade(*args, **kwargs): pass diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 0d32a3a498..c5b52a3d60 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -792,7 +792,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): iterated_chunks = [chunk_id] table = ChunkDBOrderedListStore( - txn, room_id, self.clock, + txn, room_id, self.clock, self.database_engine, ) if filter_clause: diff --git a/tests/storage/test_chunk_linearizer_table.py b/tests/storage/test_chunk_linearizer_table.py index beb1ac9a42..ce2883865a 100644 --- a/tests/storage/test_chunk_linearizer_table.py +++ b/tests/storage/test_chunk_linearizer_table.py @@ -15,11 +15,16 @@ from twisted.internet import defer +import itertools import random import tests.unittest import tests.utils -from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore +from fractions import Fraction + +from synapse.storage.chunk_ordered_table import ( + ChunkDBOrderedListStore, find_farey_terms, get_fraction_in_range, +) class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): @@ -42,23 +47,26 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): def test_txn(txn): table = ChunkDBOrderedListStore( - txn, room_id, self.clock, 1, 100, + txn, room_id, self.clock, + self.store.database_engine, + 5, 100, ) table.add_node("A") table._insert_after("B", "A") table._insert_before("C", "A") + table._insert_after("D", "A") sql = """ - SELECT chunk_id FROM chunk_linearized + SELECT chunk_id, numerator, denominator FROM chunk_linearized WHERE room_id = ? - ORDER BY ordering ASC """ txn.execute(sql, (room_id,)) - ordered = [r for r, in txn] + ordered = sorted([(Fraction(n, d), r) for r, n, d in txn]) + ordered = [c for _, c in ordered] - self.assertEqual(["C", "A", "B"], ordered) + self.assertEqual(["C", "A", "D", "B"], ordered) yield self.store.runInteraction("test", test_txn) @@ -68,7 +76,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): def test_txn(txn): table = ChunkDBOrderedListStore( - txn, room_id, self.clock, 1, 20, + txn, room_id, self.clock, + self.store.database_engine, + 5, 100, ) nodes = [(i, "node_%d" % (i,)) for i in xrange(1, 1000)] @@ -95,13 +105,13 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): already_inserted.sort() sql = """ - SELECT chunk_id FROM chunk_linearized + SELECT chunk_id, numerator, denominator FROM chunk_linearized WHERE room_id = ? - ORDER BY ordering ASC """ txn.execute(sql, (room_id,)) - ordered = [r for r, in txn] + ordered = sorted([(Fraction(n, d), r) for r, n, d in txn]) + ordered = [c for _, c in ordered] self.assertEqual(expected, ordered) @@ -113,7 +123,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): def test_txn(txn): table = ChunkDBOrderedListStore( - txn, room_id, self.clock, 1, 20, + txn, room_id, self.clock, + self.store.database_engine, + 5, 1000, ) table.add_node("a") @@ -131,13 +143,13 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): expected.append(node_id) sql = """ - SELECT chunk_id FROM chunk_linearized + SELECT chunk_id, numerator, denominator FROM chunk_linearized WHERE room_id = ? - ORDER BY ordering ASC """ txn.execute(sql, (room_id,)) - ordered = [r for r, in txn] + ordered = sorted([(Fraction(n, d), r) for r, n, d in txn]) + ordered = [c for _, c in ordered] self.assertEqual(expected, ordered) @@ -149,7 +161,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): def test_txn(txn): table = ChunkDBOrderedListStore( - txn, room_id, self.clock, 1, 100, + txn, room_id, self.clock, + self.store.database_engine, + 5, 100, ) table.add_node("a") @@ -170,16 +184,119 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): prev_node = node_id sql = """ - SELECT chunk_id FROM chunk_linearized + SELECT chunk_id, numerator, denominator FROM chunk_linearized WHERE room_id = ? - ORDER BY ordering ASC """ txn.execute(sql, (room_id,)) - ordered = [r for r, in txn] + ordered = sorted([(Fraction(n, d), r) for r, n, d in txn]) + ordered = [c for _, c in ordered] expected = expected_prefix + list(reversed(expected_suffix)) self.assertEqual(expected, ordered) yield self.store.runInteraction("test", test_txn) + + @defer.inlineCallbacks + def test_get_edges_to(self): + room_id = "foo_room4" + + def test_txn(txn): + table = ChunkDBOrderedListStore( + txn, room_id, self.clock, + self.store.database_engine, + 5, 100, + ) + + table.add_node("A") + table._insert_after("B", "A") + table._add_edge_to_graph("A", "B") + table._insert_before("C", "A") + table._add_edge_to_graph("C", "A") + + nodes = table.get_nodes_with_edges_from("A") + self.assertEqual([n for _, n in nodes], ["B"]) + + nodes = table.get_nodes_with_edges_to("A") + self.assertEqual([n for _, n in nodes], ["C"]) + + yield self.store.runInteraction("test", test_txn) + + @defer.inlineCallbacks + def test_get_next_and_prev(self): + room_id = "foo_room5" + + def test_txn(txn): + table = ChunkDBOrderedListStore( + txn, room_id, self.clock, + self.store.database_engine, + 5, 100, + ) + + table.add_node("A") + table._insert_after("B", "A") + table._insert_before("C", "A") + + self.assertEqual(table.get_next("A"), "B") + self.assertEqual(table.get_prev("A"), "C") + + yield self.store.runInteraction("test", test_txn) + + def test_find_farey_terms(self): + def _test(min_frac, max_denom): + """"Calls `find_farey_terms` with given values and checks they + are neighbours in the Farey Sequence. + """ + + a, b, c, d = find_farey_terms(min_frac, max_denom) + + p = Fraction(a, b) + q = Fraction(c, d) + + assert min_frac < p < q + + for x, y in _pairwise(_farey_generator(max_denom)): + if min_frac < x < y: + self.assertEqual(x, p) + self.assertEqual(y, q) + break + + _test(Fraction(5, 3), 12) + _test(Fraction(1, 3), 12) + _test(Fraction(1, 2), 9) + _test(Fraction(1, 2), 10) + _test(Fraction(1, 2), 15) + + def test_get_fraction_in_range(self): + def _test(x, y): + assert x < get_fraction_in_range(x, y) < y + + _test(Fraction(1, 2), Fraction(2, 3)) + _test(Fraction(1, 2), Fraction(3, 2)) + _test(Fraction(5, 203), Fraction(6, 204)) + + +def _farey_generator(n): + """Generates Farey sequence of order `n`. + + Note that this doesn't terminate. + + Taken from https://en.wikipedia.org/wiki/Farey_sequence#Next_term + """ + + a, b, c, d = 0, 1, 1, n + + yield Fraction(a, b) + + while True: + k = int((n + b) / d) + a, b, c, d = c, d, (k * c - a), (k * d - b) + yield Fraction(a, b) + + +def _pairwise(iterable): + "s -> (s0,s1), (s1,s2), (s2, s3), ..." + a, b = itertools.tee(iterable) + next(b, None) + return itertools.izip(a, b) diff --git a/tests/util/test_katriel_bodlaender.py b/tests/util/test_katriel_bodlaender.py index 5768408604..72126bdea9 100644 --- a/tests/util/test_katriel_bodlaender.py +++ b/tests/util/test_katriel_bodlaender.py @@ -56,3 +56,29 @@ class KatrielBodlaenderTests(unittest.TestCase): store.add_edge("node_4", "node_3") self.assertEqual(list(reversed(nodes)), store.list) + + def test_divergent_graph(self): + store = InMemoryOrderedListStore() + + nodes = [ + "node_1", + "node_2", + "node_3", + "node_4", + "node_5", + "node_6", + ] + + for node in reversed(nodes): + store.add_node(node) + + store.add_edge("node_2", "node_3") + store.add_edge("node_2", "node_5") + store.add_edge("node_1", "node_2") + store.add_edge("node_3", "node_4") + store.add_edge("node_1", "node_3") + store.add_edge("node_4", "node_5") + store.add_edge("node_5", "node_6") + store.add_edge("node_4", "node_6") + + self.assertEqual(nodes, store.list) |