diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 422645cd27..3590133a32 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -16,7 +16,6 @@
import logging
from twisted.internet import defer
-from twisted.internet.defer import ensureDeferred
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
@@ -245,10 +244,8 @@ class StatsHandler(StateDeltasHandler):
field = "public_rooms" if public else "private_rooms"
delta = +1 if membership == Membership.JOIN else -1
- yield ensureDeferred(
- self.store.update_stats_delta(
- now, "user", user_id, {field: delta}
- )
+ yield self.store.update_stats_delta(
+ now, "user", user_id, {field: delta}
)
elif typ == EventTypes.Create:
@@ -329,20 +326,17 @@ class StatsHandler(StateDeltasHandler):
)
if room_stats_complete:
- yield ensureDeferred(
- self.store.update_stats_delta(
- now,
- "room",
- room_id,
- room_stats_delta,
- complete_with_stream_id=stream_id,
- )
+ yield self.store.update_stats_delta(
+ now,
+ "room",
+ room_id,
+ room_stats_delta,
+ complete_with_stream_id=stream_id,
)
+
elif len(room_stats_delta) > 0:
- yield ensureDeferred(
- self.store.update_stats_delta(
- now, "room", room_id, room_stats_delta
- )
+ yield self.store.update_stats_delta(
+ now, "room", room_id, room_stats_delta
)
@defer.inlineCallbacks
@@ -362,16 +356,14 @@ class StatsHandler(StateDeltasHandler):
for user_id in user_ids:
if self.hs.is_mine(UserID.from_string(user_id)):
- yield ensureDeferred(
- self.store.update_stats_delta(
- ts,
- "user",
- user_id,
- {
- "public_rooms": +1 if is_public else -1,
- "private_rooms": -1 if is_public else +1,
- },
- )
+ yield self.store.update_stats_delta(
+ ts,
+ "user",
+ user_id,
+ {
+ "public_rooms": +1 if is_public else -1,
+ "private_rooms": -1 if is_public else +1,
+ },
)
@defer.inlineCallbacks
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index d60e6fb7d8..1bb5459839 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -19,7 +19,6 @@ from itertools import chain
from threading import Lock
from twisted.internet import defer
-from twisted.internet.defer import ensureDeferred
from synapse.api.constants import EventTypes, Membership
from synapse.storage.engines import Sqlite3Engine
@@ -81,7 +80,8 @@ class StatsStore(StateDeltasStore):
def quantise_stats_time(self, ts):
return (ts // self.stats_bucket_size) * self.stats_bucket_size
- async def _unwedge_incremental_processor(self, forced_promise):
+ @defer.inlineCallbacks
+ def _unwedge_incremental_processor(self, forced_promise):
"""
Make a promise about what this initial background count will handle,
so that we can allow the incremental processor to start doing things
@@ -90,7 +90,7 @@ class StatsStore(StateDeltasStore):
if forced_promise is None:
promised_stats_delta_pos = (
- await self.get_max_stream_id_in_current_state_deltas()
+ yield self.get_max_stream_id_in_current_state_deltas()
)
promised_max = self.get_room_max_stream_ordering()
@@ -105,15 +105,19 @@ class StatsStore(StateDeltasStore):
promised_positions = forced_promise
# this stores it for our reference later
- await self.update_stats_positions(
+ yield self.update_stats_positions(
promised_positions, for_initial_processor=True
)
# this unwedges the incremental processor
- await self.update_stats_positions(
+ yield self.update_stats_positions(
promised_positions, for_initial_processor=False
)
+ # with the delta processor unwedged, now let it catch up in case
+ # anything was missed during the wedge period
+ self.clock.call_later(0, self.hs.get_stats_handler().notify_new_event)
+
@defer.inlineCallbacks
def _populate_stats_prepare(self, progress, batch_size):
@@ -188,7 +192,7 @@ class StatsStore(StateDeltasStore):
"populate_stats_delete_dirty_skeletons", _delete_dirty_skeletons
)
- yield ensureDeferred(self._unwedge_incremental_processor(old_positions))
+ yield self._unwedge_incremental_processor(old_positions)
yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons)
self.get_earliest_token_for_stats.invalidate_all()
@@ -821,15 +825,17 @@ class StatsStore(StateDeltasStore):
return maybe_more
- async def collect_old(self, stats_type):
+ @defer.inlineCallbacks
+ def collect_old(self, stats_type):
while True:
- maybe_more = await self.runInteraction(
+ maybe_more = yield self.runInteraction(
"stats_collect_old", self._collect_old_txn, stats_type
)
if not maybe_more:
- return
+ defer.returnValue(None)
- async def update_stats_delta(
+ @defer.inlineCallbacks
+ def update_stats_delta(
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
):
"""
@@ -847,7 +853,7 @@ class StatsStore(StateDeltasStore):
while True:
try:
- return await self.runInteraction(
+ res = yield self.runInteraction(
"update_stats_delta",
self._update_stats_delta_txn,
ts,
@@ -856,9 +862,10 @@ class StatsStore(StateDeltasStore):
fields,
complete_with_stream_id=complete_with_stream_id,
)
+ defer.returnValue(res)
except OldCollectionRequired:
# retry after collecting old rows
- await self.collect_old(stats_type)
+ yield self.collect_old(stats_type)
def _update_stats_delta_txn(
self,
|