summary refs log tree commit diff
diff options
context:
space:
mode:
authorKegan Dougal <kegan@matrix.org>2015-02-05 14:28:03 +0000
committerKegan Dougal <kegan@matrix.org>2015-02-05 14:28:03 +0000
commit951690e54d5673431abefafc30c94af6e11341e3 (patch)
tree779471113f16dfbf7e67504731dd412101eee180
parentFix user query checks. HS>AS pushing now works. (diff)
parentMerge branch 'federation_client_retries' of github.com:matrix-org/synapse int... (diff)
downloadsynapse-951690e54d5673431abefafc30c94af6e11341e3.tar.xz
Merge branch 'develop' into application-services
Diffstat (limited to '')
-rw-r--r--CHANGES.rst5
-rw-r--r--scripts/check_auth.py65
-rw-r--r--synapse/api/auth.py14
-rw-r--r--synapse/api/errors.py8
-rw-r--r--synapse/events/__init__.py2
-rw-r--r--synapse/federation/federation_base.py118
-rw-r--r--synapse/federation/federation_client.py181
-rw-r--r--synapse/federation/federation_server.py57
-rw-r--r--synapse/federation/transaction_queue.py23
-rw-r--r--synapse/federation/transport/client.py42
-rw-r--r--synapse/handlers/directory.py11
-rw-r--r--synapse/handlers/federation.py193
-rw-r--r--synapse/handlers/message.py17
-rw-r--r--synapse/handlers/room.py12
-rw-r--r--synapse/http/matrixfederationclient.py42
-rw-r--r--synapse/push/__init__.py12
-rw-r--r--synapse/push/baserules.py3
-rw-r--r--synapse/push/httppusher.py4
-rw-r--r--synapse/push/pusherpool.py12
-rw-r--r--synapse/rest/client/v1/push_rule.py36
-rw-r--r--synapse/rest/client/v1/pusher.py4
-rw-r--r--synapse/state.py22
-rw-r--r--synapse/storage/__init__.py211
-rw-r--r--synapse/storage/pusher.py14
-rw-r--r--synapse/storage/schema/delta/v12.sql2
-rw-r--r--synapse/storage/schema/pusher.sql2
-rw-r--r--tests/handlers/test_federation.py5
27 files changed, 716 insertions, 401 deletions
diff --git a/CHANGES.rst b/CHANGES.rst
index 297ae914fd..922fa5b035 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,8 @@
+Changes in develop
+==================
+
+ * pydenticon support -- adds dep on pydenticon
+
 Changes in synapse 0.6.1 (2015-01-07)
 =====================================
 
diff --git a/scripts/check_auth.py b/scripts/check_auth.py
new file mode 100644
index 0000000000..b889ac7fa7
--- /dev/null
+++ b/scripts/check_auth.py
@@ -0,0 +1,65 @@
+from synapse.events import FrozenEvent
+from synapse.api.auth import Auth
+
+from mock import Mock
+
+import argparse
+import itertools
+import json
+import sys
+
+
+def check_auth(auth, auth_chain, events):
+    auth_chain.sort(key=lambda e: e.depth)
+
+    auth_map = {
+        e.event_id: e
+        for e in auth_chain
+    }
+
+    create_events = {}
+    for e in auth_chain:
+        if e.type == "m.room.create":
+            create_events[e.room_id] = e
+
+    for e in itertools.chain(auth_chain, events):
+        auth_events_list = [auth_map[i] for i, _ in e.auth_events]
+
+        auth_events = {
+            (e.type, e.state_key): e
+            for e in auth_events_list
+        }
+
+        auth_events[("m.room.create", "")] = create_events[e.room_id]
+
+        try:
+            auth.check(e, auth_events=auth_events)
+        except Exception as ex:
+            print "Failed:", e.event_id, e.type, e.state_key
+            print "Auth_events:", auth_events
+            print ex
+            print json.dumps(e.get_dict(), sort_keys=True, indent=4)
+            # raise
+        print "Success:", e.event_id, e.type, e.state_key
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        'json',
+        nargs='?',
+        type=argparse.FileType('r'),
+        default=sys.stdin,
+    )
+
+    args = parser.parse_args()
+
+    js = json.load(args.json)
+
+
+    auth = Auth(Mock())
+    check_auth(
+        auth,
+        [FrozenEvent(d) for d in js["auth_chain"]],
+        [FrozenEvent(d) for d in js["pdus"]],
+    )
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 37e31d2b6f..7105ee21dc 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -102,8 +102,6 @@ class Auth(object):
     def check_host_in_room(self, room_id, host):
         curr_state = yield self.state.get_current_state(room_id)
 
-        logger.debug("Got curr_state %s", curr_state)
-
         for event in curr_state:
             if event.type == EventTypes.Member:
                 try:
@@ -360,7 +358,7 @@ class Auth(object):
     def add_auth_events(self, builder, context):
         yield run_on_reactor()
 
-        auth_ids = self.compute_auth_events(builder, context)
+        auth_ids = self.compute_auth_events(builder, context.current_state)
 
         auth_events_entries = yield self.store.add_event_hashes(
             auth_ids
@@ -374,26 +372,26 @@ class Auth(object):
             if v.event_id in auth_ids
         }
 
-    def compute_auth_events(self, event, context):
+    def compute_auth_events(self, event, current_state):
         if event.type == EventTypes.Create:
             return []
 
         auth_ids = []
 
         key = (EventTypes.PowerLevels, "", )
-        power_level_event = context.current_state.get(key)
+        power_level_event = current_state.get(key)
 
         if power_level_event:
             auth_ids.append(power_level_event.event_id)
 
         key = (EventTypes.JoinRules, "", )
-        join_rule_event = context.current_state.get(key)
+        join_rule_event = current_state.get(key)
 
         key = (EventTypes.Member, event.user_id, )
-        member_event = context.current_state.get(key)
+        member_event = current_state.get(key)
 
         key = (EventTypes.Create, "", )
-        create_event = context.current_state.get(key)
+        create_event = current_state.get(key)
         if create_event:
             auth_ids.append(create_event.event_id)
 
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index ad478aa6b7..5041828f18 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -39,7 +39,7 @@ class Codes(object):
     TOO_LARGE = "M_TOO_LARGE"
 
 
-class CodeMessageException(Exception):
+class CodeMessageException(RuntimeError):
     """An exception with integer code and message string attributes."""
 
     def __init__(self, code, msg):
@@ -227,3 +227,9 @@ class FederationError(RuntimeError):
             "affected": self.affected,
             "source": self.source if self.source else self.affected,
         }
+
+
+class HttpResponseException(CodeMessageException):
+    def __init__(self, code, msg, response):
+        self.response = response
+        super(HttpResponseException, self).__init__(code, msg)
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index bf07951027..8f0c6e959f 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -77,7 +77,7 @@ class EventBase(object):
         return self.content["membership"]
 
     def is_state(self):
