diff options
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r-- | synapse/storage/_base.py | 27 |
1 files changed, 21 insertions, 6 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 4872ff55b6..e124161845 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -50,6 +50,21 @@ sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"]) sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"]) +# Unique indexes which have been added in background updates. Maps from table name +# to the name of the background update which added the unique index to that table. +# +# This is used by the upsert logic to figure out which tables are safe to do a proper +# UPSERT on: until the relevant background update has completed, we +# have to emulate an upsert by locking the table. +# +UNIQUE_INDEX_BACKGROUND_UPDATES = { + "user_ips": "user_ips_device_unique_index", + "device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx", + "device_lists_remote_cache": "device_lists_remote_cache_unique_idx", + "event_search": "event_search_event_id_idx", +} + + class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() @@ -194,7 +209,7 @@ class SQLBaseStore(object): self.database_engine = hs.database_engine # A set of tables that are not safe to use native upserts in. - self._unsafe_to_upsert_tables = {"user_ips"} + self._unsafe_to_upsert_tables = set(UNIQUE_INDEX_BACKGROUND_UPDATES.keys()) # We add the user_directory_search table to the blacklist on SQLite # because the existing search table does not have an index, making it @@ -230,12 +245,12 @@ class SQLBaseStore(object): ) updates = [x["update_name"] for x in updates] - # The User IPs table in schema #53 was missing a unique index, which we - # run as a background update. - if "user_ips_device_unique_index" not in updates: - self._unsafe_to_upsert_tables.discard("user_ips") + for table, update_name in UNIQUE_INDEX_BACKGROUND_UPDATES.items(): + if update_name not in updates: + logger.debug("Now safe to upsert in %s", table) + self._unsafe_to_upsert_tables.discard(table) - # If there's any tables left to check, reschedule to run. + # If there's any updates still running, reschedule to run. if updates: self._clock.call_later( 15.0, |