diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 88d8b9ba54..aaf6b1b837 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -21,7 +21,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.types import UserID, RoomAlias, Requester
from synapse.push.action_generator import ActionGenerator
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
import logging
@@ -406,6 +406,12 @@ class BaseHandler(object):
event, context=context
)
+ # this intentionally does not yield: we don't care about the result
+ # and don't need to wait for it.
+ preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+ event_stream_id, max_stream_id
+ )
+
destinations = set()
for k, s in context.current_state.items():
try:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c28226f840..83dab32bcb 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -26,7 +26,7 @@ from synapse.api.errors import (
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze
@@ -1094,6 +1094,12 @@ class FederationHandler(BaseHandler):
context=context,
)
+ # this intentionally does not yield: we don't care about the result
+ # and don't need to wait for it.
+ preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+ event_stream_id, max_stream_id
+ )
+
defer.returnValue((context, event_stream_id, max_stream_id))
@defer.inlineCallbacks
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 935c339707..a390a1b8bd 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -80,6 +80,9 @@ class ReceiptsHandler(BaseHandler):
def _handle_new_receipts(self, receipts):
"""Takes a list of receipts, stores them and informs the notifier.
"""
+ min_batch_id = None
+ max_batch_id = None
+
for receipt in receipts:
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
@@ -97,10 +100,21 @@ class ReceiptsHandler(BaseHandler):
stream_id, max_persisted_id = res
- with PreserveLoggingContext():
- self.notifier.on_new_event(
- "receipt_key", max_persisted_id, rooms=[room_id]
- )
+ if min_batch_id is None or stream_id < min_batch_id:
+ min_batch_id = stream_id
+ if max_batch_id is None or max_persisted_id > max_batch_id:
+ max_batch_id = max_persisted_id
+
+ affected_room_ids = list(set([r["room_id"] for r in receipts]))
+
+ with PreserveLoggingContext():
+ self.notifier.on_new_event(
+ "receipt_key", max_batch_id, rooms=affected_room_ids
+ )
+ # Note that the min here shouldn't be relied upon to be accurate.
+ self.hs.get_pusherpool().on_new_receipts(
+ min_batch_id, max_batch_id, affected_room_ids
+ )
defer.returnValue(True)
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 296c4447ec..edf45dc599 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -13,333 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
-
-from synapse.streams.config import PaginationConfig
-from synapse.types import StreamToken
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
-
-import synapse.util.async
-from .push_rule_evaluator import evaluator_for_user_id
-
-import logging
-import random
-
-logger = logging.getLogger(__name__)
-
-
-_NEXT_ID = 1
-
-
-def _get_next_id():
- global _NEXT_ID
- _id = _NEXT_ID
- _NEXT_ID += 1
- return _id
-
-
-# Pushers could now be moved to pull out of the event_push_actions table instead
-# of listening on the event stream: this would avoid them having to run the
-# rules again.
-class Pusher(object):
- INITIAL_BACKOFF = 1000
- MAX_BACKOFF = 60 * 60 * 1000
- GIVE_UP_AFTER = 24 * 60 * 60 * 1000
-
- def __init__(self, _hs, user_id, app_id,
- app_display_name, device_display_name, pushkey, pushkey_ts,
- data, last_token, last_success, failing_since):
- self.hs = _hs
- self.evStreamHandler = self.hs.get_handlers().event_stream_handler
- self.store = self.hs.get_datastore()
- self.clock = self.hs.get_clock()
- self.user_id = user_id
- self.app_id = app_id
- self.app_display_name = app_display_name
- self.device_display_name = device_display_name
- self.pushkey = pushkey
- self.pushkey_ts = pushkey_ts
- self.data = data
- self.last_token = last_token
- self.last_success = last_success # not actually used
- self.backoff_delay = Pusher.INITIAL_BACKOFF
- self.failing_since = failing_since
- self.alive = True
- self.badge = None
-
- self.name = "Pusher-%d" % (_get_next_id(),)
-
- # The last value of last_active_time that we saw
- self.last_last_active_time = 0
- self.has_unread = True
-
- @defer.inlineCallbacks
- def get_context_for_event(self, ev):
- name_aliases = yield self.store.get_room_name_and_aliases(
- ev['room_id']
- )
-
- ctx = {'aliases': name_aliases[1]}
- if name_aliases[0] is not None:
- ctx['name'] = name_aliases[0]
-
- their_member_events_for_room = yield self.store.get_current_state(
- room_id=ev['room_id'],
- event_type='m.room.member',
- state_key=ev['user_id']
- )
- for mev in their_member_events_for_room:
- if mev.content['membership'] == 'join' and 'displayname' in mev.content:
- dn = mev.content['displayname']
- if dn is not None:
- ctx['sender_display_name'] = dn
-
- defer.returnValue(ctx)
-
- @defer.inlineCallbacks
- def start(self):
- with LoggingContext(self.name):
- if not self.last_token:
- # First-time setup: get a token to start from (we can't
- # just start from no token, ie. 'now'
- # because we need the result to be reproduceable in case
- # we fail to dispatch the push)
- config = PaginationConfig(from_token=None, limit='1')
- chunk = yield self.evStreamHandler.get_stream(
- self.user_id, config, timeout=0, affect_presence=False
- )
- self.last_token = chunk['end']
- yield self.store.update_pusher_last_token(
- self.app_id, self.pushkey, self.user_id, self.last_token
- )
- logger.info("New pusher %s for user %s starting from token %s",
- self.pushkey, self.user_id, self.last_token)
-
- else:
- logger.info(
- "Old pusher %s for user %s starting",
- self.pushkey, self.user_id,
- )
-
- wait = 0
- while self.alive:
- try:
- if wait > 0:
- yield synapse.util.async.sleep(wait)
- with Measure(self.clock, "push"):
- yield self.get_and_dispatch()
- wait = 0
- except:
- if wait == 0:
- wait = 1
- else:
- wait = min(wait * 2, 1800)
- logger.exception(
- "Exception in pusher loop for pushkey %s. Pausing for %ds",
- self.pushkey, wait
- )
-
- @defer.inlineCallbacks
- def get_and_dispatch(self):
- from_tok = StreamToken.from_string(self.last_token)
- config = PaginationConfig(from_token=from_tok, limit='1')
- timeout = (300 + random.randint(-60, 60)) * 1000
- chunk = yield self.evStreamHandler.get_stream(
- self.user_id, config, timeout=timeout, affect_presence=False,
- only_keys=("room", "receipt",),
- )
-
- # limiting to 1 may get 1 event plus 1 presence event, so
- # pick out the actual event
- single_event = None
- read_receipt = None
- for c in chunk['chunk']:
- if 'event_id' in c: # Hmmm...
- single_event = c
- elif c['type'] == 'm.receipt':
- read_receipt = c
-
- have_updated_badge = False
- if read_receipt:
- for receipt_part in read_receipt['content'].values():
- if 'm.read' in receipt_part:
- if self.user_id in receipt_part['m.read'].keys():
- have_updated_badge = True
-
- if not single_event:
- if have_updated_badge:
- yield self.update_badge()
- self.last_token = chunk['end']
- yield self.store.update_pusher_last_token(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.last_token
- )
- return
-
- if not self.alive:
- return
-
- processed = False
-
- rule_evaluator = yield \
- evaluator_for_user_id(
- self.user_id, single_event['room_id'], self.store
- )
-
- actions = yield rule_evaluator.actions_for_event(single_event)
- tweaks = rule_evaluator.tweaks_for_actions(actions)
-
- if 'notify' in actions:
- self.badge = yield self._get_badge_count()
- rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
- self.has_unread = True
- if isinstance(rejected, list) or isinstance(rejected, tuple):
- processed = True
- for pk in rejected:
- if pk != self.pushkey:
- # for sanity, we only remove the pushkey if it
- # was the one we actually sent...
- logger.warn(
- ("Ignoring rejected pushkey %s because we"
- " didn't send it"), pk
- )
- else:
- logger.info(
- "Pushkey %s was rejected: removing",
- pk
- )
- yield self.hs.get_pusherpool().remove_pusher(
- self.app_id, pk, self.user_id
- )
- else:
- if have_updated_badge:
- yield self.update_badge()
- processed = True
-
- if not self.alive:
- return
-
- if processed:
- self.backoff_delay = Pusher.INITIAL_BACKOFF
- self.last_token = chunk['end']
- yield self.store.update_pusher_last_token_and_success(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.last_token,
- self.clock.time_msec()
- )
- if self.failing_since:
- self.failing_since = None
- yield self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.failing_since)
- else:
- if not self.failing_since:
- self.failing_since = self.clock.time_msec()
- yield self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.failing_since
- )
-
- if (self.failing_since and
- self.failing_since <
- self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
- # we really only give up so that if the URL gets
- # fixed, we don't suddenly deliver a load
- # of old notifications.
- logger.warn("Giving up on a notification to user %s, "
- "pushkey %s",
- self.user_id, self.pushkey)
- self.backoff_delay = Pusher.INITIAL_BACKOFF
- self.last_token = chunk['end']
- yield self.store.update_pusher_last_token(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.last_token
- )
-
- self.failing_since = None
- yield self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.failing_since
- )
- else:
- logger.warn("Failed to dispatch push for user %s "
- "(failing for %dms)."
- "Trying again in %dms",
- self.user_id,
- self.clock.time_msec() - self.failing_since,
- self.backoff_delay)
- yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
- self.backoff_delay *= 2
- if self.backoff_delay > Pusher.MAX_BACKOFF:
- self.backoff_delay = Pusher.MAX_BACKOFF
-
- def stop(self):
- self.alive = False
-
- def dispatch_push(self, p, tweaks, badge):
- """
- Overridden by implementing classes to actually deliver the notification
- Args:
- p: The event to notify for as a single event from the event stream
- Returns: If the notification was delivered, an array containing any
- pushkeys that were rejected by the push gateway.
- False if the notification could not be delivered (ie.
- should be retried).
- """
- pass
-
- @defer.inlineCallbacks
- def update_badge(self):
- new_badge = yield self._get_badge_count()
- if self.badge != new_badge:
- self.badge = new_badge
- yield self.send_badge(self.badge)
-
- def send_badge(self, badge):
- """
- Overridden by implementing classes to send an updated badge count
- """
- pass
-
- @defer.inlineCallbacks
- def _get_badge_count(self):
- invites, joins = yield defer.gatherResults([
- self.store.get_invited_rooms_for_user(self.user_id),
- self.store.get_rooms_for_user(self.user_id),
- ], consumeErrors=True)
-
- my_receipts_by_room = yield self.store.get_receipts_for_user(
- self.user_id,
- "m.read",
- )
-
- badge = len(invites)
-
- for r in joins:
- if r.room_id in my_receipts_by_room:
- last_unread_event_id = my_receipts_by_room[r.room_id]
-
- notifs = yield (
- self.store.get_unread_event_push_actions_by_room_for_user(
- r.room_id, self.user_id, last_unread_event_id
- )
- )
- badge += notifs["notify_count"]
- defer.returnValue(badge)
-
class PusherConfigException(Exception):
def __init__(self, msg):
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 84efcdd184..59e512f507 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -15,7 +15,7 @@
from twisted.internet import defer
-from .bulk_push_rule_evaluator import evaluator_for_room_id
+from .bulk_push_rule_evaluator import evaluator_for_event
import logging
@@ -35,8 +35,8 @@ class ActionGenerator:
@defer.inlineCallbacks
def handle_push_actions_for_event(self, event, context, handler):
- bulk_evaluator = yield evaluator_for_room_id(
- event.room_id, self.hs, self.store
+ bulk_evaluator = yield evaluator_for_event(
+ event, self.hs, self.store
)
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 6add94beeb..8a174feeaf 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -79,7 +79,7 @@ def make_base_append_rules(kind, modified_base_rules):
rules = []
if kind == 'override':
- rules = BASE_APPEND_OVRRIDE_RULES
+ rules = BASE_APPEND_OVERRIDE_RULES
elif kind == 'underride':
rules = BASE_APPEND_UNDERRIDE_RULES
elif kind == 'content':
@@ -148,7 +148,7 @@ BASE_PREPEND_OVERRIDE_RULES = [
]
-BASE_APPEND_OVRRIDE_RULES = [
+BASE_APPEND_OVERRIDE_RULES = [
{
'rule_id': 'global/override/.m.rule.suppress_notices',
'conditions': [
@@ -163,6 +163,40 @@ BASE_APPEND_OVRRIDE_RULES = [
'dont_notify',
]
},
+ # NB. .m.rule.invite_for_me must be higher prio than .m.rule.member_event
+ # otherwise invites will be matched by .m.rule.member_event
+ {
+ 'rule_id': 'global/underride/.m.rule.invite_for_me',
+ 'conditions': [
+ {
+ 'kind': 'event_match',
+ 'key': 'type',
+ 'pattern': 'm.room.member',
+ '_id': '_member',
+ },
+ {
+ 'kind': 'event_match',
+ 'key': 'content.membership',
+ 'pattern': 'invite',
+ '_id': '_invite_member',
+ },
+ {
+ 'kind': 'event_match',
+ 'key': 'state_key',
+ 'pattern_type': 'user_id'
+ },
+ ],
+ 'actions': [
+ 'notify',
+ {
+ 'set_tweak': 'sound',
+ 'value': 'default'
+ }, {
+ 'set_tweak': 'highlight',
+ 'value': False
+ }
+ ]
+ },
# Will we sometimes want to know about people joining and leaving?
# Perhaps: if so, this could be expanded upon. Seems the most usual case
# is that we don't though. We add this override rule so that even if
@@ -252,38 +286,6 @@ BASE_APPEND_UNDERRIDE_RULES = [
]
},
{
- 'rule_id': 'global/underride/.m.rule.invite_for_me',
- 'conditions': [
- {
- 'kind': 'event_match',
- 'key': 'type',
- 'pattern': 'm.room.member',
- '_id': '_member',
- },
- {
- 'kind': 'event_match',
- 'key': 'content.membership',
- 'pattern': 'invite',
- '_id': '_invite_member',
- },
- {
- 'kind': 'event_match',
- 'key': 'state_key',
- 'pattern_type': 'user_id'
- },
- ],
- 'actions': [
- 'notify',
- {
- 'set_tweak': 'sound',
- 'value': 'default'
- }, {
- 'set_tweak': 'highlight',
- 'value': False
- }
- ]
- },
- {
'rule_id': 'global/underride/.m.rule.message',
'conditions': [
{
@@ -315,7 +317,7 @@ for r in BASE_PREPEND_OVERRIDE_RULES:
r['default'] = True
BASE_RULE_IDS.add(r['rule_id'])
-for r in BASE_APPEND_OVRRIDE_RULES:
+for r in BASE_APPEND_OVERRIDE_RULES:
r['priority_class'] = PRIORITY_CLASS_MAP['override']
r['default'] = True
BASE_RULE_IDS.add(r['rule_id'])
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 76d7eb7ce0..49216f0c15 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -69,12 +69,28 @@ def _get_rules(room_id, user_ids, store):
@defer.inlineCallbacks
-def evaluator_for_room_id(room_id, hs, store):
- results = yield store.get_receipts_for_room(room_id, "m.read")
- user_ids = [
- row["user_id"] for row in results
- if hs.is_mine_id(row["user_id"])
- ]
+def evaluator_for_event(event, hs, store):
+ room_id = event.room_id
+ users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)
+ receipts = yield store.get_receipts_for_room(room_id, "m.read")
+
+ # any users with pushers must be ours: they have pushers
+ user_ids = set(users_with_pushers)
+ for r in receipts:
+ if hs.is_mine_id(r['user_id']):
+ user_ids.add(r['user_id'])
+
+ # if this event is an invite event, we may need to run rules for the user
+ # who's been invited, otherwise they won't get told they've been invited
+ if event.type == 'm.room.member' and event.content['membership'] == 'invite':
+ invited_user = event.state_key
+ if invited_user and hs.is_mine_id(invited_user):
+ has_pusher = yield store.user_has_pusher(invited_user)
+ if has_pusher:
+ user_ids.add(invited_user)
+
+ user_ids = list(user_ids)
+
rules_by_user = yield _get_rules(room_id, user_ids, store)
defer.returnValue(BulkPushRuleEvaluator(
@@ -101,10 +117,15 @@ class BulkPushRuleEvaluator:
def action_for_event_by_user(self, event, handler, current_state):
actions_by_user = {}
- users_dict = yield self.store.are_guests(self.rules_by_user.keys())
+ # None of these users can be peeking since this list of users comes
+ # from the set of users in the room, so we know for sure they're all
+ # actually in the room.
+ user_tuples = [
+ (u, False) for u in self.rules_by_user.keys()
+ ]
filtered_by_user = yield handler.filter_events_for_clients(
- users_dict.items(), [event], {event.event_id: current_state}
+ user_tuples, [event], {event.event_id: current_state}
)
room_members = yield self.store.get_users_in_room(self.room_id)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 9be4869360..57f0a69e03 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -13,60 +13,236 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.push import Pusher, PusherConfigException
+from synapse.push import PusherConfigException
-from twisted.internet import defer
+from twisted.internet import defer, reactor
import logging
+import push_rule_evaluator
+import push_tools
+
+from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
-class HttpPusher(Pusher):
- def __init__(self, _hs, user_id, app_id,
- app_display_name, device_display_name, pushkey, pushkey_ts,
- data, last_token, last_success, failing_since):
- super(HttpPusher, self).__init__(
- _hs,
- user_id,
- app_id,
- app_display_name,
- device_display_name,
- pushkey,
- pushkey_ts,
- data,
- last_token,
- last_success,
- failing_since
+class HttpPusher(object):
+ INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
+ MAX_BACKOFF_SEC = 60 * 60
+
+ # This one's in ms because we compare it against the clock
+ GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000
+
+ def __init__(self, hs, pusherdict):
+ self.hs = hs
+ self.store = self.hs.get_datastore()
+ self.clock = self.hs.get_clock()
+ self.user_id = pusherdict['user_name']
+ self.app_id = pusherdict['app_id']
+ self.app_display_name = pusherdict['app_display_name']
+ self.device_display_name = pusherdict['device_display_name']
+ self.pushkey = pusherdict['pushkey']
+ self.pushkey_ts = pusherdict['ts']
+ self.data = pusherdict['data']
+ self.last_stream_ordering = pusherdict['last_stream_ordering']
+ self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
+ self.failing_since = pusherdict['failing_since']
+ self.timed_call = None
+ self.processing = False
+
+ # This is the highest stream ordering we know it's safe to process.
+ # When new events arrive, we'll be given a window of new events: we
+ # should honour this rather than just looking for anything higher
+ # because of potential out-of-order event serialisation. This starts
+ # off as None though as we don't know any better.
+ self.max_stream_ordering = None
+
+ if 'data' not in pusherdict:
+ raise PusherConfigException(
+ "No 'data' key for HTTP pusher"
+ )
+ self.data = pusherdict['data']
+
+ self.name = "%s/%s/%s" % (
+ pusherdict['user_name'],
+ pusherdict['app_id'],
+ pusherdict['pushkey'],
)
- if 'url' not in data:
+
+ if 'url' not in self.data:
raise PusherConfigException(
"'url' required in data for HTTP pusher"
)
- self.url = data['url']
- self.http_client = _hs.get_simple_http_client()
+ self.url = self.data['url']
+ self.http_client = hs.get_simple_http_client()
self.data_minus_url = {}
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
@defer.inlineCallbacks
- def _build_notification_dict(self, event, tweaks, badge):
- # we probably do not want to push for every presence update
- # (we may want to be able to set up notifications when specific
- # people sign in, but we'd want to only deliver the pertinent ones)
- # Actually, presence events will not get this far now because we
- # need to filter them out in the main Pusher code.
- if 'event_id' not in event:
- defer.returnValue(None)
+ def on_started(self):
+ yield self._process()
+
+ @defer.inlineCallbacks
+ def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+ with Measure(self.clock, "push.on_new_notifications"):
+ self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+ yield self._process()
+
+ @defer.inlineCallbacks
+ def on_new_receipts(self, min_stream_id, max_stream_id):
+ # Note that the min here shouldn't be relied upon to be accurate.
+
+ # We could check the receipts are actually m.read receipts here,
+ # but currently that's the only type of receipt anyway...
+ with Measure(self.clock, "push.on_new_receipts"):
+ badge = yield push_tools.get_badge_count(
+ self.hs.get_datastore(), self.user_id
+ )
+ yield self.send_badge(badge)
+
+ @defer.inlineCallbacks
+ def on_timer(self):
+ with Measure(self.clock, "push.on_timer"):
+ yield self._process()
+
+ def on_stop(self):
+ if self.timed_call:
+ self.timed_call.cancel()
- ctx = yield self.get_context_for_event(event)
+ @defer.inlineCallbacks
+ def _process(self):
+ if self.processing:
+ return
+ try:
+ self.processing = True
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
+ try:
+ yield self._unsafe_process()
+ except:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self.processing = False
+
+ @defer.inlineCallbacks
+ def _unsafe_process(self):
+ """
+ Looks for unset notifications and dispatch them, in order
+ Never call this directly: use _process which will only allow this to
+ run once per pusher.
+ """
+
+ unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
+ self.user_id, self.last_stream_ordering, self.max_stream_ordering
+ )
+
+ for push_action in unprocessed:
+ processed = yield self._process_one(push_action)
+ if processed:
+ self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
+ self.last_stream_ordering = push_action['stream_ordering']
+ self.store.update_pusher_last_stream_ordering_and_success(
+ self.app_id, self.pushkey, self.user_id,
+ self.last_stream_ordering,
+ self.clock.time_msec()
+ )
+ if self.failing_since:
+ self.failing_since = None
+ yield self.store.update_pusher_failing_since(
+ self.app_id, self.pushkey, self.user_id,
+ self.failing_since
+ )
+ else:
+ if not self.failing_since:
+ self.failing_since = self.clock.time_msec()
+ yield self.store.update_pusher_failing_since(
+ self.app_id, self.pushkey, self.user_id,
+ self.failing_since
+ )
+
+ if (
+ self.failing_since and
+ self.failing_since <
+ self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
+ ):
+ # we really only give up so that if the URL gets
+ # fixed, we don't suddenly deliver a load
+ # of old notifications.
+ logger.warn("Giving up on a notification to user %s, "
+ "pushkey %s",
+ self.user_id, self.pushkey)
+ self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
+ self.last_stream_ordering = push_action['stream_ordering']
+ yield self.store.update_pusher_last_stream_ordering(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.last_stream_ordering
+ )
+
+ self.failing_since = None
+ yield self.store.update_pusher_failing_since(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.failing_since
+ )
+ else:
+ logger.info("Push failed: delaying for %ds", self.backoff_delay)
+ self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
+ self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
+ break
+
+ @defer.inlineCallbacks
+ def _process_one(self, push_action):
+ if 'notify' not in push_action['actions']:
+ defer.returnValue(True)
+
+ tweaks = push_rule_evaluator.tweaks_for_actions(push_action['actions'])
+ badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+
+ event = yield self.store.get_event(push_action['event_id'], allow_none=True)
+ if event is None:
+ defer.returnValue(True) # It's been redacted
+ rejected = yield self.dispatch_push(event, tweaks, badge)
+ if rejected is False:
+ defer.returnValue(False)
+
+ if isinstance(rejected, list) or isinstance(rejected, tuple):
+ for pk in rejected:
+ if pk != self.pushkey:
+ # for sanity, we only remove the pushkey if it
+ # was the one we actually sent...
+ logger.warn(
+ ("Ignoring rejected pushkey %s because we"
+ " didn't send it"), pk
+ )
+ else:
+ logger.info(
+ "Pushkey %s was rejected: removing",
+ pk
+ )
+ yield self.hs.get_pusherpool().remove_pusher(
+ self.app_id, pk, self.user_id
+ )
+ defer.returnValue(True)
+
+ @defer.inlineCallbacks
+ def _build_notification_dict(self, event, tweaks, badge):
+ ctx = yield push_tools.get_context_for_event(self.hs.get_datastore(), event)
d = {
'notification': {
- 'id': event['event_id'],
- 'room_id': event['room_id'],
- 'type': event['type'],
- 'sender': event['user_id'],
+ 'id': event.event_id, # deprecated: remove soon
+ 'event_id': event.event_id,
+ 'room_id': event.room_id,
+ 'type': event.type,
+ 'sender': event.user_id,
'counts': { # -- we don't mark messages as read yet so
# we have no way of knowing
# Just set the badge to 1 until we have read receipts
@@ -84,11 +260,11 @@ class HttpPusher(Pusher):
]
}
}
- if event['type'] == 'm.room.member':
- d['notification']['membership'] = event['content']['membership']
- d['notification']['user_is_target'] = event['state_key'] == self.user_id
+ if event.type == 'm.room.member':
+ d['notification']['membership'] = event.content['membership']
+ d['notification']['user_is_target'] = event.state_key == self.user_id
if 'content' in event:
- d['notification']['content'] = event['content']
+ d['notification']['content'] = event.content
if len(ctx['aliases']):
d['notification']['room_alias'] = ctx['aliases'][0]
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index c3c2877629..4db76f18bd 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -13,12 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
-
-from .baserules import list_with_base_rules
-
import logging
-import simplejson as json
import re
from synapse.types import UserID
@@ -32,22 +27,6 @@ IS_GLOB = re.compile(r'[\?\*\[\]]')
INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
-@defer.inlineCallbacks
-def evaluator_for_user_id(user_id, room_id, store):
- rawrules = yield store.get_push_rules_for_user(user_id)
- enabled_map = yield store.get_push_rules_enabled_for_user(user_id)
- our_member_event = yield store.get_current_state(
- room_id=room_id,
- event_type='m.room.member',
- state_key=user_id,
- )
-
- defer.returnValue(PushRuleEvaluator(
- user_id, rawrules, enabled_map,
- room_id, our_member_event, store
- ))
-
-
def _room_member_count(ev, condition, room_member_count):
if 'is' not in condition:
return False
@@ -74,111 +53,14 @@ def _room_member_count(ev, condition, room_member_count):
return False
-class PushRuleEvaluator:
- DEFAULT_ACTIONS = []
-
- def __init__(self, user_id, raw_rules, enabled_map, room_id,
- our_member_event, store):
- self.user_id = user_id
- self.room_id = room_id
- self.our_member_event = our_member_event
- self.store = store
-
- rules = []
- for raw_rule in raw_rules:
- rule = dict(raw_rule)
- rule['conditions'] = json.loads(raw_rule['conditions'])
- rule['actions'] = json.loads(raw_rule['actions'])
- rules.append(rule)
-
- self.rules = list_with_base_rules(rules)
-
- self.enabled_map = enabled_map
-
- @staticmethod
- def tweaks_for_actions(actions):
- tweaks = {}
- for a in actions:
- if not isinstance(a, dict):
- continue
- if 'set_tweak' in a and 'value' in a:
- tweaks[a['set_tweak']] = a['value']
- return tweaks
-
- @defer.inlineCallbacks
- def actions_for_event(self, ev):
- """
- This should take into account notification settings that the user
- has configured both globally and per-room when we have the ability
- to do such things.
- """
- if ev['user_id'] == self.user_id:
- # let's assume you probably know about messages you sent yourself
- defer.returnValue([])
-
- room_id = ev['room_id']
-
- # get *our* member event for display name matching
- my_display_name = None
-
- if self.our_member_event:
- my_display_name = self.our_member_event[0].content.get("displayname")
-
- room_members = yield self.store.get_users_in_room(room_id)
- room_member_count = len(room_members)
-
- evaluator = PushRuleEvaluatorForEvent(ev, room_member_count)
-
- for r in self.rules:
- enabled = self.enabled_map.get(r['rule_id'], None)
- if enabled is not None and not enabled:
- continue
- elif enabled is None and not r.get("enabled", True):
- # if no override, check enabled on the rule itself
- # (may have come from a base rule)
- continue
-
- conditions = r['conditions']
- actions = r['actions']
-
- # ignore rules with no actions (we have an explict 'dont_notify')
- if len(actions) == 0:
- logger.warn(
- "Ignoring rule id %s with no actions for user %s",
- r['rule_id'], self.user_id
- )
- continue
-
- matches = True
- for c in conditions:
- matches = evaluator.matches(
- c, self.user_id, my_display_name
- )
- if not matches:
- break
-
- logger.debug(
- "Rule %s %s",
- r['rule_id'], "matches" if matches else "doesn't match"
- )
-
- if matches:
- logger.debug(
- "%s matches for user %s, event %s",
- r['rule_id'], self.user_id, ev['event_id']
- )
-
- # filter out dont_notify as we treat an empty actions list
- # as dont_notify, and this doesn't take up a row in our database
- actions = [x for x in actions if x != 'dont_notify']
-
- defer.returnValue(actions)
-
- logger.debug(
- "No rules match for user %s, event %s",
- self.user_id, ev['event_id']
- )
- defer.returnValue(PushRuleEvaluator.DEFAULT_ACTIONS)
+def tweaks_for_actions(actions):
+ tweaks = {}
+ for a in actions:
+ if not isinstance(a, dict):
+ continue
+ if 'set_tweak' in a and 'value' in a:
+ tweaks[a['set_tweak']] = a['value']
+ return tweaks
class PushRuleEvaluatorForEvent(object):
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
new file mode 100644
index 0000000000..e71d01e77d
--- /dev/null
+++ b/synapse/push/push_tools.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+
+@defer.inlineCallbacks
+def get_badge_count(store, user_id):
+ invites, joins = yield defer.gatherResults([
+ store.get_invited_rooms_for_user(user_id),
+ store.get_rooms_for_user(user_id),
+ ], consumeErrors=True)
+
+ my_receipts_by_room = yield store.get_receipts_for_user(
+ user_id, "m.read",
+ )
+
+ badge = len(invites)
+
+ for r in joins:
+ if r.room_id in my_receipts_by_room:
+ last_unread_event_id = my_receipts_by_room[r.room_id]
+
+ notifs = yield (
+ store.get_unread_event_push_actions_by_room_for_user(
+ r.room_id, user_id, last_unread_event_id
+ )
+ )
+ badge += notifs["notify_count"]
+ defer.returnValue(badge)
+
+
+@defer.inlineCallbacks
+def get_context_for_event(store, ev):
+ name_aliases = yield store.get_room_name_and_aliases(
+ ev.room_id
+ )
+
+ ctx = {'aliases': name_aliases[1]}
+ if name_aliases[0] is not None:
+ ctx['name'] = name_aliases[0]
+
+ their_member_events_for_room = yield store.get_current_state(
+ room_id=ev.room_id,
+ event_type='m.room.member',
+ state_key=ev.user_id
+ )
+ for mev in their_member_events_for_room:
+ if mev.content['membership'] == 'join' and 'displayname' in mev.content:
+ dn = mev.content['displayname']
+ if dn is not None:
+ ctx['sender_display_name'] = dn
+
+ defer.returnValue(ctx)
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
new file mode 100644
index 0000000000..4960837504
--- /dev/null
+++ b/synapse/push/pusher.py
@@ -0,0 +1,10 @@
+from httppusher import HttpPusher
+
+PUSHER_TYPES = {
+ 'http': HttpPusher
+}
+
+
+def create_pusher(hs, pusherdict):
+ if pusherdict['kind'] in PUSHER_TYPES:
+ return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 0b463c6fdb..ba513601e7 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -16,9 +16,10 @@
from twisted.internet import defer
-from .httppusher import HttpPusher
+import pusher
from synapse.push import PusherConfigException
from synapse.util.logcontext import preserve_fn
+from synapse.util.async import run_on_reactor
import logging
@@ -48,7 +49,7 @@ class PusherPool:
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
# code path adding pushers.
- self._create_pusher({
+ pusher.create_pusher(self.hs, {
"user_name": user_id,
"kind": kind,
"app_id": app_id,
@@ -58,10 +59,18 @@ class PusherPool:
"ts": time_now_msec,
"lang": lang,
"data": data,
- "last_token": None,
+ "last_stream_ordering": None,
"last_success": None,
"failing_since": None
})
+
+ # create the pusher setting last_stream_ordering to the current maximum
+ # stream ordering in event_push_actions, so it will process
+ # pushes from this point onwards.
+ last_stream_ordering = (
+ yield self.store.get_latest_push_action_stream_ordering()
+ )
+
yield self.store.add_pusher(
user_id=user_id,
access_token=access_token,
@@ -73,6 +82,7 @@ class PusherPool:
pushkey_ts=time_now_msec,
lang=lang,
data=data,
+ last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
)
yield self._refresh_pusher(app_id, pushkey, user_id)
@@ -106,26 +116,51 @@ class PusherPool:
)
yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
- def _create_pusher(self, pusherdict):
- if pusherdict['kind'] == 'http':
- return HttpPusher(
- self.hs,
- user_id=pusherdict['user_name'],
- app_id=pusherdict['app_id'],
- app_display_name=pusherdict['app_display_name'],
- device_display_name=pusherdict['device_display_name'],
- pushkey=pusherdict['pushkey'],
- pushkey_ts=pusherdict['ts'],
- data=pusherdict['data'],
- last_token=pusherdict['last_token'],
- last_success=pusherdict['last_success'],
- failing_since=pusherdict['failing_since']
+ @defer.inlineCallbacks
+ def on_new_notifications(self, min_stream_id, max_stream_id):
+ yield run_on_reactor()
+ try:
+ users_affected = yield self.store.get_push_action_users_in_range(
+ min_stream_id, max_stream_id
)
- else:
- raise PusherConfigException(
- "Unknown pusher type '%s' for user %s" %
- (pusherdict['kind'], pusherdict['user_name'])
+
+ deferreds = []
+
+ for u in users_affected:
+ if u in self.pushers:
+ for p in self.pushers[u].values():
+ deferreds.append(
+ p.on_new_notifications(min_stream_id, max_stream_id)
+ )
+
+ yield defer.gatherResults(deferreds)
+ except:
+ logger.exception("Exception in pusher on_new_notifications")
+
+ @defer.inlineCallbacks
+ def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+ yield run_on_reactor()
+ try:
+ # Need to subtract 1 from the minimum because the lower bound here
+ # is not inclusive
+ updated_receipts = yield self.store.get_all_updated_receipts(
+ min_stream_id - 1, max_stream_id
)
+ # This returns a tuple, user_id is at index 3
+ users_affected = set([r[3] for r in updated_receipts])
+
+ deferreds = []
+
+ for u in users_affected:
+ if u in self.pushers:
+ for p in self.pushers[u].values():
+ deferreds.append(
+ p.on_new_receipts(min_stream_id, max_stream_id)
+ )
+
+ yield defer.gatherResults(deferreds)
+ except:
+ logger.exception("Exception in pusher on_new_receipts")
@defer.inlineCallbacks
def _refresh_pusher(self, app_id, pushkey, user_id):
@@ -146,30 +181,34 @@ class PusherPool:
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
try:
- p = self._create_pusher(pusherdict)
+ p = pusher.create_pusher(self.hs, pusherdict)
except PusherConfigException:
logger.exception("Couldn't start a pusher: caught PusherConfigException")
continue
if p:
- fullid = "%s:%s:%s" % (
+ appid_pushkey = "%s:%s" % (
pusherdict['app_id'],
pusherdict['pushkey'],
- pusherdict['user_name']
)
- if fullid in self.pushers:
- self.pushers[fullid].stop()
- self.pushers[fullid] = p
- preserve_fn(p.start)()
+ byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+
+ if appid_pushkey in byuser:
+ byuser[appid_pushkey].on_stop()
+ byuser[appid_pushkey] = p
+ preserve_fn(p.on_started)()
logger.info("Started pushers")
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):
- fullid = "%s:%s:%s" % (app_id, pushkey, user_id)
- if fullid in self.pushers:
- logger.info("Stopping pusher %s", fullid)
- self.pushers[fullid].stop()
- del self.pushers[fullid]
+ appid_pushkey = "%s:%s" % (app_id, pushkey)
+
+ byuser = self.pushers.get(user_id, {})
+
+ if appid_pushkey in byuser:
+ logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
+ byuser[appid_pushkey].on_stop()
+ del byuser[appid_pushkey]
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 3933b6e2c5..355478957d 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -100,6 +100,54 @@ class EventPushActionsStore(SQLBaseStore):
)
defer.returnValue(ret)
+ @defer.inlineCallbacks
+ def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
+ def f(txn):
+ sql = (
+ "SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
+ " stream_ordering >= ? AND stream_ordering <= ?"
+ )
+ txn.execute(sql, (min_stream_ordering, max_stream_ordering))
+ return [r[0] for r in txn.fetchall()]
+ ret = yield self.runInteraction("get_push_action_users_in_range", f)
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def get_unread_push_actions_for_user_in_range(self, user_id,
+ min_stream_ordering,
+ max_stream_ordering=None):
+ def f(txn):
+ sql = (
+ "SELECT event_id, stream_ordering, actions"
+ " FROM event_push_actions"
+ " WHERE user_id = ? AND stream_ordering > ?"
+ )
+ args = [user_id, min_stream_ordering]
+ if max_stream_ordering is not None:
+ sql += " AND stream_ordering <= ?"
+ args.append(max_stream_ordering)
+ sql += " ORDER BY stream_ordering ASC"
+ txn.execute(sql, args)
+ return txn.fetchall()
+ ret = yield self.runInteraction("get_unread_push_actions_for_user_in_range", f)
+ defer.returnValue([
+ {
+ "event_id": row[0],
+ "stream_ordering": row[1],
+ "actions": json.loads(row[2]),
+ } for row in ret
+ ])
+
+ @defer.inlineCallbacks
+ def get_latest_push_action_stream_ordering(self):
+ def f(txn):
+ txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
+ return txn.fetchone()
+ result = yield self.runInteraction(
+ "get_latest_push_action_stream_ordering", f
+ )
+ defer.returnValue(result[0] or 0)
+
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ee87a71719..308a2c9b02 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -61,6 +61,17 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
+ """
+ Write events to the database
+ Args:
+ events_and_contexts: list of tuples of (event, context)
+ backfilled: ?
+
+ Returns: Tuple of stream_orderings where the first is the minimum and
+ last is the maximum stream ordering assigned to the events when
+ persisting.
+
+ """
if not events_and_contexts:
return
@@ -191,6 +202,9 @@ class EventsStore(SQLBaseStore):
txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
+ txn.call_after(
+ self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
+ )
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index d1669c778a..19888a8e76 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
from canonicaljson import encode_canonical_json
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
import logging
import simplejson as json
import types
@@ -48,6 +50,13 @@ class PusherStore(SQLBaseStore):
return rows
@defer.inlineCallbacks
+ def user_has_pusher(self, user_id):
+ ret = yield self._simple_select_one_onecol(
+ "pushers", {"user_name": user_id}, "id", allow_none=True
+ )
+ defer.returnValue(ret is not None)
+
+ @defer.inlineCallbacks
def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
def r(txn):
sql = (
@@ -107,31 +116,46 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
)
+ @cachedInlineCallbacks(num_args=1)
+ def get_users_with_pushers_in_room(self, room_id):
+ users = yield self.get_users_in_room(room_id)
+
+ result = yield self._simple_select_many_batch(
+ 'pushers', 'user_name', users, ['user_name']
+ )
+
+ defer.returnValue([r['user_name'] for r in result])
+
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
app_display_name, device_display_name,
- pushkey, pushkey_ts, lang, data, profile_tag=""):
+ pushkey, pushkey_ts, lang, data, last_stream_ordering,
+ profile_tag=""):
with self._pushers_id_gen.get_next() as stream_id:
- yield self._simple_upsert(
- "pushers",
- dict(
- app_id=app_id,
- pushkey=pushkey,
- user_name=user_id,
- ),
- dict(
- access_token=access_token,
- kind=kind,
- app_display_name=app_display_name,
- device_display_name=device_display_name,
- ts=pushkey_ts,
- lang=lang,
- data=encode_canonical_json(data),
- profile_tag=profile_tag,
- id=stream_id,
- ),
- desc="add_pusher",
- )
+ def f(txn):
+ txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+ return self._simple_upsert_txn(
+ txn,
+ "pushers",
+ {
+ "app_id": app_id,
+ "pushkey": pushkey,
+ "user_name": user_id,
+ },
+ {
+ "access_token": access_token,
+ "kind": kind,
+ "app_display_name": app_display_name,
+ "device_display_name": device_display_name,
+ "ts": pushkey_ts,
+ "lang": lang,
+ "data": encode_canonical_json(data),
+ "last_stream_ordering": last_stream_ordering,
+ "profile_tag": profile_tag,
+ "id": stream_id,
+ },
+ )
+ defer.returnValue((yield self.runInteraction("add_pusher", f)))
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
@@ -153,22 +177,28 @@ class PusherStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):
+ def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
+ last_stream_ordering):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
- {'last_token': last_token},
- desc="update_pusher_last_token",
+ {'last_stream_ordering': last_stream_ordering},
+ desc="update_pusher_last_stream_ordering",
)
@defer.inlineCallbacks
- def update_pusher_last_token_and_success(self, app_id, pushkey, user_id,
- last_token, last_success):
+ def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
+ user_id,
+ last_stream_ordering,
+ last_success):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
- {'last_token': last_token, 'last_success': last_success},
- desc="update_pusher_last_token_and_success",
+ {
+ 'last_stream_ordering': last_stream_ordering,
+ 'last_success': last_success
+ },
+ desc="update_pusher_last_stream_ordering_and_success",
)
@defer.inlineCallbacks
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 7fdd84bbdc..3b8805593e 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -390,16 +390,19 @@ class ReceiptsStore(SQLBaseStore):
}
)
- def get_all_updated_receipts(self, last_id, current_id, limit):
+ def get_all_updated_receipts(self, last_id, current_id, limit=None):
def get_all_updated_receipts_txn(txn):
sql = (
"SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
" FROM receipts_linearized"
" WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC"
- " LIMIT ?"
)
- txn.execute(sql, (last_id, current_id, limit))
+ args = [last_id, current_id]
+ if limit is not None:
+ sql += " LIMIT ?"
+ args.append(limit)
+ txn.execute(sql, args)
return txn.fetchall()
return self.runInteraction(
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 1f71773aaa..7af0cae6a5 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -20,7 +20,7 @@ from twisted.internet import defer
from synapse.api.errors import StoreError, Codes
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
class RegistrationStore(SQLBaseStore):
@@ -319,26 +319,6 @@ class RegistrationStore(SQLBaseStore):
defer.returnValue(res if res else False)
- @cachedList(cached_method_name="is_guest", list_name="user_ids", num_args=1,
- inlineCallbacks=True)
- def are_guests(self, user_ids):
- sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % (
- ",".join("?" for _ in user_ids),
- )
-
- rows = yield self._execute(
- "are_guests", self.cursor_to_dict, sql, *user_ids
- )
-
- result = {user_id: False for user_id in user_ids}
-
- result.update({
- row["name"]: bool(row["is_guest"])
- for row in rows
- })
-
- defer.returnValue(result)
-
def _query_for_auth(self, txn, token):
sql = (
"SELECT users.name, users.is_guest, access_tokens.id as token_id"
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 77518e893f..08a54cbdd1 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -59,6 +59,9 @@ class RoomMemberStore(SQLBaseStore):
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(
+ self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
+ )
+ txn.call_after(
self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering
)
diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py
new file mode 100644
index 0000000000..93367fa09e
--- /dev/null
+++ b/synapse/storage/schema/delta/31/pushers.py
@@ -0,0 +1,79 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Change the last_token to last_stream_ordering now that pushers no longer
+# listen on an event stream but instead select out of the event_push_actions
+# table.
+
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def token_to_stream_ordering(token):
+ return int(token[1:].split('_')[0])
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+ logger.info("Porting pushers table, delta 31...")
+ cur.execute("""
+ CREATE TABLE IF NOT EXISTS pushers2 (
+ id BIGINT PRIMARY KEY,
+ user_name TEXT NOT NULL,
+ access_token BIGINT DEFAULT NULL,
+ profile_tag VARCHAR(32) NOT NULL,
+ kind VARCHAR(8) NOT NULL,
+ app_id VARCHAR(64) NOT NULL,
+ app_display_name VARCHAR(64) NOT NULL,
+ device_display_name VARCHAR(128) NOT NULL,
+ pushkey TEXT NOT NULL,
+ ts BIGINT NOT NULL,
+ lang VARCHAR(8),
+ data TEXT,
+ last_stream_ordering INTEGER,
+ last_success BIGINT,
+ failing_since BIGINT,
+ UNIQUE (app_id, pushkey, user_name)
+ )
+ """)
+ cur.execute("""SELECT
+ id, user_name, access_token, profile_tag, kind,
+ app_id, app_display_name, device_display_name,
+ pushkey, ts, lang, data, last_token, last_success,
+ failing_since
+ FROM pushers
+ """)
+ count = 0
+ for row in cur.fetchall():
+ row = list(row)
+ row[12] = token_to_stream_ordering(row[12])
+ cur.execute(database_engine.convert_param_style("""
+ INSERT into pushers2 (
+ id, user_name, access_token, profile_tag, kind,
+ app_id, app_display_name, device_display_name,
+ pushkey, ts, lang, data, last_stream_ordering, last_success,
+ failing_since
+ ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
+ row
+ )
+ count += 1
+ cur.execute("DROP TABLE pushers")
+ cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
+ logger.info("Moved %d pushers to new table", count)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ pass
|