-        return hasattr(self, "state_key")
+        return hasattr(self, "state_key") and self.state_key is not None
 
     def get_dict(self):
         d = dict(self._event_dict)
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
new file mode 100644
index 0000000000..a990aec4fd
--- /dev/null
+++ b/synapse/federation/federation_base.py
@@ -0,0 +1,118 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from twisted.internet import defer
+
+from synapse.events.utils import prune_event
+
+from syutil.jsonutil import encode_canonical_json
+
+from synapse.crypto.event_signing import check_event_content_hash
+
+from synapse.api.errors import SynapseError
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class FederationBase(object):
+    @defer.inlineCallbacks
+    def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False):
+        """Takes a list of PDUs and checks the signatures and hashs of each
+        one. If a PDU fails its signature check then we check if we have it in
+        the database and if not then request if from the originating server of
+        that PDU.
+
+        If a PDU fails its content hash check then it is redacted.
+
+        The given list of PDUs are not modified, instead the function returns
+        a new list.
+
+        Args:
+            pdu (list)
+            outlier (bool)
+
+        Returns:
+            Deferred : A list of PDUs that have valid signatures and hashes.
+        """
+        signed_pdus = []
+        for pdu in pdus:
+            try:
+                new_pdu = yield self._check_sigs_and_hash(pdu)
+                signed_pdus.append(new_pdu)
+            except SynapseError:
+                # FIXME: We should handle signature failures more gracefully.
+
+                # Check local db.
+                new_pdu = yield self.store.get_event(
+                    pdu.event_id,
+                    allow_rejected=True
+                )
+                if new_pdu:
+                    signed_pdus.append(new_pdu)
+                    continue
+
+                # Check pdu.origin
+                if pdu.origin != origin:
+                    new_pdu = yield self.get_pdu(
+                        destinations=[pdu.origin],
+                        event_id=pdu.event_id,
+                        outlier=outlier,
+                    )
+
+                    if new_pdu:
+                        signed_pdus.append(new_pdu)
+                        continue
+
+                logger.warn("Failed to find copy of %s with valid signature")
+
+        defer.returnValue(signed_pdus)
+
+    @defer.inlineCallbacks
+    def _check_sigs_and_hash(self, pdu):
+        """Throws a SynapseError if the PDU does not have the correct
+        signatures.
+
+        Returns:
+            FrozenEvent: Either the given event or it redacted if it failed the
+            content hash check.
+        """
+        # Check signatures are correct.
+        redacted_event = prune_event(pdu)
+        redacted_pdu_json = redacted_event.get_pdu_json()
+
+        try:
+            yield self.keyring.verify_json_for_server(
+                pdu.origin, redacted_pdu_json
+            )
+        except SynapseError:
+            logger.warn(
+                "Signature check failed for %s redacted to %s",
+                encode_canonical_json(pdu.get_pdu_json()),
+                encode_canonical_json(redacted_pdu_json),
+            )
+            raise
+
+        if not check_event_content_hash(pdu):
+            logger.warn(
+                "Event content has been tampered, redacting %s, %s",
+                pdu.event_id, encode_canonical_json(pdu.get_dict())
+            )
+            defer.returnValue(redacted_event)
+
+        defer.returnValue(pdu)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index e1539bd0e0..70c9a6f46b 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -16,17 +16,12 @@
 
 from twisted.internet import defer
 
+from .federation_base import FederationBase
 from .units import Edu
 
+from synapse.api.errors import CodeMessageException
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
-from synapse.events.utils import prune_event
-
-from syutil.jsonutil import encode_canonical_json
-
-from synapse.crypto.event_signing import check_event_content_hash
-
-from synapse.api.errors import SynapseError
 
 import logging
 
@@ -34,7 +29,7 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class FederationClient(object):
+class FederationClient(FederationBase):
     @log_function
     def send_pdu(self, pdu, destinations):
         """Informs the replication layer about a new PDU generated within the
@@ -186,7 +181,8 @@ class FederationClient(object):
                     pdu = yield self._check_sigs_and_hash(pdu)
 
                     break
-
+            except CodeMessageException:
+                raise
             except Exception as e:
                 logger.info(
                     "Failed to get PDU %s from %s because %s",
@@ -224,17 +220,17 @@ class FederationClient(object):
             for p in result.get("auth_chain", [])
         ]
 
-        for i, pdu in enumerate(pdus):
-            pdus[i] = yield self._check_sigs_and_hash(pdu)
-
-            # FIXME: We should handle signature failures more gracefully.
+        signed_pdus = yield self._check_sigs_and_hash_and_fetch(
+            destination, pdus, outlier=True
+        )
 
-        for i, pdu in enumerate(auth_chain):
-            auth_chain[i] = yield self._check_sigs_and_hash(pdu)
+        signed_auth = yield self._check_sigs_and_hash_and_fetch(
+            destination, auth_chain, outlier=True
+        )
 
-            # FIXME: We should handle signature failures more gracefully.
+        signed_auth.sort(key=lambda e: e.depth)
 
-        defer.returnValue((pdus, auth_chain))
+        defer.returnValue((signed_pdus, signed_auth))
 
     @defer.inlineCallbacks
     @log_function
@@ -248,65 +244,88 @@ class FederationClient(object):
             for p in res["auth_chain"]
         ]
 
-        for i, pdu in enumerate(auth_chain):
-            auth_chain[i] = yield self._check_sigs_and_hash(pdu)
-
-            # FIXME: We should handle signature failures more gracefully.
+        signed_auth = yield self._check_sigs_and_hash_and_fetch(
+            destination, auth_chain, outlier=True
+        )
 
-        auth_chain.sort(key=lambda e: e.depth)
+        signed_auth.sort(key=lambda e: e.depth)
 
-        defer.returnValue(auth_chain)
+        defer.returnValue(signed_auth)
 
     @defer.inlineCallbacks
-    def make_join(self, destination, room_id, user_id):
-        ret = yield self.transport_layer.make_join(
-            destination, room_id, user_id
-        )
+    def make_join(self, destinations, room_id, user_id):
+        for destination in destinations:
+            try:
+                ret = yield self.transport_layer.make_join(
+                    destination, room_id, user_id
+                )
 
-        pdu_dict = ret["event"]
+                pdu_dict = ret["event"]
 
-        logger.debug("Got response to make_join: %s", pdu_dict)
+                logger.debug("Got response to make_join: %s", pdu_dict)
 
-        defer.returnValue(self.event_from_pdu_json(pdu_dict))
+                defer.returnValue(
+                    (destination, self.event_from_pdu_json(pdu_dict))
+                )
+                break
+            except CodeMessageException:
+                raise
+            except Exception as e:
+                logger.warn(
+                    "Failed to make_join via %s: %s",
+                    destination, e.message
+                )
 
-    @defer.inlineCallbacks
-    def send_join(self, destination, pdu):
-        time_now = self._clock.time_msec()
-        _, content = yield self.transport_layer.send_join(
-            destination=destination,
-            room_id=pdu.room_id,
-            event_id=pdu.event_id,
-            content=pdu.get_pdu_json(time_now),
-        )
+        raise RuntimeError("Failed to send to any server.")
 
-        logger.debug("Got content: %s", content)
+    @defer.inlineCallbacks
+    def send_join(self, destinations, pdu):
+        for destination in destinations:
+            try:
+                time_now = self._clock.time_msec()
+                _, content = yield self.transport_layer.send_join(
+                    destination=destination,
+                    room_id=pdu.room_id,
+                    event_id=pdu.event_id,
+                    content=pdu.get_pdu_json(time_now),
+                )
 
-        state = [
-            self.event_from_pdu_json(p, outlier=True)
-            for p in content.get("state", [])
-        ]
+                logger.debug("Got content: %s", content)
 
-        auth_chain = [
-            self.event_from_pdu_json(p, outlier=True)
-            for p in content.get("auth_chain", [])
-        ]
+                state = [
+                    self.event_from_pdu_json(p, outlier=True)
+                    for p in content.get("state", [])
+                ]
 
-        for i, pdu in enumerate(state):
-            state[i] = yield self._check_sigs_and_hash(pdu)
+                auth_chain = [
+                    self.event_from_pdu_json(p, outlier=True)
+                    for p in content.get("auth_chain", [])
+                ]
 
-            # FIXME: We should handle signature failures more gracefully.
+                signed_state = yield self._check_sigs_and_hash_and_fetch(
+                    destination, state, outlier=True
+                )
 
-        for i, pdu in enumerate(auth_chain):
-            auth_chain[i] = yield self._check_sigs_and_hash(pdu)
+                signed_auth = yield self._check_sigs_and_hash_and_fetch(
+                    destination, auth_chain, outlier=True
+                )
 
-            # FIXME: We should handle signature failures more gracefully.
+                auth_chain.sort(key=lambda e: e.depth)
 
-        auth_chain.sort(key=lambda e: e.depth)
+                defer.returnValue({
+                    "state": signed_state,
+                    "auth_chain": signed_auth,
+                    "origin": destination,
+                })
+            except CodeMessageException:
+                raise
+            except Exception as e:
+                logger.warn(
+                    "Failed to send_join via %s: %s",
+                    destination, e.message
+                )
 
-        defer.returnValue({
-            "state": state,
-            "auth_chain": auth_chain,
-        })
+        raise RuntimeError("Failed to send to any server.")
 
     @defer.inlineCallbacks
     def send_invite(self, destination, room_id, event_id, pdu):
@@ -353,12 +372,18 @@ class FederationClient(object):
         )
 
         auth_chain = [
-            (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
+            self.event_from_pdu_json(e)
             for e in content["auth_chain"]
         ]
 
+        signed_auth = yield self._check_sigs_and_hash_and_fetch(
+            destination, auth_chain, outlier=True
+        )
+
+        signed_auth.sort(key=lambda e: e.depth)
+
         ret = {
-            "auth_chain": auth_chain,
+            "auth_chain": signed_auth,
             "rejects": content.get("rejects", []),
             "missing": content.get("missing", []),
         }
@@ -373,37 +398,3 @@ class FederationClient(object):
         event.internal_metadata.outlier = outlier
 
         return event
-
-    @defer.inlineCallbacks
-    def _check_sigs_and_hash(self, pdu):
-        """Throws a SynapseError if the PDU does not have the correct
-        signatures.
-
-        Returns:
-            FrozenEvent: Either the given event or it redacted if it failed the
-            content hash check.
-        """
-        # Check signatures are correct.
-        redacted_event = prune_event(pdu)
-        redacted_pdu_json = redacted_event.get_pdu_json()
-
-        try:
-            yield self.keyring.verify_json_for_server(
-                pdu.origin, redacted_pdu_json
-            )
-        except SynapseError:
-            logger.warn(
-                "Signature check failed for %s redacted to %s",
-                encode_canonical_json(pdu.get_pdu_json()),
-                encode_canonical_json(redacted_pdu_json),
-            )
-            raise
-
-        if not check_event_content_hash(pdu):
-            logger.warn(
-                "Event content has been tampered, redacting %s, %s",
-                pdu.event_id, encode_canonical_json(pdu.get_dict())
-            )
-            defer.returnValue(redacted_event)
-
-        defer.returnValue(pdu)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 5fbd8b19de..4742ca9390 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -16,16 +16,12 @@
 
 from twisted.internet import defer
 
+from .federation_base import FederationBase
 from .units import Transaction, Edu
 
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.events import FrozenEvent
-from synapse.events.utils import prune_event
-
-from syutil.jsonutil import encode_canonical_json
-
-from synapse.crypto.event_signing import check_event_content_hash
 
 from synapse.api.errors import FederationError, SynapseError
 
@@ -35,7 +31,7 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class FederationServer(object):
+class FederationServer(FederationBase):
     def set_handler(self, handler):
         """Sets the handler that the replication layer will use to communicate
         receipt of new PDUs from other home servers. The required methods are
