summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py16
-rw-r--r--synapse/api/filtering.py8
-rw-r--r--synapse/app/pusher.py32
-rw-r--r--synapse/config/emailconfig.py98
-rw-r--r--synapse/config/homeserver.py3
-rw-r--r--synapse/config/server.py8
-rw-r--r--synapse/federation/transaction_queue.py3
-rw-r--r--synapse/handlers/_base.py368
-rw-r--r--synapse/handlers/auth.py5
-rw-r--r--synapse/handlers/federation.py31
-rw-r--r--synapse/handlers/message.py215
-rw-r--r--synapse/handlers/presence.py4
-rw-r--r--synapse/handlers/room.py7
-rw-r--r--synapse/handlers/room_member.py4
-rw-r--r--synapse/handlers/search.py17
-rw-r--r--synapse/handlers/sync.py29
-rw-r--r--synapse/notifier.py5
-rw-r--r--synapse/push/action_generator.py4
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py7
-rw-r--r--synapse/push/emailpusher.py251
-rw-r--r--synapse/push/mailer.py455
-rw-r--r--synapse/push/pusher.py33
-rw-r--r--synapse/push/pusherpool.py6
-rw-r--r--synapse/python_dependencies.py4
-rw-r--r--synapse/replication/resource.py9
-rw-r--r--synapse/replication/slave/storage/events.py4
-rw-r--r--synapse/rest/client/v2_alpha/register.py29
-rw-r--r--synapse/storage/account_data.py50
-rw-r--r--synapse/storage/event_push_actions.py49
-rw-r--r--synapse/storage/events.py162
-rw-r--r--synapse/storage/pusher.py27
-rw-r--r--synapse/storage/receipts.py6
-rw-r--r--synapse/storage/registration.py3
-rw-r--r--synapse/storage/roommember.py7
-rw-r--r--synapse/storage/schema/delta/32/events.sql16
-rw-r--r--synapse/storage/schema/delta/32/pusher_throttle.sql23
-rw-r--r--synapse/storage/schema/delta/32/remove_indices.sql38
-rw-r--r--synapse/types.py4
-rw-r--r--synapse/util/presentable_names.py159
-rw-r--r--synapse/visibility.py210
40 files changed, 1943 insertions, 466 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 9e912fdfbe..d3e9837c81 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership, JoinRules
 from synapse.api.errors import AuthError, Codes, SynapseError, EventSizeError
-from synapse.types import Requester, RoomID, UserID, EventID
+from synapse.types import Requester, UserID, get_domian_from_id
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import preserve_context_over_fn
 from synapse.util.metrics import Measure
@@ -91,8 +91,8 @@ class Auth(object):
                     "Room %r does not exist" % (event.room_id,)
                 )
 
