diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/key.py | 28 | ||||
-rw-r--r-- | synapse/config/registration.py | 18 | ||||
-rw-r--r-- | synapse/events/snapshot.py | 1 | ||||
-rw-r--r-- | synapse/handlers/_base.py | 39 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 12 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 320 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 23 | ||||
-rw-r--r-- | synapse/push/__init__.py | 248 | ||||
-rw-r--r-- | synapse/push/action_generator.py | 22 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 10 | ||||
-rw-r--r-- | synapse/rest/client/v1/admin.py | 2 | ||||
-rw-r--r-- | synapse/rest/client/v1/presence.py | 4 | ||||
-rw-r--r-- | synapse/rest/client/v1/profile.py | 6 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 45 | ||||
-rw-r--r-- | synapse/storage/events.py | 26 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 2 | ||||
-rw-r--r-- | synapse/storage/receipts.py | 19 | ||||
-rw-r--r-- | synapse/util/metrics.py | 30 |
18 files changed, 432 insertions, 423 deletions
diff --git a/synapse/config/key.py b/synapse/config/key.py index ac90cd3fc1..a072aec714 100644 --- a/synapse/config/key.py +++ b/synapse/config/key.py @@ -22,8 +22,14 @@ from signedjson.key import ( read_signing_keys, write_signing_keys, NACL_ED25519 ) from unpaddedbase64 import decode_base64 +from synapse.util.stringutils import random_string_with_symbols import os +import hashlib +import logging + + +logger = logging.getLogger(__name__) class KeyConfig(Config): @@ -40,9 +46,29 @@ class KeyConfig(Config): config["perspectives"] ) - def default_config(self, config_dir_path, server_name, **kwargs): + self.macaroon_secret_key = config.get( + "macaroon_secret_key", self.registration_shared_secret + ) + + if not self.macaroon_secret_key: + # Unfortunately, there are people out there that don't have this + # set. Lets just be "nice" and derive one from their secret key. + logger.warn("Config is missing missing macaroon_secret_key") + seed = self.signing_key[0].seed + self.macaroon_secret_key = hashlib.sha256(seed) + + def default_config(self, config_dir_path, server_name, is_generating_file=False, + **kwargs): base_key_name = os.path.join(config_dir_path, server_name) + + if is_generating_file: + macaroon_secret_key = random_string_with_symbols(50) + else: + macaroon_secret_key = None + return """\ + macaroon_secret_key: "%(macaroon_secret_key)s" + ## Signing Keys ## # Path to the signing key to sign messages with diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 9b6dacc5b8..ab062d528c 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -32,26 +32,14 @@ class RegistrationConfig(Config): ) self.registration_shared_secret = config.get("registration_shared_secret") - self.macaroon_secret_key = config.get("macaroon_secret_key") - if self.macaroon_secret_key is None: - raise Exception( - "Config is missing missing macaroon_secret_key - please set it" - " in your config file." - ) + self.bcrypt_rounds = config.get("bcrypt_rounds", 12) self.trusted_third_party_id_servers = config["trusted_third_party_id_servers"] self.allow_guest_access = config.get("allow_guest_access", False) - def default_config(self, is_generating_file=False, **kwargs): + def default_config(self, **kwargs): registration_shared_secret = random_string_with_symbols(50) - macaroon_line = "" - if is_generating_file: - macaroon_line += '\n macaroon_secret_key: "%s"\n' % ( - random_string_with_symbols(50), - ) - - macaroon_secret_key = random_string_with_symbols(50) return """\ ## Registration ## @@ -61,7 +49,7 @@ class RegistrationConfig(Config): # If set, allows registration by anyone who also has the shared # secret, even if registration is otherwise disabled. registration_shared_secret: "%(registration_shared_secret)s" -%(macaroon_line)s + # Set the number of bcrypt rounds used to generate password hash. # Larger numbers increase the work factor needed to generate the hash. # The default number of rounds is 12. diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f51200d18e..8a475417a6 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -20,3 +20,4 @@ class EventContext(object): self.current_state = current_state self.state_group = None self.rejected = False + self.push_actions = [] diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index fa83d3e464..064e8723c8 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -53,25 +53,10 @@ class BaseHandler(object): self.event_builder_factory = hs.get_event_builder_factory() @defer.inlineCallbacks - def _filter_events_for_clients(self, user_tuples, events): + def _filter_events_for_clients(self, user_tuples, events, event_id_to_state): """ Returns dict of user_id -> list of events that user is allowed to see. """ - # If there is only one user, just get the state for that one user, - # otherwise just get all the state. - if len(user_tuples) == 1: - types = ( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, user_tuples[0][0]), - ) - else: - types = None - - event_id_to_state = yield self.store.get_state_for_events( - frozenset(e.event_id for e in events), - types=types - ) - forgotten = yield defer.gatherResults([ self.store.who_forgot_in_room( room_id, @@ -135,7 +120,17 @@ class BaseHandler(object): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, events, is_peeking=False): # Assumes that user has at some point joined the room if not is_guest. - res = yield self._filter_events_for_clients([(user_id, is_peeking)], events) + types = ( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) + event_id_to_state = yield self.store.get_state_for_events( + frozenset(e.event_id for e in events), + types=types + ) + res = yield self._filter_events_for_clients( + [(user_id, is_peeking)], events, event_id_to_state + ) defer.returnValue(res.get(user_id, [])) def ratelimit(self, user_id): @@ -269,13 +264,13 @@ class BaseHandler(object): "You don't have permission to redact events" ) - (event_stream_id, max_stream_id) = yield self.store.persist_event( - event, context=context - ) - action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( - event, self + event, context, self + ) + + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context ) destinations = set() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b78b0502d9..da55d43541 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -236,12 +236,6 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - if not backfilled and not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.hs) - yield action_generator.handle_push_actions_for_event( - event, self - ) - @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): event_to_state = yield self.store.get_state_for_events( @@ -1073,6 +1067,12 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) + if not backfilled and not event.internal_metadata.is_outlier(): + action_generator = ActionGenerator(self.hs) + yield action_generator.handle_push_actions_for_event( + event, context, self + ) + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3f1cda5b0b..ddeed27965 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -19,6 +19,7 @@ from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes from synapse.util import unwrapFirstError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.metrics import Measure from twisted.internet import defer @@ -178,18 +179,6 @@ class SyncHandler(BaseHandler): else: return self.incremental_sync_with_gap(sync_config, since_token) - def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room): - if room_id not in ephemeral_by_room: - return None - for e in ephemeral_by_room[room_id]: - if e['type'] != 'm.receipt': - continue - for receipt_event_id, val in e['content'].items(): - if 'm.read' in val: - if user_id in val['m.read']: - return receipt_event_id - return None - @defer.inlineCallbacks def full_state_sync(self, sync_config, timeline_since_token): """Get a sync for a client which is starting without any state. @@ -318,7 +307,6 @@ class SyncHandler(BaseHandler): ephemeral_by_room=ephemeral_by_room, tags_by_room=tags_by_room, account_data_by_room=account_data_by_room, - all_ephemeral_by_room=ephemeral_by_room, batch=batch, full_state=True, ) @@ -368,50 +356,51 @@ class SyncHandler(BaseHandler): typing events for that room. """ - typing_key = since_token.typing_key if since_token else "0" + with Measure(self.clock, "ephemeral_by_room"): + typing_key = since_token.typing_key if since_token else "0" - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] + rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) + room_ids = [room.room_id for room in rooms] - typing_source = self.event_sources.sources["typing"] - typing, typing_key = yield typing_source.get_new_events( - user=sync_config.user, - from_key=typing_key, - limit=sync_config.filter_collection.ephemeral_limit(), - room_ids=room_ids, - is_guest=sync_config.is_guest, - ) - now_token = now_token.copy_and_replace("typing_key", typing_key) - - ephemeral_by_room = {} - - for event in typing: - # we want to exclude the room_id from the event, but modifying the - # result returned by the event source is poor form (it might cache - # the object) - room_id = event["room_id"] - event_copy = {k: v for (k, v) in event.iteritems() - if k != "room_id"} - ephemeral_by_room.setdefault(room_id, []).append(event_copy) - - receipt_key = since_token.receipt_key if since_token else "0" - - receipt_source = self.event_sources.sources["receipt"] - receipts, receipt_key = yield receipt_source.get_new_events( - user=sync_config.user, - from_key=receipt_key, - limit=sync_config.filter_collection.ephemeral_limit(), - room_ids=room_ids, - is_guest=sync_config.is_guest, - ) - now_token = now_token.copy_and_replace("receipt_key", receipt_key) + typing_source = self.event_sources.sources["typing"] + typing, typing_key = yield typing_source.get_new_events( + user=sync_config.user, + from_key=typing_key, + limit=sync_config.filter_collection.ephemeral_limit(), + room_ids=room_ids, + is_guest=sync_config.is_guest, + ) + now_token = now_token.copy_and_replace("typing_key", typing_key) + + ephemeral_by_room = {} + + for event in typing: + # we want to exclude the room_id from the event, but modifying the + # result returned by the event source is poor form (it might cache + # the object) + room_id = event["room_id"] + event_copy = {k: v for (k, v) in event.iteritems() + if k != "room_id"} + ephemeral_by_room.setdefault(room_id, []).append(event_copy) + + receipt_key = since_token.receipt_key if since_token else "0" + + receipt_source = self.event_sources.sources["receipt"] + receipts, receipt_key = yield receipt_source.get_new_events( + user=sync_config.user, + from_key=receipt_key, + limit=sync_config.filter_collection.ephemeral_limit(), + room_ids=room_ids, + is_guest=sync_config.is_guest, + ) + now_token = now_token.copy_and_replace("receipt_key", receipt_key) - for event in receipts: - room_id = event["room_id"] - # exclude room id, as above - event_copy = {k: v for (k, v) in event.iteritems() - if k != "room_id"} - ephemeral_by_room.setdefault(room_id, []).append(event_copy) + for event in receipts: + room_id = event["room_id"] + # exclude room id, as above + event_copy = {k: v for (k, v) in event.iteritems() + if k != "room_id"} + ephemeral_by_room.setdefault(room_id, []).append(event_copy) defer.returnValue((now_token, ephemeral_by_room)) @@ -451,13 +440,6 @@ class SyncHandler(BaseHandler): ) now_token = now_token.copy_and_replace("presence_key", presence_key) - # We now fetch all ephemeral events for this room in order to get - # this users current read receipt. This could almost certainly be - # optimised. - _, all_ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token - ) - now_token, ephemeral_by_room = yield self.ephemeral_by_room( sync_config, now_token, since_token ) @@ -589,7 +571,6 @@ class SyncHandler(BaseHandler): ephemeral_by_room=ephemeral_by_room, tags_by_room=tags_by_room, account_data_by_room=account_data_by_room, - all_ephemeral_by_room=all_ephemeral_by_room, batch=batch, full_state=full_state, ) @@ -619,58 +600,64 @@ class SyncHandler(BaseHandler): """ :returns a Deferred TimelineBatch """ - filtering_factor = 2 - timeline_limit = sync_config.filter_collection.timeline_limit() - load_limit = max(timeline_limit * filtering_factor, 10) - max_repeat = 5 # Only try a few times per room, otherwise - room_key = now_token.room_key - end_key = room_key - - limited = recents is None or newly_joined_room or timeline_limit < len(recents) - - if recents is not None: - recents = sync_config.filter_collection.filter_room_timeline(recents) - recents = yield self._filter_events_for_client( - sync_config.user.to_string(), - recents, - is_peeking=sync_config.is_guest, - ) - else: - recents = [] - - since_key = None - if since_token and not newly_joined_room: - since_key = since_token.room_key - - while limited and len(recents) < timeline_limit and max_repeat: - events, end_key = yield self.store.get_room_events_stream_for_room( - room_id, - limit=load_limit + 1, - from_key=since_key, - to_key=end_key, - ) - loaded_recents = sync_config.filter_collection.filter_room_timeline(events) - loaded_recents = yield self._filter_events_for_client( - sync_config.user.to_string(), - loaded_recents, - is_peeking=sync_config.is_guest, - ) - loaded_recents.extend(recents) - recents = loaded_recents - - if len(events) <= load_limit: + with Measure(self.clock, "load_filtered_recents"): + filtering_factor = 2 + timeline_limit = sync_config.filter_collection.timeline_limit() + load_limit = max(timeline_limit * filtering_factor, 10) + max_repeat = 5 # Only try a few times per room, otherwise + room_key = now_token.room_key + end_key = room_key + + if recents is None or newly_joined_room or timeline_limit < len(recents): + limited = True + else: limited = False - break - max_repeat -= 1 - if len(recents) > timeline_limit: - limited = True - recents = recents[-timeline_limit:] - room_key = recents[0].internal_metadata.before + if recents is not None: + recents = sync_config.filter_collection.filter_room_timeline(recents) + recents = yield self._filter_events_for_client( + sync_config.user.to_string(), + recents, + is_peeking=sync_config.is_guest, + ) + else: + recents = [] + + since_key = None + if since_token and not newly_joined_room: + since_key = since_token.room_key + + while limited and len(recents) < timeline_limit and max_repeat: + events, end_key = yield self.store.get_room_events_stream_for_room( + room_id, + limit=load_limit + 1, + from_key=since_key, + to_key=end_key, + ) + loaded_recents = sync_config.filter_collection.filter_room_timeline( + events + ) + loaded_recents = yield self._filter_events_for_client( + sync_config.user.to_string(), + loaded_recents, + is_peeking=sync_config.is_guest, + ) + loaded_recents.extend(recents) + recents = loaded_recents - prev_batch_token = now_token.copy_and_replace( - "room_key", room_key - ) + if len(events) <= load_limit: + limited = False + break + max_repeat -= 1 + + if len(recents) > timeline_limit: + limited = True + recents = recents[-timeline_limit:] + room_key = recents[0].internal_metadata.before + + prev_batch_token = now_token.copy_and_replace( + "room_key", room_key + ) defer.returnValue(TimelineBatch( events=recents, @@ -683,7 +670,6 @@ class SyncHandler(BaseHandler): since_token, now_token, ephemeral_by_room, tags_by_room, account_data_by_room, - all_ephemeral_by_room, batch, full_state=False): state = yield self.compute_state_delta( room_id, batch, sync_config, since_token, now_token, @@ -714,7 +700,7 @@ class SyncHandler(BaseHandler): if room_sync: notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config, all_ephemeral_by_room + room_id, sync_config ) if notifs is not None: @@ -831,50 +817,53 @@ class SyncHandler(BaseHandler): # updates even if they occured logically before the previous event. # TODO(mjark) Check for new redactions in the state events. - if full_state: - if batch: - state = yield self.store.get_state_for_event(batch.events[0].event_id) - else: - state = yield self.get_state_at( - room_id, stream_position=now_token - ) + with Measure(self.clock, "compute_state_delta"): + if full_state: + if batch: + state = yield self.store.get_state_for_event( + batch.events[0].event_id + ) + else: + state = yield self.get_state_at( + room_id, stream_position=now_token + ) - timeline_state = { - (event.type, event.state_key): event - for event in batch.events if event.is_state() - } + timeline_state = { + (event.type, event.state_key): event + for event in batch.events if event.is_state() + } - state = _calculate_state( - timeline_contains=timeline_state, - timeline_start=state, - previous={}, - ) - elif batch.limited: - state_at_previous_sync = yield self.get_state_at( - room_id, stream_position=since_token - ) + state = _calculate_state( + timeline_contains=timeline_state, + timeline_start=state, + previous={}, + ) + elif batch.limited: + state_at_previous_sync = yield self.get_state_at( + room_id, stream_position=since_token + ) - state_at_timeline_start = yield self.store.get_state_for_event( - batch.events[0].event_id - ) + state_at_timeline_start = yield self.store.get_state_for_event( + batch.events[0].event_id + ) - timeline_state = { - (event.type, event.state_key): event - for event in batch.events if event.is_state() - } + timeline_state = { + (event.type, event.state_key): event + for event in batch.events if event.is_state() + } - state = _calculate_state( - timeline_contains=timeline_state, - timeline_start=state_at_timeline_start, - previous=state_at_previous_sync, - ) - else: - state = {} + state = _calculate_state( + timeline_contains=timeline_state, + timeline_start=state_at_timeline_start, + previous=state_at_previous_sync, + ) + else: + state = {} - defer.returnValue({ - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state(state.values()) - }) + defer.returnValue({ + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state(state.values()) + }) def check_joined_room(self, sync_config, state_delta): """ @@ -895,21 +884,24 @@ class SyncHandler(BaseHandler): return False @defer.inlineCallbacks - def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room): - last_unread_event_id = self.last_read_event_id_for_room_and_user( - room_id, sync_config.user.to_string(), ephemeral_by_room - ) - - notifs = [] - if last_unread_event_id: - notifs = yield self.store.get_unread_event_push_actions_by_room_for_user( - room_id, sync_config.user.to_string(), last_unread_event_id + def unread_notifs_for_room_id(self, room_id, sync_config): + with Measure(self.clock, "unread_notifs_for_room_id"): + last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user( + user_id=sync_config.user.to_string(), + room_id=room_id, + receipt_type="m.read" ) - defer.returnValue(notifs) - # There is no new information in this period, so your notification - # count is whatever it was last time. - defer.returnValue(None) + notifs = [] + if last_unread_event_id: + notifs = yield self.store.get_unread_event_push_actions_by_room_for_user( + room_id, sync_config.user.to_string(), last_unread_event_id + ) + defer.returnValue(notifs) + + # There is no new information in this period, so your notification + # count is whatever it was last time. + defer.returnValue(None) def _action_has_highlight(actions): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 43bf600913..b16d0017df 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -19,6 +19,7 @@ from ._base import BaseHandler from synapse.api.errors import SynapseError, AuthError from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.metrics import Measure from synapse.types import UserID import logging @@ -222,6 +223,7 @@ class TypingNotificationHandler(BaseHandler): class TypingNotificationEventSource(object): def __init__(self, hs): self.hs = hs + self.clock = hs.get_clock() self._handler = None self._room_member_handler = None @@ -247,19 +249,20 @@ class TypingNotificationEventSource(object): } def get_new_events(self, from_key, room_ids, **kwargs): - from_key = int(from_key) - handler = self.handler() + with Measure(self.clock, "typing.get_new_events"): + from_key = int(from_key) + handler = self.handler() - events = [] - for room_id in room_ids: - if room_id not in handler._room_serials: - continue - if handler._room_serials[room_id] <= from_key: - continue + events = [] + for room_id in room_ids: + if room_id not in handler._room_serials: + continue + if handler._room_serials[room_id] <= from_key: + continue - events.append(self._make_event_for(room_id)) + events.append(self._make_event_for(room_id)) - return events, handler._latest_room_serial + return events, handler._latest_room_serial def get_current_key(self): return self.handler()._latest_room_serial diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index b9522a8050..8da2d8716c 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -128,7 +128,8 @@ class Pusher(object): try: if wait > 0: yield synapse.util.async.sleep(wait) - yield self.get_and_dispatch() + with Measure(self.clock, "push"): + yield self.get_and_dispatch() wait = 0 except: if wait == 0: @@ -150,27 +151,115 @@ class Pusher(object): only_keys=("room", "receipt",), ) - with Measure(self.clock, "push"): - # limiting to 1 may get 1 event plus 1 presence event, so - # pick out the actual event - single_event = None - read_receipt = None - for c in chunk['chunk']: - if 'event_id' in c: # Hmmm... - single_event = c - elif c['type'] == 'm.receipt': - read_receipt = c - - have_updated_badge = False - if read_receipt: - for receipt_part in read_receipt['content'].values(): - if 'm.read' in receipt_part: - if self.user_id in receipt_part['m.read'].keys(): - have_updated_badge = True - - if not single_event: - if have_updated_badge: - yield self.update_badge() + # limiting to 1 may get 1 event plus 1 presence event, so + # pick out the actual event + single_event = None + read_receipt = None + for c in chunk['chunk']: + if 'event_id' in c: # Hmmm... + single_event = c + elif c['type'] == 'm.receipt': + read_receipt = c + + have_updated_badge = False + if read_receipt: + for receipt_part in read_receipt['content'].values(): + if 'm.read' in receipt_part: + if self.user_id in receipt_part['m.read'].keys(): + have_updated_badge = True + + if not single_event: + if have_updated_badge: + yield self.update_badge() + self.last_token = chunk['end'] + yield self.store.update_pusher_last_token( + self.app_id, + self.pushkey, + self.user_id, + self.last_token + ) + return + + if not self.alive: + return + + processed = False + + rule_evaluator = yield \ + push_rule_evaluator.evaluator_for_user_id_and_profile_tag( + self.user_id, self.profile_tag, single_event['room_id'], self.store + ) + + actions = yield rule_evaluator.actions_for_event(single_event) + tweaks = rule_evaluator.tweaks_for_actions(actions) + + if 'notify' in actions: + self.badge = yield self._get_badge_count() + rejected = yield self.dispatch_push(single_event, tweaks, self.badge) + self.has_unread = True + if isinstance(rejected, list) or isinstance(rejected, tuple): + processed = True + for pk in rejected: + if pk != self.pushkey: + # for sanity, we only remove the pushkey if it + # was the one we actually sent... + logger.warn( + ("Ignoring rejected pushkey %s because we" + " didn't send it"), pk + ) + else: + logger.info( + "Pushkey %s was rejected: removing", + pk + ) + yield self.hs.get_pusherpool().remove_pusher( + self.app_id, pk, self.user_id + ) + else: + if have_updated_badge: + yield self.update_badge() + processed = True + + if not self.alive: + return + + if processed: + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.last_token = chunk['end'] + yield self.store.update_pusher_last_token_and_success( + self.app_id, + self.pushkey, + self.user_id, + self.last_token, + self.clock.time_msec() + ) + if self.failing_since: + self.failing_since = None + yield self.store.update_pusher_failing_since( + self.app_id, + self.pushkey, + self.user_id, + self.failing_since) + else: + if not self.failing_since: + self.failing_since = self.clock.time_msec() + yield self.store.update_pusher_failing_since( + self.app_id, + self.pushkey, + self.user_id, + self.failing_since + ) + + if (self.failing_since and + self.failing_since < + self.clock.time_msec() - Pusher.GIVE_UP_AFTER): + # we really only give up so that if the URL gets + # fixed, we don't suddenly deliver a load + # of old notifications. + logger.warn("Giving up on a notification to user %s, " + "pushkey %s", + self.user_id, self.pushkey) + self.backoff_delay = Pusher.INITIAL_BACKOFF self.last_token = chunk['end'] yield self.store.update_pusher_last_token( self.app_id, @@ -178,114 +267,25 @@ class Pusher(object): self.user_id, self.last_token ) - return - - if not self.alive: - return - - processed = False - - rule_evaluator = yield \ - push_rule_evaluator.evaluator_for_user_id_and_profile_tag( - self.user_id, self.profile_tag, single_event['room_id'], self.store - ) - actions = yield rule_evaluator.actions_for_event(single_event) - tweaks = rule_evaluator.tweaks_for_actions(actions) - - if 'notify' in actions: - self.badge = yield self._get_badge_count() - rejected = yield self.dispatch_push(single_event, tweaks, self.badge) - self.has_unread = True - if isinstance(rejected, list) or isinstance(rejected, tuple): - processed = True - for pk in rejected: - if pk != self.pushkey: - # for sanity, we only remove the pushkey if it - # was the one we actually sent... - logger.warn( - ("Ignoring rejected pushkey %s because we" - " didn't send it"), pk - ) - else: - logger.info( - "Pushkey %s was rejected: removing", - pk - ) - yield self.hs.get_pusherpool().remove_pusher( - self.app_id, pk, self.user_id - ) - else: - if have_updated_badge: - yield self.update_badge() - processed = True - - if not self.alive: - return - - if processed: - self.backoff_delay = Pusher.INITIAL_BACKOFF - self.last_token = chunk['end'] - yield self.store.update_pusher_last_token_and_success( + self.failing_since = None + yield self.store.update_pusher_failing_since( self.app_id, self.pushkey, self.user_id, - self.last_token, - self.clock.time_msec() + self.failing_since ) - if self.failing_since: - self.failing_since = None - yield self.store.update_pusher_failing_since( - self.app_id, - self.pushkey, - self.user_id, - self.failing_since) else: - if not self.failing_since: - self.failing_since = self.clock.time_msec() - yield self.store.update_pusher_failing_since( - self.app_id, - self.pushkey, - self.user_id, - self.failing_since - ) - - if (self.failing_since and - self.failing_since < - self.clock.time_msec() - Pusher.GIVE_UP_AFTER): - # we really only give up so that if the URL gets - # fixed, we don't suddenly deliver a load - # of old notifications. - logger.warn("Giving up on a notification to user %s, " - "pushkey %s", - self.user_id, self.pushkey) - self.backoff_delay = Pusher.INITIAL_BACKOFF - self.last_token = chunk['end'] - yield self.store.update_pusher_last_token( - self.app_id, - self.pushkey, - self.user_id, - self.last_token - ) - - self.failing_since = None - yield self.store.update_pusher_failing_since( - self.app_id, - self.pushkey, - self.user_id, - self.failing_since - ) - else: - logger.warn("Failed to dispatch push for user %s " - "(failing for %dms)." - "Trying again in %dms", - self.user_id, - self.clock.time_msec() - self.failing_since, - self.backoff_delay) - yield synapse.util.async.sleep(self.backoff_delay / 1000.0) - self.backoff_delay *= 2 - if self.backoff_delay > Pusher.MAX_BACKOFF: - self.backoff_delay = Pusher.MAX_BACKOFF + logger.warn("Failed to dispatch push for user %s " + "(failing for %dms)." + "Trying again in %dms", + self.user_id, + self.clock.time_msec() - self.failing_since, + self.backoff_delay) + yield synapse.util.async.sleep(self.backoff_delay / 1000.0) + self.backoff_delay *= 2 + if self.backoff_delay > Pusher.MAX_BACKOFF: + self.backoff_delay = Pusher.MAX_BACKOFF def stop(self): self.alive = False diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 1d2e558f9a..e0da0868ec 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -19,8 +19,6 @@ import bulk_push_rule_evaluator import logging -from synapse.api.constants import EventTypes - logger = logging.getLogger(__name__) @@ -36,21 +34,15 @@ class ActionGenerator: # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_push_actions_for_event(self, event, handler): - if event.type == EventTypes.Redaction and event.redacts is not None: - yield self.store.remove_push_actions_for_event_id( - event.room_id, event.redacts - ) - + def handle_push_actions_for_event(self, event, context, handler): bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( event.room_id, self.hs, self.store ) - actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler) - - yield self.store.set_push_actions_for_event_and_users( - event, - [ - (uid, None, actions) for uid, actions in actions_by_user.items() - ] + actions_by_user = yield bulk_evaluator.action_for_event_by_user( + event, handler, context.current_state ) + + context.push_actions = [ + (uid, None, actions) for uid, actions in actions_by_user.items() + ] diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 20c60422bf..8ac5ceb9ef 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -98,25 +98,21 @@ class BulkPushRuleEvaluator: self.store = store @defer.inlineCallbacks - def action_for_event_by_user(self, event, handler): + def action_for_event_by_user(self, event, handler, current_state): actions_by_user = {} users_dict = yield self.store.are_guests(self.rules_by_user.keys()) filtered_by_user = yield handler._filter_events_for_clients( - users_dict.items(), [event] + users_dict.items(), [event], {event.event_id: current_state} ) evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room)) condition_cache = {} - member_state = yield self.store.get_state_for_event( - event.event_id, - ) - display_names = {} - for ev in member_state.values(): + for ev in current_state.values(): nm = ev.content.get("displayname", None) if nm and ev.type == EventTypes.Member: display_names[ev.state_key] = nm diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 5ec52707e7..e2f5eb7b29 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) class WhoisRestServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)$") + PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)") @defer.inlineCallbacks def on_GET(self, request, user_id): diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index 9410ac527e..a6f8754e32 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) class PresenceStatusRestServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status$") + PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status") @defer.inlineCallbacks def on_GET(self, request, user_id): @@ -73,7 +73,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet): class PresenceListRestServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/presence/list/(?P<user_id>[^/]*)$") + PATTERNS = client_path_patterns("/presence/list/(?P<user_id>[^/]*)") @defer.inlineCallbacks def on_GET(self, request, user_id): diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index aeda7bfa39..3c5a212920 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -23,7 +23,7 @@ import simplejson as json class ProfileDisplaynameRestServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)/displayname$") + PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)/displayname") @defer.inlineCallbacks def on_GET(self, request, user_id): @@ -60,7 +60,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet): class ProfileAvatarURLRestServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)/avatar_url$") + PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)/avatar_url") @defer.inlineCallbacks def on_GET(self, request, user_id): @@ -97,7 +97,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet): class ProfileRestServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)$") + PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)") @defer.inlineCallbacks def on_GET(self, request, user_id): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index d0a969f50b..d77a817682 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -24,8 +24,7 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): - @defer.inlineCallbacks - def set_push_actions_for_event_and_users(self, event, tuples): + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ :param event: the event set actions for :param tuples: list of tuples of (user_id, profile_tag, actions) @@ -44,18 +43,12 @@ class EventPushActionsStore(SQLBaseStore): 'highlight': 1 if _action_has_highlight(actions) else 0, }) - def f(txn): - for uid, _, __ in tuples: - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (event.room_id, uid) - ) - return self._simple_insert_many_txn(txn, "event_push_actions", values) - - yield self.runInteraction( - "set_actions_for_event_and_users", - f, - ) + for uid, _, __ in tuples: + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (event.room_id, uid) + ) + self._simple_insert_many_txn(txn, "event_push_actions", values) @cachedInlineCallbacks(num_args=3, lru=True, tree=True) def get_unread_event_push_actions_by_room_for_user( @@ -107,21 +100,15 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(ret) - @defer.inlineCallbacks - def remove_push_actions_for_event_id(self, room_id, event_id): - def f(txn): - # Sad that we have to blow away the cache for the whole room here - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (room_id,) - ) - txn.execute( - "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", - (room_id, event_id) - ) - yield self.runInteraction( - "remove_push_actions_for_event_id", - f + def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): + # Sad that we have to blow away the cache for the whole room here + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id,) + ) + txn.execute( + "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", + (room_id, event_id) ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c6ed54721c..3a5c6ee4b1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -205,23 +205,29 @@ class EventsStore(SQLBaseStore): @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled, is_new_state=True): - - # Remove the any existing cache entries for the event_ids - for event, _ in events_and_contexts: + depth_updates = {} + for event, context in events_and_contexts: + # Remove the any existing cache entries for the event_ids txn.call_after(self._invalidate_get_event_cache, event.event_id) - if not backfilled: txn.call_after( self._events_stream_cache.entity_has_changed, event.room_id, event.internal_metadata.stream_ordering, ) - depth_updates = {} - for event, _ in events_and_contexts: - if event.internal_metadata.is_outlier(): - continue - depth_updates[event.room_id] = max( - event.depth, depth_updates.get(event.room_id, event.depth) + if not event.internal_metadata.is_outlier(): + depth_updates[event.room_id] = max( + event.depth, depth_updates.get(event.room_id, event.depth) + ) + + if context.push_actions: + self._set_push_actions_for_event_and_users_txn( + txn, event, context.push_actions + ) + + if event.type == EventTypes.Redaction and event.redacts is not None: + self._remove_push_actions_for_event_id_txn( + txn, event.room_id, event.redacts ) for room_id, depth in depth_updates.items(): diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index d782b8e25b..850736c85e 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -211,7 +211,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, logger.debug("applied_delta_files: %s", applied_delta_files) for v in range(start_ver, SCHEMA_VERSION + 1): - logger.debug("Upgrading schema to v%d", v) + logger.info("Upgrading schema to v%d", v) delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 8068c73740..4202a6b3dc 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -46,6 +46,20 @@ class ReceiptsStore(SQLBaseStore): desc="get_receipts_for_room", ) + @cached(num_args=3) + def get_last_receipt_event_id_for_user(self, user_id, room_id, receipt_type): + return self._simple_select_one_onecol( + table="receipts_linearized", + keyvalues={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id + }, + retcol="event_id", + desc="get_own_receipt_for_user", + allow_none=True, + ) + @cachedInlineCallbacks(num_args=2) def get_receipts_for_user(self, user_id, receipt_type): def f(txn): @@ -226,6 +240,11 @@ class ReceiptsStore(SQLBaseStore): room_id, stream_id ) + txn.call_after( + self.get_last_receipt_event_id_for_user.invalidate, + (user_id, room_id, receipt_type) + ) + # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts sql = ( diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 7c6899aa1c..c51b641125 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -48,33 +48,35 @@ block_db_txn_duration = metrics.register_distribution( class Measure(object): - __slots__ = ["clock", "name", "start_context", "start", "new_context"] + __slots__ = [ + "clock", "name", "start_context", "start", "new_context", "ru_utime", + "ru_stime", "db_txn_count", "db_txn_duration" + ] def __init__(self, clock, name): self.clock = clock self.name = name self.start_context = None self.start = None - self.new_context = LoggingContext(self.name) def __enter__(self): self.start = self.clock.time_msec() self.start_context = LoggingContext.current_context() - self.new_context.__enter__() + if self.start_context: + self.ru_utime, self.ru_stime = self.start_context.get_resource_usage() + self.db_txn_count = self.start_context.db_txn_count + self.db_txn_duration = self.start_context.db_txn_duration def __exit__(self, exc_type, exc_val, exc_tb): - current_context = LoggingContext.current_context() - - self.new_context.__exit__(exc_type, exc_val, exc_tb) - if exc_type is not None: + if exc_type is not None or not self.start_context: return duration = self.clock.time_msec() - self.start block_timer.inc_by(duration, self.name) - context = self.new_context + context = LoggingContext.current_context() - if context != current_context: + if context != self.start_context: logger.warn( "Context have unexpectedly changed from '%s' to '%s'. (%r)", context, self.start_context, self.name @@ -87,7 +89,9 @@ class Measure(object): ru_utime, ru_stime = context.get_resource_usage() - block_ru_utime.inc_by(ru_utime, self.name) - block_ru_stime.inc_by(ru_stime, self.name) - block_db_txn_count.inc_by(context.db_txn_count, self.name) - block_db_txn_duration.inc_by(context.db_txn_duration, self.name) + block_ru_utime.inc_by(ru_utime - self.ru_utime, self.name) + block_ru_stime.inc_by(ru_stime - self.ru_stime, self.name) + block_db_txn_count.inc_by(context.db_txn_count - self.db_txn_count, self.name) + block_db_txn_duration.inc_by( + context.db_txn_duration - self.db_txn_duration, self.name + ) |