diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 87b2ac5773..a7d7c54d7e 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
from canonicaljson import encode_canonical_json
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
+
import logging
import simplejson as json
import types
@@ -48,23 +50,46 @@ class PusherStore(SQLBaseStore):
return rows
@defer.inlineCallbacks
- def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
- def r(txn):
- sql = (
- "SELECT * FROM pushers"
- " WHERE app_id = ? AND pushkey = ?"
- )
+ def user_has_pusher(self, user_id):
+ ret = yield self._simple_select_one_onecol(
+ "pushers", {"user_name": user_id}, "id", allow_none=True
+ )
+ defer.returnValue(ret is not None)
- txn.execute(sql, (app_id, pushkey,))
- rows = self.cursor_to_dict(txn)
+ def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
+ return self.get_pushers_by({
+ "app_id": app_id,
+ "pushkey": pushkey,
+ })
- return self._decode_pushers_rows(rows)
+ def get_pushers_by_user_id(self, user_id):
+ return self.get_pushers_by({
+ "user_name": user_id,
+ })
- rows = yield self.runInteraction(
- "get_pushers_by_app_id_and_pushkey", r
+ @defer.inlineCallbacks
+ def get_pushers_by(self, keyvalues):
+ ret = yield self._simple_select_list(
+ "pushers", keyvalues,
+ [
+ "id",
+ "user_name",
+ "access_token",
+ "profile_tag",
+ "kind",
+ "app_id",
+ "app_display_name",
+ "device_display_name",
+ "pushkey",
+ "ts",
+ "lang",
+ "data",
+ "last_stream_ordering",
+ "last_success",
+ "failing_since",
+ ], desc="get_pushers_by"
)
-
- defer.returnValue(rows)
+ defer.returnValue(self._decode_pushers_rows(ret))
@defer.inlineCallbacks
def get_all_pushers(self):
@@ -78,9 +103,12 @@ class PusherStore(SQLBaseStore):
defer.returnValue(rows)
def get_pushers_stream_token(self):
- return self._pushers_id_gen.get_max_token()
+ return self._pushers_id_gen.get_current_token()
def get_all_updated_pushers(self, last_id, current_id, limit):
+ if last_id == current_id:
+ return defer.succeed(([], []))
+
def get_all_updated_pushers_txn(txn):
sql = (
"SELECT id, user_name, access_token, profile_tag, kind,"
@@ -107,35 +135,76 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
)
+ @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000)
+ def get_if_user_has_pusher(self, user_id):
+ result = yield self._simple_select_many_batch(
+ table='pushers',
+ keyvalues={
+ 'user_name': 'user_id',
+ },
+ retcol='user_name',
+ desc='get_if_user_has_pusher',
+ allow_none=True,
+ )
+
+ defer.returnValue(bool(result))
+
+ @cachedList(cached_method_name="get_if_user_has_pusher",
+ list_name="user_ids", num_args=1, inlineCallbacks=True)
+ def get_if_users_have_pushers(self, user_ids):
+ rows = yield self._simple_select_many_batch(
+ table='pushers',
+ column='user_name',
+ iterable=user_ids,
+ retcols=['user_name'],
+ desc='get_if_users_have_pushers'
+ )
+
+ result = {user_id: False for user_id in user_ids}
+ result.update({r['user_name']: True for r in rows})
+
+ defer.returnValue(result)
+
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
app_display_name, device_display_name,
- pushkey, pushkey_ts, lang, data, profile_tag=""):
+ pushkey, pushkey_ts, lang, data, last_stream_ordering,
+ profile_tag=""):
with self._pushers_id_gen.get_next() as stream_id:
- yield self._simple_upsert(
- "pushers",
- dict(
- app_id=app_id,
- pushkey=pushkey,
- user_name=user_id,
- ),
- dict(
- access_token=access_token,
- kind=kind,
- app_display_name=app_display_name,
- device_display_name=device_display_name,
- ts=pushkey_ts,
- lang=lang,
- data=encode_canonical_json(data),
- profile_tag=profile_tag,
- id=stream_id,
- ),
- desc="add_pusher",
- )
+ def f(txn):
+ newly_inserted = self._simple_upsert_txn(
+ txn,
+ "pushers",
+ {
+ "app_id": app_id,
+ "pushkey": pushkey,
+ "user_name": user_id,
+ },
+ {
+ "access_token": access_token,
+ "kind": kind,
+ "app_display_name": app_display_name,
+ "device_display_name": device_display_name,
+ "ts": pushkey_ts,
+ "lang": lang,
+ "data": encode_canonical_json(data),
+ "last_stream_ordering": last_stream_ordering,
+ "profile_tag": profile_tag,
+ "id": stream_id,
+ },
+ )
+ if newly_inserted:
+ # get_if_user_has_pusher only cares if the user has
+ # at least *one* pusher.
+ txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+
+ yield self.runInteraction("add_pusher", f)
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
def delete_pusher_txn(txn, stream_id):
+ txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+
self._simple_delete_one_txn(
txn,
"pushers",
@@ -147,28 +216,35 @@ class PusherStore(SQLBaseStore):
{"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
{"stream_id": stream_id},
)
+
with self._pushers_id_gen.get_next() as stream_id:
yield self.runInteraction(
"delete_pusher", delete_pusher_txn, stream_id
)
@defer.inlineCallbacks
- def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):
+ def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
+ last_stream_ordering):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
- {'last_token': last_token},
- desc="update_pusher_last_token",
+ {'last_stream_ordering': last_stream_ordering},
+ desc="update_pusher_last_stream_ordering",
)
@defer.inlineCallbacks
- def update_pusher_last_token_and_success(self, app_id, pushkey, user_id,
- last_token, last_success):
+ def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
+ user_id,
+ last_stream_ordering,
+ last_success):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
- {'last_token': last_token, 'last_success': last_success},
- desc="update_pusher_last_token_and_success",
+ {
+ 'last_stream_ordering': last_stream_ordering,
+ 'last_success': last_success
+ },
+ desc="update_pusher_last_stream_ordering_and_success",
)
@defer.inlineCallbacks
@@ -180,3 +256,30 @@ class PusherStore(SQLBaseStore):
{'failing_since': failing_since},
desc="update_pusher_failing_since",
)
+
+ @defer.inlineCallbacks
+ def get_throttle_params_by_room(self, pusher_id):
+ res = yield self._simple_select_list(
+ "pusher_throttle",
+ {"pusher": pusher_id},
+ ["room_id", "last_sent_ts", "throttle_ms"],
+ desc="get_throttle_params_by_room"
+ )
+
+ params_by_room = {}
+ for row in res:
+ params_by_room[row["room_id"]] = {
+ "last_sent_ts": row["last_sent_ts"],
+ "throttle_ms": row["throttle_ms"]
+ }
+
+ defer.returnValue(params_by_room)
+
+ @defer.inlineCallbacks
+ def set_throttle_params(self, pusher_id, room_id, params):
+ yield self._simple_upsert(
+ "pusher_throttle",
+ {"pusher": pusher_id, "room_id": room_id},
+ params,
+ desc="set_throttle_params"
+ )
|