diff options
Diffstat (limited to 'synapse/storage/chunk_ordered_table.py')
-rw-r--r-- | synapse/storage/chunk_ordered_table.py | 474 |
1 file changed, 366 insertions, 108 deletions
diff --git a/synapse/storage/chunk_ordered_table.py b/synapse/storage/chunk_ordered_table.py index 79d0ca44ec..5e552ef138 100644 --- a/synapse/storage/chunk_ordered_table.py +++ b/synapse/storage/chunk_ordered_table.py @@ -16,9 +16,11 @@ import math import logging +from fractions import Fraction + from synapse.storage._base import SQLBaseStore +from synapse.storage.engines import PostgresEngine from synapse.util.katriel_bodlaender import OrderedListStore -from synapse.util.metrics import Measure import synapse.metrics @@ -59,11 +61,12 @@ class ChunkDBOrderedListStore(OrderedListStore): re-instantiated in each transaction, so all state needs to be stored in the database. - Internally the ordering is implemented using floats, and the average is - taken when a node is inserted between other nodes. To avoid precision - errors a minimum difference between sucessive orderings is attempted to be - kept; whenever the difference is too small we attempt to rebalance. See - the `_rebalance` function for implementation details. + Internally the ordering is implemented using a linked list and assigning + each chunk a fraction. `get_next` and `get_prev` are implemented via linked + lists, and comparisons implemented using the fractions. When inserting + chunks fractions are picked such that their denominator is the smallest + possible. However, if the denominators grow too big then a rebalancing has + to take place to reduce the denominators; see `_rebalance` for details. Note that OrderedListStore orders nodes such that source of an edge comes before the target. This is counter intuitive when edges represent @@ -80,23 +83,24 @@ class ChunkDBOrderedListStore(OrderedListStore): txn room_id (str) clock - rebalance_digits (int): When a rebalance is triggered we rebalance - in a range around the node, where the bounds are rounded to this - number of digits. 
- min_difference (int): A rebalance is triggered when the difference - between two successive orderings is less than the reciprocal of - this. + database_engine + rebalance_max_denominator (int): When a rebalance is triggered we + replace existing orders with those that have a denominator smaller + or equal to this + max_denominator (int): A rebalance is triggered when a node has an + ordering with a denominator greater than this """ def __init__(self, - txn, room_id, clock, - rebalance_digits=3, - min_difference=1000000): + txn, room_id, clock, database_engine, + rebalance_max_denominator=100, + max_denominator=100000): self.txn = txn self.room_id = room_id self.clock = clock + self.database_engine = database_engine - self.rebalance_digits = rebalance_digits - self.min_difference = 1. / min_difference + self.rebalance_md = rebalance_max_denominator + self.max_denominator = max_denominator def is_before(self, a, b): """Implements OrderedListStore""" @@ -104,16 +108,13 @@ class ChunkDBOrderedListStore(OrderedListStore): def get_prev(self, node_id): """Implements OrderedListStore""" - order = self._get_order(node_id) sql = """ SELECT chunk_id FROM chunk_linearized - WHERE ordering < ? AND room_id = ? - ORDER BY ordering DESC - LIMIT 1 + WHERE next_chunk_id = ? """ - self.txn.execute(sql, (order, self.room_id,)) + self.txn.execute(sql, (node_id,)) row = self.txn.fetchone() if row: @@ -122,16 +123,13 @@ class ChunkDBOrderedListStore(OrderedListStore): def get_next(self, node_id): """Implements OrderedListStore""" - order = self._get_order(node_id) sql = """ - SELECT chunk_id FROM chunk_linearized - WHERE ordering > ? AND room_id = ? - ORDER BY ordering ASC - LIMIT 1 + SELECT next_chunk_id FROM chunk_linearized + WHERE chunk_id = ? 
""" - self.txn.execute(sql, (order, self.room_id,)) + self.txn.execute(sql, (node_id,)) row = self.txn.fetchone() if row: @@ -144,27 +142,26 @@ class ChunkDBOrderedListStore(OrderedListStore): rebalance = False # Set to true if we need to trigger a rebalance if target_id: - target_order = self._get_order(target_id) before_id = self.get_prev(target_id) - if before_id: - before_order = self._get_order(before_id) - new_order = (target_order + before_order) / 2. - - rebalance = math.fabs(target_order - before_order) < self.min_difference + new_order = self._insert_between(node_id, before_id, target_id) else: - new_order = math.floor(target_order) - 1 + new_order = self._insert_at_start(node_id, target_id) else: # If target_id is None then we insert at the end. self.txn.execute(""" - SELECT COALESCE(MAX(ordering), 0) + 1 + SELECT chunk_id FROM chunk_linearized - WHERE room_id = ? + WHERE room_id = ? AND next_chunk_id is NULL """, (self.room_id,)) - new_order, = self.txn.fetchone() + row = self.txn.fetchone() + if row: + new_order = self._insert_at_end(node_id, row[0]) + else: + new_order = self._insert_first(node_id) - self._insert(node_id, new_order) + rebalance = new_order.denominator > self.max_denominator if rebalance: self._rebalance(node_id) @@ -174,64 +171,204 @@ class ChunkDBOrderedListStore(OrderedListStore): rebalance = False # Set to true if we need to trigger a rebalance + next_chunk_id = None if target_id: - target_order = self._get_order(target_id) - after_id = self.get_next(target_id) - if after_id: - after_order = self._get_order(after_id) - new_order = (target_order + after_order) / 2. - - rebalance = math.fabs(target_order - after_order) < self.min_difference + next_chunk_id = self.get_next(target_id) + if next_chunk_id: + new_order = self._insert_between(node_id, target_id, next_chunk_id) else: - new_order = math.ceil(target_order) + 1 + new_order = self._insert_at_end(node_id, target_id) else: # If target_id is None then we insert at the start. 
self.txn.execute(""" - SELECT COALESCE(MIN(ordering), 0) - 1 + SELECT chunk_id FROM chunk_linearized + NATURAL JOIN chunk_linearized_first WHERE room_id = ? """, (self.room_id,)) - new_order, = self.txn.fetchone() + row = self.txn.fetchone() + if row: + new_order = self._insert_at_start(node_id, row[0]) + else: + new_order = self._insert_first(node_id) - self._insert(node_id, new_order) + rebalance = new_order.denominator > self.max_denominator if rebalance: self._rebalance(node_id) + def _insert_between(self, node_id, left_id, right_id): + """Inserts node between given existing nodes. + """ + + left_order = self._get_order(left_id) + right_order = self._get_order(right_id) + + assert left_order < right_order + + new_order = get_fraction_in_range(left_order, right_order) + + SQLBaseStore._simple_update_one_txn( + self.txn, + table="chunk_linearized", + keyvalues={"chunk_id": left_id}, + updatevalues={"next_chunk_id": node_id}, + ) + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized", + values={ + "chunk_id": node_id, + "room_id": self.room_id, + "next_chunk_id": right_id, + "numerator": int(new_order.numerator), + "denominator": int(new_order.denominator), + } + ) + + return new_order + + def _insert_at_end(self, node_id, last_id): + """Inserts node at the end using existing last node. + """ + + last_order = self._get_order(last_id) + new_order = Fraction(int(math.ceil(last_order)) + 1, 1) + + SQLBaseStore._simple_update_one_txn( + self.txn, + table="chunk_linearized", + keyvalues={"chunk_id": last_id}, + updatevalues={"next_chunk_id": node_id}, + ) + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized", + values={ + "chunk_id": node_id, + "room_id": self.room_id, + "next_chunk_id": None, + "numerator": int(new_order.numerator), + "denominator": int(new_order.denominator), + } + ) + + return new_order + + def _insert_at_start(self, node_id, first_id): + """Inserts node at the start using existing first node. 
+ """ + + first_order = self._get_order(first_id) + new_order = get_fraction_in_range(0, first_order) + + SQLBaseStore._simple_update_one_txn( + self.txn, + table="chunk_linearized_first", + keyvalues={"room_id": self.room_id}, + updatevalues={"chunk_id": node_id}, + ) + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized", + values={ + "chunk_id": node_id, + "room_id": self.room_id, + "next_chunk_id": first_id, + "numerator": int(new_order.numerator), + "denominator": int(new_order.denominator), + } + ) + + return new_order + + def _insert_first(self, node_id): + """Inserts the first node for this room. + """ + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized_first", + values={ + "room_id": self.room_id, + "chunk_id": node_id, + }, + ) + + SQLBaseStore._simple_insert_txn( + self.txn, + table="chunk_linearized", + values={ + "chunk_id": node_id, + "room_id": self.room_id, + "next_chunk_id": None, + "numerator": 1, + "denominator": 1, + } + ) + + return Fraction(1, 1) + def get_nodes_with_edges_to(self, node_id): """Implements OrderedListStore""" # Note that we use the inverse relation here sql = """ - SELECT l.ordering, l.chunk_id FROM chunk_graph AS g + SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g INNER JOIN chunk_linearized AS l ON g.prev_id = l.chunk_id WHERE g.chunk_id = ? """ self.txn.execute(sql, (node_id,)) - return self.txn.fetchall() + return [(Fraction(n, d), c) for c, n, d in self.txn] def get_nodes_with_edges_from(self, node_id): """Implements OrderedListStore""" # Note that we use the inverse relation here sql = """ - SELECT l.ordering, l.chunk_id FROM chunk_graph AS g + SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g INNER JOIN chunk_linearized AS l ON g.chunk_id = l.chunk_id WHERE g.prev_id = ? 
""" self.txn.execute(sql, (node_id,)) - return self.txn.fetchall() + return [(Fraction(n, d), c) for c, n, d in self.txn] def _delete_ordering(self, node_id): """Implements OrderedListStore""" + next_chunk_id = SQLBaseStore._simple_select_one_onecol_txn( + self.txn, + table="chunk_linearized", + keyvalues={ + "chunk_id": node_id, + }, + retcol="next_chunk_id", + ) + SQLBaseStore._simple_delete_txn( self.txn, table="chunk_linearized", keyvalues={"chunk_id": node_id}, ) + sql = """ + UPDATE chunk_linearized SET next_chunk_id = ? + WHERE next_chunk_id = ? + """ + + self.txn.execute(sql, (next_chunk_id, node_id,)) + + sql = """ + UPDATE chunk_linearized_first SET chunk_id = ? + WHERE chunk_id = ? + """ + + self.txn.execute(sql, (next_chunk_id, node_id,)) + def _add_edge_to_graph(self, source_id, target_id): """Implements OrderedListStore""" @@ -242,78 +379,199 @@ class ChunkDBOrderedListStore(OrderedListStore): values={"chunk_id": target_id, "prev_id": source_id} ) - def _insert(self, node_id, order): - """Inserts the node with the given ordering. - """ - SQLBaseStore._simple_insert_txn( - self.txn, - table="chunk_linearized", - values={ - "chunk_id": node_id, - "room_id": self.room_id, - "ordering": order, - } - ) - def _get_order(self, node_id): """Get the ordering of the given node. """ - return SQLBaseStore._simple_select_one_onecol_txn( + row = SQLBaseStore._simple_select_one_txn( self.txn, table="chunk_linearized", keyvalues={"chunk_id": node_id}, - retcol="ordering" + retcols=("numerator", "denominator",), ) + return Fraction(row["numerator"], row["denominator"]) def _rebalance(self, node_id): """Rebalances the list around the given node to ensure that the - ordering floats don't get too small. + ordering denominators aren't too big. 
+ + This is done by starting at the given chunk and generating new orders + based on a Farey sequence of order `self.rebalance_md` for all + subsequent chunks that have an order less than that of the ordering + generated by the Farey sequence. + + For example say we have chunks (and orders): A (23/90), B (24/91) and + C (2/3), and we have rebalance_md set to 5, a rebalancing would produce: + + A: 23/90 -> 1/3 + B: 24/91 -> 2/5 + C: 2/3 (no change) - This works by finding a range that includes the given node, and - recalculating the ordering floats such that they're equidistant in - that range. + Since the farey sequence is 1/5, 1/4, 1/3, 2/5, 1/2, ... and 1/3 is the + smallest term greater than 23/90. + + Note that we've extended Farey Sequence to be infinite by repeating the + sequence with an added integer. For example sequence with order 3: + + 0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ... """ logger.info("Rebalancing room %s, chunk %s", self.room_id, node_id) - with Measure(self.clock, "chunk_rebalance"): - # We pick the interval to try and minimise the number of decimal - # places, i.e. we round to nearest float with `rebalance_digits` and - # use that as one side of the interval - order = self._get_order(node_id) - a = round(order, self.rebalance_digits) - min_order = a - 10 ** -self.rebalance_digits - max_order = a + 10 ** -self.rebalance_digits - - # Now we get all the nodes in the range. We add the minimum difference - # to the bounds to ensure that we don't accidentally move a node to be - # within the minimum difference of a node outside the range. - sql = """ - SELECT chunk_id FROM chunk_linearized - WHERE ordering >= ? AND ordering <= ? AND room_id = ? 
- """ - self.txn.execute(sql, ( - min_order - self.min_difference, - max_order + self.min_difference, - self.room_id, - )) - - chunk_ids = [c for c, in self.txn] + old_order = self._get_order(node_id) + + a, b, c, d = find_farey_terms(old_order, self.rebalance_md) + assert old_order < Fraction(a, b) + assert b + d > self.rebalance_md + + # Since we can easily produce farey sequence terms with an iterative + # algorithm, we can use WITH RECURSIVE to do so. This is less clear + # than doing it in python, but saves us being killed by the RTT to the + # DB if we need to rebalance a large number of nodes. + with_sql = """ + WITH RECURSIVE chunks (chunk_id, next, n, a, b, c, d) AS ( + SELECT chunk_id, next_chunk_id, ?, ?, ?, ?, ? + FROM chunk_linearized WHERE chunk_id = ? + UNION ALL + SELECT n.chunk_id, n.next_chunk_id, n, + c, d, ((n + b) / d) * c - a, ((n + b) / d) * d - b + FROM chunks AS c + INNER JOIN chunk_linearized AS l ON l.chunk_id = c.chunk_id + INNER JOIN chunk_linearized AS n ON n.chunk_id = l.next_chunk_id + WHERE c * 1.0 / d > n.numerator * 1.0 / n.denominator + ) + """ - sql = """ + # Annoyingly, postgres 9.4 doesn't support the standard SQL subquery + # syntax for updates. + if isinstance(self.database_engine, PostgresEngine): + sql = with_sql + """ + UPDATE chunk_linearized AS l + SET numerator = a, denominator = b + FROM chunks AS c + WHERE c.chunk_id = l.chunk_id + """ + else: + sql = with_sql + """ UPDATE chunk_linearized - SET ordering = ? - WHERE chunk_id = ? 
+ SET (numerator, denominator) = ( + SELECT a, b FROM chunks + WHERE chunks.chunk_id = chunk_linearized.chunk_id + ) + WHERE chunk_id in (SELECT chunk_id FROM chunks) """ - step = (max_order - min_order) / len(chunk_ids) - self.txn.executemany( - sql, - ( - ((idx * step + min_order), chunk_id) - for idx, chunk_id in enumerate(chunk_ids) - ) - ) + self.txn.execute(sql, ( + self.rebalance_md, a, b, c, d, node_id + )) + + logger.info("Rebalanced %d chunks in room %s", self.txn.rowcount, self.room_id) + + rebalance_counter.inc() + + +def get_fraction_in_range(min_frac, max_frac): + """Gets a fraction in between the given numbers. + + Uses Stern-Brocot tree to generate the fraction with the smallest + denominator. + + See https://en.wikipedia.org/wiki/Stern%E2%80%93Brocot_tree + + Args: + min_frac (numbers.Rational) + max_frac (numbers.Rational) + + Returns: + numbers.Rational + """ + + assert 0 <= min_frac < max_frac + + # If the determinant is 1 then the fraction with smallest numerator and + # denominator in the range is the mediant, so we don't have to use the + # stern brocot tree to search for it. + determinant = ( + min_frac.denominator * max_frac.numerator + - min_frac.numerator * max_frac.denominator + ) + + if determinant == 1: + return Fraction( + min_frac.numerator + max_frac.numerator, + min_frac.denominator + max_frac.denominator, + ) + + # This works by tracking two fractions a/b and c/d and repeatedly replacing + # one of them with their mediant, depending on if the mediant is smaller + # or greater than the specified range. 
+ a, b, c, d = 0, 1, 1, 0 + + while True: + f = Fraction(a + c, b + d) + + if f <= min_frac: + a, b, c, d = a + c, b + d, c, d + elif min_frac < f < max_frac: + return f + else: + a, b, c, d = a, b, a + c, b + d + + +def find_farey_terms(min_frac, max_denom): + """Find the smallest pair of fractions that are part of the Farey sequence + of order `max_denom` (the ordered sequence of all fractions with denominator + less than or equal to max_denom). + + This is useful as it can be fed into a simple iterative algorithm to + generate subsequent entries in the sequence. + + A pair of fractions a/b, c/d are neighbours in the sequence of order + max(b, d) if and only if their determinant is one, i.e. bc - ad = 1. Note + that the next order sequence is generated by taking the mediants of the + previous order, so a/b and c/d are neighbours in all sequences with orders + between max(b, d) and b + d. + + We can therefore use the Stern-Brocot tree to find the closest pair of + fractions to min_frac such that b + d is strictly greater than max_denom, + since all neighbouring fractions in Stern-Brocot satisfy the necessary + determinant property. + + Note that we've extended Farey Sequence to be infinite by repeating the + sequence with an added integer. For example sequence with order 3: + + 0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ... 
+ + See https://en.wikipedia.org/wiki/Farey_sequence + + Args: + min_frac (numbers.Rational) + max_denom (int) + + Returns: + tuple[int, int, int, int] + """ + + a, b, c, d = 0, 1, 1, 0 + + while True: + cur_frac = Fraction(a + c, b + d) + + if b + d > max_denom: + break + + if cur_frac <= min_frac: + a, b, c, d = a + c, b + d, c, d + elif min_frac < cur_frac: + a, b, c, d = a, b, a + c, b + d + + # a/b may be smaller than min_frac, so we run the algorithm to generate + # next Farey sequence terms until a/b is strictly greater than min_frac + while Fraction(a, b) <= min_frac: + k = int((max_denom + b) / d) + a, b, c, d = c, d, k * c - a, k * d - b + + assert min_frac < Fraction(a, b) < Fraction(c, d) + assert b * c - a * d == 1 - rebalance_counter.inc() + return a, b, c, d |