summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/federation_reader.py206
-rw-r--r--synapse/federation/federation_server.py70
-rw-r--r--synapse/handlers/federation.py42
-rw-r--r--synapse/handlers/register.py7
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/handlers/sync.py2
-rw-r--r--synapse/push/emailpusher.py17
-rw-r--r--synapse/push/httppusher.py10
-rw-r--r--synapse/replication/slave/storage/directory.py23
-rw-r--r--synapse/replication/slave/storage/events.py16
-rw-r--r--synapse/replication/slave/storage/keys.py33
-rw-r--r--synapse/replication/slave/storage/room.py21
-rw-r--r--synapse/replication/slave/storage/transactions.py30
-rw-r--r--synapse/rest/client/v2_alpha/register.py16
-rw-r--r--synapse/storage/event_push_actions.py203
-rw-r--r--synapse/storage/keys.py38
-rw-r--r--synapse/util/caches/response_cache.py13
-rw-r--r--synapse/util/retryutils.py2
18 files changed, 643 insertions, 110 deletions
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
new file mode 100644
index 0000000000..58d425f9ac
--- /dev/null
+++ b/synapse/app/federation_reader.py
@@ -0,0 +1,206 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.api.urls import FEDERATION_PREFIX
+from synapse.federation.transport.server import TransportLayerServer
+from synapse.crypto import context_factory
+
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.federation_reader")
+
+
+class FederationReaderSlavedStore(
+    SlavedEventStore,
+    SlavedKeyStore,
+    RoomStore,
+    DirectoryStore,
+    TransactionStore,
+    BaseSlavedStore,
+):
+    pass
+
+
+class FederationReaderServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+                elif name == "federation":
+                    resources.update({
+                        FEDERATION_PREFIX: TransportLayerServer(self),
+                    })
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse federation reader now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(5)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse federation reader", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.federation_reader"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    database_engine = create_engine(config.database_config)
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ss = FederationReaderServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string=get_version_string("Synapse", synapse),
+        database_engine=database_engine,
+    )
+
+    ss.setup()
+    ss.get_handlers()
+    ss.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ss.get_datastore().start_profiling()
+        ss.replicate()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-federation-reader",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 85f5e752fe..612d274bdb 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -21,10 +21,11 @@ from .units import Transaction, Edu
 
 from synapse.util.async import Linearizer
 from synapse.util.logutils import log_function
+from synapse.util.caches.response_cache import ResponseCache
 from synapse.events import FrozenEvent
 import synapse.metrics
 
-from synapse.api.errors import FederationError, SynapseError
+from synapse.api.errors import AuthError, FederationError, SynapseError
 
 from synapse.crypto.event_signing import compute_event_signature
 
@@ -48,9 +49,15 @@ class FederationServer(FederationBase):
     def __init__(self, hs):
         super(FederationServer, self).__init__(hs)
 
+        self.auth = hs.get_auth()
+
         self._room_pdu_linearizer = Linearizer()
         self._server_linearizer = Linearizer()
 