@@ -251,17 +247,20 @@ class FederationServer(object):
             Deferred: Results in `dict` with the same format as `content`
         """
         auth_chain = [
-            (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
+            self.event_from_pdu_json(e)
             for e in content["auth_chain"]
         ]
 
-        missing = [
-            (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
-            for e in content.get("missing", [])
-        ]
+        signed_auth = yield self._check_sigs_and_hash_and_fetch(
+            origin, auth_chain, outlier=True
+        )
 
         ret = yield self.handler.on_query_auth(
-            origin, event_id, auth_chain, content.get("rejects", []), missing
+            origin,
+            event_id,
+            signed_auth,
+            content.get("rejects", []),
+            content.get("missing", []),
         )
 
         time_now = self._clock.time_msec()
@@ -426,37 +425,3 @@ class FederationServer(object):
         event.internal_metadata.outlier = outlier
 
         return event
-
-    @defer.inlineCallbacks
-    def _check_sigs_and_hash(self, pdu):
-        """Throws a SynapseError if the PDU does not have the correct
-        signatures.
-
-        Returns:
-            FrozenEvent: Either the given event or it redacted if it failed the
-            content hash check.
-        """
-        # Check signatures are correct.
-        redacted_event = prune_event(pdu)
-        redacted_pdu_json = redacted_event.get_pdu_json()
-
-        try:
-            yield self.keyring.verify_json_for_server(
-                pdu.origin, redacted_pdu_json
-            )
-        except SynapseError:
-            logger.warn(
-                "Signature check failed for %s redacted to %s",
-                encode_canonical_json(pdu.get_pdu_json()),
-                encode_canonical_json(redacted_pdu_json),
-            )
-            raise
-
-        if not check_event_content_hash(pdu):
-            logger.warn(
-                "Event content has been tampered, redacting %s, %s",
-                pdu.event_id, encode_canonical_json(pdu.get_dict())
-            )
-            defer.returnValue(redacted_event)
-
-        defer.returnValue(pdu)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 9d4f2c09a2..f38aeba7cc 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 from .persistence import TransactionActions
 from .units import Transaction
 
+from synapse.api.errors import HttpResponseException
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 
@@ -238,9 +239,14 @@ class TransactionQueue(object):
                             del p["age_ts"]
                 return data
 
-            code, response = yield self.transport_layer.send_transaction(
-                transaction, json_data_cb
-            )
+            try:
+                response = yield self.transport_layer.send_transaction(
+                    transaction, json_data_cb
+                )
+                code = 200
+            except HttpResponseException as e:
+                code = e.code
+                response = e.response
 
             logger.info("TX [%s] got %d response", destination, code)
 
@@ -274,8 +280,7 @@ class TransactionQueue(object):
                     pass
 
             logger.debug("TX [%s] Yielded to callbacks", destination)
-
-        except Exception as e:
+        except RuntimeError as e:
             # We capture this here as there as nothing actually listens
             # for this finishing functions deferred.
             logger.warn(
@@ -283,6 +288,14 @@ class TransactionQueue(object):
                 destination,
                 e,
             )
+        except Exception as e:
+            # We capture this here as there as nothing actually listens
+            # for this finishing functions deferred.
+            logger.exception(
+                "TX [%s] Problem in _attempt_transaction: %s",
+                destination,
+                e,
+            )
 
             self.set_retrying(destination, retry_interval)
 
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 4cb1dea2de..8b137e7128 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -19,7 +19,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
 from synapse.util.logutils import log_function
 
 import logging
-import json
 
 
 logger = logging.getLogger(__name__)
@@ -129,7 +128,7 @@ class TransportLayerClient(object):
         # generated by the json_data_callback.
         json_data = transaction.get_dict()
 
-        code, response = yield self.client.put_json(
+        response = yield self.client.put_json(
             transaction.destination,
             path=PREFIX + "/send/%s/" % transaction.transaction_id,
             data=json_data,
@@ -137,95 +136,86 @@ class TransportLayerClient(object):
         )
 
         logger.debug(
-            "send_data dest=%s, txid=%s, got response: %d",
-            transaction.destination, transaction.transaction_id, code
+            "send_data dest=%s, txid=%s, got response: 200",
+            transaction.destination, transaction.transaction_id,
         )
 
-        defer.returnValue((code, response))
+        defer.returnValue(response)
 
     @defer.inlineCallbacks
     @log_function
     def make_query(self, destination, query_type, args, retry_on_dns_fail):
         path = PREFIX + "/query/%s" % query_type
 
-        response = yield self.client.get_json(
+        content = yield self.client.get_json(
             destination=destination,
             path=path,
             args=args,
             retry_on_dns_fail=retry_on_dns_fail,
         )
 
-        defer.returnValue(response)
+        defer.returnValue(content)
 
     @defer.inlineCallbacks
     @log_function
     def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True):
         path = PREFIX + "/make_join/%s/%s" % (room_id, user_id)
 
-        response = yield self.client.get_json(
+        content = yield self.client.get_json(
             destination=destination,
             path=path,
             retry_on_dns_fail=retry_on_dns_fail,
         )
 
-        defer.returnValue(response)
+        defer.returnValue(content)
 
     @defer.inlineCallbacks
     @log_function
     def send_join(self, destination, room_id, event_id, content):
         path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
 
-        code, content = yield self.client.put_json(
+        response = yield self.client.put_json(
             destination=destination,
             path=path,
             data=content,
         )
 
-        if not 200 <= code < 300:
-            raise RuntimeError("Got %d from send_join", code)
-
-        defer.returnValue(json.loads(content))
+        defer.returnValue(response)
 
     @defer.inlineCallbacks
     @log_function
     def send_invite(self, destination, room_id, event_id, content):
         path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
 
-        code, content = yield self.client.put_json(
+        response = yield self.client.put_json(
             destination=destination,
             path=path,
             data=content,
         )
 
-        if not 200 <= code < 300:
-            raise RuntimeError("Got %d from send_invite", code)
-
-        defer.returnValue(json.loads(content))
+        defer.returnValue(response)
 
     @defer.inlineCallbacks
     @log_function
     def get_event_auth(self, destination, room_id, event_id):
         path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
 
-        response = yield self.client.get_json(
+        content = yield self.client.get_json(
             destination=destination,
             path=path,
         )
 
-        defer.returnValue(response)
+        defer.returnValue(content)
 
     @defer.inlineCallbacks
     @log_function
     def send_query_auth(self, destination, room_id, event_id, content):
         path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
 
-        code, content = yield self.client.post_json(
+        content = yield self.client.post_json(
             destination=destination,
             path=path,
             data=content,
         )
 
-        if not 200 <= code < 300:
-            raise RuntimeError("Got %d from send_invite", code)
-
-        defer.returnValue(json.loads(content))
+        defer.returnValue(content)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 000bf5793c..24ea3573d3 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -120,7 +120,16 @@ class DirectoryHandler(BaseHandler):
             )
 
         extra_servers = yield self.store.get_joined_hosts_for_room(room_id)
-        servers = list(set(extra_servers) | set(servers))
+        servers = set(extra_servers) | set(servers)
+
+        # If this server is in the list of servers, return it first.
+        if self.server_name in servers:
+            servers = (
+                [self.server_name]
+                + [s for s in servers if s != self.server_name]
+            )
+        else:
+            servers = list(servers)
 
         defer.returnValue({
             "room_id": room_id,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8bf5a4cc11..aba266c2bc 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -30,6 +30,7 @@ from synapse.types import UserID
 
 from twisted.internet import defer
 
+import itertools
 import logging
 
 
@@ -123,8 +124,21 @@ class FederationHandler(BaseHandler):
             logger.debug("Got event for room we're not in.")
             current_state = state
 
+        event_ids = set()
+        if state:
+            event_ids |= {e.event_id for e in state}
+        if auth_chain:
+            event_ids |= {e.event_id for e in auth_chain}
+
+        seen_ids = (yield self.store.have_events(event_ids)).keys()
+
         if state and auth_chain is not None:
-            for e in state:
+            # If we have any state or auth_chain given to us by the replication
+            # layer, then we should handle them (if we haven't before.)
+            for e in itertools.chain(auth_chain, state):
+                if e.event_id in seen_ids:
+                    continue
+
                 e.internal_metadata.outlier = True
                 try:
                     auth_ids = [e_id for e_id, _ in e.auth_events]
@@ -132,7 +146,10 @@ class FederationHandler(BaseHandler):
                         (e.type, e.state_key): e for e in auth_chain
                         if e.event_id in auth_ids
                     }
-                    yield self._handle_new_event(origin, e, auth_events=auth)
+                    yield self._handle_new_event(
+                        origin, e, auth_events=auth
+                    )
+                    seen_ids.add(e.event_id)
                 except:
                     logger.exception(
                         "Failed to handle state event %s",
@@ -256,7 +273,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def do_invite_join(self, target_host, room_id, joinee, content, snapshot):
+    def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot):
         """ Attempts to join the `joinee` to the room `room_id` via the
         server `target_host`.
 
