summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-07-22 12:39:50 +0100
committerGitHub <noreply@github.com>2021-07-22 12:39:50 +0100
commit38b346a504cd4155b1986d50ebcff2199e1690be (patch)
tree70c3760c8d54cc64617714ebe7f99166d19ab28a
parentFix a handful of type annotations. (#10446) (diff)
downloadsynapse-38b346a504cd4155b1986d50ebcff2199e1690be.tar.xz
Replace `or_ignore` in `simple_insert` with `simple_upsert` (#10442)
Now that we have `simple_upsert` that should be used in preference to
trying to insert and looking for an exception. The main benefit is that
we ERROR message don't get written to postgres logs.

We also have tidy up the return value on `simple_upsert`, rather than
having a tri-state of inserted/not-inserted/unknown.
-rw-r--r--changelog.d/10442.misc1
-rw-r--r--synapse/storage/database.py51
-rw-r--r--synapse/storage/databases/main/devices.py9
-rw-r--r--synapse/storage/databases/main/monthly_active_users.py8
-rw-r--r--synapse/storage/databases/main/transactions.py8
-rw-r--r--synapse/storage/databases/main/user_directory.py66
6 files changed, 44 insertions, 99 deletions
diff --git a/changelog.d/10442.misc b/changelog.d/10442.misc
new file mode 100644
index 0000000000..b8d412d732
--- /dev/null
+++ b/changelog.d/10442.misc
@@ -0,0 +1 @@
+Replace usage of `or_ignore` in `simple_insert` with `simple_upsert` usage, to stop spamming postgres logs with spurious ERROR messages.
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index ccf9ac51ef..4d4643619f 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -832,31 +832,16 @@ class DatabasePool:
         self,
         table: str,
         values: Dict[str, Any],
-        or_ignore: bool = False,
         desc: str = "simple_insert",
-    ) -> bool:
+    ) -> None:
         """Executes an INSERT query on the named table.
 
         Args:
             table: string giving the table name
             values: dict of new column names and values for them
-            or_ignore: bool stating whether an exception should be raised
-                when a conflicting row already exists. If True, False will be
-                returned by the function instead
             desc: description of the transaction, for logging and metrics
-
-        Returns:
-             Whether the row was inserted or not. Only useful when `or_ignore` is True
         """
-        try:
-            await self.runInteraction(desc, self.simple_insert_txn, table, values)
-        except self.engine.module.IntegrityError:
-            # We have to do or_ignore flag at this layer, since we can't reuse
-            # a cursor after we receive an error from the db.
-            if not or_ignore:
-                raise
-            return False
-        return True
+        await self.runInteraction(desc, self.simple_insert_txn, table, values)
 
     @staticmethod
     def simple_insert_txn(
@@ -930,7 +915,7 @@ class DatabasePool:
         insertion_values: Optional[Dict[str, Any]] = None,
         desc: str = "simple_upsert",
         lock: bool = True,
-    ) -> Optional[bool]:
+    ) -> bool:
         """
 
         `lock` should generally be set to True (the default), but can be set
@@ -951,8 +936,8 @@ class DatabasePool:
             desc: description of the transaction, for logging and metrics
             lock: True to lock the table when doing the upsert.
         Returns:
-            Native upserts always return None. Emulated upserts return True if a
-            new entry was created, False if an existing one was updated.
+            Returns True if a row was inserted or updated (i.e. if `values` is
+            not empty then this always returns True)
         """
         insertion_values = insertion_values or {}
 
@@ -995,7 +980,7 @@ class DatabasePool:
         values: Dict[str, Any],
         insertion_values: Optional[Dict[str, Any]] = None,
         lock: bool = True,
-    ) -> Optional[bool]:
+    ) -> bool:
         """
         Pick the UPSERT method which works best on the platform. Either the
         native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
@@ -1008,16 +993,15 @@ class DatabasePool:
             insertion_values: additional key/values to use only when inserting
             lock: True to lock the table when doing the upsert.
         Returns:
-            Native upserts always return None. Emulated upserts return True if a
-            new entry was created, False if an existing one was updated.
+            Returns True if a row was inserted or updated (i.e. if `values` is
+            not empty then this always returns True)
         """
         insertion_values = insertion_values or {}
 
         if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
