diff options
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r-- | synapse/storage/pusher.py | 167 |
1 files changed, 73 insertions, 94 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 587dada68f..08ea62681b 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -13,162 +13,141 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections - from ._base import SQLBaseStore, Table from twisted.internet import defer from synapse.api.errors import StoreError +from syutil.jsonutil import encode_canonical_json + import logging +import simplejson as json +import types logger = logging.getLogger(__name__) class PusherStore(SQLBaseStore): + def _decode_pushers_rows(self, rows): + for r in rows: + dataJson = r['data'] + r['data'] = None + try: + if isinstance(dataJson, types.BufferType): + dataJson = str(dataJson).decode("UTF8") + + r['data'] = json.loads(dataJson) + except Exception as e: + logger.warn( + "Invalid JSON in data for pusher %d: %s, %s", + r['id'], dataJson, e.message, + ) + pass + + if isinstance(r['pushkey'], types.BufferType): + r['pushkey'] = str(r['pushkey']).decode("UTF8") + + return rows + @defer.inlineCallbacks - def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey): - sql = ( - "SELECT id, user_name, kind, profile_tag, app_id," - "app_display_name, device_display_name, pushkey, ts, data, " - "last_token, last_success, failing_since " - "FROM pushers " - "WHERE app_id = ? AND pushkey = ?" - ) + def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey): + def r(txn): + sql = ( + "SELECT * FROM pushers" + " WHERE app_id = ? AND pushkey = ?" + ) - rows = yield self._execute( - "get_pushers_by_app_id_and_pushkey", None, sql, - app_id_and_pushkey[0], app_id_and_pushkey[1] + txn.execute(sql, (app_id, pushkey,)) + rows = self.cursor_to_dict(txn) + + return self._decode_pushers_rows(rows) + + rows = yield self.runInteraction( + "get_pushers_by_app_id_and_pushkey", r ) - ret = [ - { - "id": r[0], - "user_name": r[1], - "kind": r[2], - "profile_tag": r[3], - "app_id": r[4], - "app_display_name": r[5], - "device_display_name": r[6], - "pushkey": r[7], - "pushkey_ts": r[8], - "data": r[9], - "last_token": r[10], - "last_success": r[11], - "failing_since": r[12] - } - for r in rows - ] - - defer.returnValue(ret[0]) + defer.returnValue(rows) @defer.inlineCallbacks def get_all_pushers(self): - sql = ( - "SELECT id, user_name, kind, profile_tag, app_id," - "app_display_name, device_display_name, pushkey, ts, data, " - "last_token, last_success, failing_since " - "FROM pushers" - ) + def get_pushers(txn): + txn.execute("SELECT * FROM pushers") + rows = self.cursor_to_dict(txn) - rows = yield self._execute("get_all_pushers", None, sql) - - ret = [ - { - "id": r[0], - "user_name": r[1], - "kind": r[2], - "profile_tag": r[3], - "app_id": r[4], - "app_display_name": r[5], - "device_display_name": r[6], - "pushkey": r[7], - "pushkey_ts": r[8], - "data": r[9], - "last_token": r[10], - "last_success": r[11], - "failing_since": r[12] - } - for r in rows - ] - - defer.returnValue(ret) + return self._decode_pushers_rows(rows) + + rows = yield self.runInteraction("get_all_pushers", get_pushers) + defer.returnValue(rows) @defer.inlineCallbacks - def add_pusher(self, user_name, profile_tag, kind, app_id, + def add_pusher(self, user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, pushkey_ts, lang, data): try: + next_id = yield self._pushers_id_gen.get_next() yield self._simple_upsert( PushersTable.table_name, dict( app_id=app_id, pushkey=pushkey, + user_name=user_name, ), dict( - user_name=user_name, + access_token=access_token, kind=kind, profile_tag=profile_tag, app_display_name=app_display_name, device_display_name=device_display_name, ts=pushkey_ts, lang=lang, - data=data - )) + data=encode_canonical_json(data), + ), + insertion_values=dict( + id=next_id, + ), + desc="add_pusher", + ) except Exception as e: logger.error("create_pusher with failed: %s", e) raise StoreError(500, "Problem creating pusher.") @defer.inlineCallbacks - def delete_pusher_by_app_id_pushkey(self, app_id, pushkey): + def delete_pusher_by_app_id_pushkey_user_name(self, app_id, pushkey, user_name): yield self._simple_delete_one( PushersTable.table_name, - dict(app_id=app_id, pushkey=pushkey) + {"app_id": app_id, "pushkey": pushkey, 'user_name': user_name}, + desc="delete_pusher_by_app_id_pushkey_user_name", ) @defer.inlineCallbacks - def update_pusher_last_token(self, app_id, pushkey, last_token): + def update_pusher_last_token(self, app_id, pushkey, user_name, last_token): yield self._simple_update_one( PushersTable.table_name, - {'app_id': app_id, 'pushkey': pushkey}, - {'last_token': last_token} + {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name}, + {'last_token': last_token}, + desc="update_pusher_last_token", ) @defer.inlineCallbacks - def update_pusher_last_token_and_success(self, app_id, pushkey, + def update_pusher_last_token_and_success(self, app_id, pushkey, user_name, last_token, last_success): yield self._simple_update_one( PushersTable.table_name, - {'app_id': app_id, 'pushkey': pushkey}, - {'last_token': last_token, 'last_success': last_success} + {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name}, + {'last_token': last_token, 'last_success': last_success}, + desc="update_pusher_last_token_and_success", ) @defer.inlineCallbacks - def update_pusher_failing_since(self, app_id, pushkey, failing_since): + def update_pusher_failing_since(self, app_id, pushkey, user_name, + failing_since): yield self._simple_update_one( PushersTable.table_name, - {'app_id': app_id, 'pushkey': pushkey}, - {'failing_since': failing_since} + {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name}, + {'failing_since': failing_since}, + desc="update_pusher_failing_since", ) class PushersTable(Table): table_name = "pushers" - - fields = [ - "id", - "user_name", - "kind", - "profile_tag", - "app_id", - "app_display_name", - "device_display_name", - "pushkey", - "pushkey_ts", - "data", - "last_token", - "last_success", - "failing_since" - ] - - EntryType = collections.namedtuple("PusherEntry", fields) |