+        # We cache responses to state queries, as they take a while and often
+        # come in waves.
+        self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
+
     def set_handler(self, handler):
         """Sets the handler that the replication layer will use to communicate
         receipt of new PDUs from other home servers. The required methods are
@@ -188,33 +195,50 @@ class FederationServer(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def on_context_state_request(self, origin, room_id, event_id):
-        with (yield self._server_linearizer.queue((origin, room_id))):
-            if event_id:
-                pdus = yield self.handler.get_state_for_pdu(
-                    origin, room_id, event_id,
-                )
-                auth_chain = yield self.store.get_auth_chain(
-                    [pdu.event_id for pdu in pdus]
+        if not event_id:
+            raise NotImplementedError("Specify an event")
+
+        in_room = yield self.auth.check_host_in_room(room_id, origin)
+        if not in_room:
+            raise AuthError(403, "Host not in room.")
+
+        result = self._state_resp_cache.get((room_id, event_id))
+        if not result:
+            with (yield self._server_linearizer.queue((origin, room_id))):
+                resp = yield self._state_resp_cache.set(
+                    (room_id, event_id),
+                    self._on_context_state_request_compute(room_id, event_id)
                 )
+        else:
+            resp = yield result
 
-                for event in auth_chain:
-                    # We sign these again because there was a bug where we
-                    # incorrectly signed things the first time round
-                    if self.hs.is_mine_id(event.event_id):
-                        event.signatures.update(
-                            compute_event_signature(
-                                event,
-                                self.hs.hostname,
-                                self.hs.config.signing_key[0]
-                            )
-                        )
-            else:
-                raise NotImplementedError("Specify an event")
+        defer.returnValue((200, resp))
 
-        defer.returnValue((200, {
+    @defer.inlineCallbacks
+    def _on_context_state_request_compute(self, room_id, event_id):
+        pdus = yield self.handler.get_state_for_pdu(
+            room_id, event_id,
+        )
+        auth_chain = yield self.store.get_auth_chain(
+            [pdu.event_id for pdu in pdus]
+        )
+
+        for event in auth_chain:
+            # We sign these again because there was a bug where we
+            # incorrectly signed things the first time round
+            if self.hs.is_mine_id(event.event_id):
+                event.signatures.update(
+                    compute_event_signature(
+                        event,
+                        self.hs.hostname,
+                        self.hs.config.signing_key[0]
+                    )
+                )
+
+        defer.returnValue({
             "pdus": [pdu.get_pdu_json() for pdu in pdus],
             "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
-        }))
+        })
 
     @defer.inlineCallbacks
     @log_function
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3f138daf17..187bfc4315 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -124,7 +124,7 @@ class FederationHandler(BaseHandler):
 
             try:
                 event_stream_id, max_stream_id = yield self._persist_auth_tree(
-                    auth_chain, state, event
+                    origin, auth_chain, state, event
                 )
             except AuthError as e:
                 raise FederationError(
@@ -637,7 +637,7 @@ class FederationHandler(BaseHandler):
                 pass
 
             event_stream_id, max_stream_id = yield self._persist_auth_tree(
-                auth_chain, state, event
+                origin, auth_chain, state, event
             )
 
             with PreserveLoggingContext():
@@ -991,14 +991,9 @@ class FederationHandler(BaseHandler):
         defer.returnValue(None)
 
     @defer.inlineCallbacks
-    def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True):
+    def get_state_for_pdu(self, room_id, event_id):
         yield run_on_reactor()
 
-        if do_auth:
-            in_room = yield self.auth.check_host_in_room(room_id, origin)
-            if not in_room:
-                raise AuthError(403, "Host not in room.")
-
         state_groups = yield self.store.get_state_groups(
             room_id, [event_id]
         )
@@ -1155,11 +1150,19 @@ class FederationHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def _persist_auth_tree(self, auth_events, state, event):
+    def _persist_auth_tree(self, origin, auth_events, state, event):
         """Checks the auth chain is valid (and passes auth checks) for the
         state and event. Then persists the auth chain and state atomically.
         Persists the event seperately.
 
+        Will attempt to fetch missing auth events.
+
+        Args:
+            origin (str): Where the events came from
+            auth_events (list)
+            state (list)
+            event (Event)
+
         Returns:
             2-tuple of (event_stream_id, max_stream_id) from the persist_event
             call for `event`
@@ -1172,7 +1175,7 @@ class FederationHandler(BaseHandler):
 
         event_map = {
             e.event_id: e
-            for e in auth_events
+            for e in itertools.chain(auth_events, state, [event])
         }
 
         create_event = None
@@ -1181,10 +1184,29 @@ class FederationHandler(BaseHandler):
                 create_event = e
                 break
 
+        missing_auth_events = set()
+        for e in itertools.chain(auth_events, state, [event]):
+            for e_id, _ in e.auth_events:
+                if e_id not in event_map:
+                    missing_auth_events.add(e_id)
+
+        for e_id in missing_auth_events:
+            m_ev = yield self.replication_layer.get_pdu(
+                [origin],
+                e_id,
+                outlier=True,
+                timeout=10000,
+            )
+            if m_ev and m_ev.event_id == e_id:
+                event_map[e_id] = m_ev
+            else:
+                logger.info("Failed to find auth event %r", e_id)
+
         for e in itertools.chain(auth_events, state, [event]):
             auth_for_e = {
                 (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
                 for e_id, _ in e.auth_events
+                if e_id in event_map
             }
             if create_event:
                 auth_for_e[(EventTypes.Create, "")] = create_event
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index b9b5880d64..dd75c4fecf 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -53,6 +53,13 @@ class RegistrationHandler(BaseHandler):
                 Codes.INVALID_USERNAME
             )
 
