summary refs log tree commit diff
path: root/synapse/storage/pusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r--synapse/storage/pusher.py105
1 files changed, 62 insertions, 43 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 34d2f82b7f..8443bd4c1b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,21 +14,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
-from twisted.internet import defer
+import logging
+import types
+
+from canonicaljson import encode_canonical_json, json
 
-from canonicaljson import encode_canonical_json
+from twisted.internet import defer
 
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 
-import logging
-import simplejson as json
-import types
+from ._base import SQLBaseStore
 
 logger = logging.getLogger(__name__)
 
 
-class PusherStore(SQLBaseStore):
+class PusherWorkerStore(SQLBaseStore):
     def _decode_pushers_rows(self, rows):
         for r in rows:
             dataJson = r['data']
@@ -102,9 +103,6 @@ class PusherStore(SQLBaseStore):
         rows = yield self.runInteraction("get_all_pushers", get_pushers)
         defer.returnValue(rows)
 
-    def get_pushers_stream_token(self):
-        return self._pushers_id_gen.get_current_token()
-
     def get_all_updated_pushers(self, last_id, current_id, limit):
         if last_id == current_id:
             return defer.succeed(([], []))
@@ -198,56 +196,74 @@ class PusherStore(SQLBaseStore):
 
         defer.returnValue(result)
 
+
+class PusherStore(PusherWorkerStore):
+    def get_pushers_stream_token(self):
+        return self._pushers_id_gen.get_current_token()
+
     @defer.inlineCallbacks
     def add_pusher(self, user_id, access_token, kind, app_id,
                    app_display_name, device_display_name,
                    pushkey, pushkey_ts, lang, data, last_stream_ordering,
                    profile_tag=""):
         with self._pushers_id_gen.get_next() as stream_id:
-            def f(txn):
-                newly_inserted = self._simple_upsert_txn(
-                    txn,
-                    "pushers",
-                    {
-                        "app_id": app_id,
-                        "pushkey": pushkey,
-                        "user_name": user_id,
-                    },
-                    {
-                        "access_token": access_token,
-                        "kind": kind,
-                        "app_display_name": app_display_name,
-                        "device_display_name": device_display_name,
-                        "ts": pushkey_ts,
-                        "lang": lang,
-                        "data": encode_canonical_json(data),
-                        "last_stream_ordering": last_stream_ordering,
-                        "profile_tag": profile_tag,
-                        "id": stream_id,
-                    },
-                )
-                if newly_inserted:
-                    # get_if_user_has_pusher only cares if the user has
-                    # at least *one* pusher.
-                    txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+            # no need to lock because `pushers` has a unique key on
+            # (app_id, pushkey, user_name) so _simple_upsert will retry
+            newly_inserted = yield self._simple_upsert(
+                table="pushers",
+                keyvalues={
+                    "app_id": app_id,
+                    "pushkey": pushkey,
+                    "user_name": user_id,
+                },
+                values={
+                    "access_token": access_token,
+                    "kind": kind,
+                    "app_display_name": app_display_name,
+                    "device_display_name": device_display_name,
+                    "ts": pushkey_ts,
+                    "lang": lang,
+                    "data": encode_canonical_json(data),
+                    "last_stream_ordering": last_stream_ordering,
+                    "profile_tag": profile_tag,
+                    "id": stream_id,
+                },
+                desc="add_pusher",
+                lock=False,
+            )
 
-            yield self.runInteraction("add_pusher", f)
+            if newly_inserted:
+                yield self.runInteraction(
+                    "add_pusher",
+                    self._invalidate_cache_and_stream,
+                    self.get_if_user_has_pusher, (user_id,)
+                )
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
         def delete_pusher_txn(txn, stream_id):
-            txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+            self._invalidate_cache_and_stream(
+                txn, self.get_if_user_has_pusher, (user_id,)
+            )
 
             self._simple_delete_one_txn(
                 txn,
                 "pushers",
                 {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}
             )
-            self._simple_upsert_txn(
+
+            # it's possible for us to end up with duplicate rows for
+            # (app_id, pushkey, user_id) at different stream_ids, but that
+            # doesn't really matter.
+            self._simple_insert_txn(
                 txn,
-                "deleted_pushers",
-                {"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
-                {"stream_id": stream_id},
+                table="deleted_pushers",
+                values={
+                    "stream_id": stream_id,
+                    "app_id": app_id,
+                    "pushkey": pushkey,
+                    "user_id": user_id,
+                },
             )
 
         with self._pushers_id_gen.get_next() as stream_id:
@@ -310,9 +326,12 @@ class PusherStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def set_throttle_params(self, pusher_id, room_id, params):
+        # no need to lock because `pusher_throttle` has a primary key on
+        # (pusher, room_id) so _simple_upsert will retry
         yield self._simple_upsert(
             "pusher_throttle",
             {"pusher": pusher_id, "room_id": room_id},
             params,
-            desc="set_throttle_params"
+            desc="set_throttle_params",
+            lock=False,
         )