summary refs log tree commit diff
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-09 15:51:42 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-09 15:51:42 +0100
commitd54ae7118dced5d8fd07f92994ccfccda509c6cb (patch)
tree1246f14cbd080425af4aa872f8627b9f1415972d
parentAdd initial batch of stats tests (diff)
downloadsynapse-d54ae7118dced5d8fd07f92994ccfccda509c6cb.tar.xz
Move back to `defer.inlineCallbacks` from `async` as it makes stats
unergonomic if we move to `async` from the bottom-up.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
-rw-r--r--synapse/handlers/stats.py46
-rw-r--r--synapse/storage/stats.py31
2 files changed, 38 insertions, 39 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 422645cd27..3590133a32 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -16,7 +16,6 @@
 import logging
 
 from twisted.internet import defer
-from twisted.internet.defer import ensureDeferred
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.handlers.state_deltas import StateDeltasHandler
@@ -245,10 +244,8 @@ class StatsHandler(StateDeltasHandler):
                     field = "public_rooms" if public else "private_rooms"
                     delta = +1 if membership == Membership.JOIN else -1
 
-                    yield ensureDeferred(
-                        self.store.update_stats_delta(
-                            now, "user", user_id, {field: delta}
-                        )
+                    yield self.store.update_stats_delta(
+                        now, "user", user_id, {field: delta}
                     )
 
             elif typ == EventTypes.Create:
@@ -329,20 +326,17 @@ class StatsHandler(StateDeltasHandler):
                 )
 
             if room_stats_complete:
-                yield ensureDeferred(
-                    self.store.update_stats_delta(
-                        now,
-                        "room",
-                        room_id,
-                        room_stats_delta,
-                        complete_with_stream_id=stream_id,
-                    )
+                yield self.store.update_stats_delta(
+                    now,
+                    "room",
+                    room_id,
+                    room_stats_delta,
+                    complete_with_stream_id=stream_id,
                 )
+
             elif len(room_stats_delta) > 0:
-                yield ensureDeferred(
-                    self.store.update_stats_delta(
-                        now, "room", room_id, room_stats_delta
-                    )
+                yield self.store.update_stats_delta(
+                    now, "room", room_id, room_stats_delta
                 )
 
     @defer.inlineCallbacks
@@ -362,16 +356,14 @@ class StatsHandler(StateDeltasHandler):
 
         for user_id in user_ids:
             if self.hs.is_mine(UserID.from_string(user_id)):
-                yield ensureDeferred(
-                    self.store.update_stats_delta(
-                        ts,
-                        "user",
-                        user_id,
-                        {
-                            "public_rooms": +1 if is_public else -1,
-                            "private_rooms": -1 if is_public else +1,
-                        },
-                    )
+                yield self.store.update_stats_delta(
+                    ts,
+                    "user",
+                    user_id,
+                    {
+                        "public_rooms": +1 if is_public else -1,
+                        "private_rooms": -1 if is_public else +1,
+                    },
                 )
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index d60e6fb7d8..1bb5459839 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -19,7 +19,6 @@ from itertools import chain
 from threading import Lock
 
 from twisted.internet import defer
-from twisted.internet.defer import ensureDeferred
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.storage.engines import Sqlite3Engine
@@ -81,7 +80,8 @@ class StatsStore(StateDeltasStore):
     def quantise_stats_time(self, ts):
         return (ts // self.stats_bucket_size) * self.stats_bucket_size
 
-    async def _unwedge_incremental_processor(self, forced_promise):
+    @defer.inlineCallbacks
+    def _unwedge_incremental_processor(self, forced_promise):
         """
         Make a promise about what this initial background count will handle,
         so that we can allow the incremental processor to start doing things
@@ -90,7 +90,7 @@ class StatsStore(StateDeltasStore):
 
         if forced_promise is None:
             promised_stats_delta_pos = (
-                await self.get_max_stream_id_in_current_state_deltas()
+                yield self.get_max_stream_id_in_current_state_deltas()
             )
 
             promised_max = self.get_room_max_stream_ordering()
@@ -105,15 +105,19 @@ class StatsStore(StateDeltasStore):
             promised_positions = forced_promise
 
         # this stores it for our reference later
-        await self.update_stats_positions(
+        yield self.update_stats_positions(
             promised_positions, for_initial_processor=True
         )
 
         # this unwedges the incremental processor
-        await self.update_stats_positions(
+        yield self.update_stats_positions(
             promised_positions, for_initial_processor=False
         )
 
+        # with the delta processor unwedged, now let it catch up in case
+        # anything was missed during the wedge period
+        self.clock.call_later(0, self.hs.get_stats_handler().notify_new_event)
+
     @defer.inlineCallbacks
     def _populate_stats_prepare(self, progress, batch_size):
 
@@ -188,7 +192,7 @@ class StatsStore(StateDeltasStore):
             "populate_stats_delete_dirty_skeletons", _delete_dirty_skeletons
         )
 
-        yield ensureDeferred(self._unwedge_incremental_processor(old_positions))
+        yield self._unwedge_incremental_processor(old_positions)
 
         yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons)
         self.get_earliest_token_for_stats.invalidate_all()
@@ -821,15 +825,17 @@ class StatsStore(StateDeltasStore):
 
         return maybe_more
 
-    async def collect_old(self, stats_type):
+    @defer.inlineCallbacks
+    def collect_old(self, stats_type):
         while True:
-            maybe_more = await self.runInteraction(
+            maybe_more = yield self.runInteraction(
                 "stats_collect_old", self._collect_old_txn, stats_type
             )
             if not maybe_more:
-                return
+                defer.returnValue(None)
 
-    async def update_stats_delta(
+    @defer.inlineCallbacks
+    def update_stats_delta(
         self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
     ):
         """
@@ -847,7 +853,7 @@ class StatsStore(StateDeltasStore):
 
         while True:
             try:
-                return await self.runInteraction(
+                res = yield self.runInteraction(
                     "update_stats_delta",
                     self._update_stats_delta_txn,
                     ts,
@@ -856,9 +862,10 @@ class StatsStore(StateDeltasStore):
                     fields,
                     complete_with_stream_id=complete_with_stream_id,
                 )
+                defer.returnValue(res)
             except OldCollectionRequired:
                 # retry after collecting old rows
-                await self.collect_old(stats_type)
+                yield self.collect_old(stats_type)
 
     def _update_stats_delta_txn(
         self,