diff options
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r-- | synapse/storage/pusher.py | 86 |
1 files changed, 48 insertions, 38 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 134297e284..1567e1df48 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -47,7 +47,9 @@ class PusherWorkerStore(SQLBaseStore): except Exception as e: logger.warn( "Invalid JSON in data for pusher %d: %s, %s", - r['id'], dataJson, e.args[0], + r['id'], + dataJson, + e.args[0], ) pass @@ -64,20 +66,16 @@ class PusherWorkerStore(SQLBaseStore): defer.returnValue(ret is not None) def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey): - return self.get_pushers_by({ - "app_id": app_id, - "pushkey": pushkey, - }) + return self.get_pushers_by({"app_id": app_id, "pushkey": pushkey}) def get_pushers_by_user_id(self, user_id): - return self.get_pushers_by({ - "user_name": user_id, - }) + return self.get_pushers_by({"user_name": user_id}) @defer.inlineCallbacks def get_pushers_by(self, keyvalues): ret = yield self._simple_select_list( - "pushers", keyvalues, + "pushers", + keyvalues, [ "id", "user_name", @@ -94,7 +92,8 @@ class PusherWorkerStore(SQLBaseStore): "last_stream_ordering", "last_success", "failing_since", - ], desc="get_pushers_by" + ], + desc="get_pushers_by", ) defer.returnValue(self._decode_pushers_rows(ret)) @@ -135,6 +134,7 @@ class PusherWorkerStore(SQLBaseStore): deleted = txn.fetchall() return (updated, deleted) + return self.runInteraction( "get_all_updated_pushers", get_all_updated_pushers_txn ) @@ -177,6 +177,7 @@ class PusherWorkerStore(SQLBaseStore): results.sort() # Sort so that they're ordered by stream id return results + return self.runInteraction( "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn ) @@ -186,15 +187,19 @@ class PusherWorkerStore(SQLBaseStore): # This only exists for the cachedList decorator raise NotImplementedError() - @cachedList(cached_method_name="get_if_user_has_pusher", - list_name="user_ids", num_args=1, inlineCallbacks=True) + @cachedList( + cached_method_name="get_if_user_has_pusher", + list_name="user_ids", + num_args=1, + inlineCallbacks=True, + ) def get_if_users_have_pushers(self, user_ids): rows = yield self._simple_select_many_batch( table='pushers', column='user_name', iterable=user_ids, retcols=['user_name'], - desc='get_if_users_have_pushers' + desc='get_if_users_have_pushers', ) result = {user_id: False for user_id in user_ids} @@ -208,20 +213,27 @@ class PusherStore(PusherWorkerStore): return self._pushers_id_gen.get_current_token() @defer.inlineCallbacks - def add_pusher(self, user_id, access_token, kind, app_id, - app_display_name, device_display_name, - pushkey, pushkey_ts, lang, data, last_stream_ordering, - profile_tag=""): + def add_pusher( + self, + user_id, + access_token, + kind, + app_id, + app_display_name, + device_display_name, + pushkey, + pushkey_ts, + lang, + data, + last_stream_ordering, + profile_tag="", + ): with self._pushers_id_gen.get_next() as stream_id: # no need to lock because `pushers` has a unique key on # (app_id, pushkey, user_name) so _simple_upsert will retry yield self._simple_upsert( table="pushers", - keyvalues={ - "app_id": app_id, - "pushkey": pushkey, - "user_name": user_id, - }, + keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, values={ "access_token": access_token, "kind": kind, @@ -247,7 +259,8 @@ class PusherStore(PusherWorkerStore): yield self.runInteraction( "add_pusher", self._invalidate_cache_and_stream, - self.get_if_user_has_pusher, (user_id,) + self.get_if_user_has_pusher, + (user_id,), ) @defer.inlineCallbacks @@ -260,7 +273,7 @@ class PusherStore(PusherWorkerStore): self._simple_delete_one_txn( txn, "pushers", - {"app_id": app_id, "pushkey": pushkey, "user_name": user_id} + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, ) # it's possible for us to end up with duplicate rows for @@ -278,13 +291,12 @@ class PusherStore(PusherWorkerStore): ) with self._pushers_id_gen.get_next() as stream_id: - yield self.runInteraction( - "delete_pusher", delete_pusher_txn, stream_id - ) + yield self.runInteraction("delete_pusher", delete_pusher_txn, stream_id) @defer.inlineCallbacks - def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id, - last_stream_ordering): + def update_pusher_last_stream_ordering( + self, app_id, pushkey, user_id, last_stream_ordering + ): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, @@ -293,23 +305,21 @@ class PusherStore(PusherWorkerStore): ) @defer.inlineCallbacks - def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey, - user_id, - last_stream_ordering, - last_success): + def update_pusher_last_stream_ordering_and_success( + self, app_id, pushkey, user_id, last_stream_ordering, last_success + ): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, { 'last_stream_ordering': last_stream_ordering, - 'last_success': last_success + 'last_success': last_success, }, desc="update_pusher_last_stream_ordering_and_success", ) @defer.inlineCallbacks - def update_pusher_failing_since(self, app_id, pushkey, user_id, - failing_since): + def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, @@ -323,14 +333,14 @@ class PusherStore(PusherWorkerStore): "pusher_throttle", {"pusher": pusher_id}, ["room_id", "last_sent_ts", "throttle_ms"], - desc="get_throttle_params_by_room" + desc="get_throttle_params_by_room", ) params_by_room = {} for row in res: params_by_room[row["room_id"]] = { "last_sent_ts": row["last_sent_ts"], - "throttle_ms": row["throttle_ms"] + "throttle_ms": row["throttle_ms"], } defer.returnValue(params_by_room) |