-            creating_domain = RoomID.from_string(event.room_id).domain
-            originating_domain = UserID.from_string(event.sender).domain
+            creating_domain = get_domian_from_id(event.room_id)
+            originating_domain = get_domian_from_id(event.sender)
             if creating_domain != originating_domain:
                 if not self.can_federate(event, auth_events):
                     raise AuthError(
@@ -219,7 +219,7 @@ class Auth(object):
         for event in curr_state.values():
             if event.type == EventTypes.Member:
                 try:
-                    if UserID.from_string(event.state_key).domain != host:
+                    if get_domian_from_id(event.state_key) != host:
                         continue
                 except:
                     logger.warn("state_key not user_id: %s", event.state_key)
@@ -266,8 +266,8 @@ class Auth(object):
 
         target_user_id = event.state_key
 
-        creating_domain = RoomID.from_string(event.room_id).domain
-        target_domain = UserID.from_string(target_user_id).domain
+        creating_domain = get_domian_from_id(event.room_id)
+        target_domain = get_domian_from_id(target_user_id)
         if creating_domain != target_domain:
             if not self.can_federate(event, auth_events):
                 raise AuthError(
@@ -889,8 +889,8 @@ class Auth(object):
         if user_level >= redact_level:
             return False
 
-        redacter_domain = EventID.from_string(event.event_id).domain
-        redactee_domain = EventID.from_string(event.redacts).domain
+        redacter_domain = get_domian_from_id(event.event_id)
+        redactee_domain = get_domian_from_id(event.redacts)
         if redacter_domain == redactee_domain:
             return True
 
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index cd699ef27f..4f5a4281fa 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -15,6 +15,8 @@
 from synapse.api.errors import SynapseError
 from synapse.types import UserID, RoomID
 
+from twisted.internet import defer
+
 import ujson as json
 
 
@@ -24,10 +26,10 @@ class Filtering(object):
         super(Filtering, self).__init__()
         self.store = hs.get_datastore()
 
+    @defer.inlineCallbacks
     def get_user_filter(self, user_localpart, filter_id):
-        result = self.store.get_user_filter(user_localpart, filter_id)
-        result.addCallback(FilterCollection)
-        return result
+        result = yield self.store.get_user_filter(user_localpart, filter_id)
+        defer.returnValue(FilterCollection(result))
 
     def add_user_filter(self, user_localpart, user_filter):
         self.check_valid_filter(user_filter)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index b5339f030d..8e9c0e1960 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -20,8 +20,10 @@ from synapse.server import HomeServer
 from synapse.config._base import ConfigError
 from synapse.config.database import DatabaseConfig
 from synapse.config.logger import LoggingConfig
+from synapse.config.emailconfig import EmailConfig
 from synapse.http.site import SynapseSite
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.storage.state import StateStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
@@ -91,7 +93,7 @@ class SlaveConfig(DatabaseConfig):
         """ % locals()
 
 
-class PusherSlaveConfig(SlaveConfig, LoggingConfig):
+class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig):
     pass
 
 
@@ -110,6 +112,34 @@ class PusherSlaveStore(
         DataStore.update_pusher_last_stream_ordering.__func__
     )
 
+    get_throttle_params_by_room = (
+        DataStore.get_throttle_params_by_room.__func__
+    )
+
+    set_throttle_params = (
+        DataStore.set_throttle_params.__func__
+    )
+
+    get_time_of_last_push_action_before = (
+        DataStore.get_time_of_last_push_action_before.__func__
+    )
+
+    get_profile_displayname = (
+        DataStore.get_profile_displayname.__func__
+    )
+
+    get_state_groups = (
+        DataStore.get_state_groups.__func__
+    )
+
+    _get_state_group_for_events = (
+        StateStore.__dict__["_get_state_group_for_events"]
+    )
+
+    _get_state_group_for_event = (
+        StateStore.__dict__["_get_state_group_for_event"]
+    )
+
 
 class PusherServer(HomeServer):
 
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
new file mode 100644
index 0000000000..90bdd08f00
--- /dev/null
+++ b/synapse/config/emailconfig.py
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file can't be called email.py because if it is, we cannot:
+import email.utils
+
+from ._base import Config
+
+
+class EmailConfig(Config):
+    def read_config(self, config):
+        self.email_enable_notifs = False
+
+        email_config = config.get("email", {})
+        self.email_enable_notifs = email_config.get("enable_notifs", False)
+
+        if self.email_enable_notifs:
+            # make sure we can import the required deps
+            import jinja2
+            import bleach
+            # prevent unused warnings
+            jinja2
+            bleach
+
+            required = [
+                "smtp_host",
+                "smtp_port",
+                "notif_from",
+                "template_dir",
+                "notif_template_html",
+                "notif_template_text",
+            ]
+
+            missing = []
+            for k in required:
+                if k not in email_config:
+                    missing.append(k)
+
+            if (len(missing) > 0):
+                raise RuntimeError(
+                    "email.enable_notifs is True but required keys are missing: %s" %
+                    (", ".join(["email." + k for k in missing]),)
+                )
+
+            if config.get("public_baseurl") is None:
+                raise RuntimeError(
+                    "email.enable_notifs is True but no public_baseurl is set"
+                )
+
+            self.email_smtp_host = email_config["smtp_host"]
+            self.email_smtp_port = email_config["smtp_port"]
+            self.email_notif_from = email_config["notif_from"]
+            self.email_template_dir = email_config["template_dir"]
+            self.email_notif_template_html = email_config["notif_template_html"]
+            self.email_notif_template_text = email_config["notif_template_text"]
+            self.email_notif_for_new_users = email_config.get(
+                "notif_for_new_users", True
+            )
+            if "app_name" in email_config:
+                self.email_app_name = email_config["app_name"]
+            else:
+                self.email_app_name = "Matrix"
+
+            # make sure it's valid
+            parsed = email.utils.parseaddr(self.email_notif_from)
+            if parsed[1] == '':
+                raise RuntimeError("Invalid notif_from address")
+        else:
+            self.email_enable_notifs = False
+            # Not much point setting defaults for the rest: it would be an
+            # error for them to be used.
+
+    def default_config(self, config_dir_path, server_name, **kwargs):
+        return """
+        # Enable sending emails for notification events
+        #email:
+        #   enable_notifs: false
+        #   smtp_host: "localhost"
+        #   smtp_port: 25
+        #   notif_from: Your Friendly Matrix Home Server <noreply@example.com>
+        #   app_name: Matrix
+        #   template_dir: res/templates
+        #   notif_template_html: notif_mail.html
+        #   notif_template_text: notif_mail.txt
+        #   notif_for_new_users: True
+        """
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 9a80ac39ec..fc2445484c 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -31,13 +31,14 @@ from .cas import CasConfig
 from .password import PasswordConfig
 from .jwt import JWTConfig
 from .ldap import LDAPConfig
+from .emailconfig import EmailConfig
 
 
 class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
                        RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
                        VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
                        AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
-                       JWTConfig, LDAPConfig, PasswordConfig,):
+                       JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,):
     pass
 
 
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 46c633548a..0b5f462e44 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -28,6 +28,11 @@ class ServerConfig(Config):
         self.print_pidfile = config.get("print_pidfile")
         self.user_agent_suffix = config.get("user_agent_suffix")
         self.use_frozen_dicts = config.get("use_frozen_dicts", True)
+        self.public_baseurl = config.get("public_baseurl")
+
+        if self.public_baseurl is not None:
+            if self.public_baseurl[-1] != '/':
+                self.public_baseurl += '/'
         self.start_pushers = config.get("start_pushers", True)
 
         self.listeners = config.get("listeners", [])
@@ -143,6 +148,9 @@ class ServerConfig(Config):
         # Whether to serve a web client from the HTTP/HTTPS root resource.
         web_client: True
 
+        # The public-facing base URL for the client API (not including _matrix/...)
+        # public_baseurl: https://example.com:8448/
+
         # Set the soft limit on the number of file descriptors synapse can use
         # Zero is used to indicate synapse should set the soft limit to the
         # hard limit.
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 1928da03b3..5787f854d4 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -20,6 +20,7 @@ from .persistence import TransactionActions
 from .units import Transaction
 
 from synapse.api.errors import HttpResponseException
+from synapse.util.async import run_on_reactor
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.retryutils import (
@@ -199,6 +200,8 @@ class TransactionQueue(object):
     @defer.inlineCallbacks
     @log_function
     def _attempt_new_transaction(self, destination):
+        yield run_on_reactor()
+
         # list of (pending_pdu, deferred, order)
         if destination in self.pending_transactions:
             # XXX: pending_transactions can get stuck on by a never-ending
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 13a675b208..c904c6c500 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -15,13 +15,10 @@
 
 from twisted.internet import defer
 
-from synapse.api.errors import LimitExceededError, SynapseError, AuthError
-from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.api.errors import LimitExceededError
 from synapse.api.constants import Membership, EventTypes
-from synapse.types import UserID, RoomAlias, Requester
-from synapse.push.action_generator import ActionGenerator
+from synapse.types import UserID, Requester
 
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
 
 import logging
 
@@ -29,23 +26,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-VISIBILITY_PRIORITY = (
-    "world_readable",
-    "shared",
-    "invited",
-    "joined",
-)
-
-
-MEMBERSHIP_PRIORITY = (
-    Membership.JOIN,
-    Membership.INVITE,
-    Membership.KNOCK,
-    Membership.LEAVE,
-    Membership.BAN,
-)
-
-
 class BaseHandler(object):
     """
     Common base class for the event handlers.
@@ -65,161 +45,10 @@ class BaseHandler(object):
         self.clock = hs.get_clock()
         self.hs = hs
 
-        self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
 
         self.event_builder_factory = hs.get_event_builder_factory()
 
-    @defer.inlineCallbacks
-    def filter_events_for_clients(self, user_tuples, events, event_id_to_state):
-        """ Returns dict of user_id -> list of events that user is allowed to
-        see.
-
-        Args:
-            user_tuples (str, bool): (user id, is_peeking) for each user to be
-                checked. is_peeking should be true if:
-                * the user is not currently a member of the room, and:
-                * the user has not been a member of the room since the
-                given events
-            events ([synapse.events.EventBase]): list of events to filter
-        """
-        forgotten = yield defer.gatherResults([
-            self.store.who_forgot_in_room(
-                room_id,
-            )
-            for room_id in frozenset(e.room_id for e in events)
-        ], consumeErrors=True)
-
-        # Set of membership event_ids that have been forgotten
-        event_id_forgotten = frozenset(
-            row["event_id"] for rows in forgotten for row in rows
-        )
-
-        def allowed(event, user_id, is_peeking):
-            """
-            Args:
-                event (synapse.events.EventBase): event to check
-                user_id (str)
-                is_peeking (bool)
-            """
-            state = event_id_to_state[event.event_id]
-
-            # get the room_visibility at the time of the event.
-            visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
-            if visibility_event:
-                visibility = visibility_event.content.get("history_visibility", "shared")
-            else:
-                visibility = "shared"
-
-            if visibility not in VISIBILITY_PRIORITY:
-                visibility = "shared"
-
-            # if it was world_readable, it's easy: everyone can read it
-            if visibility == "world_readable":
-                return True
-
-            # Always allow history visibility events on boundaries. This is done
-            # by setting the effective visibility to the least restrictive
-            # of the old vs new.
-            if event.type == EventTypes.RoomHistoryVisibility:
-                prev_content = event.unsigned.get("prev_content", {})
-                prev_visibility = prev_content.get("history_visibility", None)
-
-                if prev_visibility not in VISIBILITY_PRIORITY:
-                    prev_visibility = "shared"
-
-                new_priority = VISIBILITY_PRIORITY.index(visibility)
-                old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
-                if old_priority < new_priority:
-                    visibility = prev_visibility
-
-            # likewise, if the event is the user's own membership event, use
-            # the 'most joined' membership
-            membership = None
-            if event.type == EventTypes.Member and event.state_key == user_id:
-                membership = event.content.get("membership", None)
-                if membership not in MEMBERSHIP_PRIORITY:
-                    membership = "leave"
-
-                prev_content = event.unsigned.get("prev_content", {})
-                prev_membership = prev_content.get("membership", None)
-                if prev_membership not in MEMBERSHIP_PRIORITY:
-                    prev_membership = "leave"
-
-                new_priority = MEMBERSHIP_PRIORITY.index(membership)
-                old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
-                if old_priority < new_priority:
-                    membership = prev_membership
-
-            # otherwise, get the user's membership at the time of the event.
-            if membership is None:
-                membership_event = state.get((EventTypes.Member, user_id), None)
-                if membership_event:
-                    if membership_event.event_id not in event_id_forgotten:
-                        membership = membership_event.membership
-
-            # if the user was a member of the room at the time of the event,
-            # they can see it.
-            if membership == Membership.JOIN:
-                return True
-
-            if visibility == "joined":
-                # we weren't a member at the time of the event, so we can't
-                # see this event.
-                return False
-
-            elif visibility == "invited":
-                # user can also see the event if they were *invited* at the time
-                # of the event.
-                return membership == Membership.INVITE
-
-            else:
-                # visibility is shared: user can also see the event if they have
-                # become a member since the event
-                #
-                # XXX: if the user has subsequently joined and then left again,
-                # ideally we would share history up to the point they left. But
-                # we don't know when they left.
-                return not is_peeking
-
-        defer.returnValue({
-            user_id: [
-                event
-                for event in events
-                if allowed(event, user_id, is_peeking)
-            ]
-            for user_id, is_peeking in user_tuples
-        })
-
-    @defer.inlineCallbacks
-    def _filter_events_for_client(self, user_id, events, is_peeking=False):
-        """
-        Check which events a user is allowed to see
-
-        Args:
-            user_id(str): user id to be checked
-            events([synapse.events.EventBase]): list of events to be checked
-            is_peeking(bool): should be True if:
-              * the user is not currently a member of the room, and:
-              * the user has not been a member of the room since the given
-                events
-
-        Returns:
-            [synapse.events.EventBase]
-        """
-        types = (
-            (EventTypes.RoomHistoryVisibility, ""),
-            (EventTypes.Member, user_id),
-        )
-        event_id_to_state = yield self.store.get_state_for_events(
-            frozenset(e.event_id for e in events),
-            types=types
-        )
-        res = yield self.filter_events_for_clients(
-            [(user_id, is_peeking)], events, event_id_to_state
-        )
-        defer.returnValue(res.get(user_id, []))
-
     def ratelimit(self, requester):
         time_now = self.clock.time()
         allowed, time_allowed = self.ratelimiter.send_message(
@@ -232,56 +61,6 @@ class BaseHandler(object):
                 retry_after_ms=int(1000 * (time_allowed - time_now)),
             )
 
-    @defer.inlineCallbacks
-    def _create_new_client_event(self, builder, prev_event_ids=None):
-        if prev_event_ids:
-            prev_events = yield self.store.add_event_hashes(prev_event_ids)
-            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
-            depth = prev_max_depth + 1
-        else:
-            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
-                builder.room_id,
-            )
-
-            if latest_ret:
-                depth = max([d for _, _, d in latest_ret]) + 1
-            else:
-                depth = 1
-
-            prev_events = [
-                (event_id, prev_hashes)
-                for event_id, prev_hashes, _ in latest_ret
-            ]
-
-        builder.prev_events = prev_events
-        builder.depth = depth
-
-        state_handler = self.state_handler
-
-        context = yield state_handler.compute_event_context(builder)
-
-        if builder.is_state():
-            builder.prev_state = yield self.store.add_event_hashes(
-                context.prev_state_events
-            )
-
-        yield self.auth.add_auth_events(builder, context)
-
-        add_hashes_and_signatures(
-            builder, self.server_name, self.signing_key
-        )
-
-        event = builder.build()
-
-        logger.debug(
-            "Created event %s with current state: %s",
-            event.event_id, context.current_state,
-        )
-
-        defer.returnValue(
-            (event, context,)
-        )
-
     def is_host_in_room(self, current_state):
         room_members = [
             (state_key, event.membership)
@@ -296,154 +75,13 @@ class BaseHandler(object):
                 return True
         for (state_key, membership) in room_members:
             if (
-                UserID.from_string(state_key).domain == self.hs.hostname
+                self.hs.is_mine_id(state_key)
                 and membership == Membership.JOIN
             ):
                 return True
         return False
 
     @defer.inlineCallbacks
-    def handle_new_client_event(
-        self,
-        requester,
-        event,
-        context,
-        ratelimit=True,
-        extra_users=[]
-    ):
-        # We now need to go and hit out to wherever we need to hit out to.
-
-        if ratelimit:
-            self.ratelimit(requester)
-
-        try:
-            self.auth.check(event, auth_events=context.current_state)
-        except AuthError as err:
-            logger.warn("Denying new event %r because %s", event, err)
-            raise err
-
-        yield self.maybe_kick_guest_users(event, context.current_state.values())
-
-        if event.type == EventTypes.CanonicalAlias:
-            # Check the alias is acually valid (at this time at least)
-            room_alias_str = event.content.get("alias", None)
-            if room_alias_str:
-                room_alias = RoomAlias.from_string(room_alias_str)
-                directory_handler = self.hs.get_handlers().directory_handler
-                mapping = yield directory_handler.get_association(room_alias)
-
-                if mapping["room_id"] != event.room_id:
-                    raise SynapseError(
-                        400,
-                        "Room alias %s does not point to the room" % (
-                            room_alias_str,
-                        )
-                    )
-
-        federation_handler = self.hs.get_handlers().federation_handler
-
-        if event.type == EventTypes.Member:
-            if event.content["membership"] == Membership.INVITE:
-                def is_inviter_member_event(e):
-                    return (
-                        e.type == EventTypes.Member and
-                        e.sender == event.sender
-                    )
-
-                event.unsigned["invite_room_state"] = [
-                    {
-                        "type": e.type,
-                        "state_key": e.state_key,
-                        "content": e.content,
-                        "sender": e.sender,
-                    }
-                    for k, e in context.current_state.items()
-                    if e.type in self.hs.config.room_invite_state_types
-                    or is_inviter_member_event(e)
-                ]
-
-                invitee = UserID.from_string(event.state_key)
-                if not self.hs.is_mine(invitee):
-                    # TODO: Can we add signature from remote server in a nicer
-                    # way? If we have been invited by a remote server, we need
-                    # to get them to sign the event.
-
-                    returned_invite = yield federation_handler.send_invite(
-                        invitee.domain,
-                        event,
-                    )
-
-                    event.unsigned.pop("room_state", None)
-
-                    # TODO: Make sure the signatures actually are correct.
-                    event.signatures.update(
-                        returned_invite.signatures
-                    )
-
-        if event.type == EventTypes.Redaction:
-            if self.auth.check_redaction(event, auth_events=context.current_state):
-                original_event = yield self.store.get_event(
-                    event.redacts,
-                    check_redacted=False,
-                    get_prev_content=False,
-                    allow_rejected=False,
-                    allow_none=False
-                )
-                if event.user_id != original_event.user_id:
-                    raise AuthError(
-                        403,
-                        "You don't have permission to redact events"
-                    )
-
-        if event.type == EventTypes.Create and context.current_state:
-            raise AuthError(
-                403,
-                "Changing the room create event is forbidden",
-            )
-
-        action_generator = ActionGenerator(self.hs)
-        yield action_generator.handle_push_actions_for_event(
-            event, context, self
-        )
-
-        (event_stream_id, max_stream_id) = yield self.store.persist_event(
-            event, context=context
-        )
-
-        # this intentionally does not yield: we don't care about the result
-        # and don't need to wait for it.
-        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
-            event_stream_id, max_stream_id
-        )
-
-        destinations = set()
-        for k, s in context.current_state.items():
-            try:
-                if k[0] == EventTypes.Member:
-                    if s.content["membership"] == Membership.JOIN:
-                        destinations.add(
-                            UserID.from_string(s.state_key).domain
-                        )
-            except SynapseError:
-                logger.warn(
-                    "Failed to get destination from event %s", s.event_id
-                )
-
-        with PreserveLoggingContext():
-            # Don't block waiting on waking up all the listeners.
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
-
-        # If invite, remove room_state from unsigned before sending.
-        event.unsigned.pop("invite_room_state", None)
-
-        federation_handler.handle_new_event(
-            event, destinations=destinations,
-        )
-
-    @defer.inlineCallbacks
     def maybe_kick_guest_users(self, event, current_state):
         # Technically this function invalidates current_state by changing it.
         # Hopefully this isn't that important to the caller.
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 61fe56032a..6e7d080ecc 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -615,4 +615,7 @@ class AuthHandler(BaseHandler):
         Returns:
             Whether self.hash(password) == stored_hash (bool).
         """
-        return bcrypt.hashpw(password, stored_hash) == stored_hash
+        if stored_hash:
+            return bcrypt.hashpw(password, stored_hash) == stored_hash
+        else:
+            return False
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d95e0b23b1..c21d9d4d83 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -33,7 +33,7 @@ from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
     compute_event_signature, add_hashes_and_signatures,
 )
-from synapse.types import UserID
+from synapse.types import UserID, get_domian_from_id
 
 from synapse.events.utils import prune_event
 
@@ -453,7 +453,7 @@ class FederationHandler(BaseHandler):
             joined_domains = {}
             for u, d in joined_users:
                 try:
-                    dom = UserID.from_string(u).domain
+                    dom = get_domian_from_id(u)
                     old_d = joined_domains.get(dom)
                     if old_d:
                         joined_domains[dom] = min(d, old_d)
@@ -682,7 +682,8 @@ class FederationHandler(BaseHandler):
         })
 
         try:
-            event, context = yield self._create_new_client_event(
+            message_handler = self.hs.get_handlers().message_handler
+            event, context = yield message_handler._create_new_client_event(
                 builder=builder,
             )
         except AuthError as e:
@@ -743,9 +744,7 @@ class FederationHandler(BaseHandler):
             try:
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.JOIN:
-                        destinations.add(
-                            UserID.from_string(s.state_key).domain
-                        )
+                        destinations.add(get_domian_from_id(s.state_key))
             except:
                 logger.warn(
                     "Failed to get destination from event %s", s.event_id
@@ -915,7 +914,8 @@ class FederationHandler(BaseHandler):
             "state_key": user_id,
         })
 
-        event, context = yield self._create_new_client_event(
+        message_handler = self.hs.get_handlers().message_handler
+        event, context = yield message_handler._create_new_client_event(
             builder=builder,
         )
 
@@ -970,9 +970,7 @@ class FederationHandler(BaseHandler):
             try:
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.LEAVE:
-                        destinations.add(
-                            UserID.from_string(s.state_key).domain
-                        )
+                        destinations.add(get_domian_from_id(s.state_key))
             except:
                 logger.warn(
                     "Failed to get destination from event %s", s.event_id
@@ -1115,7 +1113,7 @@ class FederationHandler(BaseHandler):
         if not event.internal_metadata.is_outlier():
             action_generator = ActionGenerator(self.hs)
             yield action_generator.handle_push_actions_for_event(
-                event, context, self
+                event, context
             )
 
         event_stream_id, max_stream_id = yield self.store.persist_event(
@@ -1692,7 +1690,10 @@ class FederationHandler(BaseHandler):
         if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
             builder = self.event_builder_factory.new(event_dict)
             EventValidator().validate_new(builder)
-            event, context = yield self._create_new_client_event(builder=builder)
+            message_handler = self.hs.get_handlers().message_handler
+            event, context = yield message_handler._create_new_client_event(
+                builder=builder
+            )
 
             event, context = yield self.add_display_name_to_third_party_invite(
                 event_dict, event, context
@@ -1720,7 +1721,8 @@ class FederationHandler(BaseHandler):
     def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
         builder = self.event_builder_factory.new(event_dict)
 
-        event, context = yield self._create_new_client_event(
+        message_handler = self.hs.get_handlers().message_handler
+        event, context = yield message_handler._create_new_client_event(
             builder=builder,
         )
 
@@ -1759,7 +1761,8 @@ class FederationHandler(BaseHandler):
         event_dict["content"]["third_party_invite"]["display_name"] = display_name
         builder = self.event_builder_factory.new(event_dict)
         EventValidator().validate_new(builder)
-        event, context = yield self._create_new_client_event(builder=builder)
+        message_handler = self.hs.get_handlers().message_handler
+        event, context = yield message_handler._create_new_client_event(builder=builder)
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f51feda2f4..13154edb78 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,13 +17,19 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.streams.config import PaginationConfig
+from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
+from synapse.push.action_generator import ActionGenerator
+from synapse.streams.config import PaginationConfig
+from synapse.types import (
+    UserID, RoomAlias, RoomStreamToken, StreamToken, get_domian_from_id
+)
 from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.types import UserID, RoomStreamToken, StreamToken
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
@@ -123,7 +129,8 @@ class MessageHandler(BaseHandler):
                 "end": next_token.to_string(),
             })
 
-        events = yield self._filter_events_for_client(
+        events = yield filter_events_for_client(
+            self.store,
             user_id,
             events,
             is_peeking=(member_event_id is None),
@@ -483,8 +490,8 @@ class MessageHandler(BaseHandler):
                     ]
                 ).addErrback(unwrapFirstError)
 
-                messages = yield self._filter_events_for_client(
-                    user_id, messages
+                messages = yield filter_events_for_client(
+                    self.store, user_id, messages
                 )
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
@@ -619,8 +626,8 @@ class MessageHandler(BaseHandler):
             end_token=stream_token
         )
 
-        messages = yield self._filter_events_for_client(
-            user_id, messages, is_peeking=is_peeking
+        messages = yield filter_events_for_client(
+            self.store, user_id, messages, is_peeking=is_peeking
         )
 
         start_token = StreamToken.START.copy_and_replace("room_key", token[0])
@@ -700,8 +707,8 @@ class MessageHandler(BaseHandler):
             consumeErrors=True,
         ).addErrback(unwrapFirstError)
 
-        messages = yield self._filter_events_for_client(
-            user_id, messages, is_peeking=is_peeking,
+        messages = yield filter_events_for_client(
+            self.store, user_id, messages, is_peeking=is_peeking,
         )
 
         start_token = now_token.copy_and_replace("room_key", token[0])
@@ -724,3 +731,193 @@ class MessageHandler(BaseHandler):
             ret["membership"] = membership
 
         defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def _create_new_client_event(self, builder, prev_event_ids=None):
+        if prev_event_ids:
+            prev_events = yield self.store.add_event_hashes(prev_event_ids)
+            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
+            depth = prev_max_depth + 1
+        else:
+            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
+                builder.room_id,
+            )
+
+            if latest_ret:
+                depth = max([d for _, _, d in latest_ret]) + 1
+            else:
+                depth = 1
+
+            prev_events = [
+                (event_id, prev_hashes)
+                for event_id, prev_hashes, _ in latest_ret
+            ]
+
+        builder.prev_events = prev_events
+        builder.depth = depth
+
+        state_handler = self.state_handler
+
+        context = yield state_handler.compute_event_context(builder)
+
+        if builder.is_state():
+            builder.prev_state = yield self.store.add_event_hashes(
+                context.prev_state_events
+            )
+
+        yield self.auth.add_auth_events(builder, context)
+
+        signing_key = self.hs.config.signing_key[0]
+        add_hashes_and_signatures(
+            builder, self.server_name, signing_key
+        )
+
+        event = builder.build()
+
+        logger.debug(
+            "Created event %s with current state: %s",
+            event.event_id, context.current_state,
+        )
+
+        defer.returnValue(
+            (event, context,)
+        )
+
+    @defer.inlineCallbacks
+    def handle_new_client_event(
+        self,
+        requester,
+        event,
+        context,
+        ratelimit=True,
+        extra_users=[]
+    ):
+        # We now need to go and hit out to wherever we need to hit out to.
+
+        if ratelimit:
+            self.ratelimit(requester)
+
+        try:
+            self.auth.check(event, auth_events=context.current_state)
+        except AuthError as err:
+            logger.warn("Denying new event %r because %s", event, err)
+            raise err
+
+        yield self.maybe_kick_guest_users(event, context.current_state.values())
+
+        if event.type == EventTypes.CanonicalAlias:
+            # Check the alias is acually valid (at this time at least)
+            room_alias_str = event.content.get("alias", None)
+            if room_alias_str:
+                room_alias = RoomAlias.from_string(room_alias_str)
+                directory_handler = self.hs.get_handlers().directory_handler
+                mapping = yield directory_handler.get_association(room_alias)
+
+                if mapping["room_id"] != event.room_id:
+                    raise SynapseError(
+                        400,
+                        "Room alias %s does not point to the room" % (
+                            room_alias_str,
+                        )
+                    )
+
+        federation_handler = self.hs.get_handlers().federation_handler
+
+        if event.type == EventTypes.Member:
+            if event.content["membership"] == Membership.INVITE:
+                def is_inviter_member_event(e):
+                    return (
+                        e.type == EventTypes.Member and
+                        e.sender == event.sender
+                    )
+
+                event.unsigned["invite_room_state"] = [
+                    {
+                        "type": e.type,
+                        "state_key": e.state_key,
+                        "content": e.content,
+                        "sender": e.sender,
+                    }
+                    for k, e in context.current_state.items()
+                    if e.type in self.hs.config.room_invite_state_types
+                    or is_inviter_member_event(e)
+                ]
+
+                invitee = UserID.from_string(event.state_key)
+                if not self.hs.is_mine(invitee):
+                    # TODO: Can we add signature from remote server in a nicer
+                    # way? If we have been invited by a remote server, we need
+                    # to get them to sign the event.
+
+                    returned_invite = yield federation_handler.send_invite(
+                        invitee.domain,
+                        event,
+                    )
+
+                    event.unsigned.pop("room_state", None)
+
+                    # TODO: Make sure the signatures actually are correct.
+                    event.signatures.update(
+                        returned_invite.signatures
+                    )
+
+        if event.type == EventTypes.Redaction:
+            if self.auth.check_redaction(event, auth_events=context.current_state):
+                original_event = yield self.store.get_event(
+                    event.redacts,
+                    check_redacted=False,
+                    get_prev_content=False,
+                    allow_rejected=False,
+                    allow_none=False
+                )
+                if event.user_id != original_event.user_id:
+                    raise AuthError(
+                        403,
+                        "You don't have permission to redact events"
+                    )
+
+        if event.type == EventTypes.Create and context.current_state:
+            raise AuthError(
+                403,
+                "Changing the room create event is forbidden",
+            )
+
+        action_generator = ActionGenerator(self.hs)
+        yield action_generator.handle_push_actions_for_event(
+            event, context
+        )
+
+        (event_stream_id, max_stream_id) = yield self.store.persist_event(
+            event, context=context
+        )
+
+        # this intentionally does not yield: we don't care about the result
+        # and don't need to wait for it.
+        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+            event_stream_id, max_stream_id
+        )
+
+        destinations = set()
+        for k, s in context.current_state.items():
+            try:
+                if k[0] == EventTypes.Member:
+                    if s.content["membership"] == Membership.JOIN:
+                        destinations.add(get_domian_from_id(s.state_key))
+            except SynapseError:
+                logger.warn(
+                    "Failed to get destination from event %s", s.event_id
+                )
+
+        with PreserveLoggingContext():
+            # Don't block waiting on waking up all the listeners.
+            self.notifier.on_new_room_event(
+                event, event_stream_id, max_stream_id,
+                extra_users=extra_users
+            )
+
+        # If invite, remove room_state from unsigned before sending.
+        event.unsigned.pop("invite_room_state", None)
+
+        federation_handler.handle_new_event(
+            event, destinations=destinations,
+        )
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 639567953a..a8529cce42 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -33,7 +33,7 @@ from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID
+from synapse.types import UserID, get_domian_from_id
 import synapse.metrics
 
 from ._base import BaseHandler
@@ -440,7 +440,7 @@ class PresenceHandler(BaseHandler):
             if not local_states:
                 continue
 
-            host = UserID.from_string(user_id).domain
+            host = get_domian_from_id(user_id)
             hosts_to_states.setdefault(host, []).extend(local_states)
 
         # TODO: de-dup hosts_to_states, as a single host might have multiple
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index dd9c18df84..3d63b3c513 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -26,6 +26,7 @@ from synapse.api.errors import AuthError, StoreError, SynapseError
 from synapse.util import stringutils
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.visibility import filter_events_for_client
 
 from collections import OrderedDict
 
@@ -449,10 +450,12 @@ class RoomContextHandler(BaseHandler):
         now_token = yield self.hs.get_event_sources().get_current_token()
 
         def filter_evts(events):
-            return self._filter_events_for_client(
+            return filter_events_for_client(
+                self.store,
                 user.to_string(),
                 events,
-                is_peeking=is_guest)
+                is_peeking=is_guest
+            )
 
         event = yield self.store.get_event(event_id, get_prev_content=True,
                                            allow_none=True)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ed2cda837f..b44e52a515 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -113,7 +113,7 @@ class RoomMemberHandler(BaseHandler):
             prev_event_ids=prev_event_ids,
         )
 
-        yield self.handle_new_client_event(
+        yield msg_handler.handle_new_client_event(
             requester,
             event,
             context,
@@ -357,7 +357,7 @@ class RoomMemberHandler(BaseHandler):
                 # so don't really fit into the general auth process.
                 raise AuthError(403, "Guest access not allowed")
 
-        yield self.handle_new_client_event(
+        yield message_handler.handle_new_client_event(
             requester,
             event,
             context,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 9937d8dd7f..df75d70fac 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -21,6 +21,7 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.api.filtering import Filter
 from synapse.api.errors import SynapseError
 from synapse.events.utils import serialize_event
+from synapse.visibility import filter_events_for_client
 
 from unpaddedbase64 import decode_base64, encode_base64
 
@@ -172,8 +173,8 @@ class SearchHandler(BaseHandler):
 
             filtered_events = search_filter.filter([r["event"] for r in results])
 
-            events = yield self._filter_events_for_client(
-                user.to_string(), filtered_events
+            events = yield filter_events_for_client(
+                self.store, user.to_string(), filtered_events
             )
 
             events.sort(key=lambda e: -rank_map[e.event_id])
@@ -223,8 +224,8 @@ class SearchHandler(BaseHandler):
                     r["event"] for r in results
                 ])
 
-                events = yield self._filter_events_for_client(
-                    user.to_string(), filtered_events
+                events = yield filter_events_for_client(
+                    self.store, user.to_string(), filtered_events
                 )
 
                 room_events.extend(events)
@@ -281,12 +282,12 @@ class SearchHandler(BaseHandler):
                     event.room_id, event.event_id, before_limit, after_limit
                 )
 
-                res["events_before"] = yield self._filter_events_for_client(
-                    user.to_string(), res["events_before"]
+                res["events_before"] = yield filter_events_for_client(
+                    self.store, user.to_string(), res["events_before"]
                 )
 
-                res["events_after"] = yield self._filter_events_for_client(
-                    user.to_string(), res["events_after"]
+                res["events_after"] = yield filter_events_for_client(
+                    self.store, user.to_string(), res["events_after"]
                 )
 
                 res["start"] = now_token.copy_and_replace(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 231140b655..921215469f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -22,6 +22,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
+from synapse.visibility import filter_events_for_client
 
 from twisted.internet import defer
 
@@ -247,6 +248,10 @@ class SyncHandler(BaseHandler):
             sync_config.user.to_string()
         )
 
+        ignored_users = account_data.get(
+            "m.ignored_user_list", {}
+        ).get("ignored_users", {}).keys()
+
         joined = []
         invited = []
         archived = []
@@ -267,6 +272,8 @@ class SyncHandler(BaseHandler):
                 )
                 joined.append(room_result)
             elif event.membership == Membership.INVITE:
+                if event.sender in ignored_users:
+                    return
                 invite = yield self.store.get_event(event.event_id)
                 invited.append(InvitedSyncResult(
                     room_id=event.room_id,
@@ -515,6 +522,15 @@ class SyncHandler(BaseHandler):
                 sync_config.user
             )
 
+        ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
+            "m.ignored_user_list", user_id=user_id,
+        )
+
+        if ignored_account_data:
+            ignored_users = ignored_account_data.get("ignored_users", {}).keys()
+        else:
+            ignored_users = frozenset()
+
         # Get a list of membership change events that have happened.
         rooms_changed = yield self.store.get_membership_changes_for_user(
             user_id, since_token.room_key, now_token.room_key
@@ -549,9 +565,10 @@ class SyncHandler(BaseHandler):
             # Only bother if we're still currently invited
             should_invite = non_joins[-1].membership == Membership.INVITE
             if should_invite:
-                room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
-                if room_sync:
-                    invited.append(room_sync)
+                if event.sender not in ignored_users:
+                    room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+                    if room_sync:
+                        invited.append(room_sync)
 
             # Always include leave/ban events. Just take the last one.
             # TODO: How do we handle ban -> leave in same batch?
@@ -681,7 +698,8 @@ class SyncHandler(BaseHandler):
 
             if recents is not None:
                 recents = sync_config.filter_collection.filter_room_timeline(recents)
-                recents = yield self._filter_events_for_client(
+                recents = yield filter_events_for_client(
+                    self.store,
                     sync_config.user.to_string(),
                     recents,
                 )
@@ -702,7 +720,8 @@ class SyncHandler(BaseHandler):
                 loaded_recents = sync_config.filter_collection.filter_room_timeline(
                     events
                 )
-                loaded_recents = yield self._filter_events_for_client(
+                loaded_recents = yield filter_events_for_client(
+                    self.store,
                     sync_config.user.to_string(),
                     loaded_recents,
                 )
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6af7a8f424..33b79c0ec7 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -21,6 +21,7 @@ from synapse.util.logutils import log_function
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.types import StreamToken
+from synapse.visibility import filter_events_for_client
 import synapse.metrics
 
 from collections import namedtuple
@@ -398,8 +399,8 @@ class Notifier(object):
                 )
 
                 if name == "room":
-                    room_member_handler = self.hs.get_handlers().room_member_handler
-                    new_events = yield room_member_handler._filter_events_for_client(
+                    new_events = yield filter_events_for_client(
+                        self.store,
                         user.to_string(),
                         new_events,
                         is_peeking=is_peeking,
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index a0160994b7..9b208668b6 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -37,14 +37,14 @@ class ActionGenerator:
         # tag (ie. we just need all the users).
 
     @defer.inlineCallbacks
-    def handle_push_actions_for_event(self, event, context, handler):
+    def handle_push_actions_for_event(self, event, context):
         with Measure(self.clock, "handle_push_actions_for_event"):
             bulk_evaluator = yield evaluator_for_event(
                 event, self.hs, self.store
             )
 
             actions_by_user = yield bulk_evaluator.action_for_event_by_user(
-                event, handler, context.current_state
+                event, context.current_state
             )
 
             context.push_actions = [
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index f97df36d80..25e13b3423 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -22,6 +22,7 @@ from .baserules import list_with_base_rules
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
 from synapse.api.constants import EventTypes
+from synapse.visibility import filter_events_for_clients
 
 
 logger = logging.getLogger(__name__)
@@ -126,7 +127,7 @@ class BulkPushRuleEvaluator:
         self.store = store
 
     @defer.inlineCallbacks
-    def action_for_event_by_user(self, event, handler, current_state):
+    def action_for_event_by_user(self, event, current_state):
         actions_by_user = {}
 
         # None of these users can be peeking since this list of users comes
@@ -136,8 +137,8 @@ class BulkPushRuleEvaluator:
             (u, False) for u in self.rules_by_user.keys()
         ]
 
-        filtered_by_user = yield handler.filter_events_for_clients(
-            user_tuples, [event], {event.event_id: current_state}
+        filtered_by_user = yield filter_events_for_clients(
+            self.store, user_tuples, [event], {event.event_id: current_state}
         )
 
         room_members = yield self.store.get_users_in_room(self.room_id)
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
new file mode 100644
index 0000000000..3a13c7485a
--- /dev/null
+++ b/synapse/push/emailpusher.py
@@ -0,0 +1,251 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer, reactor
+
+import logging
+
+from synapse.util.metrics import Measure
+from synapse.util.logcontext import LoggingContext
+
+from mailer import Mailer
+
+logger = logging.getLogger(__name__)
+
+# The amount of time we always wait before ever emailing about a notification
+# (to give the user a chance to respond to other push or notice the window)
+DELAY_BEFORE_MAIL_MS = 2 * 60 * 1000
+
+THROTTLE_START_MS = 2 * 60 * 1000
+THROTTLE_MAX_MS = (2 * 60 * 1000) * (2 ** 11)  # ~3 days
+THROTTLE_MULTIPLIER = 2
+
+# If no event triggers a notification for this long after the previous,
+# the throttle is released.
+THROTTLE_RESET_AFTER_MS = (2 * 60 * 1000) * (2 ** 11)  # ~3 days
+
+
+class EmailPusher(object):
+    """
+    A pusher that sends email notifications about events (approximately)
+    when they happen.
+    This shares quite a bit of code with httpusher: it would be good to
+    factor out the common parts
+    """
+    def __init__(self, hs, pusherdict):
+        self.hs = hs
+        self.store = self.hs.get_datastore()
+        self.clock = self.hs.get_clock()
+        self.pusher_id = pusherdict['id']
+        self.user_id = pusherdict['user_name']
+        self.app_id = pusherdict['app_id']
+        self.email = pusherdict['pushkey']
+        self.last_stream_ordering = pusherdict['last_stream_ordering']
+        self.timed_call = None
+        self.throttle_params = None
+
+        # See httppusher
+        self.max_stream_ordering = None
+
+        self.processing = False
+
+        if self.hs.config.email_enable_notifs:
+            self.mailer = Mailer(self.hs)
+        else:
+            self.mailer = None
+
+    @defer.inlineCallbacks
+    def on_started(self):
+        if self.mailer is not None:
+            self.throttle_params = yield self.store.get_throttle_params_by_room(
+                self.pusher_id
+            )
+            yield self._process()
+
+    def on_stop(self):
+        if self.timed_call:
+            self.timed_call.cancel()
+
+    @defer.inlineCallbacks
+    def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+        self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+        yield self._process()
+
+    def on_new_receipts(self, min_stream_id, max_stream_id):
+        # We could wake up and cancel the timer but there tend to be quite a
+        # lot of read receipts so it's probably less work to just let the
+        # timer fire
+        return defer.succeed(None)
+
+    @defer.inlineCallbacks
+    def on_timer(self):
+        self.timed_call = None
+        yield self._process()
+
+    @defer.inlineCallbacks
+    def _process(self):
+        if self.processing:
+            return
+
+        with LoggingContext("emailpush._process"):
+            with Measure(self.clock, "emailpush._process"):
+                try:
+                    self.processing = True
+                    # if the max ordering changes while we're running _unsafe_process,
+                    # call it again, and so on until we've caught up.
+                    while True:
+                        starting_max_ordering = self.max_stream_ordering
+                        try:
+                            yield self._unsafe_process()
+                        except:
+                            logger.exception("Exception processing notifs")
+                        if self.max_stream_ordering == starting_max_ordering:
+                            break
+                finally:
+                    self.processing = False
+
+    @defer.inlineCallbacks
+    def _unsafe_process(self):
+        """
+        Main logic of the push loop without the wrapper function that sets
+        up logging, measures and guards against multiple instances of it
+        being run.
+        """
+        unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
+            self.user_id, self.last_stream_ordering, self.max_stream_ordering
+        )
+
+        soonest_due_at = None
+
+        for push_action in unprocessed:
+            received_at = push_action['received_ts']
+            if received_at is None:
+                received_at = 0
+            notif_ready_at = received_at + DELAY_BEFORE_MAIL_MS
+
+            room_ready_at = self.room_ready_to_notify_at(
+                push_action['room_id']
+            )
+
+            should_notify_at = max(notif_ready_at, room_ready_at)
+
+            if should_notify_at < self.clock.time_msec():
+                # one of our notifications is ready for sending, so we send
+                # *one* email updating the user on their notifications,
+                # we then consider all previously outstanding notifications
+                # to be delivered.
+                yield self.send_notification(unprocessed)
+
+                yield self.save_last_stream_ordering_and_success(max([
+                    ea['stream_ordering'] for ea in unprocessed
+                ]))
+                yield self.sent_notif_update_throttle(
+                    push_action['room_id'], push_action
+                )
+                break
+            else:
+                if soonest_due_at is None or should_notify_at < soonest_due_at:
+                    soonest_due_at = should_notify_at
+
+                if self.timed_call is not None:
+                    self.timed_call.cancel()
+                    self.timed_call = None
+
+        if soonest_due_at is not None:
+            self.timed_call = reactor.callLater(
+                self.seconds_until(soonest_due_at), self.on_timer
+            )
+
+    @defer.inlineCallbacks
+    def save_last_stream_ordering_and_success(self, last_stream_ordering):
+        self.last_stream_ordering = last_stream_ordering
+        yield self.store.update_pusher_last_stream_ordering_and_success(
+            self.app_id, self.email, self.user_id,
+            last_stream_ordering, self.clock.time_msec()
+        )
+
+    def seconds_until(self, ts_msec):
+        return (ts_msec - self.clock.time_msec()) / 1000
+
+    def get_room_throttle_ms(self, room_id):
+        if room_id in self.throttle_params:
+            return self.throttle_params[room_id]["throttle_ms"]
+        else:
+            return 0
+
+    def get_room_last_sent_ts(self, room_id):
+        if room_id in self.throttle_params:
+            return self.throttle_params[room_id]["last_sent_ts"]
+        else:
+            return 0
+
+    def room_ready_to_notify_at(self, room_id):
+        """
+        Determines whether throttling should prevent us from sending an email
+        for the given room
+        Returns: True if we should send, False if we should not
+        """
+        last_sent_ts = self.get_room_last_sent_ts(room_id)
+        throttle_ms = self.get_room_throttle_ms(room_id)
+
+        may_send_at = last_sent_ts + throttle_ms
+        return may_send_at
+
+    @defer.inlineCallbacks
+    def sent_notif_update_throttle(self, room_id, notified_push_action):
+        # We have sent a notification, so update the throttle accordingly.
+        # If the event that triggered the notif happened more than
+        # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
+        # notif, we release the throttle. Otherwise, the throttle is increased.
+        time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
+            notified_push_action['stream_ordering']
+        )
+
+        time_of_this_notifs = notified_push_action['received_ts']
+
+        if time_of_previous_notifs is not None and time_of_this_notifs is not None:
+            gap = time_of_this_notifs - time_of_previous_notifs
+        else:
+            # if we don't know the arrival time of one of the notifs (it was not
+            # stored prior to email notification code) then assume a gap of
+            # zero which will just not reset the throttle
+            gap = 0
+
+        current_throttle_ms = self.get_room_throttle_ms(room_id)
+
+        if gap > THROTTLE_RESET_AFTER_MS:
+            new_throttle_ms = THROTTLE_START_MS
+        else:
+            if current_throttle_ms == 0:
+                new_throttle_ms = THROTTLE_START_MS
+            else:
+                new_throttle_ms = min(
+                    current_throttle_ms * THROTTLE_MULTIPLIER,
+                    THROTTLE_MAX_MS
+                )
+        self.throttle_params[room_id] = {
+            "last_sent_ts": self.clock.time_msec(),
+            "throttle_ms": new_throttle_ms
+        }
+        yield self.store.set_throttle_params(
+            self.pusher_id, room_id, self.throttle_params[room_id]
+        )
+
+    @defer.inlineCallbacks
+    def send_notification(self, push_actions):
+        logger.info("Sending notif email for user %r", self.user_id)
+        yield self.mailer.send_notification_mail(
+            self.user_id, self.email, push_actions
+        )
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
new file mode 100644
index 0000000000..5d60c1efcf
--- /dev/null
+++ b/synapse/push/mailer.py
@@ -0,0 +1,455 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+from twisted.mail.smtp import sendmail
+
+import email.utils
+import email.mime.multipart
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+
+from synapse.util.async import concurrently_execute
+from synapse.util.presentable_names import (
+    calculate_room_name, name_from_member_event, descriptor_from_member_events
+)
+from synapse.types import UserID
+from synapse.api.errors import StoreError
+from synapse.api.constants import EventTypes
+from synapse.visibility import filter_events_for_client
+
+import jinja2
+import bleach
+
+import time
+import urllib
+
+import logging
+logger = logging.getLogger(__name__)
+
+
+MESSAGE_FROM_PERSON_IN_ROOM = "You have a message on %(app)s from %(person)s " \
+                              "in the %s room..."
+MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
+MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
+MESSAGES_IN_ROOM = "There are some messages on %(app)s for you in the %(room)s room..."
+MESSAGES_IN_ROOMS = "Here are some messages on %(app)s you may have missed..."
+INVITE_FROM_PERSON_TO_ROOM = "%(person)s has invited you to join the " \
+                             "%(room)s room on %(app)s..."
+INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..."
+
+CONTEXT_BEFORE = 1
+CONTEXT_AFTER = 1
+
+# From https://github.com/matrix-org/matrix-react-sdk/blob/master/src/HtmlUtils.js
+ALLOWED_TAGS = [
+    'font',  # custom to matrix for IRC-style font coloring
+    'del',  # for markdown
+    # deliberately no h1/h2 to stop people shouting.
+    'h3', 'h4', 'h5', 'h6', 'blockquote', 'p', 'a', 'ul', 'ol',
+    'nl', 'li', 'b', 'i', 'u', 'strong', 'em', 'strike', 'code', 'hr', 'br', 'div',
+    'table', 'thead', 'caption', 'tbody', 'tr', 'th', 'td', 'pre'
+]
+ALLOWED_ATTRS = {
+    # custom ones first:
+    "font": ["color"],  # custom to matrix
+    "a": ["href", "name", "target"],  # remote target: custom to matrix
+    # We don't currently allow img itself by default, but this
+    # would make sense if we did
+    "img": ["src"],
+}
+# When bleach release a version with this option, we can specify schemes
+# ALLOWED_SCHEMES = ["http", "https", "ftp", "mailto"]
+
+
+class Mailer(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = self.hs.get_datastore()
+        self.state_handler = self.hs.get_state_handler()
+        loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir)
+        self.app_name = self.hs.config.email_app_name
+        env = jinja2.Environment(loader=loader)
+        env.filters["format_ts"] = format_ts_filter
+        env.filters["mxc_to_http"] = self.mxc_to_http_filter
+        self.notif_template_html = env.get_template(
+            self.hs.config.email_notif_template_html
+        )
+        self.notif_template_text = env.get_template(
+            self.hs.config.email_notif_template_text
+        )
+
+    @defer.inlineCallbacks
+    def send_notification_mail(self, user_id, email_address, push_actions):
+        raw_from = email.utils.parseaddr(self.hs.config.email_notif_from)[1]
+        raw_to = email.utils.parseaddr(email_address)[1]
+
+        if raw_to == '':
+            raise RuntimeError("Invalid 'to' address")
+
+        rooms_in_order = deduped_ordered_list(
+            [pa['room_id'] for pa in push_actions]
+        )
+
+        notif_events = yield self.store.get_events(
+            [pa['event_id'] for pa in push_actions]
+        )
+
+        notifs_by_room = {}
+        for pa in push_actions:
+            notifs_by_room.setdefault(pa["room_id"], []).append(pa)
+
+        # collect the current state for all the rooms in which we have
+        # notifications
+        state_by_room = {}
+
+        try:
+            user_display_name = yield self.store.get_profile_displayname(
+                UserID.from_string(user_id).localpart
+            )
+        except StoreError:
+            user_display_name = user_id
+
+        @defer.inlineCallbacks
+        def _fetch_room_state(room_id):
+            room_state = yield self.state_handler.get_current_state(room_id)
+            state_by_room[room_id] = room_state
+
+        # Run at most 3 of these at once: sync does 10 at a time but email
+        # notifs are much realtime than sync so we can afford to wait a bit.
+        yield concurrently_execute(_fetch_room_state, rooms_in_order, 3)
+
+        rooms = []
+
+        for r in rooms_in_order:
+            roomvars = yield self.get_room_vars(
+                r, user_id, notifs_by_room[r], notif_events, state_by_room[r]
+            )
+            rooms.append(roomvars)
+
+        summary_text = self.make_summary_text(
+            notifs_by_room, state_by_room, notif_events, user_id
+        )
+
+        template_vars = {
+            "user_display_name": user_display_name,
+            "unsubscribe_link": self.make_unsubscribe_link(),
+            "summary_text": summary_text,
+            "app_name": self.app_name,
+            "rooms": rooms,
+        }
+
+        html_text = self.notif_template_html.render(**template_vars)
+        html_part = MIMEText(html_text, "html", "utf8")
+
+        plain_text = self.notif_template_text.render(**template_vars)
+        text_part = MIMEText(plain_text, "plain", "utf8")
+
+        multipart_msg = MIMEMultipart('alternative')
+        multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text)
+        multipart_msg['From'] = self.hs.config.email_notif_from
+        multipart_msg['To'] = email_address
+        multipart_msg['Date'] = email.utils.formatdate()
+        multipart_msg['Message-ID'] = email.utils.make_msgid()
+        multipart_msg.attach(text_part)
+        multipart_msg.attach(html_part)
+
+        logger.info("Sending email push notification to %s" % email_address)
+        # logger.debug(html_text)
+
+        yield sendmail(
+            self.hs.config.email_smtp_host,
+            raw_from, raw_to, multipart_msg.as_string(),
+            port=self.hs.config.email_smtp_port
+        )
+
+    @defer.inlineCallbacks
+    def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state):
+        my_member_event = room_state[("m.room.member", user_id)]
+        is_invite = my_member_event.content["membership"] == "invite"
+
+        room_vars = {
+            "title": calculate_room_name(room_state, user_id),
+            "hash": string_ordinal_total(room_id),  # See sender avatar hash
+            "notifs": [],
+            "invite": is_invite,
+            "link": self.make_room_link(room_id),
+        }
+
+        if not is_invite:
+            for n in notifs:
+                notifvars = yield self.get_notif_vars(
+                    n, user_id, notif_events[n['event_id']], room_state
+                )
+
+                # merge overlapping notifs together.
+                # relies on the notifs being in chronological order.
+                merge = False
+                if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]:
+                    prev_messages = room_vars['notifs'][-1]['messages']
+                    for message in notifvars['messages']:
+                        pm = filter(lambda pm: pm['id'] == message['id'], prev_messages)
+                        if pm:
+                            if not message["is_historical"]:
+                                pm[0]["is_historical"] = False
+                            merge = True
+                        elif merge:
+                            # we're merging, so append any remaining messages
+                            # in this notif to the previous one
+                            prev_messages.append(message)
+
+                if not merge:
+                    room_vars['notifs'].append(notifvars)
+
+        defer.returnValue(room_vars)
+
+    @defer.inlineCallbacks
+    def get_notif_vars(self, notif, user_id, notif_event, room_state):
+        results = yield self.store.get_events_around(
+            notif['room_id'], notif['event_id'],
+            before_limit=CONTEXT_BEFORE, after_limit=CONTEXT_AFTER
+        )
+
+        ret = {
+            "link": self.make_notif_link(notif),
+            "ts": notif['received_ts'],
+            "messages": [],
+        }
+
+        the_events = yield filter_events_for_client(
+            self.store, user_id, results["events_before"]
+        )
+        the_events.append(notif_event)
+
+        for event in the_events:
+            messagevars = self.get_message_vars(notif, event, room_state)
+            if messagevars is not None:
+                ret['messages'].append(messagevars)
+
+        defer.returnValue(ret)
+
+    def get_message_vars(self, notif, event, room_state):
+        if event.type != EventTypes.Message:
+            return None
+
+        sender_state_event = room_state[("m.room.member", event.sender)]
+        sender_name = name_from_member_event(sender_state_event)
+        sender_avatar_url = sender_state_event.content["avatar_url"]
+
+        # 'hash' for deterministically picking default images: use
+        # sender_hash % the number of default images to choose from
+        sender_hash = string_ordinal_total(event.sender)
+
+        ret = {
+            "msgtype": event.content["msgtype"],
+            "is_historical": event.event_id != notif['event_id'],
+            "id": event.event_id,
+            "ts": event.origin_server_ts,
+            "sender_name": sender_name,
+            "sender_avatar_url": sender_avatar_url,
+            "sender_hash": sender_hash,
+        }
+
+        if event.content["msgtype"] == "m.text":
+            self.add_text_message_vars(ret, event)
+        elif event.content["msgtype"] == "m.image":
+            self.add_image_message_vars(ret, event)
+
+        if "body" in event.content:
+            ret["body_text_plain"] = event.content["body"]
+
+        return ret
+
+    def add_text_message_vars(self, messagevars, event):
+        if "format" in event.content:
+            msgformat = event.content["format"]
+        else:
+            msgformat = None
+        messagevars["format"] = msgformat
+
+        if msgformat == "org.matrix.custom.html":
+            messagevars["body_text_html"] = safe_markup(event.content["formatted_body"])
+        else:
+            messagevars["body_text_html"] = safe_text(event.content["body"])
+
+        return messagevars
+
+    def add_image_message_vars(self, messagevars, event):
+        messagevars["image_url"] = event.content["url"]
+
+        return messagevars
+
+    def make_summary_text(self, notifs_by_room, state_by_room, notif_events, user_id):
+        if len(notifs_by_room) == 1:
+            # Only one room has new stuff
+            room_id = notifs_by_room.keys()[0]
+
+            # If the room has some kind of name, use it, but we don't
+            # want the generated-from-names one here otherwise we'll
+            # end up with, "new message from Bob in the Bob room"
+            room_name = calculate_room_name(
+                state_by_room[room_id], user_id, fallback_to_members=False
+            )
+
+            my_member_event = state_by_room[room_id][("m.room.member", user_id)]
+            if my_member_event.content["membership"] == "invite":
+                inviter_member_event = state_by_room[room_id][
+                    ("m.room.member", my_member_event.sender)
+                ]
+                inviter_name = name_from_member_event(inviter_member_event)
+
+                if room_name is None:
+                    return INVITE_FROM_PERSON % {
+                        "person": inviter_name,
+                        "app": self.app_name
+                    }
+                else:
+                    return INVITE_FROM_PERSON_TO_ROOM % {
+                        "person": inviter_name,
+                        "room": room_name,
+                        "app": self.app_name,
+                    }
+
+            sender_name = None
+            if len(notifs_by_room[room_id]) == 1:
+                # There is just the one notification, so give some detail
+                event = notif_events[notifs_by_room[room_id][0]["event_id"]]
+                if ("m.room.member", event.sender) in state_by_room[room_id]:
+                    state_event = state_by_room[room_id][("m.room.member", event.sender)]
+                    sender_name = name_from_member_event(state_event)
+
+                if sender_name is not None and room_name is not None:
+                    return MESSAGE_FROM_PERSON_IN_ROOM % {
+                        "person": sender_name,
+                        "room": room_name,
+                        "app": self.app_name,
+                    }
+                elif sender_name is not None:
+                    return MESSAGE_FROM_PERSON % {
+                        "person": sender_name,
+                        "app": self.app_name,
+                    }
+            else:
+                # There's more than one notification for this room, so just
+                # say there are several
+                if room_name is not None:
+                    return MESSAGES_IN_ROOM % {
+                        "room": room_name,
+                        "app": self.app_name,
+                    }
+                else:
+                    # If the room doesn't have a name, say who the messages
+                    # are from explicitly to avoid, "messages in the Bob room"
+                    sender_ids = list(set([
+                        notif_events[n['event_id']].sender
+                        for n in notifs_by_room[room_id]
+                    ]))
+
+                    return MESSAGES_FROM_PERSON % {
+                        "person": descriptor_from_member_events([
+                            state_by_room[room_id][("m.room.member", s)]
+                            for s in sender_ids
+                        ]),
+                        "app": self.app_name,
+                    }
+        else:
+            # Stuff's happened in multiple different rooms
+            return MESSAGES_IN_ROOMS % {
+                "app": self.app_name,
+            }
+
+    def make_room_link(self, room_id):
+        # need /beta for Universal Links to work on iOS
+        if self.app_name == "Vector":
+            return "https://vector.im/beta/#/room/%s" % (room_id,)
+        else:
+            return "https://matrix.to/#/room/%s" % (room_id,)
+
+    def make_notif_link(self, notif):
+        # need /beta for Universal Links to work on iOS
+        if self.app_name == "Vector":
+            return "https://vector.im/beta/#/room/%s/%s" % (
+                notif['room_id'], notif['event_id']
+            )
+        else:
+            return "https://matrix.to/#/room/%s/%s" % (
+                notif['room_id'], notif['event_id']
+            )
+
+    def make_unsubscribe_link(self):
+        # XXX: matrix.to
+        return "https://vector.im/#/settings"
+
+    def mxc_to_http_filter(self, value, width, height, resize_method="crop"):
+        if value[0:6] != "mxc://":
+            return ""
+
+        serverAndMediaId = value[6:]
+        if '#' in serverAndMediaId:
+            (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1)
+            fragment = "#" + fragment
+
+        params = {
+            "width": width,
+            "height": height,
+            "method": resize_method,
+        }
+        return "%s_matrix/media/v1/thumbnail/%s?%s%s" % (
+            self.hs.config.public_baseurl,
+            serverAndMediaId,
+            urllib.urlencode(params),
+            fragment or "",
+        )
+
+
+def safe_markup(raw_html):
+    return jinja2.Markup(bleach.linkify(bleach.clean(
+        raw_html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRS,
+        # bleach master has this, but it isn't released yet
+        # protocols=ALLOWED_SCHEMES,
+        strip=True
+    )))
+
+
+def safe_text(raw_text):
+    """
+    Process text: treat it as HTML but escape any tags (ie. just escape the
+    HTML) then linkify it.
+    """
+    return jinja2.Markup(bleach.linkify(bleach.clean(
+        raw_text, tags=[], attributes={},
+        strip=False
+    )))
+
+
+def deduped_ordered_list(l):
+    seen = set()
+    ret = []
+    for item in l:
+        if item not in seen:
+            seen.add(item)
+            ret.append(item)
+    return ret
+
+
+def string_ordinal_total(s):
+    tot = 0
+    for c in s:
+        tot += ord(c)
+    return tot
+
+
+def format_ts_filter(value, format):
+    return time.strftime(format, time.localtime(value / 1000))
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 4960837504..e6c0806415 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -1,10 +1,37 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from httppusher import HttpPusher
 
-PUSHER_TYPES = {
-    'http': HttpPusher
-}
+import logging
+logger = logging.getLogger(__name__)
 
 
 def create_pusher(hs, pusherdict):
+    logger.info("trying to create_pusher for %r", pusherdict)
+
+    PUSHER_TYPES = {
+        "http": HttpPusher,
+    }
+
+    logger.info("email enable notifs: %r", hs.config.email_enable_notifs)
+    if hs.config.email_enable_notifs:
+        from synapse.push.emailpusher import EmailPusher
+        PUSHER_TYPES["email"] = EmailPusher
+        logger.info("defined email pusher type")
+
     if pusherdict['kind'] in PUSHER_TYPES:
+        logger.info("found pusher")
         return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 6ef48d63f7..5853ec36a9 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -17,7 +17,6 @@
 from twisted.internet import defer
 
 import pusher
-from synapse.push import PusherConfigException
 from synapse.util.logcontext import preserve_fn
 from synapse.util.async import run_on_reactor
 
@@ -50,6 +49,7 @@ class PusherPool:
         # recreated, added and started: this means we have only one
         # code path adding pushers.
         pusher.create_pusher(self.hs, {
+            "id": None,
             "user_name": user_id,
             "kind": kind,
             "app_id": app_id,
@@ -185,8 +185,8 @@ class PusherPool:
         for pusherdict in pushers:
             try:
                 p = pusher.create_pusher(self.hs, pusherdict)
-            except PusherConfigException:
-                logger.exception("Couldn't start a pusher: caught PusherConfigException")
+            except:
+                logger.exception("Couldn't start a pusher: caught Exception")
                 continue
             if p:
                 appid_pushkey = "%s:%s" % (
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 0eb3d6c1de..e0a7a19777 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -44,6 +44,10 @@ CONDITIONAL_REQUIREMENTS = {
     "preview_url": {
         "netaddr>=0.7.18": ["netaddr"],
     },
+    "email.enable_notifs": {
+        "Jinja2>=2.8": ["Jinja2>=2.8"],
+        "bleach>=1.4.2": ["bleach>=1.4.2"],
+    },
 }
 
 
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index ff78c60f13..0e983ae7fa 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -159,6 +159,15 @@ class ReplicationResource(Resource):
 
         result = yield self.notifier.wait_for_replication(replicate, timeout)
 
+        for stream_name, stream_content in result.items():
+            logger.info(
+                "Replicating %d rows of %s from %s -> %s",
+                len(stream_content["rows"]),
+                stream_name,
+                request_streams.get(stream_name),
+                stream_content["position"],
+            )
+
         request.write(json.dumps(result, ensure_ascii=False))
         finish_request(request)
 
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 86f00b6ff5..635febb174 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -83,6 +83,7 @@ class SlavedEventStore(BaseSlavedStore):
         DataStore.get_push_action_users_in_range.__func__
     )
     get_event = DataStore.get_event.__func__
+    get_events = DataStore.get_events.__func__
     get_current_state = DataStore.get_current_state.__func__
     get_current_state_for_key = DataStore.get_current_state_for_key.__func__
     get_rooms_for_user_where_membership_is = (
@@ -104,6 +105,7 @@ class SlavedEventStore(BaseSlavedStore):
     _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
     _parse_events_txn = DataStore._parse_events_txn.__func__
     _get_events_txn = DataStore._get_events_txn.__func__
+    _get_event_txn = DataStore._get_event_txn.__func__
     _enqueue_events = DataStore._enqueue_events.__func__
     _do_fetch = DataStore._do_fetch.__func__
     _fetch_events_txn = DataStore._fetch_events_txn.__func__
@@ -144,12 +146,14 @@ class SlavedEventStore(BaseSlavedStore):
 
         stream = result.get("forward_ex_outliers")
         if stream:
+            self._stream_id_gen.advance(stream["position"])
             for row in stream["rows"]:
                 event_id = row[1]
                 self._invalidate_get_event_cache(event_id)
 
         stream = result.get("backward_ex_outliers")
         if stream:
+            self._backfill_id_gen.advance(-stream["position"])
             for row in stream["rows"]:
                 event_id = row[1]
                 self._invalidate_get_event_cache(event_id)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index ff8f69ddbf..1ecc02d94d 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -48,6 +48,7 @@ class RegisterRestServlet(RestServlet):
         super(RegisterRestServlet, self).__init__()
         self.hs = hs
         self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
         self.auth_handler = hs.get_handlers().auth_handler
         self.registration_handler = hs.get_handlers().registration_handler
         self.identity_handler = hs.get_handlers().identity_handler
@@ -214,6 +215,34 @@ class RegisterRestServlet(RestServlet):
                         threepid['validated_at'],
                     )
 
+                    # And we add an email pusher for them by default, but only
+                    # if email notifications are enabled (so people don't start
+                    # getting mail spam where they weren't before if email
+                    # notifs are set up on a home server)
+                    if (
+                        self.hs.config.email_enable_notifs and
+                        self.hs.config.email_notif_for_new_users
+                    ):
+                        # Pull the ID of the access token back out of the db
+                        # It would really make more sense for this to be passed
+                        # up when the access token is saved, but that's quite an
+                        # invasive change I'd rather do separately.
+                        user_tuple = yield self.store.get_user_by_access_token(
+                            token
+                        )
+
+                        yield self.hs.get_pusherpool().add_pusher(
+                            user_id=user_id,
+                            access_token=user_tuple["token_id"],
+                            kind="email",
+                            app_id="m.email",
+                            app_display_name="Email Notifications",
+                            device_display_name=threepid["address"],
+                            pushkey=threepid["address"],
+                            lang=None,  # We don't know a user's language here
+                            data={},
+                        )
+
             if 'bind_email' in params and params['bind_email']:
                 logger.info("bind_email specified: binding")
 
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 7a7fbf1e52..ec7e8d40d2 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -16,6 +16,8 @@
 from ._base import SQLBaseStore
 from twisted.internet import defer
 
+from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
+
 import ujson as json
 import logging
 
@@ -24,6 +26,7 @@ logger = logging.getLogger(__name__)
 
 class AccountDataStore(SQLBaseStore):
 
+    @cached()
     def get_account_data_for_user(self, user_id):
         """Get all the client account_data for a user.
 
@@ -60,6 +63,47 @@ class AccountDataStore(SQLBaseStore):
             "get_account_data_for_user", get_account_data_for_user_txn
         )
 
