diff options
author | Erik Johnston <erik@matrix.org> | 2018-05-31 18:58:30 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-05-31 18:58:30 +0100 |
commit | 1e666c7b72d241460bd3989ec4b258cff986603c (patch) | |
tree | 119fa9100e5bf6bae7764223dc34d0c7d143bf79 | |
parent | schema (diff) | |
download | synapse-1e666c7b72d241460bd3989ec4b258cff986603c.tar.xz |
Use fractions
-rw-r--r-- | synapse/storage/chunk_ordered_table.py | 401 | ||||
-rw-r--r-- | synapse/storage/schema/delta/49/event_chunks.py | 34 | ||||
-rw-r--r-- | tests/storage/test_chunk_linearizer_table.py | 38 |
3 files changed, 332 insertions, 141 deletions
diff --git a/synapse/storage/chunk_ordered_table.py b/synapse/storage/chunk_ordered_table.py index c5c339df68..b303fb639a 100644 --- a/synapse/storage/chunk_ordered_table.py +++ b/synapse/storage/chunk_ordered_table.py @@ -16,9 +16,12 @@ import math import logging +from collections import deque +from gmpy2 import mpq as Fraction +from fractions import Fraction as FractionPy + from synapse.storage._base import SQLBaseStore from synapse.util.katriel_bodlaender import OrderedListStore -from synapse.util.metrics import Measure import synapse.metrics @@ -89,14 +92,14 @@ class ChunkDBOrderedListStore(OrderedListStore): """ def __init__(self, txn, room_id, clock, - rebalance_digits=2, - min_difference=1000): + rebalance_max_denominator=100, + max_denominator=10000): self.txn = txn self.room_id = room_id self.clock = clock - self.rebalance_digits = rebalance_digits - self.min_difference = 1. / min_difference + self.rebalance_md = rebalance_max_denominator + self.max_denominator = max_denominator def is_before(self, a, b): """Implements OrderedListStore""" @@ -104,16 +107,13 @@ class ChunkDBOrderedListStore(OrderedListStore): def get_prev(self, node_id): """Implements OrderedListStore""" - order = self._get_order(node_id) sql = """ SELECT chunk_id FROM chunk_linearized - WHERE ordering < ? AND room_id = ? - ORDER BY ordering DESC - LIMIT 1 + WHERE next_chunk_id = ? """ - self.txn.execute(sql, (order, self.room_id,)) + self.txn.execute(sql, (node_id,)) row = self.txn.fetchone() if row: @@ -122,16 +122,13 @@ class ChunkDBOrderedListStore(OrderedListStore): def get_next(self, node_id): """Implements OrderedListStore""" - order = self._get_order(node_id) sql = """ - SELECT chunk_id FROM chunk_linearized - WHERE ordering > ? AND room_id = ? - ORDER BY ordering ASC - LIMIT 1 + SELECT next_chunk_id FROM chunk_linearized + WHERE chunk_id = ? """ - self.txn.execute(sql, (order, self.room_id,)) + self.txn.execute(sql, (node_id,)) row = self.txn.fetchone() if row: @@ -144,27 +141,26 @@ class ChunkDBOrderedListStore(OrderedListStore): rebalance = False # Set to true if we need to trigger a rebalance if target_id: - target_order = self._get_order(target_id) before_id = self.get_prev(target_id) - if before_id: - before_order = self._get_order(before_id) - new_order = (target_order + before_order) / 2. - - rebalance = math.fabs(target_order - before_order) < self.min_difference + new_order = self._insert_between(node_id, before_id, target_id) else: - new_order = math.floor(target_order) - 1 + new_order = self._insert_at_start(node_id, target_id) else: # If target_id is None then we insert at the end. self.txn.execute(""" - SELECT COALESCE(MAX(ordering), 0) + 1 + SELECT chunk_id FROM chunk_linearized - WHERE room_id = ? + WHERE room_id = ? AND next_chunk_id is NULL """, (self.room_id,)) - new_order, = self.txn.fetchone() + row = self.txn.fetchone() + if row: + new_order = self._insert_at_start(node_id, row[0]) + else: + new_order = self._insert_first(node_id) - self._insert(node_id, new_order) + rebalance = new_order.denominator > self.max_denominator if rebalance: self._rebalance(node_id) @@ -174,58 +170,185 @@ class ChunkDBOrderedListStore(OrderedListStore): rebalance = False # Set to true if we need to trigger a rebalance + next_chunk_id = None if target_id: - target_order = self._get_order(target_id) - after_id = self.get_next(target_id) - if after_id: - after_order = self._get_order(after_id) - new_order = (target_order + after_order) / 2. - - rebalance = math.fabs(target_order - after_order) < self.min_difference + next_chunk_id = self.get_next(target_id) + if next_chunk_id: + new_order = self._insert_between(node_id, target_id, next_chunk_id) else: - new_order = math.ceil(target_order) + 1 + new_order = self._insert_at_end(node_id, target_id) else: # If target_id is None then we insert at the start. self.txn.execute(""" - SELECT COALESCE(MIN(ordering), 0) - 1 + SELECT chunk_id FROM chunk_linearized + NATURAL JOIN chunk_linearized_first WHERE room_id = ? """, (self.room_id,)) - new_order, = self.txn.fetchone() + row = self.txn.fetchone() + if row: + self._insert_at_start(node_id, row[0]) + else: + new_order = self._insert_first(node_id) - self._insert(node_id, new_order) + rebalance = new_order.denominator > self.max_denominator if rebalance: self._rebalance(node_id) + def _insert_between(self, node_id, left_id, right_id): + left_order = self._get_order(left_id) + right_order = self._get_order(right_id) + + assert left_order < right_order + + new_order = stern_brocot_single(left_order, right_order) + + SQLBaseStore._simple_update_one_txn( + self.txn, + table="chunk_linearized", + keyvalues={"chunk_id": left_id}, + updatevalues={"next_chunk_id": node_id}, + ) + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized", + values={ + "chunk_id": node_id, + "room_id": self.room_id, + "next_chunk_id": right_id, + "numerator": int(new_order.numerator), + "denominator": int(new_order.denominator), + } + ) + + return new_order + + def _insert_at_end(self, node_id, last_id): + last_order = self._get_order(last_id) + new_order = Fraction(int(math.ceil(last_order)) + 1, 1) + + SQLBaseStore._simple_update_one_txn( + self.txn, + table="chunk_linearized", + keyvalues={"chunk_id": last_id}, + updatevalues={"next_chunk_id": node_id}, + ) + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized", + values={ + "chunk_id": node_id, + "room_id": self.room_id, + "next_chunk_id": None, + "numerator": int(new_order.numerator), + "denominator": int(new_order.denominator), + } + ) + + return new_order + + def _insert_at_start(self, node_id, first_id): + first_order = self._get_order(first_id) + new_order = stern_brocot_single(0, first_order) + + SQLBaseStore._simple_update_one_txn( + self.txn, + table="chunk_linearized_first", + keyvalues={"room_id": self.room_id}, + updatevalues={"chunk_id": node_id}, + ) + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized", + values={ + "chunk_id": node_id, + "room_id": self.room_id, + "next_chunk_id": first_id, + "numerator": int(new_order.numerator), + "denominator": int(new_order.denominator), + } + ) + + return new_order + + def _insert_first(self, node_id): + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized_first", + values={ + "room_id": self.room_id, + "chunk_id": node_id, + }, + ) + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized", + values={ + "chunk_id": node_id, + "room_id": self.room_id, + "next_chunk_id": None, + "numerator": 1, + "denominator": 1, + } + ) + + return Fraction(1, 1) + def get_nodes_with_edges_to(self, node_id): """Implements OrderedListStore""" # Note that we use the inverse relation here sql = """ - SELECT l.ordering, l.chunk_id FROM chunk_graph AS g + SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g INNER JOIN chunk_linearized AS l ON g.prev_id = l.chunk_id WHERE g.chunk_id = ? """ self.txn.execute(sql, (node_id,)) - return self.txn.fetchall() + return [(Fraction(n, d), c) for c, n, d in self.txn] def get_nodes_with_edges_from(self, node_id): """Implements OrderedListStore""" # Note that we use the inverse relation here sql = """ - SELECT l.ordering, l.chunk_id FROM chunk_graph AS g + SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g INNER JOIN chunk_linearized AS l ON g.chunk_id = l.chunk_id WHERE g.prev_id = ? """ self.txn.execute(sql, (node_id,)) - return self.txn.fetchall() + return [(Fraction(n, d), c) for c, n, d in self.txn] def _delete_ordering(self, node_id): """Implements OrderedListStore""" + sql = """ + UPDATE chunk_linearized SET next_chunk_id = ( + SELECT next_chunk_id + FROM chunk_linearized + WHERE chunk_id = ? + ) + WHERE next_chunk_id = ? + """ + + self.txn.execute(sql, (node_id, node_id,)) + + sql = """ + UPDATE chunk_linearized_first SET next_chunk_id = ( + SELECT next_chunk_id + FROM chunk_linearized + WHERE chunk_id = ? + ) + WHERE chunk_id = ? + """ + + self.txn.execute(sql, (node_id, node_id,)) + SQLBaseStore._simple_delete_txn( self.txn, table="chunk_linearized", @@ -242,29 +365,17 @@ class ChunkDBOrderedListStore(OrderedListStore): values={"chunk_id": target_id, "prev_id": source_id} ) - def _insert(self, node_id, order): - """Inserts the node with the given ordering. - """ - SQLBaseStore._simple_insert_txn( - self.txn, - table="chunk_linearized", - values={ - "chunk_id": node_id, - "room_id": self.room_id, - "ordering": order, - } - ) - def _get_order(self, node_id): """Get the ordering of the given node. """ - return SQLBaseStore._simple_select_one_onecol_txn( + row = SQLBaseStore._simple_select_one_txn( self.txn, table="chunk_linearized", keyvalues={"chunk_id": node_id}, - retcol="ordering" + retcols=("numerator", "denominator",), ) + return Fraction(row["numerator"], row["denominator"]) def _rebalance(self, node_id): """Rebalances the list around the given node to ensure that the @@ -277,65 +388,121 @@ class ChunkDBOrderedListStore(OrderedListStore): logger.info("Rebalancing room %s, chunk %s", self.room_id, node_id) - with Measure(self.clock, "chunk_rebalance"): - # We pick the interval to try and minimise the number of decimal - # places, i.e. we round to nearest float with `rebalance_digits` and - # use that as one side of the interval - - order = self._get_order(node_id) - a = round(order, self.rebalance_digits) - diff = 10 ** - self.rebalance_digits - - while True: - min_order = a - diff - max_order = a + diff - - sql = """ - SELECT count(chunk_id) FROM chunk_linearized - WHERE ordering >= ? AND ordering <= ? AND room_id = ? - """ - self.txn.execute(sql, ( - min_order - self.min_difference, - max_order + self.min_difference, - self.room_id, - )) - - cnt, = self.txn.fetchone() - step = (max_order - min_order) / cnt - if step > 1 / self.min_difference: - break - - diff *= 2 - - # Now we get all the nodes in the range. We add the minimum difference - # to the bounds to ensure that we don't accidentally move a node to be - # within the minimum difference of a node outside the range. - sql = """ - SELECT chunk_id FROM chunk_linearized - WHERE ordering >= ? AND ordering <= ? AND room_id = ? - ORDER BY ordering ASC - """ - self.txn.execute(sql, ( - min_order - self.min_difference, - max_order + self.min_difference, - self.room_id, - )) - - chunk_ids = [c for c, in self.txn] - - sql = """ - UPDATE chunk_linearized - SET ordering = ? - WHERE chunk_id = ? - """ - - step = (max_order - min_order) / len(chunk_ids) - self.txn.executemany( - sql, - ( - ((idx * step + min_order), chunk_id) - for idx, chunk_id in enumerate(chunk_ids) - ) + old_order = self._get_order(node_id) + new_order = FractionPy( + int(old_order.numerator), + int(old_order.denominator), + ).limit_denominator( + self.rebalance_md, + ) + new_order = Fraction(new_order.numerator, new_order.denominator) + if new_order < old_order: + new_order += Fraction(1, new_order.denominator) + + count_nodes = [node_id] + next_id = node_id + while True: + next_id = self.get_next(next_id) + + if not next_id: + max_order = None + break + + count_nodes.append(next_id) + + max_order = self._get_order(next_id) + + if len(count_nodes) < self.rebalance_md * (max_order - new_order): + break + + if len(count_nodes) == 1: + orders = [new_order] + if max_order: + orders = stern_brocot_range(len(count_nodes), new_order, max_order) + orders.sort(reverse=True) + else: + orders = [ + Fraction(int(math.ceil(new_order)) + i, 1) + for i in xrange(0, len(count_nodes)) + ] + orders.reverse() + + assert len(count_nodes) == len(orders) + + next_id = node_id + prev_order = old_order + while orders: + order = orders.pop() + + if max_order: + assert old_order <= new_order <= max_order + else: + assert old_order <= new_order + + assert prev_order < order + + SQLBaseStore._simple_update_txn( + self.txn, + table="chunk_linearized", + keyvalues={"chunk_id": next_id}, + updatevalues={ + "numerator": int(order.numerator), + "denominator": int(order.denominator), + }, ) - rebalance_counter.inc() + next_id = self.get_next(next_id) + + rebalance_counter.inc() + + +def stern_brocot_range(n, min_frac, max_frac): + assert 0 < min_frac < max_frac + + states = deque([(0, 1, 1, 0)]) + + result = [] + + while len(result) < n: + a, b, c, d = states.popleft() + + f = (a + c) / float(b + d) + if f < min_frac: + states.append((a + c, b + d, c, d)) + + elif min_frac <= f <= max_frac: + states.append((a, b, a + c, b + d)) + states.append((a + c, b + d, c, d)) + + result.append(Fraction(a + c, b + d)) + else: + states.append((a, b, a + c, b + d)) + + return result + + +def stern_brocot_single(min_frac, max_frac): + assert 0 <= min_frac < max_frac + + denom = ( + min_frac.denominator * max_frac.numerator + - min_frac.numerator * max_frac.denominator + ) + + if denom == 1: + return Fraction( + min_frac.numerator + max_frac.numerator, + min_frac.denominator + max_frac.denominator, + ) + + a, b, c, d = 0, 1, 1, 0 + + while True: + f = Fraction(a + c, b + d) + if f <= min_frac: + a, b, c, d = a + c, b + d, c, d + + elif min_frac < f < max_frac: + return f + else: + a, b, c, d = a, b, a + c, b + d diff --git a/synapse/storage/schema/delta/49/event_chunks.py b/synapse/storage/schema/delta/49/event_chunks.py index a4a329dd6f..12c8cd9e3f 100644 --- a/synapse/storage/schema/delta/49/event_chunks.py +++ b/synapse/storage/schema/delta/49/event_chunks.py @@ -53,11 +53,22 @@ CREATE INDEX chunk_backwards_extremities_event_id ON chunk_backwards_extremities CREATE TABLE chunk_linearized ( chunk_id BIGINT NOT NULL, room_id TEXT NOT NULL, - ordering DOUBLE PRECISION NOT NULL + next_chunk_id BIGINT, + numerator BIGINT NOT NULL, + denominator BIGINT NOT NULL ); CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id); -CREATE INDEX chunk_linearized_ordering ON chunk_linearized (room_id, ordering); +CREATE UNIQUE INDEX chunk_linearized_next_id ON chunk_linearized ( + next_chunk_id, room_id, +); + +CREATE TABLE chunk_linearized_first ( + chunk_id BIGINT NOT NULL, + room_id TEXT NOT NULL +); + +CREATE UNIQUE INDEX chunk_linearized_first_id ON chunk_linearized_first (room_id); INSERT into background_updates (update_name, progress_json) VALUES ('event_fields_chunk_id', '{}'); @@ -70,8 +81,8 @@ def run_create(cur, database_engine, *args, **kwargs): cur.execute(statement) txn = LoggingTransaction( - cur, "schema_update", database_engine, [], [], - ) + cur, "schema_update", database_engine, [], [], + ) rows = SQLBaseStore._simple_select_list_txn( txn, @@ -97,7 +108,7 @@ def run_create(cur, database_engine, *args, **kwargs): updatevalues={"chunk_id": chunk_id}, ) - ordering = room_to_next_order.get(room_id, 0) + ordering = room_to_next_order.get(room_id, 1) room_to_next_order[room_id] = ordering + 1 SQLBaseStore._simple_insert_txn( @@ -106,10 +117,21 @@ def run_create(cur, database_engine, *args, **kwargs): values={ "chunk_id": chunk_id, "room_id": row["room_id"], - "ordering": 0, + "numerator": ordering, + "denominator": 1, }, ) + if ordering == 1: + SQLBaseStore._simple_insert_txn( + txn, + table="chunk_linearized_first", + values={ + "chunk_id": chunk_id, + "room_id": row["room_id"], + }, + ) + def run_upgrade(*args, **kwargs): pass diff --git a/tests/storage/test_chunk_linearizer_table.py b/tests/storage/test_chunk_linearizer_table.py index 9cac62061b..f02067404d 100644 --- a/tests/storage/test_chunk_linearizer_table.py +++ b/tests/storage/test_chunk_linearizer_table.py @@ -19,6 +19,8 @@ import random import tests.unittest import tests.utils +from gmpy2 import mpq as Fraction + from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore @@ -42,7 +44,7 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): def test_txn(txn): table = ChunkDBOrderedListStore( - txn, room_id, self.clock, 1, 100, + txn, room_id, self.clock, 5, 100, ) table.add_node("A") @@ -51,13 +53,13 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): table._insert_after("D", "A") sql = """ - SELECT chunk_id FROM chunk_linearized + SELECT chunk_id, numerator, denominator FROM chunk_linearized WHERE room_id = ? - ORDER BY ordering ASC """ txn.execute(sql, (room_id,)) - ordered = [r for r, in txn] + ordered = sorted([(Fraction(n, d), r) for r, n, d in txn]) + ordered = [c for _, c in ordered] self.assertEqual(["C", "A", "D", "B"], ordered) @@ -69,7 +71,7 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): def test_txn(txn): table = ChunkDBOrderedListStore( - txn, room_id, self.clock, 1, 20, + txn, room_id, self.clock, 5, 100, ) nodes = [(i, "node_%d" % (i,)) for i in xrange(1, 1000)] @@ -96,13 +98,13 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): already_inserted.sort() sql = """ - SELECT chunk_id FROM chunk_linearized + SELECT chunk_id, numerator, denominator FROM chunk_linearized WHERE room_id = ? - ORDER BY ordering ASC """ txn.execute(sql, (room_id,)) - ordered = [r for r, in txn] + ordered = sorted([(Fraction(n, d), r) for r, n, d in txn]) + ordered = [c for _, c in ordered] self.assertEqual(expected, ordered) @@ -114,7 +116,7 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): def test_txn(txn): table = ChunkDBOrderedListStore( - txn, room_id, self.clock, 1, 20, + txn, room_id, self.clock, 5, 1000, ) table.add_node("a") @@ -132,13 +134,13 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): expected.append(node_id) sql = """ - SELECT chunk_id FROM chunk_linearized + SELECT chunk_id, numerator, denominator FROM chunk_linearized WHERE room_id = ? - ORDER BY ordering ASC """ txn.execute(sql, (room_id,)) - ordered = [r for r, in txn] + ordered = sorted([(Fraction(n, d), r) for r, n, d in txn]) + ordered = [c for _, c in ordered] self.assertEqual(expected, ordered) @@ -150,7 +152,7 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): def test_txn(txn): table = ChunkDBOrderedListStore( - txn, room_id, self.clock, 1, 100, + txn, room_id, self.clock, 5, 100, ) table.add_node("a") @@ -171,13 +173,13 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): prev_node = node_id sql = """ - SELECT chunk_id FROM chunk_linearized + SELECT chunk_id, numerator, denominator FROM chunk_linearized WHERE room_id = ? - ORDER BY ordering ASC """ txn.execute(sql, (room_id,)) - ordered = [r for r, in txn] + ordered = sorted([(Fraction(n, d), r) for r, n, d in txn]) + ordered = [c for _, c in ordered] expected = expected_prefix + list(reversed(expected_suffix)) @@ -191,7 +193,7 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): def test_txn(txn): table = ChunkDBOrderedListStore( - txn, room_id, self.clock, 1, 100, + txn, room_id, self.clock, 5, 100, ) table.add_node("A") @@ -214,7 +216,7 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): def test_txn(txn): table = ChunkDBOrderedListStore( - txn, room_id, self.clock, 1, 100, + txn, room_id, self.clock, 5, 100, ) table.add_node("A") |