summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/action_generator.py6
-rw-r--r--synapse/push/baserules.py3
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py24
-rw-r--r--synapse/push/clientformat.py6
-rw-r--r--synapse/push/emailpusher.py11
-rw-r--r--synapse/push/httppusher.py12
-rw-r--r--synapse/push/mailer.py33
-rw-r--r--synapse/push/presentable_names.py6
-rw-r--r--synapse/push/push_rule_evaluator.py4
-rw-r--r--synapse/push/push_tools.py5
-rw-r--r--synapse/push/pusher.py3
-rw-r--r--synapse/push/pusherpool.py20
12 files changed, 73 insertions, 60 deletions
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 8f619a7a1b..a5de75c48a 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -13,13 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
 
-from .bulk_push_rule_evaluator import BulkPushRuleEvaluator
+from twisted.internet import defer
 
 from synapse.util.metrics import Measure
 
-import logging
+from .bulk_push_rule_evaluator import BulkPushRuleEvaluator
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index a8ae7bcd6c..8f0682c948 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
 import copy
 
+from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
+
 
 def list_with_base_rules(rawrules):
     """Combine the list of rules set by the user with the default push rules
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index a5cab1f043..8f9a76147f 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -15,21 +15,22 @@
 # limitations under the License.
 
 import logging
+from collections import namedtuple
 
-from twisted.internet import defer
+from six import iteritems, itervalues
 
-from .push_rule_evaluator import PushRuleEvaluatorForEvent
+from prometheus_client import Counter
+
+from twisted.internet import defer
 
-from synapse.event_auth import get_user_power_level
 from synapse.api.constants import EventTypes, Membership
+from synapse.event_auth import get_user_power_level
+from synapse.state import POWER_KEY
+from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import register_cache
 from synapse.util.caches.descriptors import cached
-from synapse.util.async import Linearizer
-from synapse.state import POWER_KEY
 
-from collections import namedtuple
-from prometheus_client import Counter
-from six import itervalues, iteritems
+from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
 logger = logging.getLogger(__name__)
 
@@ -111,7 +112,8 @@ class BulkPushRuleEvaluator(object):
 
     @defer.inlineCallbacks
     def _get_power_levels_and_sender_level(self, event, context):
-        pl_event_id = context.prev_state_ids.get(POWER_KEY)
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        pl_event_id = prev_state_ids.get(POWER_KEY)
         if pl_event_id:
             # fastpath: if there's a power level event, that's all we need, and
             # not having a power level event is an extreme edge case
@@ -119,7 +121,7 @@ class BulkPushRuleEvaluator(object):
             auth_events = {POWER_KEY: pl_event}
         else:
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=False,
+                event, prev_state_ids, for_verification=False,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -303,7 +305,7 @@ class RulesForRoom(object):
 
                 push_rules_delta_state_cache_metric.inc_hits()
             else:
-                current_state_ids = context.current_state_ids
+                current_state_ids = yield context.get_current_state_ids(self.store)
                 push_rules_delta_state_cache_metric.inc_misses()
 
             push_rules_state_size_counter.inc(len(current_state_ids))
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index e0331b2d2d..ecbf364a5e 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -13,12 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.push.rulekinds import (
-    PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
-)
-
 import copy
 
+from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
+
 
 def format_push_rules_for_user(user, ruleslist):
     """Converts a list of rawrules and a enabled map into nested dictionaries
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index ba7286cb72..d746371420 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -13,14 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer, reactor
-from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-
 import logging
 
-from synapse.util.metrics import Measure
-from synapse.util.logcontext import LoggingContext
+from twisted.internet import defer
+from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
+from synapse.util.logcontext import LoggingContext
+from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
@@ -199,7 +198,7 @@ class EmailPusher(object):
                     self.timed_call = None
 
         if soonest_due_at is not None:
-            self.timed_call = reactor.callLater(
+            self.timed_call = self.hs.get_reactor().callLater(
                 self.seconds_until(soonest_due_at), self.on_timer
             )
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index bf7ff74a1a..81e18bcf7d 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -15,16 +15,16 @@
 # limitations under the License.
 import logging
 
-from twisted.internet import defer, reactor
+from prometheus_client import Counter
+
+from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
-from . import push_rule_evaluator
-from . import push_tools
 from synapse.push import PusherConfigException
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure
 
-from prometheus_client import Counter
+from . import push_rule_evaluator, push_tools
 
 logger = logging.getLogger(__name__)
 
@@ -220,7 +220,9 @@ class HttpPusher(object):
                     )
                 else:
                     logger.info("Push failed: delaying for %ds", self.backoff_delay)
-                    self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
+                    self.timed_call = self.hs.get_reactor().callLater(
+                        self.backoff_delay, self.on_timer
+                    )
                     self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
                     break
 
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index d4be800e5e..bfa6df7b68 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -13,30 +13,31 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-from twisted.mail.smtp import sendmail
-
-import email.utils
 import email.mime.multipart
-from email.mime.text import MIMEText
+import email.utils
+import logging
+import time
+import urllib
 from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
 
-from synapse.util.async import concurrently_execute
+import bleach
+import jinja2
+
+from twisted.internet import defer
+from twisted.mail.smtp import sendmail
+
+from synapse.api.constants import EventTypes
+from synapse.api.errors import StoreError
 from synapse.push.presentable_names import (
-    calculate_room_name, name_from_member_event, descriptor_from_member_events
+    calculate_room_name,
+    descriptor_from_member_events,
+    name_from_member_event,
 )
 from synapse.types import UserID
-from synapse.api.errors import StoreError
-from synapse.api.constants import EventTypes
+from synapse.util.async_helpers import concurrently_execute
 from synapse.visibility import filter_events_for_client
 
-import jinja2
-import bleach
-
-import time
-import urllib
-
-import logging
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py
index 43f0c74ff3..eef6e18c2e 100644
--- a/synapse/push/presentable_names.py
+++ b/synapse/push/presentable_names.py
@@ -13,10 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
-import re
 import logging
+import re
+
+from twisted.internet import defer
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index cf735f7468..2bd321d530 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -17,12 +17,12 @@
 import logging
 import re
 
+from six import string_types
+
 from synapse.types import UserID
 from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
 from synapse.util.caches.lrucache import LruCache
 
-from six import string_types
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 6835f54e97..8049c298c2 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -14,9 +14,8 @@
 # limitations under the License.
 
 from twisted.internet import defer
-from synapse.push.presentable_names import (
-    calculate_room_name, name_from_member_event
-)
+
+from synapse.push.presentable_names import calculate_room_name, name_from_member_event
 
 
 @defer.inlineCallbacks
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 5aa6667e91..fcee6d9d7e 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from .httppusher import HttpPusher
 
-import logging
 logger = logging.getLogger(__name__)
 
 # We try importing this if we can (it will fail if we don't
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 750d11ca38..9f7d5ef217 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -18,8 +18,8 @@ import logging
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push.pusher import PusherFactory
-from synapse.util.async import run_on_reactor
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
@@ -123,9 +123,14 @@ class PusherPool:
                     p['app_id'], p['pushkey'], p['user_name'],
                 )
 
-    @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_id, max_stream_id):
-        yield run_on_reactor()
+        run_as_background_process(
+            "on_new_notifications",
+            self._on_new_notifications, min_stream_id, max_stream_id,
+        )
+
+    @defer.inlineCallbacks
+    def _on_new_notifications(self, min_stream_id, max_stream_id):
         try:
             users_affected = yield self.store.get_push_action_users_in_range(
                 min_stream_id, max_stream_id
@@ -149,9 +154,14 @@ class PusherPool:
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")
 
-    @defer.inlineCallbacks
     def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
-        yield run_on_reactor()
+        run_as_background_process(
+            "on_new_receipts",
+            self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
+        )
+
+    @defer.inlineCallbacks
+    def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
         try:
             # Need to subtract 1 from the minimum because the lower bound here
             # is not inclusive