diff options
author | Erik Johnston <erik@matrix.org> | 2021-01-21 16:31:51 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-21 16:31:51 +0000 |
commit | 12ec55bfaa30bc8040131c23f7c6728e40b21d01 (patch) | |
tree | 48876e0000eff539ae90a46af79625bf473cffa8 /synapse/storage | |
parent | Merge remote-tracking branch 'origin/release-v1.26.0' into develop (diff) | |
download | synapse-12ec55bfaa30bc8040131c23f7c6728e40b21d01.tar.xz |
Increase perf of handling concurrent use of StreamIDGenerators. (#9190)
We have seen a failure mode here where if there are many in flight unfinished IDs then marking an ID as finished takes a lot of CPU (as calling deque.remove iterates over the list)
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/util/id_generators.py | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index bb84c0d792..71ef5a72dc 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -15,12 +15,11 @@ import heapq import logging import threading -from collections import deque +from collections import OrderedDict from contextlib import contextmanager from typing import Dict, List, Optional, Set, Tuple, Union import attr -from typing_extensions import Deque from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.database import DatabasePool, LoggingTransaction @@ -101,7 +100,13 @@ class StreamIdGenerator: self._current = (max if step > 0 else min)( self._current, _load_current_id(db_conn, table, column, step) ) - self._unfinished_ids = deque() # type: Deque[int] + + # We use this as an ordered set, as we want to efficiently append items, + # remove items and get the first item. Since we insert IDs in order, the + # insertion ordering will ensure its in the correct ordering. + # + # The key and values are the same, but we never look at the values. + self._unfinished_ids = OrderedDict() # type: OrderedDict[int, int] def get_next(self): """ @@ -113,7 +118,7 @@ class StreamIdGenerator: self._current += self._step next_id = self._current - self._unfinished_ids.append(next_id) + self._unfinished_ids[next_id] = next_id @contextmanager def manager(): @@ -121,7 +126,7 @@ class StreamIdGenerator: yield next_id finally: with self._lock: - self._unfinished_ids.remove(next_id) + self._unfinished_ids.pop(next_id) return _AsyncCtxManagerWrapper(manager()) @@ -140,7 +145,7 @@ class StreamIdGenerator: self._current += n * self._step for next_id in next_ids: - self._unfinished_ids.append(next_id) + self._unfinished_ids[next_id] = next_id @contextmanager def manager(): @@ -149,7 +154,7 @@ class StreamIdGenerator: finally: with self._lock: for next_id in next_ids: - self._unfinished_ids.remove(next_id) + self._unfinished_ids.pop(next_id) return _AsyncCtxManagerWrapper(manager()) @@ -162,7 +167,7 @@ class StreamIdGenerator: """ with self._lock: if self._unfinished_ids: - return self._unfinished_ids[0] - self._step + return next(iter(self._unfinished_ids)) - self._step return self._current |