+    @cachedInlineCallbacks(num_args=2)
+    def get_global_account_data_by_type_for_user(self, data_type, user_id):
+        """
+        Returns:
+            Deferred: A dict
+        """
+        result = yield self._simple_select_one_onecol(
+            table="account_data",
+            keyvalues={
+                "user_id": user_id,
+                "account_data_type": data_type,
+            },
+            retcol="content",
+            desc="get_global_account_data_by_type_for_user",
+            allow_none=True,
+        )
+
+        if result:
+            defer.returnValue(json.loads(result))
+        else:
+            defer.returnValue(None)
+
+    @cachedList(cached_method_name="get_global_account_data_by_type_for_user",
+                num_args=2, list_name="user_ids", inlineCallbacks=True)
+    def get_global_account_data_by_type_for_users(self, data_type, user_ids):
+        rows = yield self._simple_select_many_batch(
+            table="account_data",
+            column="user_id",
+            iterable=user_ids,
+            keyvalues={
+                "account_data_type": data_type,
+            },
+            retcols=("user_id", "content",),
+            desc="get_global_account_data_by_type_for_users",
+        )
+
+        defer.returnValue({
+            row["user_id"]: json.loads(row["content"]) if row["content"] else None
+            for row in rows
+        })
+
     def get_account_data_for_room(self, user_id, room_id):
         """Get all the client account_data for a user for a room.
 
@@ -193,6 +237,7 @@ class AccountDataStore(SQLBaseStore):
                 self._account_data_stream_cache.entity_has_changed,
                 user_id, next_id,
             )
+            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
             self._update_max_stream_id(txn, next_id)
 
         with self._account_data_id_gen.get_next() as next_id:
@@ -232,6 +277,11 @@ class AccountDataStore(SQLBaseStore):
                 self._account_data_stream_cache.entity_has_changed,
                 user_id, next_id,
             )
+            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
+            txn.call_after(
+                self.get_global_account_data_by_type_for_user.invalidate,
+                (account_data_type, user_id,)
+            )
             self._update_max_stream_id(txn, next_id)
 
         with self._account_data_id_gen.get_next() as next_id:
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 312c0071f1..9705db5c47 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -118,16 +118,19 @@ class EventPushActionsStore(SQLBaseStore):
                                                   max_stream_ordering=None):
         def get_after_receipt(txn):
             sql = (
-                "SELECT ep.event_id, ep.stream_ordering, ep.actions "
-                "FROM event_push_actions AS ep, ("
-                "   SELECT room_id, user_id,"
-                "       max(topological_ordering) as topological_ordering,"
-                "       max(stream_ordering) as stream_ordering"
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
+                "e.received_ts "
+                "FROM ("
+                "   SELECT room_id, user_id, "
+                "       max(topological_ordering) as topological_ordering, "
+                "       max(stream_ordering) as stream_ordering "
                 "       FROM events"
                 "   NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
                 "   GROUP BY room_id, user_id"
-                ") AS rl "
-                "WHERE"
+                ") AS rl,"
+                " event_push_actions AS ep"
+                " INNER JOIN events AS e USING (room_id, event_id)"
+                " WHERE"
                 "   ep.room_id = rl.room_id"
                 "   AND ("
                 "       ep.topological_ordering > rl.topological_ordering"
@@ -153,11 +156,13 @@ class EventPushActionsStore(SQLBaseStore):
 
         def get_no_receipt(txn):
             sql = (
-                "SELECT ep.event_id, ep.stream_ordering, ep.actions "
-                "FROM event_push_actions AS ep "
-                "WHERE ep.room_id not in ("
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+                " e.received_ts"
+                " FROM event_push_actions AS ep"
+                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+                " WHERE ep.room_id not in ("
                 "   SELECT room_id FROM events NATURAL JOIN receipts_linearized"
-                "   WHERE receipt_type = 'm.read' AND user_id = ? "
+                "   WHERE receipt_type = 'm.read' AND user_id = ?"
                 "   GROUP BY room_id"
                 ") AND ep.user_id = ? AND ep.stream_ordering > ?"
             )
@@ -175,12 +180,30 @@ class EventPushActionsStore(SQLBaseStore):
         defer.returnValue([
             {
                 "event_id": row[0],
-                "stream_ordering": row[1],
-                "actions": json.loads(row[2]),
+                "room_id": row[1],
+                "stream_ordering": row[2],
+                "actions": json.loads(row[3]),
+                "received_ts": row[4],
             } for row in after_read_receipt + no_read_receipt
         ])
 
     @defer.inlineCallbacks
+    def get_time_of_last_push_action_before(self, stream_ordering):
+        def f(txn):
+            sql = (
+                "SELECT e.received_ts"
+                " FROM event_push_actions AS ep"
+                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+                " WHERE ep.stream_ordering > ?"
+                " ORDER BY ep.stream_ordering ASC"
+                " LIMIT 1"
+            )
+            txn.execute(sql, (stream_ordering,))
+            return txn.fetchone()
+        result = yield self.runInteraction("get_time_of_last_push_action_before", f)
+        defer.returnValue(result[0] if result else None)
+
+    @defer.inlineCallbacks
     def get_latest_push_action_stream_ordering(self):
         def f(txn):
             txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0307b2af3c..4655669ba0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,12 +19,14 @@ from twisted.internet import defer, reactor
 from synapse.events import FrozenEvent, USE_FROZEN_DICTS
 from synapse.events.utils import prune_event
 
+from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 
 from canonicaljson import encode_canonical_json
-from collections import namedtuple
+from collections import deque, namedtuple
+
 
 import logging
 import math
@@ -50,28 +52,169 @@ EVENT_QUEUE_ITERATIONS = 3  # No. times we block waiting for requests for events
 EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
+class _EventPeristenceQueue(object):
+    """Queues up events so that they can be persisted in bulk with only one
+    concurrent transaction per room.
+    """
+
+    _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", (
+        "events_and_contexts", "current_state", "backfilled", "deferred",
+    ))
+
+    def __init__(self):
+        self._event_persist_queues = {}
+        self._currently_persisting_rooms = set()
+
+    def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state):
+        """Add events to the queue, with the given persist_event options.
+        """
+        queue = self._event_persist_queues.setdefault(room_id, deque())
+        if queue:
+            end_item = queue[-1]
+            if end_item.current_state or current_state:
+                # We perist events with current_state set to True one at a time
+                pass
+            if end_item.backfilled == backfilled:
+                end_item.events_and_contexts.extend(events_and_contexts)
+                return end_item.deferred.observe()
+
+        deferred = ObservableDeferred(defer.Deferred())
+
+        queue.append(self._EventPersistQueueItem(
+            events_and_contexts=events_and_contexts,
+            backfilled=backfilled,
+            current_state=current_state,
+            deferred=deferred,
+        ))
+
+        return deferred.observe()
+
+    def handle_queue(self, room_id, per_item_callback):
+        """Attempts to handle the queue for a room if not already being handled.
+
+        The given callback will be invoked with for each item in the queue,1
+        of type _EventPersistQueueItem. The per_item_callback will continuously
+        be called with new items, unless the queue becomnes empty. The return
+        value of the function will be given to the deferreds waiting on the item,
+        exceptions will be passed to the deferres as well.
+
+        This function should therefore be called whenever anything is added
+        to the queue.
+
+        If another callback is currently handling the queue then it will not be
+        invoked.
+        """
+
+        if room_id in self._currently_persisting_rooms:
+            return
+
+        self._currently_persisting_rooms.add(room_id)
+
+        @defer.inlineCallbacks
+        def handle_queue_loop():
+            try:
+                queue = self._get_drainining_queue(room_id)
+                for item in queue:
+                    try:
+                        ret = yield per_item_callback(item)
+                        item.deferred.callback(ret)
+                    except Exception as e:
+                        item.deferred.errback(e)
+            finally:
+                queue = self._event_persist_queues.pop(room_id, None)
+                if queue:
+                    self._event_persist_queues[room_id] = queue
+                self._currently_persisting_rooms.discard(room_id)
+
+        preserve_fn(handle_queue_loop)()
+
+    def _get_drainining_queue(self, room_id):
+        queue = self._event_persist_queues.setdefault(room_id, deque())
+
+        try:
+            while True:
+                yield queue.popleft()
+        except IndexError:
+            # Queue has been drained.
+            pass
+
+
 class EventsStore(SQLBaseStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
 
     def __init__(self, hs):
         super(EventsStore, self).__init__(hs)
+        self._clock = hs.get_clock()
         self.register_background_update_handler(
             self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
         )
 
-    @defer.inlineCallbacks
+        self._event_persist_queue = _EventPeristenceQueue()
+
     def persist_events(self, events_and_contexts, backfilled=False):
         """
         Write events to the database
         Args:
             events_and_contexts: list of tuples of (event, context)
             backfilled: ?
