diff options
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r-- | synapse/storage/pusher.py | 187 |
1 files changed, 145 insertions, 42 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 87b2ac5773..a7d7c54d7e 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -18,6 +18,8 @@ from twisted.internet import defer from canonicaljson import encode_canonical_json +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList + import logging import simplejson as json import types @@ -48,23 +50,46 @@ class PusherStore(SQLBaseStore): return rows @defer.inlineCallbacks - def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey): - def r(txn): - sql = ( - "SELECT * FROM pushers" - " WHERE app_id = ? AND pushkey = ?" - ) + def user_has_pusher(self, user_id): + ret = yield self._simple_select_one_onecol( + "pushers", {"user_name": user_id}, "id", allow_none=True + ) + defer.returnValue(ret is not None) - txn.execute(sql, (app_id, pushkey,)) - rows = self.cursor_to_dict(txn) + def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey): + return self.get_pushers_by({ + "app_id": app_id, + "pushkey": pushkey, + }) - return self._decode_pushers_rows(rows) + def get_pushers_by_user_id(self, user_id): + return self.get_pushers_by({ + "user_name": user_id, + }) - rows = yield self.runInteraction( - "get_pushers_by_app_id_and_pushkey", r + @defer.inlineCallbacks + def get_pushers_by(self, keyvalues): + ret = yield self._simple_select_list( + "pushers", keyvalues, + [ + "id", + "user_name", + "access_token", + "profile_tag", + "kind", + "app_id", + "app_display_name", + "device_display_name", + "pushkey", + "ts", + "lang", + "data", + "last_stream_ordering", + "last_success", + "failing_since", + ], desc="get_pushers_by" ) - - defer.returnValue(rows) + defer.returnValue(self._decode_pushers_rows(ret)) @defer.inlineCallbacks def get_all_pushers(self): @@ -78,9 +103,12 @@ class PusherStore(SQLBaseStore): defer.returnValue(rows) def get_pushers_stream_token(self): - return self._pushers_id_gen.get_max_token() + return self._pushers_id_gen.get_current_token() def get_all_updated_pushers(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed(([], [])) + def get_all_updated_pushers_txn(txn): sql = ( "SELECT id, user_name, access_token, profile_tag, kind," @@ -107,35 +135,76 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) + @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000) + def get_if_user_has_pusher(self, user_id): + result = yield self._simple_select_many_batch( + table='pushers', + keyvalues={ + 'user_name': 'user_id', + }, + retcol='user_name', + desc='get_if_user_has_pusher', + allow_none=True, + ) + + defer.returnValue(bool(result)) + + @cachedList(cached_method_name="get_if_user_has_pusher", + list_name="user_ids", num_args=1, inlineCallbacks=True) + def get_if_users_have_pushers(self, user_ids): + rows = yield self._simple_select_many_batch( + table='pushers', + column='user_name', + iterable=user_ids, + retcols=['user_name'], + desc='get_if_users_have_pushers' + ) + + result = {user_id: False for user_id in user_ids} + result.update({r['user_name']: True for r in rows}) + + defer.returnValue(result) + @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, - pushkey, pushkey_ts, lang, data, profile_tag=""): + pushkey, pushkey_ts, lang, data, last_stream_ordering, + profile_tag=""): with self._pushers_id_gen.get_next() as stream_id: - yield self._simple_upsert( - "pushers", - dict( - app_id=app_id, - pushkey=pushkey, - user_name=user_id, - ), - dict( - access_token=access_token, - kind=kind, - app_display_name=app_display_name, - device_display_name=device_display_name, - ts=pushkey_ts, - lang=lang, - data=encode_canonical_json(data), - profile_tag=profile_tag, - id=stream_id, - ), - desc="add_pusher", - ) + def f(txn): + newly_inserted = self._simple_upsert_txn( + txn, + "pushers", + { + "app_id": app_id, + "pushkey": pushkey, + "user_name": user_id, + }, + { + "access_token": access_token, + "kind": kind, + "app_display_name": app_display_name, + "device_display_name": device_display_name, + "ts": pushkey_ts, + "lang": lang, + "data": encode_canonical_json(data), + "last_stream_ordering": last_stream_ordering, + "profile_tag": profile_tag, + "id": stream_id, + }, + ) + if newly_inserted: + # get_if_user_has_pusher only cares if the user has + # at least *one* pusher. + txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) + + yield self.runInteraction("add_pusher", f) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): def delete_pusher_txn(txn, stream_id): + txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) + self._simple_delete_one_txn( txn, "pushers", @@ -147,28 +216,35 @@ class PusherStore(SQLBaseStore): {"app_id": app_id, "pushkey": pushkey, "user_id": user_id}, {"stream_id": stream_id}, ) + with self._pushers_id_gen.get_next() as stream_id: yield self.runInteraction( "delete_pusher", delete_pusher_txn, stream_id ) @defer.inlineCallbacks - def update_pusher_last_token(self, app_id, pushkey, user_id, last_token): + def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id, + last_stream_ordering): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_token': last_token}, - desc="update_pusher_last_token", + {'last_stream_ordering': last_stream_ordering}, + desc="update_pusher_last_stream_ordering", ) @defer.inlineCallbacks - def update_pusher_last_token_and_success(self, app_id, pushkey, user_id, - last_token, last_success): + def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey, + user_id, + last_stream_ordering, + last_success): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_token': last_token, 'last_success': last_success}, - desc="update_pusher_last_token_and_success", + { + 'last_stream_ordering': last_stream_ordering, + 'last_success': last_success + }, + desc="update_pusher_last_stream_ordering_and_success", ) @defer.inlineCallbacks @@ -180,3 +256,30 @@ class PusherStore(SQLBaseStore): {'failing_since': failing_since}, desc="update_pusher_failing_since", ) + + @defer.inlineCallbacks + def get_throttle_params_by_room(self, pusher_id): + res = yield self._simple_select_list( + "pusher_throttle", + {"pusher": pusher_id}, + ["room_id", "last_sent_ts", "throttle_ms"], + desc="get_throttle_params_by_room" + ) + + params_by_room = {} + for row in res: + params_by_room[row["room_id"]] = { + "last_sent_ts": row["last_sent_ts"], + "throttle_ms": row["throttle_ms"] + } + + defer.returnValue(params_by_room) + + @defer.inlineCallbacks + def set_throttle_params(self, pusher_id, room_id, params): + yield self._simple_upsert( + "pusher_throttle", + {"pusher": pusher_id, "room_id": room_id}, + params, + desc="set_throttle_params" + ) |