-            self.simple_upsert_txn_native_upsert(
+            return self.simple_upsert_txn_native_upsert(
                 txn, table, keyvalues, values, insertion_values=insertion_values
             )
-            return None
         else:
             return self.simple_upsert_txn_emulated(
                 txn,
@@ -1045,8 +1029,8 @@ class DatabasePool:
             insertion_values: additional key/values to use only when inserting
             lock: True to lock the table when doing the upsert.
         Returns:
-            Returns True if a new entry was created, False if an existing
-            one was updated.
+            Returns True if a row was inserted or updated (i.e. if `values` is
+            not empty then this always returns True)
         """
         insertion_values = insertion_values or {}
 
@@ -1086,8 +1070,7 @@ class DatabasePool:
 
             txn.execute(sql, sqlargs)
             if txn.rowcount > 0:
-                # successfully updated at least one row.
-                return False
+                return True
 
         # We didn't find any existing rows, so insert a new one
         allvalues: Dict[str, Any] = {}
@@ -1111,15 +1094,19 @@ class DatabasePool:
         keyvalues: Dict[str, Any],
         values: Dict[str, Any],
         insertion_values: Optional[Dict[str, Any]] = None,
-    ) -> None:
+    ) -> bool:
         """
-        Use the native UPSERT functionality in recent PostgreSQL versions.
+        Use the native UPSERT functionality in PostgreSQL.
 
         Args:
             table: The table to upsert into
             keyvalues: The unique key tables and their new values
             values: The nonunique columns and their new values
             insertion_values: additional key/values to use only when inserting
+
+        Returns:
+            Returns True if a row was inserted or updated (i.e. if `values` is
+            not empty then this always returns True)
         """
         allvalues: Dict[str, Any] = {}
         allvalues.update(keyvalues)
@@ -1140,6 +1127,8 @@ class DatabasePool:
         )
         txn.execute(sql, list(allvalues.values()))
 
+        return bool(txn.rowcount)
+
     async def simple_upsert_many(
         self,
         table: str,
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 18f07d96dc..3816a0ca53 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1078,16 +1078,18 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             return False
 
         try:
-            inserted = await self.db_pool.simple_insert(
+            inserted = await self.db_pool.simple_upsert(
                 "devices",
-                values={
+                keyvalues={
                     "user_id": user_id,
                     "device_id": device_id,
+                },
+                values={},
+                insertion_values={
                     "display_name": initial_device_display_name,
                     "hidden": False,
                 },
                 desc="store_device",
-                or_ignore=True,
             )
             if not inserted:
                 # if the device already exists, check if it's a real device, or
@@ -1099,6 +1101,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                 )
                 if hidden:
                     raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN)
+
             self.device_id_exists_cache.set(key, True)
             return inserted
         except StoreError:
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index fe25638289..d213b26703 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -297,17 +297,13 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
         Args:
             txn (cursor):
             user_id (str): user to add/update
-
-        Returns:
-            bool: True if a new entry was created, False if an
-            existing one was updated.
         """
 
         # Am consciously deciding to lock the table on the basis that is ought
         # never be a big table and alternative approaches (batching multiple
         # upserts into a single txn) introduced a lot of extra complexity.
         # See https://github.com/matrix-org/synapse/issues/3854 for more
-        is_insert = self.db_pool.simple_upsert_txn(
+        self.db_pool.simple_upsert_txn(
             txn,
             table="monthly_active_users",
             keyvalues={"user_id": user_id},
@@ -322,8 +318,6 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
             txn, self.user_last_seen_monthly_active, (user_id,)
         )
 
-        return is_insert
-
     async def populate_monthly_active_users(self, user_id):
         """Checks on the state of monthly active user limits and optionally
         add the user to the monthly active tables
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index d211c423b2..7728d5f102 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -134,16 +134,18 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             response_dict: The response, to be encoded into JSON.
         """
 
-        await self.db_pool.simple_insert(
+        await self.db_pool.simple_upsert(
             table="received_transactions",
-            values={
+            keyvalues={
                 "transaction_id": transaction_id,
                 "origin": origin,
+            },
+            values={},
+            insertion_values={
                 "response_code": code,
                 "response_json": db_binary_type(encode_canonical_json(response_dict)),
                 "ts": self._clock.time_msec(),
             },
-            or_ignore=True,
             desc="set_received_txn_response",
         )
 
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index a6bfb4902a..9d28d69ac7 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -377,7 +377,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             avatar_url = None
 
         def _update_profile_in_user_dir_txn(txn):
-            new_entry = self.db_pool.simple_upsert_txn(
+            self.db_pool.simple_upsert_txn(
                 txn,
                 table="user_directory",
                 keyvalues={"user_id": user_id},
@@ -388,8 +388,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             if isinstance(self.database_engine, PostgresEngine):
                 # We weight the localpart most highly, then display name and finally
                 # server name
-                if self.database_engine.can_native_upsert:
-                    sql = """
+                sql = """
                         INSERT INTO user_directory_search(user_id, vector)
                         VALUES (?,
                             setweight(to_tsvector('simple', ?), 'A')
@@ -397,58 +396,15 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
                             || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
                         ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
                     """
-                    txn.execute(
-                        sql,
-                        (
-                            user_id,
-                            get_localpart_from_id(user_id),
-                            get_domain_from_id(user_id),
-                            display_name,
-                        ),
-                    )
-                else:
-                    # TODO: Remove this code after we've bumped the minimum version
-                    # of postgres to always support upserts, so we can get rid of
-                    # `new_entry` usage
-                    if new_entry is True:
-                        sql = """
-                            INSERT INTO user_directory_search(user_id, vector)
-                            VALUES (?,
-                                setweight(to_tsvector('simple', ?), 'A')
-                                || setweight(to_tsvector('simple', ?), 'D')
-                                || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
-                            )
-                        """
-                        txn.execute(
-                            sql,
-                            (
-                                user_id,
-                                get_localpart_from_id(user_id),
-                                get_domain_from_id(user_id),
-                                display_name,
-                            ),
-                        )
-                    elif new_entry is False:
-                        sql = """
-                            UPDATE user_directory_search
-                            SET vector = setweight(to_tsvector('simple', ?), 'A')
-                                || setweight(to_tsvector('simple', ?), 'D')
-                                || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
-                            WHERE user_id = ?
-                        """
-                        txn.execute(
-                            sql,
-                            (
-                                get_localpart_from_id(user_id),
-                                get_domain_from_id(user_id),
-                                display_name,
-                                user_id,
-                            ),
-                        )
-                    else:
-                        raise RuntimeError(
-                            "upsert returned None when 'can_native_upsert' is False"
-                        )
+                txn.execute(
+                    sql,
+                    (
+                        user_id,
+                        get_localpart_from_id(user_id),
+                        get_domain_from_id(user_id),
+                        display_name,
+                    ),
+                )
             elif isinstance(self.database_engine, Sqlite3Engine):
                 value = "%s %s" % (user_id, display_name) if display_name else user_id
                 self.db_pool.simple_upsert_txn(