summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2018-05-29 00:25:22 +0100
committerMatthew Hodgson <matthew@matrix.org>2018-05-29 00:25:22 +0100
commit7a6df013cc8a128278d2ce7e5eb569e0b424f9b0 (patch)
tree5de624a65953eb96ab67274462d850a88c0cce3c /synapse/push
parentmake lazy_load_members configurable in filters (diff)
parentMerge pull request #3256 from matrix-org/3218-official-prom (diff)
downloadsynapse-7a6df013cc8a128278d2ce7e5eb569e0b424f9b0.tar.xz
merge develop
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py43
-rw-r--r--synapse/push/emailpusher.py11
-rw-r--r--synapse/push/httppusher.py22
-rw-r--r--synapse/push/push_rule_evaluator.py6
-rw-r--r--synapse/push/pusher.py2
-rw-r--r--synapse/push/pusherpool.py28
6 files changed, 60 insertions, 52 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 7c680659b6..a5cab1f043 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -22,35 +22,32 @@ from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
 from synapse.event_auth import get_user_power_level
 from synapse.api.constants import EventTypes, Membership
-from synapse.metrics import get_metrics_for
-from synapse.util.caches import metrics as cache_metrics
+from synapse.util.caches import register_cache
 from synapse.util.caches.descriptors import cached
 from synapse.util.async import Linearizer
 from synapse.state import POWER_KEY
 
 from collections import namedtuple
-
+from prometheus_client import Counter
+from six import itervalues, iteritems
 
 logger = logging.getLogger(__name__)
 
 
 rules_by_room = {}
 
-push_metrics = get_metrics_for(__name__)
 
-push_rules_invalidation_counter = push_metrics.register_counter(
-    "push_rules_invalidation_counter"
-)
-push_rules_state_size_counter = push_metrics.register_counter(
-    "push_rules_state_size_counter"
-)
+push_rules_invalidation_counter = Counter(
+    "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", "")
+push_rules_state_size_counter = Counter(
+    "synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", "")
 
 # Measures whether we use the fast path of using state deltas, or if we have to
 # recalculate from scratch
-push_rules_delta_state_cache_metric = cache_metrics.register_cache(
+push_rules_delta_state_cache_metric = register_cache(
     "cache",
-    size_callback=lambda: 0,  # Meaningless size, as this isn't a cache that stores values
-    cache_name="push_rules_delta_state_cache_metric",
+    "push_rules_delta_state_cache_metric",
+    cache=[],  # Meaningless size, as this isn't a cache that stores values
 )
 
 
@@ -64,10 +61,10 @@ class BulkPushRuleEvaluator(object):
         self.store = hs.get_datastore()
         self.auth = hs.get_auth()
 
-        self.room_push_rule_cache_metrics = cache_metrics.register_cache(
+        self.room_push_rule_cache_metrics = register_cache(
             "cache",
-            size_callback=lambda: 0,  # There's not good value for this
-            cache_name="room_push_rule_cache",
+            "room_push_rule_cache",
+            cache=[],  # Meaningless size, as this isn't a cache that stores values
         )
 
     @defer.inlineCallbacks
@@ -126,7 +123,7 @@ class BulkPushRuleEvaluator(object):
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
-                (e.type, e.state_key): e for e in auth_events.itervalues()
+                (e.type, e.state_key): e for e in itervalues(auth_events)
             }
 
         sender_level = get_user_power_level(event.sender, auth_events)
@@ -160,7 +157,7 @@ class BulkPushRuleEvaluator(object):
 
         condition_cache = {}
 
-        for uid, rules in rules_by_user.iteritems():
+        for uid, rules in iteritems(rules_by_user):
             if event.sender == uid:
                 continue
 
@@ -309,7 +306,7 @@ class RulesForRoom(object):
                 current_state_ids = context.current_state_ids
                 push_rules_delta_state_cache_metric.inc_misses()
 
-            push_rules_state_size_counter.inc_by(len(current_state_ids))
+            push_rules_state_size_counter.inc(len(current_state_ids))
 
             logger.debug(
                 "Looking for member changes in %r %r", state_group, current_state_ids
@@ -406,7 +403,7 @@ class RulesForRoom(object):
         # If the event is a join event then it will be in current state evnts
         # map but not in the DB, so we have to explicitly insert it.
         if event.type == EventTypes.Member:
-            for event_id in member_event_ids.itervalues():
+            for event_id in itervalues(member_event_ids):
                 if event_id == event.event_id:
                     members[event_id] = (event.state_key, event.membership)
 
@@ -414,7 +411,7 @@ class RulesForRoom(object):
             logger.debug("Found members %r: %r", self.room_id, members.values())
 
         interested_in_user_ids = set(
-            user_id for user_id, membership in members.itervalues()
+            user_id for user_id, membership in itervalues(members)
             if membership == Membership.JOIN
         )
 
@@ -426,7 +423,7 @@ class RulesForRoom(object):
         )
 
         user_ids = set(
-            uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher
+            uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher
         )
 
         logger.debug("With pushers: %r", user_ids)
@@ -447,7 +444,7 @@ class RulesForRoom(object):
         )
 
         ret_rules_by_user.update(
-            item for item in rules_by_user.iteritems() if item[0] is not None
+            item for item in iteritems(rules_by_user) if item[0] is not None
         )
 
         self.update_cache(sequence, members, ret_rules_by_user, state_group)
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 58df98a793..ba7286cb72 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -77,10 +77,13 @@ class EmailPusher(object):
     @defer.inlineCallbacks
     def on_started(self):
         if self.mailer is not None:
-            self.throttle_params = yield self.store.get_throttle_params_by_room(
-                self.pusher_id
-            )
-            yield self._process()
+            try:
+                self.throttle_params = yield self.store.get_throttle_params_by_room(
+                    self.pusher_id
+                )
+                yield self._process()
+            except Exception:
+                logger.exception("Error starting email pusher")
 
     def on_stop(self):
         if self.timed_call:
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 2cbac571b8..bf7ff74a1a 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -18,24 +18,19 @@ import logging
 from twisted.internet import defer, reactor
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
-import push_rule_evaluator
-import push_tools
-import synapse
+from . import push_rule_evaluator
+from . import push_tools
 from synapse.push import PusherConfigException
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure
 
-logger = logging.getLogger(__name__)
+from prometheus_client import Counter
 
-metrics = synapse.metrics.get_metrics_for(__name__)
+logger = logging.getLogger(__name__)
 
-http_push_processed_counter = metrics.register_counter(
-    "http_pushes_processed",
-)
+http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
 
-http_push_failed_counter = metrics.register_counter(
-    "http_pushes_failed",
-)
+http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "")
 
 
 class HttpPusher(object):
@@ -94,7 +89,10 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def on_started(self):
-        yield self._process()
+        try:
+            yield self._process()
+        except Exception:
+            logger.exception("Error starting http pusher")
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 3601f2d365..cf735f7468 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -21,6 +21,8 @@ from synapse.types import UserID
 from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
 from synapse.util.caches.lrucache import LruCache
 
+from six import string_types
+
 logger = logging.getLogger(__name__)
 
 
@@ -150,7 +152,7 @@ class PushRuleEvaluatorForEvent(object):
 
 # Caches (glob, word_boundary) -> regex for push. See _glob_matches
 regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR)
-register_cache("regex_push_cache", regex_cache)
+register_cache("cache", "regex_push_cache", regex_cache)
 
 
 def _glob_matches(glob, value, word_boundary=False):
@@ -238,7 +240,7 @@ def _flatten_dict(d, prefix=[], result=None):
     if result is None:
         result = {}
     for key, value in d.items():
-        if isinstance(value, basestring):
+        if isinstance(value, string_types):
             result[".".join(prefix + [key])] = value.lower()
         elif hasattr(value, "items"):
             _flatten_dict(value, prefix=(prefix + [key]), result=result)
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 71576330a9..5aa6667e91 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from httppusher import HttpPusher
+from .httppusher import HttpPusher
 
 import logging
 logger = logging.getLogger(__name__)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 134e89b371..750d11ca38 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -14,13 +14,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from twisted.internet import defer
 
-from .pusher import PusherFactory
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.push.pusher import PusherFactory
 from synapse.util.async import run_on_reactor
-
-import logging
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
 
@@ -137,12 +137,15 @@ class PusherPool:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
                         deferreds.append(
-                            preserve_fn(p.on_new_notifications)(
-                                min_stream_id, max_stream_id
+                            run_in_background(
+                                p.on_new_notifications,
+                                min_stream_id, max_stream_id,
                             )
                         )
 
-            yield make_deferred_yieldable(defer.gatherResults(deferreds))
+            yield make_deferred_yieldable(
+                defer.gatherResults(deferreds, consumeErrors=True),
+            )
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")
 
@@ -164,10 +167,15 @@ class PusherPool:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
                         deferreds.append(
-                            preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
+                            run_in_background(
+                                p.on_new_receipts,
+                                min_stream_id, max_stream_id,
+                            )
                         )
 
-            yield make_deferred_yieldable(defer.gatherResults(deferreds))
+            yield make_deferred_yieldable(
+                defer.gatherResults(deferreds, consumeErrors=True),
+            )
         except Exception:
             logger.exception("Exception in pusher on_new_receipts")
 
@@ -207,7 +215,7 @@ class PusherPool:
                 if appid_pushkey in byuser:
                     byuser[appid_pushkey].on_stop()
                 byuser[appid_pushkey] = p
-                preserve_fn(p.on_started)()
+                run_in_background(p.on_started)
 
         logger.info("Started pushers")