summary refs log tree commit diff
diff options
context:
space:
mode:
authorDavid Baker <dbkr@matrix.org>2014-08-29 13:24:08 +0100
committerDavid Baker <dbkr@matrix.org>2014-08-29 13:24:08 +0100
commit3e6a19cf0938423b53b17b520bb6c59b9eb4dd06 (patch)
treec36e5b3a253499d6f5b2d44aa954f993c96c5b95
parentChange call signalling messages to be their own types of room events rather t... (diff)
parentShow notifications only when the user is detected as idle (diff)
downloadsynapse-3e6a19cf0938423b53b17b520bb6c59b9eb4dd06.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into develop
-rw-r--r--docs/specification.rst101
-rw-r--r--synapse/federation/replication.py2
-rw-r--r--synapse/handlers/events.py6
-rw-r--r--synapse/handlers/presence.py216
-rw-r--r--synapse/notifier.py1
-rw-r--r--synapse/streams/config.py2
-rw-r--r--synapse/util/logutils.py53
-rw-r--r--tests/handlers/test_presence.py138
-rw-r--r--tests/handlers/test_presencelike.py54
-rw-r--r--tests/utils.py37
-rw-r--r--webclient/room/room-controller.js19
11 files changed, 423 insertions, 206 deletions
diff --git a/docs/specification.rst b/docs/specification.rst
index d650683efc..c1559c886c 100644
--- a/docs/specification.rst
+++ b/docs/specification.rst
@@ -9,7 +9,9 @@ TODO(Introduction) : Matthew
 
 Architecture
 ============
-- Sending a message from A to B
+
+Clients transmit data to other clients through home servers (HSes). Clients do not communicate with each
+other directly.
 
 ::
 
@@ -26,39 +28,42 @@ Architecture
        |                  |<--------( HTTP )-----------|                  |
        +------------------+        Federation          +------------------+
 
-- Client is an end-user (web app, mobile app) which uses C-S APIs to talk to the home server.
-  A given client is typically responsible for a single user.
-- A single user is represented by a User ID, scoped to the home server which allocated the account.
-  User IDs MUST have @ prefix; looks like @foo:domain - domain indicates the user's home
-  server. 
-- Home server provides C-S APIs and has the ability to federate with other HSes.
-  Typically responsible for N clients.
-- Federation's purpose is to share content between interested HSes; no SPOF. 
-- Events are actions within the system. Typically each action (e.g. sending a message)
-  correlates with exactly one event. Each event has a ``type`` string. 
-- ``type`` values SHOULD be namespaced according to standard Java package naming conventions, 
-  with a ``.`` delimiter e.g. ``com.example.myapp.event``
-- Events are typically send in the context of a room.
+A "Client" is an end-user, typically a human using a web application or mobile app. Clients use the
+"Client-to-Server" (C-S) API to communicate with their home server. A single Client is usually
+responsible for a single user account. A user account is represented by their "User ID". This ID is
+namespaced to the home server which allocated the account and looks like::
+
+  @localpart:domain
+
+The ``localpart`` of a user ID may be a user name, or an opaque ID identifying this user.
+
+
+A "Home Server" is a server which provides C-S APIs and has the ability to federate with other HSes.
+It is typically responsible for multiple clients. "Federation" is the term used to describe the
+sharing of data between two or more home servers.
+
+Data in Matrix is encapsulated in an "Event". An event is an action within the system. Typically each
+action (e.g. sending a message) correlates with exactly one event. Each event has a ``type`` which is
+used to differentiate different kinds of data. ``type`` values SHOULD be namespaced according to standard
+Java package naming conventions, e.g. ``com.example.myapp.event``. Events are usually sent in the context
+of a "Room".
 
 Room structure
 --------------
 
-A room is a conceptual place where users can send and receive messages. Rooms 
-can be created, joined and left. Messages are sent to a room, and all 
-participants in that room will receive the message. Rooms are uniquely 
-identified via a room ID. There is exactly one room ID for each room. Each
-room can also have an alias. Each room can have many aliases.
+A room is a conceptual place where users can send and receive events. Rooms 
+can be created, joined and left. Events are sent to a room, and all 
+participants in that room will receive the event. Rooms are uniquely 
+identified via a "Room ID", which look like::
 
-- Room IDs MUST have ! prefix; looks like !foo:domain - domain is simply for namespacing,
-  the room does NOT reside on any one domain. NOT human readable.
+  !opaque_id:domain
 
-- Room Aliases MUST have # prefix; looks like #foo:domain - domain indicates where this
-  alias can be mapped to a room ID. Key point: human readable / friendly.
+There is exactly one room ID for each room. Whilst the room ID does contain a
+domain, it is simply for namespacing room IDs. The room does NOT reside on the
+domain specified. Room IDs are not meant to be human readable.
 
-- Aliases can be queried on the domain they specify, which will return a room ID if a
-  mapping exists. These mappings can change.
-
-::
+The following diagram shows an ``m.room.message`` event being sent in the room 
+``!qporfwt:matrix.org``::
 
        { @alice:matrix.org }                             { @bob:domain.com }
                |                                                 ^
@@ -73,18 +78,48 @@ room can also have an alias. Each room can have many aliases.
        |   matrix.org     |<-------Federation------->|   domain.com     |
        +------------------+                          +------------------+
                 |       .................................        |
