summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/__init__.py86
-rw-r--r--synapse/push/action_generator.py22
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py10
-rw-r--r--synapse/push/push_rule_evaluator.py2
-rw-r--r--synapse/push/pusherpool.py9
5 files changed, 70 insertions, 59 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 9bc0b356f4..8da2d8716c 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -17,6 +17,8 @@ from twisted.internet import defer
 
 from synapse.streams.config import PaginationConfig
 from synapse.types import StreamToken
+from synapse.util.logcontext import LoggingContext
+from synapse.util.metrics import Measure
 
 import synapse.util.async
 import push_rule_evaluator as push_rule_evaluator
@@ -27,6 +29,16 @@ import random
 logger = logging.getLogger(__name__)
 
 
+_NEXT_ID = 1
+
+
+def _get_next_id():
+    global _NEXT_ID
+    _id = _NEXT_ID
+    _NEXT_ID += 1
+    return _id
+
+
 # Pushers could now be moved to pull out of the event_push_actions table instead
 # of listening on the event stream: this would avoid them having to run the
 # rules again.
@@ -57,6 +69,8 @@ class Pusher(object):
         self.alive = True
         self.badge = None
 
+        self.name = "Pusher-%d" % (_get_next_id(),)
+
         # The last value of last_active_time that we saw
         self.last_last_active_time = 0
         self.has_unread = True
@@ -86,38 +100,46 @@ class Pusher(object):
 
     @defer.inlineCallbacks
     def start(self):
-        if not self.last_token:
-            # First-time setup: get a token to start from (we can't
-            # just start from no token, ie. 'now'
-            # because we need the result to be reproduceable in case
-            # we fail to dispatch the push)
-            config = PaginationConfig(from_token=None, limit='1')
-            chunk = yield self.evStreamHandler.get_stream(
-                self.user_id, config, timeout=0, affect_presence=False
-            )
-            self.last_token = chunk['end']
-            self.store.update_pusher_last_token(
-                self.app_id, self.pushkey, self.user_id, self.last_token
-            )
-            logger.info("Pusher %s for user %s starting from token %s",
-                        self.pushkey, self.user_id, self.last_token)
-
-        wait = 0
-        while self.alive:
-            try:
-                if wait > 0:
-                    yield synapse.util.async.sleep(wait)
-                yield self.get_and_dispatch()
-                wait = 0
-            except:
-                if wait == 0:
-                    wait = 1
-                else:
-                    wait = min(wait * 2, 1800)
-                logger.exception(
-                    "Exception in pusher loop for pushkey %s. Pausing for %ds",
-                    self.pushkey, wait
+        with LoggingContext(self.name):
+            if not self.last_token:
+                # First-time setup: get a token to start from (we can't
+                # just start from no token, ie. 'now'
+                # because we need the result to be reproduceable in case
+                # we fail to dispatch the push)
+                config = PaginationConfig(from_token=None, limit='1')
+                chunk = yield self.evStreamHandler.get_stream(
+                    self.user_id, config, timeout=0, affect_presence=False
                 )
+                self.last_token = chunk['end']
+                yield self.store.update_pusher_last_token(
+                    self.app_id, self.pushkey, self.user_id, self.last_token
+                )
+                logger.info("New pusher %s for user %s starting from token %s",
+                            self.pushkey, self.user_id, self.last_token)
+
+            else:
+                logger.info(
+                    "Old pusher %s for user %s starting",
+                    self.pushkey, self.user_id,
+                )
+
+            wait = 0
+            while self.alive:
+                try:
+                    if wait > 0:
+                        yield synapse.util.async.sleep(wait)
+                    with Measure(self.clock, "push"):
+                        yield self.get_and_dispatch()
+                    wait = 0
+                except:
+                    if wait == 0:
+                        wait = 1
+                    else:
+                        wait = min(wait * 2, 1800)
+                    logger.exception(
+                        "Exception in pusher loop for pushkey %s. Pausing for %ds",
+                        self.pushkey, wait
+                    )
 
     @defer.inlineCallbacks
     def get_and_dispatch(self):
@@ -316,7 +338,7 @@ class Pusher(object):
                         r.room_id, self.user_id, last_unread_event_id
                     )
                 )