+        if localpart[0] == '_':
+            raise SynapseError(
+                400,
+                "User ID may not begin with _",
+                Codes.INVALID_USERNAME
+            )
+
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index ae44c7a556..bf6b1c1535 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -345,8 +345,8 @@ class RoomCreationHandler(BaseHandler):
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
-        self.response_cache = ResponseCache()
-        self.remote_list_request_cache = ResponseCache()
+        self.response_cache = ResponseCache(hs)
+        self.remote_list_request_cache = ResponseCache(hs)
         self.remote_list_cache = {}
         self.fetch_looping_call = hs.get_clock().looping_call(
             self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index be26a491ff..0ee4ebe504 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -138,7 +138,7 @@ class SyncHandler(object):
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache()
+        self.response_cache = ResponseCache(hs)
 
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 12a3ec7fd8..6600c9cd55 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from twisted.internet import defer, reactor
+from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 import logging
 
@@ -92,7 +93,11 @@ class EmailPusher(object):
 
     def on_stop(self):
         if self.timed_call:
-            self.timed_call.cancel()
+            try:
+                self.timed_call.cancel()
+            except (AlreadyCalled, AlreadyCancelled):
+                pass
+            self.timed_call = None
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
@@ -140,9 +145,8 @@ class EmailPusher(object):
         being run.
         """
         start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
-        unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
-            self.user_id, start, self.max_stream_ordering
-        )
+        fn = self.store.get_unread_push_actions_for_user_in_range_for_email
+        unprocessed = yield fn(self.user_id, start, self.max_stream_ordering)
 
         soonest_due_at = None
 
@@ -190,7 +194,10 @@ class EmailPusher(object):
                     soonest_due_at = should_notify_at
 
                 if self.timed_call is not None:
-                    self.timed_call.cancel()
+                    try:
+                        self.timed_call.cancel()
+                    except (AlreadyCalled, AlreadyCancelled):
+                        pass
                     self.timed_call = None
 
         if soonest_due_at is not None:
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 2acc6cc214..feedb075e2 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -16,6 +16,7 @@
 from synapse.push import PusherConfigException
 
 from twisted.internet import defer, reactor
+from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 import logging
 import push_rule_evaluator
@@ -109,7 +110,11 @@ class HttpPusher(object):
 
     def on_stop(self):
         if self.timed_call:
-            self.timed_call.cancel()
+            try:
+                self.timed_call.cancel()
+            except (AlreadyCalled, AlreadyCancelled):
+                pass
+            self.timed_call = None
 
     @defer.inlineCallbacks
     def _process(self):
@@ -141,7 +146,8 @@ class HttpPusher(object):
         run once per pusher.
         """
 
-        unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
+        fn = self.store.get_unread_push_actions_for_user_in_range_for_http
+        unprocessed = yield fn(
             self.user_id, self.last_stream_ordering, self.max_stream_ordering
         )
 
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
new file mode 100644
index 0000000000..5fbe3a303a
--- /dev/null
+++ b/synapse/replication/slave/storage/directory.py
@@ -0,0 +1,23 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.directory import DirectoryStore
+
+
+class DirectoryStore(BaseSlavedStore):
+    get_aliases_for_room = DirectoryStore.__dict__[
+        "get_aliases_for_room"
+    ].orig
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 369d839464..f4f31f2d27 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -93,8 +93,11 @@ class SlavedEventStore(BaseSlavedStore):
         StreamStore.__dict__["get_recent_event_ids_for_room"]
     )
 