-                |______|         Shared State            |_______|
-                       |  Room ID: !qporfwt:matrix.org   |
+                |______|     Partially Shared State      |_______|
+                       | Room ID: !qporfwt:matrix.org    |
                        | Servers: matrix.org, domain.com |
                        | Members:                        |
                        |  - @alice:matrix.org            |
                        |  - @bob:domain.com              |
                        |.................................|
 
-- Federation's goal is to maintain the shared state. Don't need FULL state in order
-  to be a part of a room.
-- Introduce the DAG.
-- Events are wrapped in PDUs.
+Federation maintains shared state between multiple home servers, such that when an event is
+sent to a room, the home server knows where to forward the event on to, and how to process
+the event. Home servers do not need to have completely shared state in order to participate 
+in a room. State is scoped to a single room, and federation ensures that all home servers 
+have the information they need, even if that means the home server has to request more 
+information from another home server before processing the event.
+
+Room Aliases
+------------
+
+Each room can also have multiple "Room Aliases", which looks like::
+
+  #room_alias:domain
+
+A room alias "points" to a room ID. The room ID the alias is pointing to can be obtained
+by visiting the domain specified. Room aliases are designed to be human readable strings
+which can be used to publicise rooms. Note that the mapping from a room alias to a 
+room ID is not fixed, and may change over time to point to a different room ID. For this
+reason, Clients SHOULD resolve the room alias to a room ID once and then use that ID on
+subsequent requests.
+
+::
+
+          GET    
+   #matrix:domain.com      !aaabaa:matrix.org
+           |                    ^
+           |                    |
+    _______V____________________|____
+   |          domain.com            |
+   | Mappings:                      |
+   | #matrix >> !aaabaa:matrix.org  |
+   | #golf >> !wfeiofh:sport.com    |
+   | #bike >> !4rguxf:matrix.org    |
+   |________________________________|
 
        
 Identity
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 7868575a2e..cadf574b3b 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -543,6 +543,8 @@ class _TransactionQueue(object):
         def eb(failure):
             if not deferred.called:
                 deferred.errback(failure)
+            else:
+                logger.exception("Failed to send edu", failure)
         self._attempt_new_transaction(destination).addErrback(eb)
 
         return deferred
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index e08231406d..980a169b25 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.events import SynapseEvent
+from synapse.util.logutils import log_function
 
 from ._base import BaseHandler
 
@@ -44,6 +45,7 @@ class EventStreamHandler(BaseHandler):
         self.notifier = hs.get_notifier()
 
     @defer.inlineCallbacks
+    @log_function
     def get_stream(self, auth_user_id, pagin_config, timeout=0):
         auth_user = self.hs.parse_userid(auth_user_id)
 
@@ -90,13 +92,15 @@ class EventStreamHandler(BaseHandler):
                 # 10 seconds of grace to allow the client to reconnect again
                 #   before we think they're gone
                 def _later():
+                    logger.debug("_later stopped_user_eventstream %s", auth_user)
                     self.distributor.fire(
                         "stopped_user_eventstream", auth_user
                     )
                     del self._stop_timer_per_user[auth_user]
 
+                logger.debug("Scheduling _later: for %s", auth_user)
                 self._stop_timer_per_user[auth_user] = (
-                    self.clock.call_later(5, _later)
+                    self.clock.call_later(30, _later)
                 )
 
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index bef1508892..7731de85c0 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError, AuthError
 from synapse.api.constants import PresenceState
 
+from synapse.util.logutils import log_function
+
 from ._base import BaseHandler
 
 import logging
@@ -141,6 +143,10 @@ class PresenceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def is_presence_visible(self, observer_user, observed_user):
+        defer.returnValue(True)
+        # return
+        # FIXME (erikj): This code path absolutely kills the database.
+
         assert(observed_user.is_mine)
 
         if observer_user == observed_user:
@@ -184,7 +190,12 @@ class PresenceHandler(BaseHandler):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
+    @log_function
     def set_state(self, target_user, auth_user, state):
+        # return
+        # TODO (erikj): Turn this back on. Why did we end up sending EDUs
+        # everywhere?
+
         if not target_user.is_mine:
             raise SynapseError(400, "User is not hosted on this Home Server")
 
@@ -237,33 +248,42 @@ class PresenceHandler(BaseHandler):
 
         self.push_presence(user, statuscache=statuscache)
 
+    @log_function
     def started_user_eventstream(self, user):
         # TODO(paul): Use "last online" state
         self.set_state(user, user, {"state": PresenceState.ONLINE})
 
+    @log_function
     def stopped_user_eventstream(self, user):
         # TODO(paul): Save current state as "last online" state
         self.set_state(user, user, {"state": PresenceState.OFFLINE})
 
     @defer.inlineCallbacks
     def user_joined_room(self, user, room_id):
-        localusers = set()
-        remotedomains = set()
-
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        yield rm_handler.fetch_room_distributions_into(room_id,
-                localusers=localusers, remotedomains=remotedomains,
-                ignore_user=user)
 
         if user.is_mine:
