diff --git a/synapse/util/check_dependencies.py b/synapse/util/check_dependencies.py
index f7cead9e12..6f008734a0 100644
--- a/synapse/util/check_dependencies.py
+++ b/synapse/util/check_dependencies.py
@@ -189,7 +189,8 @@ def check_requirements(extra: Optional[str] = None) -> None:
errors.append(_not_installed(requirement, extra))
else:
if dist.version is None:
- # This shouldn't happen---it suggests a borked virtualenv. (See #12223)
+ # This shouldn't happen---it suggests a borked virtualenv. (See
+ # https://github.com/matrix-org/synapse/issues/12223)
# Try to give a vaguely helpful error message anyway.
# Type-ignore: the annotations don't reflect reality: see
# https://github.com/python/typeshed/issues/7513
diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index a0efb96d3b..f4c0194af0 100644
--- a/synapse/util/iterutils.py
+++ b/synapse/util/iterutils.py
@@ -135,3 +135,54 @@ def sorted_topologically(
degree_map[edge] -= 1
if degree_map[edge] == 0:
heapq.heappush(zero_degree, edge)
+
+
+def sorted_topologically_batched(
+ nodes: Iterable[T],
+ graph: Mapping[T, Collection[T]],
+) -> Generator[Collection[T], None, None]:
+ r"""Walk the graph topologically, returning batches of nodes where all nodes
+ that references it have been previously returned.
+
+ For example, given the following graph:
+
+ A
+ / \
+ B C
+ \ /
+ D
+
+ This function will return: `[[A], [B, C], [D]]`.
+
+ This function is useful for e.g. batch persisting events in an auth chain,
+ where we can only persist an event if all its auth events have already been
+ persisted.
+ """
+
+ degree_map = {node: 0 for node in nodes}
+ reverse_graph: Dict[T, Set[T]] = {}
+
+ for node, edges in graph.items():
+ if node not in degree_map:
+ continue
+
+ for edge in set(edges):
+ if edge in degree_map:
+ degree_map[node] += 1
+
+ reverse_graph.setdefault(edge, set()).add(node)
+ reverse_graph.setdefault(node, set())
+
+ zero_degree = [node for node, degree in degree_map.items() if degree == 0]
+
+ while zero_degree:
+ new_zero_degree = []
+ for node in zero_degree:
+ for edge in reverse_graph.get(node, []):
+ if edge in degree_map:
+ degree_map[edge] -= 1
+ if degree_map[edge] == 0:
+ new_zero_degree.append(edge)
+
+ yield zero_degree
+ zero_degree = new_zero_degree
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index caf13b3474..29c561e555 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -193,7 +193,7 @@ class TaskScheduler:
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> bool:
- """Update some task associated values. This is exposed publically so it can
+ """Update some task associated values. This is exposed publicly so it can
be used inside task functions, mainly to update the result and be able to
resume a task at a specific step after a restart of synapse.
|