summary refs log tree commit diff
path: root/synapse/storage/pusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r--synapse/storage/pusher.py40
1 files changed, 27 insertions, 13 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index cfe0a94330..b431d24b8a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -63,7 +63,7 @@ class PusherWorkerStore(SQLBaseStore):
         ret = yield self._simple_select_one_onecol(
             "pushers", {"user_name": user_id}, "id", allow_none=True
         )
-        defer.returnValue(ret is not None)
+        return ret is not None
 
     def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
         return self.get_pushers_by({"app_id": app_id, "pushkey": pushkey})
@@ -95,7 +95,7 @@ class PusherWorkerStore(SQLBaseStore):
             ],
             desc="get_pushers_by",
         )
-        defer.returnValue(self._decode_pushers_rows(ret))
+        return self._decode_pushers_rows(ret)
 
     @defer.inlineCallbacks
     def get_all_pushers(self):
@@ -106,7 +106,7 @@ class PusherWorkerStore(SQLBaseStore):
             return self._decode_pushers_rows(rows)
 
         rows = yield self.runInteraction("get_all_pushers", get_pushers)
-        defer.returnValue(rows)
+        return rows
 
     def get_all_updated_pushers(self, last_id, current_id, limit):
         if last_id == current_id:
@@ -205,7 +205,7 @@ class PusherWorkerStore(SQLBaseStore):
         result = {user_id: False for user_id in user_ids}
         result.update({r["user_name"]: True for r in rows})
 
-        defer.returnValue(result)
+        return result
 
 
 class PusherStore(PusherWorkerStore):
@@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore):
     def update_pusher_last_stream_ordering_and_success(
         self, app_id, pushkey, user_id, last_stream_ordering, last_success
     ):
-        yield self._simple_update_one(
-            "pushers",
-            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
-            {
+        """Update the last stream ordering position we've processed up to for
+        the given pusher.
+
+        Args:
+            app_id (str)
+            pushkey (str)
+            last_stream_ordering (int)
+            last_success (int)
+
+        Returns:
+            Deferred[bool]: True if the pusher still exists; False if it has been deleted.
+        """
+        updated = yield self._simple_update(
+            table="pushers",
+            keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+            updatevalues={
                 "last_stream_ordering": last_stream_ordering,
                 "last_success": last_success,
             },
             desc="update_pusher_last_stream_ordering_and_success",
         )
 
+        return bool(updated)
+
     @defer.inlineCallbacks
     def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
-        yield self._simple_update_one(
-            "pushers",
-            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
-            {"failing_since": failing_since},
+        yield self._simple_update(
+            table="pushers",
+            keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+            updatevalues={"failing_since": failing_since},
             desc="update_pusher_failing_since",
         )
 
@@ -343,7 +357,7 @@ class PusherStore(PusherWorkerStore):
                 "throttle_ms": row["throttle_ms"],
             }
 
-        defer.returnValue(params_by_room)
+        return params_by_room
 
     @defer.inlineCallbacks
     def set_throttle_params(self, pusher_id, room_id, params):