summary refs log tree commit diff
path: root/synapse/storage/pusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r--synapse/storage/pusher.py86
1 files changed, 48 insertions, 38 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 134297e284..1567e1df48 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -47,7 +47,9 @@ class PusherWorkerStore(SQLBaseStore):
             except Exception as e:
                 logger.warn(
                     "Invalid JSON in data for pusher %d: %s, %s",
-                    r['id'], dataJson, e.args[0],
+                    r['id'],
+                    dataJson,
+                    e.args[0],
                 )
                 pass
 
@@ -64,20 +66,16 @@ class PusherWorkerStore(SQLBaseStore):
         defer.returnValue(ret is not None)
 
     def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
-        return self.get_pushers_by({
-            "app_id": app_id,
-            "pushkey": pushkey,
-        })
+        return self.get_pushers_by({"app_id": app_id, "pushkey": pushkey})
 
     def get_pushers_by_user_id(self, user_id):
-        return self.get_pushers_by({
-            "user_name": user_id,
-        })
+        return self.get_pushers_by({"user_name": user_id})
 
     @defer.inlineCallbacks
     def get_pushers_by(self, keyvalues):
         ret = yield self._simple_select_list(
-            "pushers", keyvalues,
+            "pushers",
+            keyvalues,
             [
                 "id",
                 "user_name",
@@ -94,7 +92,8 @@ class PusherWorkerStore(SQLBaseStore):
                 "last_stream_ordering",
                 "last_success",
                 "failing_since",
-            ], desc="get_pushers_by"
+            ],
+            desc="get_pushers_by",
         )
         defer.returnValue(self._decode_pushers_rows(ret))
 
@@ -135,6 +134,7 @@ class PusherWorkerStore(SQLBaseStore):
             deleted = txn.fetchall()
 
             return (updated, deleted)
+
         return self.runInteraction(
             "get_all_updated_pushers", get_all_updated_pushers_txn
         )
@@ -177,6 +177,7 @@ class PusherWorkerStore(SQLBaseStore):
             results.sort()  # Sort so that they're ordered by stream id
 
             return results
+
         return self.runInteraction(
             "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
         )
@@ -186,15 +187,19 @@ class PusherWorkerStore(SQLBaseStore):
         # This only exists for the cachedList decorator
         raise NotImplementedError()
 
-    @cachedList(cached_method_name="get_if_user_has_pusher",
-                list_name="user_ids", num_args=1, inlineCallbacks=True)
+    @cachedList(
+        cached_method_name="get_if_user_has_pusher",
+        list_name="user_ids",
+        num_args=1,
+        inlineCallbacks=True,
+    )
     def get_if_users_have_pushers(self, user_ids):
         rows = yield self._simple_select_many_batch(
             table='pushers',
             column='user_name',
             iterable=user_ids,
             retcols=['user_name'],
-            desc='get_if_users_have_pushers'
+            desc='get_if_users_have_pushers',
         )
 
         result = {user_id: False for user_id in user_ids}
@@ -208,20 +213,27 @@ class PusherStore(PusherWorkerStore):
         return self._pushers_id_gen.get_current_token()
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_id, access_token, kind, app_id,
-                   app_display_name, device_display_name,
-                   pushkey, pushkey_ts, lang, data, last_stream_ordering,
-                   profile_tag=""):
+    def add_pusher(
+        self,
+        user_id,
+        access_token,
+        kind,
+        app_id,
+        app_display_name,
+        device_display_name,
+        pushkey,
+        pushkey_ts,
+        lang,
+        data,
+        last_stream_ordering,
+        profile_tag="",
+    ):
         with self._pushers_id_gen.get_next() as stream_id:
             # no need to lock because `pushers` has a unique key on
             # (app_id, pushkey, user_name) so _simple_upsert will retry
             yield self._simple_upsert(
                 table="pushers",
-                keyvalues={
-                    "app_id": app_id,
-                    "pushkey": pushkey,
-                    "user_name": user_id,
-                },
+                keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
                 values={
                     "access_token": access_token,
                     "kind": kind,
@@ -247,7 +259,8 @@ class PusherStore(PusherWorkerStore):
                 yield self.runInteraction(
                     "add_pusher",
                     self._invalidate_cache_and_stream,
-                    self.get_if_user_has_pusher, (user_id,)
+                    self.get_if_user_has_pusher,
+                    (user_id,),
                 )
 
     @defer.inlineCallbacks
@@ -260,7 +273,7 @@ class PusherStore(PusherWorkerStore):
             self._simple_delete_one_txn(
                 txn,
                 "pushers",
-                {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}
+                {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
             )
 
             # it's possible for us to end up with duplicate rows for
@@ -278,13 +291,12 @@ class PusherStore(PusherWorkerStore):
             )
 
         with self._pushers_id_gen.get_next() as stream_id:
-            yield self.runInteraction(
-                "delete_pusher", delete_pusher_txn, stream_id
-            )
+            yield self.runInteraction("delete_pusher", delete_pusher_txn, stream_id)
 
     @defer.inlineCallbacks
-    def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
-                                           last_stream_ordering):
+    def update_pusher_last_stream_ordering(
+        self, app_id, pushkey, user_id, last_stream_ordering
+    ):
         yield self._simple_update_one(
             "pushers",
             {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
@@ -293,23 +305,21 @@ class PusherStore(PusherWorkerStore):
         )
 
     @defer.inlineCallbacks
-    def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
-                                                       user_id,
-                                                       last_stream_ordering,
-                                                       last_success):
+    def update_pusher_last_stream_ordering_and_success(
+        self, app_id, pushkey, user_id, last_stream_ordering, last_success
+    ):
         yield self._simple_update_one(
             "pushers",
             {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
             {
                 'last_stream_ordering': last_stream_ordering,
-                'last_success': last_success
+                'last_success': last_success,
             },
             desc="update_pusher_last_stream_ordering_and_success",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_failing_since(self, app_id, pushkey, user_id,
-                                    failing_since):
+    def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
         yield self._simple_update_one(
             "pushers",
             {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
@@ -323,14 +333,14 @@ class PusherStore(PusherWorkerStore):
             "pusher_throttle",
             {"pusher": pusher_id},
             ["room_id", "last_sent_ts", "throttle_ms"],
-            desc="get_throttle_params_by_room"
+            desc="get_throttle_params_by_room",
         )
 
         params_by_room = {}
         for row in res:
             params_by_room[row["room_id"]] = {
                 "last_sent_ts": row["last_sent_ts"],
-                "throttle_ms": row["throttle_ms"]
+                "throttle_ms": row["throttle_ms"],
             }
 
         defer.returnValue(params_by_room)