diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 4872ff55b6..e124161845 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -50,6 +50,21 @@ sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"])
+# Unique indexes which have been added in background updates. Maps from table name
+# to the name of the background update which added the unique index to that table.
+#
+# This is used by the upsert logic to figure out which tables are safe to do a proper
+# UPSERT on: until the relevant background update has completed, we
+# have to emulate an upsert by locking the table.
+#
+UNIQUE_INDEX_BACKGROUND_UPDATES = {
+ "user_ips": "user_ips_device_unique_index",
+ "device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx",
+ "device_lists_remote_cache": "device_lists_remote_cache_unique_idx",
+ "event_search": "event_search_event_id_idx",
+}
+
+
class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object
passed to the constructor. Adds logging and metrics to the .execute()
@@ -194,7 +209,7 @@ class SQLBaseStore(object):
self.database_engine = hs.database_engine
# A set of tables that are not safe to use native upserts in.
- self._unsafe_to_upsert_tables = {"user_ips"}
+ self._unsafe_to_upsert_tables = set(UNIQUE_INDEX_BACKGROUND_UPDATES.keys())
# We add the user_directory_search table to the blacklist on SQLite
# because the existing search table does not have an index, making it
@@ -230,12 +245,12 @@ class SQLBaseStore(object):
)
updates = [x["update_name"] for x in updates]
- # The User IPs table in schema #53 was missing a unique index, which we
- # run as a background update.
- if "user_ips_device_unique_index" not in updates:
- self._unsafe_to_upsert_tables.discard("user_ips")
+ for table, update_name in UNIQUE_INDEX_BACKGROUND_UPDATES.items():
+ if update_name not in updates:
+ logger.debug("Now safe to upsert in %s", table)
+ self._unsafe_to_upsert_tables.discard(table)
- # If there's any tables left to check, reschedule to run.
+ # If there's any updates still running, reschedule to run.
if updates:
self._clock.call_later(
15.0,
|