diff options
Diffstat (limited to 'synapse/storage/event_push_actions.py')
-rw-r--r-- | synapse/storage/event_push_actions.py | 34 |
1 files changed, 21 insertions, 13 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index d0350ee5fe..6840320641 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -14,16 +14,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage._base import SQLBaseStore, LoggingTransaction -from twisted.internet import defer -from synapse.util.async import sleep -from synapse.util.caches.descriptors import cachedInlineCallbacks - import logging -import simplejson as json from six import iteritems +from canonicaljson import json + +from twisted.internet import defer + +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage._base import LoggingTransaction, SQLBaseStore +from synapse.util.caches.descriptors import cachedInlineCallbacks + logger = logging.getLogger(__name__) @@ -84,6 +86,8 @@ class EventPushActionsWorkerStore(SQLBaseStore): self.find_stream_orderings_looping_call = self._clock.looping_call( self._find_stream_orderings_for_times, 10 * 60 * 1000 ) + self._rotate_delay = 3 + self._rotate_count = 10000 @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000) def get_unread_event_push_actions_by_room_for_user( @@ -455,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore): "Error removing push actions after event persistence failure", ) - @defer.inlineCallbacks def _find_stream_orderings_for_times(self): - yield self.runInteraction( + return run_as_background_process( + "event_push_action_stream_orderings", + self.runInteraction, "_find_stream_orderings_for_times", - self._find_stream_orderings_for_times_txn + self._find_stream_orderings_for_times_txn, ) def _find_stream_orderings_for_times_txn(self, txn): @@ -601,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): self._doing_notif_rotation = False self._rotate_notif_loop = self._clock.looping_call( - self._rotate_notifs, 30 * 60 * 1000 + self._start_rotate_notifs, 30 * 60 * 1000, ) def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts, @@ -784,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore): WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? """, (room_id, user_id, stream_ordering)) + def _start_rotate_notifs(self): + return run_as_background_process("rotate_notifs", self._rotate_notifs) + @defer.inlineCallbacks def _rotate_notifs(self): if self._doing_notif_rotation or self.stream_ordering_day_ago is None: @@ -800,7 +808,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): ) if caught_up: break - yield sleep(5) + yield self.hs.get_clock().sleep(self._rotate_delay) finally: self._doing_notif_rotation = False @@ -821,8 +829,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore): txn.execute(""" SELECT stream_ordering FROM event_push_actions WHERE stream_ordering > ? - ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000 - """, (old_rotate_stream_ordering,)) + ORDER BY stream_ordering ASC LIMIT 1 OFFSET ? + """, (old_rotate_stream_ordering, self._rotate_count)) stream_row = txn.fetchone() if stream_row: offset_stream_ordering, = stream_row |