summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/federation_sender.py331
-rw-r--r--synapse/config/server.py5
-rw-r--r--synapse/federation/__init__.py7
-rw-r--r--synapse/federation/federation_client.py62
-rw-r--r--synapse/federation/replication.py5
-rw-r--r--synapse/federation/send_queue.py298
-rw-r--r--synapse/federation/transaction_queue.py95
-rw-r--r--synapse/handlers/__init__.py2
-rw-r--r--synapse/handlers/devicemessage.py4
-rw-r--r--synapse/handlers/federation.py53
-rw-r--r--synapse/handlers/initial_sync.py7
-rw-r--r--synapse/handlers/message.py13
-rw-r--r--synapse/handlers/presence.py11
-rw-r--r--synapse/handlers/receipts.py4
-rw-r--r--synapse/handlers/typing.py4
-rw-r--r--synapse/notifier.py9
-rw-r--r--synapse/replication/expire_cache.py60
-rw-r--r--synapse/replication/resource.py35
-rw-r--r--synapse/replication/slave/storage/_base.py19
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py20
-rw-r--r--synapse/replication/slave/storage/events.py14
-rw-r--r--synapse/replication/slave/storage/transactions.py10
-rw-r--r--synapse/rest/client/v2_alpha/receipts.py2
-rw-r--r--synapse/server.py28
-rw-r--r--synapse/storage/_base.py18
-rw-r--r--synapse/storage/deviceinbox.py26
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/presence.py7
-rw-r--r--synapse/storage/schema/delta/39/device_federation_stream_idx.sql16
-rw-r--r--synapse/storage/schema/delta/39/federation_out_position.sql22
-rw-r--r--synapse/storage/stream.py47
-rw-r--r--synapse/storage/transactions.py47
-rw-r--r--synapse/util/jsonobject.py17
-rw-r--r--synapse/util/retryutils.py21
34 files changed, 1108 insertions, 213 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
new file mode 100644
index 0000000000..80ea4c8062
--- /dev/null
+++ b/synapse/app/federation_sender.py
@@ -0,0 +1,331 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.config._base import ConfigError
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
+from synapse.crypto import context_factory
+from synapse.http.site import SynapseSite
+from synapse.federation import send_queue
+from synapse.federation.units import Edu
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.storage.engines import create_engine
+from synapse.storage.presence import UserPresenceState
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+
+from synapse import events
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+import ujson as json
+
+logger = logging.getLogger("synapse.app.appservice")
+
+
+class FederationSenderSlaveStore(
+    SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+    SlavedRegistrationStore,
+):
+    pass
+
+
+class FederationSenderServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse federation_sender now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+        send_handler = FederationSenderHandler(self)
+
+        send_handler.on_start()
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args.update((yield send_handler.stream_positions()))
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+                yield send_handler.process_replication(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(30)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse federation sender", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.federation_sender"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    if config.send_federation:
+        sys.stderr.write(
+            "\nThe send_federation must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``send_federation: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
+
+    # Force the pushers to start since they will be disabled in the main config
+    config.send_federation = True
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ps = FederationSenderServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ps.setup()
+    ps.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ps.replicate()
+        ps.get_datastore().start_profiling()
+        ps.get_state_handler().start_caching()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-federation-sender",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+class FederationSenderHandler(object):
+    """Processes the replication stream and forwards the appropriate entries
+    to the federation sender.
+    """
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.federation_sender = hs.get_federation_sender()
+
+        self._room_serials = {}
+        self._room_typing = {}
+
+    def on_start(self):
+        # There may be some events that are persisted but haven't been sent,
+        # so send them now.
+        self.federation_sender.notify_new_events(
+            self.store.get_room_max_stream_ordering()
+        )
+
+    @defer.inlineCallbacks
+    def stream_positions(self):
+        stream_id = yield self.store.get_federation_out_pos("federation")
+        defer.returnValue({
+            "federation": stream_id,
+
+            # Ack stuff we've "processed", this should only be called from
+            # one process.
+            "federation_ack": stream_id,
+        })
+
+    @defer.inlineCallbacks
+    def process_replication(self, result):
+        # The federation stream contains things that we want to send out, e.g.
+        # presence, typing, etc.
+        fed_stream = result.get("federation")
+        if fed_stream:
+            latest_id = int(fed_stream["position"])
+
+            # The federation stream containis a bunch of different types of
+            # rows that need to be handled differently. We parse the rows, put
+            # them into the appropriate collection and then send them off.
+            presence_to_send = {}
+            keyed_edus = {}
+            edus = {}
+            failures = {}
+            device_destinations = set()
+
+            # Parse the rows in the stream
+            for row in fed_stream["rows"]:
+                position, typ, content_js = row
+                content = json.loads(content_js)
+
+                if typ == send_queue.PRESENCE_TYPE:
+                    destination = content["destination"]
+                    state = UserPresenceState.from_dict(content["state"])
+
+                    presence_to_send.setdefault(destination, []).append(state)
+                elif typ == send_queue.KEYED_EDU_TYPE:
+                    key = content["key"]
+                    edu = Edu(**content["edu"])
+
+                    keyed_edus.setdefault(
+                        edu.destination, {}
+                    )[(edu.destination, tuple(key))] = edu
+                elif typ == send_queue.EDU_TYPE:
+                    edu = Edu(**content)
+
+                    edus.setdefault(edu.destination, []).append(edu)
+                elif typ == send_queue.FAILURE_TYPE:
+                    destination = content["destination"]
+                    failure = content["failure"]
+
+                    failures.setdefault(destination, []).append(failure)
+                elif typ == send_queue.DEVICE_MESSAGE_TYPE:
+                    device_destinations.add(content["destination"])
+                else:
+                    raise Exception("Unrecognised federation type: %r", typ)
+
+            # We've finished collecting, send everything off
+            for destination, states in presence_to_send.items():
+                self.federation_sender.send_presence(destination, states)
+
+            for destination, edu_map in keyed_edus.items():
+                for key, edu in edu_map.items():
+                    self.federation_sender.send_edu(
+                        edu.destination, edu.edu_type, edu.content, key=key,
+                    )
+
+            for destination, edu_list in edus.items():
+                for edu in edu_list:
+                    self.federation_sender.send_edu(
+                        edu.destination, edu.edu_type, edu.content, key=None,
+                    )
+
+            for destination, failure_list in failures.items():
+                for failure in failure_list:
+                    self.federation_sender.send_failure(destination, failure)
+
+            for destination in device_destinations:
+                self.federation_sender.send_device_messages(destination)
+
+            # Record where we are in the stream.
+            yield self.store.update_federation_out_pos(
+                "federation", latest_id
+            )
+
+        # We also need to poke the federation sender when new events happen
+        event_stream = result.get("events")
+        if event_stream:
+            latest_pos = event_stream["position"]
+            self.federation_sender.notify_new_events(latest_pos)
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/config/server.py b/synapse/config/server.py
index ed5417d0c3..634d8e6fe5 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -30,6 +30,11 @@ class ServerConfig(Config):
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
         self.public_baseurl = config.get("public_baseurl")
 
+        # Whether to send federation traffic out in this process. This only
+        # applies to some federation traffic, and so shouldn't be used to
+        # "disable" federation
+        self.send_federation = config.get("send_federation", True)
+
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != '/':
                 self.public_baseurl += '/'
diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py
index 979fdf2431..2e32d245ba 100644
--- a/synapse/federation/__init__.py
+++ b/synapse/federation/__init__.py
@@ -17,10 +17,9 @@
 """
 
 from .replication import ReplicationLayer
-from .transport.client import TransportLayerClient
 
 
-def initialize_http_replication(homeserver):
-    transport = TransportLayerClient(homeserver)
+def initialize_http_replication(hs):
+    transport = hs.get_federation_transport_client()
 
-    return ReplicationLayer(homeserver, transport)
+    return ReplicationLayer(hs, transport)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 94e76b1978..b255709165 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -18,7 +18,6 @@ from twisted.internet import defer
 
 from .federation_base import FederationBase
 from synapse.api.constants import Membership
-from .units import Edu
 
 from synapse.api.errors import (
     CodeMessageException, HttpResponseException, SynapseError,
@@ -45,10 +44,6 @@ logger = logging.getLogger(__name__)
 # synapse.federation.federation_client is a silly name
 metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
 
-sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
-
-sent_edus_counter = metrics.register_counter("sent_edus")
-
 sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
 
 
@@ -93,63 +88,6 @@ class FederationClient(FederationBase):
         self._get_pdu_cache.start()
 
     @log_function
-    def send_pdu(self, pdu, destinations):
-        """Informs the replication layer about a new PDU generated within the
-        home server that should be transmitted to others.
-
-        TODO: Figure out when we should actually resolve the deferred.
-
-        Args:
-            pdu (Pdu): The new Pdu.
-
-        Returns:
-            Deferred: Completes when we have successfully processed the PDU
-            and replicated it to any interested remote home servers.
-        """
-        order = self._order
-        self._order += 1
-
-        sent_pdus_destination_dist.inc_by(len(destinations))
-
-        logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
-
-        # TODO, add errback, etc.
-        self._transaction_queue.enqueue_pdu(pdu, destinations, order)
-
-        logger.debug(
-            "[%s] transaction_layer.enqueue_pdu... done",
-            pdu.event_id
-        )
-
-    def send_presence(self, destination, states):
-        if destination != self.server_name:
-            self._transaction_queue.enqueue_presence(destination, states)
-
-    @log_function
-    def send_edu(self, destination, edu_type, content, key=None):
-        edu = Edu(
-            origin=self.server_name,
-            destination=destination,
-            edu_type=edu_type,
-            content=content,
-        )
-
-        sent_edus_counter.inc()
-
-        self._transaction_queue.enqueue_edu(edu, key=key)
-
-    @log_function
-    def send_device_messages(self, destination):
-        """Sends the device messages in the local database to the remote
-        destination"""
-        self._transaction_queue.enqueue_device_messages(destination)
-
-    @log_function
-    def send_failure(self, failure, destination):
-        self._transaction_queue.enqueue_failure(failure, destination)
-        return defer.succeed(None)
-
-    @log_function
     def make_query(self, destination, query_type, args,
                    retry_on_dns_fail=False):
         """Sends a federation Query to a remote homeserver of the given type
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index ea66a5dcbc..62d865ec4b 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -20,8 +20,6 @@ a given transport.
 from .federation_client import FederationClient
 from .federation_server import FederationServer
 
-from .transaction_queue import TransactionQueue
-
 from .persistence import TransactionActions
 
 import logging
@@ -66,9 +64,6 @@ class ReplicationLayer(FederationClient, FederationServer):
         self._clock = hs.get_clock()
 
         self.transaction_actions = TransactionActions(self.store)
-        self._transaction_queue = TransactionQueue(hs, transport_layer)
-
-        self._order = 0
 
         self.hs = hs
 
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
new file mode 100644
index 0000000000..5c9f7a86f0
--- /dev/null
+++ b/synapse/federation/send_queue.py
@@ -0,0 +1,298 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A federation sender that forwards things to be sent across replication to
+a worker process.
+
+It assumes there is a single worker process feeding off of it.
+
+Each row in the replication stream consists of a type and some json, where the
+types indicate whether they are presence, or edus, etc.
+
+Ephemeral or non-event data are queued up in-memory. When the worker requests
+updates since a particular point, all in-memory data since before that point is
+dropped. We also expire things in the queue after 5 minutes, to ensure that a
+dead worker doesn't cause the queues to grow limitlessly.
+
+Events are replicated via a separate events stream.
+"""
+
+from .units import Edu
+
+from synapse.util.metrics import Measure
+import synapse.metrics
+
+from blist import sorteddict
+import ujson
+
+
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+
+PRESENCE_TYPE = "p"
+KEYED_EDU_TYPE = "k"
+EDU_TYPE = "e"
+FAILURE_TYPE = "f"
+DEVICE_MESSAGE_TYPE = "d"
+
+
+class FederationRemoteSendQueue(object):
+    """A drop in replacement for TransactionQueue"""
+
+    def __init__(self, hs):
+        self.server_name = hs.hostname
+        self.clock = hs.get_clock()
+
+        self.presence_map = {}
+        self.presence_changed = sorteddict()
+
+        self.keyed_edu = {}
+        self.keyed_edu_changed = sorteddict()
+
+        self.edus = sorteddict()
+
+        self.failures = sorteddict()
+
+        self.device_messages = sorteddict()
+
+        self.pos = 1
+        self.pos_time = sorteddict()
+
+        # EVERYTHING IS SAD. In particular, python only makes new scopes when
+        # we make a new function, so we need to make a new function so the inner
+        # lambda binds to the queue rather than to the name of the queue which
+        # changes. ARGH.
+        def register(name, queue):
+            metrics.register_callback(
+                queue_name + "_size",
+                lambda: len(queue),
+            )
+
+        for queue_name in [
+            "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
+            "edus", "failures", "device_messages", "pos_time",
+        ]:
+            register(queue_name, getattr(self, queue_name))
+
+        self.clock.looping_call(self._clear_queue, 30 * 1000)
+
+    def _next_pos(self):
+        pos = self.pos
+        self.pos += 1
+        self.pos_time[self.clock.time_msec()] = pos
+        return pos
+
+    def _clear_queue(self):
+        """Clear the queues for anything older than N minutes"""
+
+        FIVE_MINUTES_AGO = 5 * 60 * 1000
+        now = self.clock.time_msec()
+
+        keys = self.pos_time.keys()
+        time = keys.bisect_left(now - FIVE_MINUTES_AGO)
+        if not keys[:time]:
+            return
+
+        position_to_delete = max(keys[:time])
+        for key in keys[:time]:
+            del self.pos_time[key]
+
+        self._clear_queue_before_pos(position_to_delete)
+
+    def _clear_queue_before_pos(self, position_to_delete):
+        """Clear all the queues from before a given position"""
+        with Measure(self.clock, "send_queue._clear"):
+            # Delete things out of presence maps
+            keys = self.presence_changed.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.presence_changed[key]
+
+            user_ids = set(
+                user_id for uids in self.presence_changed.values() for _, user_id in uids
+            )
+
+            to_del = [
+                user_id for user_id in self.presence_map if user_id not in user_ids
+            ]
+            for user_id in to_del:
+                del self.presence_map[user_id]
+
+            # Delete things out of keyed edus
+            keys = self.keyed_edu_changed.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.keyed_edu_changed[key]
+
+            live_keys = set()
+            for edu_key in self.keyed_edu_changed.values():
+                live_keys.add(edu_key)
+
+            to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
+            for edu_key in to_del:
+                del self.keyed_edu[edu_key]
+
+            # Delete things out of edu map
+            keys = self.edus.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.edus[key]
+
+            # Delete things out of failure map
+            keys = self.failures.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.failures[key]
+
+            # Delete things out of device map
+            keys = self.device_messages.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.device_messages[key]
+
+    def notify_new_events(self, current_id):
+        """As per TransactionQueue"""
+        # We don't need to replicate this as it gets sent down a different
+        # stream.
+        pass
+
+    def send_edu(self, destination, edu_type, content, key=None):
+        """As per TransactionQueue"""
+        pos = self._next_pos()
+
+        edu = Edu(
+            origin=self.server_name,
+            destination=destination,
+            edu_type=edu_type,
+            content=content,
+        )
+
+        if key:
+            assert isinstance(key, tuple)
+            self.keyed_edu[(destination, key)] = edu
+            self.keyed_edu_changed[pos] = (destination, key)
+        else:
+            self.edus[pos] = edu
+
+    def send_presence(self, destination, states):
+        """As per TransactionQueue"""
+        pos = self._next_pos()
+
+        self.presence_map.update({
+            state.user_id: state
+            for state in states
+        })
+
+        self.presence_changed[pos] = [
+            (destination, state.user_id) for state in states
+        ]
+
+    def send_failure(self, failure, destination):
+        """As per TransactionQueue"""
+        pos = self._next_pos()
+
+        self.failures[pos] = (destination, str(failure))
+
+    def send_device_messages(self, destination):
+        """As per TransactionQueue"""
+        pos = self._next_pos()
+        self.device_messages[pos] = destination
+
+    def get_current_token(self):
+        return self.pos - 1
+
+    def get_replication_rows(self, token, limit, federation_ack=None):
+        """
+        Args:
+            token (int)
+            limit (int)
+            federation_ack (int): Optional. The position where the worker is
+                explicitly acknowledged it has handled. Allows us to drop
+                data from before that point
+        """
+        # TODO: Handle limit.
+
+        # To handle restarts where we wrap around
+        if token > self.pos:
+            token = -1
+
+        rows = []
+
+        # There should be only one reader, so lets delete everything its
+        # acknowledged its seen.
+        if federation_ack:
+            self._clear_queue_before_pos(federation_ack)
+
+        # Fetch changed presence
+        keys = self.presence_changed.keys()
+        i = keys.bisect_right(token)
+        dest_user_ids = set(
+            (pos, dest_user_id)
+            for pos in keys[i:]
+            for dest_user_id in self.presence_changed[pos]
+        )
+
+        for (key, (dest, user_id)) in dest_user_ids:
+            rows.append((key, PRESENCE_TYPE, ujson.dumps({
+                "destination": dest,
+                "state": self.presence_map[user_id].as_dict(),
+            })))
+
+        # Fetch changes keyed edus
+        keys = self.keyed_edu_changed.keys()
+        i = keys.bisect_right(token)
+        keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
+
+        for (pos, (destination, edu_key)) in keyed_edus:
+            rows.append(
+                (pos, KEYED_EDU_TYPE, ujson.dumps({
+                    "key": edu_key,
+                    "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
+                }))
+            )
+
+        # Fetch changed edus
+        keys = self.edus.keys()
+        i = keys.bisect_right(token)
+        edus = set((k, self.edus[k]) for k in keys[i:])
+
+        for (pos, edu) in edus:
+            rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
+
+        # Fetch changed failures
+        keys = self.failures.keys()
+        i = keys.bisect_right(token)
+        failures = set((k, self.failures[k]) for k in keys[i:])
+
+        for (pos, (destination, failure)) in failures:
+            rows.append((pos, FAILURE_TYPE, ujson.dumps({
+                "destination": destination,
+                "failure": failure,
+            })))
+
+        # Fetch changed device messages
+        keys = self.device_messages.keys()
+        i = keys.bisect_right(token)
+        device_messages = set((k, self.device_messages[k]) for k in keys[i:])
+
+        for (pos, destination) in device_messages:
+            rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
+                "destination": destination,
+            })))
+
+        # Sort rows based on pos
+        rows.sort()
+
+        return rows
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index f8ca93e4c3..c94c74a67e 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 from .persistence import TransactionActions
 from .units import Transaction, Edu
 
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import HttpResponseException
 from synapse.util.async import run_on_reactor
 from synapse.util.logcontext import preserve_context_over_fn
@@ -26,6 +27,7 @@ from synapse.util.retryutils import (
     get_retry_limiter, NotRetryingDestination,
 )
 from synapse.util.metrics import measure_func
+from synapse.types import get_domain_from_id
 from synapse.handlers.presence import format_user_presence_state
 import synapse.metrics
 
@@ -36,6 +38,12 @@ logger = logging.getLogger(__name__)
 
 metrics = synapse.metrics.get_metrics_for(__name__)
 
+client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
+sent_pdus_destination_dist = client_metrics.register_distribution(
+    "sent_pdu_destinations"
+)
+sent_edus_counter = client_metrics.register_counter("sent_edus")
+
 
 class TransactionQueue(object):
     """This class makes sure we only have one transaction in flight at
@@ -44,13 +52,14 @@ class TransactionQueue(object):
     It batches pending PDUs into single transactions.
     """
 
-    def __init__(self, hs, transport_layer):
+    def __init__(self, hs):
         self.server_name = hs.hostname
 
         self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
         self.transaction_actions = TransactionActions(self.store)
 
-        self.transport_layer = transport_layer
+        self.transport_layer = hs.get_federation_transport_client()
 
         self.clock = hs.get_clock()
 
@@ -95,6 +104,11 @@ class TransactionQueue(object):
         # HACK to get unique tx id
         self._next_txn_id = int(self.clock.time_msec())
 
+        self._order = 1
+
+        self._is_processing = False
+        self._last_poked_id = -1
+
     def can_send_to(self, destination):
         """Can we send messages to the given server?
 
@@ -115,11 +129,61 @@ class TransactionQueue(object):
         else:
             return not destination.startswith("localhost")
 
-    def enqueue_pdu(self, pdu, destinations, order):
+    @defer.inlineCallbacks
+    def notify_new_events(self, current_id):
+        """This gets called when we have some new events we might want to
+        send out to other servers.
+        """
+        self._last_poked_id = max(current_id, self._last_poked_id)
+
+        if self._is_processing:
+            return
+
+        try:
+            self._is_processing = True
+            while True:
+                last_token = yield self.store.get_federation_out_pos("events")
+                next_token, events = yield self.store.get_all_new_events_stream(
+                    last_token, self._last_poked_id, limit=20,
+                )
+
+                logger.debug("Handling %s -> %s", last_token, next_token)
+
+                if not events and next_token >= self._last_poked_id:
+                    break
+
+                for event in events:
+                    users_in_room = yield self.state.get_current_user_in_room(
+                        event.room_id, latest_event_ids=[event.event_id],
+                    )
+
+                    destinations = set(
+                        get_domain_from_id(user_id) for user_id in users_in_room
+                    )
+
+                    if event.type == EventTypes.Member:
+                        if event.content["membership"] == Membership.JOIN:
+                            destinations.add(get_domain_from_id(event.state_key))
+
+                    logger.debug("Sending %s to %r", event, destinations)
+
+                    self._send_pdu(event, destinations)
+
+                yield self.store.update_federation_out_pos(
+                    "events", next_token
+                )
+
+        finally:
+            self._is_processing = False
+
+    def _send_pdu(self, pdu, destinations):
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
 
+        order = self._order
+        self._order += 1
+
         destinations = set(destinations)
         destinations = set(
             dest for dest in destinations if self.can_send_to(dest)
@@ -130,6 +194,8 @@ class TransactionQueue(object):
         if not destinations:
             return
 
+        sent_pdus_destination_dist.inc_by(len(destinations))
+
         for destination in destinations:
             self.pending_pdus_by_dest.setdefault(destination, []).append(
                 (pdu, order)
@@ -139,7 +205,10 @@ class TransactionQueue(object):
                 self._attempt_new_transaction, destination
             )
 
-    def enqueue_presence(self, destination, states):
+    def send_presence(self, destination, states):
+        if not self.can_send_to(destination):
+            return
+
         self.pending_presence_by_dest.setdefault(destination, {}).update({
             state.user_id: state for state in states
         })
@@ -148,12 +217,19 @@ class TransactionQueue(object):
             self._attempt_new_transaction, destination
         )
 
-    def enqueue_edu(self, edu, key=None):
-        destination = edu.destination
+    def send_edu(self, destination, edu_type, content, key=None):
+        edu = Edu(
+            origin=self.server_name,
+            destination=destination,
+            edu_type=edu_type,
+            content=content,
+        )
 
         if not self.can_send_to(destination):
             return
 
+        sent_edus_counter.inc()
+
         if key:
             self.pending_edus_keyed_by_dest.setdefault(
                 destination, {}
@@ -165,7 +241,7 @@ class TransactionQueue(object):
             self._attempt_new_transaction, destination
         )
 
-    def enqueue_failure(self, failure, destination):
+    def send_failure(self, failure, destination):
         if destination == self.server_name or destination == "localhost":
             return
 
@@ -180,7 +256,7 @@ class TransactionQueue(object):
             self._attempt_new_transaction, destination
         )
 
-    def enqueue_device_messages(self, destination):
+    def send_device_messages(self, destination):
         if destination == self.server_name or destination == "localhost":
             return
 
@@ -191,6 +267,9 @@ class TransactionQueue(object):
             self._attempt_new_transaction, destination
         )
 
