summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-11-16 16:27:21 +0000
committerErik Johnston <erik@matrix.org>2023-11-16 16:27:21 +0000
commitb20bdd3997227dd74403ce39977a37a3b89762ed (patch)
treeb35f4771a535bbbd7fe8955c2c238b62f0ad5f71 /synapse/util
parentMerge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff)
parentSpeed up deleting device messages (#16643) (diff)
downloadsynapse-b20bdd3997227dd74403ce39977a37a3b89762ed.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/check_dependencies.py3
-rw-r--r--synapse/util/iterutils.py51
-rw-r--r--synapse/util/task_scheduler.py2
3 files changed, 54 insertions, 2 deletions
diff --git a/synapse/util/check_dependencies.py b/synapse/util/check_dependencies.py

index f7cead9e12..6f008734a0 100644 --- a/synapse/util/check_dependencies.py +++ b/synapse/util/check_dependencies.py
@@ -189,7 +189,8 @@ def check_requirements(extra: Optional[str] = None) -> None: errors.append(_not_installed(requirement, extra)) else: if dist.version is None: - # This shouldn't happen---it suggests a borked virtualenv. (See #12223) + # This shouldn't happen---it suggests a borked virtualenv. (See + # https://github.com/matrix-org/synapse/issues/12223) # Try to give a vaguely helpful error message anyway. # Type-ignore: the annotations don't reflect reality: see # https://github.com/python/typeshed/issues/7513 diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index a0efb96d3b..f4c0194af0 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py
@@ -135,3 +135,54 @@ def sorted_topologically( degree_map[edge] -= 1 if degree_map[edge] == 0: heapq.heappush(zero_degree, edge) + + +def sorted_topologically_batched( + nodes: Iterable[T], + graph: Mapping[T, Collection[T]], +) -> Generator[Collection[T], None, None]: + r"""Walk the graph topologically, returning batches of nodes where all nodes + that references it have been previously returned. + + For example, given the following graph: + + A + / \ + B C + \ / + D + + This function will return: `[[A], [B, C], [D]]`. + + This function is useful for e.g. batch persisting events in an auth chain, + where we can only persist an event if all its auth events have already been + persisted. + """ + + degree_map = {node: 0 for node in nodes} + reverse_graph: Dict[T, Set[T]] = {} + + for node, edges in graph.items(): + if node not in degree_map: + continue + + for edge in set(edges): + if edge in degree_map: + degree_map[node] += 1 + + reverse_graph.setdefault(edge, set()).add(node) + reverse_graph.setdefault(node, set()) + + zero_degree = [node for node, degree in degree_map.items() if degree == 0] + + while zero_degree: + new_zero_degree = [] + for node in zero_degree: + for edge in reverse_graph.get(node, []): + if edge in degree_map: + degree_map[edge] -= 1 + if degree_map[edge] == 0: + new_zero_degree.append(edge) + + yield zero_degree + zero_degree = new_zero_degree diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index caf13b3474..29c561e555 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py
@@ -193,7 +193,7 @@ class TaskScheduler: result: Optional[JsonMapping] = None, error: Optional[str] = None, ) -> bool: - """Update some task associated values. This is exposed publically so it can + """Update some task associated values. This is exposed publicly so it can be used inside task functions, mainly to update the result and be able to resume a task at a specific step after a restart of synapse.