diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 134297e284..1567e1df48 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -47,7 +47,9 @@ class PusherWorkerStore(SQLBaseStore):
except Exception as e:
logger.warn(
"Invalid JSON in data for pusher %d: %s, %s",
- r['id'], dataJson, e.args[0],
+ r['id'],
+ dataJson,
+ e.args[0],
)
pass
@@ -64,20 +66,16 @@ class PusherWorkerStore(SQLBaseStore):
defer.returnValue(ret is not None)
def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
- return self.get_pushers_by({
- "app_id": app_id,
- "pushkey": pushkey,
- })
+ return self.get_pushers_by({"app_id": app_id, "pushkey": pushkey})
def get_pushers_by_user_id(self, user_id):
- return self.get_pushers_by({
- "user_name": user_id,
- })
+ return self.get_pushers_by({"user_name": user_id})
@defer.inlineCallbacks
def get_pushers_by(self, keyvalues):
ret = yield self._simple_select_list(
- "pushers", keyvalues,
+ "pushers",
+ keyvalues,
[
"id",
"user_name",
@@ -94,7 +92,8 @@ class PusherWorkerStore(SQLBaseStore):
"last_stream_ordering",
"last_success",
"failing_since",
- ], desc="get_pushers_by"
+ ],
+ desc="get_pushers_by",
)
defer.returnValue(self._decode_pushers_rows(ret))
@@ -135,6 +134,7 @@ class PusherWorkerStore(SQLBaseStore):
deleted = txn.fetchall()
return (updated, deleted)
+
return self.runInteraction(
"get_all_updated_pushers", get_all_updated_pushers_txn
)
@@ -177,6 +177,7 @@ class PusherWorkerStore(SQLBaseStore):
results.sort() # Sort so that they're ordered by stream id
return results
+
return self.runInteraction(
"get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
)
@@ -186,15 +187,19 @@ class PusherWorkerStore(SQLBaseStore):
# This only exists for the cachedList decorator
raise NotImplementedError()
- @cachedList(cached_method_name="get_if_user_has_pusher",
- list_name="user_ids", num_args=1, inlineCallbacks=True)
+ @cachedList(
+ cached_method_name="get_if_user_has_pusher",
+ list_name="user_ids",
+ num_args=1,
+ inlineCallbacks=True,
+ )
def get_if_users_have_pushers(self, user_ids):
rows = yield self._simple_select_many_batch(
table='pushers',
column='user_name',
iterable=user_ids,
retcols=['user_name'],
- desc='get_if_users_have_pushers'
+ desc='get_if_users_have_pushers',
)
result = {user_id: False for user_id in user_ids}
@@ -208,20 +213,27 @@ class PusherStore(PusherWorkerStore):
return self._pushers_id_gen.get_current_token()
@defer.inlineCallbacks
- def add_pusher(self, user_id, access_token, kind, app_id,
- app_display_name, device_display_name,
- pushkey, pushkey_ts, lang, data, last_stream_ordering,
- profile_tag=""):
+ def add_pusher(
+ self,
+ user_id,
+ access_token,
+ kind,
+ app_id,
+ app_display_name,
+ device_display_name,
+ pushkey,
+ pushkey_ts,
+ lang,
+ data,
+ last_stream_ordering,
+ profile_tag="",
+ ):
with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so _simple_upsert will retry
yield self._simple_upsert(
table="pushers",
- keyvalues={
- "app_id": app_id,
- "pushkey": pushkey,
- "user_name": user_id,
- },
+ keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
values={
"access_token": access_token,
"kind": kind,
@@ -247,7 +259,8 @@ class PusherStore(PusherWorkerStore):
yield self.runInteraction(
"add_pusher",
self._invalidate_cache_and_stream,
- self.get_if_user_has_pusher, (user_id,)
+ self.get_if_user_has_pusher,
+ (user_id,),
)
@defer.inlineCallbacks
@@ -260,7 +273,7 @@ class PusherStore(PusherWorkerStore):
self._simple_delete_one_txn(
txn,
"pushers",
- {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}
+ {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
)
# it's possible for us to end up with duplicate rows for
@@ -278,13 +291,12 @@ class PusherStore(PusherWorkerStore):
)
with self._pushers_id_gen.get_next() as stream_id:
- yield self.runInteraction(
- "delete_pusher", delete_pusher_txn, stream_id
- )
+ yield self.runInteraction("delete_pusher", delete_pusher_txn, stream_id)
@defer.inlineCallbacks
- def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
- last_stream_ordering):
+ def update_pusher_last_stream_ordering(
+ self, app_id, pushkey, user_id, last_stream_ordering
+ ):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
@@ -293,23 +305,21 @@ class PusherStore(PusherWorkerStore):
)
@defer.inlineCallbacks
- def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
- user_id,
- last_stream_ordering,
- last_success):
+ def update_pusher_last_stream_ordering_and_success(
+ self, app_id, pushkey, user_id, last_stream_ordering, last_success
+ ):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
{
'last_stream_ordering': last_stream_ordering,
- 'last_success': last_success
+ 'last_success': last_success,
},
desc="update_pusher_last_stream_ordering_and_success",
)
@defer.inlineCallbacks
- def update_pusher_failing_since(self, app_id, pushkey, user_id,
- failing_since):
+ def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
@@ -323,14 +333,14 @@ class PusherStore(PusherWorkerStore):
"pusher_throttle",
{"pusher": pusher_id},
["room_id", "last_sent_ts", "throttle_ms"],
- desc="get_throttle_params_by_room"
+ desc="get_throttle_params_by_room",
)
params_by_room = {}
for row in res:
params_by_room[row["room_id"]] = {
"last_sent_ts": row["last_sent_ts"],
- "throttle_ms": row["throttle_ms"]
+ "throttle_ms": row["throttle_ms"],
}
defer.returnValue(params_by_room)
|