@@ -270,8 +287,8 @@ class FederationHandler(BaseHandler):
         """
         logger.debug("Joining %s to %s", joinee, room_id)
 
-        pdu = yield self.replication_layer.make_join(
-            target_host,
+        origin, pdu = yield self.replication_layer.make_join(
+            target_hosts,
             room_id,
             joinee
         )
@@ -313,11 +330,17 @@ class FederationHandler(BaseHandler):
 
             new_event = builder.build()
 
+            # Try the host we successfully got a response to /make_join/
+            # request first.
+            target_hosts.remove(origin)
+            target_hosts.insert(0, origin)
+
             ret = yield self.replication_layer.send_join(
-                target_host,
+                target_hosts,
                 new_event
             )
 
+            origin = ret["origin"]
             state = ret["state"]
             auth_chain = ret["auth_chain"]
             auth_chain.sort(key=lambda e: e.depth)
@@ -354,7 +377,7 @@ class FederationHandler(BaseHandler):
                         if e.event_id in auth_ids
                     }
                     yield self._handle_new_event(
-                        target_host, e, auth_events=auth
+                        origin, e, auth_events=auth
                     )
                 except:
                     logger.exception(
@@ -374,7 +397,7 @@ class FederationHandler(BaseHandler):
                         if e.event_id in auth_ids
                     }
                     yield self._handle_new_event(
-                        target_host, e, auth_events=auth
+                        origin, e, auth_events=auth
                     )
                 except:
                     logger.exception(
@@ -389,7 +412,7 @@ class FederationHandler(BaseHandler):
             }
 
             yield self._handle_new_event(
-                target_host,
+                origin,
                 new_event,
                 state=state,
                 current_state=state,
@@ -498,6 +521,8 @@ class FederationHandler(BaseHandler):
                     "Failed to get destination from event %s", s.event_id
                 )
 
+        destinations.remove(origin)
+
         logger.debug(
             "on_send_join_request: Sending event: %s, signatures: %s",
             event.event_id,
@@ -618,6 +643,7 @@ class FederationHandler(BaseHandler):
         event = yield self.store.get_event(
             event_id,
             allow_none=True,
+            allow_rejected=True,
         )
 
         if event:
@@ -701,6 +727,8 @@ class FederationHandler(BaseHandler):
 
             context.rejected = RejectedReason.AUTH_ERROR
 
+            # FIXME: Don't store as rejected with AUTH_ERROR if we haven't
+            # seen all the auth events.
             yield self.store.persist_event(
                 event,
                 context=context,
@@ -750,7 +778,7 @@ class FederationHandler(BaseHandler):
                 )
             )
 
-        logger.debug("on_query_auth reutrning: %s", ret)
+        logger.debug("on_query_auth returning: %s", ret)
 
         defer.returnValue(ret)
 
@@ -770,41 +798,45 @@ class FederationHandler(BaseHandler):
         if missing_auth:
             logger.debug("Missing auth: %s", missing_auth)
             # If we don't have all the auth events, we need to get them.
-            remote_auth_chain = yield self.replication_layer.get_event_auth(
-                origin, event.room_id, event.event_id
-            )
+            try:
+                remote_auth_chain = yield self.replication_layer.get_event_auth(
+                    origin, event.room_id, event.event_id
+                )
 
-            seen_remotes = yield self.store.have_events(
-                [e.event_id for e in remote_auth_chain]
-            )
+                seen_remotes = yield self.store.have_events(
+                    [e.event_id for e in remote_auth_chain]
+                )
 
-            for e in remote_auth_chain:
-                if e.event_id in seen_remotes.keys():
-                    continue
+                for e in remote_auth_chain:
+                    if e.event_id in seen_remotes.keys():
+                        continue
 
-                if e.event_id == event.event_id:
-                    continue
+                    if e.event_id == event.event_id:
+                        continue
 
-                try:
-                    auth_ids = [e_id for e_id, _ in e.auth_events]
-                    auth = {
-                        (e.type, e.state_key): e for e in remote_auth_chain
-                        if e.event_id in auth_ids
-                    }
-                    e.internal_metadata.outlier = True
+                    try:
+                        auth_ids = [e_id for e_id, _ in e.auth_events]
+                        auth = {
+                            (e.type, e.state_key): e for e in remote_auth_chain
+                            if e.event_id in auth_ids
+                        }
+                        e.internal_metadata.outlier = True
 
-                    logger.debug(
-                        "do_auth %s missing_auth: %s",
-                        event.event_id, e.event_id
-                    )
-                    yield self._handle_new_event(
-                        origin, e, auth_events=auth
-                    )
+                        logger.debug(
+                            "do_auth %s missing_auth: %s",
+                            event.event_id, e.event_id
+                        )
+                        yield self._handle_new_event(
+                            origin, e, auth_events=auth
+                        )
 
-                    if e.event_id in event_auth_events:
-                        auth_events[(e.type, e.state_key)] = e
-                except AuthError:
-                    pass
+                        if e.event_id in event_auth_events:
+                            auth_events[(e.type, e.state_key)] = e
+                    except AuthError:
+                        pass
+            except:
+                # FIXME:
+                logger.exception("Failed to get auth chain")
 
         # FIXME: Assumes we have and stored all the state for all the
         # prev_events
@@ -816,50 +848,57 @@ class FederationHandler(BaseHandler):
             logger.debug("Different auth: %s", different_auth)
 
             # 1. Get what we think is the auth chain.
-            auth_ids = self.auth.compute_auth_events(event, context)
-            local_auth_chain = yield self.store.get_auth_chain(auth_ids)
-
-            # 2. Get remote difference.
-            result = yield self.replication_layer.query_auth(
-                origin,
-                event.room_id,
-                event.event_id,
-                local_auth_chain,
-            )
-
-            seen_remotes = yield self.store.have_events(
-                [e.event_id for e in result["auth_chain"]]
+            auth_ids = self.auth.compute_auth_events(
+                event, context.current_state
             )
+            local_auth_chain = yield self.store.get_auth_chain(auth_ids)
 
-            # 3. Process any remote auth chain events we haven't seen.
-            for ev in result["auth_chain"]:
-                if ev.event_id in seen_remotes.keys():
-                    continue
+            try:
+                # 2. Get remote difference.
+                result = yield self.replication_layer.query_auth(
+                    origin,
+                    event.room_id,
+                    event.event_id,
+                    local_auth_chain,
+                )
 
-                if ev.event_id == event.event_id:
-                    continue
+                seen_remotes = yield self.store.have_events(
+                    [e.event_id for e in result["auth_chain"]]
+                )
 
-                try:
-                    auth_ids = [e_id for e_id, _ in ev.auth_events]
-                    auth = {
-                        (e.type, e.state_key): e for e in result["auth_chain"]
-                        if e.event_id in auth_ids
-                    }
-                    ev.internal_metadata.outlier = True
+                # 3. Process any remote auth chain events we haven't seen.
+                for ev in result["auth_chain"]:
+                    if ev.event_id in seen_remotes.keys():
+                        continue
+
+                    if ev.event_id == event.event_id:
+                        continue
+
+                    try:
+                        auth_ids = [e_id for e_id, _ in ev.auth_events]
+                        auth = {
+                            (e.type, e.state_key): e for e in result["auth_chain"]
+                            if e.event_id in auth_ids
+                        }
+                        ev.internal_metadata.outlier = True
+
+                        logger.debug(
+                            "do_auth %s different_auth: %s",
+                            event.event_id, e.event_id
+                        )
 
-                    logger.debug(
-                        "do_auth %s different_auth: %s",
-                        event.event_id, e.event_id
-                    )
+                        yield self._handle_new_event(
+                            origin, ev, auth_events=auth
+                        )
 
-                    yield self._handle_new_event(
-                        origin, ev, auth_events=auth
-                    )
+                        if ev.event_id in event_auth_events:
+                            auth_events[(ev.type, ev.state_key)] = ev
+                    except AuthError:
+                        pass
 
-                    if ev.event_id in event_auth_events:
-                        auth_events[(ev.type, ev.state_key)] = ev
-                except AuthError:
-                    pass
+            except:
+                # FIXME:
+                logger.exception("Failed to query auth chain")
 
             # 4. Look at rejects and their proofs.
             # TODO.
@@ -983,7 +1022,7 @@ class FederationHandler(BaseHandler):
             if reason is None:
                 # FIXME: ERRR?!
                 logger.warn("Could not find reason for %s", e.event_id)
-                raise RuntimeError("")
+                raise RuntimeError("Could not find reason for %s" % e.event_id)
 
             reason_map[e.event_id] = reason
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6fbd2af4ab..3f51f38f18 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,7 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import RoomError
+from synapse.api.errors import RoomError, SynapseError
 from synapse.streams.config import PaginationConfig
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
@@ -372,10 +372,17 @@ class MessageHandler(BaseHandler):
                     as_event=True,
                 )
                 presence.append(member_presence)
-            except Exception:
-                logger.exception(
-                    "Failed to get member presence of %r", m.user_id
-                )
+            except SynapseError as e:
+                if e.code == 404:
+                    # FIXME: We are doing this as a warn since this gets hit a
+                    # lot and spams the logs. Why is this happening?
+                    logger.warn(
+                        "Failed to get member presence of %r", m.user_id
+                    )
+                else:
+                    logger.exception(
+                        "Failed to get member presence of %r", m.user_id
+                    )
 
         time_now = self.clock.time_msec()
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 23821d321f..0369b907a5 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -389,8 +389,6 @@ class RoomMemberHandler(BaseHandler):
         if not hosts:
             raise SynapseError(404, "No known servers")
 
-        host = hosts[0]
-
         # If event doesn't include a display name, add one.
         yield self.distributor.fire(
             "collect_presencelike_data", joinee, content
@@ -407,12 +405,12 @@ class RoomMemberHandler(BaseHandler):
         })
         event, context = yield self._create_new_client_event(builder)
 
-        yield self._do_join(event, context, room_host=host, do_auth=True)
+        yield self._do_join(event, context, room_hosts=hosts, do_auth=True)
 
         defer.returnValue({"room_id": room_id})
 
     @defer.inlineCallbacks
-    def _do_join(self, event, context, room_host=None, do_auth=True):
+    def _do_join(self, event, context, room_hosts=None, do_auth=True):
         joinee = UserID.from_string(event.state_key)
         # room_id = RoomID.from_string(event.room_id, self.hs)
         room_id = event.room_id
@@ -441,7 +439,7 @@ class RoomMemberHandler(BaseHandler):
 
         if is_host_in_room:
             should_do_dance = False
-        elif room_host:  # TODO: Shouldn't this be remote_room_host?
+        elif room_hosts:  # TODO: Shouldn't this be remote_room_host?
             should_do_dance = True
         else:
             # TODO(markjh): get prev_state from snapshot
@@ -453,7 +451,7 @@ class RoomMemberHandler(BaseHandler):
                 inviter = UserID.from_string(prev_state.user_id)
 
                 should_do_dance = not self.hs.is_mine(inviter)
-                room_host = inviter.domain
+                room_hosts = [inviter.domain]
             else:
                 # return the same error as join_room_alias does
                 raise SynapseError(404, "No known servers")
@@ -461,7 +459,7 @@ class RoomMemberHandler(BaseHandler):
         if should_do_dance:
             handler = self.hs.get_handlers().federation_handler
             yield handler.do_invite_join(
-                room_host,
+                room_hosts,
                 room_id,
                 event.user_id,
                 event.get_dict()["content"],  # FIXME To get a non-frozen dict
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index c7bf1b47b8..8559d06b7f 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -27,7 +27,9 @@ from synapse.util.logcontext import PreserveLoggingContext
 
 from syutil.jsonutil import encode_canonical_json
 
-from synapse.api.errors import CodeMessageException, SynapseError, Codes
+from synapse.api.errors import (
+    SynapseError, Codes, HttpResponseException,
+)
 
 from syutil.crypto.jsonsign import sign_json
 
@@ -163,13 +165,12 @@ class MatrixFederationHttpClient(object):
         )
 
         if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
             pass
         else:
             # :'(
             # Update transactions table?
-            raise CodeMessageException(
-                response.code, response.phrase
+            raise HttpResponseException(
+                response.code, response.phrase, response
             )
 
         defer.returnValue(response)
@@ -238,11 +239,20 @@ class MatrixFederationHttpClient(object):
             headers_dict={"Content-Type": ["application/json"]},
         )
 
+        if 200 <= response.code < 300:
+            # We need to update the transactions table to say it was sent?
+            c_type = response.headers.getRawHeaders("Content-Type")
+
+            if "application/json" not in c_type:
+                raise RuntimeError(
+                    "Content-Type not application/json"
+                )
+
         logger.debug("Getting resp body")
         body = yield readBody(response)
         logger.debug("Got resp body")
 
-        defer.returnValue((response.code, body))
+        defer.returnValue(json.loads(body))
 
     @defer.inlineCallbacks
     def post_json(self, destination, path, data={}):
@@ -275,11 +285,20 @@ class MatrixFederationHttpClient(object):
             headers_dict={"Content-Type": ["application/json"]},
         )
 
+        if 200 <= response.code < 300:
+            # We need to update the transactions table to say it was sent?
+            c_type = response.headers.getRawHeaders("Content-Type")
+
+            if "application/json" not in c_type:
+                raise RuntimeError(
+                    "Content-Type not application/json"
+                )
+
         logger.debug("Getting resp body")
         body = yield readBody(response)
         logger.debug("Got resp body")
 
-        defer.returnValue((response.code, body))
+        defer.returnValue(json.loads(body))
 
     @defer.inlineCallbacks
     def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
@@ -321,7 +340,18 @@ class MatrixFederationHttpClient(object):
             retry_on_dns_fail=retry_on_dns_fail
         )
 
+        if 200 <= response.code < 300:
+            # We need to update the transactions table to say it was sent?
+            c_type = response.headers.getRawHeaders("Content-Type")
+
+            if "application/json" not in c_type:
+                raise RuntimeError(
+                    "Content-Type not application/json"
+                )
+
+        logger.debug("Getting resp body")
         body = yield readBody(response)
+        logger.debug("Got resp body")
 
         defer.returnValue(json.loads(body))
 
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 28e5dae81d..8c6f0a6571 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -37,14 +37,14 @@ class Pusher(object):
 
     INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
 
-    def __init__(self, _hs, instance_handle, user_name, app_id,
+    def __init__(self, _hs, profile_tag, user_name, app_id,
                  app_display_name, device_display_name, pushkey, pushkey_ts,
                  data, last_token, last_success, failing_since):
         self.hs = _hs
         self.evStreamHandler = self.hs.get_handlers().event_stream_handler
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
-        self.instance_handle = instance_handle
+        self.profile_tag = profile_tag
         self.user_name = user_name
         self.app_id = app_id
         self.app_display_name = app_display_name
@@ -147,9 +147,9 @@ class Pusher(object):
                 return False
             return fnmatch.fnmatch(val.upper(), pat.upper())
         elif condition['kind'] == 'device':
-            if 'instance_handle' not in condition:
+            if 'profile_tag' not in condition:
                 return True
-            return condition['instance_handle'] == self.instance_handle
+            return condition['profile_tag'] == self.profile_tag
         elif condition['kind'] == 'contains_display_name':
             # This is special because display names can be different
             # between rooms and so you can't really hard code it in a rule.
@@ -400,8 +400,8 @@ def _tweaks_for_actions(actions):
     for a in actions:
         if not isinstance(a, dict):
             continue
-        if 'set_sound' in a:
-            tweaks['sound'] = a['set_sound']
+        if 'set_tweak' in a and 'value' in a:
+            tweaks[a['set_tweak']] = a['value']
     return tweaks
 
 
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 382de118e0..376d1d4d33 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -38,7 +38,8 @@ def make_base_rules(user_name):
             'actions': [
                 'notify',
                 {
-                    'set_sound': 'default'
+                    'set_tweak': 'sound',
+                    'value': 'default'
                 }
             ]
         }
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 7c6953c989..5788db4eba 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -24,12 +24,12 @@ logger = logging.getLogger(__name__)
 
 
 class HttpPusher(Pusher):
-    def __init__(self, _hs, instance_handle, user_name, app_id,
+    def __init__(self, _hs, profile_tag, user_name, app_id,
                  app_display_name, device_display_name, pushkey, pushkey_ts,
                  data, last_token, last_success, failing_since):
         super(HttpPusher, self).__init__(
             _hs,
-            instance_handle,
+            profile_tag,
             user_name,
             app_id,
             app_display_name,
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 4892c21e7b..5a525befd7 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -55,7 +55,7 @@ class PusherPool:
         self._start_pushers(pushers)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_name, instance_handle, kind, app_id,
+    def add_pusher(self, user_name, profile_tag, kind, app_id,
                    app_display_name, device_display_name, pushkey, lang, data):
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
@@ -64,7 +64,7 @@ class PusherPool:
         self._create_pusher({
             "user_name": user_name,
             "kind": kind,
-            "instance_handle": instance_handle,
+            "profile_tag": profile_tag,
             "app_id": app_id,
             "app_display_name": app_display_name,
             "device_display_name": device_display_name,
@@ -77,18 +77,18 @@ class PusherPool:
             "failing_since": None
         })
         yield self._add_pusher_to_store(
-            user_name, instance_handle, kind, app_id,
+            user_name, profile_tag, kind, app_id,
             app_display_name, device_display_name,
             pushkey, lang, data
         )
 
     @defer.inlineCallbacks
-    def _add_pusher_to_store(self, user_name, instance_handle, kind, app_id,
+    def _add_pusher_to_store(self, user_name, profile_tag, kind, app_id,
                              app_display_name, device_display_name,
                              pushkey, lang, data):
         yield self.store.add_pusher(
             user_name=user_name,
-            instance_handle=instance_handle,
+            profile_tag=profile_tag,
             kind=kind,
             app_id=app_id,
             app_display_name=app_display_name,
@@ -104,7 +104,7 @@ class PusherPool:
         if pusherdict['kind'] == 'http':
             return HttpPusher(
                 self.hs,
-                instance_handle=pusherdict['instance_handle'],
+                profile_tag=pusherdict['profile_tag'],
                 user_name=pusherdict['user_name'],
                 app_id=pusherdict['app_id'],
                 app_display_name=pusherdict['app_display_name'],
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index faa7919fbb..5582f33c8e 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -73,7 +73,7 @@ class PushRuleRestServlet(ClientV1RestServlet):
             'rule_id': rule_id
         }
         if device:
-            spec['device'] = device
+            spec['profile_tag'] = device
         return spec
 
     def rule_tuple_from_request_object(self, rule_template, rule_id, req_obj, device=None):
@@ -112,7 +112,7 @@ class PushRuleRestServlet(ClientV1RestServlet):
         if device:
             conditions.append({
                 'kind': 'device',
-                'instance_handle': device
+                'profile_tag': device
             })
 
         if 'actions' not in req_obj:
@@ -188,15 +188,15 @@ class PushRuleRestServlet(ClientV1RestServlet):
 
         user, _ = yield self.auth.get_user_by_req(request)
 
-        if 'device' in spec:
+        if 'profile_tag' in spec:
             rules = yield self.hs.get_datastore().get_push_rules_for_user_name(
                 user.to_string()
             )
 
             for r in rules:
                 conditions = json.loads(r['conditions'])
-                ih = _instance_handle_from_conditions(conditions)
-                if ih == spec['device'] and r['priority_class'] == priority_class:
+                pt = _profile_tag_from_conditions(conditions)
+                if pt == spec['profile_tag'] and r['priority_class'] == priority_class:
                     yield self.hs.get_datastore().delete_push_rule(
                         user.to_string(), spec['rule_id']
                     )
@@ -239,19 +239,19 @@ class PushRuleRestServlet(ClientV1RestServlet):
 
             if r['priority_class'] > PushRuleRestServlet.PRIORITY_CLASS_MAP['override']:
                 # per-device rule
-                instance_handle = _instance_handle_from_conditions(r["conditions"])
+                profile_tag = _profile_tag_from_conditions(r["conditions"])
                 r = _strip_device_condition(r)
-                if not instance_handle:
+                if not profile_tag:
                     continue
-                if instance_handle not in rules['device']:
-                    rules['device'][instance_handle] = {}
-                    rules['device'][instance_handle] = (
+                if profile_tag not in rules['device']:
+                    rules['device'][profile_tag] = {}
+                    rules['device'][profile_tag] = (
                         _add_empty_priority_class_arrays(
-                            rules['device'][instance_handle]
+                            rules['device'][profile_tag]
                         )
                     )
 
-                rulearray = rules['device'][instance_handle][template_name]
+                rulearray = rules['device'][profile_tag][template_name]
             else:
                 rulearray = rules['global'][template_name]
 
@@ -282,13 +282,13 @@ class PushRuleRestServlet(ClientV1RestServlet):
             if path[0] == '':
                 defer.returnValue((200, rules['device']))
 
-            instance_handle = path[0]
+            profile_tag = path[0]
             path = path[1:]
-            if instance_handle not in rules['device']:
+            if profile_tag not in rules['device']:
                 ret = {}
                 ret = _add_empty_priority_class_arrays(ret)
                 defer.returnValue((200, ret))
-            ruleset = rules['device'][instance_handle]
+            ruleset = rules['device'][profile_tag]
             result = _filter_ruleset_with_path(ruleset, path)
             defer.returnValue((200, result))
         else:
@@ -304,14 +304,14 @@ def _add_empty_priority_class_arrays(d):
     return d
 
 
-def _instance_handle_from_conditions(conditions):
+def _profile_tag_from_conditions(conditions):
     """
