summary refs log tree commit diff
path: root/synapse/storage/event_push_actions.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/event_push_actions.py')
-rw-r--r--synapse/storage/event_push_actions.py34
1 files changed, 21 insertions, 13 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index d0350ee5fe..6840320641 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -14,16 +14,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage._base import SQLBaseStore, LoggingTransaction
-from twisted.internet import defer
-from synapse.util.async import sleep
-from synapse.util.caches.descriptors import cachedInlineCallbacks
-
 import logging
-import simplejson as json
 
 from six import iteritems
 
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
 logger = logging.getLogger(__name__)
 
 
@@ -84,6 +86,8 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         self.find_stream_orderings_looping_call = self._clock.looping_call(
             self._find_stream_orderings_for_times, 10 * 60 * 1000
         )
+        self._rotate_delay = 3
+        self._rotate_count = 10000
 
     @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
     def get_unread_event_push_actions_by_room_for_user(
@@ -455,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "Error removing push actions after event persistence failure",
             )
 
-    @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
-        yield self.runInteraction(
+        return run_as_background_process(
+            "event_push_action_stream_orderings",
+            self.runInteraction,
             "_find_stream_orderings_for_times",
-            self._find_stream_orderings_for_times_txn
+            self._find_stream_orderings_for_times_txn,
         )
 
     def _find_stream_orderings_for_times_txn(self, txn):
@@ -601,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
 
         self._doing_notif_rotation = False
         self._rotate_notif_loop = self._clock.looping_call(
-            self._rotate_notifs, 30 * 60 * 1000
+            self._start_rotate_notifs, 30 * 60 * 1000,
         )
 
     def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
@@ -784,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
         """, (room_id, user_id, stream_ordering))
 
+    def _start_rotate_notifs(self):
+        return run_as_background_process("rotate_notifs", self._rotate_notifs)
+
     @defer.inlineCallbacks
     def _rotate_notifs(self):
         if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
@@ -800,7 +808,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
                 )
                 if caught_up:
                     break
-                yield sleep(5)
+                yield self.hs.get_clock().sleep(self._rotate_delay)
         finally:
             self._doing_notif_rotation = False
 
@@ -821,8 +829,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         txn.execute("""
             SELECT stream_ordering FROM event_push_actions
             WHERE stream_ordering > ?
-            ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
-        """, (old_rotate_stream_ordering,))
+            ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
+        """, (old_rotate_stream_ordering, self._rotate_count))
         stream_row = txn.fetchone()
         if stream_row:
             offset_stream_ordering, = stream_row