diff options
-rw-r--r-- | synapse/storage/chunk_ordered_table.py | 4 | ||||
-rw-r--r-- | synapse/util/katriel_bodlaender.py | 222 | ||||
-rw-r--r-- | tests/storage/test_chunk_linearizer_table.py | 16 |
3 files changed, 123 insertions, 119 deletions
diff --git a/synapse/storage/chunk_ordered_table.py b/synapse/storage/chunk_ordered_table.py index d112b320a9..4a56af759a 100644 --- a/synapse/storage/chunk_ordered_table.py +++ b/synapse/storage/chunk_ordered_table.py @@ -122,7 +122,7 @@ class ChunkDBOrderedListStore(OrderedListStore): return row[0] return None - def insert_before(self, node_id, target_id): + def _insert_before(self, node_id, target_id): """Implements OrderedListStore""" rebalance = False # Set to true if we need to trigger a rebalance @@ -153,7 +153,7 @@ class ChunkDBOrderedListStore(OrderedListStore): if rebalance: self._rebalance(node_id) - def insert_after(self, node_id, target_id): + def _insert_after(self, node_id, target_id): """Implements OrderedListStore""" rebalance = False # Set to true if we need to trigger a rebalance diff --git a/synapse/util/katriel_bodlaender.py b/synapse/util/katriel_bodlaender.py index 887a6b4681..11ba612dce 100644 --- a/synapse/util/katriel_bodlaender.py +++ b/synapse/util/katriel_bodlaender.py @@ -70,6 +70,93 @@ class OrderedListStore(object): __metaclass__ = ABCMeta + def add_node(self, node_id): + """Adds a node to the graph. + + Args: + node_id (str) + """ + self._insert_before(node_id, None) + + def add_edge(self, source, target): + """Adds a new edge is added to the graph and updates the ordering. + + See module level docs. + + Note that both the source and target nodes must have been inserted into + the store (at an arbitrary position) already. + + Args: + source (str): The source node of the new edge + target (str): The target node of the new edge + """ + + # The following is the Katriel-Bodlaender algorithm. + + to_s = [] + from_t = [] + to_s_neighbours = [] + from_t_neighbours = [] + to_s_indegree = 0 + from_t_outdegree = 0 + s = source + t = target + + while s and t and not self.is_before(s, t): + m_s = to_s_indegree + m_t = from_t_outdegree + + # These functions return a tuple where the first term is a float + # that can be used to order the the list of neighbours. + # These are valid until the next write + pe_s = self.get_nodes_with_edges_to(s) + fe_t = self.get_nodes_with_edges_from(t) + + l_s = len(pe_s) + l_t = len(fe_t) + + if m_s + l_s <= m_t + l_t: + to_s.append(s) + to_s_neighbours.extend(pe_s) + to_s_indegree += l_s + + if to_s_neighbours: + to_s_neighbours.sort() + _, s = to_s_neighbours.pop() + else: + s = None + + if m_s + l_s >= m_t + l_t: + from_t.append(t) + from_t_neighbours.extend(fe_t) + from_t_outdegree += l_t + + if from_t_neighbours: + from_t_neighbours.sort(reverse=True) + _, t = from_t_neighbours.pop() + else: + t = None + + if s is None: + s = self.get_prev(target) + + if t is None: + t = self.get_next(source) + + while to_s: + s1 = to_s.pop() + self._delete_ordering(s1) + self._insert_after(s1, s) + s = s1 + + while from_t: + t1 = from_t.pop() + self._delete_ordering(t1) + self._insert_before(t1, t) + t = t1 + + self._add_edge_to_graph(source, target) + @abstractmethod def is_before(self, first_node, second_node): """Returns whether the first node is before the second node. @@ -110,30 +197,6 @@ class OrderedListStore(object): pass @abstractmethod - def insert_before(self, node_id, target_id): - """Inserts node immediately before target node. - - If target_id is None then the node is inserted at the end of the list - - Args: - node_id (str) - target_id (str|None) - """ - pass - - @abstractmethod - def insert_after(self, node_id, target_id): - """Inserts node immediately after target node. - - If target_id is None then the node is inserted at the start of the list - - Args: - node_id (str) - target_id (str|None) - """ - pass - - @abstractmethod def get_nodes_with_edges_to(self, node_id): """Get all nodes with edges to the given node @@ -144,7 +207,8 @@ class OrderedListStore(object): list[tuple[float, str]]: Returns a list of tuple of an ordering term and the node ID. The ordering term can be used to sort the returned list. - The ordering is valid until subsequent calls to insert_* functions + The ordering is valid until subsequent calls to `add_edge` + functions """ pass @@ -165,111 +229,51 @@ class OrderedListStore(object): pass @abstractmethod - def _delete_ordering(self, node_id): - """Deletes the given node from the ordered list (but not the graph). + def _insert_before(self, node_id, target_id): + """Inserts node immediately before target node. - Used when we want to reinsert it into a different position + If target_id is None then the node is inserted at the end of the list Args: node_id (str) + target_id (str|None) """ pass @abstractmethod - def _add_edge_to_graph(self, source_id, target_id): - """Adds an edge to the graph from source to target. + def _insert_after(self, node_id, target_id): + """Inserts node immediately after target node. - Does not update ordering. + If target_id is None then the node is inserted at the start of the list Args: - source_id (str) - target_id (str) + node_id (str) + target_id (str|None) """ pass - def add_node(self, node_id): - """Adds a node to the graph. + @abstractmethod + def _delete_ordering(self, node_id): + """Deletes the given node from the ordered list (but not the graph). + + Used when we want to reinsert it into a different position Args: node_id (str) """ - self.insert_before(node_id, None) - - def add_edge(self, source, target): - """Adds a new edge is added to the graph and updates the ordering. + pass - See module level docs. + @abstractmethod + def _add_edge_to_graph(self, source_id, target_id): + """Adds an edge to the graph from source to target. - Note that both the source and target nodes must have been inserted into - the store (at an arbitrary position) already. + Does not update ordering. Args: - source (str): The source node of the new edge - target (str): The target node of the new edge + source_id (str) + target_id (str) """ - - # The following is the Katriel-Bodlaender algorithm. - - to_s = [] - from_t = [] - to_s_neighbours = [] - from_t_neighbours = [] - to_s_indegree = 0 - from_t_outdegree = 0 - s = source - t = target - - while s and t and not self.is_before(s, t): - m_s = to_s_indegree - m_t = from_t_outdegree - - pe_s = self.get_nodes_with_edges_to(s) - fe_t = self.get_nodes_with_edges_from(t) - - l_s = len(pe_s) - l_t = len(fe_t) - - if m_s + l_s <= m_t + l_t: - to_s.append(s) - to_s_neighbours.extend(pe_s) - to_s_indegree += l_s - - if to_s_neighbours: - to_s_neighbours.sort() - _, s = to_s_neighbours.pop() - else: - s = None - - if m_s + l_s >= m_t + l_t: - from_t.append(t) - from_t_neighbours.extend(fe_t) - from_t_outdegree += l_t - - if from_t_neighbours: - from_t_neighbours.sort(reverse=True) - _, t = from_t_neighbours.pop() - else: - t = None - - if s is None: - s = self.get_prev(target) - - if t is None: - t = self.get_next(source) - - while to_s: - s1 = to_s.pop() - self._delete_ordering(s1) - self.insert_after(s1, s) - s = s1 - - while from_t: - t1 = from_t.pop() - self._delete_ordering(t1) - self.insert_before(t1, t) - t = t1 - - self._add_edge_to_graph(source, target) + pass class InMemoryOrderedListStore(OrderedListStore): @@ -303,14 +307,14 @@ class InMemoryOrderedListStore(OrderedListStore): else: return None - def insert_before(self, node_id, target_id): + def _insert_before(self, node_id, target_id): if target_id is not None: idx = self.list.index(target_id) self.list.insert(idx, node_id) else: self.list.append(node_id) - def insert_after(self, node_id, target_id): + def _insert_after(self, node_id, target_id): if target_id is not None: idx = self.list.index(target_id) + 1 self.list.insert(idx, node_id) diff --git a/tests/storage/test_chunk_linearizer_table.py b/tests/storage/test_chunk_linearizer_table.py index 9890dffd60..beb1ac9a42 100644 --- a/tests/storage/test_chunk_linearizer_table.py +++ b/tests/storage/test_chunk_linearizer_table.py @@ -46,8 +46,8 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): ) table.add_node("A") - table.insert_after("B", "A") - table.insert_before("C", "A") + table._insert_after("B", "A") + table._insert_before("C", "A") sql = """ SELECT chunk_id FROM chunk_linearized @@ -87,9 +87,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): break if j < i: - table.insert_after(node_id, target_id) + table._insert_after(node_id, target_id) else: - table.insert_before(node_id, target_id) + table._insert_before(node_id, target_id) already_inserted.append((i, node_id)) already_inserted.sort() @@ -122,12 +122,12 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): for i in xrange(1, 1000): node_id = "node_id_before_%d" % i - table.insert_before(node_id, expected[0]) + table._insert_before(node_id, expected[0]) expected.insert(0, node_id) for i in xrange(1, 1000): node_id = "node_id_after_%d" % i - table.insert_after(node_id, expected[-1]) + table._insert_after(node_id, expected[-1]) expected.append(node_id) sql = """ @@ -162,10 +162,10 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase): for i in xrange(1, 100): node_id = "node_id_%d" % i if i % 2 == 0: - table.insert_before(node_id, prev_node) + table._insert_before(node_id, prev_node) expected_prefix.append(node_id) else: - table.insert_after(node_id, prev_node) + table._insert_after(node_id, prev_node) expected_suffix.append(node_id) prev_node = node_id |