diff options
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r-- | synapse/push/pusherpool.py | 92 |
1 files changed, 69 insertions, 23 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 90babd7224..0ab2f65972 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -19,10 +19,7 @@ from twisted.internet import defer from httppusher import HttpPusher from synapse.push import PusherConfigException -from syutil.jsonutil import encode_canonical_json - import logging -import simplejson as json logger = logging.getLogger(__name__) @@ -52,12 +49,10 @@ class PusherPool: @defer.inlineCallbacks def start(self): pushers = yield self.store.get_all_pushers() - for p in pushers: - p['data'] = json.loads(p['data']) self._start_pushers(pushers) @defer.inlineCallbacks - def add_pusher(self, user_name, profile_tag, kind, app_id, + def add_pusher(self, user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, lang, data): # we try to create the pusher just to validate the config: it # will then get pulled out of the database, @@ -71,7 +66,7 @@ class PusherPool: "app_display_name": app_display_name, "device_display_name": device_display_name, "pushkey": pushkey, - "pushkey_ts": self.hs.get_clock().time_msec(), + "ts": self.hs.get_clock().time_msec(), "lang": lang, "data": data, "last_token": None, @@ -79,17 +74,50 @@ class PusherPool: "failing_since": None }) yield self._add_pusher_to_store( - user_name, profile_tag, kind, app_id, + user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, lang, data ) @defer.inlineCallbacks - def _add_pusher_to_store(self, user_name, profile_tag, kind, app_id, - app_display_name, device_display_name, + def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey, + not_user_id): + to_remove = yield self.store.get_pushers_by_app_id_and_pushkey( + app_id, pushkey + ) + for p in to_remove: + if p['user_name'] != not_user_id: + logger.info( + "Removing pusher for app id %s, pushkey %s, user %s", + app_id, pushkey, p['user_name'] + ) + self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + + @defer.inlineCallbacks + def remove_pushers_by_user_access_token(self, user_id, not_access_token_id): + all = yield self.store.get_all_pushers() + logger.info( + "Removing all pushers for user %s except access token %s", + user_id, not_access_token_id + ) + for p in all: + if ( + p['user_name'] == user_id and + p['access_token'] != not_access_token_id + ): + logger.info( + "Removing pusher for app id %s, pushkey %s, user %s", + p['app_id'], p['pushkey'], p['user_name'] + ) + self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + + @defer.inlineCallbacks + def _add_pusher_to_store(self, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, pushkey, lang, data): yield self.store.add_pusher( user_name=user_name, + access_token=access_token, profile_tag=profile_tag, kind=kind, app_id=app_id, @@ -98,9 +126,9 @@ class PusherPool: pushkey=pushkey, pushkey_ts=self.hs.get_clock().time_msec(), lang=lang, - data=encode_canonical_json(data).decode("UTF-8"), + data=data, ) - self._refresh_pusher((app_id, pushkey)) + self._refresh_pusher(app_id, pushkey, user_name) def _create_pusher(self, pusherdict): if pusherdict['kind'] == 'http': @@ -112,7 +140,7 @@ class PusherPool: app_display_name=pusherdict['app_display_name'], device_display_name=pusherdict['device_display_name'], pushkey=pusherdict['pushkey'], - pushkey_ts=pusherdict['pushkey_ts'], + pushkey_ts=pusherdict['ts'], data=pusherdict['data'], last_token=pusherdict['last_token'], last_success=pusherdict['last_success'], @@ -125,30 +153,48 @@ class PusherPool: ) @defer.inlineCallbacks - def _refresh_pusher(self, app_id_pushkey): - p = yield self.store.get_pushers_by_app_id_and_pushkey( - app_id_pushkey + def _refresh_pusher(self, app_id, pushkey, user_name): + resultlist = yield self.store.get_pushers_by_app_id_and_pushkey( + app_id, pushkey ) - p['data'] = json.loads(p['data']) - self._start_pushers([p]) + p = None + for r in resultlist: + if r['user_name'] == user_name: + p = r + + if p: + + self._start_pushers([p]) def _start_pushers(self, pushers): logger.info("Starting %d pushers", len(pushers)) for pusherdict in pushers: - p = self._create_pusher(pusherdict) + try: + p = self._create_pusher(pusherdict) + except PusherConfigException: + logger.exception("Couldn't start a pusher: caught PusherConfigException") + continue if p: - fullid = "%s:%s" % (pusherdict['app_id'], pusherdict['pushkey']) + fullid = "%s:%s:%s" % ( + pusherdict['app_id'], + pusherdict['pushkey'], + pusherdict['user_name'] + ) if fullid in self.pushers: self.pushers[fullid].stop() self.pushers[fullid] = p p.start() + logger.info("Started pushers") + @defer.inlineCallbacks - def remove_pusher(self, app_id, pushkey): - fullid = "%s:%s" % (app_id, pushkey) + def remove_pusher(self, app_id, pushkey, user_name): + fullid = "%s:%s:%s" % (app_id, pushkey, user_name) if fullid in self.pushers: logger.info("Stopping pusher %s", fullid) self.pushers[fullid].stop() del self.pushers[fullid] - yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey) + yield self.store.delete_pusher_by_app_id_pushkey_user_name( + app_id, pushkey, user_name + ) |