summary refs log tree commit diff
path: root/synapse/storage/pusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r--synapse/storage/pusher.py187
1 files changed, 145 insertions, 42 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 87b2ac5773..a7d7c54d7e 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
 
 from canonicaljson import encode_canonical_json
 
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
+
 import logging
 import simplejson as json
 import types
@@ -48,23 +50,46 @@ class PusherStore(SQLBaseStore):
         return rows
 
     @defer.inlineCallbacks
-    def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
-        def r(txn):
-            sql = (
-                "SELECT * FROM pushers"
-                " WHERE app_id = ? AND pushkey = ?"
-            )
+    def user_has_pusher(self, user_id):
+        ret = yield self._simple_select_one_onecol(
+            "pushers", {"user_name": user_id}, "id", allow_none=True
+        )
+        defer.returnValue(ret is not None)
 
-            txn.execute(sql, (app_id, pushkey,))
-            rows = self.cursor_to_dict(txn)
+    def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
+        return self.get_pushers_by({
+            "app_id": app_id,
+            "pushkey": pushkey,
+        })
 
-            return self._decode_pushers_rows(rows)
+    def get_pushers_by_user_id(self, user_id):
+        return self.get_pushers_by({
+            "user_name": user_id,
+        })
 
-        rows = yield self.runInteraction(
-            "get_pushers_by_app_id_and_pushkey", r
+    @defer.inlineCallbacks
+    def get_pushers_by(self, keyvalues):
+        ret = yield self._simple_select_list(
+            "pushers", keyvalues,
+            [
+                "id",
+                "user_name",
+                "access_token",
+                "profile_tag",
+                "kind",
+                "app_id",
+                "app_display_name",
+                "device_display_name",
+                "pushkey",
+                "ts",
+                "lang",
+                "data",
+                "last_stream_ordering",
+                "last_success",
+                "failing_since",
+            ], desc="get_pushers_by"
         )
-
-        defer.returnValue(rows)
+        defer.returnValue(self._decode_pushers_rows(ret))
 
     @defer.inlineCallbacks
     def get_all_pushers(self):
@@ -78,9 +103,12 @@ class PusherStore(SQLBaseStore):
         defer.returnValue(rows)
 
     def get_pushers_stream_token(self):
-        return self._pushers_id_gen.get_max_token()
+        return self._pushers_id_gen.get_current_token()
 
     def get_all_updated_pushers(self, last_id, current_id, limit):
+        if last_id == current_id:
+            return defer.succeed(([], []))
+
         def get_all_updated_pushers_txn(txn):
             sql = (
                 "SELECT id, user_name, access_token, profile_tag, kind,"
@@ -107,35 +135,76 @@ class PusherStore(SQLBaseStore):
             "get_all_updated_pushers", get_all_updated_pushers_txn
         )
 
+    @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000)
+    def get_if_user_has_pusher(self, user_id):
+        result = yield self._simple_select_many_batch(
+            table='pushers',
+            keyvalues={
+                'user_name': 'user_id',
+            },
+            retcol='user_name',
+            desc='get_if_user_has_pusher',
+            allow_none=True,
+        )
+
+        defer.returnValue(bool(result))
+
+    @cachedList(cached_method_name="get_if_user_has_pusher",
+                list_name="user_ids", num_args=1, inlineCallbacks=True)
+    def get_if_users_have_pushers(self, user_ids):
+        rows = yield self._simple_select_many_batch(
+            table='pushers',
+            column='user_name',
+            iterable=user_ids,
+            retcols=['user_name'],
+            desc='get_if_users_have_pushers'
+        )
+
+        result = {user_id: False for user_id in user_ids}
+        result.update({r['user_name']: True for r in rows})
+
+        defer.returnValue(result)
+
     @defer.inlineCallbacks
     def add_pusher(self, user_id, access_token, kind, app_id,
                    app_display_name, device_display_name,
-                   pushkey, pushkey_ts, lang, data, profile_tag=""):
+                   pushkey, pushkey_ts, lang, data, last_stream_ordering,
+                   profile_tag=""):
         with self._pushers_id_gen.get_next() as stream_id:
-            yield self._simple_upsert(
-                "pushers",
-                dict(
-                    app_id=app_id,
-                    pushkey=pushkey,
-                    user_name=user_id,
-                ),
-                dict(
-                    access_token=access_token,
-                    kind=kind,
-                    app_display_name=app_display_name,
-                    device_display_name=device_display_name,
-                    ts=pushkey_ts,
-                    lang=lang,
-                    data=encode_canonical_json(data),
-                    profile_tag=profile_tag,
-                    id=stream_id,
-                ),
-                desc="add_pusher",
-            )
+            def f(txn):
+                newly_inserted = self._simple_upsert_txn(
+                    txn,
+                    "pushers",
+                    {
+                        "app_id": app_id,
+                        "pushkey": pushkey,
+                        "user_name": user_id,
+                    },
+                    {
+                        "access_token": access_token,
+                        "kind": kind,
+                        "app_display_name": app_display_name,
+                        "device_display_name": device_display_name,
+                        "ts": pushkey_ts,
+                        "lang": lang,
+                        "data": encode_canonical_json(data),
+                        "last_stream_ordering": last_stream_ordering,
+                        "profile_tag": profile_tag,
+                        "id": stream_id,
+                    },
+                )
+                if newly_inserted:
+                    # get_if_user_has_pusher only cares if the user has
+                    # at least *one* pusher.
+                    txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+
+            yield self.runInteraction("add_pusher", f)
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
         def delete_pusher_txn(txn, stream_id):
+            txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+
             self._simple_delete_one_txn(
                 txn,
                 "pushers",
@@ -147,28 +216,35 @@ class PusherStore(SQLBaseStore):
                 {"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
                 {"stream_id": stream_id},
             )
+
         with self._pushers_id_gen.get_next() as stream_id:
             yield self.runInteraction(
                 "delete_pusher", delete_pusher_txn, stream_id
             )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):
+    def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
+                                           last_stream_ordering):
         yield self._simple_update_one(
             "pushers",
             {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
-            {'last_token': last_token},
-            desc="update_pusher_last_token",
+            {'last_stream_ordering': last_stream_ordering},
+            desc="update_pusher_last_stream_ordering",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token_and_success(self, app_id, pushkey, user_id,
-                                             last_token, last_success):
+    def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
+                                                       user_id,
+                                                       last_stream_ordering,
+                                                       last_success):
         yield self._simple_update_one(
             "pushers",
             {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
-            {'last_token': last_token, 'last_success': last_success},
-            desc="update_pusher_last_token_and_success",
+            {
+                'last_stream_ordering': last_stream_ordering,
+                'last_success': last_success
+            },
+            desc="update_pusher_last_stream_ordering_and_success",
         )
 
     @defer.inlineCallbacks
@@ -180,3 +256,30 @@ class PusherStore(SQLBaseStore):
             {'failing_since': failing_since},
             desc="update_pusher_failing_since",
         )
+
+    @defer.inlineCallbacks
+    def get_throttle_params_by_room(self, pusher_id):
+        res = yield self._simple_select_list(
+            "pusher_throttle",
+            {"pusher": pusher_id},
+            ["room_id", "last_sent_ts", "throttle_ms"],
+            desc="get_throttle_params_by_room"
+        )
+
+        params_by_room = {}
+        for row in res:
+            params_by_room[row["room_id"]] = {
+                "last_sent_ts": row["last_sent_ts"],
+                "throttle_ms": row["throttle_ms"]
+            }
+
+        defer.returnValue(params_by_room)
+
+    @defer.inlineCallbacks
+    def set_throttle_params(self, pusher_id, room_id, params):
+        yield self._simple_upsert(
+            "pusher_throttle",
+            {"pusher": pusher_id, "room_id": room_id},
+            params,
+            desc="set_throttle_params"
+        )