diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/emailconfig.py | 84 | ||||
-rw-r--r-- | synapse/config/homeserver.py | 3 | ||||
-rw-r--r-- | synapse/config/server.py | 8 | ||||
-rw-r--r-- | synapse/handlers/_base.py | 2 | ||||
-rw-r--r-- | synapse/handlers/message.py | 8 | ||||
-rw-r--r-- | synapse/handlers/room.py | 2 | ||||
-rw-r--r-- | synapse/handlers/search.py | 8 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 4 | ||||
-rw-r--r-- | synapse/notifier.py | 2 | ||||
-rw-r--r-- | synapse/push/emailpusher.py | 261 | ||||
-rw-r--r-- | synapse/push/mailer.py | 373 | ||||
-rw-r--r-- | synapse/push/pusher.py | 12 | ||||
-rw-r--r-- | synapse/python_dependencies.py | 4 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 56 | ||||
-rw-r--r-- | synapse/storage/events.py | 2 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 27 | ||||
-rw-r--r-- | synapse/storage/schema/delta/31/events.sql | 16 | ||||
-rw-r--r-- | synapse/storage/schema/delta/31/pusher_throttle.sql | 23 | ||||
-rw-r--r-- | synapse/util/presentable_names.py | 145 |
19 files changed, 1015 insertions, 25 deletions
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py new file mode 100644 index 0000000000..06b076e3f9 --- /dev/null +++ b/synapse/config/emailconfig.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file can't be called email.py because if it is, we cannot: +import email.utils + +from ._base import Config + + +class EmailConfig(Config): + """ + Email Configuration + """ + + def read_config(self, config): + self.email_enable_notifs = False + + email_config = config.get("email", None) + if email_config: + self.email_enable_notifs = email_config.get("enable_notifs", True) + + if self.email_enable_notifs: + required = [ + "smtp_host", + "smtp_port", + "notif_from", + "template_dir", + "notif_template_html", + ] + + missing = [] + for k in required: + if k not in email_config: + missing.append(k) + + if (len(missing) > 0): + raise RuntimeError( + "email.enable_notifs is True but required keys are missing: %s" % + (", ".join(["email." + k for k in missing]),) + ) + + if config.get("public_baseurl") is None: + raise RuntimeError( + "email.enable_notifs is True but no public_baseurl is set" + ) + + self.email_smtp_host = email_config["smtp_host"] + self.email_smtp_port = email_config["smtp_port"] + self.email_notif_from = email_config["notif_from"] + self.email_template_dir = email_config["template_dir"] + self.email_notif_template_html = email_config["notif_template_html"] + + # make sure it's valid + parsed = email.utils.parseaddr(self.email_notif_from) + if parsed[1] == '': + raise RuntimeError("Invalid notif_from address") + else: + self.email_enable_notifs = False + # Not much point setting defaults for the rest: it would be an + # error for them to be used. + + def default_config(self, config_dir_path, server_name, **kwargs): + return """ + # Enable sending emails for notification events + #email_config: + # enable_notifs: false + # smtp_host: "localhost" + # smtp_port: 25 + # notif_from: Your Friendly Matrix Home Server <noreply@example.com> + # template_dir: res/templates + # notif_template_html: notif.html + """ diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 9a80ac39ec..fc2445484c 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -31,13 +31,14 @@ from .cas import CasConfig from .password import PasswordConfig from .jwt import JWTConfig from .ldap import LDAPConfig +from .emailconfig import EmailConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig, - JWTConfig, LDAPConfig, PasswordConfig,): + JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,): pass diff --git a/synapse/config/server.py b/synapse/config/server.py index 46c633548a..04b9221908 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -28,6 +28,11 @@ class ServerConfig(Config): self.print_pidfile = config.get("print_pidfile") self.user_agent_suffix = config.get("user_agent_suffix") self.use_frozen_dicts = config.get("use_frozen_dicts", True) + self.public_baseurl = config.get("public_baseurl") + + if self.public_baseurl is not None: + if self.public_baseurl[-1] != '/': + self.public_baseurl += '/' self.start_pushers = config.get("start_pushers", True) self.listeners = config.get("listeners", []) @@ -143,6 +148,9 @@ class ServerConfig(Config): # Whether to serve a web client from the HTTP/HTTPS root resource. web_client: True + # The server's public-facing base URL + # https://example.com:8448/ + # Set the soft limit on the number of file descriptors synapse can use # Zero is used to indicate synapse should set the soft limit to the # hard limit. diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 13a675b208..134729069a 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -192,7 +192,7 @@ class BaseHandler(object): }) @defer.inlineCallbacks - def _filter_events_for_client(self, user_id, events, is_peeking=False): + def filter_events_for_client(self, user_id, events, is_peeking=False): """ Check which events a user is allowed to see diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f51feda2f4..7d9e3cf364 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -123,7 +123,7 @@ class MessageHandler(BaseHandler): "end": next_token.to_string(), }) - events = yield self._filter_events_for_client( + events = yield self.filter_events_for_client( user_id, events, is_peeking=(member_event_id is None), @@ -483,7 +483,7 @@ class MessageHandler(BaseHandler): ] ).addErrback(unwrapFirstError) - messages = yield self._filter_events_for_client( + messages = yield self.filter_events_for_client( user_id, messages ) @@ -619,7 +619,7 @@ class MessageHandler(BaseHandler): end_token=stream_token ) - messages = yield self._filter_events_for_client( + messages = yield self.filter_events_for_client( user_id, messages, is_peeking=is_peeking ) @@ -700,7 +700,7 @@ class MessageHandler(BaseHandler): consumeErrors=True, ).addErrback(unwrapFirstError) - messages = yield self._filter_events_for_client( + messages = yield self.filter_events_for_client( user_id, messages, is_peeking=is_peeking, ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index dd9c18df84..fdebc9c438 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -449,7 +449,7 @@ class RoomContextHandler(BaseHandler): now_token = yield self.hs.get_event_sources().get_current_token() def filter_evts(events): - return self._filter_events_for_client( + return self.filter_events_for_client( user.to_string(), events, is_peeking=is_guest) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 9937d8dd7f..a937e87408 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -172,7 +172,7 @@ class SearchHandler(BaseHandler): filtered_events = search_filter.filter([r["event"] for r in results]) - events = yield self._filter_events_for_client( + events = yield self.filter_events_for_client( user.to_string(), filtered_events ) @@ -223,7 +223,7 @@ class SearchHandler(BaseHandler): r["event"] for r in results ]) - events = yield self._filter_events_for_client( + events = yield self.filter_events_for_client( user.to_string(), filtered_events ) @@ -281,11 +281,11 @@ class SearchHandler(BaseHandler): event.room_id, event.event_id, before_limit, after_limit ) - res["events_before"] = yield self._filter_events_for_client( + res["events_before"] = yield self.filter_events_for_client( user.to_string(), res["events_before"] ) - res["events_after"] = yield self._filter_events_for_client( + res["events_after"] = yield self.filter_events_for_client( user.to_string(), res["events_after"] ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 231140b655..b51bb651ec 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -681,7 +681,7 @@ class SyncHandler(BaseHandler): if recents is not None: recents = sync_config.filter_collection.filter_room_timeline(recents) - recents = yield self._filter_events_for_client( + recents = yield self.filter_events_for_client( sync_config.user.to_string(), recents, ) @@ -702,7 +702,7 @@ class SyncHandler(BaseHandler): loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) - loaded_recents = yield self._filter_events_for_client( + loaded_recents = yield self.filter_events_for_client( sync_config.user.to_string(), loaded_recents, ) diff --git a/synapse/notifier.py b/synapse/notifier.py index 6af7a8f424..cb58dfffd4 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -399,7 +399,7 @@ class Notifier(object): if name == "room": room_member_handler = self.hs.get_handlers().room_member_handler - new_events = yield room_member_handler._filter_events_for_client( + new_events = yield room_member_handler.filter_events_for_client( user.to_string(), new_events, is_peeking=is_peeking, diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py new file mode 100644 index 0000000000..dcbee4c3fe --- /dev/null +++ b/synapse/push/emailpusher.py @@ -0,0 +1,261 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, reactor + +import logging + +from synapse.util.metrics import Measure +from synapse.util.logcontext import LoggingContext + +from mailer import Mailer + +logger = logging.getLogger(__name__) + +# The amount of time we always wait before ever emailing about a notification +# (to give the user a chance to respond to other push or notice the window) +DELAY_BEFORE_MAIL_MS = 2 * 60 * 1000 + +THROTTLE_START_MS = 2 * 60 * 1000 +THROTTLE_MAX_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days + +# If no event triggers a notification for this long after the previous, +# the throttle is released. +THROTTLE_RESET_AFTER_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days + + +class EmailPusher(object): + """ + A pusher that sends email notifications about events (approximately) + when they happen. + This shares quite a bit of code with httpusher: it would be good to + factor out the common parts + """ + def __init__(self, hs, pusherdict): + self.hs = hs + self.store = self.hs.get_datastore() + self.clock = self.hs.get_clock() + self.pusher_id = pusherdict['id'] + self.user_id = pusherdict['user_name'] + self.app_id = pusherdict['app_id'] + self.email = pusherdict['pushkey'] + self.last_stream_ordering = pusherdict['last_stream_ordering'] + self.timed_call = None + self.throttle_params = None + + # See httppusher + self.max_stream_ordering = None + + self.processing = False + + if self.hs.config.email_enable_notifs: + self.mailer = Mailer(self.hs) + else: + self.mailer = None + + @defer.inlineCallbacks + def on_started(self): + if self.mailer is not None: + self.throttle_params = yield self.store.get_throttle_params_by_room( + self.pusher_id + ) + yield self._process() + + def on_stop(self): + if self.timed_call: + self.timed_call.cancel() + + @defer.inlineCallbacks + def on_new_notifications(self, min_stream_ordering, max_stream_ordering): + self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) + yield self._process() + + def on_new_receipts(self, min_stream_id, max_stream_id): + # We could wake up and cancel the timer but there tend to be quite a + # lot of read receipts so it's probably less work to just let the + # timer fire + return defer.succeed(None) + + @defer.inlineCallbacks + def on_timer(self): + self.timed_call = None + yield self._process() + + @defer.inlineCallbacks + def _process(self): + if self.processing: + return + + with LoggingContext("emailpush._process"): + with Measure(self.clock, "emailpush._process"): + try: + self.processing = True + # if the max ordering changes while we're running _unsafe_process, + # call it again, and so on until we've caught up. + while True: + starting_max_ordering = self.max_stream_ordering + try: + yield self._unsafe_process() + except: + logger.exception("Exception processing notifs") + if self.max_stream_ordering == starting_max_ordering: + break + finally: + self.processing = False + + @defer.inlineCallbacks + def _unsafe_process(self): + """ + Main logic of the push loop without the wrapper function that sets + up logging, measures and guards against multiple instances of it + being run. + """ + last_notifs = yield self.store.get_time_of_latest_push_action_by_room_for_user( + self.user_id + ) + + unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( + self.user_id, self.last_stream_ordering, self.max_stream_ordering + ) + + soonest_due_at = None + + for push_action in unprocessed: + received_at = push_action['received_ts'] + if received_at is None: + received_at = 0 + notif_ready_at = received_at + DELAY_BEFORE_MAIL_MS + + room_ready_at = self.room_ready_to_notify_at( + push_action['room_id'], self.get_room_last_notif_ts( + last_notifs, push_action['room_id'] + ) + ) + + should_notify_at = max(notif_ready_at, room_ready_at) + + if should_notify_at < self.clock.time_msec(): + # one of our notifications is ready for sending, so we send + # *one* email updating the user on their notifications, + # we then consider all previously outstanding notifications + # to be delivered. + yield self.send_notification(unprocessed) + + yield self.save_last_stream_ordering_and_success(max([ + ea['stream_ordering'] for ea in unprocessed + ])) + yield self.sent_notif_update_throttle( + push_action['room_id'], push_action + ) + else: + if soonest_due_at is None or should_notify_at < soonest_due_at: + soonest_due_at = should_notify_at + + if self.timed_call is not None: + self.timed_call.cancel() + self.timed_call = None + + if soonest_due_at is not None: + self.timed_call = reactor.callLater( + self.seconds_until(soonest_due_at), self.on_timer + ) + + @defer.inlineCallbacks + def save_last_stream_ordering_and_success(self, last_stream_ordering): + self.last_stream_ordering = last_stream_ordering + yield self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, self.email, self.user_id, + last_stream_ordering, self.clock.time_msec() + ) + + def seconds_until(self, ts_msec): + return (ts_msec - self.clock.time_msec()) / 1000 + + def get_room_last_notif_ts(self, last_notif_by_room, room_id): + if room_id in last_notif_by_room: + return last_notif_by_room[room_id] + else: + return 0 + + def get_room_throttle_ms(self, room_id): + if room_id in self.throttle_params: + return self.throttle_params[room_id]["throttle_ms"] + else: + return 0 + + def get_room_last_sent_ts(self, room_id): + if room_id in self.throttle_params: + return self.throttle_params[room_id]["last_sent_ts"] + else: + return 0 + + def room_ready_to_notify_at(self, room_id, last_notif_time): + """ + Determines whether throttling should prevent us from sending an email + for the given room + Returns: True if we should send, False if we should not + """ + last_sent_ts = self.get_room_last_sent_ts(room_id) + throttle_ms = self.get_room_throttle_ms(room_id) + + may_send_at = last_sent_ts + throttle_ms + return may_send_at + + @defer.inlineCallbacks + def sent_notif_update_throttle(self, room_id, notified_push_action): + # We have sent a notification, so update the throttle accordingly. + # If the event that triggered the notif happened more than + # THROTTLE_RESET_AFTER_MS after the previous one that triggered a + # notif, we release the throttle. Otherwise, the throttle is increased. + time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before( + notified_push_action['stream_ordering'] + ) + + time_of_this_notifs = notified_push_action['received_ts'] + + if time_of_previous_notifs is not None and time_of_this_notifs is not None: + gap = time_of_this_notifs - time_of_previous_notifs + else: + # if we don't know the arrival time of one of the notifs (it was not + # stored prior to email notification code) then assume a gap of + # zero which will just not reset the throttle + gap = 0 + + current_throttle_ms = self.get_room_throttle_ms(room_id) + + if gap > THROTTLE_RESET_AFTER_MS: + new_throttle_ms = THROTTLE_START_MS + else: + if current_throttle_ms == 0: + new_throttle_ms = THROTTLE_START_MS + else: + new_throttle_ms = min( + current_throttle_ms * 2, + THROTTLE_MAX_MS + ) + self.throttle_params[room_id] = { + "last_sent_ts": self.clock.time_msec(), + "throttle_ms": new_throttle_ms + } + yield self.store.set_throttle_params( + self.pusher_id, room_id, self.throttle_params[room_id] + ) + + @defer.inlineCallbacks + def send_notification(self, push_actions): + logger.info("Sending notif email for user %r", self.user_id) + yield self.mailer.send_notification_mail( + self.user_id, self.email, push_actions + ) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py new file mode 100644 index 0000000000..c53ae9a547 --- /dev/null +++ b/synapse/push/mailer.py @@ -0,0 +1,373 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer +from twisted.mail.smtp import sendmail + +import email.utils +import email.mime.multipart +from email.mime.text import MIMEText + +from synapse.util.async import concurrently_execute +from synapse.util.presentable_names import ( + calculate_room_name, name_from_member_event, descriptor_from_member_events +) +from synapse.types import UserID +from synapse.api.errors import StoreError + +import jinja2 +import bleach + +import time +import urllib + + +MESSAGE_FROM_PERSON_IN_ROOM = "You have a message from %s in the %s room" +MESSAGE_FROM_PERSON = "You have a message from %s" +MESSAGES_FROM_PERSON = "You have messages from %s" +MESSAGES_IN_ROOM = "There are some messages for you in the %s room" +MESSAGES_IN_ROOMS = "Here are some messages you may have missed" +INVITE_FROM_PERSON_TO_ROOM = "%s has invited you to join the %s room" +INVITE_FROM_PERSON = "%s has invited you to chat" + +CONTEXT_BEFORE = 1 + +# From https://github.com/matrix-org/matrix-react-sdk/blob/master/src/HtmlUtils.js +ALLOWED_TAGS = [ + 'font', # custom to matrix for IRC-style font coloring + 'del', # for markdown + # deliberately no h1/h2 to stop people shouting. + 'h3', 'h4', 'h5', 'h6', 'blockquote', 'p', 'a', 'ul', 'ol', + 'nl', 'li', 'b', 'i', 'u', 'strong', 'em', 'strike', 'code', 'hr', 'br', 'div', + 'table', 'thead', 'caption', 'tbody', 'tr', 'th', 'td', 'pre' +] +ALLOWED_ATTRS = { + # custom ones first: + "font": ["color"], # custom to matrix + "a": ["href", "name", "target"], # remote target: custom to matrix + # We don't currently allow img itself by default, but this + # would make sense if we did + "img": ["src"], +} +# When bleach release a version with this option, we can specify schemes +# ALLOWED_SCHEMES = ["http", "https", "ftp", "mailto"] + + +class Mailer(object): + def __init__(self, hs): + self.hs = hs + self.store = self.hs.get_datastore() + self.state_handler = self.hs.get_state_handler() + loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir) + env = jinja2.Environment(loader=loader) + env.filters["format_ts"] = format_ts_filter + env.filters["mxc_to_http"] = self.mxc_to_http_filter + self.notif_template = env.get_template(self.hs.config.email_notif_template_html) + + @defer.inlineCallbacks + def send_notification_mail(self, user_id, email_address, push_actions): + raw_from = email.utils.parseaddr(self.hs.config.email_notif_from)[1] + raw_to = email.utils.parseaddr(email_address)[1] + + if raw_to == '': + raise RuntimeError("Invalid 'to' address") + + rooms_in_order = deduped_ordered_list( + [pa['room_id'] for pa in push_actions] + ) + + notif_events = yield self.store.get_events( + [pa['event_id'] for pa in push_actions] + ) + + notifs_by_room = {} + for pa in push_actions: + notifs_by_room.setdefault(pa["room_id"], []).append(pa) + + # collect the current state for all the rooms in which we have + # notifications + state_by_room = {} + + try: + user_display_name = yield self.store.get_profile_displayname( + UserID.from_string(user_id).localpart + ) + except StoreError: + user_display_name = user_id + + @defer.inlineCallbacks + def _fetch_room_state(room_id): + room_state = yield self.state_handler.get_current_state(room_id) + state_by_room[room_id] = room_state + + # Run at most 3 of these at once: sync does 10 at a time but email + # notifs are much realtime than sync so we can afford to wait a bit. + yield concurrently_execute(_fetch_room_state, rooms_in_order, 3) + + rooms = [] + + for r in rooms_in_order: + vars = yield self.get_room_vars( + r, user_id, notifs_by_room[r], notif_events, state_by_room[r] + ) + rooms.append(vars) + + summary_text = self.make_summary_text( + notifs_by_room, state_by_room, notif_events, user_id + ) + + template_vars = { + "user_display_name": user_display_name, + "unsubscribe_link": self.make_unsubscribe_link(), + "summary_text": summary_text, + "rooms": rooms, + } + + plainText = self.notif_template.render(**template_vars) + + text_part = MIMEText(plainText, "html", "utf8") + text_part['Subject'] = "New Matrix Notifications" + text_part['From'] = self.hs.config.email_notif_from + text_part['To'] = email_address + + yield sendmail( + self.hs.config.email_smtp_host, + raw_from, raw_to, text_part.as_string(), + port=self.hs.config.email_smtp_port + ) + + @defer.inlineCallbacks + def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state): + my_member_event = room_state[("m.room.member", user_id)] + is_invite = my_member_event.content["membership"] == "invite" + + room_vars = { + "title": calculate_room_name(room_state, user_id), + "hash": string_ordinal_total(room_id), # See sender avatar hash + "notifs": [], + "invite": is_invite, + "link": self.make_room_link(room_id), + } + + if not is_invite: + for n in notifs: + vars = yield self.get_notif_vars( + n, user_id, notif_events[n['event_id']], room_state + ) + room_vars['notifs'].append(vars) + + defer.returnValue(room_vars) + + @defer.inlineCallbacks + def get_notif_vars(self, notif, user_id, notif_event, room_state): + results = yield self.store.get_events_around( + notif['room_id'], notif['event_id'], + before_limit=CONTEXT_BEFORE, after_limit=0 + ) + + ret = { + "link": self.make_notif_link(notif), + "ts": notif['received_ts'], + "messages": [], + } + + handler = self.hs.get_handlers().message_handler + the_events = yield handler.filter_events_for_client( + user_id, results["events_before"] + ) + the_events.append(notif_event) + + for event in the_events: + vars = self.get_message_vars(notif, event, room_state) + if vars is not None: + ret['messages'].append(vars) + + defer.returnValue(ret) + + def get_message_vars(self, notif, event, room_state): + if event.type != "m.room.message": + return None + + sender_state_event = room_state[("m.room.member", event.sender)] + sender_name = name_from_member_event(sender_state_event) + sender_avatar_url = sender_state_event.content["avatar_url"] + + # 'hash' for deterministically picking default images: use + # sender_hash % the number of default images to choose from + sender_hash = string_ordinal_total(event.sender) + + ret = { + "msgtype": event.content["msgtype"], + "is_historical": event.event_id != notif['event_id'], + "ts": event.origin_server_ts, + "sender_name": sender_name, + "sender_avatar_url": sender_avatar_url, + "sender_hash": sender_hash, + } + + if event.content["msgtype"] == "m.text": + self.add_text_message_vars(ret, event) + elif event.content["msgtype"] == "m.image": + self.add_image_message_vars(ret, event) + + if "body" in event.content: + ret["body_text_plain"] = event.content["body"] + + return ret + + def add_text_message_vars(self, vars, event): + if "format" in event.content: + msgformat = event.content["format"] + else: + msgformat = None + vars["format"] = msgformat + + if msgformat == "org.matrix.custom.html": + vars["body_text_html"] = safe_markup(event.content["formatted_body"]) + else: + vars["body_text_html"] = safe_text(event.content["body"]) + + return vars + + def add_image_message_vars(self, vars, event): + vars["image_url"] = event.content["url"] + + return vars + + def make_summary_text(self, notifs_by_room, state_by_room, notif_events, user_id): + if len(notifs_by_room) == 1: + # Only one room has new stuff + room_id = notifs_by_room.keys()[0] + + # If the room has some kind of name, use it, but we don't + # want the generated-from-names one here otherwise we'll + # end up with, "new message from Bob in the Bob room" + room_name = calculate_room_name( + state_by_room[room_id], user_id, fallback_to_members=False + ) + + my_member_event = state_by_room[room_id][("m.room.member", user_id)] + if my_member_event.content["membership"] == "invite": + inviter_member_event = state_by_room[room_id][ + ("m.room.member", my_member_event.sender) + ] + inviter_name = name_from_member_event(inviter_member_event) + + if room_name is None: + return INVITE_FROM_PERSON % (inviter_name,) + else: + return INVITE_FROM_PERSON_TO_ROOM % (inviter_name, room_name) + + sender_name = None + if len(notifs_by_room[room_id]) == 1: + # There is just the one notification, so give some detail + event = notif_events[notifs_by_room[room_id][0]["event_id"]] + if ("m.room.member", event.sender) in state_by_room[room_id]: + state_event = state_by_room[room_id][("m.room.member", event.sender)] + sender_name = name_from_member_event(state_event) + + if sender_name is not None and room_name is not None: + return MESSAGE_FROM_PERSON_IN_ROOM % (sender_name, room_name) + elif sender_name is not None: + return MESSAGE_FROM_PERSON % (sender_name,) + else: + # There's more than one notification for this room, so just + # say there are several + if room_name is not None: + return MESSAGES_IN_ROOM % (room_name,) + else: + # If the room doesn't have a name, say who the messages + # are from explicitly to avoid, "messages in the Bob room" + sender_ids = list(set([ + notif_events[n['event_id']].sender + for n in notifs_by_room[room_id] + ])) + + return MESSAGES_FROM_PERSON % ( + descriptor_from_member_events([ + state_by_room[room_id][("m.room.member", s)] + for s in sender_ids + ]) + ) + else: + # Stuff's happened in multiple different rooms + return MESSAGES_IN_ROOMS + + def make_room_link(self, room_id): + return "https://matrix.to/%s" % (room_id,) + + def make_notif_link(self, notif): + return "https://matrix.to/%s/%s" % ( + notif['room_id'], notif['event_id'] + ) + + def make_unsubscribe_link(self): + return "https://vector.im/#/settings" # XXX: matrix.to + + def mxc_to_http_filter(self, value, width, height, resize_method="crop"): + if value[0:6] != "mxc://": + return "" + serverAndMediaId = value[6:] + params = { + "width": width, + "height": height, + "method": resize_method, + } + return "%s_matrix/media/v1/thumbnail/%s?%s" % ( + self.hs.config.public_baseurl, + serverAndMediaId, + urllib.urlencode(params) + ) + + +def safe_markup(raw_html): + return jinja2.Markup(bleach.linkify(bleach.clean( + raw_html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRS, + # bleach master has this, but it isn't released yet + # protocols=ALLOWED_SCHEMES, + strip=True + ))) + + +def safe_text(raw_text): + """ + Process text: treat it as HTML but escape any tags (ie. just escape the + HTML) then linkify it. + """ + return jinja2.Markup(bleach.linkify(bleach.clean( + raw_text, tags=[], attributes={}, + strip=False + ))) + + +def deduped_ordered_list(l): + seen = set() + ret = [] + for item in l: + if item not in seen: + seen.add(item) + ret.append(item) + return ret + + +def string_ordinal_total(s): + tot = 0 + for c in s: + tot += ord(c) + return tot + + +def format_ts_filter(value, format): + return time.strftime(format, time.localtime(value / 1000)) diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py index 4960837504..25a45af775 100644 --- a/synapse/push/pusher.py +++ b/synapse/push/pusher.py @@ -1,10 +1,14 @@ from httppusher import HttpPusher -PUSHER_TYPES = { - 'http': HttpPusher -} - def create_pusher(hs, pusherdict): + PUSHER_TYPES = { + "http": HttpPusher, + } + + if hs.config.email_enable_notifs: + from emailpusher import EmailPusher + PUSHER_TYPES["email"] = EmailPusher + if pusherdict['kind'] in PUSHER_TYPES: return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 0eb3d6c1de..e0a7a19777 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -44,6 +44,10 @@ CONDITIONAL_REQUIREMENTS = { "preview_url": { "netaddr>=0.7.18": ["netaddr"], }, + "email.enable_notifs": { + "Jinja2>=2.8": ["Jinja2>=2.8"], + "bleach>=1.4.2": ["bleach>=1.4.2"], + }, } diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 86a98b6f11..f2af8bdb36 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -118,15 +118,17 @@ class EventPushActionsStore(SQLBaseStore): max_stream_ordering=None): def get_after_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " + "e.received_ts " "FROM event_push_actions AS ep, (" - " SELECT room_id, user_id," - " max(topological_ordering) as topological_ordering," - " max(stream_ordering) as stream_ordering" + " SELECT room_id, user_id, " + " max(topological_ordering) as topological_ordering, " + " max(stream_ordering) as stream_ordering " " FROM events" " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" " GROUP BY room_id, user_id" ") AS rl " + "NATURAL JOIN events e " "WHERE" " ep.room_id = rl.room_id" " AND (" @@ -153,8 +155,10 @@ class EventPushActionsStore(SQLBaseStore): def get_no_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " + "e.received_ts " "FROM event_push_actions AS ep " + "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id " "WHERE ep.room_id not in (" " SELECT room_id FROM events NATURAL JOIN receipts_linearized" " WHERE receipt_type = 'm.read' AND user_id = ? " @@ -175,12 +179,30 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue([ { "event_id": row[0], - "stream_ordering": row[1], - "actions": json.loads(row[2]), + "room_id": row[1], + "stream_ordering": row[2], + "actions": json.loads(row[3]), + "received_ts": row[4], } for row in after_read_receipt + no_read_receipt ]) @defer.inlineCallbacks + def get_time_of_last_push_action_before(self, stream_ordering): + def f(txn): + sql = ( + "SELECT e.received_ts " + "FROM event_push_actions AS ep " + "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id " + "WHERE ep.stream_ordering > ? " + "ORDER BY ep.stream_ordering ASC " + "LIMIT 1" + ) + txn.execute(sql, (stream_ordering,)) + return txn.fetchone() + result = yield self.runInteraction("get_time_of_last_push_action_before", f) + defer.returnValue(result[0] if result is not None else None) + + @defer.inlineCallbacks def get_latest_push_action_stream_ordering(self): def f(txn): txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") @@ -190,6 +212,26 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(result[0] or 0) + @defer.inlineCallbacks + def get_time_of_latest_push_action_by_room_for_user(self, user_id): + """ + Returns only the received_ts of the last notification in each of the + user's rooms, in a dict by room_id + """ + def f(txn): + txn.execute( + "SELECT ep.room_id, MAX(e.received_ts) " + "FROM event_push_actions AS ep " + "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id " + "GROUP BY ep.room_id" + ) + return txn.fetchall() + result = yield self.runInteraction( + "get_time_of_latest_push_action_by_room_for_user", f + ) + + defer.returnValue({row[0]: row[1] for row in result}) + def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): # Sad that we have to blow away the cache for the whole room here txn.call_after( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0307b2af3c..438eef6ba3 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -55,6 +55,7 @@ class EventsStore(SQLBaseStore): def __init__(self, hs): super(EventsStore, self).__init__(hs) + self._clock = hs.get_clock() self.register_background_update_handler( self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts ) @@ -427,6 +428,7 @@ class EventsStore(SQLBaseStore): "outlier": event.internal_metadata.is_outlier(), "content": encode_json(event.content).decode("UTF-8"), "origin_server_ts": int(event.origin_server_ts), + "received_ts": self._clock.time_msec(), } for event, _ in events_and_contexts ], diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 11feb3eb11..d9afd7ec87 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -233,3 +233,30 @@ class PusherStore(SQLBaseStore): {'failing_since': failing_since}, desc="update_pusher_failing_since", ) + + @defer.inlineCallbacks + def get_throttle_params_by_room(self, pusher_id): + res = yield self._simple_select_list( + "pusher_throttle", + {"pusher": pusher_id}, + ["room_id", "last_sent_ts", "throttle_ms"], + desc="get_throttle_params_by_room" + ) + + params_by_room = {} + for row in res: + params_by_room[row["room_id"]] = { + "last_sent_ts": row["last_sent_ts"], + "throttle_ms": row["throttle_ms"] + } + + defer.returnValue(params_by_room) + + @defer.inlineCallbacks + def set_throttle_params(self, pusher_id, room_id, params): + yield self._simple_upsert( + "pusher_throttle", + {"pusher": pusher_id, "room_id": room_id}, + params, + desc="set_throttle_params" + ) diff --git a/synapse/storage/schema/delta/31/events.sql b/synapse/storage/schema/delta/31/events.sql new file mode 100644 index 0000000000..1dd0f9e170 --- /dev/null +++ b/synapse/storage/schema/delta/31/events.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE events ADD COLUMN received_ts BIGINT; diff --git a/synapse/storage/schema/delta/31/pusher_throttle.sql b/synapse/storage/schema/delta/31/pusher_throttle.sql new file mode 100644 index 0000000000..d86d30c13c --- /dev/null +++ b/synapse/storage/schema/delta/31/pusher_throttle.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE pusher_throttle( + pusher BIGINT NOT NULL, + room_id TEXT NOT NULL, + last_sent_ts BIGINT, + throttle_ms BIGINT, + PRIMARY KEY (pusher, room_id) +); diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py new file mode 100644 index 0000000000..f80a7fe58e --- /dev/null +++ b/synapse/util/presentable_names.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re + +# intentionally looser than what aliases we allow to be registered since +# other HSes may allow aliases that we would not +ALIAS_RE = re.compile(r"^#.*:.+$") + +ALL_ALONE = "Empty Room" + + +def calculate_room_name(room_state, user_id, fallback_to_members=True): + # does it have a name? + if ("m.room.name", "") in room_state: + m_room_name = room_state[("m.room.name", "")] + if m_room_name.content and m_room_name.content["name"]: + return m_room_name.content["name"] + + # does it have a canonical alias? + if ("m.room.canonical_alias", "") in room_state: + canon_alias = room_state[("m.room.canonical_alias", "")] + if ( + canon_alias.content and canon_alias.content["alias"] and + _looks_like_an_alias(canon_alias.content["alias"]) + ): + return canon_alias.content["alias"] + + # at this point we're going to need to search the state by all state keys + # for an event type, so rearrange the data structure + room_state_bytype = _state_as_two_level_dict(room_state) + + # right then, any aliases at all? + if "m.room.aliases" in room_state_bytype: + m_room_aliases = room_state_bytype["m.room.aliases"] + if len(m_room_aliases.values()) > 0: + first_alias_event = m_room_aliases.values()[0] + if first_alias_event.content and first_alias_event.content["aliases"]: + the_aliases = first_alias_event.content["aliases"] + if len(the_aliases) > 0 and _looks_like_an_alias(the_aliases[0]): + return the_aliases[0] + + if not fallback_to_members: + return None + + my_member_event = None + if ("m.room.member", user_id) in room_state: + my_member_event = room_state[("m.room.member", user_id)] + + if ( + my_member_event is not None and + my_member_event.content['membership'] == "invite" + ): + if ("m.room.member", my_member_event.sender) in room_state: + inviter_member_event = room_state[("m.room.member", my_member_event.sender)] + return "Invite from %s" % (name_from_member_event(inviter_member_event),) + else: + return "Room Invite" + + # we're going to have to generate a name based on who's in the room, + # so find out who is in the room that isn't the user. + if "m.room.member" in room_state_bytype: + all_members = [ + ev for ev in room_state_bytype["m.room.member"].values() + if ev.content['membership'] == "join" or ev.content['membership'] == "invite" + ] + # Sort the member events oldest-first so the we name people in the + # order the joined (it should at least be deterministic rather than + # dictionary iteration order) + all_members.sort(key=lambda e: e.origin_server_ts) + other_members = [m for m in all_members if m.state_key != user_id] + else: + other_members = [] + all_members = [] + + if len(other_members) == 0: + if len(all_members) == 1: + # self-chat, peeked room with 1 participant, + # or inbound invite, or outbound 3PID invite. + if all_members[0].sender == user_id: + if "m.room.third_party_invite" in room_state_bytype: + third_party_invites = room_state_bytype["m.room.third_party_invite"] + if len(third_party_invites) > 0: + # technically third party invite events are not member + # events, but they are close enough + return "Inviting %s" ( + descriptor_from_member_events(third_party_invites) + ) + else: + return ALL_ALONE + else: + return name_from_member_event(all_members[0]) + else: + return ALL_ALONE + else: + return descriptor_from_member_events(other_members) + + +def descriptor_from_member_events(member_events): + if len(member_events) == 0: + return "nobody" + elif len(member_events) == 1: + return name_from_member_event(member_events[0]) + elif len(member_events) == 2: + return "%s and %s" % ( + name_from_member_event(member_events[0]), + name_from_member_event(member_events[1]), + ) + else: + return "%s and %d others" % ( + name_from_member_event(member_events[0]), + len(member_events) - 1, + ) + + +def name_from_member_event(member_event): + if ( + member_event.content and "displayname" in member_event.content and + member_event.content["displayname"] + ): + return member_event.content["displayname"] + return member_event.state_key + + +def _state_as_two_level_dict(state): + ret = {} + for k, v in state.items(): + ret.setdefault(k[0], {})[k[1]] = v + return ret + + +def _looks_like_an_alias(string): + return ALIAS_RE.match(string) is not None \ No newline at end of file |