+    def get_current_token(self):
+        return 0
+
     @defer.inlineCallbacks
     def _attempt_new_transaction(self, destination):
         # list of (pending_pdu, deferred, order)
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 63d05f2531..5ad408f549 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -24,7 +24,6 @@ from .profile import ProfileHandler
 from .directory import DirectoryHandler
 from .admin import AdminHandler
 from .identity import IdentityHandler
-from .receipts import ReceiptsHandler
 from .search import SearchHandler
 
 
@@ -56,7 +55,6 @@ class Handlers(object):
         self.profile_handler = ProfileHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
-        self.receipts_handler = ReceiptsHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
         self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index c5368e5df2..f7fad15c62 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -34,9 +34,9 @@ class DeviceMessageHandler(object):
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
         self.is_mine_id = hs.is_mine_id
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_federation_sender()
 
-        self.federation.register_edu_handler(
+        hs.get_replication_layer().register_edu_handler(
             "m.direct_to_device", self.on_direct_to_device_edu
         )
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2d801bad47..771ab3bc43 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -80,22 +80,6 @@ class FederationHandler(BaseHandler):
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
 
-    def handle_new_event(self, event, destinations):
-        """ Takes in an event from the client to server side, that has already
-        been authed and handled by the state module, and sends it to any
-        remote home servers that may be interested.
-
-        Args:
-            event: The event to send
-            destinations: A list of destinations to send it to
-
-        Returns:
-            Deferred: Resolved when it has successfully been queued for
-            processing.
-        """
-
-        return self.replication_layer.send_pdu(event, destinations)
-
     @log_function
     @defer.inlineCallbacks
     def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
@@ -830,25 +814,6 @@ class FederationHandler(BaseHandler):
                 user = UserID.from_string(event.state_key)
                 yield user_joined_room(self.distributor, user, event.room_id)
 
-        new_pdu = event
-
-        users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
-        destinations = set(
-            get_domain_from_id(user_id) for user_id in users_in_room
-            if not self.hs.is_mine_id(user_id)
-        )
-
-        destinations.discard(origin)
-
-        logger.debug(
-            "on_send_join_request: Sending event: %s, signatures: %s",
-            event.event_id,
-            event.signatures,
-        )
-
-        self.replication_layer.send_pdu(new_pdu, destinations)
-
         state_ids = context.prev_state_ids.values()
         auth_chain = yield self.store.get_auth_chain(set(
             [event.event_id] + state_ids
@@ -1055,24 +1020,6 @@ class FederationHandler(BaseHandler):
                 event, event_stream_id, max_stream_id, extra_users=extra_users
             )
 
-        new_pdu = event
-
-        users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
-        destinations = set(
-            get_domain_from_id(user_id) for user_id in users_in_room
-            if not self.hs.is_mine_id(user_id)
-        )
-        destinations.discard(origin)
-
-        logger.debug(
-            "on_send_leave_request: Sending event: %s, signatures: %s",
-            event.event_id,
-            event.signatures,
-        )
-
-        self.replication_layer.send_pdu(new_pdu, destinations)
-
         defer.returnValue(None)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index fbfa5a0281..e0ade4c164 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -372,11 +372,12 @@ class InitialSyncHandler(BaseHandler):
 
         @defer.inlineCallbacks
         def get_receipts():
-            receipts_handler = self.hs.get_handlers().receipts_handler
-            receipts = yield receipts_handler.get_receipts_for_room(
+            receipts = yield self.store.get_linearized_receipts_for_room(
                 room_id,
-                now_token.receipt_key
+                to_key=now_token.receipt_key,
             )
+            if not receipts:
+                receipts = []
             defer.returnValue(receipts)
 
         presence, receipts, (messages, token) = yield defer.gatherResults(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 81df45177a..fd09397226 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.push.action_generator import ActionGenerator
 from synapse.types import (
-    UserID, RoomAlias, RoomStreamToken, get_domain_from_id
+    UserID, RoomAlias, RoomStreamToken,
 )
 from synapse.util.async import run_on_reactor, ReadWriteLock
 from synapse.util.logcontext import preserve_fn
@@ -599,13 +599,6 @@ class MessageHandler(BaseHandler):
             event_stream_id, max_stream_id
         )
 
-        users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
-        destinations = [
-            get_domain_from_id(user_id) for user_id in users_in_room
-            if not self.hs.is_mine_id(user_id)
-        ]
-
         @defer.inlineCallbacks
         def _notify():
             yield run_on_reactor()
@@ -618,7 +611,3 @@ class MessageHandler(BaseHandler):
 
         # If invite, remove room_state from unsigned before sending.
         event.unsigned.pop("invite_room_state", None)
-
-        preserve_fn(federation_handler.handle_new_event)(
-            event, destinations=destinations,
-        )
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b047ae2250..1b89dc6274 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -91,28 +91,29 @@ class PresenceHandler(object):
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
-        self.federation = hs.get_replication_layer()
+        self.replication = hs.get_replication_layer()
+        self.federation = hs.get_federation_sender()
 
         self.state = hs.get_state_handler()
 
-        self.federation.register_edu_handler(
+        self.replication.register_edu_handler(
             "m.presence", self.incoming_presence
         )
-        self.federation.register_edu_handler(
+        self.replication.register_edu_handler(
             "m.presence_invite",
             lambda origin, content: self.invite_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
                 observer_user=UserID.from_string(content["observer_user"]),
             )
         )
-        self.federation.register_edu_handler(
+        self.replication.register_edu_handler(
             "m.presence_accept",
             lambda origin, content: self.accept_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
                 observer_user=UserID.from_string(content["observer_user"]),
             )
         )
-        self.federation.register_edu_handler(
+        self.replication.register_edu_handler(
             "m.presence_deny",
             lambda origin, content: self.deny_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index e536a909d0..916e80a48e 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -33,8 +33,8 @@ class ReceiptsHandler(BaseHandler):
         self.server_name = hs.config.server_name
         self.store = hs.get_datastore()
         self.hs = hs
-        self.federation = hs.get_replication_layer()
-        self.federation.register_edu_handler(
+        self.federation = hs.get_federation_sender()
+        hs.get_replication_layer().register_edu_handler(
             "m.receipt", self._received_remote_receipt
         )
         self.clock = self.hs.get_clock()
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 27ee715ff0..0eea7f8f9c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -55,9 +55,9 @@ class TypingHandler(object):
         self.clock = hs.get_clock()
         self.wheel_timer = WheelTimer(bucket_size=5000)
 
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_federation_sender()
 
-        self.federation.register_edu_handler("m.typing", self._recv_edu)
+        hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
 
         hs.get_distributor().observe("user_left_room", self.user_left_room)
 
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 48653ae843..054ca59ad2 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -143,6 +143,12 @@ class Notifier(object):
 
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
+
+        if hs.should_send_federation():
+            self.federation_sender = hs.get_federation_sender()
+        else:
+            self.federation_sender = None
+
         self.state_handler = hs.get_state_handler()
 
         self.clock.looping_call(
@@ -220,6 +226,9 @@ class Notifier(object):
         # poke any interested application service.
         self.appservice_handler.notify_interested_services(room_stream_id)
 
+        if self.federation_sender:
+            self.federation_sender.notify_new_events(room_stream_id)
+
         if event.type == EventTypes.Member and event.membership == Membership.JOIN:
             self._user_joined_room(event.state_key, event.room_id)
 
diff --git a/synapse/replication/expire_cache.py b/synapse/replication/expire_cache.py
new file mode 100644
index 0000000000..c05a50d7a6
--- /dev/null
+++ b/synapse/replication/expire_cache.py
@@ -0,0 +1,60 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.http.server import respond_with_json_bytes, request_handler
+from synapse.http.servlet import parse_json_object_from_request
+
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
+
+
+class ExpireCacheResource(Resource):
+    """
+    HTTP endpoint for expiring storage caches.
+
+    POST /_synapse/replication/expire_cache HTTP/1.1
+    Content-Type: application/json
+
+    {
+        "invalidate": [
+            {
+                "name": "func_name",
+                "keys": ["key1", "key2"]
+            }
+        ]
+    }
+    """
+
+    def __init__(self, hs):
+        Resource.__init__(self)  # Resource is old-style, so no super()
+
+        self.store = hs.get_datastore()
+        self.version_string = hs.version_string
+        self.clock = hs.get_clock()
+
+    def render_POST(self, request):
+        self._async_render_POST(request)
+        return NOT_DONE_YET
+
+    @request_handler()
+    def _async_render_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        for row in content["invalidate"]:
+            name = row["name"]
+            keys = tuple(row["keys"])
+
+            getattr(self.store, name).invalidate(keys)
+
+        respond_with_json_bytes(request, 200, "{}")
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 5a14c51d23..d79b421cba 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -17,6 +17,7 @@ from synapse.http.servlet import parse_integer, parse_string
 from synapse.http.server import request_handler, finish_request
 from synapse.replication.pusher_resource import PusherResource
 from synapse.replication.presence_resource import PresenceResource
+from synapse.replication.expire_cache import ExpireCacheResource
 from synapse.api.errors import SynapseError
 
 from twisted.web.resource import Resource
@@ -44,6 +45,7 @@ STREAM_NAMES = (
     ("caches",),
     ("to_device",),
     ("public_rooms",),
+    ("federation",),
 )
 
 
@@ -116,11 +118,14 @@ class ReplicationResource(Resource):
         self.sources = hs.get_event_sources()
         self.presence_handler = hs.get_presence_handler()
         self.typing_handler = hs.get_typing_handler()
+        self.federation_sender = hs.get_federation_sender()
         self.notifier = hs.notifier
         self.clock = hs.get_clock()
+        self.config = hs.get_config()
 
         self.putChild("remove_pushers", PusherResource(hs))
         self.putChild("syncing_users", PresenceResource(hs))
+        self.putChild("expire_cache", ExpireCacheResource(hs))
 
     def render_GET(self, request):
         self._async_render_GET(request)
@@ -134,6 +139,7 @@ class ReplicationResource(Resource):
         pushers_token = self.store.get_pushers_stream_token()
         caches_token = self.store.get_cache_stream_token()
         public_rooms_token = self.store.get_current_public_room_stream_id()
+        federation_token = self.federation_sender.get_current_token()
 
         defer.returnValue(_ReplicationToken(
             room_stream_token,
@@ -148,6 +154,7 @@ class ReplicationResource(Resource):
             caches_token,
             int(stream_token.to_device_key),
             int(public_rooms_token),
+            int(federation_token),
         ))
 
     @request_handler()
@@ -164,8 +171,13 @@ class ReplicationResource(Resource):
         }
         request_streams["streams"] = parse_string(request, "streams")
 
+        federation_ack = parse_integer(request, "federation_ack", None)
+
         def replicate():
-            return self.replicate(request_streams, limit)
+            return self.replicate(
+                request_streams, limit,
+                federation_ack=federation_ack
+            )
 
         writer = yield self.notifier.wait_for_replication(replicate, timeout)
         result = writer.finish()
@@ -183,7 +195,7 @@ class ReplicationResource(Resource):
         finish_request(request)
 
     @defer.inlineCallbacks
-    def replicate(self, request_streams, limit):
+    def replicate(self, request_streams, limit, federation_ack=None):
         writer = _Writer()
         current_token = yield self.current_replication_token()
         logger.debug("Replicating up to %r", current_token)
@@ -202,6 +214,7 @@ class ReplicationResource(Resource):
         yield self.caches(writer, current_token, limit, request_streams)
         yield self.to_device(writer, current_token, limit, request_streams)
         yield self.public_rooms(writer, current_token, limit, request_streams)
+        self.federation(writer, current_token, limit, request_streams, federation_ack)
         self.streams(writer, current_token, request_streams)
 
         logger.debug("Replicated %d rows", writer.total)
@@ -465,6 +478,23 @@ class ReplicationResource(Resource):
                 "position", "room_id", "visibility"
             ), position=upto_token)
 
+    def federation(self, writer, current_token, limit, request_streams, federation_ack):
+        if self.config.send_federation:
+            return
+
+        current_position = current_token.federation
+
+        federation = request_streams.get("federation")
+
+        if federation is not None and federation != current_position:
+            federation_rows = self.federation_sender.get_replication_rows(
+                federation, limit, federation_ack=federation_ack,
+            )
+            upto_token = _position_from_rows(federation_rows, current_position)
+            writer.write_header_and_rows("federation", federation_rows, (
+                "position", "type", "content",
+            ), position=upto_token)
+
 
 class _Writer(object):
     """Writes the streams as a JSON object as the response to the request"""
@@ -497,6 +527,7 @@ class _Writer(object):
 class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
     "events", "presence", "typing", "receipts", "account_data", "backfill",
     "push_rules", "pushers", "state", "caches", "to_device", "public_rooms",
+    "federation",
 ))):
     __slots__ = []
 
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index f19540d6bb..18076e0f3b 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -34,6 +34,9 @@ class BaseSlavedStore(SQLBaseStore):
         else:
             self._cache_id_gen = None
 
+        self.expire_cache_url = hs.config.worker_replication_url + "/expire_cache"
+        self.http_client = hs.get_simple_http_client()
+
     def stream_positions(self):
         pos = {}
         if self._cache_id_gen:
@@ -54,3 +57,19 @@ class BaseSlavedStore(SQLBaseStore):
                     logger.info("Got unexpected cache_func: %r", cache_func)
             self._cache_id_gen.advance(int(stream["position"]))
         return defer.succeed(None)
+
+    def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+        txn.call_after(cache_func.invalidate, keys)
+        txn.call_after(self._send_invalidation_poke, cache_func, keys)
+
+    @defer.inlineCallbacks
+    def _send_invalidation_poke(self, cache_func, keys):
+        try:
+            yield self.http_client.post_json_get_json(self.expire_cache_url, {
+                "invalidate": [{
+                    "name": cache_func.__name__,
+                    "keys": list(keys),
+                }]
+            })
+        except:
+            logger.exception("Failed to poke on expire_cache")
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 3bfd5e8213..cc860f9f9b 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -29,10 +29,16 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
             "DeviceInboxStreamChangeCache",
             self._device_inbox_id_gen.get_current_token()
         )
+        self._device_federation_outbox_stream_cache = StreamChangeCache(
+            "DeviceFederationOutboxStreamChangeCache",
+            self._device_inbox_id_gen.get_current_token()
+        )
 
     get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
     get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
+    get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
     delete_messages_for_device = DataStore.delete_messages_for_device.__func__
+    delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__
 
     def stream_positions(self):
         result = super(SlavedDeviceInboxStore, self).stream_positions()
@@ -45,9 +51,15 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
             self._device_inbox_id_gen.advance(int(stream["position"]))
             for row in stream["rows"]:
                 stream_id = row[0]
-                user_id = row[1]
-                self._device_inbox_stream_cache.entity_has_changed(
-                    user_id, stream_id
-                )
+                entity = row[1]
+
+                if entity.startswith("@"):
+                    self._device_inbox_stream_cache.entity_has_changed(
+                        entity, stream_id
+                    )
+                else:
+                    self._device_federation_outbox_stream_cache.entity_has_changed(
+                        entity, stream_id
+                    )
 
         return super(SlavedDeviceInboxStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 0c26e96e98..64f18bbb3e 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -26,6 +26,11 @@ from synapse.storage.stream import StreamStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 import ujson as json
+import logging
+
+
+logger = logging.getLogger(__name__)
+
 
 # So, um, we want to borrow a load of functions intended for reading from
 # a DataStore, but we don't want to take functions that either write to the
@@ -180,6 +185,11 @@ class SlavedEventStore(BaseSlavedStore):
         EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
     )
 
+    get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
+
+    get_federation_out_pos = DataStore.get_federation_out_pos.__func__
+    update_federation_out_pos = DataStore.update_federation_out_pos.__func__
+
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()
@@ -194,6 +204,10 @@ class SlavedEventStore(BaseSlavedStore):
         stream = result.get("events")
         if stream:
             self._stream_id_gen.advance(int(stream["position"]))
+
+            if stream["rows"]:
+                logger.info("Got %d event rows", len(stream["rows"]))
+
             for row in stream["rows"]:
                 self._process_replication_row(
                     row, backfilled=False, state_resets=state_resets
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index 6f2ba98af5..fbb58f35da 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
 from ._base import BaseSlavedStore
 from synapse.storage import DataStore
 from synapse.storage.transactions import TransactionStore
@@ -22,9 +21,10 @@ from synapse.storage.transactions import TransactionStore
 class TransactionStore(BaseSlavedStore):
     get_destination_retry_timings = TransactionStore.__dict__[
         "get_destination_retry_timings"
-    ].orig
+    ]
     _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
+    set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
+    _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
 
-    # For now, don't record the destination rety timings
-    def set_destination_retry_timings(*args, **kwargs):
-        return defer.succeed(None)
+    prep_send_transaction = DataStore.prep_send_transaction.__func__
+    delivered_txn = DataStore.delivered_txn.__func__
diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py
index 891cef99c6..1fbff2edd8 100644
--- a/synapse/rest/client/v2_alpha/receipts.py
+++ b/synapse/rest/client/v2_alpha/receipts.py
@@ -36,7 +36,7 @@ class ReceiptRestServlet(RestServlet):
         super(ReceiptRestServlet, self).__init__()
         self.hs = hs
         self.auth = hs.get_auth()
-        self.receipts_handler = hs.get_handlers().receipts_handler
+        self.receipts_handler = hs.get_receipts_handler()
         self.presence_handler = hs.get_presence_handler()
 
     @defer.inlineCallbacks
diff --git a/synapse/server.py b/synapse/server.py
index 374124a147..0bfb411269 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -32,6 +32,9 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler
 from synapse.crypto.keyring import Keyring
 from synapse.events.builder import EventBuilderFactory
 from synapse.federation import initialize_http_replication
+from synapse.federation.send_queue import FederationRemoteSendQueue
+from synapse.federation.transport.client import TransportLayerClient
+from synapse.federation.transaction_queue import TransactionQueue
 from synapse.handlers import Handlers
 from synapse.handlers.appservice import ApplicationServicesHandler
 from synapse.handlers.auth import AuthHandler
@@ -44,6 +47,7 @@ from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
 from synapse.handlers.events import EventHandler, EventStreamHandler
 from synapse.handlers.initial_sync import InitialSyncHandler
+from synapse.handlers.receipts import ReceiptsHandler
 from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.notifier import Notifier
@@ -124,6 +128,9 @@ class HomeServer(object):
         'http_client_context_factory',
         'simple_http_client',
         'media_repository',
+        'federation_transport_client',
+        'federation_sender',
+        'receipts_handler',
     ]
 
     def __init__(self, hostname, **kwargs):
@@ -265,9 +272,30 @@ class HomeServer(object):
     def build_media_repository(self):
         return MediaRepository(self)
 
+    def build_federation_transport_client(self):
+        return TransportLayerClient(self)
+
+    def build_federation_sender(self):
+        if self.should_send_federation():
+            return TransactionQueue(self)
+        elif not self.config.worker_app:
+            return FederationRemoteSendQueue(self)
+        else:
+            raise Exception("Workers cannot send federation traffic")
+
+    def build_receipts_handler(self):
+        return ReceiptsHandler(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
+    def should_send_federation(self):
+        "Should this server be sending federation traffic directly?"
+        return self.config.send_federation and (
+            not self.config.worker_app
+            or self.config.worker_app == "synapse.app.federation_sender"
+        )
+
 
 def _make_dependency_method(depname):
     def _get(hs):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d828d6ee1d..b62c459d8b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -561,12 +561,17 @@ class SQLBaseStore(object):
 
     @staticmethod
     def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
+        if keyvalues:
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+        else:
+            where = ""
+
         sql = (
-            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
+            "SELECT %(retcol)s FROM %(table)s %(where)s"
         ) % {
             "retcol": retcol,
             "table": table,
-            "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
+            "where": where,
         }
 
         txn.execute(sql, keyvalues.values())
@@ -744,10 +749,15 @@ class SQLBaseStore(object):
 
     @staticmethod
     def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
-        update_sql = "UPDATE %s SET %s WHERE %s" % (
+        if keyvalues:
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+        else:
+            where = ""
+
+        update_sql = "UPDATE %s SET %s %s" % (
             table,
             ", ".join("%s = ?" % (k,) for k in updatevalues),
-            " AND ".join("%s = ?" % (k,) for k in keyvalues)
+            where,
         )
 
         txn.execute(
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index f640e73714..87398d60bc 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore):
             return defer.succeed([])
 
         def get_all_new_device_messages_txn(txn):
+            # We limit like this as we might have multiple rows per stream_id, and
+            # we want to make sure we always get all entries for any stream_id
+            # we return.
+            upper_pos = min(current_pos, last_pos + limit)
             sql = (
-                "SELECT stream_id FROM device_inbox"
+                "SELECT stream_id, user_id"
+                " FROM device_inbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
-                " GROUP BY stream_id"
                 " ORDER BY stream_id ASC"
-                " LIMIT ?"
             )
-            txn.execute(sql, (last_pos, current_pos, limit))
-            stream_ids = txn.fetchall()
-            if not stream_ids:
-                return []
-            max_stream_id_in_limit = stream_ids[-1]
+            txn.execute(sql, (last_pos, upper_pos))
+            rows = txn.fetchall()
 
             sql = (
-                "SELECT stream_id, user_id, device_id, message_json"
-                " FROM device_inbox"
+                "SELECT stream_id, destination"
+                " FROM device_federation_outbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC"
             )
-            txn.execute(sql, (last_pos, max_stream_id_in_limit))
-            return txn.fetchall()
+            txn.execute(sql, (last_pos, upper_pos))
+            rows.extend(txn.fetchall())
+
+            return rows
 
         return self.runInteraction(
             "get_all_new_device_messages", get_all_new_device_messages_txn
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 6576a30098..e46ae6502e 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 38
+SCHEMA_VERSION = 39
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 21d0696640..7460f98a1f 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -37,6 +37,13 @@ class UserPresenceState(namedtuple("UserPresenceState",
     status_msg (str): User set status message.
     """
 
+    def as_dict(self):
+        return dict(self._asdict())
+
+    @staticmethod
+    def from_dict(d):
+        return UserPresenceState(**d)
+
     def copy_and_replace(self, **kwargs):
         return self._replace(**kwargs)
 
diff --git a/synapse/storage/schema/delta/39/device_federation_stream_idx.sql b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql
new file mode 100644
index 0000000000..00be801e90
--- /dev/null
+++ b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id);
diff --git a/synapse/storage/schema/delta/39/federation_out_position.sql b/synapse/storage/schema/delta/39/federation_out_position.sql
new file mode 100644
index 0000000000..edbd8e132f
--- /dev/null
+++ b/synapse/storage/schema/delta/39/federation_out_position.sql
@@ -0,0 +1,22 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ CREATE TABLE federation_stream_position(
+     type TEXT NOT NULL,
+     stream_id INTEGER NOT NULL
+ );
+
+ INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1);
+ INSERT INTO federation_stream_position (type, stream_id) VALUES ('events', -1);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 888b1cb35d..7fa63b58a7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -765,3 +765,50 @@ class StreamStore(SQLBaseStore):
                 "token": end_token,
             },
         }
+
+    @defer.inlineCallbacks
+    def get_all_new_events_stream(self, from_id, current_id, limit):
+        """Get all new events"""
+
+        def get_all_new_events_stream_txn(txn):
+            sql = (
+                "SELECT e.stream_ordering, e.event_id"
+                " FROM events AS e"
+                " WHERE"
+                " ? < e.stream_ordering AND e.stream_ordering <= ?"
+                " ORDER BY e.stream_ordering ASC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (from_id, current_id, limit))
+            rows = txn.fetchall()
+
+            upper_bound = current_id
+            if len(rows) == limit:
+                upper_bound = rows[-1][0]
+
+            return upper_bound, [row[1] for row in rows]
+
+        upper_bound, event_ids = yield self.runInteraction(
+            "get_all_new_events_stream", get_all_new_events_stream_txn,
+        )
+
+        events = yield self._get_events(event_ids)
+
+        defer.returnValue((upper_bound, events))
+
+    def get_federation_out_pos(self, typ):
+        return self._simple_select_one_onecol(
+            table="federation_stream_position",
+            retcol="stream_id",
+            keyvalues={"type": typ},
+            desc="get_federation_out_pos"
+        )
+
+    def update_federation_out_pos(self, typ, stream_id):
+        return self._simple_update_one(
+            table="federation_stream_position",
+            keyvalues={"type": typ},
+            updatevalues={"stream_id": stream_id},
+            desc="update_federation_out_pos",
+        )
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index adab520c78..809fdd311f 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -200,25 +200,48 @@ class TransactionStore(SQLBaseStore):
 
     def _set_destination_retry_timings(self, txn, destination,
                                        retry_last_ts, retry_interval):
-        txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
+        self.database_engine.lock_table(txn, "destinations")
 
-        self._simple_upsert_txn(
+        self._invalidate_cache_and_stream(
+            txn, self.get_destination_retry_timings, (destination,)
+        )
+
+        # We need to be careful here as the data may have changed from under us
+        # due to a worker setting the timings.
+
+        prev_row = self._simple_select_one_txn(
             txn,
-            "destinations",
+            table="destinations",
             keyvalues={
                 "destination": destination,
             },
-            values={
-                "retry_last_ts": retry_last_ts,
-                "retry_interval": retry_interval,
-            },
-            insertion_values={
-                "destination": destination,
-                "retry_last_ts": retry_last_ts,
-                "retry_interval": retry_interval,
-            }
+            retcols=("retry_last_ts", "retry_interval"),
+            allow_none=True,
         )
 
+        if not prev_row:
+            self._simple_insert_txn(
+                txn,
+                table="destinations",
+                values={
+                    "destination": destination,
+                    "retry_last_ts": retry_last_ts,
+                    "retry_interval": retry_interval,
+                }
+            )
+        elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
+            self._simple_update_one_txn(
+                txn,
+                "destinations",
+                keyvalues={
+                    "destination": destination,
+                },
+                updatevalues={
+                    "retry_last_ts": retry_last_ts,
+                    "retry_interval": retry_interval,
+                },
+            )
+
     def get_destinations_needing_retry(self):
         """Get all destinations which are due a retry for sending a transaction.
 
diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py
index 3fd5c3d9fd..d668e5a6b8 100644
--- a/synapse/util/jsonobject.py
+++ b/synapse/util/jsonobject.py
@@ -76,15 +76,26 @@ class JsonEncodedObject(object):
         d.update(self.unrecognized_keys)
         return d
 
+    def get_internal_dict(self):
+        d = {
+            k: _encode(v, internal=True) for (k, v) in self.__dict__.items()
+            if k in self.valid_keys
+        }
+        d.update(self.unrecognized_keys)
+        return d
+
     def __str__(self):
         return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__))
 
 
-def _encode(obj):
+def _encode(obj, internal=False):
     if type(obj) is list:
-        return [_encode(o) for o in obj]
+        return [_encode(o, internal=internal) for o in obj]
 
     if isinstance(obj, JsonEncodedObject):
-        return obj.get_dict()
+        if internal:
+            return obj.get_internal_dict()
+        else:
+            return obj.get_dict()
 
     return obj
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 49527f4d21..46ef5a8ec7 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -121,12 +121,6 @@ class RetryDestinationLimiter(object):
         pass
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        def err(failure):
-            logger.exception(
-                "Failed to store set_destination_retry_timings",
-                failure.value
-            )
-
         valid_err_code = False
         if exc_type is not None and issubclass(exc_type, CodeMessageException):
             valid_err_code = 0 <= exc_val.code < 500
@@ -151,6 +145,15 @@ class RetryDestinationLimiter(object):
 
             retry_last_ts = int(self.clock.time_msec())
 
-        self.store.set_destination_retry_timings(
-            self.destination, retry_last_ts, self.retry_interval
-        ).addErrback(err)
+        @defer.inlineCallbacks
+        def store_retry_timings():
+            try:
+                yield self.store.set_destination_retry_timings(
+                    self.destination, retry_last_ts, self.retry_interval
+                )
+            except:
+                logger.exception(
+                    "Failed to store set_destination_retry_timings",
+                )
+
+        store_retry_timings()