diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 254fdc04c6..f62f70b9f1 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -26,6 +26,7 @@ from prometheus_client import Histogram
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine
from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
@@ -198,7 +199,12 @@ class SQLBaseStore(object):
if self.database_engine.can_native_upsert:
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates of tables that aren't safe to update.
- self._clock.call_later(0.0, self._check_safe_to_upsert)
+ self._clock.call_later(
+ 0.0,
+ run_as_background_process,
+ "upsert_safety_check",
+ self._check_safe_to_upsert
+ )
@defer.inlineCallbacks
def _check_safe_to_upsert(self):
@@ -208,7 +214,7 @@ class SQLBaseStore(object):
If there are background updates, we will need to wait, as they may be
the addition of indexes that set the UNIQUE constraint that we require.
- If the background updates have not completed, wait a second and check again.
+ If the background updates have not completed, wait 15 sec and check again.
"""
updates = yield self._simple_select_list(
"background_updates",
@@ -221,11 +227,16 @@ class SQLBaseStore(object):
# The User IPs table in schema #53 was missing a unique index, which we
# run as a background update.
if "user_ips_device_unique_index" not in updates:
- self._unsafe_to_upsert_tables.discard("user_id")
+ self._unsafe_to_upsert_tables.discard("user_ips")
# If there's any tables left to check, reschedule to run.
if self._unsafe_to_upsert_tables:
- self._clock.call_later(1.0, self._check_safe_to_upsert)
+ self._clock.call_later(
+ 15.0,
+ run_as_background_process,
+ "upsert_safety_check",
+ self._check_safe_to_upsert
+ )
def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()
@@ -609,7 +620,7 @@ class SQLBaseStore(object):
inserting
lock (bool): True to lock the table when doing the upsert.
Returns:
- Deferred(None or bool): Native upserts always return None. Emulated
+ None or bool: Native upserts always return None. Emulated
upserts return True if a new entry was created, False if an existing
one was updated.
"""
@@ -637,6 +648,18 @@ class SQLBaseStore(object):
def _simple_upsert_txn_emulated(
self, txn, table, keyvalues, values, insertion_values={}, lock=True
):
+ """
+ Args:
+ table (str): The table to upsert into
+ keyvalues (dict): The unique key tables and their new values
+ values (dict): The nonunique columns and their new values
+ insertion_values (dict): additional key/values to use only when
+ inserting
+ lock (bool): True to lock the table when doing the upsert.
+ Returns:
+ bool: Return True if a new entry was created, False if an existing
+ one was updated.
+ """
# We need to lock the table :(, unless we're *really* careful
if lock:
self.database_engine.lock_table(txn, table)
|