-    Given a list of conditions, return the instance handle of the
+    Given a list of conditions, return the profile tag of the
     device rule if there is one
     """
     for c in conditions:
         if c['kind'] == 'device':
-            return c['instance_handle']
+            return c['profile_tag']
     return None
 
 
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index 353a4a6589..e10d2576d2 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -41,7 +41,7 @@ class PusherRestServlet(ClientV1RestServlet):
             )
             defer.returnValue((200, {}))
 
-        reqd = ['instance_handle', 'kind', 'app_id', 'app_display_name',
+        reqd = ['profile_tag', 'kind', 'app_id', 'app_display_name',
                 'device_display_name', 'pushkey', 'lang', 'data']
         missing = []
         for i in reqd:
@@ -54,7 +54,7 @@ class PusherRestServlet(ClientV1RestServlet):
         try:
             yield pusher_pool.add_pusher(
                 user_name=user.to_string(),
-                instance_handle=content['instance_handle'],
+                profile_tag=content['profile_tag'],
                 kind=content['kind'],
                 app_id=content['app_id'],
                 app_display_name=content['app_display_name'],
diff --git a/synapse/state.py b/synapse/state.py
index 8a056ee955..695a5e7ac4 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -37,7 +37,10 @@ def _get_state_key_from_event(event):
 KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
 
 
-AuthEventTypes = (EventTypes.Create, EventTypes.Member, EventTypes.PowerLevels,)
+AuthEventTypes = (
+    EventTypes.Create, EventTypes.Member, EventTypes.PowerLevels,
+    EventTypes.JoinRules,
+)
 
 
 class StateHandler(object):
@@ -100,7 +103,9 @@ class StateHandler(object):
             context.state_group = None
 
             if hasattr(event, "auth_events") and event.auth_events:
-                auth_ids = zip(*event.auth_events)[0]
+                auth_ids = self.hs.get_auth().compute_auth_events(
+                    event, context.current_state
+                )
                 context.auth_events = {
                     k: v
                     for k, v in context.current_state.items()
@@ -146,7 +151,9 @@ class StateHandler(object):
                 event.unsigned["replaces_state"] = replaces.event_id
 
         if hasattr(event, "auth_events") and event.auth_events:
-            auth_ids = zip(*event.auth_events)[0]
+            auth_ids = self.hs.get_auth().compute_auth_events(
+                event, context.current_state
+            )
             context.auth_events = {
                 k: v
                 for k, v in context.current_state.items()
@@ -259,6 +266,15 @@ class StateHandler(object):
         auth_events.update(resolved_state)
 
         for key, events in conflicted_state.items():
+            if key[0] == EventTypes.JoinRules:
+                resolved_state[key] = self._resolve_auth_events(
+                    events,
+                    auth_events
+                )
+
+        auth_events.update(resolved_state)
+
+        for key, events in conflicted_state.items():
             if key[0] == EventTypes.Member:
                 resolved_state[key] = self._resolve_auth_events(
                     events,
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 6ff0093136..2225be96be 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -131,21 +131,144 @@ class DataStore(RoomMemberStore, RoomStore,
             pass
 
     @defer.inlineCallbacks
-    def get_event(self, event_id, allow_none=False):
-        events = yield self._get_events([event_id])
+    def get_event(self, event_id, check_redacted=True,
+                  get_prev_content=False, allow_rejected=False,
+                  allow_none=False):
+        """Get an event from the database by event_id.
+
+        Args:
+            event_id (str): The event_id of the event to fetch
+            check_redacted (bool): If True, check if event has been redacted
+                and redact it.
+            get_prev_content (bool): If True and event is a state event,
+                include the previous states content in the unsigned field.
+            allow_rejected (bool): If True return rejected events.
+            allow_none (bool): If True, return None if no event found, if
+                False throw an exception.
 