-            yield self._send_presence_to_distribution(srcuser=user,
-                localusers=localusers, remotedomains=remotedomains,
+            self.push_update_to_local_and_remote(
+                observed_user=user,
+                room_ids=[room_id],
                 statuscache=self._get_or_offline_usercache(user),
             )
 
-        for srcuser in localusers:
-            yield self._send_presence(srcuser=srcuser, destuser=user,
-                statuscache=self._get_or_offline_usercache(srcuser),
+        else:
+            self.push_update_to_clients(
+                observed_user=user,
+                room_ids=[room_id],
+                statuscache=self._get_or_offline_usercache(user),
+            )
+
+        # We also want to tell them about current presence of people.
+        rm_handler = self.homeserver.get_handlers().room_member_handler
+        curr_users = yield rm_handler.get_room_members(room_id)
+
+        for local_user in [c for c in curr_users if c.is_mine]:
+            self.push_update_to_local_and_remote(
+                observed_user=local_user,
+                users_to_push=[user],
+                statuscache=self._get_or_offline_usercache(local_user),
             )
 
     @defer.inlineCallbacks
@@ -374,11 +394,13 @@ class PresenceHandler(BaseHandler):
         defer.returnValue(presence)
 
     @defer.inlineCallbacks
+    @log_function
     def start_polling_presence(self, user, target_user=None, state=None):
         logger.debug("Start polling for presence from %s", user)
 
         if target_user:
             target_users = set([target_user])
+            room_ids = []
         else:
             presence = yield self.store.get_presence_list(
                 user.localpart, accepted=True
@@ -392,23 +414,37 @@ class PresenceHandler(BaseHandler):
             rm_handler = self.homeserver.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(user)
 
-            for room_id in room_ids:
-                for member in (yield rm_handler.get_room_members(room_id)):
-                    target_users.add(member)
-
         if state is None:
             state = yield self.store.get_presence_state(user.localpart)
-
-        localusers, remoteusers = partitionbool(
-            target_users,
-            lambda u: u.is_mine
+        else:
+#            statuscache = self._get_or_make_usercache(user)
+#            self._user_cachemap_latest_serial += 1
+#            statuscache.update(state, self._user_cachemap_latest_serial)
+            pass
+
+        yield self.push_update_to_local_and_remote(
+            observed_user=user,
+            users_to_push=target_users,
+            room_ids=room_ids,
+            statuscache=self._get_or_make_usercache(user),
         )
 
-        for target_user in localusers:
-            self._start_polling_local(user, target_user)
+        for target_user in target_users:
+            if target_user.is_mine:
+                self._start_polling_local(user, target_user)
+
+                # We want to tell the person that just came online
+                # presence state of people they are interested in?
+                self.push_update_to_clients(
+                    observed_user=target_user,
+                    users_to_push=[user],
+                    statuscache=self._get_or_offline_usercache(target_user),
+                )
 
         deferreds = []
-        remoteusers_by_domain = partition(remoteusers, lambda u: u.domain)
+        remote_users = [u for u in target_users if not u.is_mine]
+        remoteusers_by_domain = partition(remote_users, lambda u: u.domain)
+        # Only poll for people in our get_presence_list
         for domain in remoteusers_by_domain:
             remoteusers = remoteusers_by_domain[domain]
 
@@ -430,12 +466,6 @@ class PresenceHandler(BaseHandler):
 
         self._local_pushmap[target_localpart].add(user)
 
-        self.push_update_to_clients(
-            observer_user=user,
-            observed_user=target_user,
-            statuscache=self._get_or_offline_usercache(target_user),
-        )
-
     def _start_polling_remote(self, user, domain, remoteusers):
         to_poll = set()
 
@@ -455,6 +485,7 @@ class PresenceHandler(BaseHandler):
             content={"poll": [u.to_string() for u in to_poll]}
         )
 
+    @log_function
     def stop_polling_presence(self, user, target_user=None):
         logger.debug("Stop polling for presence from %s", user)
 
@@ -494,6 +525,7 @@ class PresenceHandler(BaseHandler):
             if not self._local_pushmap[localpart]:
                 del self._local_pushmap[localpart]
 
+    @log_function
     def _stop_polling_remote(self, user, domain, remoteusers):
         to_unpoll = set()
 
@@ -514,6 +546,7 @@ class PresenceHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
+    @log_function
     def push_presence(self, user, statuscache):
         assert(user.is_mine)
 
@@ -529,53 +562,17 @@ class PresenceHandler(BaseHandler):
         rm_handler = self.homeserver.get_handlers().room_member_handler
         room_ids = yield rm_handler.get_rooms_for_user(user)
 
-        for room_id in room_ids:
-            yield rm_handler.fetch_room_distributions_into(
-                room_id, localusers=localusers, remotedomains=remotedomains,
-                ignore_user=user,
-            )
-
-        if not localusers and not remotedomains:
+        if not localusers and not room_ids:
             defer.returnValue(None)
 
-        yield self._send_presence_to_distribution(user,
-            localusers=localusers, remotedomains=remotedomains,
-            statuscache=statuscache
+        yield self.push_update_to_local_and_remote(
+            observed_user=user,
+            users_to_push=localusers,
+            remote_domains=remotedomains,
+            room_ids=room_ids,
+            statuscache=statuscache,
         )
 
-    def _send_presence(self, srcuser, destuser, statuscache):
-        if destuser.is_mine:
-            self.push_update_to_clients(
-                observer_user=destuser,
-                observed_user=srcuser,
-                statuscache=statuscache)
-            return defer.succeed(None)
-        else:
-            return self._push_presence_remote(srcuser, destuser.domain,
-                state=statuscache.get_state()
-            )
-
-    @defer.inlineCallbacks
-    def _send_presence_to_distribution(self, srcuser, localusers=set(),
-            remotedomains=set(), statuscache=None):
-
-        for u in localusers:
-            logger.debug(" | push to local user %s", u)
-            self.push_update_to_clients(
-                observer_user=u,
-                observed_user=srcuser,
-                statuscache=statuscache,
-            )
-
-        deferreds = []
-        for domain in remotedomains:
-            logger.debug(" | push to remote domain %s", domain)
-            deferreds.append(self._push_presence_remote(srcuser, domain,
-                state=statuscache.get_state())
-            )
-
-        yield defer.DeferredList(deferreds)
-
     @defer.inlineCallbacks
     def _push_presence_remote(self, user, destination, state=None):
         if state is None:
@@ -591,12 +588,17 @@ class PresenceHandler(BaseHandler):
                 self.clock.time_msec() - state.pop("mtime")
             )
 
+        user_state = {
+            "user_id": user.to_string(),
+        }
+        user_state.update(**state)
+
         yield self.federation.send_edu(
             destination=destination,
             edu_type="m.presence",
             content={
                 "push": [
-                    dict(user_id=user.to_string(), **state),
+                    user_state,
                 ],
             }
         )
@@ -615,12 +617,7 @@ class PresenceHandler(BaseHandler):
             rm_handler = self.homeserver.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(user)
 
-            for room_id in room_ids:
-                yield rm_handler.fetch_room_distributions_into(
-                    room_id, localusers=observers, ignore_user=user
-                )
-
-            if not observers:
+            if not observers and not room_ids:
                 break
 
             state = dict(push)
@@ -636,12 +633,12 @@ class PresenceHandler(BaseHandler):
             self._user_cachemap_latest_serial += 1
             statuscache.update(state, serial=self._user_cachemap_latest_serial)
 
-            for observer_user in observers:
-                self.push_update_to_clients(
-                    observer_user=observer_user,
-                    observed_user=user,
-                    statuscache=statuscache,
-                )
+            self.push_update_to_clients(
+                observed_user=user,
+                users_to_push=observers,
+                room_ids=room_ids,
+                statuscache=statuscache,
+            )
 
             if state["state"] == PresenceState.OFFLINE:
                 del self._user_cachemap[user]
@@ -675,12 +672,53 @@ class PresenceHandler(BaseHandler):
 
         yield defer.DeferredList(deferreds)
 
-    def push_update_to_clients(self, observer_user, observed_user,
-                               statuscache):
-        statuscache.make_event(user=observed_user, clock=self.clock)
+    @defer.inlineCallbacks
+    def push_update_to_local_and_remote(self, observed_user,
+                                        users_to_push=[], room_ids=[],
+                                        remote_domains=[],
+                                        statuscache=None):
+
+        localusers, remoteusers = partitionbool(
+            users_to_push,
+            lambda u: u.is_mine
+        )
+
+        localusers = set(localusers)
+
+        self.push_update_to_clients(
+            observed_user=observed_user,
+            users_to_push=localusers,
+            room_ids=room_ids,
+            statuscache=statuscache,
+        )
+
+        remote_domains = set(remote_domains)
+        remote_domains |= set([r.domain for r in remoteusers])
+        for room_id in room_ids:
+            remote_domains.update(
+                (yield self.store.get_joined_hosts_for_room(room_id))
+            )
+
+        remote_domains.discard(self.hs.hostname)
+
+        deferreds = []
+        for domain in remote_domains:
+            logger.debug(" | push to remote domain %s", domain)
+            deferreds.append(
+                self._push_presence_remote(
+                    observed_user, domain, state=statuscache.get_state()
+                )
+            )
+
+        yield defer.DeferredList(deferreds)
+
+        defer.returnValue((localusers, remote_domains))
 
+    def push_update_to_clients(self, observed_user, users_to_push=[],
+                                 room_ids=[], statuscache=None):
         self.notifier.on_new_user_event(
-            [observer_user],
+            users_to_push,
+            room_ids,
         )
 
 
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 3d3fcdabdb..b6d5ec4820 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -119,6 +119,7 @@ class Notifier(object):
                 )
 
     @defer.inlineCallbacks
+    @log_function
     def on_new_user_event(self, users=[], rooms=[]):
         """ Used to inform listeners that something has happend
         presence/user event wise.
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 2434844d80..01bab568ff 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -81,4 +81,4 @@ class PaginationConfig(object):
         return (
             "<PaginationConfig from_tok=%s, to_tok=%s, "
             "direction=%s, limit=%s>"
-        ) % (self.from_tok, self.to_tok, self.direction, self.limit)
+        ) % (self.from_token, self.to_token, self.direction, self.limit)
diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py
index 021649071b..b94a749786 100644
--- a/synapse/util/logutils.py
+++ b/synapse/util/logutils.py
@@ -18,6 +18,8 @@ from inspect import getcallargs
 from functools import wraps
 
 import logging
+import inspect
+import traceback
 
 
 def log_function(f):
@@ -65,4 +67,55 @@ def log_function(f):
 
         return f(*args, **kwargs)
 
+    wrapped.__name__ = func_name
+    return wrapped
+
+
+def trace_function(f):
+    func_name = f.__name__
+    linenum = f.func_code.co_firstlineno
+    pathname = f.func_code.co_filename
+
+    def wrapped(*args, **kwargs):
+        name = f.__module__
+        logger = logging.getLogger(name)
+        level = logging.DEBUG
+
+        s = inspect.currentframe().f_back
+
+        to_print = [
+            "\t%s:%s %s. Args: args=%s, kwargs=%s" % (
+                pathname, linenum, func_name, args, kwargs
+            )
+        ]
+        while s:
+            if True or s.f_globals["__name__"].startswith("synapse"):
+                filename, lineno, function, _, _ = inspect.getframeinfo(s)
+                args_string = inspect.formatargvalues(*inspect.getargvalues(s))
+
+                to_print.append(
+                    "\t%s:%d %s. Args: %s" % (
+                        filename, lineno, function, args_string
+                    )
+                )
+
+            s = s.f_back
+
+        msg = "\nTraceback for %s:\n" % (func_name,) + "\n".join(to_print)
+
+        record = logging.LogRecord(
+            name=name,
+            level=level,
+            pathname=pathname,
+            lineno=lineno,
+            msg=msg,
+            args=None,
+            exc_info=None
+        )
+
+        logger.handle(record)
+
+        return f(*args, **kwargs)
+
+    wrapped.__name__ = func_name
     return wrapped
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 8d094fd1f9..fcd7a784cd 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -193,6 +193,8 @@ class PresenceStateTestCase(unittest.TestCase):
             SynapseError
         )
 
+    test_get_disallowed_state.skip = "Presence permissions are disabled"
+
     @defer.inlineCallbacks
     def test_set_my_state(self):
         mocked_set = self.datastore.set_presence_state
@@ -497,6 +499,7 @@ class PresencePushTestCase(unittest.TestCase):
                 db_pool=None,
                 datastore=Mock(spec=[
                     "set_presence_state",
+                    "get_joined_hosts_for_room",
 
                     # Bits that Federation needs
                     "prep_send_transaction",
@@ -511,8 +514,12 @@ class PresencePushTestCase(unittest.TestCase):
             )
         hs.handlers = JustPresenceHandlers(hs)
 
+        def update(*args,**kwargs):
+            # print "mock_update_client: Args=%s, kwargs=%s" %(args, kwargs,)
+            return defer.succeed(None)
+
         self.mock_update_client = Mock()
-        self.mock_update_client.return_value = defer.succeed(None)
+        self.mock_update_client.side_effect = update
 
         self.datastore = hs.get_datastore()
 
@@ -546,6 +553,14 @@ class PresencePushTestCase(unittest.TestCase):
                 return defer.succeed([])
         self.room_member_handler.get_room_members = get_room_members
 
+        def get_room_hosts(room_id):
+            if room_id == "a-room":
+                hosts = set([u.domain for u in self.room_members])
+                return defer.succeed(hosts)
+            else:
+                return defer.succeed([])
+        self.datastore.get_joined_hosts_for_room = get_room_hosts
+
         @defer.inlineCallbacks
         def fetch_room_distributions_into(room_id, localusers=None,
                 remotedomains=None, ignore_user=None):
@@ -611,18 +626,10 @@ class PresencePushTestCase(unittest.TestCase):
                 {"state": ONLINE})
 
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
+                call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
+                    room_ids=["a-room"],
                     observed_user=self.u_apple,
                     statuscache=ANY), # self-reflection
-                call(observer_user=self.u_banana,
-                    observed_user=self.u_apple,
-                    statuscache=ANY),
-                call(observer_user=self.u_clementine,
-                    observed_user=self.u_apple,
-                    statuscache=ANY),
-                call(observer_user=self.u_elderberry,
-                    observed_user=self.u_apple,
-                    statuscache=ANY),
         ], any_order=True)
         self.mock_update_client.reset_mock()
 
@@ -651,7 +658,8 @@ class PresencePushTestCase(unittest.TestCase):
         ], presence)
 
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_banana,
+                call(users_to_push=set([self.u_banana]),
+                    room_ids=[],
                     observed_user=self.u_banana,
                     statuscache=ANY), # self-reflection
         ]) # and no others...
@@ -659,21 +667,21 @@ class PresencePushTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def test_push_remote(self):
         put_json = self.mock_http_client.put_json
-        put_json.expect_call_and_return(
-            call("remote",
-                path=ANY,  # Can't guarantee which txn ID will be which
-                data=_expect_edu("remote", "m.presence",
-                    content={
-                        "push": [
-                            {"user_id": "@apple:test",
-                             "state": "online",
-                             "mtime_age": 0},
-                        ],
-                    }
-                )
-            ),
-            defer.succeed((200, "OK"))
-        )
+#        put_json.expect_call_and_return(
+#            call("remote",
+#                path=ANY,  # Can't guarantee which txn ID will be which
+#                data=_expect_edu("remote", "m.presence",
+#                    content={
+#                        "push": [
+#                            {"user_id": "@apple:test",
+#                             "state": "online",
+#                             "mtime_age": 0},
+#                        ],
+#                    }
+#                )
+#            ),
+#            defer.succeed((200, "OK"))
+#        )
         put_json.expect_call_and_return(
             call("farm",
                 path=ANY,  # Can't guarantee which txn ID will be which
@@ -681,7 +689,7 @@ class PresencePushTestCase(unittest.TestCase):
                     content={
                         "push": [
                             {"user_id": "@apple:test",
-                             "state": "online",
+                             "state": u"online",
                              "mtime_age": 0},
                         ],
                     }
@@ -730,10 +738,8 @@ class PresencePushTestCase(unittest.TestCase):
         )
 
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
-                    observed_user=self.u_potato,
-                    statuscache=ANY),
-                call(observer_user=self.u_banana,
+                call(users_to_push=set([self.u_apple]),
+                    room_ids=["a-room"],
                     observed_user=self.u_potato,
                     statuscache=ANY),
         ], any_order=True)
@@ -753,19 +759,17 @@ class PresencePushTestCase(unittest.TestCase):
         )
 
         self.mock_update_client.assert_has_calls([
-            # Apple and Elderberry see each other
-            call(observer_user=self.u_apple,
+            call(room_ids=["a-room"],
                 observed_user=self.u_elderberry,
+                users_to_push=set(),
                 statuscache=ANY),
-            call(observer_user=self.u_elderberry,
+            call(users_to_push=set([self.u_elderberry]),
                 observed_user=self.u_apple,
+                room_ids=[],
                 statuscache=ANY),
-            # Banana and Elderberry see each other
-            call(observer_user=self.u_banana,
-                observed_user=self.u_elderberry,
-                statuscache=ANY),
-            call(observer_user=self.u_elderberry,
+            call(users_to_push=set([self.u_elderberry]),
                 observed_user=self.u_banana,
+                room_ids=[],
                 statuscache=ANY),
         ], any_order=True)
 
@@ -887,7 +891,12 @@ class PresencePollingTestCase(unittest.TestCase):
         self.datastore.get_received_txn_response = get_received_txn_response
 
         self.mock_update_client = Mock()
-        self.mock_update_client.return_value = defer.succeed(None)
+
+        def update(*args,**kwargs):
+            # print "mock_update_client: Args=%s, kwargs=%s" %(args, kwargs,)
+            return defer.succeed(None)
+
+        self.mock_update_client.side_effect = update
 
         self.handler = hs.get_handlers().presence_handler
         self.handler.push_update_to_clients = self.mock_update_client
@@ -951,10 +960,10 @@ class PresencePollingTestCase(unittest.TestCase):
 
         # apple should see both banana and clementine currently offline
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
+                call(users_to_push=[self.u_apple],
                     observed_user=self.u_banana,
                     statuscache=ANY),
-                call(observer_user=self.u_apple,
+                call(users_to_push=[self.u_apple],
                     observed_user=self.u_clementine,
                     statuscache=ANY),
         ], any_order=True)
@@ -974,10 +983,11 @@ class PresencePollingTestCase(unittest.TestCase):
 
         # apple and banana should now both see each other online
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
+                call(users_to_push=set([self.u_apple]),
                     observed_user=self.u_banana,
+                    room_ids=[],
                     statuscache=ANY),
-                call(observer_user=self.u_banana,
+                call(users_to_push=[self.u_banana],
                     observed_user=self.u_apple,
                     statuscache=ANY),
         ], any_order=True)
@@ -994,8 +1004,9 @@ class PresencePollingTestCase(unittest.TestCase):
 
         # banana should now be told apple is offline
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_banana,
+                call(users_to_push=set([self.u_banana, self.u_apple]),
                     observed_user=self.u_apple,
+                    room_ids=[],
                     statuscache=ANY),
         ], any_order=True)
 
@@ -1008,7 +1019,7 @@ class PresencePollingTestCase(unittest.TestCase):
         put_json = self.mock_http_client.put_json
         put_json.expect_call_and_return(
             call("remote",
-                path="/matrix/federation/v1/send/1000000/",
+                path=ANY,
                 data=_expect_edu("remote", "m.presence",
                     content={
                         "poll": [ "@potato:remote" ],
@@ -1018,6 +1029,18 @@ class PresencePollingTestCase(unittest.TestCase):
             defer.succeed((200, "OK"))
         )
 
+        put_json.expect_call_and_return(
+            call("remote",
+                path=ANY,
+                data=_expect_edu("remote", "m.presence",
+                    content={
+                        "push": [ {"user_id": "@clementine:test" }],
+                    },
+                ),
+            ),
+            defer.succeed((200, "OK"))
+        )
+
         # clementine goes online
         yield self.handler.set_state(
                 target_user=self.u_clementine, auth_user=self.u_clementine,
@@ -1032,15 +1055,28 @@ class PresencePollingTestCase(unittest.TestCase):
         self.assertTrue(self.u_clementine in
                 self.handler._remote_recvmap[self.u_potato])
 
+
+        put_json.expect_call_and_return(
+            call("remote",
+                path=ANY,
+                data=_expect_edu("remote", "m.presence",
+                    content={
+                        "push": [ {"user_id": "@fig:test" }],
+                    },
+                ),
+            ),
+            defer.succeed((200, "OK"))
+        )
+
         # fig goes online; shouldn't send a second poll
         yield self.handler.set_state(
             target_user=self.u_fig, auth_user=self.u_fig,
             state={"state": ONLINE}
         )
 
-        reactor.iterate(delay=0)
+        # reactor.iterate(delay=0)
 
-        put_json.assert_had_no_calls()
+        yield put_json.await_calls()
 
         # fig goes offline
         yield self.handler.set_state(
@@ -1054,7 +1090,7 @@ class PresencePollingTestCase(unittest.TestCase):
 
         put_json.expect_call_and_return(
             call("remote",
-                path="/matrix/federation/v1/send/1000001/",
+                path=ANY,
                 data=_expect_edu("remote", "m.presence",
                     content={
                         "unpoll": [ "@potato:remote" ],
@@ -1069,7 +1105,7 @@ class PresencePollingTestCase(unittest.TestCase):
                 target_user=self.u_clementine, auth_user=self.u_clementine,
                 state={"state": OFFLINE})
 
-        put_json.await_calls()
+        yield put_json.await_calls()
 
         self.assertFalse(self.u_potato in self.handler._remote_recvmap,
             msg="expected potato not to be in _remote_recvmap"
diff --git a/tests/handlers/test_presencelike.py b/tests/handlers/test_presencelike.py
index da06a06647..e81d7ce101 100644
--- a/tests/handlers/test_presencelike.py
+++ b/tests/handlers/test_presencelike.py
@@ -81,7 +81,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
         self.replication = hs.get_replication_layer()
         self.replication.send_edu = Mock()
-        self.replication.send_edu.return_value = defer.succeed((200, "OK"))
+
+        def send_edu(*args, **kwargs):
+            # print "send_edu: %s, %s" % (args, kwargs)
+            return defer.succeed((200, "OK"))
+        self.replication.send_edu.side_effect = send_edu
 
         def get_profile_displayname(user_localpart):
             return defer.succeed("Frank")
@@ -95,11 +99,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
             return defer.succeed("http://foo")
         self.datastore.get_profile_avatar_url = get_profile_avatar_url
 
+        self.presence_list = [
+            {"observed_user_id": "@banana:test"},
+            {"observed_user_id": "@clementine:test"},
+        ]
         def get_presence_list(user_localpart, accepted=None):
-            return defer.succeed([
-                {"observed_user_id": "@banana:test"},
-                {"observed_user_id": "@clementine:test"},
-            ])
+            return defer.succeed(self.presence_list)
         self.datastore.get_presence_list = get_presence_list
 
         def do_users_share_a_room(userlist):
@@ -109,7 +114,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
         self.handlers = hs.get_handlers()
 
         self.mock_update_client = Mock()
-        self.mock_update_client.return_value = defer.succeed(None)
+        def update(*args, **kwargs):
+            # print "mock_update_client: %s, %s" %(args, kwargs)
+            return defer.succeed(None)
+        self.mock_update_client.side_effect = update
 
         self.handlers.presence_handler.push_update_to_clients = (
                 self.mock_update_client)
@@ -130,6 +138,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_set_my_state(self):
+        self.presence_list = [
+            {"observed_user_id": "@banana:test"},
+            {"observed_user_id": "@clementine:test"},
+        ]
+
         mocked_set = self.datastore.set_presence_state
         mocked_set.return_value = defer.succeed({"state": OFFLINE})
 
@@ -142,6 +155,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_push_local(self):
+        self.presence_list = [
+            {"observed_user_id": "@banana:test"},
+            {"observed_user_id": "@clementine:test"},
+        ]
+
         self.datastore.set_presence_state.return_value = defer.succeed(
                 {"state": ONLINE})
 
@@ -173,12 +191,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
         presence)
 
         self.mock_update_client.assert_has_calls([
-            call(observer_user=self.u_apple,
+            call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
+                room_ids=[],
                 observed_user=self.u_apple,
                 statuscache=ANY), # self-reflection
-            call(observer_user=self.u_banana,
-                observed_user=self.u_apple,
-                statuscache=ANY),
         ], any_order=True)
 
         statuscache = self.mock_update_client.call_args[1]["statuscache"]
@@ -198,12 +214,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
                 self.u_apple, "I am an Apple")
 
         self.mock_update_client.assert_has_calls([
-            call(observer_user=self.u_apple,
+            call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
+                room_ids=[],
                 observed_user=self.u_apple,
                 statuscache=ANY), # self-reflection
-            call(observer_user=self.u_banana,
-                observed_user=self.u_apple,
-                statuscache=ANY),
         ], any_order=True)
 
         statuscache = self.mock_update_client.call_args[1]["statuscache"]
@@ -217,6 +231,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_push_remote(self):
+        self.presence_list = [
+            {"observed_user_id": "@potato:remote"},
+        ]
+
         self.datastore.set_presence_state.return_value = defer.succeed(
                 {"state": ONLINE})
 
@@ -247,6 +265,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_recv_remote(self):
+        self.presence_list = [
+            {"observed_user_id": "@banana:test"},
+            {"observed_user_id": "@clementine:test"},
+        ]
+
         # TODO(paul): Gut-wrenching
         potato_set = self.handlers.presence_handler._remote_recvmap.setdefault(
                 self.u_potato, set())
@@ -264,7 +287,8 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
         )
 
         self.mock_update_client.assert_called_with(
-            observer_user=self.u_apple,
+            users_to_push=set([self.u_apple]),
+            room_ids=[],
             observed_user=self.u_potato,
             statuscache=ANY)
 
diff --git a/tests/utils.py b/tests/utils.py
index 98d4f9ed58..37b759febc 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -21,7 +21,7 @@ from synapse.api.events.room import (
     RoomMemberEvent, MessageEvent
 )
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 
 from collections import namedtuple
 from mock import patch, Mock
@@ -263,18 +263,43 @@ class DeferredMockCallable(object):
                 d.callback(None)
                 return result
 
-        raise AssertionError("Was not expecting call(%s)" %
+        failure = AssertionError("Was not expecting call(%s)" %
             _format_call(args, kwargs)
         )
 
+        for _, _, d in self.expectations:
+            try:
+                d.errback(failure)
+            except:
+                pass
+
+        raise failure
+
     def expect_call_and_return(self, call, result):
         self.expectations.append((call, result, defer.Deferred()))
 
     @defer.inlineCallbacks
-    def await_calls(self):
-        while self.expectations:
-            (_, _, d) = self.expectations.pop(0)
-            yield d
+    def await_calls(self, timeout=1000):
+        deferred = defer.DeferredList(
+            [d for _, _, d in self.expectations],
+            fireOnOneErrback=True
+        )
+
+        timer = reactor.callLater(
+            timeout/1000,
+            deferred.errback,
+            AssertionError(
+                "%d pending calls left: %s"% (
+                    len([e for e in self.expectations if not e[2].called]),
+                    [e for e in self.expectations if not e[2].called]
+                )
+            )
+        )
+
+        yield deferred
+
+        timer.cancel()
+
         self.calls = []
 
     def assert_had_no_calls(self):
diff --git a/webclient/room/room-controller.js b/webclient/room/room-controller.js
index 8bb48b3692..6232ce8ed3 100644
--- a/webclient/room/room-controller.js
+++ b/webclient/room/room-controller.js
@@ -15,8 +15,8 @@ limitations under the License.
 */
 
 angular.module('RoomController', ['ngSanitize', 'mFileInput'])
-.controller('RoomController', ['$scope', '$timeout', '$routeParams', '$location', '$rootScope', 'matrixService', 'eventHandlerService', 'mFileUpload', 'matrixPhoneService', 'MatrixCall',
-                               function($scope, $timeout, $routeParams, $location, $rootScope, matrixService, eventHandlerService, mFileUpload, matrixPhoneService, MatrixCall) {
+.controller('RoomController', ['$scope', '$timeout', '$routeParams', '$location', '$rootScope', 'matrixService', 'eventHandlerService', 'mFileUpload', 'mPresence', 'matrixPhoneService', 'MatrixCall',
+                               function($scope, $timeout, $routeParams, $location, $rootScope, matrixService, eventHandlerService, mFileUpload, mPresence, matrixPhoneService, MatrixCall) {
    'use strict';
     var MESSAGES_PER_PAGINATION = 30;
     var THUMBNAIL_SIZE = 320;
@@ -57,15 +57,14 @@ angular.module('RoomController', ['ngSanitize', 'mFileInput'])
             scrollToBottom();
             
             if (window.Notification) {
-                // FIXME: we should also notify based on a timer or other heuristics
-                // rather than the window being minimised
-                if (document.hidden) {
+                // Show notification when the user is idle
+                if (matrixService.presence.offline === mPresence.getState()) {
                     var notification = new window.Notification(
                         ($scope.members[event.user_id].displayname || event.user_id) +
                         " (" + ($scope.room_alias || $scope.room_id) + ")", // FIXME: don't leak room_ids here
                     {
                         "body": event.content.body,
-                        "icon": $scope.members[event.user_id].avatar_url,
+                        "icon": $scope.members[event.user_id].avatar_url
                     });
                     $timeout(function() {
                         notification.close();
@@ -230,7 +229,7 @@ angular.module('RoomController', ['ngSanitize', 'mFileInput'])
             var member = $scope.members[target_user_id];
             member.content.membership = chunk.content.membership;
         }
-    }
+    };
 
     var updatePresence = function(chunk) {
         if (!(chunk.content.user_id in $scope.members)) {
@@ -257,10 +256,10 @@ angular.module('RoomController', ['ngSanitize', 'mFileInput'])
         if ("avatar_url" in chunk.content) {
             member.avatar_url = chunk.content.avatar_url;
         }
-    }
+    };
 
     $scope.send = function() {
-        if ($scope.textInput == "") {
+        if ($scope.textInput === "") {
             return;
         }
 
@@ -269,7 +268,7 @@ angular.module('RoomController', ['ngSanitize', 'mFileInput'])
         // Send the text message
         var promise;
         // FIXME: handle other commands too
-        if ($scope.textInput.indexOf("/me") == 0) {
+        if ($scope.textInput.indexOf("/me") === 0) {
             promise = matrixService.sendEmoteMessage($scope.room_id, $scope.textInput.substr(4));
         }
         else {