summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/5809.misc1
-rw-r--r--synapse/push/emailpusher.py18
-rw-r--r--synapse/push/httppusher.py27
-rw-r--r--synapse/storage/pusher.py30
4 files changed, 55 insertions, 21 deletions
diff --git a/changelog.d/5809.misc b/changelog.d/5809.misc
new file mode 100644
index 0000000000..82a812480e
--- /dev/null
+++ b/changelog.d/5809.misc
@@ -0,0 +1 @@
+Handle pusher being deleted during processing rather than logging an exception.
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 424ffa8b68..42e5b0c0a5 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -234,13 +234,19 @@ class EmailPusher(object):
             return
 
         self.last_stream_ordering = last_stream_ordering
-        yield self.store.update_pusher_last_stream_ordering_and_success(
-            self.app_id,
-            self.email,
-            self.user_id,
-            last_stream_ordering,
-            self.clock.time_msec(),
+        pusher_still_exists = (
+            yield self.store.update_pusher_last_stream_ordering_and_success(
+                self.app_id,
+                self.email,
+                self.user_id,
+                last_stream_ordering,
+                self.clock.time_msec(),
+            )
         )
+        if not pusher_still_exists:
+            # The pusher has been deleted while we were processing, so
+            # lets just stop and return.
+            self.on_stop()
 
     def seconds_until(self, ts_msec):
         secs = (ts_msec - self.clock.time_msec()) / 1000
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 5b15b0dbe7..bd5d53af91 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -199,13 +199,21 @@ class HttpPusher(object):
                 http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                 self.last_stream_ordering = push_action["stream_ordering"]
-                yield self.store.update_pusher_last_stream_ordering_and_success(
-                    self.app_id,
-                    self.pushkey,
-                    self.user_id,
-                    self.last_stream_ordering,
-                    self.clock.time_msec(),
+                pusher_still_exists = (
+                    yield self.store.update_pusher_last_stream_ordering_and_success(
+                        self.app_id,
+                        self.pushkey,
+                        self.user_id,
+                        self.last_stream_ordering,
+                        self.clock.time_msec(),
+                    )
                 )
+                if not pusher_still_exists:
+                    # The pusher has been deleted while we were processing, so
+                    # lets just stop and return.
+                    self.on_stop()
+                    return
+
                 if self.failing_since:
                     self.failing_since = None
                     yield self.store.update_pusher_failing_since(
@@ -234,12 +242,17 @@ class HttpPusher(object):
                     )
                     self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                     self.last_stream_ordering = push_action["stream_ordering"]
-                    yield self.store.update_pusher_last_stream_ordering(
+                    pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
                         self.app_id,
                         self.pushkey,
                         self.user_id,
                         self.last_stream_ordering,
                     )
+                    if not pusher_still_exists:
+                        # The pusher has been deleted while we were processing, so
+                        # lets just stop and return.
+                        self.on_stop()
+                        return
 
                     self.failing_since = None
                     yield self.store.update_pusher_failing_since(
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index be3d4d9ded..b431d24b8a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore):
     def update_pusher_last_stream_ordering_and_success(
         self, app_id, pushkey, user_id, last_stream_ordering, last_success
     ):
-        yield self._simple_update_one(
-            "pushers",
-            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
-            {
+        """Update the last stream ordering position we've processed up to for
+        the given pusher.
+
+        Args:
+            app_id (str)
+            pushkey (str)
+            last_stream_ordering (int)
+            last_success (int)
+
+        Returns:
+            Deferred[bool]: True if the pusher still exists; False if it has been deleted.
+        """
+        updated = yield self._simple_update(
+            table="pushers",
+            keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+            updatevalues={
                 "last_stream_ordering": last_stream_ordering,
                 "last_success": last_success,
             },
             desc="update_pusher_last_stream_ordering_and_success",
         )
 
+        return bool(updated)
+
     @defer.inlineCallbacks
     def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
-        yield self._simple_update_one(
-            "pushers",
-            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
-            {"failing_since": failing_since},
+        yield self._simple_update(
+            table="pushers",
+            keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+            updatevalues={"failing_since": failing_since},
             desc="update_pusher_failing_since",
         )