-                badge += len(notifs)
+                badge += notifs["notify_count"]
         defer.returnValue(badge)
 
 
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 1d2e558f9a..e0da0868ec 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -19,8 +19,6 @@ import bulk_push_rule_evaluator
 
 import logging
 
-from synapse.api.constants import EventTypes
-
 logger = logging.getLogger(__name__)
 
 
@@ -36,21 +34,15 @@ class ActionGenerator:
         # tag (ie. we just need all the users).
 
     @defer.inlineCallbacks
-    def handle_push_actions_for_event(self, event, handler):
-        if event.type == EventTypes.Redaction and event.redacts is not None:
-            yield self.store.remove_push_actions_for_event_id(
-                event.room_id, event.redacts
-            )
-
+    def handle_push_actions_for_event(self, event, context, handler):
         bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
             event.room_id, self.hs, self.store
         )
 
-        actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler)
-
-        yield self.store.set_push_actions_for_event_and_users(
-            event,
-            [
-                (uid, None, actions) for uid, actions in actions_by_user.items()
-            ]
+        actions_by_user = yield bulk_evaluator.action_for_event_by_user(
+            event, handler, context.current_state
         )
+
+        context.push_actions = [
+            (uid, None, actions) for uid, actions in actions_by_user.items()
+        ]
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 20c60422bf..8ac5ceb9ef 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -98,25 +98,21 @@ class BulkPushRuleEvaluator:
         self.store = store
 
     @defer.inlineCallbacks
-    def action_for_event_by_user(self, event, handler):
+    def action_for_event_by_user(self, event, handler, current_state):
         actions_by_user = {}
 
         users_dict = yield self.store.are_guests(self.rules_by_user.keys())
 
         filtered_by_user = yield handler._filter_events_for_clients(
-            users_dict.items(), [event]
+            users_dict.items(), [event], {event.event_id: current_state}
         )
 
         evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room))
 
         condition_cache = {}
 
-        member_state = yield self.store.get_state_for_event(
-            event.event_id,
-        )
-
         display_names = {}
-        for ev in member_state.values():
+        for ev in current_state.values():
             nm = ev.content.get("displayname", None)
             if nm and ev.type == EventTypes.Member:
                 display_names[ev.state_key] = nm
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index dca018af95..2a2b4437dc 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -304,7 +304,7 @@ def _flatten_dict(d, prefix=[], result={}):
         if isinstance(value, basestring):
             result[".".join(prefix + [key])] = value.lower()
         elif hasattr(value, "items"):
-            _flatten_dict(value, prefix=(prefix+[key]), result=result)
+            _flatten_dict(value, prefix=(prefix + [key]), result=result)
 
     return result
 
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index d1b7c0802f..d7dcb2de4b 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 
 from httppusher import HttpPusher
 from synapse.push import PusherConfigException
+from synapse.util.logcontext import preserve_fn
 
 import logging
 
@@ -76,7 +77,7 @@ class PusherPool:
                     "Removing pusher for app id %s, pushkey %s, user %s",
                     app_id, pushkey, p['user_name']
                 )
-                self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+                yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
 
     @defer.inlineCallbacks
     def remove_pushers_by_user(self, user_id):
@@ -91,7 +92,7 @@ class PusherPool:
                     "Removing pusher for app id %s, pushkey %s, user %s",
                     p['app_id'], p['pushkey'], p['user_name']
                 )
-                self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+                yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
 
     @defer.inlineCallbacks
     def _add_pusher_to_store(self, user_id, access_token, profile_tag, kind,
@@ -110,7 +111,7 @@ class PusherPool:
             lang=lang,
             data=data,
         )
-        self._refresh_pusher(app_id, pushkey, user_id)
+        yield self._refresh_pusher(app_id, pushkey, user_id)
 
     def _create_pusher(self, pusherdict):
         if pusherdict['kind'] == 'http':
@@ -166,7 +167,7 @@ class PusherPool:
                 if fullid in self.pushers:
                     self.pushers[fullid].stop()
                 self.pushers[fullid] = p
-                p.start()
+                preserve_fn(p.start)()
 
         logger.info("Started pushers")