diff options
author | Richard van der Hoff <richard@matrix.org> | 2019-06-24 10:00:13 +0100 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2019-06-24 10:00:13 +0100 |
commit | 5097aee740b542407e5bb13d19a3e3e6c2227316 (patch) | |
tree | 09a03650256e09cd0b5df59dbf2d7bb2ba14df6c /synapse/push/pusherpool.py | |
parent | changelog (diff) | |
parent | Improve help and cmdline option names for --generate-config options (#5512) (diff) | |
download | synapse-5097aee740b542407e5bb13d19a3e3e6c2227316.tar.xz |
Merge branch 'develop' into rav/cleanup_metrics
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r-- | synapse/push/pusherpool.py | 97 |
1 files changed, 53 insertions, 44 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 63c583565f..df6f670740 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -40,6 +40,7 @@ class PusherPool: notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and Pusher.on_new_receipts are not expected to return deferreds. """ + def __init__(self, _hs): self.hs = _hs self.pusher_factory = PusherFactory(_hs) @@ -57,9 +58,19 @@ class PusherPool: run_as_background_process("start_pushers", self._start_pushers) @defer.inlineCallbacks - def add_pusher(self, user_id, access_token, kind, app_id, - app_display_name, device_display_name, pushkey, lang, data, - profile_tag=""): + def add_pusher( + self, + user_id, + access_token, + kind, + app_id, + app_display_name, + device_display_name, + pushkey, + lang, + data, + profile_tag="", + ): """Creates a new pusher and adds it to the pool Returns: @@ -71,21 +82,23 @@ class PusherPool: # will then get pulled out of the database, # recreated, added and started: this means we have only one # code path adding pushers. - self.pusher_factory.create_pusher({ - "id": None, - "user_name": user_id, - "kind": kind, - "app_id": app_id, - "app_display_name": app_display_name, - "device_display_name": device_display_name, - "pushkey": pushkey, - "ts": time_now_msec, - "lang": lang, - "data": data, - "last_stream_ordering": None, - "last_success": None, - "failing_since": None - }) + self.pusher_factory.create_pusher( + { + "id": None, + "user_name": user_id, + "kind": kind, + "app_id": app_id, + "app_display_name": app_display_name, + "device_display_name": device_display_name, + "pushkey": pushkey, + "ts": time_now_msec, + "lang": lang, + "data": data, + "last_stream_ordering": None, + "last_success": None, + "failing_since": None, + } + ) # create the pusher setting last_stream_ordering to the current maximum # stream ordering in event_push_actions, so it will process @@ -113,18 +126,19 @@ class PusherPool: defer.returnValue(pusher) @defer.inlineCallbacks - def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey, - not_user_id): - to_remove = yield self.store.get_pushers_by_app_id_and_pushkey( - app_id, pushkey - ) + def remove_pushers_by_app_id_and_pushkey_not_user( + self, app_id, pushkey, not_user_id + ): + to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey) for p in to_remove: - if p['user_name'] != not_user_id: + if p["user_name"] != not_user_id: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", - app_id, pushkey, p['user_name'] + app_id, + pushkey, + p["user_name"], ) - yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + yield self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) @defer.inlineCallbacks def remove_pushers_by_access_token(self, user_id, access_tokens): @@ -138,14 +152,14 @@ class PusherPool: """ tokens = set(access_tokens) for p in (yield self.store.get_pushers_by_user_id(user_id)): - if p['access_token'] in tokens: + if p["access_token"] in tokens: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", - p['app_id'], p['pushkey'], p['user_name'] - ) - yield self.remove_pusher( - p['app_id'], p['pushkey'], p['user_name'], + p["app_id"], + p["pushkey"], + p["user_name"], ) + yield self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) @defer.inlineCallbacks def on_new_notifications(self, min_stream_id, max_stream_id): @@ -199,13 +213,11 @@ class PusherPool: if not self._should_start_pushers: return - resultlist = yield self.store.get_pushers_by_app_id_and_pushkey( - app_id, pushkey - ) + resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey) pusher_dict = None for r in resultlist: - if r['user_name'] == user_id: + if r["user_name"] == user_id: pusher_dict = r pusher = None @@ -245,9 +257,9 @@ class PusherPool: except PusherConfigException as e: logger.warning( "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s", - pusherdict.get('user_name'), - pusherdict.get('app_id'), - pusherdict.get('pushkey'), + pusherdict.get("user_name"), + pusherdict.get("app_id"), + pusherdict.get("pushkey"), e, ) return @@ -258,11 +270,8 @@ class PusherPool: if not p: return - appid_pushkey = "%s:%s" % ( - pusherdict['app_id'], - pusherdict['pushkey'], - ) - byuser = self.pushers.setdefault(pusherdict['user_name'], {}) + appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"]) + byuser = self.pushers.setdefault(pusherdict["user_name"], {}) if appid_pushkey in byuser: byuser[appid_pushkey].on_stop() @@ -275,7 +284,7 @@ class PusherPool: last_stream_ordering = pusherdict["last_stream_ordering"] if last_stream_ordering: have_notifs = yield self.store.get_if_maybe_push_in_range_for_user( - user_id, last_stream_ordering, + user_id, last_stream_ordering ) else: # We always want to default to starting up the pusher rather than |