summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/events/snapshot.py21
-rw-r--r--synapse/federation/federation_client.py6
-rw-r--r--synapse/federation/federation_server.py2
-rw-r--r--synapse/federation/transaction_queue.py65
-rw-r--r--synapse/handlers/devicemessage.py117
-rw-r--r--synapse/handlers/federation.py19
-rw-r--r--synapse/handlers/message.py44
-rw-r--r--synapse/handlers/presence.py33
-rw-r--r--synapse/handlers/typing.py9
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py11
-rw-r--r--synapse/replication/slave/storage/events.py3
-rw-r--r--synapse/rest/client/v2_alpha/sendtodevice.py33
-rw-r--r--synapse/rest/media/v1/download_resource.py9
-rw-r--r--synapse/server.py5
-rw-r--r--synapse/state.py33
-rw-r--r--synapse/storage/__init__.py24
-rw-r--r--synapse/storage/deviceinbox.py220
-rw-r--r--synapse/storage/devices.py8
-rw-r--r--synapse/storage/events.py68
-rw-r--r--synapse/storage/prepare_database.py6
-rw-r--r--synapse/storage/schema/delta/34/device_outbox.sql36
-rw-r--r--synapse/storage/schema/delta/35/state.sql22
-rw-r--r--synapse/storage/schema/delta/35/state_dedupe.sql17
-rw-r--r--synapse/storage/state.py398
24 files changed, 1005 insertions, 204 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index e895b1c450..11605b34a3 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -15,9 +15,30 @@
 
 
 class EventContext(object):
+    __slots__ = [
+        "current_state_ids",
+        "prev_state_ids",
+        "state_group",
+        "rejected",
+        "push_actions",
+        "prev_group",
+        "delta_ids",
+        "prev_state_events",
+    ]
+
     def __init__(self):
+        # The current state including the current event
         self.current_state_ids = None
+        # The current state excluding the current event
         self.prev_state_ids = None
         self.state_group = None
+
         self.rejected = False
         self.push_actions = []
+
+        # A previously persisted state group and a delta between that
+        # and this state.
+        self.prev_group = None
+        self.delta_ids = None
+
+        self.prev_state_events = None
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 627acc6a4f..78719eed25 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -138,6 +138,12 @@ class FederationClient(FederationBase):
         return defer.succeed(None)
 
     @log_function
+    def send_device_messages(self, destination):
+        """Sends the device messages in the local database to the remote
+        destination"""
+        self._transaction_queue.enqueue_device_messages(destination)
+
+    @log_function
     def send_failure(self, failure, destination):
         self._transaction_queue.enqueue_failure(failure, destination)
         return defer.succeed(None)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 5621655098..3fa7b2315c 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -188,7 +188,7 @@ class FederationServer(FederationBase):
             except SynapseError as e:
                 logger.info("Failed to handle edu %r: %r", edu_type, e)
             except Exception as e:
-                logger.exception("Failed to handle edu %r", edu_type, e)
+                logger.exception("Failed to handle edu %r", edu_type)
         else:
             logger.warn("Received EDU of type %s with no handler", edu_type)
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index cb2ef0210c..633c79c352 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -17,7 +17,7 @@
 from twisted.internet import defer
 
 from .persistence import TransactionActions
-from .units import Transaction
+from .units import Transaction, Edu
 
 from synapse.api.errors import HttpResponseException
 from synapse.util.async import run_on_reactor
@@ -81,6 +81,8 @@ class TransactionQueue(object):
         # destination -> list of tuple(failure, deferred)
         self.pending_failures_by_dest = {}
 
+        self.last_device_stream_id_by_dest = {}
+
         # HACK to get unique tx id
         self._next_txn_id = int(self.clock.time_msec())
 
@@ -155,6 +157,17 @@ class TransactionQueue(object):
             self._attempt_new_transaction, destination
         )
 
+    def enqueue_device_messages(self, destination):
+        if destination == self.server_name or destination == "localhost":
+            return
+
+        if not self.can_send_to(destination):
+            return
+
+        preserve_context_over_fn(
+            self._attempt_new_transaction, destination
+        )
+
     @defer.inlineCallbacks
     def _attempt_new_transaction(self, destination):
         yield run_on_reactor()
@@ -175,6 +188,12 @@ class TransactionQueue(object):
             pending_edus = self.pending_edus_by_dest.pop(destination, [])
             pending_failures = self.pending_failures_by_dest.pop(destination, [])
 
+            device_message_edus, device_stream_id = (
+                yield self._get_new_device_messages(destination)
+            )
+
+            pending_edus.extend(device_message_edus)
+
             if pending_pdus:
                 logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
                              destination, len(pending_pdus))
@@ -184,13 +203,34 @@ class TransactionQueue(object):
                 return
 
             yield self._send_new_transaction(
-                destination, pending_pdus, pending_edus, pending_failures
+                destination, pending_pdus, pending_edus, pending_failures,
+                device_stream_id,
+                should_delete_from_device_stream=bool(device_message_edus)
             )
 
+    @defer.inlineCallbacks
+    def _get_new_device_messages(self, destination):
+        last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0)
+        to_device_stream_id = self.store.get_to_device_stream_token()
+        contents, stream_id = yield self.store.get_new_device_msgs_for_remote(
+            destination, last_device_stream_id, to_device_stream_id
+        )
+        edus = [
+            Edu(
+                origin=self.server_name,
+                destination=destination,
+                edu_type="m.direct_to_device",
+                content=content,
+            )
+            for content in contents
+        ]
+        defer.returnValue((edus, stream_id))
+
     @measure_func("_send_new_transaction")
     @defer.inlineCallbacks
     def _send_new_transaction(self, destination, pending_pdus, pending_edus,
-                              pending_failures):
+                              pending_failures, device_stream_id,
+                              should_delete_from_device_stream):
 
             # Sort based on the order field
             pending_pdus.sort(key=lambda t: t[1])
@@ -215,9 +255,9 @@ class TransactionQueue(object):
                     "TX [%s] {%s} Attempting new transaction"
                     " (pdus: %d, edus: %d, failures: %d)",
                     destination, txn_id,
-                    len(pending_pdus),
-                    len(pending_edus),
-                    len(pending_failures)
+                    len(pdus),
+                    len(edus),
+                    len(failures)
                 )
 
                 logger.debug("TX [%s] Persisting transaction...", destination)
@@ -242,9 +282,9 @@ class TransactionQueue(object):
                     " (PDUs: %d, EDUs: %d, failures: %d)",
                     destination, txn_id,
                     transaction.transaction_id,
-                    len(pending_pdus),
-                    len(pending_edus),
-                    len(pending_failures),
+                    len(pdus),
+                    len(edus),
+                    len(failures),
                 )
 
                 with limiter:
@@ -299,6 +339,13 @@ class TransactionQueue(object):
                         logger.info(
                             "Failed to send event %s to %s", p.event_id, destination
                         )
+                else:
+                    # Remove the acknowledged device messages from the database
+                    if should_delete_from_device_stream:
+                        yield self.store.delete_device_msgs_for_remote(
+                            destination, device_stream_id
+                        )
+                    self.last_device_stream_id_by_dest[destination] = device_stream_id
             except NotRetryingDestination:
                 logger.info(
                     "TX [%s] not ready for retry yet - "
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
new file mode 100644
index 0000000000..c5368e5df2
--- /dev/null
+++ b/synapse/handlers/devicemessage.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.types import get_domain_from_id
+from synapse.util.stringutils import random_string
+
+
+logger = logging.getLogger(__name__)
+
+
+class DeviceMessageHandler(object):
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        self.store = hs.get_datastore()
+        self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
+        self.federation = hs.get_replication_layer()
+
+        self.federation.register_edu_handler(
+            "m.direct_to_device", self.on_direct_to_device_edu
+        )
+
+    @defer.inlineCallbacks
+    def on_direct_to_device_edu(self, origin, content):
+        local_messages = {}
+        sender_user_id = content["sender"]
+        if origin != get_domain_from_id(sender_user_id):
+            logger.warn(
+                "Dropping device message from %r with spoofed sender %r",
+                origin, sender_user_id
+            )
+        message_type = content["type"]
+        message_id = content["message_id"]
+        for user_id, by_device in content["messages"].items():
+            messages_by_device = {
+                device_id: {
+                    "content": message_content,
+                    "type": message_type,
+                    "sender": sender_user_id,
+                }
+                for device_id, message_content in by_device.items()
+            }
+            if messages_by_device:
+                local_messages[user_id] = messages_by_device
+
+        stream_id = yield self.store.add_messages_from_remote_to_device_inbox(
+            origin, message_id, local_messages
+        )
+
+        self.notifier.on_new_event(
+            "to_device_key", stream_id, users=local_messages.keys()
+        )
+
+    @defer.inlineCallbacks
+    def send_device_message(self, sender_user_id, message_type, messages):
+
+        local_messages = {}
+        remote_messages = {}
+        for user_id, by_device in messages.items():
+            if self.is_mine_id(user_id):
+                messages_by_device = {
+                    device_id: {
+                        "content": message_content,
+                        "type": message_type,
+                        "sender": sender_user_id,
+                    }
+                    for device_id, message_content in by_device.items()
+                }
+                if messages_by_device:
+                    local_messages[user_id] = messages_by_device
+            else:
+                destination = get_domain_from_id(user_id)
+                remote_messages.setdefault(destination, {})[user_id] = by_device
+
+        message_id = random_string(16)
+
+        remote_edu_contents = {}
+        for destination, messages in remote_messages.items():
+            remote_edu_contents[destination] = {
+                "messages": messages,
+                "sender": sender_user_id,
+                "type": message_type,
+                "message_id": message_id,
+            }
+
+        stream_id = yield self.store.add_messages_to_device_inbox(
+            local_messages, remote_edu_contents
+        )
+
+        self.notifier.on_new_event(
+            "to_device_key", stream_id, users=local_messages.keys()
+        )
+
+        for destination in remote_messages.keys():
+            # Enqueue a new federation transaction to send the new
+            # device messages to each remote destination.
+            self.federation.send_device_messages(destination)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index dc90a5dde4..8a1038c44a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -832,11 +832,13 @@ class FederationHandler(BaseHandler):
 
         new_pdu = event
 
-        message_handler = self.hs.get_handlers().message_handler
-        destinations = yield message_handler.get_joined_hosts_for_room_from_state(
-            context
+        users_in_room = yield self.store.get_joined_users_from_context(event, context)
+
+        destinations = set(
+            get_domain_from_id(user_id) for user_id in users_in_room
+            if not self.hs.is_mine_id(user_id)
         )
-        destinations = set(destinations)
+
         destinations.discard(origin)
 
         logger.debug(
@@ -1055,11 +1057,12 @@ class FederationHandler(BaseHandler):
 
         new_pdu = event
 
-        message_handler = self.hs.get_handlers().message_handler
-        destinations = yield message_handler.get_joined_hosts_for_room_from_state(
-            context
+        users_in_room = yield self.store.get_joined_users_from_context(event, context)
+
+        destinations = set(
+            get_domain_from_id(user_id) for user_id in users_in_room
+            if not self.hs.is_mine_id(user_id)
         )
-        destinations = set(destinations)
         destinations.discard(origin)
 
         logger.debug(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3577db0595..178209a209 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -30,7 +30,6 @@ from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLo
 from synapse.util.caches.snapshot_cache import SnapshotCache
 from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 from synapse.util.metrics import measure_func
-from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -945,7 +944,12 @@ class MessageHandler(BaseHandler):
             event_stream_id, max_stream_id
         )
 
-        destinations = yield self.get_joined_hosts_for_room_from_state(context)
+        users_in_room = yield self.store.get_joined_users_from_context(event, context)
+
+        destinations = [
+            get_domain_from_id(user_id) for user_id in users_in_room
+            if not self.hs.is_mine_id(user_id)
+        ]
 
         @defer.inlineCallbacks
         def _notify():
@@ -963,39 +967,3 @@ class MessageHandler(BaseHandler):
         preserve_fn(federation_handler.handle_new_event)(
             event, destinations=destinations,
         )
-
-    def get_joined_hosts_for_room_from_state(self, context):
-        state_group = context.state_group
-        if not state_group:
-            # If state_group is None it means it has yet to be assigned a
-            # state group, i.e. we need to make sure that calls with a state_group
-            # of None don't hit previous cached calls with a None state_group.
-            # To do this we set the state_group to a new object as object() != object()
-            state_group = object()
-
-        return self._get_joined_hosts_for_room_from_state(
-            state_group, context.current_state_ids
-        )
-
-    @cachedInlineCallbacks(num_args=1, cache_context=True)
-    def _get_joined_hosts_for_room_from_state(self, state_group, current_state_ids,
-                                              cache_context):
-
-        # Don't bother getting state for people on the same HS
-        current_state = yield self.store.get_events([
-            e_id for key, e_id in current_state_ids.items()
-            if key[0] == EventTypes.Member and not self.hs.is_mine_id(key[1])
-        ])
-
-        destinations = set()
-        for e in current_state.itervalues():
-            try:
-                if e.type == EventTypes.Member:
-                    if e.content["membership"] == Membership.JOIN:
-                        destinations.add(get_domain_from_id(e.state_key))
-            except SynapseError:
-                logger.warn(
-                    "Failed to get destination from event %s", e.event_id
-                )
-
-        defer.returnValue(destinations)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cf82a2336e..7a3c16a8aa 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -52,6 +52,11 @@ bump_active_time_counter = metrics.register_counter("bump_active_time")
 
 get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
 
+notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"])
+state_transition_counter = metrics.register_counter(
+    "state_transition", labels=["from", "to"]
+)
+
 
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
 # "currently_active"
@@ -646,6 +651,13 @@ class PresenceHandler(object):
                 )
                 continue
 
+            if get_domain_from_id(user_id) != origin:
+                logger.info(
+                    "Got presence update from %r with bad 'user_id': %r",
+                    origin, user_id,
+                )
+                continue
+
             presence_state = push.get("presence", None)
             if not presence_state:
                 logger.info(
@@ -939,27 +951,32 @@ class PresenceHandler(object):
 def should_notify(old_state, new_state):
     """Decides if a presence state change should be sent to interested parties.
     """
+    if old_state == new_state:
+        return False
+
     if old_state.status_msg != new_state.status_msg:
+        notify_reason_counter.inc("status_msg_change")
         return True
 
-    if old_state.state == PresenceState.ONLINE:
-        if new_state.state != PresenceState.ONLINE:
-            # Always notify for online -> anything
-            return True
+    if old_state.state != new_state.state:
+        notify_reason_counter.inc("state_change")
+        state_transition_counter.inc(old_state.state, new_state.state)
+        return True
 
+    if old_state.state == PresenceState.ONLINE:
         if new_state.currently_active != old_state.currently_active:
+            notify_reason_counter.inc("current_active_change")
             return True
 
         if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
             # Only notify about last active bumps if we're not currently acive
-            if not (old_state.currently_active and new_state.currently_active):
+            if not new_state.currently_active:
+                notify_reason_counter.inc("last_active_change_online")
                 return True
 
     elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
         # Always notify for a transition where last active gets bumped.
-        return True
-
-    if old_state.state != new_state.state:
+        notify_reason_counter.inc("last_active_change_not_online")
         return True
 
     return False
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 0b530b9034..3b687957dd 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -199,7 +199,14 @@ class TypingHandler(object):
         user_id = content["user_id"]
 
         # Check that the string is a valid user id
-        UserID.from_string(user_id)
+        user = UserID.from_string(user_id)
+
+        if user.domain != origin:
+            logger.info(
+                "Got typing update from %r with bad 'user_id': %r",
+                origin, user_id,
+            )
+            return
 
         users = yield self.state.get_current_user_in_room(room_id)
         domains = set(get_domain_from_id(u) for u in users)
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 64d8eb2af1..251078ba57 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -16,6 +16,7 @@
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 from synapse.storage import DataStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 
 class SlavedDeviceInboxStore(BaseSlavedStore):
@@ -24,6 +25,10 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
         self._device_inbox_id_gen = SlavedIdTracker(
             db_conn, "device_inbox", "stream_id",
         )
+        self._device_inbox_stream_cache = StreamChangeCache(
+            "DeviceInboxStreamChangeCache",
+            self._device_inbox_id_gen.get_current_token()
+        )
 
     get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
     get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
@@ -38,5 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
         stream = result.get("to_device")
         if stream:
             self._device_inbox_id_gen.advance(int(stream["position"]))
+            for row in stream["rows"]:
+                stream_id = row[0]
+                user_id = row[1]
+                self._device_inbox_stream_cache.entity_has_changed(
+                    user_id, stream_id
+                )
 
         return super(SlavedDeviceInboxStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index cbebd5b2f7..15c52774a2 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -86,6 +86,9 @@ class SlavedEventStore(BaseSlavedStore):
     _get_state_groups_from_groups = (
         StateStore.__dict__["_get_state_groups_from_groups"]
     )
+    _get_state_groups_from_groups_txn = (
+        DataStore._get_state_groups_from_groups_txn.__func__
+    )
     _get_state_group_from_group = (
         StateStore.__dict__["_get_state_group_from_group"]
     )
diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py
index 9c10a99acf..5975164b37 100644
--- a/synapse/rest/client/v2_alpha/sendtodevice.py
+++ b/synapse/rest/client/v2_alpha/sendtodevice.py
@@ -16,10 +16,11 @@
 import logging
 
 from twisted.internet import defer
-from synapse.http.servlet import parse_json_object_from_request
 
 from synapse.http import servlet
+from synapse.http.servlet import parse_json_object_from_request
 from synapse.rest.client.v1.transactions import HttpTransactionStore
+
 from ._base import client_v2_patterns
 
 logger = logging.getLogger(__name__)
@@ -39,10 +40,8 @@ class SendToDeviceRestServlet(servlet.RestServlet):
         super(SendToDeviceRestServlet, self).__init__()
         self.hs = hs
         self.auth = hs.get_auth()
-        self.store = hs.get_datastore()
-        self.notifier = hs.get_notifier()
-        self.is_mine_id = hs.is_mine_id
         self.txns = HttpTransactionStore()
+        self.device_message_handler = hs.get_device_message_handler()
 
     @defer.inlineCallbacks
     def on_PUT(self, request, message_type, txn_id):
@@ -57,28 +56,10 @@ class SendToDeviceRestServlet(servlet.RestServlet):
 
         content = parse_json_object_from_request(request)
 
-        # TODO: Prod the notifier to wake up sync streams.
-        # TODO: Implement replication for the messages.
-        # TODO: Send the messages to remote servers if needed.
-
-        local_messages = {}
-        for user_id, by_device in content["messages"].items():
-            if self.is_mine_id(user_id):
-                messages_by_device = {
-                    device_id: {
-                        "content": message_content,
-                        "type": message_type,
-                        "sender": requester.user.to_string(),
-                    }
-                    for device_id, message_content in by_device.items()
-                }
-                if messages_by_device:
-                    local_messages[user_id] = messages_by_device
-
-        stream_id = yield self.store.add_messages_to_device_inbox(local_messages)
-
-        self.notifier.on_new_event(
-            "to_device_key", stream_id, users=local_messages.keys()
+        sender_user_id = requester.user.to_string()
+
+        yield self.device_message_handler.send_device_message(
+            sender_user_id, message_type, content["messages"]
         )
 
         response = (200, {})
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index 9f0625a822..a45ee9483e 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -45,7 +45,14 @@ class DownloadResource(Resource):
     @request_handler()
     @defer.inlineCallbacks
     def _async_render_GET(self, request):
-        request.setHeader("Content-Security-Policy", "sandbox")
+        request.setHeader(
+            "Content-Security-Policy",
+            "default-src 'none';"
+            " script-src 'none';"
+            " plugin-types application/pdf;"
+            " style-src 'unsafe-inline';"
+            " object-src 'self';"
+        )
         server_name, media_id, name = parse_media_id(request)
         if server_name == self.server_name:
             yield self._respond_local_file(request, media_id, name)
diff --git a/synapse/server.py b/synapse/server.py
index af3246504b..f516f08167 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -35,6 +35,7 @@ from synapse.federation import initialize_http_replication
 from synapse.handlers import Handlers
 from synapse.handlers.appservice import ApplicationServicesHandler
 from synapse.handlers.auth import AuthHandler
+from synapse.handlers.devicemessage import DeviceMessageHandler
 from synapse.handlers.device import DeviceHandler
 from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.presence import PresenceHandler
@@ -100,6 +101,7 @@ class HomeServer(object):
         'application_service_api',
         'application_service_scheduler',
         'application_service_handler',
+        'device_message_handler',
         'notifier',
         'distributor',
         'client_resource',
@@ -205,6 +207,9 @@ class HomeServer(object):
     def build_device_handler(self):
         return DeviceHandler(self)
 
+    def build_device_message_handler(self):
+        return DeviceMessageHandler(self)
+
     def build_e2e_keys_handler(self):
         return E2eKeysHandler(self)
 
diff --git a/synapse/state.py b/synapse/state.py
index cd792afed1..4520fa0415 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -55,12 +55,15 @@ def _gen_state_id():
 
 
 class _StateCacheEntry(object):
-    __slots__ = ["state", "state_group", "state_id"]
+    __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"]
 
-    def __init__(self, state, state_group):
+    def __init__(self, state, state_group, prev_group=None, delta_ids=None):
         self.state = state
         self.state_group = state_group
 
+        self.prev_group = prev_group
+        self.delta_ids = delta_ids
+
         # The `state_id` is a unique ID we generate that can be used as ID for
         # this collection of state. Usually this would be the same as the
         # state group, but on worker instances we can't generate a new state
@@ -245,11 +248,20 @@ class StateHandler(object):
             if key in context.prev_state_ids:
                 replaces = context.prev_state_ids[key]
                 event.unsigned["replaces_state"] = replaces
+
             context.current_state_ids = dict(context.prev_state_ids)
             context.current_state_ids[key] = event.event_id
+
+            context.prev_group = entry.prev_group
+            context.delta_ids = entry.delta_ids
+            if context.delta_ids is not None:
+                context.delta_ids[key] = event.event_id
         else:
             context.current_state_ids = context.prev_state_ids
 
+            context.prev_group = entry.prev_group
+            context.delta_ids = entry.delta_ids
+
         context.prev_state_events = []
         defer.returnValue(context)
 
@@ -283,6 +295,8 @@ class StateHandler(object):
             defer.returnValue(_StateCacheEntry(
                 state=state_list,
                 state_group=name,
+                prev_group=name,
+                delta_ids={},
             ))
 
         with (yield self.resolve_linearizer.queue(group_names)):
@@ -340,9 +354,24 @@ class StateHandler(object):
                 if hasattr(self.store, "get_next_state_group"):
                     state_group = self.store.get_next_state_group()
 
+            prev_group = None
+            delta_ids = None
+            for old_group, old_ids in state_groups_ids.items():
+                if not set(new_state.iterkeys()) - set(old_ids.iterkeys()):
+                    n_delta_ids = {
+                        k: v
+                        for k, v in new_state.items()
+                        if old_ids.get(k) != v
+                    }
+                    if not delta_ids or len(n_delta_ids) < len(delta_ids):
+                        prev_group = old_group
+                        delta_ids = n_delta_ids
+
             cache = _StateCacheEntry(
                 state=new_state,
                 state_group=state_group,
+                prev_group=prev_group,
+                delta_ids=delta_ids,
             )
 
             if self._state_cache is not None:
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 6c32773f25..828e5ca60b 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -182,6 +182,30 @@ class DataStore(RoomMemberStore, RoomStore,
             prefilled_cache=push_rules_prefill,
         )
 
+        max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
+        device_inbox_prefill, min_device_inbox_id = self._get_cache_dict(
+            db_conn, "device_inbox",
+            entity_column="user_id",
+            stream_column="stream_id",
+            max_value=max_device_inbox_id
+        )
+        self._device_inbox_stream_cache = StreamChangeCache(
+            "DeviceInboxStreamChangeCache", min_device_inbox_id,
+            prefilled_cache=device_inbox_prefill,
+        )
+        # The federation outbox and the local device inbox uses the same
+        # stream_id generator.
+        device_outbox_prefill, min_device_outbox_id = self._get_cache_dict(
+            db_conn, "device_federation_outbox",
+            entity_column="destination",
+            stream_column="stream_id",
+            max_value=max_device_inbox_id,
+        )
+        self._device_federation_outbox_stream_cache = StreamChangeCache(
+            "DeviceFederationOutboxStreamChangeCache", min_device_outbox_id,
+            prefilled_cache=device_outbox_prefill,
+        )
+
         cur = LoggingTransaction(
             db_conn.cursor(),
             name="_find_stream_orderings_for_times_txn",
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 68116b0394..0d37bb961b 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -27,63 +27,139 @@ logger = logging.getLogger(__name__)
 class DeviceInboxStore(SQLBaseStore):
 
     @defer.inlineCallbacks
-    def add_messages_to_device_inbox(self, messages_by_user_then_device):
-        """
+    def add_messages_to_device_inbox(self, local_messages_by_user_then_device,
+                                     remote_messages_by_destination):
+        """Used to send messages from this server.
+
         Args:
-            messages_by_user_and_device(dict):
+            sender_user_id(str): The ID of the user sending these messages.
+            local_messages_by_user_and_device(dict):
                 Dictionary of user_id to device_id to message.
+            remote_messages_by_destination(dict):
+                Dictionary of destination server_name to the EDU JSON to send.
         Returns:
             A deferred stream_id that resolves when the messages have been
             inserted.
         """
 
-        def select_devices_txn(txn, user_id, devices):
-            if not devices:
-                return []
-            sql = (
-                "SELECT user_id, device_id FROM devices"
-                " WHERE user_id = ? AND device_id IN ("
-                + ",".join("?" * len(devices))
-                + ")"
+        def add_messages_txn(txn, now_ms, stream_id):
+            # Add the local messages directly to the local inbox.
+            self._add_messages_to_local_device_inbox_txn(
+                txn, stream_id, local_messages_by_user_then_device
             )
-            # TODO: Maybe this needs to be done in batches if there are
-            # too many local devices for a given user.
-            args = [user_id] + devices
-            txn.execute(sql, args)
-            return [tuple(row) for row in txn.fetchall()]
-
-        def add_messages_to_device_inbox_txn(txn, stream_id):
-            local_users_and_devices = set()
-            for user_id, messages_by_device in messages_by_user_then_device.items():
-                local_users_and_devices.update(
-                    select_devices_txn(txn, user_id, messages_by_device.keys())
-                )
 
+            # Add the remote messages to the federation outbox.
+            # We'll send them to a remote server when we next send a
+            # federation transaction to that destination.
             sql = (
-                "INSERT INTO device_inbox"
-                " (user_id, device_id, stream_id, message_json)"
+                "INSERT INTO device_federation_outbox"
+                " (destination, stream_id, queued_ts, messages_json)"
                 " VALUES (?,?,?,?)"
             )
             rows = []
-            for user_id, messages_by_device in messages_by_user_then_device.items():
-                for device_id, message in messages_by_device.items():
-                    message_json = ujson.dumps(message)
-                    # Only insert into the local inbox if the device exists on
-                    # this server
-                    if (user_id, device_id) in local_users_and_devices:
-                        rows.append((user_id, device_id, stream_id, message_json))
-
+            for destination, edu in remote_messages_by_destination.items():
+                edu_json = ujson.dumps(edu)
+                rows.append((destination, stream_id, now_ms, edu_json))
             txn.executemany(sql, rows)
 
         with self._device_inbox_id_gen.get_next() as stream_id:
+            now_ms = self.clock.time_msec()
             yield self.runInteraction(
                 "add_messages_to_device_inbox",
-                add_messages_to_device_inbox_txn,
-                stream_id
+                add_messages_txn,
+                now_ms,
+                stream_id,
             )
+            for user_id in local_messages_by_user_then_device.keys():
+                self._device_inbox_stream_cache.entity_has_changed(
+                    user_id, stream_id
+                )
+            for destination in remote_messages_by_destination.keys():
+                self._device_federation_outbox_stream_cache.entity_has_changed(
+                    destination, stream_id
+                )
 
         defer.returnValue(self._device_inbox_id_gen.get_current_token())
 
+    @defer.inlineCallbacks
+    def add_messages_from_remote_to_device_inbox(
+        self, origin, message_id, local_messages_by_user_then_device
+    ):
+        def add_messages_txn(txn, now_ms, stream_id):
+            # Check if we've already inserted a matching message_id for that
+            # origin. This can happen if the origin doesn't receive our
+            # acknowledgement from the first time we received the message.
+            already_inserted = self._simple_select_one_txn(
+                txn, table="device_federation_inbox",
+                keyvalues={"origin": origin, "message_id": message_id},
+                retcols=("message_id",),
+                allow_none=True,
+            )
+            if already_inserted is not None:
+                return
+
+            # Add an entry for this message_id so that we know we've processed
+            # it.
+            self._simple_insert_txn(
+                txn, table="device_federation_inbox",
+                values={
+                    "origin": origin,
+                    "message_id": message_id,
+                    "received_ts": now_ms,
+                },
+            )
+
+            # Add the messages to the approriate local device inboxes so that
+            # they'll be sent to the devices when they next sync.
+            self._add_messages_to_local_device_inbox_txn(
+                txn, stream_id, local_messages_by_user_then_device
+            )
+
+        with self._device_inbox_id_gen.get_next() as stream_id:
+            now_ms = self.clock.time_msec()
+            yield self.runInteraction(
+                "add_messages_from_remote_to_device_inbox",
+                add_messages_txn,
+                now_ms,
+                stream_id,
+            )
+            for user_id in local_messages_by_user_then_device.keys():
+                self._device_inbox_stream_cache.entity_has_changed(
+                    user_id, stream_id
+                )
+
+    def _add_messages_to_local_device_inbox_txn(self, txn, stream_id,
+                                                messages_by_user_then_device):
+        local_users_and_devices = set()
+        for user_id, messages_by_device in messages_by_user_then_device.items():
+            devices = messages_by_device.keys()
+            sql = (
+                "SELECT user_id, device_id FROM devices"
+                " WHERE user_id = ? AND device_id IN ("
+                + ",".join("?" * len(devices))
+                + ")"
+            )
+            # TODO: Maybe this needs to be done in batches if there are
+            # too many local devices for a given user.
+            txn.execute(sql, [user_id] + devices)
+            local_users_and_devices.update(map(tuple, txn.fetchall()))
+
+        sql = (
+            "INSERT INTO device_inbox"
+            " (user_id, device_id, stream_id, message_json)"
+            " VALUES (?,?,?,?)"
+        )
+        rows = []
+        for user_id, messages_by_device in messages_by_user_then_device.items():
+            for device_id, message in messages_by_device.items():
+                message_json = ujson.dumps(message)
+                # Only insert into the local inbox if the device exists on
+                # this server
+                if (user_id, device_id) in local_users_and_devices:
+                    rows.append((user_id, device_id, stream_id, message_json))
+
+        txn.executemany(sql, rows)
+
     def get_new_messages_for_device(
         self, user_id, device_id, last_stream_id, current_stream_id, limit=100
     ):
@@ -97,6 +173,12 @@ class DeviceInboxStore(SQLBaseStore):
             Deferred ([dict], int): List of messages for the device and where
                 in the stream the messages got to.
         """
+        has_changed = self._device_inbox_stream_cache.has_entity_changed(
+            user_id, last_stream_id
+        )
+        if not has_changed:
+            return defer.succeed(([], current_stream_id))
+
         def get_new_messages_for_device_txn(txn):
             sql = (
                 "SELECT stream_id, message_json FROM device_inbox"
@@ -182,3 +264,71 @@ class DeviceInboxStore(SQLBaseStore):
 
     def get_to_device_stream_token(self):
         return self._device_inbox_id_gen.get_current_token()
+
+    def get_new_device_msgs_for_remote(
+        self, destination, last_stream_id, current_stream_id, limit=100
+    ):
+        """
+        Args:
+            destination(str): The name of the remote server.
+            last_stream_id(int): The last position of the device message stream
+                that the server sent up to.
+            current_stream_id(int): The current position of the device
+                message stream.
+        Returns:
+            Deferred ([dict], int): List of messages for the device and where
+                in the stream the messages got to.
+        """
+
+        has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
+            destination, last_stream_id
+        )
+        if not has_changed:
+            return defer.succeed(([], current_stream_id))
+
+        def get_new_messages_for_remote_destination_txn(txn):
+            sql = (
+                "SELECT stream_id, messages_json FROM device_federation_outbox"
+                " WHERE destination = ?"
+                " AND ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC"
+                " LIMIT ?"
+            )
+            txn.execute(sql, (
+                destination, last_stream_id, current_stream_id, limit
+            ))
+            messages = []
+            for row in txn.fetchall():
+                stream_pos = row[0]
+                messages.append(ujson.loads(row[1]))
+            if len(messages) < limit:
+                stream_pos = current_stream_id
+            return (messages, stream_pos)
+
+        return self.runInteraction(
+            "get_new_device_msgs_for_remote",
+            get_new_messages_for_remote_destination_txn,
+        )
+
+    def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
+        """Used to delete messages when the remote destination acknowledges
+        their receipt.
+
+        Args:
+            destination(str): The destination server_name
+            up_to_stream_id(int): Where to delete messages up to.
+        Returns:
+            A deferred that resolves when the messages have been deleted.
+        """
+        def delete_messages_for_remote_destination_txn(txn):
+            sql = (
+                "DELETE FROM device_federation_outbox"
+                " WHERE destination = ?"
+                " AND stream_id <= ?"
+            )
+            txn.execute(sql, (destination, up_to_stream_id))
+
+        return self.runInteraction(
+            "delete_device_msgs_for_remote",
+            delete_messages_for_remote_destination_txn
+        )
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index afd6530cab..17920d4480 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -54,8 +54,12 @@ class DeviceStore(SQLBaseStore):
                 or_ignore=ignore_if_known,
             )
         except Exception as e:
-            logger.error("store_device with device_id=%s failed: %s",
-                         device_id, e)
+            logger.error("store_device with device_id=%s(%r) user_id=%s(%r)"
+                         " display_name=%s(%r) failed: %s",
+                         type(device_id).__name__, device_id,
+                         type(user_id).__name__, user_id,
+                         type(initial_device_display_name).__name__,
+                         initial_device_display_name, e)
             raise StoreError(500, "Problem storing device.")
 
     def get_device(self, user_id, device_id):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 1a7d4c5199..ed182c8d11 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -497,7 +497,11 @@ class EventsStore(SQLBaseStore):
 
                 # insert into the state_group, state_groups_state and
                 # event_to_state_groups tables.
-                self._store_mult_state_groups_txn(txn, ((event, context),))
+                try:
+                    self._store_mult_state_groups_txn(txn, ((event, context),))
+                except Exception:
+                    logger.exception("")
+                    raise
 
                 metadata_json = encode_json(
                     event.internal_metadata.get_dict()
@@ -1543,6 +1547,9 @@ class EventsStore(SQLBaseStore):
         )
         event_rows = txn.fetchall()
 
+        for event_id, state_key in event_rows:
+            txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+
         # We calculate the new entries for the backward extremeties by finding
         # all events that point to events that are to be purged
         txn.execute(
@@ -1582,7 +1589,66 @@ class EventsStore(SQLBaseStore):
             " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
             (room_id, topological_ordering, topological_ordering)
         )
+
         state_rows = txn.fetchall()
+        state_groups_to_delete = [sg for sg, in state_rows]
+
+        # Now we get all the state groups that rely on these state groups
+        new_state_edges = []
+        chunks = [
+            state_groups_to_delete[i:i + 100]
+            for i in xrange(0, len(state_groups_to_delete), 100)
+        ]
+        for chunk in chunks:
+            rows = self._simple_select_many_txn(
+                txn,
+                table="state_group_edges",
+                column="prev_state_group",
+                iterable=chunk,
+                retcols=["state_group"],
+                keyvalues={},
+            )
+            new_state_edges.extend(row["state_group"] for row in rows)
+
+        # Now we turn the state groups that reference to-be-deleted state groups
+        # to non delta versions.
+        for new_state_edge in new_state_edges:
+            curr_state = self._get_state_groups_from_groups_txn(
+                txn, [new_state_edge], types=None
+            )
+            curr_state = curr_state[new_state_edge]
+
+            self._simple_delete_txn(
+                txn,
+                table="state_groups_state",
+                keyvalues={
+                    "state_group": new_state_edge,
+                }
+            )
+
+            self._simple_delete_txn(
+                txn,
+                table="state_group_edges",
+                keyvalues={
+                    "state_group": new_state_edge,
+                }
+            )
+
+            self._simple_insert_many_txn(
+                txn,
+                table="state_groups_state",
+                values=[
+                    {
+                        "state_group": new_state_edge,
+                        "room_id": room_id,
+                        "type": key[0],
+                        "state_key": key[1],
+                        "event_id": state_id,
+                    }
+                    for key, state_id in curr_state.items()
+                ],
+            )
+
         txn.executemany(
             "DELETE FROM state_groups_state WHERE state_group = ?",
             state_rows
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index b94ce7bea1..7efbe51cda 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 34
+SCHEMA_VERSION = 35
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
@@ -242,7 +242,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
                     module = imp.load_source(
                         module_name, absolute_path, python_file
                     )
-                logger.debug("Running script %s", relative_path)
+                logger.info("Running script %s", relative_path)
                 module.run_create(cur, database_engine)
                 if not is_empty:
                     module.run_upgrade(cur, database_engine, config=config)
@@ -253,7 +253,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
                 pass
             elif ext == ".sql":
                 # A plain old .sql file, just read and execute it
-                logger.debug("Applying schema %s", relative_path)
+                logger.info("Applying schema %s", relative_path)
                 executescript(cur, absolute_path)
             else:
                 # Not a valid delta file.
diff --git a/synapse/storage/schema/delta/34/device_outbox.sql b/synapse/storage/schema/delta/34/device_outbox.sql
new file mode 100644
index 0000000000..e87066d9a1
--- /dev/null
+++ b/synapse/storage/schema/delta/34/device_outbox.sql
@@ -0,0 +1,36 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE device_federation_outbox (
+    destination TEXT NOT NULL,
+    stream_id BIGINT NOT NULL,
+    queued_ts BIGINT NOT NULL,
+    messages_json TEXT NOT NULL
+);
+
+
+CREATE INDEX device_federation_outbox_destination_id
+    ON device_federation_outbox(destination, stream_id);
+
+
+CREATE TABLE device_federation_inbox (
+    origin TEXT NOT NULL,
+    message_id TEXT NOT NULL,
+    received_ts BIGINT NOT NULL
+);
+
+
+CREATE INDEX device_federation_inbox_sender_id
+    ON device_federation_inbox(origin, message_id);
diff --git a/synapse/storage/schema/delta/35/state.sql b/synapse/storage/schema/delta/35/state.sql
new file mode 100644
index 0000000000..0f1fa68a89
--- /dev/null
+++ b/synapse/storage/schema/delta/35/state.sql
@@ -0,0 +1,22 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE state_group_edges(
+    state_group BIGINT NOT NULL,
+    prev_state_group BIGINT NOT NULL
+);
+
+CREATE INDEX state_group_edges_idx ON state_group_edges(state_group);
+CREATE INDEX state_group_edges_prev_idx ON state_group_edges(prev_state_group);
diff --git a/synapse/storage/schema/delta/35/state_dedupe.sql b/synapse/storage/schema/delta/35/state_dedupe.sql
new file mode 100644
index 0000000000..97e5067ef4
--- /dev/null
+++ b/synapse/storage/schema/delta/35/state_dedupe.sql
@@ -0,0 +1,17 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('state_group_state_deduplication', '{}');
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index ec551b0b4f..fef87834ca 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -16,6 +16,7 @@
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches import intern_string
+from synapse.storage.engines import PostgresEngine
 
 from twisted.internet import defer
 
@@ -24,6 +25,9 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+MAX_STATE_DELTA_HOPS = 100
+
+
 class StateStore(SQLBaseStore):
     """ Keeps track of the state at a given event.
 
@@ -43,6 +47,15 @@ class StateStore(SQLBaseStore):
       * `state_groups_state`: Maps state group to state events.
     """
 
+    STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
+
+    def __init__(self, hs):
+        super(StateStore, self).__init__(hs)
+        self.register_background_update_handler(
+            self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
+            self._background_deduplicate_state,
+        )
+
     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):
         if not event_ids:
@@ -103,11 +116,8 @@ class StateStore(SQLBaseStore):
             state_groups[event.event_id] = context.state_group
 
             if self._have_persisted_state_group_txn(txn, context.state_group):
-                logger.info("Already persisted state_group: %r", context.state_group)
                 continue
 
-            state_event_ids = dict(context.current_state_ids)
-
             self._simple_insert_txn(
                 txn,
                 table="state_groups",
@@ -118,20 +128,51 @@ class StateStore(SQLBaseStore):
                 },
             )
 
-            self._simple_insert_many_txn(
-                txn,
-                table="state_groups_state",
-                values=[
-                    {
+            # We persist as a delta if we can, while also ensuring the chain
+            # of deltas isn't tooo long, as otherwise read performance degrades.
+            if context.prev_group:
+                potential_hops = self._count_state_group_hops_txn(
+                    txn, context.prev_group
+                )
+            if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
+                self._simple_insert_txn(
+                    txn,
+                    table="state_group_edges",
+                    values={
                         "state_group": context.state_group,
-                        "room_id": event.room_id,
-                        "type": key[0],
-                        "state_key": key[1],
-                        "event_id": state_id,
-                    }
-                    for key, state_id in state_event_ids.items()
-                ],
-            )
+                        "prev_state_group": context.prev_group,
+                    },
+                )
+
+                self._simple_insert_many_txn(
+                    txn,
+                    table="state_groups_state",
+                    values=[
+                        {
+                            "state_group": context.state_group,
+                            "room_id": event.room_id,
+                            "type": key[0],
+                            "state_key": key[1],
+                            "event_id": state_id,
+                        }
+                        for key, state_id in context.delta_ids.items()
+                    ],
+                )
+            else:
+                self._simple_insert_many_txn(
+                    txn,
+                    table="state_groups_state",
+                    values=[
+                        {
+                            "state_group": context.state_group,
+                            "room_id": event.room_id,
+                            "type": key[0],
+                            "state_key": key[1],
+                            "event_id": state_id,
+                        }
+                        for key, state_id in context.current_state_ids.items()
+                    ],
+                )
 
         self._simple_insert_many_txn(
             txn,
@@ -145,6 +186,47 @@ class StateStore(SQLBaseStore):
             ],
         )
 
+    def _count_state_group_hops_txn(self, txn, state_group):
+        """Given a state group, count how many hops there are in the tree.
+
+        This is used to ensure the delta chains don't get too long.
+        """
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = ("""
+                WITH RECURSIVE state(state_group) AS (
+                    VALUES(?::bigint)
+                    UNION ALL
+                    SELECT prev_state_group FROM state_group_edges e, state s
+                    WHERE s.state_group = e.state_group
+                )
+                SELECT count(*) FROM state;
+            """)
+
+            txn.execute(sql, (state_group,))
+            row = txn.fetchone()
+            if row and row[0]:
+                return row[0]
+            else:
+                return 0
+        else:
+            # We don't use WITH RECURSIVE on sqlite3 as there are distributions
+            # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
+            next_group = state_group
+            count = 0
+
+            while next_group:
+                next_group = self._simple_select_one_onecol_txn(
+                    txn,
+                    table="state_group_edges",
+                    keyvalues={"state_group": next_group},
+                    retcol="prev_state_group",
+                    allow_none=True,
+                )
+                if next_group:
+                    count += 1
+
+            return count
+
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
         if event_type and state_key is not None:
@@ -206,48 +288,108 @@ class StateStore(SQLBaseStore):
     def _get_state_groups_from_groups(self, groups, types):
         """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
         """
-        def f(txn, groups):
-            if types is not None:
-                where_clause = "AND (%s)" % (
-                    " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
-                )
-            else:
-                where_clause = ""
-
-            sql = (
-                "SELECT state_group, event_id, type, state_key"
-                " FROM state_groups_state WHERE"
-                " state_group IN (%s) %s" % (
-                    ",".join("?" for _ in groups),
-                    where_clause,
-                )
-            )
-
-            args = list(groups)
-            if types is not None:
-                args.extend([i for typ in types for i in typ])
-
-            txn.execute(sql, args)
-            rows = self.cursor_to_dict(txn)
-
-            results = {group: {} for group in groups}
-            for row in rows:
-                key = (row["type"], row["state_key"])
-                results[row["state_group"]][key] = row["event_id"]
-            return results
-
         results = {}
 
         chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
         for chunk in chunks:
             res = yield self.runInteraction(
                 "_get_state_groups_from_groups",
-                f, chunk
+                self._get_state_groups_from_groups_txn, chunk, types,
             )
             results.update(res)
 
         defer.returnValue(results)
 
+    def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
+        if types is not None:
+            where_clause = "AND (%s)" % (
+                " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
+            )
+        else:
+            where_clause = ""
+
+        results = {group: {} for group in groups}
+        if isinstance(self.database_engine, PostgresEngine):
+            # Temporarily disable sequential scans in this transaction. This is
+            # a temporary hack until we can add the right indices in
+            txn.execute("SET LOCAL enable_seqscan=off")
+
+            # The below query walks the state_group tree so that the "state"
+            # table includes all state_groups in the tree. It then joins
+            # against `state_groups_state` to fetch the latest state.
+            # It assumes that previous state groups are always numerically
+            # lesser.
+            # The PARTITION is used to get the event_id in the greatest state
+            # group for the given type, state_key.
+            # This may return multiple rows per (type, state_key), but last_value
+            # should be the same.
+            sql = ("""
+                WITH RECURSIVE state(state_group) AS (
+                    VALUES(?::bigint)
+                    UNION ALL
+                    SELECT prev_state_group FROM state_group_edges e, state s
+                    WHERE s.state_group = e.state_group
+                )
+                SELECT type, state_key, last_value(event_id) OVER (
+                    PARTITION BY type, state_key ORDER BY state_group ASC
+                    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+                ) AS event_id FROM state_groups_state
+                WHERE state_group IN (
+                    SELECT state_group FROM state
+                )
+                %s;
+            """) % (where_clause,)
+
+            for group in groups:
+                args = [group]
+                if types is not None:
+                    args.extend([i for typ in types for i in typ])
+
+                txn.execute(sql, args)
+                rows = self.cursor_to_dict(txn)
+                for row in rows:
+                    key = (row["type"], row["state_key"])
+                    results[group][key] = row["event_id"]
+        else:
+            # We don't use WITH RECURSIVE on sqlite3 as there are distributions
+            # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
+            for group in groups:
+                group_tree = [group]
+                next_group = group
+
+                while next_group:
+                    next_group = self._simple_select_one_onecol_txn(
+                        txn,
+                        table="state_group_edges",
+                        keyvalues={"state_group": next_group},
+                        retcol="prev_state_group",
+                        allow_none=True,
+                    )
+                    if next_group:
+                        group_tree.append(next_group)
+
+                sql = ("""
+                    SELECT type, state_key, event_id FROM state_groups_state
+                    INNER JOIN (
+                        SELECT type, state_key, max(state_group) as state_group
+                        FROM state_groups_state
+                        WHERE state_group IN (%s) %s
+                        GROUP BY type, state_key
+                    ) USING (type, state_key, state_group);
+                """) % (",".join("?" for _ in group_tree), where_clause,)
+
+                args = list(group_tree)
+                if types is not None:
+                    args.extend([i for typ in types for i in typ])
+
+                txn.execute(sql, args)
+                rows = self.cursor_to_dict(txn)
+                for row in rows:
+                    key = (row["type"], row["state_key"])
+                    results[group][key] = row["event_id"]
+
+        return results
+
     @defer.inlineCallbacks
     def get_state_for_events(self, event_ids, types):
         """Given a list of event_ids and type tuples, return a list of state
@@ -504,32 +646,150 @@ class StateStore(SQLBaseStore):
 
         defer.returnValue(results)
 
-    def get_all_new_state_groups(self, last_id, current_id, limit):
-        def get_all_new_state_groups_txn(txn):
-            sql = (
-                "SELECT id, room_id, event_id FROM state_groups"
-                " WHERE ? < id AND id <= ? ORDER BY id LIMIT ?"
+    def get_next_state_group(self):
+        return self._state_groups_id_gen.get_next()
+
+    @defer.inlineCallbacks
+    def _background_deduplicate_state(self, progress, batch_size):
+        """This background update will slowly deduplicate state by reencoding
+        them as deltas.
+        """
+        last_state_group = progress.get("last_state_group", 0)
+        rows_inserted = progress.get("rows_inserted", 0)
+        max_group = progress.get("max_group", None)
+
+        BATCH_SIZE_SCALE_FACTOR = 100
+
+        batch_size = max(1, int(batch_size / BATCH_SIZE_SCALE_FACTOR))
+
+        if max_group is None:
+            rows = yield self._execute(
+                "_background_deduplicate_state", None,
+                "SELECT coalesce(max(id), 0) FROM state_groups",
             )
-            txn.execute(sql, (last_id, current_id, limit))
-            groups = txn.fetchall()
+            max_group = rows[0][0]
+
+        def reindex_txn(txn):
+            new_last_state_group = last_state_group
+            for count in xrange(batch_size):
+                txn.execute(
+                    "SELECT id, room_id FROM state_groups"
+                    " WHERE ? < id AND id <= ?"
+                    " ORDER BY id ASC"
+                    " LIMIT 1",
+                    (new_last_state_group, max_group,)
+                )
+                row = txn.fetchone()
+                if row:
+                    state_group, room_id = row
 
-            if not groups:
-                return ([], [])
+                if not row or not state_group:
+                    return True, count
 
-            lower_bound = groups[0][0]
-            upper_bound = groups[-1][0]
-            sql = (
-                "SELECT state_group, type, state_key, event_id"
-                " FROM state_groups_state"
-                " WHERE ? <= state_group AND state_group <= ?"
+                txn.execute(
+                    "SELECT state_group FROM state_group_edges"
+                    " WHERE state_group = ?",
+                    (state_group,)
+                )
+
+                # If we reach a point where we've already started inserting
+                # edges we should stop.
+                if txn.fetchall():
+                    return True, count
+
+                txn.execute(
+                    "SELECT coalesce(max(id), 0) FROM state_groups"
+                    " WHERE id < ? AND room_id = ?",
+                    (state_group, room_id,)
+                )
+                prev_group, = txn.fetchone()
+                new_last_state_group = state_group
+
+                if prev_group:
+                    potential_hops = self._count_state_group_hops_txn(
+                        txn, prev_group
+                    )
+                    if potential_hops >= MAX_STATE_DELTA_HOPS:
+                        # We want to ensure chains are at most this long,#
+                        # otherwise read performance degrades.
+                        continue
+
+                    prev_state = self._get_state_groups_from_groups_txn(
+                        txn, [prev_group], types=None
+                    )
+                    prev_state = prev_state[prev_group]
+
+                    curr_state = self._get_state_groups_from_groups_txn(
+                        txn, [state_group], types=None
+                    )
+                    curr_state = curr_state[state_group]
+
+                    if not set(prev_state.keys()) - set(curr_state.keys()):
+                        # We can only do a delta if the current has a strict super set
+                        # of keys
+
+                        delta_state = {
+                            key: value for key, value in curr_state.items()
+                            if prev_state.get(key, None) != value
+                        }
+
+                        self._simple_delete_txn(
+                            txn,
+                            table="state_group_edges",
+                            keyvalues={
+                                "state_group": state_group,
+                            }
+                        )
+
+                        self._simple_insert_txn(
+                            txn,
+                            table="state_group_edges",
+                            values={
+                                "state_group": state_group,
+                                "prev_state_group": prev_group,
+                            }
+                        )
+
+                        self._simple_delete_txn(
+                            txn,
+                            table="state_groups_state",
+                            keyvalues={
+                                "state_group": state_group,
+                            }
+                        )
+
+                        self._simple_insert_many_txn(
+                            txn,
+                            table="state_groups_state",
+                            values=[
+                                {
+                                    "state_group": state_group,
+                                    "room_id": room_id,
+                                    "type": key[0],
+                                    "state_key": key[1],
+                                    "event_id": state_id,
+                                }
+                                for key, state_id in delta_state.items()
+                            ],
+                        )
+
+            progress = {
+                "last_state_group": state_group,
+                "rows_inserted": rows_inserted + batch_size,
+                "max_group": max_group,
+            }
+
+            self._background_update_progress_txn(
+                txn, self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, progress
             )
 
-            txn.execute(sql, (lower_bound, upper_bound))
-            state_group_state = txn.fetchall()
-            return (groups, state_group_state)
-        return self.runInteraction(
-            "get_all_new_state_groups", get_all_new_state_groups_txn
+            return False, batch_size
+
+        finished, result = yield self.runInteraction(
+            self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, reindex_txn
         )
 
-    def get_next_state_group(self):
-        return self._state_groups_id_gen.get_next()
+        if finished:
+            yield self._end_background_update(self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME)
+
+        defer.returnValue(result * BATCH_SIZE_SCALE_FACTOR)