Diffstat (limited to 'synapse')
 synapse/app/homeserver.py                |   7
 synapse/appservice/__init__.py           |   6
 synapse/config/metrics.py                |   6
 synapse/federation/federation_base.py    |   1
 synapse/federation/federation_client.py  |  36
 synapse/federation/transaction_queue.py  |  31
 synapse/federation/transport/client.py   |   6
 synapse/federation/transport/server.py   |   8
 synapse/handlers/appservice.py           |   7
 synapse/handlers/federation.py           | 152
 synapse/handlers/presence.py             | 163
 synapse/http/matrixfederationclient.py   |  16
 synapse/storage/event_federation.py      |  80
 13 files changed, 346 insertions(+), 173 deletions(-)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index fa43211415..f3513abb55 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -277,9 +277,12 @@ class SynapseHomeServer(HomeServer):
                     config,
                     metrics_resource,
                 ),
-                interface="127.0.0.1",
+                interface=config.metrics_bind_host,
+            )
+            logger.info(
+                "Metrics now running on %s port %d",
+                config.metrics_bind_host, config.metrics_port,
             )
-            logger.info("Metrics now running on 127.0.0.1 port %d", config.metrics_port)
 
     def run_startup_checks(self, db_conn, database_engine):
         all_users_native = are_all_users_on_domain(
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 63a18b802b..e3ca45de83 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -148,8 +148,8 @@ class ApplicationService(object):
                 and self.is_interested_in_user(event.state_key)):
             return True
         # check joined member events
-        for member in member_list:
-            if self.is_interested_in_user(member.state_key):
+        for user_id in member_list:
+            if self.is_interested_in_user(user_id):
                 return True
         return False
 
@@ -173,7 +173,7 @@ class ApplicationService(object):
             restrict_to(str): The namespace to restrict regex tests to.
             aliases_for_event(list): A list of all the known room aliases for
             this event.
-            member_list(list): A list of all joined room members in this room.
+            member_list(list): A list of all joined user_ids in this room.
         Returns:
             bool: True if this service would like to know about this event.
         """
diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py
index 71a1b1d189..0cfb30ce7f 100644
--- a/synapse/config/metrics.py
+++ b/synapse/config/metrics.py
@@ -20,6 +20,7 @@ class MetricsConfig(Config):
     def read_config(self, config):
         self.enable_metrics = config["enable_metrics"]
         self.metrics_port = config.get("metrics_port")
+        self.metrics_bind_host = config.get("metrics_bind_host", "127.0.0.1")
 
     def default_config(self, config_dir_path, server_name):
         return """\
@@ -28,6 +29,9 @@ class MetricsConfig(Config):
         # Enable collection and rendering of performance metrics
         enable_metrics: False
 
-        # Separate port to accept metrics requests on (on localhost)
+        # Separate port to accept metrics requests on
         # metrics_port: 8081
+
+        # Which host to bind the metrics listener to
+        # metrics_bind_host: 127.0.0.1
         """
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 5217d91aab..f0430b2cb1 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -80,6 +80,7 @@ class FederationBase(object):
                             destinations=[pdu.origin],
                             event_id=pdu.event_id,
                             outlier=outlier,
+                            timeout=10000,
                         )
 
                         if new_pdu:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 3a7bc0c9a7..d3b46b24c1 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -22,6 +22,7 @@ from .units import Edu
 from synapse.api.errors import (
     CodeMessageException, HttpResponseException, SynapseError,
 )
+from synapse.util import unwrapFirstError
 from synapse.util.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
@@ -164,16 +165,17 @@ class FederationClient(FederationBase):
             for p in transaction_data["pdus"]
         ]
 
-        for i, pdu in enumerate(pdus):
-            pdus[i] = yield self._check_sigs_and_hash(pdu)
-
-            # FIXME: We should handle signature failures more gracefully.
+        # FIXME: We should handle signature failures more gracefully.
+        pdus[:] = yield defer.gatherResults(
+            [self._check_sigs_and_hash(pdu) for pdu in pdus],
+            consumeErrors=True,
+        ).addErrback(unwrapFirstError)
 
         defer.returnValue(pdus)
 
     @defer.inlineCallbacks
     @log_function
-    def get_pdu(self, destinations, event_id, outlier=False):
+    def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
         """Requests the PDU with given origin and ID from the remote home
         servers.
 
@@ -189,6 +191,8 @@ class FederationClient(FederationBase):
             outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
                 it's from an arbitrary point in the context as opposed to part
                 of the current block of PDUs. Defaults to `False`
+            timeout (int): How long (in ms) to try each destination before
+                moving on to the next one. None indicates no timeout.
 
         Returns:
             Deferred: Results in the requested PDU.
@@ -212,7 +216,7 @@ class FederationClient(FederationBase):
 
                 with limiter:
                     transaction_data = yield self.transport_layer.get_event(
-                        destination, event_id
+                        destination, event_id, timeout=timeout,
                     )
 
                     logger.debug("transaction_data %r", transaction_data)
@@ -370,13 +374,17 @@ class FederationClient(FederationBase):
                     for p in content.get("auth_chain", [])
                 ]
 
-                signed_state = yield self._check_sigs_and_hash_and_fetch(
-                    destination, state, outlier=True
-                )
-
-                signed_auth = yield self._check_sigs_and_hash_and_fetch(
-                    destination, auth_chain, outlier=True
-                )
+                signed_state, signed_auth = yield defer.gatherResults(
+                    [
+                        self._check_sigs_and_hash_and_fetch(
+                            destination, state, outlier=True
+                        ),
+                        self._check_sigs_and_hash_and_fetch(
+                            destination, auth_chain, outlier=True
+                        )
+                    ],
+                    consumeErrors=True
+                ).addErrback(unwrapFirstError)
 
                 auth_chain.sort(key=lambda e: e.depth)
 
@@ -518,7 +526,7 @@ class FederationClient(FederationBase):
             # Are we missing any?
 
             seen_events = set(earliest_events_ids)
-            seen_events.update(e.event_id for e in signed_events)
+            seen_events.update(e.event_id for e in signed_events if e)
 
             missing_events = {}
             for e in itertools.chain(latest_events, signed_events):
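
The changes above replace sequential signature checks with a single `defer.gatherResults` call, so all PDUs are verified concurrently and the first failure is surfaced directly. A minimal sketch of the pattern follows; the body of `unwrapFirstError` shown here is an assumption inferred from its usage (`gatherResults(..., consumeErrors=True)` wraps the first failing Deferred in a `defer.FirstError`):

```python
from twisted.internet import defer

def unwrapFirstError(failure):
    # gatherResults wraps the first failing Deferred in a FirstError;
    # unwrap it so callers see the original exception, not the wrapper.
    failure.trap(defer.FirstError)
    return failure.value.subFailure

@defer.inlineCallbacks
def check_all_concurrently(pdus, check_one):
    # Kick off every check at once; consumeErrors=True stops Twisted
    # logging "Unhandled error" for the other Deferreds when one fails.
    results = yield defer.gatherResults(
        [check_one(pdu) for pdu in pdus],
        consumeErrors=True,
    ).addErrback(unwrapFirstError)
    defer.returnValue(results)
```
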
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index ca04822fb3..32fa5e8c15 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -207,13 +207,13 @@ class TransactionQueue(object):
             # request at which point pending_pdus_by_dest just keeps growing.
             # we need application-layer timeouts of some flavour of these
             # requests
-            logger.info(
+            logger.debug(
                 "TX [%s] Transaction already in progress",
                 destination
             )
             return
 
-        logger.info("TX [%s] _attempt_new_transaction", destination)
+        logger.debug("TX [%s] _attempt_new_transaction", destination)
 
         # list of (pending_pdu, deferred, order)
         pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
@@ -221,11 +221,11 @@ class TransactionQueue(object):
         pending_failures = self.pending_failures_by_dest.pop(destination, [])
 
         if pending_pdus:
-            logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
-                        destination, len(pending_pdus))
+            logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+                         destination, len(pending_pdus))
 
         if not pending_pdus and not pending_edus and not pending_failures:
-            logger.info("TX [%s] Nothing to send", destination)
+            logger.debug("TX [%s] Nothing to send", destination)
             return
 
         # Sort based on the order field
@@ -242,6 +242,8 @@ class TransactionQueue(object):
         try:
             self.pending_transactions[destination] = 1
 
+            txn_id = str(self._next_txn_id)
+
             limiter = yield get_retry_limiter(
                 destination,
                 self._clock,
@@ -249,9 +251,9 @@ class TransactionQueue(object):
             )
 
             logger.debug(
-                "TX [%s] Attempting new transaction"
+                "TX [%s] {%s} Attempting new transaction"
                 " (pdus: %d, edus: %d, failures: %d)",
-                destination,
+                destination, txn_id,
                 len(pending_pdus),
                 len(pending_edus),
                 len(pending_failures)
@@ -261,7 +263,7 @@ class TransactionQueue(object):
 
             transaction = Transaction.create_new(
                 origin_server_ts=int(self._clock.time_msec()),
-                transaction_id=str(self._next_txn_id),
+                transaction_id=txn_id,
                 origin=self.server_name,
                 destination=destination,
                 pdus=pdus,
@@ -275,9 +277,13 @@ class TransactionQueue(object):
 
             logger.debug("TX [%s] Persisted transaction", destination)
             logger.info(
-                "TX [%s] Sending transaction [%s]",
-                destination,
+                "TX [%s] {%s} Sending transaction [%s],"
+                " (PDUs: %d, EDUs: %d, failures: %d)",
+                destination, txn_id,
                 transaction.transaction_id,
+                len(pending_pdus),
+                len(pending_edus),
+                len(pending_failures),
             )
 
             with limiter:
@@ -313,7 +319,10 @@ class TransactionQueue(object):
                     code = e.code
                     response = e.response
 
-                logger.info("TX [%s] got %d response", destination, code)
+                logger.info(
+                    "TX [%s] {%s} got %d response",
+                    destination, txn_id, code
+                )
 
                 logger.debug("TX [%s] Sent transaction", destination)
                 logger.debug("TX [%s] Marking as delivered...", destination)
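
Around the transaction send, the `get_retry_limiter` / `with limiter:` pair provides per-destination backoff so a dead server is not hammered on every queue flush. The real helper is asynchronous and persists retry intervals in the database; the following is only a simplified in-memory illustration of the idea, with invented names and intervals:

```python
import time

# destination -> (earliest next attempt, current backoff in seconds)
_retry_state = {}

MIN_RETRY_INTERVAL = 10 * 60       # illustrative values only
MAX_RETRY_INTERVAL = 24 * 60 * 60

class RetryLimiter(object):
    def __init__(self, destination):
        self.destination = destination

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            # Success: the destination is reachable again, reset backoff.
            _retry_state.pop(self.destination, None)
        else:
            # Failure: double the backoff, clamped to [MIN, MAX].
            _, interval = _retry_state.get(self.destination, (0, 0))
            interval = min(max(interval * 2, MIN_RETRY_INTERVAL),
                           MAX_RETRY_INTERVAL)
            _retry_state[self.destination] = (time.time() + interval, interval)
        return False  # propagate the exception to the caller

def get_retry_limiter(destination):
    retry_at, _ = _retry_state.get(destination, (0, 0))
    if time.time() < retry_at:
        raise RuntimeError("%s is still marked as down" % (destination,))
    return RetryLimiter(destination)
```
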
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 80d03012b7..610a4c3163 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -50,13 +50,15 @@ class TransportLayerClient(object):
         )
 
     @log_function
-    def get_event(self, destination, event_id):
+    def get_event(self, destination, event_id, timeout=None):
         """ Requests the pdu with the given id and origin from the given server.
 
         Args:
             destination (str): The host name of the remote home server we want
                 to get the state from.
             event_id (str): The id of the event being requested.
+            timeout (int): How long (in ms) to try the destination before
+                giving up. None indicates no timeout.
 
         Returns:
             Deferred: Results in a dict received from the remote homeserver.
@@ -65,7 +67,7 @@ class TransportLayerClient(object):
                      destination, event_id)
 
         path = PREFIX + "/event/%s/" % (event_id, )
-        return self.client.get_json(destination, path=path)
+        return self.client.get_json(destination, path=path, timeout=timeout)
 
     @log_function
     def backfill(self, destination, room_id, event_tuples, limit):
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 2bfe0f3c9b..af87805f34 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -196,6 +196,14 @@ class FederationSendServlet(BaseFederationServlet):
                 transaction_id, str(transaction_data)
             )
 
+            logger.info(
+                "Received txn %s from %s. (PDUs: %d, EDUs: %d, failures: %d)",
+                transaction_id, origin,
+                len(transaction_data.get("pdus", [])),
+                len(transaction_data.get("edus", [])),
+                len(transaction_data.get("failures", [])),
+            )
+
             # We should ideally be getting this from the security layer.
             # origin = body["origin"]
 
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 355ab317df..8269482e47 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -15,7 +15,7 @@
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventTypes
 from synapse.appservice import ApplicationService
 from synapse.types import UserID
 
@@ -147,10 +147,7 @@ class ApplicationServicesHandler(object):
                 )
             # We need to know the members associated with this event.room_id,
             # if any.
-            member_list = yield self.store.get_room_members(
-                room_id=event.room_id,
-                membership=Membership.JOIN
-            )
+            member_list = yield self.store.get_users_in_room(event.room_id)
 
         services = yield self.store.get_app_services()
         interested_list = [
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d35d9f603c..46ce3699d7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -230,27 +230,65 @@ class FederationHandler(BaseHandler):
         if not extremities:
             extremities = yield self.store.get_oldest_events_in_room(room_id)
 
-        pdus = yield self.replication_layer.backfill(
+        events = yield self.replication_layer.backfill(
             dest,
             room_id,
-            limit,
+            limit=limit,
             extremities=extremities,
         )
 
-        events = []
+        event_map = {e.event_id: e for e in events}
 
-        for pdu in pdus:
-            event = pdu
+        event_ids = set(e.event_id for e in events)
 
-            # FIXME (erikj): Not sure this actually works :/
-            context = yield self.state_handler.compute_event_context(event)
+        edges = [
+            ev.event_id
+            for ev in events
+            if set(e_id for e_id, _ in ev.prev_events) - event_ids
+        ]
 
-            events.append((event, context))
+        # For each edge get the current state.
 
-            yield self.store.persist_event(
-                event,
-                context=context,
-                backfilled=True
+        auth_events = {}
+        events_to_state = {}
+        for e_id in edges:
+            state, auth = yield self.replication_layer.get_state_for_room(
+                destination=dest,
+                room_id=room_id,
+                event_id=e_id
+            )
+            auth_events.update({a.event_id: a for a in auth})
+            events_to_state[e_id] = state
+
+        yield defer.gatherResults(
+            [
+                self._handle_new_event(dest, a)
+                for a in auth_events.values()
+            ],
+            consumeErrors=True,
+        ).addErrback(unwrapFirstError)
+
+        yield defer.gatherResults(
+            [
+                self._handle_new_event(
+                    dest, event_map[e_id],
+                    state=events_to_state[e_id],
+                    backfilled=True,
+                )
+                for e_id in events_to_state
+            ],
+            consumeErrors=True
+        ).addErrback(unwrapFirstError)
+
+        events.sort(key=lambda e: e.depth)
+
+        for event in events:
+            if event.event_id in events_to_state:
+                continue
+
+            yield self._handle_new_event(
+                dest, event,
+                backfilled=True,
             )
 
         defer.returnValue(events)
@@ -347,7 +385,7 @@ class FederationHandler(BaseHandler):
                     logger.info(e.message)
                     continue
                 except Exception as e:
-                    logger.warn(
+                    logger.exception(
                         "Failed to backfill from %s because %s",
                         dom, e,
                     )
@@ -517,30 +555,14 @@ class FederationHandler(BaseHandler):
                 # FIXME
                 pass
 
-            for e in auth_chain:
-                e.internal_metadata.outlier = True
-
-                if e.event_id == event.event_id:
-                    continue
-
-                try:
-                    auth_ids = [e_id for e_id, _ in e.auth_events]
-                    auth = {
-                        (e.type, e.state_key): e for e in auth_chain
-                        if e.event_id in auth_ids
-                    }
-                    yield self._handle_new_event(
-                        origin, e, auth_events=auth
-                    )
-                except:
-                    logger.exception(
-                        "Failed to handle auth event %s",
-                        e.event_id,
-                    )
+            yield self._handle_auth_events(
+                origin, [e for e in auth_chain if e.event_id != event.event_id]
+            )
 
-            for e in state:
+            @defer.inlineCallbacks
+            def handle_state(e):
                 if e.event_id == event.event_id:
-                    continue
+                    return
 
                 e.internal_metadata.outlier = True
                 try:
@@ -558,6 +580,8 @@ class FederationHandler(BaseHandler):
                         e.event_id,
                     )
 
+            yield defer.DeferredList([handle_state(e) for e in state])
+
             auth_ids = [e_id for e_id, _ in event.auth_events]
             auth_events = {
                 (e.type, e.state_key): e for e in auth_chain
@@ -893,9 +917,12 @@ class FederationHandler(BaseHandler):
         # This is a hack to fix some old rooms where the initial join event
         # didn't reference the create event in its auth events.
         if event.type == EventTypes.Member and not event.auth_events:
-            if len(event.prev_events) == 1:
-                c = yield self.store.get_event(event.prev_events[0][0])
-                if c.type == EventTypes.Create:
+            if len(event.prev_events) == 1 and event.depth < 5:
+                c = yield self.store.get_event(
+                    event.prev_events[0][0],
+                    allow_none=True,
+                )
+                if c and c.type == EventTypes.Create:
                     auth_events[(c.type, c.state_key)] = c
 
         try:
@@ -1314,3 +1341,52 @@ class FederationHandler(BaseHandler):
             },
             "missing": [e.event_id for e in missing_locals],
         })
+
+    @defer.inlineCallbacks
+    def _handle_auth_events(self, origin, auth_events):
+        auth_ids_to_deferred = {}
+
+        def process_auth_ev(ev):
+            auth_ids = [e_id for e_id, _ in ev.auth_events]
+
+            prev_ds = [
+                auth_ids_to_deferred[i]
+                for i in auth_ids
+                if i in auth_ids_to_deferred
+            ]
+
+            d = defer.Deferred()
+
+            auth_ids_to_deferred[ev.event_id] = d
+
+            @defer.inlineCallbacks
+            def f(*_):
+                ev.internal_metadata.outlier = True
+
+                try:
+                    auth = {
+                        (e.type, e.state_key): e for e in auth_events
+                        if e.event_id in auth_ids
+                    }
+
+                    yield self._handle_new_event(
+                        origin, ev, auth_events=auth
+                    )
+                except:
+                    logger.exception(
+                        "Failed to handle auth event %s",
+                        ev.event_id,
+                    )
+
+                d.callback(None)
+
+            if prev_ds:
+                dx = defer.DeferredList(prev_ds)
+                dx.addBoth(f)
+            else:
+                f()
+
+        for e in auth_events:
+            process_auth_ev(e)
+
+        yield defer.DeferredList(auth_ids_to_deferred.values())
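
The new `_handle_auth_events` builds an ad-hoc dependency graph out of Deferreds: each auth event gets a Deferred that fires once it has been handled, and an event whose `auth_events` reference other events in the batch waits on their Deferreds before being processed. Distilled into a standalone sketch (the generic names here are invented):

```python
from twisted.internet import defer

def handle_in_dependency_order(events, deps_of, handle):
    """Handle each event only after its in-batch dependencies, without
    serialising independent branches. `deps_of(ev)` returns the ids of
    events `ev` depends on; `handle(ev)` returns a Deferred."""
    done = {}  # event_id -> Deferred fired once that event is handled

    def schedule(ev):
        prev = [done[i] for i in deps_of(ev) if i in done]
        d = defer.Deferred()
        done[ev.event_id] = d

        @defer.inlineCallbacks
        def run(*_):
            try:
                yield handle(ev)
            except Exception:
                pass  # a failed event must not wedge its dependants
            d.callback(None)

        if prev:
            defer.DeferredList(prev).addBoth(run)
        else:
            run()

    for ev in events:
        schedule(ev)

    return defer.DeferredList(done.values())
```

As in the diff, an event only waits on dependencies that were scheduled before it, so the input order matters: dependencies should appear before their dependants.
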
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 40794187b1..670c1d353f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -146,6 +146,10 @@ class PresenceHandler(BaseHandler):
         self._user_cachemap = {}
         self._user_cachemap_latest_serial = 0
 
+        # map room_ids to the latest presence serial for a member of that
+        # room
+        self._room_serials = {}
+
         metrics.register_callback(
             "userCachemap:size",
             lambda: len(self._user_cachemap),
@@ -297,13 +301,34 @@ class PresenceHandler(BaseHandler):
 
         self.changed_presencelike_data(user, {"last_active": now})
 
+    def get_joined_rooms_for_user(self, user):
+        """Get the list of rooms a user is joined to.
+
+        Args:
+            user(UserID): The user.
+        Returns:
+            A Deferred of a list of room id strings.
+        """
+        rm_handler = self.homeserver.get_handlers().room_member_handler
+        return rm_handler.get_joined_rooms_for_user(user)
+
+    def get_joined_users_for_room_id(self, room_id):
+        rm_handler = self.homeserver.get_handlers().room_member_handler
+        return rm_handler.get_room_members(room_id)
+
+    @defer.inlineCallbacks
     def changed_presencelike_data(self, user, state):
-        statuscache = self._get_or_make_usercache(user)
+        """Updates the presence state of a local user.
 
+        Args:
+            user(UserID): The user being updated.
+            state(dict): The new presence state for the user.
+        Returns:
+            A Deferred
+        """
         self._user_cachemap_latest_serial += 1
-        statuscache.update(state, serial=self._user_cachemap_latest_serial)
-
-        return self.push_presence(user, statuscache=statuscache)
+        statuscache = yield self.update_presence_cache(user, state)
+        yield self.push_presence(user, statuscache=statuscache)
 
     @log_function
     def started_user_eventstream(self, user):
@@ -326,13 +351,12 @@ class PresenceHandler(BaseHandler):
             room_id(str): The room id the user joined.
         """
         if self.hs.is_mine(user):
-            statuscache = self._get_or_make_usercache(user)
-
             # No actual update but we need to bump the serial anyway for the
             # event source
             self._user_cachemap_latest_serial += 1
-            statuscache.update({}, serial=self._user_cachemap_latest_serial)
-
+            statuscache = yield self.update_presence_cache(
+                user, room_ids=[room_id]
+            )
             self.push_update_to_local_and_remote(
                 observed_user=user,
                 room_ids=[room_id],
@@ -340,16 +364,17 @@ class PresenceHandler(BaseHandler):
             )
 
         # We also want to tell them about current presence of people.
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        curr_users = yield rm_handler.get_room_members(room_id)
+        curr_users = yield self.get_joined_users_for_room_id(room_id)
 
         for local_user in [c for c in curr_users if self.hs.is_mine(c)]:
-            statuscache = self._get_or_offline_usercache(local_user)
-            statuscache.update({}, serial=self._user_cachemap_latest_serial)
+            statuscache = yield self.update_presence_cache(
+                local_user, room_ids=[room_id], add_to_cache=False
+            )
+
             self.push_update_to_local_and_remote(
                 observed_user=local_user,
                 users_to_push=[user],
-                statuscache=self._get_or_offline_usercache(local_user),
+                statuscache=statuscache,
             )
 
     @defer.inlineCallbacks
@@ -546,8 +571,7 @@ class PresenceHandler(BaseHandler):
 
             # Also include people in all my rooms
 
-            rm_handler = self.homeserver.get_handlers().room_member_handler
-            room_ids = yield rm_handler.get_joined_rooms_for_user(user)
+            room_ids = yield self.get_joined_rooms_for_user(user)
 
         if state is None:
             state = yield self.store.get_presence_state(user.localpart)
@@ -747,8 +771,7 @@ class PresenceHandler(BaseHandler):
         # and also user is informed of server-forced pushes
         localusers.add(user)
 
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        room_ids = yield rm_handler.get_joined_rooms_for_user(user)
+        room_ids = yield self.get_joined_rooms_for_user(user)
 
         if not localusers and not room_ids:
             defer.returnValue(None)
@@ -793,8 +816,7 @@ class PresenceHandler(BaseHandler):
                     " | %d interested local observers %r", len(observers), observers
                 )
 
-            rm_handler = self.homeserver.get_handlers().room_member_handler
-            room_ids = yield rm_handler.get_joined_rooms_for_user(user)
+            room_ids = yield self.get_joined_rooms_for_user(user)
             if room_ids:
                 logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids)
 
@@ -813,10 +835,8 @@ class PresenceHandler(BaseHandler):
                     self.clock.time_msec() - state.pop("last_active_ago")
                 )
 
-            statuscache = self._get_or_make_usercache(user)
-
             self._user_cachemap_latest_serial += 1
-            statuscache.update(state, serial=self._user_cachemap_latest_serial)
+            yield self.update_presence_cache(user, state, room_ids=room_ids)
 
             if not observers and not room_ids:
                 logger.debug(" | no interested observers or room IDs")
@@ -875,6 +895,35 @@ class PresenceHandler(BaseHandler):
         yield defer.DeferredList(deferreds, consumeErrors=True)
 
     @defer.inlineCallbacks
+    def update_presence_cache(self, user, state={}, room_ids=None,
+                              add_to_cache=True):
+        """Update the presence cache for a user with a new state and bump the
+        serial to the latest value.
+
+        Args:
+            user(UserID): The user being updated
+            state(dict): The presence state being updated
+            room_ids(None or list of str): A list of room_ids to update. If
+                room_ids is None then fetch the list of room_ids the user is
+                joined to.
+            add_to_cache: Whether to add an entry to the presence cache if the
+                user isn't already in the cache.
+        Returns:
+            A Deferred UserPresenceCache for the user being updated.
+        """
+        if room_ids is None:
+            room_ids = yield self.get_joined_rooms_for_user(user)
+
+        for room_id in room_ids:
+            self._room_serials[room_id] = self._user_cachemap_latest_serial
+        if add_to_cache:
+            statuscache = self._get_or_make_usercache(user)
+        else:
+            statuscache = self._get_or_offline_usercache(user)
+        statuscache.update(state, serial=self._user_cachemap_latest_serial)
+        defer.returnValue(statuscache)
+
+    @defer.inlineCallbacks
     def push_update_to_local_and_remote(self, observed_user, statuscache,
                                         users_to_push=[], room_ids=[],
                                         remote_domains=[]):
@@ -997,38 +1046,10 @@ class PresenceEventSource(object):
         self.clock = hs.get_clock()
 
     @defer.inlineCallbacks
-    def is_visible(self, observer_user, observed_user):
-        if observer_user == observed_user:
-            defer.returnValue(True)
-
-        presence = self.hs.get_handlers().presence_handler
-
-        if (yield presence.store.user_rooms_intersect(
-                [u.to_string() for u in observer_user, observed_user])):
-            defer.returnValue(True)
-
-        if self.hs.is_mine(observed_user):
-            pushmap = presence._local_pushmap
-
-            defer.returnValue(
-                observed_user.localpart in pushmap and
-                observer_user in pushmap[observed_user.localpart]
-            )
-        else:
-            recvmap = presence._remote_recvmap
-
-            defer.returnValue(
-                observed_user in recvmap and
-                observer_user in recvmap[observed_user]
-            )
-
-    @defer.inlineCallbacks
     @log_function
     def get_new_events_for_user(self, user, from_key, limit):
         from_key = int(from_key)
 
-        observer_user = user
-
         presence = self.hs.get_handlers().presence_handler
         cachemap = presence._user_cachemap
 
@@ -1037,17 +1058,27 @@ class PresenceEventSource(object):
         clock = self.clock
         latest_serial = 0
 
+        user_ids_to_check = {user}
+        presence_list = yield presence.store.get_presence_list(
+            user.localpart, accepted=True
+        )
+        if presence_list is not None:
+            user_ids_to_check |= set(
+                UserID.from_string(p["observed_user_id"]) for p in presence_list
+            )
+        room_ids = yield presence.get_joined_rooms_for_user(user)
+        for room_id in set(room_ids) & set(presence._room_serials):
+            if presence._room_serials[room_id] > from_key:
+                joined = yield presence.get_joined_users_for_room_id(room_id)
+                user_ids_to_check |= set(joined)
+
         updates = []
-        # TODO(paul): use a DeferredList ? How to limit concurrency.
-        for observed_user in cachemap.keys():
+        for observed_user in user_ids_to_check & set(cachemap):
             cached = cachemap[observed_user]
 
             if cached.serial <= from_key or cached.serial > max_serial:
                 continue
 
-            if not (yield self.is_visible(observer_user, observed_user)):
-                continue
-
             latest_serial = max(cached.serial, latest_serial)
             updates.append(cached.make_event(user=observed_user, clock=clock))
 
@@ -1084,8 +1115,6 @@ class PresenceEventSource(object):
     def get_pagination_rows(self, user, pagination_config, key):
         # TODO (erikj): Does this make sense? Ordering?
 
-        observer_user = user
-
         from_key = int(pagination_config.from_key)
 
         if pagination_config.to_key:
@@ -1096,14 +1125,26 @@ class PresenceEventSource(object):
         presence = self.hs.get_handlers().presence_handler
         cachemap = presence._user_cachemap
 
+        user_ids_to_check = {user}
+        presence_list = yield presence.store.get_presence_list(
+            user.localpart, accepted=True
+        )
+        if presence_list is not None:
+            user_ids_to_check |= set(
+                UserID.from_string(p["observed_user_id"]) for p in presence_list
+            )
+        room_ids = yield presence.get_joined_rooms_for_user(user)
+        for room_id in set(room_ids) & set(presence._room_serials):
+            if presence._room_serials[room_id] >= from_key:
+                joined = yield presence.get_joined_users_for_room_id(room_id)
+                user_ids_to_check |= set(joined)
+
         updates = []
-        # TODO(paul): use a DeferredList ? How to limit concurrency.
-        for observed_user in cachemap.keys():
+        for observed_user in user_ids_to_check & set(cachemap):
             if not (to_key < cachemap[observed_user].serial <= from_key):
                 continue
 
-            if (yield self.is_visible(observer_user, observed_user)):
-                updates.append((observed_user, cachemap[observed_user]))
+            updates.append((observed_user, cachemap[observed_user]))
 
         # TODO(paul): limit
 
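
Both event-source methods now build the same candidate set: rather than scanning the whole presence cache and calling `is_visible` per pair, they start from the user's accepted presence list and add members of any joined room whose `_room_serials` entry advanced past `from_key`. That logic appears twice in the diff; hoisted into a helper it would look roughly like this (a sketch only, reusing the handler methods added above):

```python
from twisted.internet import defer

from synapse.types import UserID

@defer.inlineCallbacks
def get_interesting_users(presence, user, from_key):
    # The user always sees their own presence, plus everyone on their
    # accepted presence list.
    user_ids_to_check = {user}
    presence_list = yield presence.store.get_presence_list(
        user.localpart, accepted=True
    )
    if presence_list is not None:
        user_ids_to_check |= set(
            UserID.from_string(p["observed_user_id"]) for p in presence_list
        )

    # Only rooms whose serial moved past from_key can contain presence
    # updates the client hasn't seen yet, so skip the rest entirely.
    room_ids = yield presence.get_joined_rooms_for_user(user)
    for room_id in set(room_ids) & set(presence._room_serials):
        if presence._room_serials[room_id] > from_key:
            joined = yield presence.get_joined_users_for_room_id(room_id)
            user_ids_to_check |= set(joined)

    defer.returnValue(user_ids_to_check)
```
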
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index c99d237c73..6f976d5ce8 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -110,7 +110,8 @@ class MatrixFederationHttpClient(object):
     @defer.inlineCallbacks
     def _create_request(self, destination, method, path_bytes,
                         body_callback, headers_dict={}, param_bytes=b"",
-                        query_bytes=b"", retry_on_dns_fail=True):
+                        query_bytes=b"", retry_on_dns_fail=True,
+                        timeout=None):
         """ Creates and sends a request to the given url
         """
         headers_dict[b"User-Agent"] = [self.version_string]
@@ -158,7 +159,7 @@ class MatrixFederationHttpClient(object):
 
                 response = yield self.clock.time_bound_deferred(
                     request_deferred,
-                    time_out=60,
+                    time_out=timeout/1000. if timeout else 60,
                 )
 
                 logger.debug("Got response to %s", method)
@@ -181,7 +182,7 @@ class MatrixFederationHttpClient(object):
                     _flatten_response_never_received(e),
                 )
 
-                if retries_left:
+                if retries_left and not timeout:
                     yield sleep(2 ** (5 - retries_left))
                     retries_left -= 1
                 else:
@@ -334,7 +335,8 @@ class MatrixFederationHttpClient(object):
         defer.returnValue(json.loads(body))
 
     @defer.inlineCallbacks
-    def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
+    def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
+                 timeout=None):
         """ GETs some json from the given homeserver and path
 
         Args:
@@ -343,6 +345,9 @@ class MatrixFederationHttpClient(object):
             path (str): The HTTP path.
             args (dict): A dictionary used to create query strings, defaults to
                 None.
+            timeout (int): How long (in ms) to try the destination before
+                giving up. None indicates no timeout; without a timeout the
+                request will be retried on failure.
         Returns:
             Deferred: Succeeds when we get *any* HTTP response.
 
@@ -370,7 +375,8 @@ class MatrixFederationHttpClient(object):
             path.encode("ascii"),
             query_bytes=query_bytes,
             body_callback=body_callback,
-            retry_on_dns_fail=retry_on_dns_fail
+            retry_on_dns_fail=retry_on_dns_fail,
+            timeout=timeout,
         )
 
         if 200 <= response.code < 300:
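
`_create_request` now feeds the caller-supplied timeout (converted from milliseconds to seconds) into `self.clock.time_bound_deferred`, and skips the retry/sleep loop whenever a timeout was given, so `get_pdu` can move on to the next destination promptly. The exact behaviour of `Clock.time_bound_deferred` is assumed here; a plausible sketch:

```python
from twisted.internet import defer, reactor
from twisted.python import failure

def time_bound_deferred(given_deferred, time_out):
    """Mirror `given_deferred`, but errback with TimeoutError if it has
    not fired within `time_out` seconds (assumed semantics)."""
    ret = defer.Deferred()
    timed_out = [False]

    def on_timeout():
        timed_out[0] = True
        if not ret.called:
            ret.errback(defer.TimeoutError("Deferred timed out"))

    timer = reactor.callLater(time_out, on_timeout)

    def pass_through(result):
        if timed_out[0]:
            return None  # too late; drop the result
        timer.cancel()
        if isinstance(result, failure.Failure):
            ret.errback(result)
        else:
            ret.callback(result)
        return None

    given_deferred.addBoth(pass_through)
    return ret
```
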
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 5d4b7843f3..1ba073884b 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -19,6 +19,7 @@ from ._base import SQLBaseStore, cached
 from syutil.base64util import encode_base64
 
 import logging
+from Queue import PriorityQueue, Empty
 
 
 logger = logging.getLogger(__name__)
@@ -330,12 +331,13 @@ class EventFederationStore(SQLBaseStore):
                 " WHERE event_id = ? AND room_id = ?"
                 " )"
                 " AND NOT EXISTS ("
-                " SELECT 1 FROM events WHERE event_id = ? AND room_id = ?"
+                " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+                " AND outlier = ?"
                 " )"
             )
 
             txn.executemany(query, [
-                (e_id, room_id, e_id, room_id, e_id, room_id, )
+                (e_id, room_id, e_id, room_id, e_id, room_id, False)
                 for e_id, _ in prev_events
             ])
 
@@ -362,7 +364,11 @@ class EventFederationStore(SQLBaseStore):
         return self.runInteraction(
             "get_backfill_events",
             self._get_backfill_events, room_id, event_list, limit
-        ).addCallback(self._get_events)
+        ).addCallback(
+            self._get_events
+        ).addCallback(
+            lambda l: sorted(l, key=lambda e: -e.depth)
+        )
 
     def _get_backfill_events(self, txn, room_id, event_list, limit):
         logger.debug(
@@ -370,43 +376,54 @@ class EventFederationStore(SQLBaseStore):
             room_id, repr(event_list), limit
         )
 
-        event_results = event_list
+        event_results = set()
 
-        front = event_list
+        # We want to make sure that we do a breadth-first, "depth" ordered
+        # search.
 
         query = (
-            "SELECT prev_event_id FROM event_edges "
-            "WHERE room_id = ? AND event_id = ? "
-            "LIMIT ?"
+            "SELECT depth, prev_event_id FROM event_edges"
+            " INNER JOIN events"
+            " ON prev_event_id = events.event_id"
+            " AND event_edges.room_id = events.room_id"
+            " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+            " AND event_edges.is_state = ?"
+            " LIMIT ?"
         )
 
-        # We iterate through all event_ids in `front` to select their previous
-        # events. These are dumped in `new_front`.
-        # We continue until we reach the limit *or* new_front is empty (i.e.,
-        # we've run out of things to select
-        while front and len(event_results) < limit:
+        queue = PriorityQueue()
 
-            new_front = []
-            for event_id in front:
-                logger.debug(
-                    "_backfill_interaction: id=%s",
-                    event_id
-                )
+        for event_id in event_list:
+            depth = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "event_id": event_id,
+                },
+                retcol="depth"
+            )
 
-                txn.execute(
-                    query,
-                    (room_id, event_id, limit - len(event_results))
-                )
+            queue.put((-depth, event_id))
 
-                for row in txn.fetchall():
-                    logger.debug(
-                        "_backfill_interaction: got id=%s",
-                        *row
-                    )
-                    new_front.append(row[0])
+        while not queue.empty() and len(event_results) < limit:
+            try:
+                _, event_id = queue.get_nowait()
+            except Empty:
+                break
 
-            front = new_front
-            event_results += new_front
+            if event_id in event_results:
+                continue
+
+            event_results.add(event_id)
+
+            txn.execute(
+                query,
+                (room_id, event_id, False, limit - len(event_results))
+            )
+
+            for row in txn.fetchall():
+                if row[1] not in event_results:
+                    queue.put((-row[0], row[1]))
 
         return event_results
 
@@ -472,3 +489,4 @@ class EventFederationStore(SQLBaseStore):
         query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
 
         txn.execute(query, (room_id,))
+        txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id)
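
Finally, the backfill traversal in `_get_backfill_events` swaps the unordered frontier list for a priority queue keyed on negated depth, so the walk always expands the deepest known event next and stops cleanly at `limit`. The same algorithm in a self-contained form (`heapq` instead of `Queue.PriorityQueue`, with callables standing in for the two SQL lookups):

```python
import heapq

def backfill_walk(start_ids, depth_of, prev_ids_of, limit):
    """Walk backwards through the event DAG, deepest-first, returning at
    most `limit` event ids. `depth_of(event_id)` and
    `prev_ids_of(event_id)` stand in for the queries in
    _get_backfill_events."""
    # Negate depth so the deepest event pops first from the min-heap.
    heap = [(-depth_of(e), e) for e in start_ids]
    heapq.heapify(heap)

    results = set()
    while heap and len(results) < limit:
        _, event_id = heapq.heappop(heap)
        if event_id in results:
            continue
        results.add(event_id)
        for prev_id in prev_ids_of(event_id):
            if prev_id not in results:
                heapq.heappush(heap, (-depth_of(prev_id), prev_id))
    return results
```
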