diff --git a/changelog.d/5809.misc b/changelog.d/5809.misc
new file mode 100644
index 0000000000..82a812480e
--- /dev/null
+++ b/changelog.d/5809.misc
@@ -0,0 +1 @@
+Handle pusher being deleted during processing rather than logging an exception.
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 424ffa8b68..42e5b0c0a5 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -234,13 +234,19 @@ class EmailPusher(object):
return
self.last_stream_ordering = last_stream_ordering
- yield self.store.update_pusher_last_stream_ordering_and_success(
- self.app_id,
- self.email,
- self.user_id,
- last_stream_ordering,
- self.clock.time_msec(),
+ pusher_still_exists = (
+ yield self.store.update_pusher_last_stream_ordering_and_success(
+ self.app_id,
+ self.email,
+ self.user_id,
+ last_stream_ordering,
+ self.clock.time_msec(),
+ )
)
+ if not pusher_still_exists:
+ # The pusher has been deleted while we were processing, so
+ # lets just stop and return.
+ self.on_stop()
def seconds_until(self, ts_msec):
secs = (ts_msec - self.clock.time_msec()) / 1000
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 5b15b0dbe7..bd5d53af91 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -199,13 +199,21 @@ class HttpPusher(object):
http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
- yield self.store.update_pusher_last_stream_ordering_and_success(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.last_stream_ordering,
- self.clock.time_msec(),
+ pusher_still_exists = (
+ yield self.store.update_pusher_last_stream_ordering_and_success(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.last_stream_ordering,
+ self.clock.time_msec(),
+ )
)
+ if not pusher_still_exists:
+ # The pusher has been deleted while we were processing, so
+ # lets just stop and return.
+ self.on_stop()
+ return
+
if self.failing_since:
self.failing_since = None
yield self.store.update_pusher_failing_since(
@@ -234,12 +242,17 @@ class HttpPusher(object):
)
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
- yield self.store.update_pusher_last_stream_ordering(
+ pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
self.app_id,
self.pushkey,
self.user_id,
self.last_stream_ordering,
)
+ if not pusher_still_exists:
+ # The pusher has been deleted while we were processing, so
+ # lets just stop and return.
+ self.on_stop()
+ return
self.failing_since = None
yield self.store.update_pusher_failing_since(
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index be3d4d9ded..b431d24b8a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore):
def update_pusher_last_stream_ordering_and_success(
self, app_id, pushkey, user_id, last_stream_ordering, last_success
):
- yield self._simple_update_one(
- "pushers",
- {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
- {
+ """Update the last stream ordering position we've processed up to for
+ the given pusher.
+
+ Args:
+ app_id (str)
+ pushkey (str)
+ last_stream_ordering (int)
+ last_success (int)
+
+ Returns:
+ Deferred[bool]: True if the pusher still exists; False if it has been deleted.
+ """
+ updated = yield self._simple_update(
+ table="pushers",
+ keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+ updatevalues={
"last_stream_ordering": last_stream_ordering,
"last_success": last_success,
},
desc="update_pusher_last_stream_ordering_and_success",
)
+ return bool(updated)
+
@defer.inlineCallbacks
def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
- yield self._simple_update_one(
- "pushers",
- {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
- {"failing_since": failing_since},
+ yield self._simple_update(
+ table="pushers",
+ keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+ updatevalues={"failing_since": failing_since},
desc="update_pusher_failing_since",
)
|