summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py6
-rw-r--r--synapse/storage/devices.py8
-rw-r--r--synapse/storage/event_federation.py9
-rw-r--r--synapse/storage/event_push_actions.py13
-rw-r--r--synapse/storage/events.py11
-rw-r--r--synapse/storage/transactions.py6
6 files changed, 37 insertions, 16 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1d41d8d445..44f37b4c1e 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -311,6 +311,12 @@ class SQLBaseStore(object):
         after_callbacks = []
         exception_callbacks = []
 
+        if LoggingContext.current_context() == LoggingContext.sentinel:
+            logger.warn(
+                "Starting db txn '%s' from sentinel context",
+                desc,
+            )
+
         try:
             result = yield self.runWithConnection(
                 self._new_transaction,
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index cc3cdf2ebc..52dccb1507 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -21,6 +21,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
 from ._base import Cache, SQLBaseStore
@@ -711,6 +712,9 @@ class DeviceStore(SQLBaseStore):
 
             logger.info("Pruned %d device list outbound pokes", txn.rowcount)
 
-        return self.runInteraction(
-            "_prune_old_outbound_device_pokes", _prune_txn
+        run_as_background_process(
+            "prune_old_outbound_device_pokes",
+            self.runInteraction,
+            "_prune_old_outbound_device_pokes",
+            _prune_txn,
         )
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 8d366d1b91..65f2d19e20 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -23,6 +23,7 @@ from unpaddedbase64 import encode_base64
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.events import EventsWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
@@ -446,7 +447,7 @@ class EventFederationStore(EventFederationWorkerStore):
         )
 
         hs.get_clock().looping_call(
-            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+            self._delete_old_forward_extrem_cache, 60 * 60 * 1000,
         )
 
     def _update_min_depth_for_room_txn(self, txn, room_id, depth):
@@ -548,9 +549,11 @@ class EventFederationStore(EventFederationWorkerStore):
                 sql,
                 (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
             )
-        return self.runInteraction(
+        run_as_background_process(
+            "delete_old_forward_extrem_cache",
+            self.runInteraction,
             "_delete_old_forward_extrem_cache",
-            _delete_old_forward_extrem_cache_txn
+            _delete_old_forward_extrem_cache_txn,
         )
 
     def clean_room_for_join(self, room_id):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 29b511ae5e..4f44b0ad47 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -22,6 +22,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import LoggingTransaction, SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
@@ -458,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "Error removing push actions after event persistence failure",
             )
 
-    @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
-        yield self.runInteraction(
+        run_as_background_process(
+            "event_push_action_stream_orderings",
+            self.runInteraction,
             "_find_stream_orderings_for_times",
-            self._find_stream_orderings_for_times_txn
+            self._find_stream_orderings_for_times_txn,
         )
 
     def _find_stream_orderings_for_times_txn(self, txn):
@@ -604,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
 
         self._doing_notif_rotation = False
         self._rotate_notif_loop = self._clock.looping_call(
-            self._rotate_notifs, 30 * 60 * 1000
+            self._start_rotate_notifs, 30 * 60 * 1000,
         )
 
     def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
@@ -787,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
         """, (room_id, user_id, stream_ordering))
 
+    def _start_rotate_notifs(self):
+        run_as_background_process("rotate_notifs", self._rotate_notifs)
+
     @defer.inlineCallbacks
     def _rotate_notifs(self):
         if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d7fea0986b..200f5ec95f 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -142,15 +142,14 @@ class _EventPeristenceQueue(object):
             try:
                 queue = self._get_drainining_queue(room_id)
                 for item in queue:
-                    # handle_queue_loop runs in the sentinel logcontext, so
-                    # there is no need to preserve_fn when running the
-                    # callbacks on the deferred.
                     try:
                         ret = yield per_item_callback(item)
+                    except Exception:
+                        with PreserveLoggingContext():
+                            item.deferred.errback()
+                    else:
                         with PreserveLoggingContext():
                             item.deferred.callback(ret)
-                    except Exception:
-                        item.deferred.errback()
             finally:
                 queue = self._event_persist_queues.pop(room_id, None)
                 if queue:
@@ -1139,7 +1138,7 @@ class EventsStore(EventsWorkerStore):
         ):
             txn.executemany(
                 "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
-                [(ev.event_id,) for ev, _ in events_and_contexts]
+                [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts]
             )
 
     def _store_event_txn(self, txn, events_and_contexts):
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index c3bc94f56d..b4b479d94c 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore
@@ -57,7 +58,7 @@ class TransactionStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(TransactionStore, self).__init__(db_conn, hs)
 
-        self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
+        self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
 
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
@@ -271,6 +272,9 @@ class TransactionStore(SQLBaseStore):
         txn.execute(query, (self._clock.time_msec(),))
         return self.cursor_to_dict(txn)
 
+    def _start_cleanup_transactions(self):
+        run_as_background_process("cleanup_transactions", self._cleanup_transactions)
+
     def _cleanup_transactions(self):
         now = self._clock.time_msec()
         month_ago = now - 30 * 24 * 60 * 60 * 1000