diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/__init__.py | 2 | ||||
-rw-r--r-- | synapse/config/repository.py | 2 | ||||
-rw-r--r-- | synapse/push/emailpusher.py | 18 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 27 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 30 |
5 files changed, 56 insertions, 23 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 8301a13d8f..d2316c7df9 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -35,4 +35,4 @@ try: except ImportError: pass -__version__ = "1.2.1" +__version__ = "1.3.0rc1" diff --git a/synapse/config/repository.py b/synapse/config/repository.py index db39697e45..fdb1f246d0 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -93,7 +93,7 @@ class ContentRepositoryConfig(Config): # current worker app is the media repo. if ( self.enable_media_repo is False - and config.worker_app != "synapse.app.media_repository" + and config.get("worker_app") != "synapse.app.media_repository" ): self.can_load_media_repo = False return diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 424ffa8b68..42e5b0c0a5 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -234,13 +234,19 @@ class EmailPusher(object): return self.last_stream_ordering = last_stream_ordering - yield self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.email, - self.user_id, - last_stream_ordering, - self.clock.time_msec(), + pusher_still_exists = ( + yield self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, + self.email, + self.user_id, + last_stream_ordering, + self.clock.time_msec(), + ) ) + if not pusher_still_exists: + # The pusher has been deleted while we were processing, so + # lets just stop and return. + self.on_stop() def seconds_until(self, ts_msec): secs = (ts_msec - self.clock.time_msec()) / 1000 diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 5b15b0dbe7..bd5d53af91 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -199,13 +199,21 @@ class HttpPusher(object): http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action["stream_ordering"] - yield self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.pushkey, - self.user_id, - self.last_stream_ordering, - self.clock.time_msec(), + pusher_still_exists = ( + yield self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, + self.pushkey, + self.user_id, + self.last_stream_ordering, + self.clock.time_msec(), + ) ) + if not pusher_still_exists: + # The pusher has been deleted while we were processing, so + # lets just stop and return. + self.on_stop() + return + if self.failing_since: self.failing_since = None yield self.store.update_pusher_failing_since( @@ -234,12 +242,17 @@ class HttpPusher(object): ) self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action["stream_ordering"] - yield self.store.update_pusher_last_stream_ordering( + pusher_still_exists = yield self.store.update_pusher_last_stream_ordering( self.app_id, self.pushkey, self.user_id, self.last_stream_ordering, ) + if not pusher_still_exists: + # The pusher has been deleted while we were processing, so + # lets just stop and return. + self.on_stop() + return self.failing_since = None yield self.store.update_pusher_failing_since( diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index be3d4d9ded..b431d24b8a 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore): def update_pusher_last_stream_ordering_and_success( self, app_id, pushkey, user_id, last_stream_ordering, last_success ): - yield self._simple_update_one( - "pushers", - {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - { + """Update the last stream ordering position we've processed up to for + the given pusher. + + Args: + app_id (str) + pushkey (str) + last_stream_ordering (int) + last_success (int) + + Returns: + Deferred[bool]: True if the pusher still exists; False if it has been deleted. + """ + updated = yield self._simple_update( + table="pushers", + keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + updatevalues={ "last_stream_ordering": last_stream_ordering, "last_success": last_success, }, desc="update_pusher_last_stream_ordering_and_success", ) + return bool(updated) + @defer.inlineCallbacks def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): - yield self._simple_update_one( - "pushers", - {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - {"failing_since": failing_since}, + yield self._simple_update( + table="pushers", + keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + updatevalues={"failing_since": failing_since}, desc="update_pusher_failing_since", ) |