diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
new file mode 100644
index 0000000000..06b076e3f9
--- /dev/null
+++ b/synapse/config/emailconfig.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file can't be called email.py because if it is, we cannot:
+import email.utils
+
+from ._base import Config
+
+
+class EmailConfig(Config):
+ """
+ Email Configuration
+ """
+
+ def read_config(self, config):
+ self.email_enable_notifs = False
+
+ email_config = config.get("email", None)
+ if email_config:
+ self.email_enable_notifs = email_config.get("enable_notifs", True)
+
+ if self.email_enable_notifs:
+ required = [
+ "smtp_host",
+ "smtp_port",
+ "notif_from",
+ "template_dir",
+ "notif_template_html",
+ ]
+
+ missing = []
+ for k in required:
+ if k not in email_config:
+ missing.append(k)
+
+ if (len(missing) > 0):
+ raise RuntimeError(
+ "email.enable_notifs is True but required keys are missing: %s" %
+ (", ".join(["email." + k for k in missing]),)
+ )
+
+ if config.get("public_baseurl") is None:
+ raise RuntimeError(
+ "email.enable_notifs is True but no public_baseurl is set"
+ )
+
+ self.email_smtp_host = email_config["smtp_host"]
+ self.email_smtp_port = email_config["smtp_port"]
+ self.email_notif_from = email_config["notif_from"]
+ self.email_template_dir = email_config["template_dir"]
+ self.email_notif_template_html = email_config["notif_template_html"]
+
+ # make sure it's valid
+ parsed = email.utils.parseaddr(self.email_notif_from)
+ if parsed[1] == '':
+ raise RuntimeError("Invalid notif_from address")
+ else:
+ self.email_enable_notifs = False
+ # Not much point setting defaults for the rest: it would be an
+ # error for them to be used.
+
+ def default_config(self, config_dir_path, server_name, **kwargs):
+ return """
+ # Enable sending emails for notification events
+ #email_config:
+ # enable_notifs: false
+ # smtp_host: "localhost"
+ # smtp_port: 25
+ # notif_from: Your Friendly Matrix Home Server <noreply@example.com>
+ # template_dir: res/templates
+ # notif_template_html: notif.html
+ """
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 9a80ac39ec..fc2445484c 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -31,13 +31,14 @@ from .cas import CasConfig
from .password import PasswordConfig
from .jwt import JWTConfig
from .ldap import LDAPConfig
+from .emailconfig import EmailConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
- JWTConfig, LDAPConfig, PasswordConfig,):
+ JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,):
pass
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 46c633548a..04b9221908 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -28,6 +28,11 @@ class ServerConfig(Config):
self.print_pidfile = config.get("print_pidfile")
self.user_agent_suffix = config.get("user_agent_suffix")
self.use_frozen_dicts = config.get("use_frozen_dicts", True)
+ self.public_baseurl = config.get("public_baseurl")
+
+ if self.public_baseurl is not None:
+ if self.public_baseurl[-1] != '/':
+ self.public_baseurl += '/'
self.start_pushers = config.get("start_pushers", True)
self.listeners = config.get("listeners", [])
@@ -143,6 +148,9 @@ class ServerConfig(Config):
# Whether to serve a web client from the HTTP/HTTPS root resource.
web_client: True
+ # The server's public-facing base URL
+ # https://example.com:8448/
+
# Set the soft limit on the number of file descriptors synapse can use
# Zero is used to indicate synapse should set the soft limit to the
# hard limit.
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 13a675b208..134729069a 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -192,7 +192,7 @@ class BaseHandler(object):
})
@defer.inlineCallbacks
- def _filter_events_for_client(self, user_id, events, is_peeking=False):
+ def filter_events_for_client(self, user_id, events, is_peeking=False):
"""
Check which events a user is allowed to see
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f51feda2f4..7d9e3cf364 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -123,7 +123,7 @@ class MessageHandler(BaseHandler):
"end": next_token.to_string(),
})
- events = yield self._filter_events_for_client(
+ events = yield self.filter_events_for_client(
user_id,
events,
is_peeking=(member_event_id is None),
@@ -483,7 +483,7 @@ class MessageHandler(BaseHandler):
]
).addErrback(unwrapFirstError)
- messages = yield self._filter_events_for_client(
+ messages = yield self.filter_events_for_client(
user_id, messages
)
@@ -619,7 +619,7 @@ class MessageHandler(BaseHandler):
end_token=stream_token
)
- messages = yield self._filter_events_for_client(
+ messages = yield self.filter_events_for_client(
user_id, messages, is_peeking=is_peeking
)
@@ -700,7 +700,7 @@ class MessageHandler(BaseHandler):
consumeErrors=True,
).addErrback(unwrapFirstError)
- messages = yield self._filter_events_for_client(
+ messages = yield self.filter_events_for_client(
user_id, messages, is_peeking=is_peeking,
)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index dd9c18df84..fdebc9c438 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -449,7 +449,7 @@ class RoomContextHandler(BaseHandler):
now_token = yield self.hs.get_event_sources().get_current_token()
def filter_evts(events):
- return self._filter_events_for_client(
+ return self.filter_events_for_client(
user.to_string(),
events,
is_peeking=is_guest)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 9937d8dd7f..a937e87408 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -172,7 +172,7 @@ class SearchHandler(BaseHandler):
filtered_events = search_filter.filter([r["event"] for r in results])
- events = yield self._filter_events_for_client(
+ events = yield self.filter_events_for_client(
user.to_string(), filtered_events
)
@@ -223,7 +223,7 @@ class SearchHandler(BaseHandler):
r["event"] for r in results
])
- events = yield self._filter_events_for_client(
+ events = yield self.filter_events_for_client(
user.to_string(), filtered_events
)
@@ -281,11 +281,11 @@ class SearchHandler(BaseHandler):
event.room_id, event.event_id, before_limit, after_limit
)
- res["events_before"] = yield self._filter_events_for_client(
+ res["events_before"] = yield self.filter_events_for_client(
user.to_string(), res["events_before"]
)
- res["events_after"] = yield self._filter_events_for_client(
+ res["events_after"] = yield self.filter_events_for_client(
user.to_string(), res["events_after"]
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 231140b655..b51bb651ec 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -681,7 +681,7 @@ class SyncHandler(BaseHandler):
if recents is not None:
recents = sync_config.filter_collection.filter_room_timeline(recents)
- recents = yield self._filter_events_for_client(
+ recents = yield self.filter_events_for_client(
sync_config.user.to_string(),
recents,
)
@@ -702,7 +702,7 @@ class SyncHandler(BaseHandler):
loaded_recents = sync_config.filter_collection.filter_room_timeline(
events
)
- loaded_recents = yield self._filter_events_for_client(
+ loaded_recents = yield self.filter_events_for_client(
sync_config.user.to_string(),
loaded_recents,
)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6af7a8f424..cb58dfffd4 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -399,7 +399,7 @@ class Notifier(object):
if name == "room":
room_member_handler = self.hs.get_handlers().room_member_handler
- new_events = yield room_member_handler._filter_events_for_client(
+ new_events = yield room_member_handler.filter_events_for_client(
user.to_string(),
new_events,
is_peeking=is_peeking,
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
new file mode 100644
index 0000000000..dcbee4c3fe
--- /dev/null
+++ b/synapse/push/emailpusher.py
@@ -0,0 +1,261 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer, reactor
+
+import logging
+
+from synapse.util.metrics import Measure
+from synapse.util.logcontext import LoggingContext
+
+from mailer import Mailer
+
+logger = logging.getLogger(__name__)
+
+# The amount of time we always wait before ever emailing about a notification
+# (to give the user a chance to respond to other push or notice the window)
+DELAY_BEFORE_MAIL_MS = 2 * 60 * 1000
+
+THROTTLE_START_MS = 2 * 60 * 1000
+THROTTLE_MAX_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days
+
+# If no event triggers a notification for this long after the previous,
+# the throttle is released.
+THROTTLE_RESET_AFTER_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days
+
+
+class EmailPusher(object):
+ """
+ A pusher that sends email notifications about events (approximately)
+ when they happen.
+ This shares quite a bit of code with httpusher: it would be good to
+ factor out the common parts
+ """
+ def __init__(self, hs, pusherdict):
+ self.hs = hs
+ self.store = self.hs.get_datastore()
+ self.clock = self.hs.get_clock()
+ self.pusher_id = pusherdict['id']
+ self.user_id = pusherdict['user_name']
+ self.app_id = pusherdict['app_id']
+ self.email = pusherdict['pushkey']
+ self.last_stream_ordering = pusherdict['last_stream_ordering']
+ self.timed_call = None
+ self.throttle_params = None
+
+ # See httppusher
+ self.max_stream_ordering = None
+
+ self.processing = False
+
+ if self.hs.config.email_enable_notifs:
+ self.mailer = Mailer(self.hs)
+ else:
+ self.mailer = None
+
+ @defer.inlineCallbacks
+ def on_started(self):
+ if self.mailer is not None:
+ self.throttle_params = yield self.store.get_throttle_params_by_room(
+ self.pusher_id
+ )
+ yield self._process()
+
+ def on_stop(self):
+ if self.timed_call:
+ self.timed_call.cancel()
+
+ @defer.inlineCallbacks
+ def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+ self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+ yield self._process()
+
+ def on_new_receipts(self, min_stream_id, max_stream_id):
+ # We could wake up and cancel the timer but there tend to be quite a
+ # lot of read receipts so it's probably less work to just let the
+ # timer fire
+ return defer.succeed(None)
+
+ @defer.inlineCallbacks
+ def on_timer(self):
+ self.timed_call = None
+ yield self._process()
+
+ @defer.inlineCallbacks
+ def _process(self):
+ if self.processing:
+ return
+
+ with LoggingContext("emailpush._process"):
+ with Measure(self.clock, "emailpush._process"):
+ try:
+ self.processing = True
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
+ try:
+ yield self._unsafe_process()
+ except:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self.processing = False
+
+ @defer.inlineCallbacks
+ def _unsafe_process(self):
+ """
+ Main logic of the push loop without the wrapper function that sets
+ up logging, measures and guards against multiple instances of it
+ being run.
+ """
+ last_notifs = yield self.store.get_time_of_latest_push_action_by_room_for_user(
+ self.user_id
+ )
+
+ unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
+ self.user_id, self.last_stream_ordering, self.max_stream_ordering
+ )
+
+ soonest_due_at = None
+
+ for push_action in unprocessed:
+ received_at = push_action['received_ts']
+ if received_at is None:
+ received_at = 0
+ notif_ready_at = received_at + DELAY_BEFORE_MAIL_MS
+
+ room_ready_at = self.room_ready_to_notify_at(
+ push_action['room_id'], self.get_room_last_notif_ts(
+ last_notifs, push_action['room_id']
+ )
+ )
+
+ should_notify_at = max(notif_ready_at, room_ready_at)
+
+ if should_notify_at < self.clock.time_msec():
+ # one of our notifications is ready for sending, so we send
+ # *one* email updating the user on their notifications,
+ # we then consider all previously outstanding notifications
+ # to be delivered.
+ yield self.send_notification(unprocessed)
+
+ yield self.save_last_stream_ordering_and_success(max([
+ ea['stream_ordering'] for ea in unprocessed
+ ]))
+ yield self.sent_notif_update_throttle(
+ push_action['room_id'], push_action
+ )
+ else:
+ if soonest_due_at is None or should_notify_at < soonest_due_at:
+ soonest_due_at = should_notify_at
+
+ if self.timed_call is not None:
+ self.timed_call.cancel()
+ self.timed_call = None
+
+ if soonest_due_at is not None:
+ self.timed_call = reactor.callLater(
+ self.seconds_until(soonest_due_at), self.on_timer
+ )
+
+ @defer.inlineCallbacks
+ def save_last_stream_ordering_and_success(self, last_stream_ordering):
+ self.last_stream_ordering = last_stream_ordering
+ yield self.store.update_pusher_last_stream_ordering_and_success(
+ self.app_id, self.email, self.user_id,
+ last_stream_ordering, self.clock.time_msec()
+ )
+
+ def seconds_until(self, ts_msec):
+ return (ts_msec - self.clock.time_msec()) / 1000
+
+ def get_room_last_notif_ts(self, last_notif_by_room, room_id):
+ if room_id in last_notif_by_room:
+ return last_notif_by_room[room_id]
+ else:
+ return 0
+
+ def get_room_throttle_ms(self, room_id):
+ if room_id in self.throttle_params:
+ return self.throttle_params[room_id]["throttle_ms"]
+ else:
+ return 0
+
+ def get_room_last_sent_ts(self, room_id):
+ if room_id in self.throttle_params:
+ return self.throttle_params[room_id]["last_sent_ts"]
+ else:
+ return 0
+
+ def room_ready_to_notify_at(self, room_id, last_notif_time):
+ """
+ Determines whether throttling should prevent us from sending an email
+ for the given room
+ Returns: True if we should send, False if we should not
+ """
+ last_sent_ts = self.get_room_last_sent_ts(room_id)
+ throttle_ms = self.get_room_throttle_ms(room_id)
+
+ may_send_at = last_sent_ts + throttle_ms
+ return may_send_at
+
+ @defer.inlineCallbacks
+ def sent_notif_update_throttle(self, room_id, notified_push_action):
+ # We have sent a notification, so update the throttle accordingly.
+ # If the event that triggered the notif happened more than
+ # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
+ # notif, we release the throttle. Otherwise, the throttle is increased.
+ time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
+ notified_push_action['stream_ordering']
+ )
+
+ time_of_this_notifs = notified_push_action['received_ts']
+
+ if time_of_previous_notifs is not None and time_of_this_notifs is not None:
+ gap = time_of_this_notifs - time_of_previous_notifs
+ else:
+ # if we don't know the arrival time of one of the notifs (it was not
+ # stored prior to email notification code) then assume a gap of
+ # zero which will just not reset the throttle
+ gap = 0
+
+ current_throttle_ms = self.get_room_throttle_ms(room_id)
+
+ if gap > THROTTLE_RESET_AFTER_MS:
+ new_throttle_ms = THROTTLE_START_MS
+ else:
+ if current_throttle_ms == 0:
+ new_throttle_ms = THROTTLE_START_MS
+ else:
+ new_throttle_ms = min(
+ current_throttle_ms * 2,
+ THROTTLE_MAX_MS
+ )
+ self.throttle_params[room_id] = {
+ "last_sent_ts": self.clock.time_msec(),
+ "throttle_ms": new_throttle_ms
+ }
+ yield self.store.set_throttle_params(
+ self.pusher_id, room_id, self.throttle_params[room_id]
+ )
+
+ @defer.inlineCallbacks
+ def send_notification(self, push_actions):
+ logger.info("Sending notif email for user %r", self.user_id)
+ yield self.mailer.send_notification_mail(
+ self.user_id, self.email, push_actions
+ )
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
new file mode 100644
index 0000000000..c53ae9a547
--- /dev/null
+++ b/synapse/push/mailer.py
@@ -0,0 +1,373 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+from twisted.mail.smtp import sendmail
+
+import email.utils
+import email.mime.multipart
+from email.mime.text import MIMEText
+
+from synapse.util.async import concurrently_execute
+from synapse.util.presentable_names import (
+ calculate_room_name, name_from_member_event, descriptor_from_member_events
+)
+from synapse.types import UserID
+from synapse.api.errors import StoreError
+
+import jinja2
+import bleach
+
+import time
+import urllib
+
+
+MESSAGE_FROM_PERSON_IN_ROOM = "You have a message from %s in the %s room"
+MESSAGE_FROM_PERSON = "You have a message from %s"
+MESSAGES_FROM_PERSON = "You have messages from %s"
+MESSAGES_IN_ROOM = "There are some messages for you in the %s room"
+MESSAGES_IN_ROOMS = "Here are some messages you may have missed"
+INVITE_FROM_PERSON_TO_ROOM = "%s has invited you to join the %s room"
+INVITE_FROM_PERSON = "%s has invited you to chat"
+
+CONTEXT_BEFORE = 1
+
+# From https://github.com/matrix-org/matrix-react-sdk/blob/master/src/HtmlUtils.js
+ALLOWED_TAGS = [
+ 'font', # custom to matrix for IRC-style font coloring
+ 'del', # for markdown
+ # deliberately no h1/h2 to stop people shouting.
+ 'h3', 'h4', 'h5', 'h6', 'blockquote', 'p', 'a', 'ul', 'ol',
+ 'nl', 'li', 'b', 'i', 'u', 'strong', 'em', 'strike', 'code', 'hr', 'br', 'div',
+ 'table', 'thead', 'caption', 'tbody', 'tr', 'th', 'td', 'pre'
+]
+ALLOWED_ATTRS = {
+ # custom ones first:
+ "font": ["color"], # custom to matrix
+ "a": ["href", "name", "target"], # remote target: custom to matrix
+ # We don't currently allow img itself by default, but this
+ # would make sense if we did
+ "img": ["src"],
+}
+# When bleach release a version with this option, we can specify schemes
+# ALLOWED_SCHEMES = ["http", "https", "ftp", "mailto"]
+
+
+class Mailer(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.store = self.hs.get_datastore()
+ self.state_handler = self.hs.get_state_handler()
+ loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir)
+ env = jinja2.Environment(loader=loader)
+ env.filters["format_ts"] = format_ts_filter
+ env.filters["mxc_to_http"] = self.mxc_to_http_filter
+ self.notif_template = env.get_template(self.hs.config.email_notif_template_html)
+
+ @defer.inlineCallbacks
+ def send_notification_mail(self, user_id, email_address, push_actions):
+ raw_from = email.utils.parseaddr(self.hs.config.email_notif_from)[1]
+ raw_to = email.utils.parseaddr(email_address)[1]
+
+ if raw_to == '':
+ raise RuntimeError("Invalid 'to' address")
+
+ rooms_in_order = deduped_ordered_list(
+ [pa['room_id'] for pa in push_actions]
+ )
+
+ notif_events = yield self.store.get_events(
+ [pa['event_id'] for pa in push_actions]
+ )
+
+ notifs_by_room = {}
+ for pa in push_actions:
+ notifs_by_room.setdefault(pa["room_id"], []).append(pa)
+
+ # collect the current state for all the rooms in which we have
+ # notifications
+ state_by_room = {}
+
+ try:
+ user_display_name = yield self.store.get_profile_displayname(
+ UserID.from_string(user_id).localpart
+ )
+ except StoreError:
+ user_display_name = user_id
+
+ @defer.inlineCallbacks
+ def _fetch_room_state(room_id):
+ room_state = yield self.state_handler.get_current_state(room_id)
+ state_by_room[room_id] = room_state
+
+ # Run at most 3 of these at once: sync does 10 at a time but email
+ # notifs are much realtime than sync so we can afford to wait a bit.
+ yield concurrently_execute(_fetch_room_state, rooms_in_order, 3)
+
+ rooms = []
+
+ for r in rooms_in_order:
+ vars = yield self.get_room_vars(
+ r, user_id, notifs_by_room[r], notif_events, state_by_room[r]
+ )
+ rooms.append(vars)
+
+ summary_text = self.make_summary_text(
+ notifs_by_room, state_by_room, notif_events, user_id
+ )
+
+ template_vars = {
+ "user_display_name": user_display_name,
+ "unsubscribe_link": self.make_unsubscribe_link(),
+ "summary_text": summary_text,
+ "rooms": rooms,
+ }
+
+ plainText = self.notif_template.render(**template_vars)
+
+ text_part = MIMEText(plainText, "html", "utf8")
+ text_part['Subject'] = "New Matrix Notifications"
+ text_part['From'] = self.hs.config.email_notif_from
+ text_part['To'] = email_address
+
+ yield sendmail(
+ self.hs.config.email_smtp_host,
+ raw_from, raw_to, text_part.as_string(),
+ port=self.hs.config.email_smtp_port
+ )
+
+ @defer.inlineCallbacks
+ def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state):
+ my_member_event = room_state[("m.room.member", user_id)]
+ is_invite = my_member_event.content["membership"] == "invite"
+
+ room_vars = {
+ "title": calculate_room_name(room_state, user_id),
+ "hash": string_ordinal_total(room_id), # See sender avatar hash
+ "notifs": [],
+ "invite": is_invite,
+ "link": self.make_room_link(room_id),
+ }
+
+ if not is_invite:
+ for n in notifs:
+ vars = yield self.get_notif_vars(
+ n, user_id, notif_events[n['event_id']], room_state
+ )
+ room_vars['notifs'].append(vars)
+
+ defer.returnValue(room_vars)
+
+ @defer.inlineCallbacks
+ def get_notif_vars(self, notif, user_id, notif_event, room_state):
+ results = yield self.store.get_events_around(
+ notif['room_id'], notif['event_id'],
+ before_limit=CONTEXT_BEFORE, after_limit=0
+ )
+
+ ret = {
+ "link": self.make_notif_link(notif),
+ "ts": notif['received_ts'],
+ "messages": [],
+ }
+
+ handler = self.hs.get_handlers().message_handler
+ the_events = yield handler.filter_events_for_client(
+ user_id, results["events_before"]
+ )
+ the_events.append(notif_event)
+
+ for event in the_events:
+ vars = self.get_message_vars(notif, event, room_state)
+ if vars is not None:
+ ret['messages'].append(vars)
+
+ defer.returnValue(ret)
+
+ def get_message_vars(self, notif, event, room_state):
+ if event.type != "m.room.message":
+ return None
+
+ sender_state_event = room_state[("m.room.member", event.sender)]
+ sender_name = name_from_member_event(sender_state_event)
+ sender_avatar_url = sender_state_event.content["avatar_url"]
+
+ # 'hash' for deterministically picking default images: use
+ # sender_hash % the number of default images to choose from
+ sender_hash = string_ordinal_total(event.sender)
+
+ ret = {
+ "msgtype": event.content["msgtype"],
+ "is_historical": event.event_id != notif['event_id'],
+ "ts": event.origin_server_ts,
+ "sender_name": sender_name,
+ "sender_avatar_url": sender_avatar_url,
+ "sender_hash": sender_hash,
+ }
+
+ if event.content["msgtype"] == "m.text":
+ self.add_text_message_vars(ret, event)
+ elif event.content["msgtype"] == "m.image":
+ self.add_image_message_vars(ret, event)
+
+ if "body" in event.content:
+ ret["body_text_plain"] = event.content["body"]
+
+ return ret
+
+ def add_text_message_vars(self, vars, event):
+ if "format" in event.content:
+ msgformat = event.content["format"]
+ else:
+ msgformat = None
+ vars["format"] = msgformat
+
+ if msgformat == "org.matrix.custom.html":
+ vars["body_text_html"] = safe_markup(event.content["formatted_body"])
+ else:
+ vars["body_text_html"] = safe_text(event.content["body"])
+
+ return vars
+
+ def add_image_message_vars(self, vars, event):
+ vars["image_url"] = event.content["url"]
+
+ return vars
+
+ def make_summary_text(self, notifs_by_room, state_by_room, notif_events, user_id):
+ if len(notifs_by_room) == 1:
+ # Only one room has new stuff
+ room_id = notifs_by_room.keys()[0]
+
+ # If the room has some kind of name, use it, but we don't
+ # want the generated-from-names one here otherwise we'll
+ # end up with, "new message from Bob in the Bob room"
+ room_name = calculate_room_name(
+ state_by_room[room_id], user_id, fallback_to_members=False
+ )
+
+ my_member_event = state_by_room[room_id][("m.room.member", user_id)]
+ if my_member_event.content["membership"] == "invite":
+ inviter_member_event = state_by_room[room_id][
+ ("m.room.member", my_member_event.sender)
+ ]
+ inviter_name = name_from_member_event(inviter_member_event)
+
+ if room_name is None:
+ return INVITE_FROM_PERSON % (inviter_name,)
+ else:
+ return INVITE_FROM_PERSON_TO_ROOM % (inviter_name, room_name)
+
+ sender_name = None
+ if len(notifs_by_room[room_id]) == 1:
+ # There is just the one notification, so give some detail
+ event = notif_events[notifs_by_room[room_id][0]["event_id"]]
+ if ("m.room.member", event.sender) in state_by_room[room_id]:
+ state_event = state_by_room[room_id][("m.room.member", event.sender)]
+ sender_name = name_from_member_event(state_event)
+
+ if sender_name is not None and room_name is not None:
+ return MESSAGE_FROM_PERSON_IN_ROOM % (sender_name, room_name)
+ elif sender_name is not None:
+ return MESSAGE_FROM_PERSON % (sender_name,)
+ else:
+ # There's more than one notification for this room, so just
+ # say there are several
+ if room_name is not None:
+ return MESSAGES_IN_ROOM % (room_name,)
+ else:
+ # If the room doesn't have a name, say who the messages
+ # are from explicitly to avoid, "messages in the Bob room"
+ sender_ids = list(set([
+ notif_events[n['event_id']].sender
+ for n in notifs_by_room[room_id]
+ ]))
+
+ return MESSAGES_FROM_PERSON % (
+ descriptor_from_member_events([
+ state_by_room[room_id][("m.room.member", s)]
+ for s in sender_ids
+ ])
+ )
+ else:
+ # Stuff's happened in multiple different rooms
+ return MESSAGES_IN_ROOMS
+
+ def make_room_link(self, room_id):
+ return "https://matrix.to/%s" % (room_id,)
+
+ def make_notif_link(self, notif):
+ return "https://matrix.to/%s/%s" % (
+ notif['room_id'], notif['event_id']
+ )
+
+ def make_unsubscribe_link(self):
+ return "https://vector.im/#/settings" # XXX: matrix.to
+
+ def mxc_to_http_filter(self, value, width, height, resize_method="crop"):
+ if value[0:6] != "mxc://":
+ return ""
+ serverAndMediaId = value[6:]
+ params = {
+ "width": width,
+ "height": height,
+ "method": resize_method,
+ }
+ return "%s_matrix/media/v1/thumbnail/%s?%s" % (
+ self.hs.config.public_baseurl,
+ serverAndMediaId,
+ urllib.urlencode(params)
+ )
+
+
+def safe_markup(raw_html):
+ return jinja2.Markup(bleach.linkify(bleach.clean(
+ raw_html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRS,
+ # bleach master has this, but it isn't released yet
+ # protocols=ALLOWED_SCHEMES,
+ strip=True
+ )))
+
+
+def safe_text(raw_text):
+ """
+ Process text: treat it as HTML but escape any tags (ie. just escape the
+ HTML) then linkify it.
+ """
+ return jinja2.Markup(bleach.linkify(bleach.clean(
+ raw_text, tags=[], attributes={},
+ strip=False
+ )))
+
+
+def deduped_ordered_list(l):
+ seen = set()
+ ret = []
+ for item in l:
+ if item not in seen:
+ seen.add(item)
+ ret.append(item)
+ return ret
+
+
+def string_ordinal_total(s):
+ tot = 0
+ for c in s:
+ tot += ord(c)
+ return tot
+
+
+def format_ts_filter(value, format):
+ return time.strftime(format, time.localtime(value / 1000))
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 4960837504..25a45af775 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -1,10 +1,14 @@
from httppusher import HttpPusher
-PUSHER_TYPES = {
- 'http': HttpPusher
-}
-
def create_pusher(hs, pusherdict):
+ PUSHER_TYPES = {
+ "http": HttpPusher,
+ }
+
+ if hs.config.email_enable_notifs:
+ from emailpusher import EmailPusher
+ PUSHER_TYPES["email"] = EmailPusher
+
if pusherdict['kind'] in PUSHER_TYPES:
return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 0eb3d6c1de..e0a7a19777 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -44,6 +44,10 @@ CONDITIONAL_REQUIREMENTS = {
"preview_url": {
"netaddr>=0.7.18": ["netaddr"],
},
+ "email.enable_notifs": {
+ "Jinja2>=2.8": ["Jinja2>=2.8"],
+ "bleach>=1.4.2": ["bleach>=1.4.2"],
+ },
}
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 86a98b6f11..f2af8bdb36 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -118,15 +118,17 @@ class EventPushActionsStore(SQLBaseStore):
max_stream_ordering=None):
def get_after_receipt(txn):
sql = (
- "SELECT ep.event_id, ep.stream_ordering, ep.actions "
+ "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
+ "e.received_ts "
"FROM event_push_actions AS ep, ("
- " SELECT room_id, user_id,"
- " max(topological_ordering) as topological_ordering,"
- " max(stream_ordering) as stream_ordering"
+ " SELECT room_id, user_id, "
+ " max(topological_ordering) as topological_ordering, "
+ " max(stream_ordering) as stream_ordering "
" FROM events"
" NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
" GROUP BY room_id, user_id"
") AS rl "
+ "NATURAL JOIN events e "
"WHERE"
" ep.room_id = rl.room_id"
" AND ("
@@ -153,8 +155,10 @@ class EventPushActionsStore(SQLBaseStore):
def get_no_receipt(txn):
sql = (
- "SELECT ep.event_id, ep.stream_ordering, ep.actions "
+ "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
+ "e.received_ts "
"FROM event_push_actions AS ep "
+ "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id "
"WHERE ep.room_id not in ("
" SELECT room_id FROM events NATURAL JOIN receipts_linearized"
" WHERE receipt_type = 'm.read' AND user_id = ? "
@@ -175,12 +179,30 @@ class EventPushActionsStore(SQLBaseStore):
defer.returnValue([
{
"event_id": row[0],
- "stream_ordering": row[1],
- "actions": json.loads(row[2]),
+ "room_id": row[1],
+ "stream_ordering": row[2],
+ "actions": json.loads(row[3]),
+ "received_ts": row[4],
} for row in after_read_receipt + no_read_receipt
])
@defer.inlineCallbacks
+ def get_time_of_last_push_action_before(self, stream_ordering):
+ def f(txn):
+ sql = (
+ "SELECT e.received_ts "
+ "FROM event_push_actions AS ep "
+ "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id "
+ "WHERE ep.stream_ordering > ? "
+ "ORDER BY ep.stream_ordering ASC "
+ "LIMIT 1"
+ )
+ txn.execute(sql, (stream_ordering,))
+ return txn.fetchone()
+ result = yield self.runInteraction("get_time_of_last_push_action_before", f)
+ defer.returnValue(result[0] if result is not None else None)
+
+ @defer.inlineCallbacks
def get_latest_push_action_stream_ordering(self):
def f(txn):
txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
@@ -190,6 +212,26 @@ class EventPushActionsStore(SQLBaseStore):
)
defer.returnValue(result[0] or 0)
+ @defer.inlineCallbacks
+ def get_time_of_latest_push_action_by_room_for_user(self, user_id):
+ """
+ Returns only the received_ts of the last notification in each of the
+ user's rooms, in a dict by room_id
+ """
+ def f(txn):
+ txn.execute(
+ "SELECT ep.room_id, MAX(e.received_ts) "
+ "FROM event_push_actions AS ep "
+ "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id "
+ "GROUP BY ep.room_id"
+ )
+ return txn.fetchall()
+ result = yield self.runInteraction(
+ "get_time_of_latest_push_action_by_room_for_user", f
+ )
+
+ defer.returnValue({row[0]: row[1] for row in result})
+
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0307b2af3c..438eef6ba3 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -55,6 +55,7 @@ class EventsStore(SQLBaseStore):
def __init__(self, hs):
super(EventsStore, self).__init__(hs)
+ self._clock = hs.get_clock()
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
)
@@ -427,6 +428,7 @@ class EventsStore(SQLBaseStore):
"outlier": event.internal_metadata.is_outlier(),
"content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
+ "received_ts": self._clock.time_msec(),
}
for event, _ in events_and_contexts
],
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 11feb3eb11..d9afd7ec87 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -233,3 +233,30 @@ class PusherStore(SQLBaseStore):
{'failing_since': failing_since},
desc="update_pusher_failing_since",
)
+
+ @defer.inlineCallbacks
+ def get_throttle_params_by_room(self, pusher_id):
+ res = yield self._simple_select_list(
+ "pusher_throttle",
+ {"pusher": pusher_id},
+ ["room_id", "last_sent_ts", "throttle_ms"],
+ desc="get_throttle_params_by_room"
+ )
+
+ params_by_room = {}
+ for row in res:
+ params_by_room[row["room_id"]] = {
+ "last_sent_ts": row["last_sent_ts"],
+ "throttle_ms": row["throttle_ms"]
+ }
+
+ defer.returnValue(params_by_room)
+
+ @defer.inlineCallbacks
+ def set_throttle_params(self, pusher_id, room_id, params):
+ yield self._simple_upsert(
+ "pusher_throttle",
+ {"pusher": pusher_id, "room_id": room_id},
+ params,
+ desc="set_throttle_params"
+ )
diff --git a/synapse/storage/schema/delta/31/events.sql b/synapse/storage/schema/delta/31/events.sql
new file mode 100644
index 0000000000..1dd0f9e170
--- /dev/null
+++ b/synapse/storage/schema/delta/31/events.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN received_ts BIGINT;
diff --git a/synapse/storage/schema/delta/31/pusher_throttle.sql b/synapse/storage/schema/delta/31/pusher_throttle.sql
new file mode 100644
index 0000000000..d86d30c13c
--- /dev/null
+++ b/synapse/storage/schema/delta/31/pusher_throttle.sql
@@ -0,0 +1,23 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE pusher_throttle(
+ pusher BIGINT NOT NULL,
+ room_id TEXT NOT NULL,
+ last_sent_ts BIGINT,
+ throttle_ms BIGINT,
+ PRIMARY KEY (pusher, room_id)
+);
diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py
new file mode 100644
index 0000000000..f80a7fe58e
--- /dev/null
+++ b/synapse/util/presentable_names.py
@@ -0,0 +1,145 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+
+# intentionally looser than what aliases we allow to be registered since
+# other HSes may allow aliases that we would not
+ALIAS_RE = re.compile(r"^#.*:.+$")
+
+ALL_ALONE = "Empty Room"
+
+
+def calculate_room_name(room_state, user_id, fallback_to_members=True):
+ # does it have a name?
+ if ("m.room.name", "") in room_state:
+ m_room_name = room_state[("m.room.name", "")]
+ if m_room_name.content and m_room_name.content["name"]:
+ return m_room_name.content["name"]
+
+ # does it have a canonical alias?
+ if ("m.room.canonical_alias", "") in room_state:
+ canon_alias = room_state[("m.room.canonical_alias", "")]
+ if (
+ canon_alias.content and canon_alias.content["alias"] and
+ _looks_like_an_alias(canon_alias.content["alias"])
+ ):
+ return canon_alias.content["alias"]
+
+ # at this point we're going to need to search the state by all state keys
+ # for an event type, so rearrange the data structure
+ room_state_bytype = _state_as_two_level_dict(room_state)
+
+ # right then, any aliases at all?
+ if "m.room.aliases" in room_state_bytype:
+ m_room_aliases = room_state_bytype["m.room.aliases"]
+ if len(m_room_aliases.values()) > 0:
+ first_alias_event = m_room_aliases.values()[0]
+ if first_alias_event.content and first_alias_event.content["aliases"]:
+ the_aliases = first_alias_event.content["aliases"]
+ if len(the_aliases) > 0 and _looks_like_an_alias(the_aliases[0]):
+ return the_aliases[0]
+
+ if not fallback_to_members:
+ return None
+
+ my_member_event = None
+ if ("m.room.member", user_id) in room_state:
+ my_member_event = room_state[("m.room.member", user_id)]
+
+ if (
+ my_member_event is not None and
+ my_member_event.content['membership'] == "invite"
+ ):
+ if ("m.room.member", my_member_event.sender) in room_state:
+ inviter_member_event = room_state[("m.room.member", my_member_event.sender)]
+ return "Invite from %s" % (name_from_member_event(inviter_member_event),)
+ else:
+ return "Room Invite"
+
+ # we're going to have to generate a name based on who's in the room,
+ # so find out who is in the room that isn't the user.
+ if "m.room.member" in room_state_bytype:
+ all_members = [
+ ev for ev in room_state_bytype["m.room.member"].values()
+ if ev.content['membership'] == "join" or ev.content['membership'] == "invite"
+ ]
+ # Sort the member events oldest-first so the we name people in the
+ # order the joined (it should at least be deterministic rather than
+ # dictionary iteration order)
+ all_members.sort(key=lambda e: e.origin_server_ts)
+ other_members = [m for m in all_members if m.state_key != user_id]
+ else:
+ other_members = []
+ all_members = []
+
+ if len(other_members) == 0:
+ if len(all_members) == 1:
+ # self-chat, peeked room with 1 participant,
+ # or inbound invite, or outbound 3PID invite.
+ if all_members[0].sender == user_id:
+ if "m.room.third_party_invite" in room_state_bytype:
+ third_party_invites = room_state_bytype["m.room.third_party_invite"]
+ if len(third_party_invites) > 0:
+ # technically third party invite events are not member
+ # events, but they are close enough
+ return "Inviting %s" (
+ descriptor_from_member_events(third_party_invites)
+ )
+ else:
+ return ALL_ALONE
+ else:
+ return name_from_member_event(all_members[0])
+ else:
+ return ALL_ALONE
+ else:
+ return descriptor_from_member_events(other_members)
+
+
+def descriptor_from_member_events(member_events):
+ if len(member_events) == 0:
+ return "nobody"
+ elif len(member_events) == 1:
+ return name_from_member_event(member_events[0])
+ elif len(member_events) == 2:
+ return "%s and %s" % (
+ name_from_member_event(member_events[0]),
+ name_from_member_event(member_events[1]),
+ )
+ else:
+ return "%s and %d others" % (
+ name_from_member_event(member_events[0]),
+ len(member_events) - 1,
+ )
+
+
+def name_from_member_event(member_event):
+ if (
+ member_event.content and "displayname" in member_event.content and
+ member_event.content["displayname"]
+ ):
+ return member_event.content["displayname"]
+ return member_event.state_key
+
+
+def _state_as_two_level_dict(state):
+ ret = {}
+ for k, v in state.items():
+ ret.setdefault(k[0], {})[k[1]] = v
+ return ret
+
+
+def _looks_like_an_alias(string):
+ return ALIAS_RE.match(string) is not None
\ No newline at end of file
|