summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py35
-rw-r--r--synapse/storage/devices.py50
2 files changed, 51 insertions, 34 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py

index aae43d0f99..29589853c6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py
@@ -175,22 +175,22 @@ class PerformanceCounters(object): self.current_counters = {} self.previous_counters = {} - def update(self, key, start_time, end_time=None): - if end_time is None: - end_time = time.time() - duration = end_time - start_time + def update(self, key, duration_secs): count, cum_time = self.current_counters.get(key, (0, 0)) count += 1 - cum_time += duration + cum_time += duration_secs self.current_counters[key] = (count, cum_time) - return end_time - def interval(self, interval_duration, limit=3): + def interval(self, interval_duration_secs, limit=3): counters = [] for name, (count, cum_time) in iteritems(self.current_counters): prev_count, prev_time = self.previous_counters.get(name, (0, 0)) counters.append( - ((cum_time - prev_time) / interval_duration, count - prev_count, name) + ( + (cum_time - prev_time) / interval_duration_secs, + count - prev_count, + name, + ) ) self.previous_counters = dict(self.current_counters) @@ -221,7 +221,6 @@ class SQLBaseStore(object): # is running in mainline, and we have some nice monitoring frontends # to watch it self._txn_perf_counters = PerformanceCounters() - self._get_event_counters = PerformanceCounters() self._get_event_cache = Cache( "*getEvent*", keylen=3, max_entries=hs.config.event_cache_size @@ -369,21 +368,13 @@ class SQLBaseStore(object): time_then = self._previous_loop_ts self._previous_loop_ts = time_now - ratio = (curr - prev) / (time_now - time_then) - - top_three_counters = self._txn_perf_counters.interval( - time_now - time_then, limit=3 - ) + duration = time_now - time_then + ratio = (curr - prev) / duration - top_3_event_counters = self._get_event_counters.interval( - time_now - time_then, limit=3 - ) + top_three_counters = self._txn_perf_counters.interval(duration, limit=3) perf_logger.info( - "Total database time: %.3f%% {%s} {%s}", - ratio * 100, - top_three_counters, - top_3_event_counters, + "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters ) self._clock.looping_call(loop, 10000) @@ -465,7 +456,7 @@ class SQLBaseStore(object): transaction_logger.debug("[TXN END] {%s} %f sec", name, duration) self._current_txn_total_time += duration - self._txn_perf_counters.update(desc, start, end) + self._txn_perf_counters.update(desc, duration) sql_txn_timer.labels(desc).observe(duration) @defer.inlineCallbacks diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 3413a46675..d2b113a4e7 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py
@@ -24,6 +24,7 @@ from synapse.api.errors import StoreError from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import Cache, SQLBaseStore, db_to_json from synapse.storage.background_updates import BackgroundUpdateStore +from synapse.util import batch_iter from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList logger = logging.getLogger(__name__) @@ -391,22 +392,47 @@ class DeviceWorkerStore(SQLBaseStore): return now_stream_id, [] - @defer.inlineCallbacks - def get_user_whose_devices_changed(self, from_key): - """Get set of users whose devices have changed since `from_key`. + def get_users_whose_devices_changed(self, from_key, user_ids): + """Get set of users whose devices have changed since `from_key` that + are in the given list of user_ids. + + Args: + from_key (str): The device lists stream token + user_ids (Iterable[str]) + + Returns: + Deferred[set[str]]: The set of user_ids whose devices have changed + since `from_key` """ from_key = int(from_key) - changed = self._device_list_stream_cache.get_all_entities_changed(from_key) - if changed is not None: - defer.returnValue(set(changed)) - sql = """ - SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ? - """ - rows = yield self._execute( - "get_user_whose_devices_changed", None, sql, from_key + # Get set of users who *may* have changed. Users not in the returned + # list have definitely not changed. + to_check = list( + self._device_list_stream_cache.get_entities_changed(user_ids, from_key) + ) + + if not to_check: + return defer.succeed(set()) + + def _get_users_whose_devices_changed_txn(txn): + changes = set() + + sql = """ + SELECT DISTINCT user_id FROM device_lists_stream + WHERE stream_id > ? + AND user_id IN (%s) + """ + + for chunk in batch_iter(to_check, 100): + txn.execute(sql % (",".join("?" for _ in chunk),), (from_key,) + chunk) + changes.update(user_id for user_id, in txn) + + return changes + + return self.runInteraction( + "get_users_whose_devices_changed", _get_users_whose_devices_changed_txn ) - defer.returnValue(set(row[0] for row in rows)) def get_all_device_list_changes_for_remotes(self, from_key, to_key): """Return a list of `(stream_id, user_id, destination)` which is the