-        if not events:
-            if allow_none:
-                defer.returnValue(None)
-            else:
-                raise RuntimeError("Could not find event %s" % (event_id,))
+        Returns:
+            Deferred : A FrozenEvent.
+        """
+        event = yield self.runInteraction(
+            "get_event", self._get_event_txn,
+            event_id,
+            check_redacted=check_redacted,
+            get_prev_content=get_prev_content,
+            allow_rejected=allow_rejected,
+        )
 
-        defer.returnValue(events[0])
+        if not event and not allow_none:
+            raise RuntimeError("Could not find event %s" % (event_id,))
+
+        defer.returnValue(event)
 
     @log_function
     def _persist_event_txn(self, txn, event, context, backfilled,
                            stream_ordering=None, is_new_state=True,
                            current_state=None):
+
+        # We purposefully do this first since if we include a `current_state`
+        # key, we *want* to update the `current_state_events` table
+        if current_state:
+            txn.execute(
+                "DELETE FROM current_state_events WHERE room_id = ?",
+                (event.room_id,)
+            )
+
+            for s in current_state:
+                self._simple_insert_txn(
+                    txn,
+                    "current_state_events",
+                    {
+                        "event_id": s.event_id,
+                        "room_id": s.room_id,
+                        "type": s.type,
+                        "state_key": s.state_key,
+                    },
+                    or_replace=True,
+                )
+
+        if event.is_state() and is_new_state:
+            if not backfilled and not context.rejected:
+                self._simple_insert_txn(
+                    txn,
+                    table="state_forward_extremities",
+                    values={
+                        "event_id": event.event_id,
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    or_replace=True,
+                )
+
+                for prev_state_id, _ in event.prev_state:
+                    self._simple_delete_txn(
+                        txn,
+                        table="state_forward_extremities",
+                        keyvalues={
+                            "event_id": prev_state_id,
+                        }
+                    )
+
+        outlier = event.internal_metadata.is_outlier()
+
+        if not outlier:
+            self._store_state_groups_txn(txn, event, context)
+
+            self._update_min_depth_for_room_txn(
+                txn,
+                event.room_id,
+                event.depth
+            )
+
+        self._handle_prev_events(
+            txn,
+            outlier=outlier,
+            event_id=event.event_id,
+            prev_events=event.prev_events,
+            room_id=event.room_id,
+        )
+
+        have_persisted = self._simple_select_one_onecol_txn(
+            txn,
+            table="event_json",
+            keyvalues={"event_id": event.event_id},
+            retcol="event_id",
+            allow_none=True,
+        )
+
+        metadata_json = encode_canonical_json(
+            event.internal_metadata.get_dict()
+        )
+
+        # If we have already persisted this event, we don't need to do any
+        # more processing.
+        # The processing above must be done on every call to persist event,
+        # since they might not have happened on previous calls. For example,
+        # if we are persisting an event that we had persisted as an outlier,
+        # but is no longer one.
+        if have_persisted:
+            if not outlier:
+                sql = (
+                    "UPDATE event_json SET internal_metadata = ?"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (metadata_json.decode("UTF-8"), event.event_id,)
+                )
+
+                sql = (
+                    "UPDATE events SET outlier = 0"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (event.event_id,)
+                )
+            return
+
         if event.type == EventTypes.Member:
             self._store_room_member_txn(txn, event)
         elif event.type == EventTypes.Feedback:
@@ -157,8 +280,6 @@ class DataStore(RoomMemberStore, RoomStore,
         elif event.type == EventTypes.Redaction:
             self._store_redaction(txn, event)
 
-        outlier = event.internal_metadata.is_outlier()
-
         event_dict = {
             k: v
             for k, v in event.get_dict().items()
@@ -168,10 +289,6 @@ class DataStore(RoomMemberStore, RoomStore,
             ]
         }
 
-        metadata_json = encode_canonical_json(
-            event.internal_metadata.get_dict()
-        )
-
         self._simple_insert_txn(
             txn,
             table="event_json",
@@ -227,41 +344,10 @@ class DataStore(RoomMemberStore, RoomStore,
             )
             raise _RollbackButIsFineException("_persist_event")
 
-        self._handle_prev_events(
-            txn,
-            outlier=outlier,
-            event_id=event.event_id,
-            prev_events=event.prev_events,
-            room_id=event.room_id,
-        )
-
-        if not outlier:
-            self._store_state_groups_txn(txn, event, context)
-
         if context.rejected:
             self._store_rejections_txn(txn, event.event_id, context.rejected)
 
-        if current_state:
-            txn.execute(
-                "DELETE FROM current_state_events WHERE room_id = ?",
-                (event.room_id,)
-            )
-
-            for s in current_state:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": s.event_id,
-                        "room_id": s.room_id,
-                        "type": s.type,
-                        "state_key": s.state_key,
-                    },
-                    or_replace=True,
-                )
-
-        is_state = hasattr(event, "state_key") and event.state_key is not None
-        if is_state:
+        if event.is_state():
             vals = {
                 "event_id": event.event_id,
                 "room_id": event.room_id,
@@ -269,6 +355,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 "state_key": event.state_key,
             }
 
+            # TODO: How does this work with backfilling?
             if hasattr(event, "replaces_state"):
                 vals["prev_state"] = event.replaces_state
 
@@ -305,28 +392,6 @@ class DataStore(RoomMemberStore, RoomStore,
                     or_ignore=True,
                 )
 
-            if not backfilled and not context.rejected:
-                self._simple_insert_txn(
-                    txn,
-                    table="state_forward_extremities",
-                    values={
-                        "event_id": event.event_id,
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                    or_replace=True,
-                )
-
-                for prev_state_id, _ in event.prev_state:
-                    self._simple_delete_txn(
-                        txn,
-                        table="state_forward_extremities",
-                        keyvalues={
-                            "event_id": prev_state_id,
-                        }
-                    )
-
         for hash_alg, hash_base64 in event.hashes.items():
             hash_bytes = decode_base64(hash_base64)
             self._store_event_content_hash_txn(
@@ -357,13 +422,6 @@ class DataStore(RoomMemberStore, RoomStore,
             txn, event.event_id, ref_alg, ref_hash_bytes
         )
 
-        if not outlier:
-            self._update_min_depth_for_room_txn(
-                txn,
-                event.room_id,
-                event.depth
-            )
-
     def _store_redaction(self, txn, event):
         txn.execute(
             "INSERT OR IGNORE INTO redactions "
@@ -480,6 +538,9 @@ class DataStore(RoomMemberStore, RoomStore,
             the rejected reason string if we rejected the event, else maps to
             None.
         """
