diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index d0350ee5fe..6840320641 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -14,16 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage._base import SQLBaseStore, LoggingTransaction
-from twisted.internet import defer
-from synapse.util.async import sleep
-from synapse.util.caches.descriptors import cachedInlineCallbacks
-
import logging
-import simplejson as json
from six import iteritems
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
logger = logging.getLogger(__name__)
@@ -84,6 +86,8 @@ class EventPushActionsWorkerStore(SQLBaseStore):
self.find_stream_orderings_looping_call = self._clock.looping_call(
self._find_stream_orderings_for_times, 10 * 60 * 1000
)
+ self._rotate_delay = 3
+ self._rotate_count = 10000
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
@@ -455,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
"Error removing push actions after event persistence failure",
)
- @defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
- yield self.runInteraction(
+ return run_as_background_process(
+ "event_push_action_stream_orderings",
+ self.runInteraction,
"_find_stream_orderings_for_times",
- self._find_stream_orderings_for_times_txn
+ self._find_stream_orderings_for_times_txn,
)
def _find_stream_orderings_for_times_txn(self, txn):
@@ -601,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
self._doing_notif_rotation = False
self._rotate_notif_loop = self._clock.looping_call(
- self._rotate_notifs, 30 * 60 * 1000
+ self._start_rotate_notifs, 30 * 60 * 1000,
)
def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
@@ -784,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
""", (room_id, user_id, stream_ordering))
+ def _start_rotate_notifs(self):
+ return run_as_background_process("rotate_notifs", self._rotate_notifs)
+
@defer.inlineCallbacks
def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
@@ -800,7 +808,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
)
if caught_up:
break
- yield sleep(5)
+ yield self.hs.get_clock().sleep(self._rotate_delay)
finally:
self._doing_notif_rotation = False
@@ -821,8 +829,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
txn.execute("""
SELECT stream_ordering FROM event_push_actions
WHERE stream_ordering > ?
- ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
- """, (old_rotate_stream_ordering,))
+ ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
+ """, (old_rotate_stream_ordering, self._rotate_count))
stream_row = txn.fetchone()
if stream_row:
offset_stream_ordering, = stream_row
|