diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 7c069b662e..134e89b371 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -17,7 +17,7 @@
from twisted.internet import defer
from .pusher import PusherFactory
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.async import run_on_reactor
import logging
@@ -103,19 +103,25 @@ class PusherPool:
yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
@defer.inlineCallbacks
- def remove_pushers_by_user(self, user_id, except_access_token_id=None):
- all = yield self.store.get_all_pushers()
- logger.info(
- "Removing all pushers for user %s except access tokens id %r",
- user_id, except_access_token_id
- )
- for p in all:
- if p['user_name'] == user_id and p['access_token'] != except_access_token_id:
+ def remove_pushers_by_access_token(self, user_id, access_tokens):
+ """Remove the pushers for a given user corresponding to a set of
+ access_tokens.
+
+ Args:
+ user_id (str): user to remove pushers for
+ access_tokens (Iterable[int]): access token *ids* to remove pushers
+ for
+ """
+ tokens = set(access_tokens)
+ for p in (yield self.store.get_pushers_by_user_id(user_id)):
+ if p['access_token'] in tokens:
logger.info(
"Removing pusher for app id %s, pushkey %s, user %s",
p['app_id'], p['pushkey'], p['user_name']
)
- yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+ yield self.remove_pusher(
+ p['app_id'], p['pushkey'], p['user_name'],
+ )
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
@@ -136,7 +142,7 @@ class PusherPool:
)
)
- yield preserve_context_over_deferred(defer.gatherResults(deferreds))
+ yield make_deferred_yieldable(defer.gatherResults(deferreds))
except Exception:
logger.exception("Exception in pusher on_new_notifications")
@@ -161,7 +167,7 @@ class PusherPool:
preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
)
- yield preserve_context_over_deferred(defer.gatherResults(deferreds))
+ yield make_deferred_yieldable(defer.gatherResults(deferreds))
except Exception:
logger.exception("Exception in pusher on_new_receipts")
|