summary refs log tree commit diff
path: root/synapse/storage/pusher.py
diff options
context:
space:
mode:
authorRichard van der Hoff <github@rvanderhoff.org.uk>2017-11-16 16:27:29 +0000
committerGitHub <noreply@github.com>2017-11-16 16:27:29 +0000
commitba05f28ae72512d4196fca01e311dae6b08d51f3 (patch)
treea8d550b51e468e2d316185cb069b76880713317a /synapse/storage/pusher.py
parentMerge pull request #2661 from matrix-org/rav/statereadstore (diff)
parentFix broken ref to IntegrityError (diff)
downloadsynapse-ba05f28ae72512d4196fca01e311dae6b08d51f3.tar.xz
Merge pull request #2684 from matrix-org/rav/unlock_upsert
Start work on avoiding table locks for upserts
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r--synapse/storage/pusher.py55
1 files changed, 28 insertions, 27 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 34d2f82b7f..19ce41fde9 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -204,34 +204,35 @@ class PusherStore(SQLBaseStore):
                    pushkey, pushkey_ts, lang, data, last_stream_ordering,
                    profile_tag=""):
         with self._pushers_id_gen.get_next() as stream_id:
-            def f(txn):
-                newly_inserted = self._simple_upsert_txn(
-                    txn,
-                    "pushers",
-                    {
-                        "app_id": app_id,
-                        "pushkey": pushkey,
-                        "user_name": user_id,
-                    },
-                    {
-                        "access_token": access_token,
-                        "kind": kind,
-                        "app_display_name": app_display_name,
-                        "device_display_name": device_display_name,
-                        "ts": pushkey_ts,
-                        "lang": lang,
-                        "data": encode_canonical_json(data),
-                        "last_stream_ordering": last_stream_ordering,
-                        "profile_tag": profile_tag,
-                        "id": stream_id,
-                    },
-                )
-                if newly_inserted:
-                    # get_if_user_has_pusher only cares if the user has
-                    # at least *one* pusher.
-                    txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+            # no need to lock because `pushers` has a unique key on
+            # (app_id, pushkey, user_name) so _simple_upsert will retry
+            newly_inserted = yield self._simple_upsert(
+                table="pushers",
+                keyvalues={
+                    "app_id": app_id,
+                    "pushkey": pushkey,
+                    "user_name": user_id,
+                },
+                values={
+                    "access_token": access_token,
+                    "kind": kind,
+                    "app_display_name": app_display_name,
+                    "device_display_name": device_display_name,
+                    "ts": pushkey_ts,
+                    "lang": lang,
+                    "data": encode_canonical_json(data),
+                    "last_stream_ordering": last_stream_ordering,
+                    "profile_tag": profile_tag,
+                    "id": stream_id,
+                },
+                desc="add_pusher",
+                lock=False,
+            )
 
-            yield self.runInteraction("add_pusher", f)
+            if newly_inserted:
+                # get_if_user_has_pusher only cares if the user has
+                # at least *one* pusher.
+                self.get_if_user_has_pusher.invalidate(user_id,)
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):