-    get_unread_push_actions_for_user_in_range = (
-        DataStore.get_unread_push_actions_for_user_in_range.__func__
+    get_unread_push_actions_for_user_in_range_for_http = (
+        DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
+    )
+    get_unread_push_actions_for_user_in_range_for_email = (
+        DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
     )
     get_push_action_users_in_range = (
         DataStore.get_push_action_users_in_range.__func__
@@ -142,6 +145,15 @@ class SlavedEventStore(BaseSlavedStore):
     _get_events_around_txn = DataStore._get_events_around_txn.__func__
     _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
 
+    get_backfill_events = DataStore.get_backfill_events.__func__
+    _get_backfill_events = DataStore._get_backfill_events.__func__
+    get_missing_events = DataStore.get_missing_events.__func__
+    _get_missing_events = DataStore._get_missing_events.__func__
+
+    get_auth_chain = DataStore.get_auth_chain.__func__
+    get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
+    _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
+
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
new file mode 100644
index 0000000000..dd2ae49e48
--- /dev/null
+++ b/synapse/replication/slave/storage/keys.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.storage.keys import KeyStore
+
+
+class SlavedKeyStore(BaseSlavedStore):
+    _get_server_verify_key = KeyStore.__dict__[
+        "_get_server_verify_key"
+    ]
+
+    get_server_verify_keys = DataStore.get_server_verify_keys.__func__
+    store_server_verify_key = DataStore.store_server_verify_key.__func__
+
+    get_server_certificate = DataStore.get_server_certificate.__func__
+    store_server_certificate = DataStore.store_server_certificate.__func__
+
+    get_server_keys_json = DataStore.get_server_keys_json.__func__
+    store_server_keys_json = DataStore.store_server_keys_json.__func__
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
new file mode 100644
index 0000000000..d5bb0f98ea
--- /dev/null
+++ b/synapse/replication/slave/storage/room.py
@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+
+
+class RoomStore(BaseSlavedStore):
+    get_public_room_ids = DataStore.get_public_room_ids.__func__
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
new file mode 100644
index 0000000000..6f2ba98af5
--- /dev/null
+++ b/synapse/replication/slave/storage/transactions.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+from ._base import BaseSlavedStore
+from synapse.storage import DataStore
+from synapse.storage.transactions import TransactionStore
+
+
+class TransactionStore(BaseSlavedStore):
+    get_destination_retry_timings = TransactionStore.__dict__[
+        "get_destination_retry_timings"
+    ].orig
+    _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
+
+    # For now, don't record the destination rety timings
+    def set_destination_retry_timings(*args, **kwargs):
+        return defer.succeed(None)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 9f599ea8bb..943f5676a3 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -196,12 +196,12 @@ class RegisterRestServlet(RestServlet):
                 [LoginType.EMAIL_IDENTITY]
             ]
 
-        authed, result, params, session_id = yield self.auth_handler.check_auth(
+        authed, auth_result, params, session_id = yield self.auth_handler.check_auth(
             flows, body, self.hs.get_ip_from_request(request)
         )
 
         if not authed:
-            defer.returnValue((401, result))
+            defer.returnValue((401, auth_result))
             return
 
         if registered_user_id is not None:
@@ -236,18 +236,18 @@ class RegisterRestServlet(RestServlet):
 
             add_email = True
 
-        result = yield self._create_registration_details(
+        return_dict = yield self._create_registration_details(
             registered_user_id, params
         )
 
-        if add_email and result and LoginType.EMAIL_IDENTITY in result:
-            threepid = result[LoginType.EMAIL_IDENTITY]
+        if add_email and auth_result and LoginType.EMAIL_IDENTITY in auth_result:
+            threepid = auth_result[LoginType.EMAIL_IDENTITY]
             yield self._register_email_threepid(
-                registered_user_id, threepid, result["access_token"],
+                registered_user_id, threepid, return_dict["access_token"],
                 params.get("bind_email")
             )
 
-        defer.returnValue((200, result))
+        defer.returnValue((200, return_dict))
 
     def on_OPTIONS(self, _):
         return 200, {}
@@ -356,8 +356,6 @@ class RegisterRestServlet(RestServlet):
         else:
             logger.info("bind_email not specified: not binding email")
 
-        defer.returnValue()
-
     @defer.inlineCallbacks
     def _create_registration_details(self, user_id, params):
         """Complete registration of newly-registered user
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 3d93285f84..df4000d0da 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -117,24 +117,42 @@ class EventPushActionsStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def get_unread_push_actions_for_user_in_range(self, user_id,
-                                                  min_stream_ordering,
-                                                  max_stream_ordering=None,
-                                                  limit=20):
+    def get_unread_push_actions_for_user_in_range_for_http(
+        self, user_id, min_stream_ordering, max_stream_ordering, limit=20
+    ):
+        """Get a list of the most recent unread push actions for a given user,
+        within the given stream ordering range. Called by the httppusher.
+
+        Args:
+            user_id (str): The user to fetch push actions for.
+            min_stream_ordering(int): The exclusive lower bound on the
+                stream ordering of event push actions to fetch.
+            max_stream_ordering(int): The inclusive upper bound on the
+                stream ordering of event push actions to fetch.
+            limit (int): The maximum number of rows to return.
+        Returns:
+            A promise which resolves to a list of dicts with the keys "event_id",
+            "room_id", "stream_ordering", "actions".
+            The list will be ordered by ascending stream_ordering.
+            The list will have between 0~limit entries.
+        """
+        # find rooms that have a read receipt in them and return the next
+        # push actions
         def get_after_receipt(txn):
+            # find rooms that have a read receipt in them and return the next
+            # push actions
             sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
-                "e.received_ts "
-                "FROM ("
-                "   SELECT room_id, user_id, "
-                "       max(topological_ordering) as topological_ordering, "
-                "       max(stream_ordering) as stream_ordering "
-                "       FROM events"
-                "   NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
-                "   GROUP BY room_id, user_id"
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions"
+                " FROM ("
+                "   SELECT room_id,"
+                "       MAX(topological_ordering) as topological_ordering,"
+                "       MAX(stream_ordering) as stream_ordering"
+                "   FROM events"
+                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
+                "   WHERE receipt_type = 'm.read' AND user_id = ?"
+                "   GROUP BY room_id"
                 ") AS rl,"
                 " event_push_actions AS ep"
-                " INNER JOIN events AS e USING (room_id, event_id)"
                 " WHERE"
                 "   ep.room_id = rl.room_id"
                 "   AND ("
@@ -144,44 +162,159 @@ class EventPushActionsStore(SQLBaseStore):
                 "           AND ep.stream_ordering > rl.stream_ordering"
                 "       )"
                 "   )"
-                "   AND ep.stream_ordering > ?"
                 "   AND ep.user_id = ?"
-                "   AND ep.user_id = rl.user_id"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.stream_ordering <= ?"
+                " ORDER BY ep.stream_ordering ASC LIMIT ?"
             )
-            args = [min_stream_ordering, user_id]
-            if max_stream_ordering is not None:
-                sql += " AND ep.stream_ordering <= ?"
-                args.append(max_stream_ordering)
-            sql += " ORDER BY ep.stream_ordering DESC LIMIT ?"
-            args.append(limit)
+            args = [
+                user_id, user_id,
+                min_stream_ordering, max_stream_ordering, limit,
+            ]
             txn.execute(sql, args)
             return txn.fetchall()
         after_read_receipt = yield self.runInteraction(
-            "get_unread_push_actions_for_user_in_range", get_after_receipt
+            "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt
         )
 
+        # There are rooms with push actions in them but you don't have a read receipt in
+        # them e.g. rooms you've been invited to, so get push actions for rooms which do
+        # not have read receipts in them too.
         def get_no_receipt(txn):
             sql = (
                 "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
                 " e.received_ts"
                 " FROM event_push_actions AS ep"
-                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
-                " WHERE ep.room_id not in ("
-                "   SELECT room_id FROM events NATURAL JOIN receipts_linearized"
+                " INNER JOIN events AS e USING (room_id, event_id)"
+                " WHERE"
+                "   ep.room_id NOT IN ("
+                "     SELECT room_id FROM receipts_linearized"
+                "       WHERE receipt_type = 'm.read' AND user_id = ?"
+                "       GROUP BY room_id"
+                "   )"
+                "   AND ep.user_id = ?"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.stream_ordering <= ?"
+                " ORDER BY ep.stream_ordering ASC LIMIT ?"
+            )
+            args = [
+                user_id, user_id,
+                min_stream_ordering, max_stream_ordering, limit,
+            ]
+            txn.execute(sql, args)
+            return txn.fetchall()
+        no_read_receipt = yield self.runInteraction(
+            "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt
+        )
+
+        notifs = [
+            {
+                "event_id": row[0],
+                "room_id": row[1],
+                "stream_ordering": row[2],
+                "actions": json.loads(row[3]),
+            } for row in after_read_receipt + no_read_receipt
+        ]
+
+        # Now sort it so it's ordered correctly, since currently it will
+        # contain results from the first query, correctly ordered, followed
+        # by results from the second query, but we want them all ordered
+        # by stream_ordering, oldest first.
+        notifs.sort(key=lambda r: r['stream_ordering'])
+
+        # Take only up to the limit. We have to stop at the limit because
+        # one of the subqueries may have hit the limit.
+        defer.returnValue(notifs[:limit])
+
+    @defer.inlineCallbacks
+    def get_unread_push_actions_for_user_in_range_for_email(
+        self, user_id, min_stream_ordering, max_stream_ordering, limit=20
+    ):
+        """Get a list of the most recent unread push actions for a given user,
+        within the given stream ordering range. Called by the emailpusher
+
+        Args:
+            user_id (str): The user to fetch push actions for.
+            min_stream_ordering(int): The exclusive lower bound on the
+                stream ordering of event push actions to fetch.
+            max_stream_ordering(int): The inclusive upper bound on the
+                stream ordering of event push actions to fetch.
+            limit (int): The maximum number of rows to return.
+        Returns:
+            A promise which resolves to a list of dicts with the keys "event_id",
+            "room_id", "stream_ordering", "actions", "received_ts".
+            The list will be ordered by descending received_ts.
+            The list will have between 0~limit entries.
+        """
+        # find rooms that have a read receipt in them and return the most recent
+        # push actions
+        def get_after_receipt(txn):
+            sql = (
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+                "  e.received_ts"
+                " FROM ("
+                "   SELECT room_id,"
+                "       MAX(topological_ordering) as topological_ordering,"
+                "       MAX(stream_ordering) as stream_ordering"
+                "   FROM events"
+                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
                 "   WHERE receipt_type = 'm.read' AND user_id = ?"
                 "   GROUP BY room_id"
-                ") AND ep.user_id = ? AND ep.stream_ordering > ?"
+                ") AS rl,"
+                " event_push_actions AS ep"
+                " INNER JOIN events AS e USING (room_id, event_id)"
+                " WHERE"
+                "   ep.room_id = rl.room_id"
+                "   AND ("
+                "       ep.topological_ordering > rl.topological_ordering"
+                "       OR ("
+                "           ep.topological_ordering = rl.topological_ordering"
+                "           AND ep.stream_ordering > rl.stream_ordering"
+                "       )"
+                "   )"
+                "   AND ep.user_id = ?"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.stream_ordering <= ?"
+                " ORDER BY ep.stream_ordering DESC LIMIT ?"
+            )
+            args = [
+                user_id, user_id,
+                min_stream_ordering, max_stream_ordering, limit,
+            ]
+            txn.execute(sql, args)
+            return txn.fetchall()
+        after_read_receipt = yield self.runInteraction(
+            "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt
+        )
+
+        # There are rooms with push actions in them but you don't have a read receipt in
+        # them e.g. rooms you've been invited to, so get push actions for rooms which do
+        # not have read receipts in them too.
+        def get_no_receipt(txn):
+            sql = (
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+                " e.received_ts"
+                " FROM event_push_actions AS ep"
+                " INNER JOIN events AS e USING (room_id, event_id)"
+                " WHERE"
+                "   ep.room_id NOT IN ("
+                "     SELECT room_id FROM receipts_linearized"
+                "       WHERE receipt_type = 'm.read' AND user_id = ?"
+                "       GROUP BY room_id"
+                "   )"
+                "   AND ep.user_id = ?"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.stream_ordering <= ?"
+                " ORDER BY ep.stream_ordering DESC LIMIT ?"
             )
-            args = [user_id, user_id, min_stream_ordering]
-            if max_stream_ordering is not None:
-                sql += " AND ep.stream_ordering <= ?"
-                args.append(max_stream_ordering)
-            sql += " ORDER BY ep.stream_ordering DESC LIMIT ?"
-            args.append(limit)
+            args = [
+                user_id, user_id,
+                min_stream_ordering, max_stream_ordering, limit,
+            ]
             txn.execute(sql, args)
             return txn.fetchall()
         no_read_receipt = yield self.runInteraction(
-            "get_unread_push_actions_for_user_in_range", get_no_receipt
+            "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt
         )
 
         # Make a list of dicts from the two sets of results.
@@ -198,7 +331,7 @@ class EventPushActionsStore(SQLBaseStore):
         # Now sort it so it's ordered correctly, since currently it will
         # contain results from the first query, correctly ordered, followed
         # by results from the second query, but we want them all ordered
-        # by received_ts
+        # by received_ts (most recent first)
         notifs.sort(key=lambda r: -(r['received_ts'] or 0))
 
         # Now return the first `limit`
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index a495a8a7d9..86b37b9ddd 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -22,6 +22,10 @@ import OpenSSL
 from signedjson.key import decode_verify_key_bytes
 import hashlib
 
+import logging
+
+logger = logging.getLogger(__name__)
+
 
 class KeyStore(SQLBaseStore):
     """Persistence for signature verification keys and tls X.509 certificates
@@ -74,22 +78,22 @@ class KeyStore(SQLBaseStore):
         )
 
     @cachedInlineCallbacks()
-    def get_all_server_verify_keys(self, server_name):
-        rows = yield self._simple_select_list(
+    def _get_server_verify_key(self, server_name, key_id):
+        verify_key_bytes = yield self._simple_select_one_onecol(
             table="server_signature_keys",
             keyvalues={
                 "server_name": server_name,
+                "key_id": key_id,
             },
-            retcols=["key_id", "verify_key"],
-            desc="get_all_server_verify_keys",
+            retcol="verify_key",
+            desc="_get_server_verify_key",
+            allow_none=True,
         )
 
-        defer.returnValue({
-            row["key_id"]: decode_verify_key_bytes(
-                row["key_id"], str(row["verify_key"])
-            )
-            for row in rows
-        })
+        if verify_key_bytes:
+            defer.returnValue(decode_verify_key_bytes(
+                key_id, str(verify_key_bytes)
+            ))
 
     @defer.inlineCallbacks
     def get_server_verify_keys(self, server_name, key_ids):
@@ -101,12 +105,12 @@ class KeyStore(SQLBaseStore):
         Returns:
             (list of VerifyKey): The verification keys.
         """
-        keys = yield self.get_all_server_verify_keys(server_name)
-        defer.returnValue({
-            k: keys[k]
-            for k in key_ids
-            if k in keys and keys[k]
-        })
+        keys = {}
+        for key_id in key_ids:
+            key = yield self._get_server_verify_key(server_name, key_id)
+            if key:
+                keys[key_id] = key
+        defer.returnValue(keys)
 
     @defer.inlineCallbacks
     def store_server_verify_key(self, server_name, from_server, time_now_ms,
@@ -133,8 +137,6 @@ class KeyStore(SQLBaseStore):
             desc="store_server_verify_key",
         )
 
-        self.get_all_server_verify_keys.invalidate((server_name,))
-
     def store_server_keys_json(self, server_name, key_id, from_server,
                                ts_now_ms, ts_expires_ms, key_json_bytes):
         """Stores the JSON bytes for a set of keys from a server
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 36686b479e..00af539880 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -24,9 +24,12 @@ class ResponseCache(object):
     used rather than trying to compute a new response.
     """
 
-    def __init__(self):
+    def __init__(self, hs, timeout_ms=0):
         self.pending_result_cache = {}  # Requests that haven't finished yet.
 
+        self.clock = hs.get_clock()
+        self.timeout_sec = timeout_ms / 1000.
+
     def get(self, key):
         result = self.pending_result_cache.get(key)
         if result is not None:
@@ -39,7 +42,13 @@ class ResponseCache(object):
         self.pending_result_cache[key] = result
 
         def remove(r):
-            self.pending_result_cache.pop(key, None)
+            if self.timeout_sec:
+                self.clock.call_later(
+                    self.timeout_sec,
+                    self.pending_result_cache.pop, key, None,
+                )
+            else:
+                self.pending_result_cache.pop(key, None)
             return r
 
         result.addBoth(remove)
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 43cf11f3f6..49527f4d21 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -128,7 +128,7 @@ class RetryDestinationLimiter(object):
             )
 
         valid_err_code = False
-        if exc_type is CodeMessageException:
+        if exc_type is not None and issubclass(exc_type, CodeMessageException):
             valid_err_code = 0 <= exc_val.code < 500
 
         if exc_type is None or valid_err_code: