diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 865b5e915a..f62f70b9f1 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -26,6 +26,7 @@ from prometheus_client import Histogram
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine
from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
@@ -192,6 +193,51 @@ class SQLBaseStore(object):
self.database_engine = hs.database_engine
+ # A set of tables that are not safe to use native upserts in.
+ self._unsafe_to_upsert_tables = {"user_ips"}
+
+ if self.database_engine.can_native_upsert:
+ # Check ASAP (and then later, every 1s) to see if we have finished
+ # background updates of tables that aren't safe to update.
+ self._clock.call_later(
+ 0.0,
+ run_as_background_process,
+ "upsert_safety_check",
+ self._check_safe_to_upsert
+ )
+
+ @defer.inlineCallbacks
+ def _check_safe_to_upsert(self):
+ """
+ Is it safe to use native UPSERT?
+
+ If there are background updates, we will need to wait, as they may be
+ the addition of indexes that set the UNIQUE constraint that we require.
+
+ If the background updates have not completed, wait 15 sec and check again.
+ """
+ updates = yield self._simple_select_list(
+ "background_updates",
+ keyvalues=None,
+ retcols=["update_name"],
+ desc="check_background_updates",
+ )
+ updates = [x["update_name"] for x in updates]
+
+ # The User IPs table in schema #53 was missing a unique index, which we
+ # run as a background update.
+ if "user_ips_device_unique_index" not in updates:
+ self._unsafe_to_upsert_tables.discard("user_ips")
+
+ # If there's any tables left to check, reschedule to run.
+ if self._unsafe_to_upsert_tables:
+ self._clock.call_later(
+ 15.0,
+ run_as_background_process,
+ "upsert_safety_check",
+ self._check_safe_to_upsert
+ )
+
def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()
@@ -494,8 +540,15 @@ class SQLBaseStore(object):
txn.executemany(sql, vals)
@defer.inlineCallbacks
- def _simple_upsert(self, table, keyvalues, values,
- insertion_values={}, desc="_simple_upsert", lock=True):
+ def _simple_upsert(
+ self,
+ table,
+ keyvalues,
+ values,
+ insertion_values={},
+ desc="_simple_upsert",
+ lock=True
+ ):
"""
`lock` should generally be set to True (the default), but can be set
@@ -516,16 +569,21 @@ class SQLBaseStore(object):
inserting
lock (bool): True to lock the table when doing the upsert.
Returns:
- Deferred(bool): True if a new entry was created, False if an
- existing one was updated.
+ Deferred(None or bool): Native upserts always return None. Emulated
+ upserts return True if a new entry was created, False if an existing
+ one was updated.
"""
attempts = 0
while True:
try:
result = yield self.runInteraction(
desc,
- self._simple_upsert_txn, table, keyvalues, values, insertion_values,
- lock=lock
+ self._simple_upsert_txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values,
+ lock=lock,
)
defer.returnValue(result)
except self.database_engine.module.IntegrityError as e:
@@ -537,12 +595,71 @@ class SQLBaseStore(object):
# presumably we raced with another transaction: let's retry.
logger.warn(
- "IntegrityError when upserting into %s; retrying: %s",
- table, e
+ "%s when upserting into %s; retrying: %s", e.__name__, table, e
)
- def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
- lock=True):
+ def _simple_upsert_txn(
+ self,
+ txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values={},
+ lock=True,
+ ):
+ """
+ Pick the UPSERT method which works best on the platform. Either the
+ native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
+
+ Args:
+ txn: The transaction to use.
+ table (str): The table to upsert into
+ keyvalues (dict): The unique key tables and their new values
+ values (dict): The nonunique columns and their new values
+ insertion_values (dict): additional key/values to use only when
+ inserting
+ lock (bool): True to lock the table when doing the upsert.
+ Returns:
+ None or bool: Native upserts always return None. Emulated
+ upserts return True if a new entry was created, False if an existing
+ one was updated.
+ """
+ if (
+ self.database_engine.can_native_upsert
+ and table not in self._unsafe_to_upsert_tables
+ ):
+ return self._simple_upsert_txn_native_upsert(
+ txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values=insertion_values,
+ )
+ else:
+ return self._simple_upsert_txn_emulated(
+ txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values=insertion_values,
+ lock=lock,
+ )
+
+ def _simple_upsert_txn_emulated(
+ self, txn, table, keyvalues, values, insertion_values={}, lock=True
+ ):
+ """
+ Args:
+ table (str): The table to upsert into
+ keyvalues (dict): The unique key tables and their new values
+ values (dict): The nonunique columns and their new values
+ insertion_values (dict): additional key/values to use only when
+ inserting
+ lock (bool): True to lock the table when doing the upsert.
+ Returns:
+ bool: Return True if a new entry was created, False if an existing
+ one was updated.
+ """
# We need to lock the table :(, unless we're *really* careful
if lock:
self.database_engine.lock_table(txn, table)
@@ -577,12 +694,44 @@ class SQLBaseStore(object):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in allvalues),
- ", ".join("?" for _ in allvalues)
+ ", ".join("?" for _ in allvalues),
)
txn.execute(sql, list(allvalues.values()))
# successfully inserted
return True
+ def _simple_upsert_txn_native_upsert(
+ self, txn, table, keyvalues, values, insertion_values={}
+ ):
+ """
+ Use the native UPSERT functionality in recent PostgreSQL versions.
+
+ Args:
+ table (str): The table to upsert into
+ keyvalues (dict): The unique key tables and their new values
+ values (dict): The nonunique columns and their new values
+ insertion_values (dict): additional key/values to use only when
+ inserting
+ Returns:
+ None
+ """
+ allvalues = {}
+ allvalues.update(keyvalues)
+ allvalues.update(values)
+ allvalues.update(insertion_values)
+
+ sql = (
+ "INSERT INTO %s (%s) VALUES (%s) "
+ "ON CONFLICT (%s) DO UPDATE SET %s"
+ ) % (
+ table,
+ ", ".join(k for k in allvalues),
+ ", ".join("?" for _ in allvalues),
+ ", ".join(k for k in keyvalues),
+ ", ".join(k + "=EXCLUDED." + k for k in values),
+ )
+ txn.execute(sql, list(allvalues.values()))
+
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index b228a20ac2..091d7116c5 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -257,7 +257,10 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
)
def _update_client_ips_batch_txn(self, txn, to_update):
- self.database_engine.lock_table(txn, "user_ips")
+ if "user_ips" in self._unsafe_to_upsert_tables or (
+ not self.database_engine.can_native_upsert
+ ):
+ self.database_engine.lock_table(txn, "user_ips")
for entry in iteritems(to_update):
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index e2f9de8451..ff5ef97ca8 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -18,7 +18,7 @@ import platform
from ._base import IncorrectDatabaseSetup
from .postgres import PostgresEngine
-from .sqlite3 import Sqlite3Engine
+from .sqlite import Sqlite3Engine
SUPPORTED_MODULE = {
"sqlite3": Sqlite3Engine,
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 42225f8a2a..4004427c7b 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -38,6 +38,13 @@ class PostgresEngine(object):
return sql.replace("?", "%s")
def on_new_connection(self, db_conn):
+
+ # Get the version of PostgreSQL that we're using. As per the psycopg2
+ # docs: The number is formed by converting the major, minor, and
+ # revision numbers into two-decimal-digit numbers and appending them
+ # together. For example, version 8.1.5 will be returned as 80105
+ self._version = db_conn.server_version
+
db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
@@ -54,6 +61,13 @@ class PostgresEngine(object):
cursor.close()
+ @property
+ def can_native_upsert(self):
+ """
+ Can we use native UPSERTs? This requires PostgreSQL 9.5+.
+ """
+ return self._version >= 90500
+
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite.py
index 19949fc474..c64d73ff21 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite.py
@@ -15,6 +15,7 @@
import struct
import threading
+from sqlite3 import sqlite_version_info
from synapse.storage.prepare_database import prepare_database
@@ -30,6 +31,14 @@ class Sqlite3Engine(object):
self._current_state_group_id = None
self._current_state_group_id_lock = threading.Lock()
+ @property
+ def can_native_upsert(self):
+ """
+ Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
+ more work we haven't done yet to tell what was inserted vs updated.
+ """
+ return sqlite_version_info >= (3, 24, 0)
+
def check_database(self, txn):
pass
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 2743b52bad..134297e284 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -215,7 +215,7 @@ class PusherStore(PusherWorkerStore):
with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so _simple_upsert will retry
- newly_inserted = yield self._simple_upsert(
+ yield self._simple_upsert(
table="pushers",
keyvalues={
"app_id": app_id,
@@ -238,7 +238,12 @@ class PusherStore(PusherWorkerStore):
lock=False,
)
- if newly_inserted:
+ user_has_pusher = self.get_if_user_has_pusher.cache.get(
+ (user_id,), None, update_metrics=False
+ )
+
+ if user_has_pusher is not True:
+ # invalidate, since we the user might not have had a pusher before
yield self.runInteraction(
"add_pusher",
self._invalidate_cache_and_stream,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 40b13de80b..592c1bcd33 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -589,11 +589,11 @@ class RoomMemberStore(RoomMemberWorkerStore):
# We update the local_invites table only if the event is "current",
# i.e., its something that has just happened. If the event is an
- # outlier it is only current if its a "new remote event", like a
- # remote invite or a rejection of a remote invite.
+ # outlier it is only current if its an "out of band membership",
+ # like a remote invite or a rejection of a remote invite.
is_new_state = not backfilled and (
not event.internal_metadata.is_outlier()
- or event.internal_metadata.is_new_remote_event()
+ or event.internal_metadata.is_out_of_band_membership()
)
is_mine = self.hs.is_mine_id(event.state_key)
if is_new_state and is_mine:
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index a8781b0e5d..ce48212265 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -168,14 +168,14 @@ class UserDirectoryStore(SQLBaseStore):
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
- if new_entry:
+ if self.database_engine.can_native_upsert:
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('english', ?), 'A')
|| setweight(to_tsvector('english', ?), 'D')
|| setweight(to_tsvector('english', COALESCE(?, '')), 'B')
- )
+ ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute(
sql,
@@ -185,20 +185,45 @@ class UserDirectoryStore(SQLBaseStore):
)
)
else:
- sql = """
- UPDATE user_directory_search
- SET vector = setweight(to_tsvector('english', ?), 'A')
- || setweight(to_tsvector('english', ?), 'D')
- || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
- WHERE user_id = ?
- """
- txn.execute(
- sql,
- (
- get_localpart_from_id(user_id), get_domain_from_id(user_id),
- display_name, user_id,
+ # TODO: Remove this code after we've bumped the minimum version
+ # of postgres to always support upserts, so we can get rid of
+ # `new_entry` usage
+ if new_entry is True:
+ sql = """
+ INSERT INTO user_directory_search(user_id, vector)
+ VALUES (?,
+ setweight(to_tsvector('english', ?), 'A')
+ || setweight(to_tsvector('english', ?), 'D')
+ || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+ )
+ """
+ txn.execute(
+ sql,
+ (
+ user_id, get_localpart_from_id(user_id),
+ get_domain_from_id(user_id), display_name,
+ )
+ )
+ elif new_entry is False:
+ sql = """
+ UPDATE user_directory_search
+ SET vector = setweight(to_tsvector('english', ?), 'A')
+ || setweight(to_tsvector('english', ?), 'D')
+ || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+ WHERE user_id = ?
+ """
+ txn.execute(
+ sql,
+ (
+ get_localpart_from_id(user_id),
+ get_domain_from_id(user_id),
+ display_name, user_id,
+ )
+ )
+ else:
+ raise RuntimeError(
+ "upsert returned None when 'can_native_upsert' is False"
)
- )
elif isinstance(self.database_engine, Sqlite3Engine):
value = "%s %s" % (user_id, display_name,) if display_name else user_id
self._simple_upsert_txn(
|