diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 9bc0b356f4..8da2d8716c 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -17,6 +17,8 @@ from twisted.internet import defer
from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken
+from synapse.util.logcontext import LoggingContext
+from synapse.util.metrics import Measure
import synapse.util.async
import push_rule_evaluator as push_rule_evaluator
@@ -27,6 +29,16 @@ import random
logger = logging.getLogger(__name__)
+_NEXT_ID = 1
+
+
+def _get_next_id():
+ global _NEXT_ID
+ _id = _NEXT_ID
+ _NEXT_ID += 1
+ return _id
+
+
# Pushers could now be moved to pull out of the event_push_actions table instead
# of listening on the event stream: this would avoid them having to run the
# rules again.
@@ -57,6 +69,8 @@ class Pusher(object):
self.alive = True
self.badge = None
+ self.name = "Pusher-%d" % (_get_next_id(),)
+
# The last value of last_active_time that we saw
self.last_last_active_time = 0
self.has_unread = True
@@ -86,38 +100,46 @@ class Pusher(object):
@defer.inlineCallbacks
def start(self):
- if not self.last_token:
- # First-time setup: get a token to start from (we can't
- # just start from no token, ie. 'now'
- # because we need the result to be reproduceable in case
- # we fail to dispatch the push)
- config = PaginationConfig(from_token=None, limit='1')
- chunk = yield self.evStreamHandler.get_stream(
- self.user_id, config, timeout=0, affect_presence=False
- )
- self.last_token = chunk['end']
- self.store.update_pusher_last_token(
- self.app_id, self.pushkey, self.user_id, self.last_token
- )
- logger.info("Pusher %s for user %s starting from token %s",
- self.pushkey, self.user_id, self.last_token)
-
- wait = 0
- while self.alive:
- try:
- if wait > 0:
- yield synapse.util.async.sleep(wait)
- yield self.get_and_dispatch()
- wait = 0
- except:
- if wait == 0:
- wait = 1
- else:
- wait = min(wait * 2, 1800)
- logger.exception(
- "Exception in pusher loop for pushkey %s. Pausing for %ds",
- self.pushkey, wait
+ with LoggingContext(self.name):
+ if not self.last_token:
+ # First-time setup: get a token to start from (we can't
+ # just start from no token, ie. 'now'
+ # because we need the result to be reproduceable in case
+ # we fail to dispatch the push)
+ config = PaginationConfig(from_token=None, limit='1')
+ chunk = yield self.evStreamHandler.get_stream(
+ self.user_id, config, timeout=0, affect_presence=False
)
+ self.last_token = chunk['end']
+ yield self.store.update_pusher_last_token(
+ self.app_id, self.pushkey, self.user_id, self.last_token
+ )
+ logger.info("New pusher %s for user %s starting from token %s",
+ self.pushkey, self.user_id, self.last_token)
+
+ else:
+ logger.info(
+ "Old pusher %s for user %s starting",
+ self.pushkey, self.user_id,
+ )
+
+ wait = 0
+ while self.alive:
+ try:
+ if wait > 0:
+ yield synapse.util.async.sleep(wait)
+ with Measure(self.clock, "push"):
+ yield self.get_and_dispatch()
+ wait = 0
+ except:
+ if wait == 0:
+ wait = 1
+ else:
+ wait = min(wait * 2, 1800)
+ logger.exception(
+ "Exception in pusher loop for pushkey %s. Pausing for %ds",
+ self.pushkey, wait
+ )
@defer.inlineCallbacks
def get_and_dispatch(self):
@@ -316,7 +338,7 @@ class Pusher(object):
r.room_id, self.user_id, last_unread_event_id
)
)
- badge += len(notifs)
+ badge += notifs["notify_count"]
defer.returnValue(badge)
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 1d2e558f9a..e0da0868ec 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -19,8 +19,6 @@ import bulk_push_rule_evaluator
import logging
-from synapse.api.constants import EventTypes
-
logger = logging.getLogger(__name__)
@@ -36,21 +34,15 @@ class ActionGenerator:
# tag (ie. we just need all the users).
@defer.inlineCallbacks
- def handle_push_actions_for_event(self, event, handler):
- if event.type == EventTypes.Redaction and event.redacts is not None:
- yield self.store.remove_push_actions_for_event_id(
- event.room_id, event.redacts
- )
-
+ def handle_push_actions_for_event(self, event, context, handler):
bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
event.room_id, self.hs, self.store
)
- actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler)
-
- yield self.store.set_push_actions_for_event_and_users(
- event,
- [
- (uid, None, actions) for uid, actions in actions_by_user.items()
- ]
+ actions_by_user = yield bulk_evaluator.action_for_event_by_user(
+ event, handler, context.current_state
)
+
+ context.push_actions = [
+ (uid, None, actions) for uid, actions in actions_by_user.items()
+ ]
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 20c60422bf..8ac5ceb9ef 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -98,25 +98,21 @@ class BulkPushRuleEvaluator:
self.store = store
@defer.inlineCallbacks
- def action_for_event_by_user(self, event, handler):
+ def action_for_event_by_user(self, event, handler, current_state):
actions_by_user = {}
users_dict = yield self.store.are_guests(self.rules_by_user.keys())
filtered_by_user = yield handler._filter_events_for_clients(
- users_dict.items(), [event]
+ users_dict.items(), [event], {event.event_id: current_state}
)
evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room))
condition_cache = {}
- member_state = yield self.store.get_state_for_event(
- event.event_id,
- )
-
display_names = {}
- for ev in member_state.values():
+ for ev in current_state.values():
nm = ev.content.get("displayname", None)
if nm and ev.type == EventTypes.Member:
display_names[ev.state_key] = nm
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index dca018af95..2a2b4437dc 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -304,7 +304,7 @@ def _flatten_dict(d, prefix=[], result={}):
if isinstance(value, basestring):
result[".".join(prefix + [key])] = value.lower()
elif hasattr(value, "items"):
- _flatten_dict(value, prefix=(prefix+[key]), result=result)
+ _flatten_dict(value, prefix=(prefix + [key]), result=result)
return result
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index d1b7c0802f..d7dcb2de4b 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from httppusher import HttpPusher
from synapse.push import PusherConfigException
+from synapse.util.logcontext import preserve_fn
import logging
@@ -76,7 +77,7 @@ class PusherPool:
"Removing pusher for app id %s, pushkey %s, user %s",
app_id, pushkey, p['user_name']
)
- self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+ yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
@defer.inlineCallbacks
def remove_pushers_by_user(self, user_id):
@@ -91,7 +92,7 @@ class PusherPool:
"Removing pusher for app id %s, pushkey %s, user %s",
p['app_id'], p['pushkey'], p['user_name']
)
- self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+ yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
@defer.inlineCallbacks
def _add_pusher_to_store(self, user_id, access_token, profile_tag, kind,
@@ -110,7 +111,7 @@ class PusherPool:
lang=lang,
data=data,
)
- self._refresh_pusher(app_id, pushkey, user_id)
+ yield self._refresh_pusher(app_id, pushkey, user_id)
def _create_pusher(self, pusherdict):
if pusherdict['kind'] == 'http':
@@ -166,7 +167,7 @@ class PusherPool:
if fullid in self.pushers:
self.pushers[fullid].stop()
self.pushers[fullid] = p
- p.start()
+ preserve_fn(p.start)()
logger.info("Started pushers")
|