summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-08-29 12:10:00 +0100
committerErik Johnston <erik@matrix.org>2014-08-29 12:10:00 +0100
commit47fb286184dd27d8098af2fd24d22dd82205d3d5 (patch)
tree3e186226e18131b7c5d9ca89fdb4075667110c6d
parentExpand architecture section to introduce room IDs, room aliases, user IDs, ev... (diff)
parentFix a couple of bugs in presence handler related to pushing updatesto the cor... (diff)
downloadsynapse-47fb286184dd27d8098af2fd24d22dd82205d3d5.tar.xz
Merge branch 'presence_logging' into develop
-rw-r--r--synapse/federation/replication.py2
-rw-r--r--synapse/handlers/events.py6
-rw-r--r--synapse/handlers/presence.py216
-rw-r--r--synapse/notifier.py1
-rw-r--r--synapse/streams/config.py2
-rw-r--r--synapse/util/logutils.py53
-rw-r--r--tests/handlers/test_presence.py138
-rw-r--r--tests/handlers/test_presencelike.py54
-rw-r--r--tests/utils.py37
9 files changed, 346 insertions, 163 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 7868575a2e..cadf574b3b 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -543,6 +543,8 @@ class _TransactionQueue(object):
         def eb(failure):
             if not deferred.called:
                 deferred.errback(failure)
+            else:
+                logger.exception("Failed to send edu", failure)
         self._attempt_new_transaction(destination).addErrback(eb)
 
         return deferred
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index e08231406d..980a169b25 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.events import SynapseEvent
+from synapse.util.logutils import log_function
 
 from ._base import BaseHandler
 
@@ -44,6 +45,7 @@ class EventStreamHandler(BaseHandler):
         self.notifier = hs.get_notifier()
 
     @defer.inlineCallbacks
+    @log_function
     def get_stream(self, auth_user_id, pagin_config, timeout=0):
         auth_user = self.hs.parse_userid(auth_user_id)
 
@@ -90,13 +92,15 @@ class EventStreamHandler(BaseHandler):
                 # 10 seconds of grace to allow the client to reconnect again
                 #   before we think they're gone
                 def _later():
+                    logger.debug("_later stopped_user_eventstream %s", auth_user)
                     self.distributor.fire(
                         "stopped_user_eventstream", auth_user
                     )
                     del self._stop_timer_per_user[auth_user]
 
+                logger.debug("Scheduling _later: for %s", auth_user)
                 self._stop_timer_per_user[auth_user] = (
-                    self.clock.call_later(5, _later)
+                    self.clock.call_later(30, _later)
                 )
 
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index bef1508892..7731de85c0 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError, AuthError
 from synapse.api.constants import PresenceState
 
+from synapse.util.logutils import log_function
+
 from ._base import BaseHandler
 
 import logging
@@ -141,6 +143,10 @@ class PresenceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def is_presence_visible(self, observer_user, observed_user):
+        defer.returnValue(True)
+        # return
+        # FIXME (erikj): This code path absolutely kills the database.
+
         assert(observed_user.is_mine)
 
         if observer_user == observed_user:
@@ -184,7 +190,12 @@ class PresenceHandler(BaseHandler):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
+    @log_function
     def set_state(self, target_user, auth_user, state):
+        # return
+        # TODO (erikj): Turn this back on. Why did we end up sending EDUs
+        # everywhere?
+
         if not target_user.is_mine:
             raise SynapseError(400, "User is not hosted on this Home Server")
 
@@ -237,33 +248,42 @@ class PresenceHandler(BaseHandler):
 
         self.push_presence(user, statuscache=statuscache)
 
+    @log_function
     def started_user_eventstream(self, user):
         # TODO(paul): Use "last online" state
         self.set_state(user, user, {"state": PresenceState.ONLINE})
 
+    @log_function
     def stopped_user_eventstream(self, user):
         # TODO(paul): Save current state as "last online" state
         self.set_state(user, user, {"state": PresenceState.OFFLINE})
 
     @defer.inlineCallbacks
     def user_joined_room(self, user, room_id):
