diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 9f3b8741c1..8d9df352b2 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -93,7 +93,7 @@ class Clock:
_reactor: IReactorTime = attr.ib()
- @defer.inlineCallbacks # type: ignore[arg-type] # Issue in Twisted's type annotations
+ @defer.inlineCallbacks
def sleep(self, seconds: float) -> "Generator[Deferred[float], Any, Any]":
d: defer.Deferred[float] = defer.Deferred()
with context.PreserveLoggingContext():
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 0cbeb0c365..8a55e4e41d 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -345,6 +345,7 @@ async def yieldable_gather_results_delaying_cancellation(
T1 = TypeVar("T1")
T2 = TypeVar("T2")
T3 = TypeVar("T3")
+T4 = TypeVar("T4")
@overload
@@ -380,6 +381,19 @@ def gather_results(
...
+@overload
+def gather_results(
+ deferredList: Tuple[
+ "defer.Deferred[T1]",
+ "defer.Deferred[T2]",
+ "defer.Deferred[T3]",
+ "defer.Deferred[T4]",
+ ],
+ consumeErrors: bool = ...,
+) -> "defer.Deferred[Tuple[T1, T2, T3, T4]]":
+ ...
+
+
def gather_results( # type: ignore[misc]
deferredList: Tuple["defer.Deferred[T1]", ...],
consumeErrors: bool = False,
diff --git a/synapse/util/check_dependencies.py b/synapse/util/check_dependencies.py
index f7cead9e12..6f008734a0 100644
--- a/synapse/util/check_dependencies.py
+++ b/synapse/util/check_dependencies.py
@@ -189,7 +189,8 @@ def check_requirements(extra: Optional[str] = None) -> None:
errors.append(_not_installed(requirement, extra))
else:
if dist.version is None:
- # This shouldn't happen---it suggests a borked virtualenv. (See #12223)
+ # This shouldn't happen---it suggests a borked virtualenv. (See
+ # https://github.com/matrix-org/synapse/issues/12223)
# Try to give a vaguely helpful error message anyway.
# Type-ignore: the annotations don't reflect reality: see
# https://github.com/python/typeshed/issues/7513
diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index a0efb96d3b..f4c0194af0 100644
--- a/synapse/util/iterutils.py
+++ b/synapse/util/iterutils.py
@@ -135,3 +135,54 @@ def sorted_topologically(
degree_map[edge] -= 1
if degree_map[edge] == 0:
heapq.heappush(zero_degree, edge)
+
+
+def sorted_topologically_batched(
+    nodes: Iterable[T],
+    graph: Mapping[T, Collection[T]],
+) -> Generator[Collection[T], None, None]:
+    r"""Yield `nodes` in topological batches: a node is only emitted once every
+    node it references (according to `graph`) was emitted in an earlier batch.
+
+    For example, given the following graph, where each node references the
+    node(s) drawn directly above it:
+
+         A
+        / \
+       B   C
+        \ /
+         D
+
+    the batches produced are `[A]`, then `[B, C]`, then `[D]`.
+
+    Handy for e.g. batch persisting events in an auth chain, where an event
+    can only be persisted once all of its auth events have been persisted.
+    Keys of `graph` and references that are not in `nodes` are ignored.
+    """
+
+    # How many distinct, still-unemitted references each node has.
+    pending = dict.fromkeys(nodes, 0)
+    # Reverse of `graph`: for each referenced node, the nodes referencing it.
+    referrers: Dict[T, Set[T]] = {}
+
+    for source, targets in graph.items():
+        if source in pending:
+            distinct_targets = set(targets)
+            pending[source] += sum(1 for t in distinct_targets if t in pending)
+            for target in distinct_targets:
+                referrers.setdefault(target, set()).add(source)
+            referrers.setdefault(source, set())
+
+    batch = [node for node, count in pending.items() if count == 0]
+    while batch:
+        next_batch = []
+        for emitted in batch:
+            for referrer in referrers.get(emitted, []):
+                if referrer not in pending:
+                    continue
+                pending[referrer] -= 1
+                if pending[referrer] == 0:
+                    next_batch.append(referrer)
+
+        yield batch
+        batch = next_batch
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index caf13b3474..8c2df233d3 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -71,7 +71,7 @@ class TaskScheduler:
# Time before a complete or failed task is deleted from the DB
KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week
# Maximum number of tasks that can run at the same time
- MAX_CONCURRENT_RUNNING_TASKS = 10
+ MAX_CONCURRENT_RUNNING_TASKS = 5
# Time from the last task update after which we will log a warning
LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs
@@ -193,7 +193,7 @@ class TaskScheduler:
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> bool:
- """Update some task associated values. This is exposed publically so it can
+ """Update some task associated values. This is exposed publicly so it can
be used inside task functions, mainly to update the result and be able to
resume a task at a specific step after a restart of synapse.
|