+        if not event_ids:
+            return defer.succeed({})
+
         def f(txn):
             sql = (
                 "SELECT e.event_id, reason FROM events as e "
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index f253c9e2c3..e2a662a6c7 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -29,7 +29,7 @@ class PusherStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
         sql = (
-            "SELECT id, user_name, kind, instance_handle, app_id,"
+            "SELECT id, user_name, kind, profile_tag, app_id,"
             "app_display_name, device_display_name, pushkey, ts, data, "
             "last_token, last_success, failing_since "
             "FROM pushers "
@@ -45,7 +45,7 @@ class PusherStore(SQLBaseStore):
                 "id": r[0],
                 "user_name": r[1],
                 "kind": r[2],
-                "instance_handle": r[3],
+                "profile_tag": r[3],
                 "app_id": r[4],
                 "app_display_name": r[5],
                 "device_display_name": r[6],
@@ -64,7 +64,7 @@ class PusherStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_all_pushers(self):
         sql = (
-            "SELECT id, user_name, kind, instance_handle, app_id,"
+            "SELECT id, user_name, kind, profile_tag, app_id,"
             "app_display_name, device_display_name, pushkey, ts, data, "
             "last_token, last_success, failing_since "
             "FROM pushers"
@@ -77,7 +77,7 @@ class PusherStore(SQLBaseStore):
                 "id": r[0],
                 "user_name": r[1],
                 "kind": r[2],
-                "instance_handle": r[3],
+                "profile_tag": r[3],
                 "app_id": r[4],
                 "app_display_name": r[5],
                 "device_display_name": r[6],
@@ -94,7 +94,7 @@ class PusherStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_name, instance_handle, kind, app_id,
+    def add_pusher(self, user_name, profile_tag, kind, app_id,
                    app_display_name, device_display_name,
                    pushkey, pushkey_ts, lang, data):
         try:
@@ -107,7 +107,7 @@ class PusherStore(SQLBaseStore):
                 dict(
                     user_name=user_name,
                     kind=kind,
-                    instance_handle=instance_handle,
+                    profile_tag=profile_tag,
                     app_display_name=app_display_name,
                     device_display_name=device_display_name,
                     ts=pushkey_ts,
@@ -158,7 +158,7 @@ class PushersTable(Table):
         "id",
         "user_name",
         "kind",
-        "instance_handle",
+        "profile_tag",
         "app_id",
         "app_display_name",
         "device_display_name",
diff --git a/synapse/storage/schema/delta/v12.sql b/synapse/storage/schema/delta/v12.sql
index a6867cba62..16c2258ca4 100644
--- a/synapse/storage/schema/delta/v12.sql
+++ b/synapse/storage/schema/delta/v12.sql
@@ -24,7 +24,7 @@ CREATE TABLE IF NOT EXISTS rejections(
 CREATE TABLE IF NOT EXISTS pushers (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
   user_name TEXT NOT NULL,
-  instance_handle varchar(32) NOT NULL,
+  profile_tag varchar(32) NOT NULL,
   kind varchar(8) NOT NULL,
   app_id varchar(64) NOT NULL,
   app_display_name varchar(64) NOT NULL,
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index 8c4dfd5c1b..3735b11547 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -16,7 +16,7 @@
 CREATE TABLE IF NOT EXISTS pushers (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
   user_name TEXT NOT NULL,
-  instance_handle varchar(32) NOT NULL,
+  profile_tag varchar(32) NOT NULL,
   kind varchar(8) NOT NULL,
   app_id varchar(64) NOT NULL,
   app_display_name varchar(64) NOT NULL,
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index 44dbce6bea..4270481139 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -91,7 +91,10 @@ class FederationTestCase(unittest.TestCase):
         self.datastore.persist_event.return_value = defer.succeed(None)
         self.datastore.get_room.return_value = defer.succeed(True)
         self.auth.check_host_in_room.return_value = defer.succeed(True)
-        self.datastore.have_events.return_value = defer.succeed({})
+
+        def have_events(event_ids):
+            return defer.succeed({})
+        self.datastore.have_events.side_effect = have_events
 
         def annotate(ev, old_state=None):
             context = Mock()