diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 34d2f82b7f..3d8b4d5d5b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -204,34 +204,35 @@ class PusherStore(SQLBaseStore):
pushkey, pushkey_ts, lang, data, last_stream_ordering,
profile_tag=""):
with self._pushers_id_gen.get_next() as stream_id:
- def f(txn):
- newly_inserted = self._simple_upsert_txn(
- txn,
- "pushers",
- {
- "app_id": app_id,
- "pushkey": pushkey,
- "user_name": user_id,
- },
- {
- "access_token": access_token,
- "kind": kind,
- "app_display_name": app_display_name,
- "device_display_name": device_display_name,
- "ts": pushkey_ts,
- "lang": lang,
- "data": encode_canonical_json(data),
- "last_stream_ordering": last_stream_ordering,
- "profile_tag": profile_tag,
- "id": stream_id,
- },
- )
- if newly_inserted:
- # get_if_user_has_pusher only cares if the user has
- # at least *one* pusher.
- txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+ # no need to lock because `pushers` has a unique key on
+ # (app_id, pushkey, user_name) so _simple_upsert will retry
+ newly_inserted = yield self._simple_upsert(
+ table="pushers",
+ keyvalues={
+ "app_id": app_id,
+ "pushkey": pushkey,
+ "user_name": user_id,
+ },
+ values={
+ "access_token": access_token,
+ "kind": kind,
+ "app_display_name": app_display_name,
+ "device_display_name": device_display_name,
+ "ts": pushkey_ts,
+ "lang": lang,
+ "data": encode_canonical_json(data),
+ "last_stream_ordering": last_stream_ordering,
+ "profile_tag": profile_tag,
+ "id": stream_id,
+ },
+ desc="add_pusher",
+ lock=False,
+ )
- yield self.runInteraction("add_pusher", f)
+ if newly_inserted:
+ # get_if_user_has_pusher only cares if the user has
+ # at least *one* pusher.
+ self.get_if_user_has_pusher.invalidate(user_id,)
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
@@ -243,11 +244,19 @@ class PusherStore(SQLBaseStore):
"pushers",
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id}
)
- self._simple_upsert_txn(
+
+ # it's possible for us to end up with duplicate rows for
+ # (app_id, pushkey, user_id) at different stream_ids, but that
+ # doesn't really matter.
+ self._simple_insert_txn(
txn,
- "deleted_pushers",
- {"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
- {"stream_id": stream_id},
+ table="deleted_pushers",
+ values={
+ "stream_id": stream_id,
+ "app_id": app_id,
+ "pushkey": pushkey,
+ "user_id": user_id,
+ },
)
with self._pushers_id_gen.get_next() as stream_id:
@@ -310,9 +319,12 @@ class PusherStore(SQLBaseStore):
@defer.inlineCallbacks
def set_throttle_params(self, pusher_id, room_id, params):
+ # no need to lock because `pusher_throttle` has a primary key on
+ # (pusher, room_id) so _simple_upsert will retry
yield self._simple_upsert(
"pusher_throttle",
{"pusher": pusher_id, "room_id": room_id},
params,
- desc="set_throttle_params"
+ desc="set_throttle_params",
+ lock=False,
)
|