summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/__init__.py2
-rw-r--r--synapse/util/async_helpers.py14
-rw-r--r--synapse/util/check_dependencies.py3
-rw-r--r--synapse/util/iterutils.py51
-rw-r--r--synapse/util/task_scheduler.py4
5 files changed, 70 insertions, 4 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py

index 9f3b8741c1..8d9df352b2 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py
@@ -93,7 +93,7 @@ class Clock: _reactor: IReactorTime = attr.ib() - @defer.inlineCallbacks # type: ignore[arg-type] # Issue in Twisted's type annotations + @defer.inlineCallbacks def sleep(self, seconds: float) -> "Generator[Deferred[float], Any, Any]": d: defer.Deferred[float] = defer.Deferred() with context.PreserveLoggingContext(): diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 0cbeb0c365..8a55e4e41d 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py
@@ -345,6 +345,7 @@ async def yieldable_gather_results_delaying_cancellation( T1 = TypeVar("T1") T2 = TypeVar("T2") T3 = TypeVar("T3") +T4 = TypeVar("T4") @overload @@ -380,6 +381,19 @@ def gather_results( ... +@overload +def gather_results( + deferredList: Tuple[ + "defer.Deferred[T1]", + "defer.Deferred[T2]", + "defer.Deferred[T3]", + "defer.Deferred[T4]", + ], + consumeErrors: bool = ..., +) -> "defer.Deferred[Tuple[T1, T2, T3, T4]]": + ... + + def gather_results( # type: ignore[misc] deferredList: Tuple["defer.Deferred[T1]", ...], consumeErrors: bool = False, diff --git a/synapse/util/check_dependencies.py b/synapse/util/check_dependencies.py
index f7cead9e12..6f008734a0 100644 --- a/synapse/util/check_dependencies.py +++ b/synapse/util/check_dependencies.py
@@ -189,7 +189,8 @@ def check_requirements(extra: Optional[str] = None) -> None: errors.append(_not_installed(requirement, extra)) else: if dist.version is None: - # This shouldn't happen---it suggests a borked virtualenv. (See #12223) + # This shouldn't happen---it suggests a borked virtualenv. (See + # https://github.com/matrix-org/synapse/issues/12223) # Try to give a vaguely helpful error message anyway. # Type-ignore: the annotations don't reflect reality: see # https://github.com/python/typeshed/issues/7513 diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index a0efb96d3b..f4c0194af0 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py
@@ -135,3 +135,54 @@ def sorted_topologically( degree_map[edge] -= 1 if degree_map[edge] == 0: heapq.heappush(zero_degree, edge) + + +def sorted_topologically_batched( + nodes: Iterable[T], + graph: Mapping[T, Collection[T]], +) -> Generator[Collection[T], None, None]: + r"""Walk the graph topologically, returning batches of nodes where all nodes + that references it have been previously returned. + + For example, given the following graph: + + A + / \ + B C + \ / + D + + This function will return: `[[A], [B, C], [D]]`. + + This function is useful for e.g. batch persisting events in an auth chain, + where we can only persist an event if all its auth events have already been + persisted. + """ + + degree_map = {node: 0 for node in nodes} + reverse_graph: Dict[T, Set[T]] = {} + + for node, edges in graph.items(): + if node not in degree_map: + continue + + for edge in set(edges): + if edge in degree_map: + degree_map[node] += 1 + + reverse_graph.setdefault(edge, set()).add(node) + reverse_graph.setdefault(node, set()) + + zero_degree = [node for node, degree in degree_map.items() if degree == 0] + + while zero_degree: + new_zero_degree = [] + for node in zero_degree: + for edge in reverse_graph.get(node, []): + if edge in degree_map: + degree_map[edge] -= 1 + if degree_map[edge] == 0: + new_zero_degree.append(edge) + + yield zero_degree + zero_degree = new_zero_degree diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index caf13b3474..8c2df233d3 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py
@@ -71,7 +71,7 @@ class TaskScheduler: # Time before a complete or failed task is deleted from the DB KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week # Maximum number of tasks that can run at the same time - MAX_CONCURRENT_RUNNING_TASKS = 10 + MAX_CONCURRENT_RUNNING_TASKS = 5 # Time from the last task update after which we will log a warning LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs @@ -193,7 +193,7 @@ class TaskScheduler: result: Optional[JsonMapping] = None, error: Optional[str] = None, ) -> bool: - """Update some task associated values. This is exposed publically so it can + """Update some task associated values. This is exposed publicly so it can be used inside task functions, mainly to update the result and be able to resume a task at a specific step after a restart of synapse.