diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/_base.py | 6 | ||||
-rw-r--r-- | synapse/storage/devices.py | 8 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 9 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 13 | ||||
-rw-r--r-- | synapse/storage/events.py | 11 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 6 |
6 files changed, 37 insertions, 16 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1d41d8d445..44f37b4c1e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -311,6 +311,12 @@ class SQLBaseStore(object): after_callbacks = [] exception_callbacks = [] + if LoggingContext.current_context() == LoggingContext.sentinel: + logger.warn( + "Starting db txn '%s' from sentinel context", + desc, + ) + try: result = yield self.runWithConnection( self._new_transaction, diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index cc3cdf2ebc..52dccb1507 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -21,6 +21,7 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from ._base import Cache, SQLBaseStore @@ -711,6 +712,9 @@ class DeviceStore(SQLBaseStore): logger.info("Pruned %d device list outbound pokes", txn.rowcount) - return self.runInteraction( - "_prune_old_outbound_device_pokes", _prune_txn + run_as_background_process( + "prune_old_outbound_device_pokes", + self.runInteraction, + "_prune_old_outbound_device_pokes", + _prune_txn, ) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 8d366d1b91..65f2d19e20 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -23,6 +23,7 @@ from unpaddedbase64 import encode_base64 from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.events import EventsWorkerStore from synapse.storage.signatures import SignatureWorkerStore @@ -446,7 +447,7 @@ class EventFederationStore(EventFederationWorkerStore): ) hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + self._delete_old_forward_extrem_cache, 60 * 60 * 1000, ) def _update_min_depth_for_room_txn(self, txn, room_id, depth): @@ -548,9 +549,11 @@ class EventFederationStore(EventFederationWorkerStore): sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago,) ) - return self.runInteraction( + run_as_background_process( + "delete_old_forward_extrem_cache", + self.runInteraction, "_delete_old_forward_extrem_cache", - _delete_old_forward_extrem_cache_txn + _delete_old_forward_extrem_cache_txn, ) def clean_room_for_join(self, room_id): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 29b511ae5e..4f44b0ad47 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,6 +22,7 @@ from canonicaljson import json from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import LoggingTransaction, SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks @@ -458,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore): "Error removing push actions after event persistence failure", ) - @defer.inlineCallbacks def _find_stream_orderings_for_times(self): - yield self.runInteraction( + run_as_background_process( + "event_push_action_stream_orderings", + self.runInteraction, "_find_stream_orderings_for_times", - self._find_stream_orderings_for_times_txn + self._find_stream_orderings_for_times_txn, ) def _find_stream_orderings_for_times_txn(self, txn): @@ -604,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): self._doing_notif_rotation = False self._rotate_notif_loop = self._clock.looping_call( - self._rotate_notifs, 30 * 60 * 1000 + self._start_rotate_notifs, 30 * 60 * 1000, ) def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts, @@ -787,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore): WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? """, (room_id, user_id, stream_ordering)) + def _start_rotate_notifs(self): + run_as_background_process("rotate_notifs", self._rotate_notifs) + @defer.inlineCallbacks def _rotate_notifs(self): if self._doing_notif_rotation or self.stream_ordering_day_ago is None: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d7fea0986b..200f5ec95f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -142,15 +142,14 @@ class _EventPeristenceQueue(object): try: queue = self._get_drainining_queue(room_id) for item in queue: - # handle_queue_loop runs in the sentinel logcontext, so - # there is no need to preserve_fn when running the - # callbacks on the deferred. try: ret = yield per_item_callback(item) + except Exception: + with PreserveLoggingContext(): + item.deferred.errback() + else: with PreserveLoggingContext(): item.deferred.callback(ret) - except Exception: - item.deferred.errback() finally: queue = self._event_persist_queues.pop(room_id, None) if queue: @@ -1139,7 +1138,7 @@ class EventsStore(EventsWorkerStore): ): txn.executemany( "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,), - [(ev.event_id,) for ev, _ in events_and_contexts] + [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts] ) def _store_event_txn(self, txn, events_and_contexts): diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index c3bc94f56d..b4b479d94c 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore @@ -57,7 +58,7 @@ class TransactionStore(SQLBaseStore): def __init__(self, db_conn, hs): super(TransactionStore, self).__init__(db_conn, hs) - self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) + self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have @@ -271,6 +272,9 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + def _start_cleanup_transactions(self): + run_as_background_process("cleanup_transactions", self._cleanup_transactions) + def _cleanup_transactions(self): now = self._clock.time_msec() month_ago = now - 30 * 24 * 60 * 60 * 1000 |