From 2a37467fa1358eb41513893efe44cbd294dca36c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 1 Apr 2016 16:08:59 +0100 Subject: Use google style doc strings. pycharm supports them so there is no need to use the other format. Might as well convert the existing strings to reduce the risk of people accidentally cargo culting the wrong doc string format. --- synapse/storage/registration.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'synapse/storage/registration.py') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index bd4eb88a92..d46a963bb8 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -458,12 +458,15 @@ class RegistrationStore(SQLBaseStore): """ Gets the 3pid's guest access token if exists, else saves access_token. - :param medium (str): Medium of the 3pid. Must be "email". - :param address (str): 3pid address. - :param access_token (str): The access token to persist if none is - already persisted. - :param inviter_user_id (str): User ID of the inviter. - :return (deferred str): Whichever access token is persisted at the end + Args: + medium (str): Medium of the 3pid. Must be "email". + address (str): 3pid address. + access_token (str): The access token to persist if none is + already persisted. + inviter_user_id (str): User ID of the inviter. + + Returns: + deferred str: Whichever access token is persisted at the end of this function call. """ def insert(txn): -- cgit 1.5.1 From 7e2c89a37f3a5261f43b4d472b36219ac41dfb16 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Apr 2016 15:42:15 +0100 Subject: Make pushers use the event_push_actions table instead of listening on an event stream & running the rules again. Sytest passes, but remaining to do: * Make badges work again * Remove old, unused code --- synapse/handlers/_base.py | 8 +- synapse/handlers/federation.py | 8 +- synapse/push/bulk_push_rule_evaluator.py | 25 +++- synapse/push/httppusher.py | 204 +++++++++++++++++++++++------ synapse/push/push_tools.py | 66 ++++++++++ synapse/push/pusher.py | 10 ++ synapse/push/pusherpool.py | 75 ++++++----- synapse/storage/event_push_actions.py | 48 +++++++ synapse/storage/events.py | 12 ++ synapse/storage/pusher.py | 81 ++++++++---- synapse/storage/registration.py | 20 --- synapse/storage/roommember.py | 1 + synapse/storage/schema/delta/31/pushers.py | 75 +++++++++++ 13 files changed, 503 insertions(+), 130 deletions(-) create mode 100644 synapse/push/push_tools.py create mode 100644 synapse/push/pusher.py create mode 100644 synapse/storage/schema/delta/31/pushers.py (limited to 'synapse/storage/registration.py') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index c77afe7f51..9c92ea01ed 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -21,7 +21,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.types import UserID, RoomAlias, Requester from synapse.push.action_generator import ActionGenerator -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn import logging @@ -377,6 +377,12 @@ class BaseHandler(object): event, context=context ) + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id + ) + destinations = set() for k, s in context.current_state.items(): try: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 026ebe52be..fc5e0b0590 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -26,7 +26,7 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -1094,6 +1094,12 @@ class FederationHandler(BaseHandler): context=context, ) + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id + ) + defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 76d7eb7ce0..7f94591dcb 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -70,11 +70,17 @@ def _get_rules(room_id, user_ids, store): @defer.inlineCallbacks def evaluator_for_room_id(room_id, hs, store): - results = yield store.get_receipts_for_room(room_id, "m.read") - user_ids = [ - row["user_id"] for row in results - if hs.is_mine_id(row["user_id"]) - ] + users_with_pushers = yield store.get_users_with_pushers_in_room(room_id) + receipts = yield store.get_receipts_for_room(room_id, "m.read") + + # any users with pushers must be ours: they have pushers + user_ids = set(users_with_pushers) + for r in receipts: + if hs.is_mine_id(r['user_id']): + user_ids.add(r['user_id']) + + user_ids = list(user_ids) + rules_by_user = yield _get_rules(room_id, user_ids, store) defer.returnValue(BulkPushRuleEvaluator( @@ -101,10 +107,15 @@ class BulkPushRuleEvaluator: def action_for_event_by_user(self, event, handler, current_state): actions_by_user = {} - users_dict = yield self.store.are_guests(self.rules_by_user.keys()) + # None of these users can be peeking since this list of users comes + # from the set of users in the room, so we know for sure they're all + # actually in the room. + user_tuples = [ + (u, False) for u in self.rules_by_user.keys() + ] filtered_by_user = yield handler.filter_events_for_clients( - users_dict.items(), [event], {event.event_id: current_state} + user_tuples, [event], {event.event_id: current_state} ) room_members = yield self.store.get_users_in_room(self.room_id) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 9be4869360..d695885649 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -13,60 +13,188 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.push import Pusher, PusherConfigException +from synapse.push import PusherConfigException -from twisted.internet import defer +from twisted.internet import defer, reactor import logging +import push_rule_evaluator +import push_tools logger = logging.getLogger(__name__) -class HttpPusher(Pusher): - def __init__(self, _hs, user_id, app_id, - app_display_name, device_display_name, pushkey, pushkey_ts, - data, last_token, last_success, failing_since): - super(HttpPusher, self).__init__( - _hs, - user_id, - app_id, - app_display_name, - device_display_name, - pushkey, - pushkey_ts, - data, - last_token, - last_success, - failing_since +class HttpPusher(object): + INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes + MAX_BACKOFF_SEC = 60 * 60 + + # This one's in ms because we compare it against the clock + GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000 + + def __init__(self, hs, pusherdict): + self.hs = hs + self.store = self.hs.get_datastore() + self.clock = self.hs.get_clock() + self.user_id = pusherdict['user_name'] + self.app_id = pusherdict['app_id'] + self.app_display_name = pusherdict['app_display_name'] + self.device_display_name = pusherdict['device_display_name'] + self.pushkey = pusherdict['pushkey'] + self.pushkey_ts = pusherdict['ts'] + self.data = pusherdict['data'] + self.last_stream_ordering = pusherdict['last_stream_ordering'] + self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC + self.failing_since = pusherdict['failing_since'] + self.timed_call = None + + # This is the highest stream ordering we know it's safe to process. + # When new events arrive, we'll be given a window of new events: we + # should honour this rather than just looking for anything higher + # because of potential out-of-order event serialisation. This starts + # off as None though as we don't know any better. + self.max_stream_ordering = None + + if 'data' not in pusherdict: + raise PusherConfigException( + "No 'data' key for HTTP pusher" + ) + self.data = pusherdict['data'] + + self.name = "%s/%s/%s" % ( + pusherdict['user_name'], + pusherdict['app_id'], + pusherdict['pushkey'], ) - if 'url' not in data: + + if 'url' not in self.data: raise PusherConfigException( "'url' required in data for HTTP pusher" ) - self.url = data['url'] - self.http_client = _hs.get_simple_http_client() + self.url = self.data['url'] + self.http_client = hs.get_simple_http_client() self.data_minus_url = {} self.data_minus_url.update(self.data) del self.data_minus_url['url'] + def on_started(self): + self._process() + + def on_new_notifications(self, min_stream_ordering, max_stream_ordering): + self.max_stream_ordering = max_stream_ordering + self._process() + + def on_timer(self): + self._process() + + def on_stop(self): + if self.timed_call: + self.timed_call.cancel() + @defer.inlineCallbacks - def _build_notification_dict(self, event, tweaks, badge): - # we probably do not want to push for every presence update - # (we may want to be able to set up notifications when specific - # people sign in, but we'd want to only deliver the pertinent ones) - # Actually, presence events will not get this far now because we - # need to filter them out in the main Pusher code. - if 'event_id' not in event: - defer.returnValue(None) + def _process(self): + unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( + self.user_id, self.last_stream_ordering, self.max_stream_ordering + ) + + for push_action in unprocessed: + processed = yield self._process_one(push_action) + if processed: + self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC + self.last_stream_ordering = push_action['stream_ordering'] + self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, self.pushkey, self.user_id, + self.last_stream_ordering, + self.clock.time_msec() + ) + self.failing_since = None + yield self.store.update_pusher_failing_since( + self.app_id, self.pushkey, self.user_id, + self.failing_since + ) + else: + self.failing_since = self.clock.time_msec() + yield self.store.update_pusher_failing_since( + self.app_id, self.pushkey, self.user_id, + self.failing_since + ) + + if ( + self.failing_since and + self.failing_since < + self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER + ): + # we really only give up so that if the URL gets + # fixed, we don't suddenly deliver a load + # of old notifications. + logger.warn("Giving up on a notification to user %s, " + "pushkey %s", + self.user_id, self.pushkey) + self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC + self.last_stream_ordering = push_action['stream_ordering'] + yield self.store.update_pusher_last_stream_ordering( + self.app_id, + self.pushkey, + self.user_id, + self.last_stream_ordering + ) + + self.failing_since = None + yield self.store.update_pusher_failing_since( + self.app_id, + self.pushkey, + self.user_id, + self.failing_since + ) + else: + logger.info("Push failed: delaying for %ds", self.backoff_delay) + self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer) + self.backoff_delay = min(self.backoff_delay, self.MAX_BACKOFF_SEC) + break + + @defer.inlineCallbacks + def _process_one(self, push_action): + if 'notify' not in push_action['actions']: + defer.returnValue(True) - ctx = yield self.get_context_for_event(event) + tweaks = push_rule_evaluator.PushRuleEvaluator.tweaks_for_actions(push_action['actions']) + badge = yield push_tools.get_badge_count(self.hs, self.user_id) + + event = yield self.store.get_event(push_action['event_id'], allow_none=True) + if event is None: + defer.returnValue(True) # It's been redacted + rejected = yield self.dispatch_push(event, tweaks, badge) + if rejected is False: + defer.returnValue(False) + + if isinstance(rejected, list) or isinstance(rejected, tuple): + for pk in rejected: + if pk != self.pushkey: + # for sanity, we only remove the pushkey if it + # was the one we actually sent... + logger.warn( + ("Ignoring rejected pushkey %s because we" + " didn't send it"), pk + ) + else: + logger.info( + "Pushkey %s was rejected: removing", + pk + ) + yield self.hs.get_pusherpool().remove_pusher( + self.app_id, pk, self.user_id + ) + defer.returnValue(True) + + @defer.inlineCallbacks + def _build_notification_dict(self, event, tweaks, badge): + ctx = yield push_tools.get_context_for_event(self.hs, event) d = { 'notification': { - 'id': event['event_id'], - 'room_id': event['room_id'], - 'type': event['type'], - 'sender': event['user_id'], + 'id': event.event_id, + 'room_id': event.room_id, + 'type': event.type, + 'sender': event.user_id, 'counts': { # -- we don't mark messages as read yet so # we have no way of knowing # Just set the badge to 1 until we have read receipts @@ -84,11 +212,11 @@ class HttpPusher(Pusher): ] } } - if event['type'] == 'm.room.member': - d['notification']['membership'] = event['content']['membership'] - d['notification']['user_is_target'] = event['state_key'] == self.user_id + if event.type == 'm.room.member': + d['notification']['membership'] = event.content['membership'] + d['notification']['user_is_target'] = event.state_key == self.user_id if 'content' in event: - d['notification']['content'] = event['content'] + d['notification']['content'] = event.content if len(ctx['aliases']): d['notification']['room_alias'] = ctx['aliases'][0] diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py new file mode 100644 index 0000000000..e1e61e49e8 --- /dev/null +++ b/synapse/push/push_tools.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + + +@defer.inlineCallbacks +def get_badge_count(hs, user_id): + invites, joins = yield defer.gatherResults([ + hs.get_datastore().get_invited_rooms_for_user(user_id), + hs.get_datastore().get_rooms_for_user(user_id), + ], consumeErrors=True) + + my_receipts_by_room = yield hs.get_datastore().get_receipts_for_user( + user_id, "m.read", + ) + + badge = len(invites) + + for r in joins: + if r.room_id in my_receipts_by_room: + last_unread_event_id = my_receipts_by_room[r.room_id] + + notifs = yield ( + hs.get_datastore().get_unread_event_push_actions_by_room_for_user( + r.room_id, user_id, last_unread_event_id + ) + ) + badge += notifs["notify_count"] + defer.returnValue(badge) + + +@defer.inlineCallbacks +def get_context_for_event(hs, ev): + name_aliases = yield hs.get_datastore().get_room_name_and_aliases( + ev.room_id + ) + + ctx = {'aliases': name_aliases[1]} + if name_aliases[0] is not None: + ctx['name'] = name_aliases[0] + + their_member_events_for_room = yield hs.get_datastore().get_current_state( + room_id=ev.room_id, + event_type='m.room.member', + state_key=ev.user_id + ) + for mev in their_member_events_for_room: + if mev.content['membership'] == 'join' and 'displayname' in mev.content: + dn = mev.content['displayname'] + if dn is not None: + ctx['sender_display_name'] = dn + + defer.returnValue(ctx) diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py new file mode 100644 index 0000000000..4960837504 --- /dev/null +++ b/synapse/push/pusher.py @@ -0,0 +1,10 @@ +from httppusher import HttpPusher + +PUSHER_TYPES = { + 'http': HttpPusher +} + + +def create_pusher(hs, pusherdict): + if pusherdict['kind'] in PUSHER_TYPES: + return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 0b463c6fdb..b67ad455ea 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -16,9 +16,10 @@ from twisted.internet import defer -from .httppusher import HttpPusher +import pusher from synapse.push import PusherConfigException from synapse.util.logcontext import preserve_fn +from synapse.util.async import run_on_reactor import logging @@ -48,7 +49,7 @@ class PusherPool: # will then get pulled out of the database, # recreated, added and started: this means we have only one # code path adding pushers. - self._create_pusher({ + pusher.create_pusher(self.hs, { "user_name": user_id, "kind": kind, "app_id": app_id, @@ -58,10 +59,18 @@ class PusherPool: "ts": time_now_msec, "lang": lang, "data": data, - "last_token": None, + "last_stream_ordering": None, "last_success": None, "failing_since": None }) + + # create the pusher setting last_stream_ordering to the current maximum + # stream ordering in event_push_actions, so it will process + # pushes from this point onwards. + last_stream_ordering = ( + yield self.store.get_latest_push_action_stream_ordering() + ) + yield self.store.add_pusher( user_id=user_id, access_token=access_token, @@ -73,6 +82,7 @@ class PusherPool: pushkey_ts=time_now_msec, lang=lang, data=data, + last_stream_ordering=last_stream_ordering, profile_tag=profile_tag, ) yield self._refresh_pusher(app_id, pushkey, user_id) @@ -106,26 +116,19 @@ class PusherPool: ) yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) - def _create_pusher(self, pusherdict): - if pusherdict['kind'] == 'http': - return HttpPusher( - self.hs, - user_id=pusherdict['user_name'], - app_id=pusherdict['app_id'], - app_display_name=pusherdict['app_display_name'], - device_display_name=pusherdict['device_display_name'], - pushkey=pusherdict['pushkey'], - pushkey_ts=pusherdict['ts'], - data=pusherdict['data'], - last_token=pusherdict['last_token'], - last_success=pusherdict['last_success'], - failing_since=pusherdict['failing_since'] - ) - else: - raise PusherConfigException( - "Unknown pusher type '%s' for user %s" % - (pusherdict['kind'], pusherdict['user_name']) + @defer.inlineCallbacks + def on_new_notifications(self, min_stream_id, max_stream_id): + yield run_on_reactor() + try: + users_affected = yield self.store.get_push_action_users_in_range( + min_stream_id, max_stream_id ) + for u in users_affected: + if u in self.pushers: + for p in self.pushers[u].values(): + p.on_new_notifications(min_stream_id, max_stream_id) + except: + logger.exception("Exception in pusher on_new_notifications") @defer.inlineCallbacks def _refresh_pusher(self, app_id, pushkey, user_id): @@ -146,30 +149,34 @@ class PusherPool: logger.info("Starting %d pushers", len(pushers)) for pusherdict in pushers: try: - p = self._create_pusher(pusherdict) + p = pusher.create_pusher(self.hs, pusherdict) except PusherConfigException: logger.exception("Couldn't start a pusher: caught PusherConfigException") continue if p: - fullid = "%s:%s:%s" % ( + appid_pushkey = "%s:%s" % ( pusherdict['app_id'], pusherdict['pushkey'], - pusherdict['user_name'] ) - if fullid in self.pushers: - self.pushers[fullid].stop() - self.pushers[fullid] = p - preserve_fn(p.start)() + byuser = self.pushers.setdefault(pusherdict['user_name'], {}) + + if appid_pushkey in byuser: + byuser[appid_pushkey].on_stop() + byuser[appid_pushkey] = p + preserve_fn(p.on_started)() logger.info("Started pushers") @defer.inlineCallbacks def remove_pusher(self, app_id, pushkey, user_id): - fullid = "%s:%s:%s" % (app_id, pushkey, user_id) - if fullid in self.pushers: - logger.info("Stopping pusher %s", fullid) - self.pushers[fullid].stop() - del self.pushers[fullid] + appid_pushkey = "%s:%s" % (app_id, pushkey) + + byuser = self.pushers.get(user_id, {}) + + if appid_pushkey in byuser: + logger.info("Stopping pusher %s / %s", user_id, appid_pushkey) + byuser[appid_pushkey].on_stop() + del byuser[appid_pushkey] yield self.store.delete_pusher_by_app_id_pushkey_user_id( app_id, pushkey, user_id ) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 3933b6e2c5..5f61743e34 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -100,6 +100,54 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(ret) + @defer.inlineCallbacks + def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering): + def f(txn): + sql = ( + "SELECT DISTINCT(user_id) FROM event_push_actions WHERE" + " stream_ordering >= ? AND stream_ordering >= ?" + ) + txn.execute(sql, (min_stream_ordering, max_stream_ordering)) + return [r[0] for r in txn.fetchall()] + ret = yield self.runInteraction("get_push_action_users_in_range", f) + defer.returnValue(ret) + + @defer.inlineCallbacks + def get_unread_push_actions_for_user_in_range(self, user_id, + min_stream_ordering, + max_stream_ordering=None): + def f(txn): + sql = ( + "SELECT event_id, stream_ordering, actions" + " FROM event_push_actions" + " WHERE user_id = ? AND stream_ordering > ?" + ) + args = [user_id, min_stream_ordering] + if max_stream_ordering is not None: + sql += " AND stream_ordering <= ?" + args.append(max_stream_ordering) + sql += " ORDER BY stream_ordering ASC" + txn.execute(sql, args) + return txn.fetchall() + ret = yield self.runInteraction("get_unread_push_actions_for_user_in_range", f) + defer.returnValue([ + { + "event_id": row[0], + "stream_ordering": row[1], + "actions": json.loads(row[2]), + } for row in ret + ]) + + @defer.inlineCallbacks + def get_latest_push_action_stream_ordering(self): + def f(txn): + txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") + return txn.fetchone() + result = yield self.runInteraction( + "get_latest_push_action_stream_ordering", f + ) + defer.returnValue(result[0] or 0) + def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): # Sad that we have to blow away the cache for the whole room here txn.call_after( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5d299a1132..ceae8715ce 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -61,6 +61,17 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False): + """ + Write events to the database + Args: + events_and_contexts: list of tuples of (event, context) + backfilled: ? + + Returns: Tuple of stream_orderings where the first is the minimum and + last is the maximum stream ordering assigned to the events when + persisting. + + """ if not events_and_contexts: return @@ -191,6 +202,7 @@ class EventsStore(SQLBaseStore): txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) + txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,)) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index d1669c778a..f7886dd1bb 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -18,6 +18,8 @@ from twisted.internet import defer from canonicaljson import encode_canonical_json +from synapse.util.caches.descriptors import cachedInlineCallbacks + import logging import simplejson as json import types @@ -107,31 +109,46 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) + @cachedInlineCallbacks(num_args=1) + def get_users_with_pushers_in_room(self, room_id): + users = yield self.get_users_in_room(room_id) + + result = yield self._simple_select_many_batch( + 'pushers', 'user_name', users, ['user_name'] + ) + + defer.returnValue([r['user_name'] for r in result]) + @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, - pushkey, pushkey_ts, lang, data, profile_tag=""): - with self._pushers_id_gen.get_next() as stream_id: - yield self._simple_upsert( - "pushers", - dict( - app_id=app_id, - pushkey=pushkey, - user_name=user_id, - ), - dict( - access_token=access_token, - kind=kind, - app_display_name=app_display_name, - device_display_name=device_display_name, - ts=pushkey_ts, - lang=lang, - data=encode_canonical_json(data), - profile_tag=profile_tag, - id=stream_id, - ), - desc="add_pusher", - ) + pushkey, pushkey_ts, lang, data, last_stream_ordering, + profile_tag=""): + def f(txn): + txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + with self._pushers_id_gen.get_next() as stream_id: + return self._simple_upsert_txn( + txn, + "pushers", + dict( + app_id=app_id, + pushkey=pushkey, + user_name=user_id, + ), + dict( + access_token=access_token, + kind=kind, + app_display_name=app_display_name, + device_display_name=device_display_name, + ts=pushkey_ts, + lang=lang, + data=encode_canonical_json(data), + last_stream_ordering=last_stream_ordering, + profile_tag=profile_tag, + id=stream_id, + ), + ) + defer.returnValue((yield self.runInteraction("add_pusher", f))) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): @@ -153,22 +170,28 @@ class PusherStore(SQLBaseStore): ) @defer.inlineCallbacks - def update_pusher_last_token(self, app_id, pushkey, user_id, last_token): + def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id, + last_stream_ordering): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_token': last_token}, - desc="update_pusher_last_token", + {'last_stream_ordering': last_stream_ordering}, + desc="update_pusher_last_stream_ordering", ) @defer.inlineCallbacks - def update_pusher_last_token_and_success(self, app_id, pushkey, user_id, - last_token, last_success): + def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey, + user_id, + last_stream_ordering, + last_success): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_token': last_token, 'last_success': last_success}, - desc="update_pusher_last_token_and_success", + { + 'last_stream_ordering': last_stream_ordering, + 'last_success': last_success + }, + desc="update_pusher_last_stream_ordering_and_success", ) @defer.inlineCallbacks diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index d46a963bb8..701dd2f656 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -319,26 +319,6 @@ class RegistrationStore(SQLBaseStore): defer.returnValue(res if res else False) - @cachedList(cache=is_guest.cache, list_name="user_ids", num_args=1, - inlineCallbacks=True) - def are_guests(self, user_ids): - sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % ( - ",".join("?" for _ in user_ids), - ) - - rows = yield self._execute( - "are_guests", self.cursor_to_dict, sql, *user_ids - ) - - result = {user_id: False for user_id in user_ids} - - result.update({ - row["name"]: bool(row["is_guest"]) - for row in rows - }) - - defer.returnValue(result) - def _query_for_auth(self, txn, token): sql = ( "SELECT users.name, users.is_guest, access_tokens.id as token_id" diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 66e7a40e3c..22a690aa8d 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -58,6 +58,7 @@ class RoomMemberStore(SQLBaseStore): txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) + txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,)) txn.call_after( self._membership_stream_cache.entity_has_changed, event.state_key, event.internal_metadata.stream_ordering diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py new file mode 100644 index 0000000000..7e0e385fb5 --- /dev/null +++ b/synapse/storage/schema/delta/31/pushers.py @@ -0,0 +1,75 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Change the last_token to last_stream_ordering now that pushers no longer +# listen on an event stream but instead select out of the event_push_actions +# table. + + +import logging + +logger = logging.getLogger(__name__) + + +def token_to_stream_ordering(token): + return int(token[1:].split('_')[0]) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + logger.info("Porting pushers table, delta 31...") + cur.execute(""" + CREATE TABLE IF NOT EXISTS pushers2 ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + access_token BIGINT DEFAULT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey TEXT NOT NULL, + ts BIGINT NOT NULL, + lang VARCHAR(8), + data TEXT, + last_stream_ordering INTEGER, + last_success BIGINT, + failing_since BIGINT, + UNIQUE (app_id, pushkey, user_name) + ) + """) + cur.execute("""SELECT + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_token, last_success, + failing_since + FROM pushers + """) + count = 0 + for row in cur.fetchall(): + row = list(row) + row[12] = token_to_stream_ordering(row[12]) + cur.execute(database_engine.convert_param_style(""" + INSERT into pushers2 ( + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_stream_ordering, last_success, + failing_since + ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))), + row + ) + count += 1 + cur.execute("DROP TABLE pushers") + cur.execute("ALTER TABLE pushers2 RENAME TO pushers") + logger.info("Moved %d pushers to new table", count) -- cgit 1.5.1 From 0fd1cd24003b54e475985cf90db4223c3098375d Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Apr 2016 16:50:47 +0100 Subject: pep8 --- synapse/storage/event_push_actions.py | 2 +- synapse/storage/events.py | 4 +++- synapse/storage/registration.py | 2 +- synapse/storage/roommember.py | 4 +++- 4 files changed, 8 insertions(+), 4 deletions(-) (limited to 'synapse/storage/registration.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 5f61743e34..4d72e4a85e 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -144,7 +144,7 @@ class EventPushActionsStore(SQLBaseStore): txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") return txn.fetchone() result = yield self.runInteraction( - "get_latest_push_action_stream_ordering", f + "get_latest_push_action_stream_ordering", f ) defer.returnValue(result[0] or 0) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ceae8715ce..5be5bc01b1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -202,7 +202,9 @@ class EventsStore(SQLBaseStore): txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) - txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,)) + txn.call_after( + self.get_users_with_pushers_in_room.invalidate, (event.room_id,) + ) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,)) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 701dd2f656..7af0cae6a5 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -20,7 +20,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError, Codes from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks class RegistrationStore(SQLBaseStore): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 22a690aa8d..088ad0f914 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -58,7 +58,9 @@ class RoomMemberStore(SQLBaseStore): txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) - txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,)) + txn.call_after( + self.get_users_with_pushers_in_room.invalidate, (event.room_id,) + ) txn.call_after( self._membership_stream_cache.entity_has_changed, event.state_key, event.internal_metadata.stream_ordering -- cgit 1.5.1 From 4ea762c1a28edbf18d0a183ed35fb8a5a11847c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 May 2016 10:08:21 +0100 Subject: Add cache to get_user_by_id --- synapse/storage/registration.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/storage/registration.py') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7af0cae6a5..bda84a744a 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore): make_guest, appservice_id ) + self.get_user_by_id.invalidate((user_id,)) self.is_guest.invalidate((user_id,)) def _register( @@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore): (next_id, user_id, token,) ) + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( table="users", @@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore): }, { 'password_hash': password_hash }) + self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks def user_delete_access_tokens(self, user_id, except_token_ids=[]): -- cgit 1.5.1