diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 76d7eb7ce0..7f94591dcb 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -70,11 +70,17 @@ def _get_rules(room_id, user_ids, store):
@defer.inlineCallbacks
def evaluator_for_room_id(room_id, hs, store):
- results = yield store.get_receipts_for_room(room_id, "m.read")
- user_ids = [
- row["user_id"] for row in results
- if hs.is_mine_id(row["user_id"])
- ]
+ users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)
+ receipts = yield store.get_receipts_for_room(room_id, "m.read")
+
+ # any users with pushers must be ours: they have pushers
+ user_ids = set(users_with_pushers)
+ for r in receipts:
+ if hs.is_mine_id(r['user_id']):
+ user_ids.add(r['user_id'])
+
+ user_ids = list(user_ids)
+
rules_by_user = yield _get_rules(room_id, user_ids, store)
defer.returnValue(BulkPushRuleEvaluator(
@@ -101,10 +107,15 @@ class BulkPushRuleEvaluator:
def action_for_event_by_user(self, event, handler, current_state):
actions_by_user = {}
- users_dict = yield self.store.are_guests(self.rules_by_user.keys())
+ # None of these users can be peeking since this list of users comes
+ # from the set of users in the room, so we know for sure they're all
+ # actually in the room.
+ user_tuples = [
+ (u, False) for u in self.rules_by_user.keys()
+ ]
filtered_by_user = yield handler.filter_events_for_clients(
- users_dict.items(), [event], {event.event_id: current_state}
+ user_tuples, [event], {event.event_id: current_state}
)
room_members = yield self.store.get_users_in_room(self.room_id)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 9be4869360..d695885649 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -13,60 +13,188 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.push import Pusher, PusherConfigException
+from synapse.push import PusherConfigException
-from twisted.internet import defer
+from twisted.internet import defer, reactor
import logging
+import push_rule_evaluator
+import push_tools
logger = logging.getLogger(__name__)
-class HttpPusher(Pusher):
- def __init__(self, _hs, user_id, app_id,
- app_display_name, device_display_name, pushkey, pushkey_ts,
- data, last_token, last_success, failing_since):
- super(HttpPusher, self).__init__(
- _hs,
- user_id,
- app_id,
- app_display_name,
- device_display_name,
- pushkey,
- pushkey_ts,
- data,
- last_token,
- last_success,
- failing_since
+class HttpPusher(object):
+ INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
+ MAX_BACKOFF_SEC = 60 * 60
+
+ # This one's in ms because we compare it against the clock
+ GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000
+
+ def __init__(self, hs, pusherdict):
+ self.hs = hs
+ self.store = self.hs.get_datastore()
+ self.clock = self.hs.get_clock()
+ self.user_id = pusherdict['user_name']
+ self.app_id = pusherdict['app_id']
+ self.app_display_name = pusherdict['app_display_name']
+ self.device_display_name = pusherdict['device_display_name']
+ self.pushkey = pusherdict['pushkey']
+ self.pushkey_ts = pusherdict['ts']
+ self.data = pusherdict['data']
+ self.last_stream_ordering = pusherdict['last_stream_ordering']
+ self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
+ self.failing_since = pusherdict['failing_since']
+ self.timed_call = None
+
+ # This is the highest stream ordering we know it's safe to process.
+ # When new events arrive, we'll be given a window of new events: we
+ # should honour this rather than just looking for anything higher
+ # because of potential out-of-order event serialisation. This starts
+ # off as None though as we don't know any better.
+ self.max_stream_ordering = None
+
+ if 'data' not in pusherdict:
+ raise PusherConfigException(
+ "No 'data' key for HTTP pusher"
+ )
+ self.data = pusherdict['data']
+
+ self.name = "%s/%s/%s" % (
+ pusherdict['user_name'],
+ pusherdict['app_id'],
+ pusherdict['pushkey'],
)
- if 'url' not in data:
+
+ if 'url' not in self.data:
raise PusherConfigException(
"'url' required in data for HTTP pusher"
)
- self.url = data['url']
- self.http_client = _hs.get_simple_http_client()
+ self.url = self.data['url']
+ self.http_client = hs.get_simple_http_client()
self.data_minus_url = {}
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
+ def on_started(self):
+ self._process()
+
+ def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+ self.max_stream_ordering = max_stream_ordering
+ self._process()
+
+ def on_timer(self):
+ self._process()
+
+ def on_stop(self):
+ if self.timed_call:
+ self.timed_call.cancel()
+
@defer.inlineCallbacks
- def _build_notification_dict(self, event, tweaks, badge):
- # we probably do not want to push for every presence update
- # (we may want to be able to set up notifications when specific
- # people sign in, but we'd want to only deliver the pertinent ones)
- # Actually, presence events will not get this far now because we
- # need to filter them out in the main Pusher code.
- if 'event_id' not in event:
- defer.returnValue(None)
+ def _process(self):
+ unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
+ self.user_id, self.last_stream_ordering, self.max_stream_ordering
+ )
+
+ for push_action in unprocessed:
+ processed = yield self._process_one(push_action)
+ if processed:
+ self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
+ self.last_stream_ordering = push_action['stream_ordering']
+ self.store.update_pusher_last_stream_ordering_and_success(
+ self.app_id, self.pushkey, self.user_id,
+ self.last_stream_ordering,
+ self.clock.time_msec()
+ )
+ self.failing_since = None
+ yield self.store.update_pusher_failing_since(
+ self.app_id, self.pushkey, self.user_id,
+ self.failing_since
+ )
+ else:
+ self.failing_since = self.clock.time_msec()
+ yield self.store.update_pusher_failing_since(
+ self.app_id, self.pushkey, self.user_id,
+ self.failing_since
+ )
+
+ if (
+ self.failing_since and
+ self.failing_since <
+ self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER
+ ):
+ # we really only give up so that if the URL gets
+ # fixed, we don't suddenly deliver a load
+ # of old notifications.
+ logger.warn("Giving up on a notification to user %s, "
+ "pushkey %s",
+ self.user_id, self.pushkey)
+ self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
+ self.last_stream_ordering = push_action['stream_ordering']
+ yield self.store.update_pusher_last_stream_ordering(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.last_stream_ordering
+ )
+
+ self.failing_since = None
+ yield self.store.update_pusher_failing_since(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.failing_since
+ )
+ else:
+ logger.info("Push failed: delaying for %ds", self.backoff_delay)
+ self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
+ self.backoff_delay = min(self.backoff_delay, self.MAX_BACKOFF_SEC)
+ break
+
+ @defer.inlineCallbacks
+ def _process_one(self, push_action):
+ if 'notify' not in push_action['actions']:
+ defer.returnValue(True)
- ctx = yield self.get_context_for_event(event)
+ tweaks = push_rule_evaluator.PushRuleEvaluator.tweaks_for_actions(push_action['actions'])
+ badge = yield push_tools.get_badge_count(self.hs, self.user_id)
+
+ event = yield self.store.get_event(push_action['event_id'], allow_none=True)
+ if event is None:
+ defer.returnValue(True) # It's been redacted
+ rejected = yield self.dispatch_push(event, tweaks, badge)
+ if rejected is False:
+ defer.returnValue(False)
+
+ if isinstance(rejected, list) or isinstance(rejected, tuple):
+ for pk in rejected:
+ if pk != self.pushkey:
+ # for sanity, we only remove the pushkey if it
+ # was the one we actually sent...
+ logger.warn(
+ ("Ignoring rejected pushkey %s because we"
+ " didn't send it"), pk
+ )
+ else:
+ logger.info(
+ "Pushkey %s was rejected: removing",
+ pk
+ )
+ yield self.hs.get_pusherpool().remove_pusher(
+ self.app_id, pk, self.user_id
+ )
+ defer.returnValue(True)
+
+ @defer.inlineCallbacks
+ def _build_notification_dict(self, event, tweaks, badge):
+ ctx = yield push_tools.get_context_for_event(self.hs, event)
d = {
'notification': {
- 'id': event['event_id'],
- 'room_id': event['room_id'],
- 'type': event['type'],
- 'sender': event['user_id'],
+ 'id': event.event_id,
+ 'room_id': event.room_id,
+ 'type': event.type,
+ 'sender': event.user_id,
'counts': { # -- we don't mark messages as read yet so
# we have no way of knowing
# Just set the badge to 1 until we have read receipts
@@ -84,11 +212,11 @@ class HttpPusher(Pusher):
]
}
}
- if event['type'] == 'm.room.member':
- d['notification']['membership'] = event['content']['membership']
- d['notification']['user_is_target'] = event['state_key'] == self.user_id
+ if event.type == 'm.room.member':
+ d['notification']['membership'] = event.content['membership']
+ d['notification']['user_is_target'] = event.state_key == self.user_id
if 'content' in event:
- d['notification']['content'] = event['content']
+ d['notification']['content'] = event.content
if len(ctx['aliases']):
d['notification']['room_alias'] = ctx['aliases'][0]
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
new file mode 100644
index 0000000000..e1e61e49e8
--- /dev/null
+++ b/synapse/push/push_tools.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+
+@defer.inlineCallbacks
+def get_badge_count(hs, user_id):
+ invites, joins = yield defer.gatherResults([
+ hs.get_datastore().get_invited_rooms_for_user(user_id),
+ hs.get_datastore().get_rooms_for_user(user_id),
+ ], consumeErrors=True)
+
+ my_receipts_by_room = yield hs.get_datastore().get_receipts_for_user(
+ user_id, "m.read",
+ )
+
+ badge = len(invites)
+
+ for r in joins:
+ if r.room_id in my_receipts_by_room:
+ last_unread_event_id = my_receipts_by_room[r.room_id]
+
+ notifs = yield (
+ hs.get_datastore().get_unread_event_push_actions_by_room_for_user(
+ r.room_id, user_id, last_unread_event_id
+ )
+ )
+ badge += notifs["notify_count"]
+ defer.returnValue(badge)
+
+
+@defer.inlineCallbacks
+def get_context_for_event(hs, ev):
+ name_aliases = yield hs.get_datastore().get_room_name_and_aliases(
+ ev.room_id
+ )
+
+ ctx = {'aliases': name_aliases[1]}
+ if name_aliases[0] is not None:
+ ctx['name'] = name_aliases[0]
+
+ their_member_events_for_room = yield hs.get_datastore().get_current_state(
+ room_id=ev.room_id,
+ event_type='m.room.member',
+ state_key=ev.user_id
+ )
+ for mev in their_member_events_for_room:
+ if mev.content['membership'] == 'join' and 'displayname' in mev.content:
+ dn = mev.content['displayname']
+ if dn is not None:
+ ctx['sender_display_name'] = dn
+
+ defer.returnValue(ctx)
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
new file mode 100644
index 0000000000..4960837504
--- /dev/null
+++ b/synapse/push/pusher.py
@@ -0,0 +1,10 @@
+from httppusher import HttpPusher
+
+PUSHER_TYPES = {
+ 'http': HttpPusher
+}
+
+
+def create_pusher(hs, pusherdict):
+ if pusherdict['kind'] in PUSHER_TYPES:
+ return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 0b463c6fdb..b67ad455ea 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -16,9 +16,10 @@
from twisted.internet import defer
-from .httppusher import HttpPusher
+import pusher
from synapse.push import PusherConfigException
from synapse.util.logcontext import preserve_fn
+from synapse.util.async import run_on_reactor
import logging
@@ -48,7 +49,7 @@ class PusherPool:
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
# code path adding pushers.
- self._create_pusher({
+ pusher.create_pusher(self.hs, {
"user_name": user_id,
"kind": kind,
"app_id": app_id,
@@ -58,10 +59,18 @@ class PusherPool:
"ts": time_now_msec,
"lang": lang,
"data": data,
- "last_token": None,
+ "last_stream_ordering": None,
"last_success": None,
"failing_since": None
})
+
+ # create the pusher setting last_stream_ordering to the current maximum
+ # stream ordering in event_push_actions, so it will process
+ # pushes from this point onwards.
+ last_stream_ordering = (
+ yield self.store.get_latest_push_action_stream_ordering()
+ )
+
yield self.store.add_pusher(
user_id=user_id,
access_token=access_token,
@@ -73,6 +82,7 @@ class PusherPool:
pushkey_ts=time_now_msec,
lang=lang,
data=data,
+ last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
)
yield self._refresh_pusher(app_id, pushkey, user_id)
@@ -106,26 +116,19 @@ class PusherPool:
)
yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
- def _create_pusher(self, pusherdict):
- if pusherdict['kind'] == 'http':
- return HttpPusher(
- self.hs,
- user_id=pusherdict['user_name'],
- app_id=pusherdict['app_id'],
- app_display_name=pusherdict['app_display_name'],
- device_display_name=pusherdict['device_display_name'],
- pushkey=pusherdict['pushkey'],
- pushkey_ts=pusherdict['ts'],
- data=pusherdict['data'],
- last_token=pusherdict['last_token'],
- last_success=pusherdict['last_success'],
- failing_since=pusherdict['failing_since']
- )
- else:
- raise PusherConfigException(
- "Unknown pusher type '%s' for user %s" %
- (pusherdict['kind'], pusherdict['user_name'])
+ @defer.inlineCallbacks
+ def on_new_notifications(self, min_stream_id, max_stream_id):
+ yield run_on_reactor()
+ try:
+ users_affected = yield self.store.get_push_action_users_in_range(
+ min_stream_id, max_stream_id
)
+ for u in users_affected:
+ if u in self.pushers:
+ for p in self.pushers[u].values():
+ p.on_new_notifications(min_stream_id, max_stream_id)
+ except:
+ logger.exception("Exception in pusher on_new_notifications")
@defer.inlineCallbacks
def _refresh_pusher(self, app_id, pushkey, user_id):
@@ -146,30 +149,34 @@ class PusherPool:
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
try:
- p = self._create_pusher(pusherdict)
+ p = pusher.create_pusher(self.hs, pusherdict)
except PusherConfigException:
logger.exception("Couldn't start a pusher: caught PusherConfigException")
continue
if p:
- fullid = "%s:%s:%s" % (
+ appid_pushkey = "%s:%s" % (
pusherdict['app_id'],
pusherdict['pushkey'],
- pusherdict['user_name']
)
- if fullid in self.pushers:
- self.pushers[fullid].stop()
- self.pushers[fullid] = p
- preserve_fn(p.start)()
+ byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+
+ if appid_pushkey in byuser:
+ byuser[appid_pushkey].on_stop()
+ byuser[appid_pushkey] = p
+ preserve_fn(p.on_started)()
logger.info("Started pushers")
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):
- fullid = "%s:%s:%s" % (app_id, pushkey, user_id)
- if fullid in self.pushers:
- logger.info("Stopping pusher %s", fullid)
- self.pushers[fullid].stop()
- del self.pushers[fullid]
+ appid_pushkey = "%s:%s" % (app_id, pushkey)
+
+ byuser = self.pushers.get(user_id, {})
+
+ if appid_pushkey in byuser:
+ logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
+ byuser[appid_pushkey].on_stop()
+ del byuser[appid_pushkey]
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)
|