+        """
+        partitioned = {}
+        for event, ctx in events_and_contexts:
+            partitioned.setdefault(event.room_id, []).append((event, ctx))
+
+        deferreds = []
+        for room_id, evs_ctxs in partitioned.items():
+            d = self._event_persist_queue.add_to_queue(
+                room_id, evs_ctxs,
+                backfilled=backfilled,
+                current_state=None,
+            )
+            deferreds.append(d)
 
-        Returns: Tuple of stream_orderings where the first is the minimum and
-            last is the maximum stream ordering assigned to the events when
-            persisting.
+        for room_id in partitioned.keys():
+            self._maybe_start_persisting(room_id)
 
-        """
+        return defer.gatherResults(deferreds, consumeErrors=True)
+
+    @defer.inlineCallbacks
+    @log_function
+    def persist_event(self, event, context, current_state=None, backfilled=False):
+        deferred = self._event_persist_queue.add_to_queue(
+            event.room_id, [(event, context)],
+            backfilled=backfilled,
+            current_state=current_state,
+        )
+
+        self._maybe_start_persisting(event.room_id)
+
+        yield deferred
+
+        max_persisted_id = yield self._stream_id_gen.get_current_token()
+        defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
+
+    def _maybe_start_persisting(self, room_id):
+        @defer.inlineCallbacks
+        def persisting_queue(item):
+            if item.current_state:
+                for event, context in item.events_and_contexts:
+                    # There should only ever be one item in
+                    # events_and_contexts when current_state is
+                    # not None
+                    yield self._persist_event(
+                        event, context,
+                        current_state=item.current_state,
+                        backfilled=item.backfilled,
+                    )
+            else:
+                yield self._persist_events(
+                    item.events_and_contexts,
+                    backfilled=item.backfilled,
+                )
+
+        self._event_persist_queue.handle_queue(room_id, persisting_queue)
+
+    @defer.inlineCallbacks
+    def _persist_events(self, events_and_contexts, backfilled=False):
         if not events_and_contexts:
             return
 