-        localusers = set()
-        remotedomains = set()
-
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        yield rm_handler.fetch_room_distributions_into(room_id,
-                localusers=localusers, remotedomains=remotedomains,
-                ignore_user=user)
 
         if user.is_mine:
-            yield self._send_presence_to_distribution(srcuser=user,
-                localusers=localusers, remotedomains=remotedomains,
+            self.push_update_to_local_and_remote(
+                observed_user=user,
+                room_ids=[room_id],
                 statuscache=self._get_or_offline_usercache(user),
             )
 
-        for srcuser in localusers:
-            yield self._send_presence(srcuser=srcuser, destuser=user,
-                statuscache=self._get_or_offline_usercache(srcuser),
+        else:
+            self.push_update_to_clients(
+                observed_user=user,
+                room_ids=[room_id],
+                statuscache=self._get_or_offline_usercache(user),
+            )
+
+        # We also want to tell them about current presence of people.
+        rm_handler = self.homeserver.get_handlers().room_member_handler
+        curr_users = yield rm_handler.get_room_members(room_id)
+
+        for local_user in [c for c in curr_users if c.is_mine]:
+            self.push_update_to_local_and_remote(
+                observed_user=local_user,
+                users_to_push=[user],
+                statuscache=self._get_or_offline_usercache(local_user),
             )
 
     @defer.inlineCallbacks
@@ -374,11 +394,13 @@ class PresenceHandler(BaseHandler):
         defer.returnValue(presence)
 
     @defer.inlineCallbacks
+    @log_function
     def start_polling_presence(self, user, target_user=None, state=None):
         logger.debug("Start polling for presence from %s", user)
 
         if target_user:
             target_users = set([target_user])
+            room_ids = []
         else:
             presence = yield self.store.get_presence_list(
                 user.localpart, accepted=True
@@ -392,23 +414,37 @@ class PresenceHandler(BaseHandler):
             rm_handler = self.homeserver.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(user)
 
-            for room_id in room_ids:
-                for member in (yield rm_handler.get_room_members(room_id)):
-                    target_users.add(member)
-
         if state is None:
             state = yield self.store.get_presence_state(user.localpart)
-
-        localusers, remoteusers = partitionbool(
-            target_users,
-            lambda u: u.is_mine
+        else:
+#            statuscache = self._get_or_make_usercache(user)
+#            self._user_cachemap_latest_serial += 1
+#            statuscache.update(state, self._user_cachemap_latest_serial)
+            pass
+
+        yield self.push_update_to_local_and_remote(
+            observed_user=user,
+            users_to_push=target_users,
+            room_ids=room_ids,
+            statuscache=self._get_or_make_usercache(user),
         )
 
-        for target_user in localusers:
-            self._start_polling_local(user, target_user)
+        for target_user in target_users:
+            if target_user.is_mine:
+                self._start_polling_local(user, target_user)
+
+                # We want to tell the person that just came online
+                # presence state of people they are interested in?
+                self.push_update_to_clients(
+                    observed_user=target_user,
+                    users_to_push=[user],
+                    statuscache=self._get_or_offline_usercache(target_user),
+                )
 
         deferreds = []
-        remoteusers_by_domain = partition(remoteusers, lambda u: u.domain)
+        remote_users = [u for u in target_users if not u.is_mine]
+        remoteusers_by_domain = partition(remote_users, lambda u: u.domain)
+        # Only poll for people in our get_presence_list
         for domain in remoteusers_by_domain:
             remoteusers = remoteusers_by_domain[domain]
 
@@ -430,12 +466,6 @@ class PresenceHandler(BaseHandler):
 
         self._local_pushmap[target_localpart].add(user)
 
-        self.push_update_to_clients(
-            observer_user=user,
-            observed_user=target_user,
-            statuscache=self._get_or_offline_usercache(target_user),
-        )
-
     def _start_polling_remote(self, user, domain, remoteusers):
         to_poll = set()
 
@@ -455,6 +485,7 @@ class PresenceHandler(BaseHandler):
             content={"poll": [u.to_string() for u in to_poll]}
         )
 
+    @log_function
     def stop_polling_presence(self, user, target_user=None):
         logger.debug("Stop polling for presence from %s", user)
 
@@ -494,6 +525,7 @@ class PresenceHandler(BaseHandler):
             if not self._local_pushmap[localpart]:
                 del self._local_pushmap[localpart]
 
+    @log_function
     def _stop_polling_remote(self, user, domain, remoteusers):
         to_unpoll = set()
 
@@ -514,6 +546,7 @@ class PresenceHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
+    @log_function
     def push_presence(self, user, statuscache):
         assert(user.is_mine)
 
@@ -529,53 +562,17 @@ class PresenceHandler(BaseHandler):
         rm_handler = self.homeserver.get_handlers().room_member_handler
         room_ids = yield rm_handler.get_rooms_for_user(user)
 
-        for room_id in room_ids:
-            yield rm_handler.fetch_room_distributions_into(
-                room_id, localusers=localusers, remotedomains=remotedomains,
-                ignore_user=user,
-            )
-
-        if not localusers and not remotedomains:
+        if not localusers and not room_ids:
             defer.returnValue(None)
 
-        yield self._send_presence_to_distribution(user,
-            localusers=localusers, remotedomains=remotedomains,
-            statuscache=statuscache
+        yield self.push_update_to_local_and_remote(
+            observed_user=user,
+            users_to_push=localusers,
+            remote_domains=remotedomains,
+            room_ids=room_ids,
+            statuscache=statuscache,
         )
 
-    def _send_presence(self, srcuser, destuser, statuscache):
-        if destuser.is_mine:
-            self.push_update_to_clients(
-                observer_user=destuser,
-                observed_user=srcuser,
-                statuscache=statuscache)
-            return defer.succeed(None)
-        else:
-            return self._push_presence_remote(srcuser, destuser.domain,
-                state=statuscache.get_state()
-            )
-
-    @defer.inlineCallbacks
-    def _send_presence_to_distribution(self, srcuser, localusers=set(),
-            remotedomains=set(), statuscache=None):
-
-        for u in localusers:
-            logger.debug(" | push to local user %s", u)
-            self.push_update_to_clients(
-                observer_user=u,
-                observed_user=srcuser,
-                statuscache=statuscache,
-            )
-
-        deferreds = []
-        for domain in remotedomains:
-            logger.debug(" | push to remote domain %s", domain)
-            deferreds.append(self._push_presence_remote(srcuser, domain,
-                state=statuscache.get_state())
-            )
-
-        yield defer.DeferredList(deferreds)
-
     @defer.inlineCallbacks
     def _push_presence_remote(self, user, destination, state=None):
         if state is None:
@@ -591,12 +588,17 @@ class PresenceHandler(BaseHandler):
                 self.clock.time_msec() - state.pop("mtime")
             )
 
+        user_state = {
+            "user_id": user.to_string(),
+        }
+        user_state.update(**state)
+
         yield self.federation.send_edu(
             destination=destination,
             edu_type="m.presence",
             content={
                 "push": [
-                    dict(user_id=user.to_string(), **state),
+                    user_state,
                 ],
             }
         )
@@ -615,12 +617,7 @@ class PresenceHandler(BaseHandler):
             rm_handler = self.homeserver.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(user)
 
-            for room_id in room_ids:
-                yield rm_handler.fetch_room_distributions_into(
-                    room_id, localusers=observers, ignore_user=user
-                )
-
-            if not observers:
+            if not observers and not room_ids:
                 break
 
             state = dict(push)
@@ -636,12 +633,12 @@ class PresenceHandler(BaseHandler):
             self._user_cachemap_latest_serial += 1
             statuscache.update(state, serial=self._user_cachemap_latest_serial)
 
-            for observer_user in observers:
-                self.push_update_to_clients(
-                    observer_user=observer_user,
-                    observed_user=user,
-                    statuscache=statuscache,
-                )
+            self.push_update_to_clients(
+                observed_user=user,
+                users_to_push=observers,
+                room_ids=room_ids,
+                statuscache=statuscache,
+            )
 
             if state["state"] == PresenceState.OFFLINE:
                 del self._user_cachemap[user]
@@ -675,12 +672,53 @@ class PresenceHandler(BaseHandler):
 
         yield defer.DeferredList(deferreds)
 
-    def push_update_to_clients(self, observer_user, observed_user,
-                               statuscache):
-        statuscache.make_event(user=observed_user, clock=self.clock)
+    @defer.inlineCallbacks
+    def push_update_to_local_and_remote(self, observed_user,
+                                        users_to_push=[], room_ids=[],
+                                        remote_domains=[],
+                                        statuscache=None):
+
+        localusers, remoteusers = partitionbool(
+            users_to_push,
+            lambda u: u.is_mine
+        )
+
+        localusers = set(localusers)
+
+        self.push_update_to_clients(
+            observed_user=observed_user,
+            users_to_push=localusers,
+            room_ids=room_ids,
+            statuscache=statuscache,
+        )
+
+        remote_domains = set(remote_domains)
+        remote_domains |= set([r.domain for r in remoteusers])
+        for room_id in room_ids:
+            remote_domains.update(
+                (yield self.store.get_joined_hosts_for_room(room_id))
+            )
+
+        remote_domains.discard(self.hs.hostname)
+
+        deferreds = []
+        for domain in remote_domains:
+            logger.debug(" | push to remote domain %s", domain)
+            deferreds.append(
+                self._push_presence_remote(
+                    observed_user, domain, state=statuscache.get_state()
+                )
+            )
+
+        yield defer.DeferredList(deferreds)
+
+        defer.returnValue((localusers, remote_domains))
 
+    def push_update_to_clients(self, observed_user, users_to_push=[],
+                                 room_ids=[], statuscache=None):
         self.notifier.on_new_user_event(
-            [observer_user],
+            users_to_push,
+            room_ids,
         )
 
 
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 3d3fcdabdb..b6d5ec4820 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -119,6 +119,7 @@ class Notifier(object):
                 )
 
     @defer.inlineCallbacks
+    @log_function
     def on_new_user_event(self, users=[], rooms=[]):
         """ Used to inform listeners that something has happend
         presence/user event wise.
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 2434844d80..01bab568ff 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -81,4 +81,4 @@ class PaginationConfig(object):
         return (
             "<PaginationConfig from_tok=%s, to_tok=%s, "
             "direction=%s, limit=%s>"
-        ) % (self.from_tok, self.to_tok, self.direction, self.limit)
+        ) % (self.from_token, self.to_token, self.direction, self.limit)
diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py
index 021649071b..b94a749786 100644
--- a/synapse/util/logutils.py
+++ b/synapse/util/logutils.py
@@ -18,6 +18,8 @@ from inspect import getcallargs
 from functools import wraps
 
 import logging
+import inspect
+import traceback
 
 
 def log_function(f):
@@ -65,4 +67,55 @@ def log_function(f):
 
         return f(*args, **kwargs)
 
+    wrapped.__name__ = func_name
+    return wrapped
+
+
+def trace_function(f):
+    func_name = f.__name__
+    linenum = f.func_code.co_firstlineno
+    pathname = f.func_code.co_filename
+
+    def wrapped(*args, **kwargs):
+        name = f.__module__
+        logger = logging.getLogger(name)
+        level = logging.DEBUG
+
+        s = inspect.currentframe().f_back
+
+        to_print = [
+            "\t%s:%s %s. Args: args=%s, kwargs=%s" % (
+                pathname, linenum, func_name, args, kwargs
+            )
+        ]
+        while s:
+            if True or s.f_globals["__name__"].startswith("synapse"):
+                filename, lineno, function, _, _ = inspect.getframeinfo(s)
+                args_string = inspect.formatargvalues(*inspect.getargvalues(s))
+
+                to_print.append(
+                    "\t%s:%d %s. Args: %s" % (
+                        filename, lineno, function, args_string
+                    )
+                )
+
+            s = s.f_back
+
+        msg = "\nTraceback for %s:\n" % (func_name,) + "\n".join(to_print)
+
+        record = logging.LogRecord(
+            name=name,
+            level=level,
+            pathname=pathname,
+            lineno=lineno,
+            msg=msg,
+            args=None,
+            exc_info=None
+        )
+
+        logger.handle(record)
+
+        return f(*args, **kwargs)
+
+    wrapped.__name__ = func_name
     return wrapped
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 8d094fd1f9..fcd7a784cd 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -193,6 +193,8 @@ class PresenceStateTestCase(unittest.TestCase):
             SynapseError
         )
 
+    test_get_disallowed_state.skip = "Presence permissions are disabled"
+
     @defer.inlineCallbacks
     def test_set_my_state(self):
         mocked_set = self.datastore.set_presence_state
@@ -497,6 +499,7 @@ class PresencePushTestCase(unittest.TestCase):
                 db_pool=None,
                 datastore=Mock(spec=[
                     "set_presence_state",
+                    "get_joined_hosts_for_room",
 
                     # Bits that Federation needs
                     "prep_send_transaction",
@@ -511,8 +514,12 @@ class PresencePushTestCase(unittest.TestCase):
             )
         hs.handlers = JustPresenceHandlers(hs)
 
+        def update(*args,**kwargs):
+            # print "mock_update_client: Args=%s, kwargs=%s" %(args, kwargs,)
+            return defer.succeed(None)
+
         self.mock_update_client = Mock()
-        self.mock_update_client.return_value = defer.succeed(None)
+        self.mock_update_client.side_effect = update
 
         self.datastore = hs.get_datastore()
 
@@ -546,6 +553,14 @@ class PresencePushTestCase(unittest.TestCase):
                 return defer.succeed([])
         self.room_member_handler.get_room_members = get_room_members
 
+        def get_room_hosts(room_id):
+            if room_id == "a-room":
+                hosts = set([u.domain for u in self.room_members])
+                return defer.succeed(hosts)
+            else:
+                return defer.succeed([])
+        self.datastore.get_joined_hosts_for_room = get_room_hosts
+
         @defer.inlineCallbacks
         def fetch_room_distributions_into(room_id, localusers=None,
                 remotedomains=None, ignore_user=None):
@@ -611,18 +626,10 @@ class PresencePushTestCase(unittest.TestCase):
                 {"state": ONLINE})
 
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
+                call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
+                    room_ids=["a-room"],
                     observed_user=self.u_apple,
                     statuscache=ANY), # self-reflection
-                call(observer_user=self.u_banana,
-                    observed_user=self.u_apple,
-                    statuscache=ANY),
-                call(observer_user=self.u_clementine,
-                    observed_user=self.u_apple,
-                    statuscache=ANY),
-                call(observer_user=self.u_elderberry,
-                    observed_user=self.u_apple,
-                    statuscache=ANY),
         ], any_order=True)
         self.mock_update_client.reset_mock()
 
@@ -651,7 +658,8 @@ class PresencePushTestCase(unittest.TestCase):
         ], presence)
 
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_banana,
+                call(users_to_push=set([self.u_banana]),
+                    room_ids=[],
                     observed_user=self.u_banana,
                     statuscache=ANY), # self-reflection
         ]) # and no others...
@@ -659,21 +667,21 @@ class PresencePushTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def test_push_remote(self):
         put_json = self.mock_http_client.put_json
-        put_json.expect_call_and_return(
-            call("remote",
-                path=ANY,  # Can't guarantee which txn ID will be which
-                data=_expect_edu("remote", "m.presence",
-                    content={
-                        "push": [
-                            {"user_id": "@apple:test",
-                             "state": "online",
-                             "mtime_age": 0},
-                        ],
-                    }
-                )
-            ),
-            defer.succeed((200, "OK"))
-        )
+#        put_json.expect_call_and_return(
+#            call("remote",
+#                path=ANY,  # Can't guarantee which txn ID will be which
+#                data=_expect_edu("remote", "m.presence",
+#                    content={
+#                        "push": [
+#                            {"user_id": "@apple:test",
+#                             "state": "online",
+#                             "mtime_age": 0},
+#                        ],
+#                    }
+#                )
+#            ),
+#            defer.succeed((200, "OK"))
+#        )
         put_json.expect_call_and_return(
             call("farm",
                 path=ANY,  # Can't guarantee which txn ID will be which
@@ -681,7 +689,7 @@ class PresencePushTestCase(unittest.TestCase):
                     content={
                         "push": [
                             {"user_id": "@apple:test",
-                             "state": "online",
+                             "state": u"online",
                              "mtime_age": 0},
                         ],
                     }
@@ -730,10 +738,8 @@ class PresencePushTestCase(unittest.TestCase):
         )
 
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
-                    observed_user=self.u_potato,
-                    statuscache=ANY),
-                call(observer_user=self.u_banana,
+                call(users_to_push=set([self.u_apple]),
+                    room_ids=["a-room"],
                     observed_user=self.u_potato,
                     statuscache=ANY),
         ], any_order=True)
@@ -753,19 +759,17 @@ class PresencePushTestCase(unittest.TestCase):
         )
 
         self.mock_update_client.assert_has_calls([
-            # Apple and Elderberry see each other
-            call(observer_user=self.u_apple,
+            call(room_ids=["a-room"],
                 observed_user=self.u_elderberry,
+                users_to_push=set(),
                 statuscache=ANY),
-            call(observer_user=self.u_elderberry,
+            call(users_to_push=set([self.u_elderberry]),
                 observed_user=self.u_apple,
+                room_ids=[],
                 statuscache=ANY),
-            # Banana and Elderberry see each other
-            call(observer_user=self.u_banana,
-                observed_user=self.u_elderberry,
-                statuscache=ANY),
-            call(observer_user=self.u_elderberry,
+            call(users_to_push=set([self.u_elderberry]),
                 observed_user=self.u_banana,
+                room_ids=[],
                 statuscache=ANY),
         ], any_order=True)
 
@@ -887,7 +891,12 @@ class PresencePollingTestCase(unittest.TestCase):
         self.datastore.get_received_txn_response = get_received_txn_response
 
         self.mock_update_client = Mock()
-        self.mock_update_client.return_value = defer.succeed(None)
+
+        def update(*args,**kwargs):
+            # print "mock_update_client: Args=%s, kwargs=%s" %(args, kwargs,)
+            return defer.succeed(None)
+
+        self.mock_update_client.side_effect = update
 
         self.handler = hs.get_handlers().presence_handler
         self.handler.push_update_to_clients = self.mock_update_client
@@ -951,10 +960,10 @@ class PresencePollingTestCase(unittest.TestCase):
 
         # apple should see both banana and clementine currently offline
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
+                call(users_to_push=[self.u_apple],
                     observed_user=self.u_banana,
                     statuscache=ANY),
-                call(observer_user=self.u_apple,
+                call(users_to_push=[self.u_apple],
                     observed_user=self.u_clementine,
                     statuscache=ANY),
         ], any_order=True)
@@ -974,10 +983,11 @@ class PresencePollingTestCase(unittest.TestCase):
 
         # apple and banana should now both see each other online
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
+                call(users_to_push=set([self.u_apple]),
                     observed_user=self.u_banana,
+                    room_ids=[],
                     statuscache=ANY),
-                call(observer_user=self.u_banana,
+                call(users_to_push=[self.u_banana],
                     observed_user=self.u_apple,
                     statuscache=ANY),
         ], any_order=True)
@@ -994,8 +1004,9 @@ class PresencePollingTestCase(unittest.TestCase):
 
         # banana should now be told apple is offline
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_banana,
+                call(users_to_push=set([self.u_banana, self.u_apple]),
                     observed_user=self.u_apple,
+                    room_ids=[],
                     statuscache=ANY),
         ], any_order=True)
 
@@ -1008,7 +1019,7 @@ class PresencePollingTestCase(unittest.TestCase):
         put_json = self.mock_http_client.put_json
         put_json.expect_call_and_return(
             call("remote",
-                path="/matrix/federation/v1/send/1000000/",
+                path=ANY,
                 data=_expect_edu("remote", "m.presence",
                     content={
                         "poll": [ "@potato:remote" ],
@@ -1018,6 +1029,18 @@ class PresencePollingTestCase(unittest.TestCase):
             defer.succeed((200, "OK"))
         )
 
+        put_json.expect_call_and_return(
+            call("remote",
+                path=ANY,
+                data=_expect_edu("remote", "m.presence",
+                    content={
+                        "push": [ {"user_id": "@clementine:test" }],
+                    },
+                ),
+            ),
+            defer.succeed((200, "OK"))
+        )
+
         # clementine goes online
         yield self.handler.set_state(
                 target_user=self.u_clementine, auth_user=self.u_clementine,
@@ -1032,15 +1055,28 @@ class PresencePollingTestCase(unittest.TestCase):
         self.assertTrue(self.u_clementine in
                 self.handler._remote_recvmap[self.u_potato])
 
+
+        put_json.expect_call_and_return(
+            call("remote",
+                path=ANY,
+                data=_expect_edu("remote", "m.presence",
+                    content={
+                        "push": [ {"user_id": "@fig:test" }],
+                    },
+                ),
+            ),
+            defer.succeed((200, "OK"))
+        )
+
         # fig goes online; shouldn't send a second poll
         yield self.handler.set_state(
             target_user=self.u_fig, auth_user=self.u_fig,
             state={"state": ONLINE}
         )
 
-        reactor.iterate(delay=0)
+        # reactor.iterate(delay=0)
 
-        put_json.assert_had_no_calls()
+        yield put_json.await_calls()
 
         # fig goes offline
         yield self.handler.set_state(
@@ -1054,7 +1090,7 @@ class PresencePollingTestCase(unittest.TestCase):
 
         put_json.expect_call_and_return(
             call("remote",
-                path="/matrix/federation/v1/send/1000001/",
+                path=ANY,
                 data=_expect_edu("remote", "m.presence",
                     content={
                         "unpoll": [ "@potato:remote" ],
@@ -1069,7 +1105,7 @@ class PresencePollingTestCase(unittest.TestCase):
                 target_user=self.u_clementine, auth_user=self.u_clementine,
                 state={"state": OFFLINE})
 
-        put_json.await_calls()
+        yield put_json.await_calls()
 
         self.assertFalse(self.u_potato in self.handler._remote_recvmap,
             msg="expected potato not to be in _remote_recvmap"
diff --git a/tests/handlers/test_presencelike.py b/tests/handlers/test_presencelike.py
index da06a06647..e81d7ce101 100644
--- a/tests/handlers/test_presencelike.py
+++ b/tests/handlers/test_presencelike.py
@@ -81,7 +81,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
         self.replication = hs.get_replication_layer()
         self.replication.send_edu = Mock()
-        self.replication.send_edu.return_value = defer.succeed((200, "OK"))
+
+        def send_edu(*args, **kwargs):
+            # print "send_edu: %s, %s" % (args, kwargs)
+            return defer.succeed((200, "OK"))
+        self.replication.send_edu.side_effect = send_edu
 
         def get_profile_displayname(user_localpart):
             return defer.succeed("Frank")
@@ -95,11 +99,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
             return defer.succeed("http://foo")
         self.datastore.get_profile_avatar_url = get_profile_avatar_url
 
+        self.presence_list = [
+            {"observed_user_id": "@banana:test"},
+            {"observed_user_id": "@clementine:test"},
+        ]
         def get_presence_list(user_localpart, accepted=None):
-            return defer.succeed([
-                {"observed_user_id": "@banana:test"},
-                {"observed_user_id": "@clementine:test"},
-            ])
+            return defer.succeed(self.presence_list)
         self.datastore.get_presence_list = get_presence_list
 
         def do_users_share_a_room(userlist):
@@ -109,7 +114,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
         self.handlers = hs.get_handlers()
 
         self.mock_update_client = Mock()
-        self.mock_update_client.return_value = defer.succeed(None)
+        def update(*args, **kwargs):
+            # print "mock_update_client: %s, %s" %(args, kwargs)
+            return defer.succeed(None)
+        self.mock_update_client.side_effect = update
 
         self.handlers.presence_handler.push_update_to_clients = (
                 self.mock_update_client)
@@ -130,6 +138,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_set_my_state(self):
+        self.presence_list = [
+            {"observed_user_id": "@banana:test"},
+            {"observed_user_id": "@clementine:test"},
+        ]
+
         mocked_set = self.datastore.set_presence_state
         mocked_set.return_value = defer.succeed({"state": OFFLINE})
 
@@ -142,6 +155,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_push_local(self):
+        self.presence_list = [
+            {"observed_user_id": "@banana:test"},
+            {"observed_user_id": "@clementine:test"},
+        ]
+
         self.datastore.set_presence_state.return_value = defer.succeed(
                 {"state": ONLINE})
 
@@ -173,12 +191,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
         presence)
 
         self.mock_update_client.assert_has_calls([
-            call(observer_user=self.u_apple,
+            call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
+                room_ids=[],
                 observed_user=self.u_apple,
                 statuscache=ANY), # self-reflection
-            call(observer_user=self.u_banana,
-                observed_user=self.u_apple,
-                statuscache=ANY),
         ], any_order=True)
 
         statuscache = self.mock_update_client.call_args[1]["statuscache"]
@@ -198,12 +214,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
                 self.u_apple, "I am an Apple")
 
         self.mock_update_client.assert_has_calls([
-            call(observer_user=self.u_apple,
+            call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
+                room_ids=[],
                 observed_user=self.u_apple,
                 statuscache=ANY), # self-reflection
-            call(observer_user=self.u_banana,
-                observed_user=self.u_apple,
-                statuscache=ANY),
         ], any_order=True)
 
         statuscache = self.mock_update_client.call_args[1]["statuscache"]
@@ -217,6 +231,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_push_remote(self):
+        self.presence_list = [
+            {"observed_user_id": "@potato:remote"},
+        ]
+
         self.datastore.set_presence_state.return_value = defer.succeed(
                 {"state": ONLINE})
 
@@ -247,6 +265,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_recv_remote(self):
+        self.presence_list = [
+            {"observed_user_id": "@banana:test"},
+            {"observed_user_id": "@clementine:test"},
+        ]
+
         # TODO(paul): Gut-wrenching
         potato_set = self.handlers.presence_handler._remote_recvmap.setdefault(
                 self.u_potato, set())
@@ -264,7 +287,8 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
         )
 
         self.mock_update_client.assert_called_with(
-            observer_user=self.u_apple,
+            users_to_push=set([self.u_apple]),
+            room_ids=[],
             observed_user=self.u_potato,
             statuscache=ANY)
 
diff --git a/tests/utils.py b/tests/utils.py
index 98d4f9ed58..37b759febc 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -21,7 +21,7 @@ from synapse.api.events.room import (
     RoomMemberEvent, MessageEvent
 )
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 
 from collections import namedtuple
 from mock import patch, Mock
@@ -263,18 +263,43 @@ class DeferredMockCallable(object):
                 d.callback(None)
                 return result
 
-        raise AssertionError("Was not expecting call(%s)" %
+        failure = AssertionError("Was not expecting call(%s)" %
             _format_call(args, kwargs)
         )
 
+        for _, _, d in self.expectations:
+            try:
+                d.errback(failure)
+            except:
+                pass
+
+        raise failure
+
     def expect_call_and_return(self, call, result):
         self.expectations.append((call, result, defer.Deferred()))
 
     @defer.inlineCallbacks
-    def await_calls(self):
-        while self.expectations:
-            (_, _, d) = self.expectations.pop(0)
-            yield d
+    def await_calls(self, timeout=1000):
+        deferred = defer.DeferredList(
+            [d for _, _, d in self.expectations],
+            fireOnOneErrback=True
+        )
+
+        timer = reactor.callLater(
+            timeout/1000,
+            deferred.errback,
+            AssertionError(
+                "%d pending calls left: %s"% (
+                    len([e for e in self.expectations if not e[2].called]),
+                    [e for e in self.expectations if not e[2].called]
+                )
+            )
+        )
+
+        yield deferred
+
+        timer.cancel()
+
         self.calls = []
 
     def assert_had_no_calls(self):