| author | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-09 15:51:42 +0100 |
|---|---|---|
| committer | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-09 15:51:42 +0100 |
| commit | d54ae7118dced5d8fd07f92994ccfccda509c6cb (patch) | |
| tree | 1246f14cbd080425af4aa872f8627b9f1415972d | |
| parent | Add initial batch of stats tests (diff) | |
| download | synapse-d54ae7118dced5d8fd07f92994ccfccda509c6cb.tar.xz | |
Move back to `defer.inlineCallbacks` from `async`, as converting to `async` from the bottom up makes the stats code unergonomic.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
-rw-r--r-- | synapse/handlers/stats.py | 46
-rw-r--r-- | synapse/storage/stats.py | 31

2 files changed, 38 insertions, 39 deletions
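For context on the ergonomics point in the commit message: a Twisted `@defer.inlineCallbacks` generator only knows how to wait on Deferreds, so when a storage method is an `async def` coroutine, every inlineCallbacks call site has to wrap the call in `ensureDeferred` before yielding it. Below is a minimal sketch of the two calling styles; `ExampleStore`, `run_query`, `get_count_async` and `get_count_inline` are hypothetical names used only for illustration, not Synapse APIs.

```python
from twisted.internet import defer
from twisted.internet.defer import ensureDeferred


class ExampleStore:
    """Hypothetical store, used only to illustrate the two calling styles."""

    def run_query(self):
        # Stand-in for a database call; returns an already-fired Deferred.
        return defer.succeed(42)

    async def get_count_async(self):
        # "Bottom-up async" style: an async method awaiting a Deferred.
        return await self.run_query()

    @defer.inlineCallbacks
    def get_count_inline(self):
        # inlineCallbacks style: the same logic written as a generator.
        result = yield self.run_query()
        defer.returnValue(result)


@defer.inlineCallbacks
def handler_calling_async_store(store):
    # Yielding the coroutine directly would not wait for it, so every call
    # site has to wrap it in ensureDeferred to get a Deferred back.
    count = yield ensureDeferred(store.get_count_async())
    defer.returnValue(count)


@defer.inlineCallbacks
def handler_calling_inline_store(store):
    # An inlineCallbacks method already returns a Deferred, so callers can
    # simply yield it; this is the style the commit moves back to.
    count = yield store.get_count_inline()
    defer.returnValue(count)
```

Both handler functions return a Deferred that fires with 42; the difference is only how noisy the call sites are, which is what the commit message means by "unergonomic".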
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 422645cd27..3590133a32 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -16,7 +16,6 @@ import logging
 
 from twisted.internet import defer
-from twisted.internet.defer import ensureDeferred
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.handlers.state_deltas import StateDeltasHandler
@@ -245,10 +244,8 @@ class StatsHandler(StateDeltasHandler):
                     field = "public_rooms" if public else "private_rooms"
                     delta = +1 if membership == Membership.JOIN else -1
 
-                    yield ensureDeferred(
-                        self.store.update_stats_delta(
-                            now, "user", user_id, {field: delta}
-                        )
+                    yield self.store.update_stats_delta(
+                        now, "user", user_id, {field: delta}
                     )
 
             elif typ == EventTypes.Create:
@@ -329,20 +326,17 @@ class StatsHandler(StateDeltasHandler):
             )
 
             if room_stats_complete:
-                yield ensureDeferred(
-                    self.store.update_stats_delta(
-                        now,
-                        "room",
-                        room_id,
-                        room_stats_delta,
-                        complete_with_stream_id=stream_id,
-                    )
+                yield self.store.update_stats_delta(
+                    now,
+                    "room",
+                    room_id,
+                    room_stats_delta,
+                    complete_with_stream_id=stream_id,
                 )
+
             elif len(room_stats_delta) > 0:
-                yield ensureDeferred(
-                    self.store.update_stats_delta(
-                        now, "room", room_id, room_stats_delta
-                    )
+                yield self.store.update_stats_delta(
+                    now, "room", room_id, room_stats_delta
                 )
 
     @defer.inlineCallbacks
@@ -362,16 +356,14 @@ class StatsHandler(StateDeltasHandler):
 
         for user_id in user_ids:
             if self.hs.is_mine(UserID.from_string(user_id)):
-                yield ensureDeferred(
-                    self.store.update_stats_delta(
-                        ts,
-                        "user",
-                        user_id,
-                        {
-                            "public_rooms": +1 if is_public else -1,
-                            "private_rooms": -1 if is_public else +1,
-                        },
-                    )
+                yield self.store.update_stats_delta(
+                    ts,
+                    "user",
+                    user_id,
+                    {
+                        "public_rooms": +1 if is_public else -1,
+                        "private_rooms": -1 if is_public else +1,
+                    },
                 )
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index d60e6fb7d8..1bb5459839 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -19,7 +19,6 @@ from itertools import chain
 from threading import Lock
 
 from twisted.internet import defer
-from twisted.internet.defer import ensureDeferred
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.storage.engines import Sqlite3Engine
@@ -81,7 +80,8 @@ class StatsStore(StateDeltasStore):
     def quantise_stats_time(self, ts):
         return (ts // self.stats_bucket_size) * self.stats_bucket_size
 
-    async def _unwedge_incremental_processor(self, forced_promise):
+    @defer.inlineCallbacks
+    def _unwedge_incremental_processor(self, forced_promise):
         """
         Make a promise about what this initial background count will handle,
         so that we can allow the incremental processor to start doing things
@@ -90,7 +90,7 @@
 
         if forced_promise is None:
             promised_stats_delta_pos = (
-                await self.get_max_stream_id_in_current_state_deltas()
+                yield self.get_max_stream_id_in_current_state_deltas()
             )
             promised_max = self.get_room_max_stream_ordering()
 
@@ -105,15 +105,19 @@
             promised_positions = forced_promise
 
         # this stores it for our reference later
-        await self.update_stats_positions(
+        yield self.update_stats_positions(
             promised_positions, for_initial_processor=True
         )
 
         # this unwedges the incremental processor
-        await self.update_stats_positions(
+        yield self.update_stats_positions(
             promised_positions, for_initial_processor=False
         )
 
+        # with the delta processor unwedged, now let it catch up in case
+        # anything was missed during the wedge period
+        self.clock.call_later(0, self.hs.get_stats_handler().notify_new_event)
+
     @defer.inlineCallbacks
     def _populate_stats_prepare(self, progress, batch_size):
@@ -188,7 +192,7 @@
                 "populate_stats_delete_dirty_skeletons", _delete_dirty_skeletons
             )
 
-            yield ensureDeferred(self._unwedge_incremental_processor(old_positions))
+            yield self._unwedge_incremental_processor(old_positions)
 
         yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons)
         self.get_earliest_token_for_stats.invalidate_all()
@@ -821,15 +825,17 @@
 
         return maybe_more
 
-    async def collect_old(self, stats_type):
+    @defer.inlineCallbacks
+    def collect_old(self, stats_type):
         while True:
-            maybe_more = await self.runInteraction(
+            maybe_more = yield self.runInteraction(
                 "stats_collect_old", self._collect_old_txn, stats_type
             )
             if not maybe_more:
-                return
+                defer.returnValue(None)
 
-    async def update_stats_delta(
+    @defer.inlineCallbacks
+    def update_stats_delta(
         self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
     ):
         """
@@ -847,7 +853,7 @@
 
         while True:
             try:
-                return await self.runInteraction(
+                res = yield self.runInteraction(
                     "update_stats_delta",
                     self._update_stats_delta_txn,
                     ts,
@@ -856,9 +862,10 @@
                     fields,
                     complete_with_stream_id=complete_with_stream_id,
                 )
+                defer.returnValue(res)
             except OldCollectionRequired:
                 # retry after collecting old rows
-                await self.collect_old(stats_type)
+                yield self.collect_old(stats_type)
 
     def _update_stats_delta_txn(
         self,
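The storage-side changes above all follow one mechanical pattern: drop `async def` in favour of `@defer.inlineCallbacks`, turn each `await` into a `yield`, and replace `return value` with `defer.returnValue(value)` to match the surrounding code's style. Here is a condensed, self-contained sketch of that pattern applied to the retry loop in `update_stats_delta`; `SketchStore`, `run_interaction` and `_txn` are toy stand-ins (the real `runInteraction` and `_update_stats_delta_txn` are in the diff above), and the simulated `OldCollectionRequired` on the first attempt exists only to exercise the retry.

```python
from twisted.internet import defer


class OldCollectionRequired(Exception):
    """Raised by the transaction when old stats rows must be collected first."""


class SketchStore:
    """Toy store; run_interaction stands in for Synapse's runInteraction."""

    def __init__(self):
        self.attempts = 0

    def run_interaction(self, desc, func, *args, **kwargs):
        # Pretend to run `func` inside a database transaction.
        return defer.maybeDeferred(func, *args, **kwargs)

    @defer.inlineCallbacks
    def collect_old(self, stats_type):
        # Placeholder for the real row-collection work.
        yield defer.succeed(None)

    @defer.inlineCallbacks
    def update_stats_delta(self, ts, stats_type, stats_id, fields):
        # Same shape as the converted method: `await` became `yield`, and
        # `return await ...` became `res = yield ...; defer.returnValue(res)`.
        while True:
            try:
                res = yield self.run_interaction(
                    "update_stats_delta", self._txn, ts, stats_type, stats_id, fields
                )
                defer.returnValue(res)
            except OldCollectionRequired:
                # Retry after making room by collecting old rows.
                yield self.collect_old(stats_type)

    def _txn(self, ts, stats_type, stats_id, fields):
        # Fail once to demonstrate the retry, then succeed.
        self.attempts += 1
        if self.attempts == 1:
            raise OldCollectionRequired()
        return {"stats_id": stats_id, "fields": fields}
```

Calling `SketchStore().update_stats_delta(0, "room", "!r:example.org", {})` returns a Deferred that fires with the result dict after one retry. On Python 3 a plain `return res` inside the generator would also work with `inlineCallbacks`, but the patch keeps `defer.returnValue` to stay consistent with the rest of the file.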