@@ -118,8 +261,7 @@ class EventsStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, context, current_state=None, backfilled=False):
-
+    def _persist_event(self, event, context, current_state=None, backfilled=False):
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
                 with self._state_groups_id_gen.get_next() as state_group_id:
@@ -136,9 +278,6 @@ class EventsStore(SQLBaseStore):
         except _RollbackButIsFineException:
             pass
 
-        max_persisted_id = yield self._stream_id_gen.get_current_token()
-        defer.returnValue((stream_ordering, max_persisted_id))
-
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
                   get_prev_content=False, allow_rejected=False,
@@ -427,6 +566,7 @@ class EventsStore(SQLBaseStore):
                     "outlier": event.internal_metadata.is_outlier(),
                     "content": encode_json(event.content).decode("UTF-8"),
                     "origin_server_ts": int(event.origin_server_ts),
+                    "received_ts": self._clock.time_msec(),
                 }
                 for event, _ in events_and_contexts
             ],
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 11feb3eb11..d9afd7ec87 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -233,3 +233,30 @@ class PusherStore(SQLBaseStore):
             {'failing_since': failing_since},
             desc="update_pusher_failing_since",
         )
+
+    @defer.inlineCallbacks
+    def get_throttle_params_by_room(self, pusher_id):
+        res = yield self._simple_select_list(
+            "pusher_throttle",
+            {"pusher": pusher_id},
+            ["room_id", "last_sent_ts", "throttle_ms"],
+            desc="get_throttle_params_by_room"
+        )
+
+        params_by_room = {}
+        for row in res:
+            params_by_room[row["room_id"]] = {
+                "last_sent_ts": row["last_sent_ts"],
+                "throttle_ms": row["throttle_ms"]
+            }
+
+        defer.returnValue(params_by_room)
+
+    @defer.inlineCallbacks
+    def set_throttle_params(self, pusher_id, room_id, params):
+        yield self._simple_upsert(
+            "pusher_throttle",
+            {"pusher": pusher_id, "room_id": room_id},
+            params,
+            desc="set_throttle_params"
+        )
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index cd1b611a0c..94be820f86 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -100,7 +100,7 @@ class ReceiptsStore(SQLBaseStore):
 
         defer.returnValue([ev for res in results.values() for ev in res])
 
-    @cachedInlineCallbacks(num_args=3, max_entries=5000)
+    @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True)
     def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
         """Get receipts for a single room for sending to clients.
 
@@ -232,7 +232,7 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         txn.call_after(
             self._receipts_stream_cache.entity_has_changed,
@@ -375,7 +375,7 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         self._simple_delete_txn(
             txn,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 7af0cae6a5..bda84a744a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore):
             make_guest,
             appservice_id
         )
+        self.get_user_by_id.invalidate((user_id,))
         self.is_guest.invalidate((user_id,))
 
     def _register(
@@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore):
                 (next_id, user_id, token,)
             )
 
+    @cached()
     def get_user_by_id(self, user_id):
         return self._simple_select_one(
             table="users",
@@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore):
         }, {
             'password_hash': password_hash
         })
+        self.get_user_by_id.invalidate((user_id,))
 
     @defer.inlineCallbacks
     def user_delete_access_tokens(self, user_id, except_token_ids=[]):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 08a54cbdd1..9d6bfd5245 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -21,7 +21,7 @@ from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 from synapse.api.constants import Membership
-from synapse.types import UserID
+from synapse.types import get_domian_from_id
 
 import logging
 
@@ -273,10 +273,7 @@ class RoomMemberStore(SQLBaseStore):
             room_id, membership=Membership.JOIN
         )
 
-        joined_domains = set(
-            UserID.from_string(r["user_id"]).domain
-            for r in rows
-        )
+        joined_domains = set(get_domian_from_id(r["user_id"]) for r in rows)
 
         return joined_domains
 
diff --git a/synapse/storage/schema/delta/32/events.sql b/synapse/storage/schema/delta/32/events.sql
new file mode 100644
index 0000000000..1dd0f9e170
--- /dev/null
+++ b/synapse/storage/schema/delta/32/events.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN received_ts BIGINT;
diff --git a/synapse/storage/schema/delta/32/pusher_throttle.sql b/synapse/storage/schema/delta/32/pusher_throttle.sql
new file mode 100644
index 0000000000..d86d30c13c
--- /dev/null
+++ b/synapse/storage/schema/delta/32/pusher_throttle.sql
@@ -0,0 +1,23 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE pusher_throttle(
+    pusher BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    last_sent_ts BIGINT,
+    throttle_ms BIGINT,
+    PRIMARY KEY (pusher, room_id)
+);
diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql
new file mode 100644
index 0000000000..f859be46a6
--- /dev/null
+++ b/synapse/storage/schema/delta/32/remove_indices.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- The following indices are redundant, other indices are equivalent or
+-- supersets
+DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream
+DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT
+
+DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT
+
+-- The following indices were unused
+DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id;
+DROP INDEX IF EXISTS evauth_edges_auth_id;
+DROP INDEX IF EXISTS presence_stream_state;
diff --git a/synapse/types.py b/synapse/types.py
index 5b166835bd..42fd9c7204 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -21,6 +21,10 @@ from collections import namedtuple
 Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
 
 
+def get_domian_from_id(string):
+    return string.split(":", 1)[1]
+
+
 class DomainSpecificString(
         namedtuple("DomainSpecificString", ("localpart", "domain"))
 ):
diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py
new file mode 100644
index 0000000000..3efa8a8206
--- /dev/null
+++ b/synapse/util/presentable_names.py
@@ -0,0 +1,159 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+
+# intentionally looser than what aliases we allow to be registered since
+# other HSes may allow aliases that we would not
+ALIAS_RE = re.compile(r"^#.*:.+$")
+
+ALL_ALONE = "Empty Room"
+
+
+def calculate_room_name(room_state, user_id, fallback_to_members=True):
+    """
+    Works out a user-facing name for the given room as per Matrix
+    spec recommendations.
+    Does not yet support internationalisation.
+    Args:
+        room_state: Dictionary of the room's state
+        user_id: The ID of the user to whom the room name is being presented
+        fallback_to_members: If False, return None instead of generating a name
+                             based on the room's members if the room has no
+                             title or aliases.
+
+    Returns:
+        (string or None) A human readable name for the room.
+    """
+    # does it have a name?
+    if ("m.room.name", "") in room_state:
+        m_room_name = room_state[("m.room.name", "")]
+        if m_room_name.content and m_room_name.content["name"]:
+            return m_room_name.content["name"]
+
+    # does it have a canonical alias?
+    if ("m.room.canonical_alias", "") in room_state:
+        canon_alias = room_state[("m.room.canonical_alias", "")]
+        if (
+            canon_alias.content and canon_alias.content["alias"] and
+            _looks_like_an_alias(canon_alias.content["alias"])
+        ):
+            return canon_alias.content["alias"]
+
+    # at this point we're going to need to search the state by all state keys
+    # for an event type, so rearrange the data structure
+    room_state_bytype = _state_as_two_level_dict(room_state)
+
+    # right then, any aliases at all?
+    if "m.room.aliases" in room_state_bytype:
+        m_room_aliases = room_state_bytype["m.room.aliases"]
+        if len(m_room_aliases.values()) > 0:
+            first_alias_event = m_room_aliases.values()[0]
+            if first_alias_event.content and first_alias_event.content["aliases"]:
+                the_aliases = first_alias_event.content["aliases"]
+                if len(the_aliases) > 0 and _looks_like_an_alias(the_aliases[0]):
+                    return the_aliases[0]
+
+    if not fallback_to_members:
+        return None
+
+    my_member_event = None
+    if ("m.room.member", user_id) in room_state:
+        my_member_event = room_state[("m.room.member", user_id)]
+
+    if (
+        my_member_event is not None and
+        my_member_event.content['membership'] == "invite"
+    ):
+        if ("m.room.member", my_member_event.sender) in room_state:
+            inviter_member_event = room_state[("m.room.member", my_member_event.sender)]
+            return "Invite from %s" % (name_from_member_event(inviter_member_event),)
+        else:
+            return "Room Invite"
+
+    # we're going to have to generate a name based on who's in the room,
+    # so find out who is in the room that isn't the user.
+    if "m.room.member" in room_state_bytype:
+        all_members = [
+            ev for ev in room_state_bytype["m.room.member"].values()
+            if ev.content['membership'] == "join" or ev.content['membership'] == "invite"
+        ]
+        # Sort the member events oldest-first so the we name people in the
+        # order the joined (it should at least be deterministic rather than
+        # dictionary iteration order)
+        all_members.sort(key=lambda e: e.origin_server_ts)
+        other_members = [m for m in all_members if m.state_key != user_id]
+    else:
+        other_members = []
+        all_members = []
+
+    if len(other_members) == 0:
+        if len(all_members) == 1:
+            # self-chat, peeked room with 1 participant,
+            # or inbound invite, or outbound 3PID invite.
+            if all_members[0].sender == user_id:
+                if "m.room.third_party_invite" in room_state_bytype:
+                    third_party_invites = room_state_bytype["m.room.third_party_invite"]
+                    if len(third_party_invites) > 0:
+                        # technically third party invite events are not member
+                        # events, but they are close enough
+                        return "Inviting %s" (
+                            descriptor_from_member_events(third_party_invites)
+                        )
+                    else:
+                        return ALL_ALONE
+            else:
+                return name_from_member_event(all_members[0])
+        else:
+            return ALL_ALONE
+    else:
+        return descriptor_from_member_events(other_members)
+
+
+def descriptor_from_member_events(member_events):
+    if len(member_events) == 0:
+        return "nobody"
+    elif len(member_events) == 1:
+        return name_from_member_event(member_events[0])
+    elif len(member_events) == 2:
+        return "%s and %s" % (
+            name_from_member_event(member_events[0]),
+            name_from_member_event(member_events[1]),
+        )
+    else:
+        return "%s and %d others" % (
+            name_from_member_event(member_events[0]),
+            len(member_events) - 1,
+        )
+
+
+def name_from_member_event(member_event):
+    if (
+        member_event.content and "displayname" in member_event.content and
+        member_event.content["displayname"]
+    ):
+        return member_event.content["displayname"]
+    return member_event.state_key
+
+
+def _state_as_two_level_dict(state):
+    ret = {}
+    for k, v in state.items():
+        ret.setdefault(k[0], {})[k[1]] = v
+    return ret
+
+
+def _looks_like_an_alias(string):
+    return ALIAS_RE.match(string) is not None
diff --git a/synapse/visibility.py b/synapse/visibility.py
new file mode 100644
index 0000000000..948ad51772
--- /dev/null
+++ b/synapse/visibility.py
@@ -0,0 +1,210 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.constants import Membership, EventTypes
+
+from synapse.util.logcontext import preserve_fn
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+VISIBILITY_PRIORITY = (
+    "world_readable",
+    "shared",
+    "invited",
+    "joined",
+)
+
+
+MEMBERSHIP_PRIORITY = (
+    Membership.JOIN,
+    Membership.INVITE,
+    Membership.KNOCK,
+    Membership.LEAVE,
+    Membership.BAN,
+)
+
+
+@defer.inlineCallbacks
+def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
+    """ Returns dict of user_id -> list of events that user is allowed to
+    see.
+
+    Args:
+        user_tuples (str, bool): (user id, is_peeking) for each user to be
+            checked. is_peeking should be true if:
+            * the user is not currently a member of the room, and:
+            * the user has not been a member of the room since the
+            given events
+        events ([synapse.events.EventBase]): list of events to filter
+    """
+    forgotten = yield defer.gatherResults([
+        preserve_fn(store.who_forgot_in_room)(
+            room_id,
+        )
+        for room_id in frozenset(e.room_id for e in events)
+    ], consumeErrors=True)
+
+    # Set of membership event_ids that have been forgotten
+    event_id_forgotten = frozenset(
+        row["event_id"] for rows in forgotten for row in rows
+    )
+
+    ignore_dict_content = yield store.get_global_account_data_by_type_for_users(
+        "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples]
+    )
+
+    # FIXME: This will explode if people upload something incorrect.
+    ignore_dict = {
+        user_id: frozenset(
+            content.get("ignored_users", {}).keys() if content else []
+        )
+        for user_id, content in ignore_dict_content.items()
+    }
+
+    def allowed(event, user_id, is_peeking, ignore_list):
+        """
+        Args:
+            event (synapse.events.EventBase): event to check
+            user_id (str)
+            is_peeking (bool)
+            ignore_list (list): list of users to ignore
+        """
+        if not event.is_state() and event.sender in ignore_list:
+            return False
+
+        state = event_id_to_state[event.event_id]
+
+        # get the room_visibility at the time of the event.
+        visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
+        if visibility_event:
+            visibility = visibility_event.content.get("history_visibility", "shared")
+        else:
+            visibility = "shared"
+
+        if visibility not in VISIBILITY_PRIORITY:
+            visibility = "shared"
+
+        # if it was world_readable, it's easy: everyone can read it
+        if visibility == "world_readable":
+            return True
+
+        # Always allow history visibility events on boundaries. This is done
+        # by setting the effective visibility to the least restrictive
+        # of the old vs new.
+        if event.type == EventTypes.RoomHistoryVisibility:
+            prev_content = event.unsigned.get("prev_content", {})
+            prev_visibility = prev_content.get("history_visibility", None)
+
+            if prev_visibility not in VISIBILITY_PRIORITY:
+                prev_visibility = "shared"
+
+            new_priority = VISIBILITY_PRIORITY.index(visibility)
+            old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
+            if old_priority < new_priority:
+                visibility = prev_visibility
+
+        # likewise, if the event is the user's own membership event, use
+        # the 'most joined' membership
+        membership = None
+        if event.type == EventTypes.Member and event.state_key == user_id:
+            membership = event.content.get("membership", None)
+            if membership not in MEMBERSHIP_PRIORITY:
+                membership = "leave"
+
+            prev_content = event.unsigned.get("prev_content", {})
+            prev_membership = prev_content.get("membership", None)
+            if prev_membership not in MEMBERSHIP_PRIORITY:
+                prev_membership = "leave"
+
+            new_priority = MEMBERSHIP_PRIORITY.index(membership)
+            old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
+            if old_priority < new_priority:
+                membership = prev_membership
+
+        # otherwise, get the user's membership at the time of the event.
+        if membership is None:
+            membership_event = state.get((EventTypes.Member, user_id), None)
+            if membership_event:
+                if membership_event.event_id not in event_id_forgotten:
+                    membership = membership_event.membership
+
+        # if the user was a member of the room at the time of the event,
+        # they can see it.
+        if membership == Membership.JOIN:
+            return True
+
+        if visibility == "joined":
+            # we weren't a member at the time of the event, so we can't
+            # see this event.
+            return False
+
+        elif visibility == "invited":
+            # user can also see the event if they were *invited* at the time
+            # of the event.
+            return membership == Membership.INVITE
+
+        else:
+            # visibility is shared: user can also see the event if they have
+            # become a member since the event
+            #
+            # XXX: if the user has subsequently joined and then left again,
+            # ideally we would share history up to the point they left. But
+            # we don't know when they left.
+            return not is_peeking
+
+    defer.returnValue({
+        user_id: [
+            event
+            for event in events
+            if allowed(event, user_id, is_peeking, ignore_dict.get(user_id, []))
+        ]
+        for user_id, is_peeking in user_tuples
+    })
+
+
+@defer.inlineCallbacks
+def filter_events_for_client(store, user_id, events, is_peeking=False):
+    """
+    Check which events a user is allowed to see
+
+    Args:
+        user_id(str): user id to be checked
+        events([synapse.events.EventBase]): list of events to be checked
+        is_peeking(bool): should be True if:
+          * the user is not currently a member of the room, and:
+          * the user has not been a member of the room since the given
+            events
+
+    Returns:
+        [synapse.events.EventBase]
+    """
+    types = (
+        (EventTypes.RoomHistoryVisibility, ""),
+        (EventTypes.Member, user_id),
+    )
+    event_id_to_state = yield store.get_state_for_events(
+        frozenset(e.event_id for e in events),
+        types=types
+    )
+    res = yield filter_events_for_clients(
+        store, [(user_id, is_peeking)], events, event_id_to_state
+    )
+    defer.returnValue(res.get(user_id, []))