summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--docs/code_style.rst37
-rwxr-xr-xsetup.py6
-rw-r--r--synapse/federation/replication.py93
-rw-r--r--synapse/federation/transport.py2
-rw-r--r--synapse/handlers/federation.py7
-rw-r--r--synapse/handlers/presence.py12
-rw-r--r--synapse/handlers/room.py7
-rw-r--r--synapse/handlers/typing.py120
-rw-r--r--synapse/http/matrixfederationclient.py35
-rw-r--r--synapse/rest/room.py32
-rw-r--r--synapse/rest/transactions.py2
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/_base.py2
-rw-r--r--synapse/storage/schema/delta/v9.sql23
-rw-r--r--synapse/storage/schema/transactions.sql6
-rw-r--r--synapse/storage/transactions.py106
-rw-r--r--tests/federation/test_federation.py5
-rw-r--r--tests/handlers/test_federation.py2
-rw-r--r--tests/handlers/test_presence.py10
-rw-r--r--tests/handlers/test_room.py44
-rw-r--r--tests/handlers/test_typing.py136
-rw-r--r--tests/rest/test_rooms.py4
-rw-r--r--tests/rest/test_typing.py113
-rw-r--r--tests/test_test_utils.py70
-rw-r--r--tests/utils.py29
26 files changed, 825 insertions, 82 deletions
diff --git a/.gitignore b/.gitignore
index 339a99e0d6..af90668c89 100644
--- a/.gitignore
+++ b/.gitignore
@@ -38,3 +38,5 @@ graph/*.dot
 **/webclient/test/environment-protractor.js
 
 uploads
+
+.idea/
diff --git a/docs/code_style.rst b/docs/code_style.rst
index d7e2d5e69e..dc40a7ab7b 100644
--- a/docs/code_style.rst
+++ b/docs/code_style.rst
@@ -1,10 +1,14 @@
 Basically, PEP8
 
-- Max line width: 80 chars.
+- NEVER tabs. 4 spaces to indent.
+- Max line width: 79 chars (with flexibility to overflow by a "few chars" if
+  the overflowing content is not semantically significant and avoids an
+  explosion of vertical whitespace).
 - Use camel case for class and type names
 - Use underscores for functions and variables.
 - Use double quotes.
-- Use parentheses instead of '\' for line continuation where ever possible (which is pretty much everywhere)
+- Use parentheses instead of '\\' for line continuation where ever possible
+  (which is pretty much everywhere)
 - There should be max a single new line between:
     - statements
     - functions in a class
@@ -14,5 +18,32 @@ Basically, PEP8
     - a single space after a comma
     - a single space before and after for '=' when used as assignment
     - no spaces before and after for '=' for default values and keyword arguments.
+- Indenting must follow PEP8; either hanging indent or multiline-visual indent
+  depending on the size and shape of the arguments and what makes more sense to
+  the author. In other words, both this::
 
-Comments should follow the google code style. This is so that we can generate documentation with sphinx (http://sphinxcontrib-napoleon.readthedocs.org/en/latest/) 
+    print("I am a fish %s" % "moo")
+
+  and this::
+
+    print("I am a fish %s" %
+          "moo")
+
+  and this::
+
+    print(
+        "I am a fish %s" %
+        "moo"
+    )
+
+  ...are valid, although given each one takes up 2x more vertical space than
+  the previous, it's up to the author's discretion as to which layout makes most
+  sense for their function invocation.  (e.g. if they want to add comments
+  per-argument, or put expressions in the arguments, or group related arguments
+  together, or want to deliberately extend or preserve vertical/horizontal
+  space)
+
+Comments should follow the google code style. This is so that we can generate
+documentation with sphinx (http://sphinxcontrib-napoleon.readthedocs.org/en/latest/) 
+
+Code should pass pep8 --max-line-length=100 without any warnings.
diff --git a/setup.py b/setup.py
index 9b38f790b9..9bee64e368 100755
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,7 @@ setup(
     description="Reference Synapse Home Server",
     install_requires=[
         "syutil==0.0.2",
-        "matrix_angular_sdk==0.5.1",
+        "matrix_angular_sdk==0.5.3b",
         "Twisted>=14.0.0",
         "service_identity>=1.0.0",
         "pyopenssl>=0.14",
@@ -45,7 +45,7 @@ setup(
     dependency_links=[
         "https://github.com/matrix-org/syutil/tarball/v0.0.2#egg=syutil-0.0.2",
         "https://github.com/pyca/pynacl/tarball/d4d3175589b892f6ea7c22f466e0e223853516fa#egg=pynacl-0.3.0",
-        "https://github.com/matrix-org/matrix-angular-sdk/tarball/v0.5.1/#egg=matrix_angular_sdk-0.5.1",
+        "https://github.com/matrix-org/matrix-angular-sdk/tarball/v0.5.3b/#egg=matrix_angular_sdk-0.5.3b",
     ],
     setup_requires=[
         "setuptools_trial",
@@ -59,6 +59,6 @@ setup(
     entry_points="""
     [console_scripts]
     synctl=synapse.app.synctl:main
-    synapse-homeserver=synapse.app.homeserver:run
+    synapse-homeserver=synapse.app.homeserver:main
     """
 )
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 01f87fe423..0cb632fb08 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -334,7 +334,7 @@ class ReplicationLayer(object):
             defer.returnValue(response)
             return
 
-        logger.debug("[%s] Transacition is new", transaction.transaction_id)
+        logger.debug("[%s] Transaction is new", transaction.transaction_id)
 
         with PreserveLoggingContext():
             dl = []
@@ -685,6 +685,7 @@ class _TransactionQueue(object):
         self.transport_layer = transport_layer
 
         self._clock = hs.get_clock()
+        self.store = hs.get_datastore()
 
         # Is a mapping from destinations -> deferreds. Used to keep track
         # of which destinations have transactions in flight and when they are
@@ -728,8 +729,14 @@ class _TransactionQueue(object):
                 (pdu, deferred, order)
             )
 
+            def eb(failure):
+                if not deferred.called:
+                    deferred.errback(failure)
+                else:
+                    logger.warn("Failed to send pdu", failure)
+
             with PreserveLoggingContext():
-                self._attempt_new_transaction(destination)
+                self._attempt_new_transaction(destination).addErrback(eb)
 
             deferreds.append(deferred)
 
@@ -739,6 +746,9 @@ class _TransactionQueue(object):
     def enqueue_edu(self, edu):
         destination = edu.destination
 
+        if destination == self.server_name:
+            return
+
         deferred = defer.Deferred()
         self.pending_edus_by_dest.setdefault(destination, []).append(
             (edu, deferred)
@@ -748,7 +758,7 @@ class _TransactionQueue(object):
             if not deferred.called:
                 deferred.errback(failure)
             else:
-                logger.exception("Failed to send edu", failure)
+                logger.warn("Failed to send edu", failure)
 
         with PreserveLoggingContext():
             self._attempt_new_transaction(destination).addErrback(eb)
@@ -770,10 +780,33 @@ class _TransactionQueue(object):
     @defer.inlineCallbacks
     @log_function
     def _attempt_new_transaction(self, destination):
+
+        (retry_last_ts, retry_interval) = (0, 0)
+        retry_timings = yield self.store.get_destination_retry_timings(
+            destination
+        )
+        if retry_timings:
+            (retry_last_ts, retry_interval) = (
+                retry_timings.retry_last_ts, retry_timings.retry_interval
+            )
+            if retry_last_ts + retry_interval > int(self._clock.time_msec()):
+                logger.info(
+                    "TX [%s] not ready for retry yet - "
+                    "dropping transaction for now",
+                    destination,
+                )
+                return
+            else:
+                logger.info("TX [%s] is ready for retry", destination)
+
         if destination in self.pending_transactions:
+            # XXX: pending_transactions can get stuck on by a never-ending
+            # request at which point pending_pdus_by_dest just keeps growing.
+            # we need application-layer timeouts of some flavour of these
+            # requests
             return
 
-        #  list of (pending_pdu, deferred, order)
+        # list of (pending_pdu, deferred, order)
         pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
         pending_edus = self.pending_edus_by_dest.pop(destination, [])
         pending_failures = self.pending_failures_by_dest.pop(destination, [])
@@ -781,7 +814,14 @@ class _TransactionQueue(object):
         if not pending_pdus and not pending_edus and not pending_failures:
             return
 
-        logger.debug("TX [%s] Attempting new transaction", destination)
+        logger.debug(
+            "TX [%s] Attempting new transaction "
+            "(pdus: %d, edus: %d, failures: %d)",
+            destination,
+            len(pending_pdus),
+            len(pending_edus),
+            len(pending_failures)
+        )
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[2])
@@ -814,7 +854,11 @@ class _TransactionQueue(object):
             yield self.transaction_actions.prepare_to_send(transaction)
 
             logger.debug("TX [%s] Persisted transaction", destination)
-            logger.debug("TX [%s] Sending transaction...", destination)
+            logger.info(
+                "TX [%s] Sending transaction [%s]",
+                destination,
+                transaction.transaction_id,
+            )
 
             # Actually send the transaction
 
@@ -835,6 +879,8 @@ class _TransactionQueue(object):
                 transaction, json_data_cb
             )
 
+            logger.info("TX [%s] got %d response", destination, code)
+
             logger.debug("TX [%s] Sent transaction", destination)
             logger.debug("TX [%s] Marking as delivered...", destination)
 
@@ -847,8 +893,14 @@ class _TransactionQueue(object):
 
             for deferred in deferreds:
                 if code == 200:
+                    if retry_last_ts:
+                        # this host is alive! reset retry schedule
+                        yield self.store.set_destination_retry_timings(
+                            destination, 0, 0
+                        )
                     deferred.callback(None)
                 else:
+                    self.set_retrying(destination, retry_interval)
                     deferred.errback(RuntimeError("Got status %d" % code))
 
                 # Ensures we don't continue until all callbacks on that
@@ -861,11 +913,15 @@ class _TransactionQueue(object):
             logger.debug("TX [%s] Yielded to callbacks", destination)
 
         except Exception as e:
-            logger.error("TX Problem in _attempt_transaction")
-
             # We capture this here as there as nothing actually listens
             # for this finishing functions deferred.
-            logger.exception(e)
+            logger.warn(
+                "TX [%s] Problem in _attempt_transaction: %s",
+                destination,
+                e,
+            )
+
+            self.set_retrying(destination, retry_interval)
 
             for deferred in deferreds:
                 if not deferred.called:
@@ -877,3 +933,22 @@ class _TransactionQueue(object):
 
             # Check to see if there is anything else to send.
             self._attempt_new_transaction(destination)
+
+    @defer.inlineCallbacks
+    def set_retrying(self, destination, retry_interval):
+        # track that this destination is having problems and we should
+        # give it a chance to recover before trying it again
+
+        if retry_interval:
+            retry_interval *= 2
+            # plateau at hourly retries for now
+            if retry_interval >= 60 * 60 * 1000:
+                retry_interval = 60 * 60 * 1000
+        else:
+            retry_interval = 2000  # try again at first after 2 seconds
+
+        yield self.store.set_destination_retry_timings(
+            destination,
+            int(self._clock.time_msec()),
+            retry_interval
+        )
diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py
index 8d86152085..0f11c6d491 100644
--- a/synapse/federation/transport.py
+++ b/synapse/federation/transport.py
@@ -155,7 +155,7 @@ class TransportLayer(object):
     @defer.inlineCallbacks
     @log_function
     def send_transaction(self, transaction, json_data_callback=None):
-        """ Sends the given Transaction to it's destination
+        """ Sends the given Transaction to its destination
 
         Args:
             transaction (Transaction)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 925eb5376e..cfb5029774 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -207,6 +207,13 @@ class FederationHandler(BaseHandler):
                 e.msg,
                 affected=event.event_id,
             )
+            
+        # if we're receiving valid events from an origin,
+        # it's probably a good idea to mark it as not in retry-state
+        # for sending (although this is a bit of a leap)
+        retry_timings = yield self.store.get_destination_retry_timings(origin)
+        if (retry_timings and retry_timings.retry_last_ts):
+            self.store.set_destination_retry_timings(origin, 0, 0)
 
         room = yield self.store.get_room(event.room_id)
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 815d40f166..84a039489f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -659,10 +659,6 @@ class PresenceHandler(BaseHandler):
             if room_ids:
                 logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids)
 
-            if not observers and not room_ids:
-                logger.debug(" | no interested observers or room IDs")
-                continue
-
             state = dict(push)
             del state["user_id"]
 
@@ -683,6 +679,10 @@ class PresenceHandler(BaseHandler):
             self._user_cachemap_latest_serial += 1
             statuscache.update(state, serial=self._user_cachemap_latest_serial)
 
+            if not observers and not room_ids:
+                logger.debug(" | no interested observers or room IDs")
+                continue
+
             self.push_update_to_clients(
                 observed_user=user,
                 users_to_push=observers,
@@ -804,6 +804,7 @@ class PresenceEventSource(object):
             )
 
     @defer.inlineCallbacks
+    @log_function
     def get_new_events_for_user(self, user, from_key, limit):
         from_key = int(from_key)
 
@@ -816,7 +817,8 @@ class PresenceEventSource(object):
         # TODO(paul): use a DeferredList ? How to limit concurrency.
         for observed_user in cachemap.keys():
             cached = cachemap[observed_user]
-            if not (from_key < cached.serial):
+
+            if cached.serial <= from_key:
                 continue
 
             if (yield self.is_visible(observer_user, observed_user)):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a000b44036..c802e8f69d 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -260,6 +260,7 @@ class RoomMemberHandler(BaseHandler):
 
         self.distributor = hs.get_distributor()
         self.distributor.declare("user_joined_room")
+        self.distributor.declare("user_left_room")
 
     @defer.inlineCallbacks
     def get_room_members(self, room_id, membership=Membership.JOIN):
@@ -387,6 +388,12 @@ class RoomMemberHandler(BaseHandler):
                 do_auth=do_auth,
             )
 
+            if prev_state and prev_state.membership == Membership.JOIN:
+                user = self.hs.parse_userid(event.user_id)
+                self.distributor.fire(
+                    "user_left_room", user=user, room_id=event.room_id
+                )
+
         defer.returnValue({"room_id": room_id})
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d88a53242c..46a0b299a1 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -43,7 +43,23 @@ class TypingNotificationHandler(BaseHandler):
 
         self.federation.register_edu_handler("m.typing", self._recv_edu)
 
-        self._member_typing_until = {}
+        hs.get_distributor().observe("user_left_room", self.user_left_room)
+
+        self._member_typing_until = {} # clock time we expect to stop
+        self._member_typing_timer = {} # deferreds to manage theabove
+
+        # map room IDs to serial numbers
+        self._room_serials = {}
+        self._latest_room_serial = 0
+        # map room IDs to sets of users currently typing
+        self._room_typing = {}
+
+    def tearDown(self):
+        """Cancels all the pending timers.
+        Normally this shouldn't be needed, but it's required from unit tests
+        to avoid a "Reactor was unclean" warning."""
+        for t in self._member_typing_timer.values():
+            self.clock.cancel_call_later(t)
 
     @defer.inlineCallbacks
     def started_typing(self, target_user, auth_user, room_id, timeout):
@@ -53,12 +69,24 @@ class TypingNotificationHandler(BaseHandler):
         if target_user != auth_user:
             raise AuthError(400, "Cannot set another user's typing state")
 
+        yield self.auth.check_joined_room(room_id, target_user.to_string())
+
+        logger.debug(
+            "%s has started typing in %s", target_user.to_string(), room_id
+        )
+
         until = self.clock.time_msec() + timeout
         member = RoomMember(room_id=room_id, user=target_user)
 
         was_present = member in self._member_typing_until
 
+        if member in self._member_typing_timer:
+            self.clock.cancel_call_later(self._member_typing_timer[member])
+
         self._member_typing_until[member] = until
+        self._member_typing_timer[member] = self.clock.call_later(
+            timeout / 1000, lambda: self._stopped_typing(member)
+        )
 
         if was_present:
             # No point sending another notification
@@ -78,18 +106,39 @@ class TypingNotificationHandler(BaseHandler):
         if target_user != auth_user:
             raise AuthError(400, "Cannot set another user's typing state")
 
+        yield self.auth.check_joined_room(room_id, target_user.to_string())
+
+        logger.debug(
+            "%s has stopped typing in %s", target_user.to_string(), room_id
+        )
+
         member = RoomMember(room_id=room_id, user=target_user)
 
+        yield self._stopped_typing(member)
+
+    @defer.inlineCallbacks
+    def user_left_room(self, user, room_id):
+        if user.is_mine:
+            member = RoomMember(room_id=room_id, user=user)
+            yield self._stopped_typing(member)
+
+    @defer.inlineCallbacks
+    def _stopped_typing(self, member):
         if member not in self._member_typing_until:
             # No point
             defer.returnValue(None)
 
         yield self._push_update(
-            room_id=room_id,
-            user=target_user,
+            room_id=member.room_id,
+            user=member.user,
             typing=False,
         )
 
+        del self._member_typing_until[member]
+
+        self.clock.cancel_call_later(self._member_typing_timer[member])
+        del self._member_typing_timer[member]
+
     @defer.inlineCallbacks
     def _push_update(self, room_id, user, typing):
         localusers = set()
@@ -101,12 +150,11 @@ class TypingNotificationHandler(BaseHandler):
             ignore_user=user
         )
 
-        for u in localusers:
-            self.push_update_to_clients(
+        if localusers:
+            self._push_update_local(
                 room_id=room_id,
-                observer_user=u,
-                observed_user=user,
-                typing=typing,
+                user=user,
+                typing=typing
             )
 
         deferreds = []
@@ -135,29 +183,65 @@ class TypingNotificationHandler(BaseHandler):
             room_id, localusers=localusers
         )
 
-        for u in localusers:
-            self.push_update_to_clients(
+        if localusers:
+            self._push_update_local(
                 room_id=room_id,
-                observer_user=u,
-                observed_user=user,
+                user=user,
                 typing=content["typing"]
             )
 
-    def push_update_to_clients(self, room_id, observer_user, observed_user,
-                               typing):
-        # TODO(paul) steal this from presence.py
-        pass
+    def _push_update_local(self, room_id, user, typing):
+        if room_id not in self._room_serials:
+            self._room_serials[room_id] = 0
+            self._room_typing[room_id] = set()
+
+        room_set = self._room_typing[room_id]
+        if typing:
+            room_set.add(user)
+        elif user in room_set:
+            room_set.remove(user)
+
+        self._latest_room_serial += 1
+        self._room_serials[room_id] = self._latest_room_serial
+
+        self.notifier.on_new_user_event(rooms=[room_id])
 
 
 class TypingNotificationEventSource(object):
     def __init__(self, hs):
         self.hs = hs
+        self._handler = None
+
+    def handler(self):
+        # Avoid cyclic dependency in handler setup
+        if not self._handler:
+            self._handler = self.hs.get_handlers().typing_notification_handler
+        return self._handler
+
+    def _make_event_for(self, room_id):
+        typing = self.handler()._room_typing[room_id]
+        return {
+            "type": "m.typing",
+            "room_id": room_id,
+            "typing": [u.to_string() for u in typing],
+        }
 
     def get_new_events_for_user(self, user, from_key, limit):
-        return ([], from_key)
+        from_key = int(from_key)
+        handler = self.handler()
+
+        events = []
+        for room_id in handler._room_serials:
+            if handler._room_serials[room_id] <= from_key:
+                continue
+
+            # TODO: check if user is in room
+            events.append(self._make_event_for(room_id))
+
+        return (events, handler._latest_room_serial)
 
     def get_current_key(self):
-        return 0
+        return self.handler()._latest_room_serial
 
     def get_pagination_rows(self, user, pagination_config, key):
         return ([], pagination_config.from_key)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 510f07dd7b..fc5b5ab809 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -89,8 +89,8 @@ class MatrixFederationHttpClient(object):
             ("", "", path_bytes, param_bytes, query_bytes, "",)
         )
 
-        logger.debug("Sending request to %s: %s %s",
-                     destination, method, url_bytes)
+        logger.info("Sending request to %s: %s %s",
+                    destination, method, url_bytes)
 
         logger.debug(
             "Types: %s",
@@ -101,6 +101,8 @@ class MatrixFederationHttpClient(object):
             ]
         )
 
+        # XXX: Would be much nicer to retry only at the transaction-layer
+        # (once we have reliable transactions in place)
         retries_left = 5
 
         endpoint = self._getEndpoint(reactor, destination)
@@ -127,11 +129,20 @@ class MatrixFederationHttpClient(object):
                 break
             except Exception as e:
                 if not retry_on_dns_fail and isinstance(e, DNSLookupError):
-                    logger.warn("DNS Lookup failed to %s with %s", destination,
-                                e)
+                    logger.warn(
+                        "DNS Lookup failed to %s with %s",
+                        destination,
+                        e
+                    )
                     raise SynapseError(400, "Domain specified not found.")
 
-                logger.exception("Got error in _create_request")
+                logger.warn(
+                    "Sending request failed to %s: %s %s : %s",
+                    destination,
+                    method,
+                    url_bytes,
+                    e
+                )
                 _print_ex(e)
 
                 if retries_left:
@@ -140,15 +151,21 @@ class MatrixFederationHttpClient(object):
                 else:
                     raise
 
+        logger.info(
+            "Received response %d %s for %s: %s %s",
+            response.code,
+            response.phrase,
+            destination,
+            method,
+            url_bytes
+        )
+
         if 200 <= response.code < 300:
             # We need to update the transactions table to say it was sent?
             pass
         else:
             # :'(
             # Update transactions table?
-            logger.error(
-                "Got response %d %s", response.code, response.phrase
-            )
             raise CodeMessageException(
                 response.code, response.phrase
             )
@@ -284,7 +301,7 @@ def _print_ex(e):
         for ex in e.reasons:
             _print_ex(ex)
     else:
-        logger.exception(e)
+        logger.warn(e)
 
 
 class _JsonProducer(object):
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index 3147d7a60b..e7f28c2784 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -483,6 +483,37 @@ class RoomRedactEventRestServlet(RestServlet):
         defer.returnValue(response)
 
 
+class RoomTypingRestServlet(RestServlet):
+    PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/typing/(?P<user_id>[^/]*)$")
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request, room_id, user_id):
+        auth_user = yield self.auth.get_user_by_req(request)
+
+        room_id = urllib.unquote(room_id)
+        target_user = self.hs.parse_userid(urllib.unquote(user_id))
+
+        content = _parse_json(request)
+
+        typing_handler = self.handlers.typing_notification_handler
+
+        if content["typing"]:
+            yield typing_handler.started_typing(
+                target_user=target_user,
+                auth_user=auth_user,
+                room_id=room_id,
+                timeout=content.get("timeout", 30000),
+            )
+        else:
+            yield typing_handler.stopped_typing(
+                target_user=target_user,
+                auth_user=auth_user,
+                room_id=room_id,
+            )
+
+        defer.returnValue((200, {}))
+
+
 def _parse_json(request):
     try:
         content = json.loads(request.content.read())
@@ -538,3 +569,4 @@ def register_servlets(hs, http_server):
     RoomStateRestServlet(hs).register(http_server)
     RoomInitialSyncRestServlet(hs).register(http_server)
     RoomRedactEventRestServlet(hs).register(http_server)
+    RoomTypingRestServlet(hs).register(http_server)
diff --git a/synapse/rest/transactions.py b/synapse/rest/transactions.py
index 93c0122f30..8c41ab4edb 100644
--- a/synapse/rest/transactions.py
+++ b/synapse/rest/transactions.py
@@ -19,7 +19,7 @@ import logging
 
 logger = logging.getLogger(__name__)
 
-
+# FIXME: elsewhere we use FooStore to indicate something in the storage layer...
 class HttpTransactionStore(object):
 
     def __init__(self):
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f15e3dfe62..04ab39341d 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -67,7 +67,7 @@ SCHEMAS = [
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 8
+SCHEMA_VERSION = 9
 
 
 class _RollbackButIsFineException(Exception):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 4881f03368..e72200e2f7 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -650,7 +650,7 @@ class JoinHelper(object):
     to dump the results into.
 
     Attributes:
-        taples (list): List of `Table` classes
+        tables (list): List of `Table` classes
         EntryType (type)
     """
 
diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql
new file mode 100644
index 0000000000..ad680c64da
--- /dev/null
+++ b/synapse/storage/schema/delta/v9.sql
@@ -0,0 +1,23 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+    destination TEXT PRIMARY KEY,
+    retry_last_ts INTEGER,
+    retry_interval INTEGER
+);
+
+PRAGMA user_version = 9;
\ No newline at end of file
diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql
index 88e3e4e04d..de461bfa15 100644
--- a/synapse/storage/schema/transactions.sql
+++ b/synapse/storage/schema/transactions.sql
@@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra
 CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
 CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
 
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+    destination TEXT PRIMARY KEY,
+    retry_last_ts INTEGER,
+    retry_interval INTEGER
+);
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 00d0f48082..423cc3f02a 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table
 
 from collections import namedtuple
 
+from twisted.internet import defer
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -26,6 +28,10 @@ class TransactionStore(SQLBaseStore):
     """A collection of queries for handling PDUs.
     """
 
+    # a write-through cache of DestinationsTable.EntryType indexed by
+    # destination string
+    destination_retry_cache = {}
+
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
         already responded to it. If so, return the response code and response
@@ -114,7 +120,7 @@ class TransactionStore(SQLBaseStore):
     def _prep_send_transaction(self, txn, transaction_id, destination,
                                origin_server_ts):
 
-        # First we find out what the prev_txs should be.
+        # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
         query = "%s ORDER BY id DESC LIMIT 1" % (
@@ -205,6 +211,92 @@ class TransactionStore(SQLBaseStore):
 
         return ReceivedTransactionsTable.decode_results(txn.fetchall())
 
+    def get_destination_retry_timings(self, destination):
+        """Gets the current retry timings (if any) for a given destination.
+
+        Args:
+            destination (str)
+
+        Returns:
+            None if not retrying
+            Otherwise a DestinationsTable.EntryType for the retry scheme
+        """
+        if destination in self.destination_retry_cache:
+            return defer.succeed(self.destination_retry_cache[destination])
+
+        return self.runInteraction(
+            "get_destination_retry_timings",
+            self._get_destination_retry_timings, destination)
+
+    def _get_destination_retry_timings(cls, txn, destination):
+        query = DestinationsTable.select_statement("destination = ?")
+        txn.execute(query, (destination,))
+        result = txn.fetchall()
+        if result:
+            result = DestinationsTable.decode_single_result(result)
+            if result.retry_last_ts > 0:
+                return result
+            else:
+                return None
+
+    def set_destination_retry_timings(self, destination,
+                                      retry_last_ts, retry_interval):
+        """Sets the current retry timings for a given destination.
+        Both timings should be zero if retrying is no longer occuring.
+
+        Args:
+            destination (str)
+            retry_last_ts (int) - time of last retry attempt in unix epoch ms
+            retry_interval (int) - how long until next retry in ms
+        """
+
+        self.destination_retry_cache[destination] = (
+            DestinationsTable.EntryType(
+                destination,
+                retry_last_ts,
+                retry_interval
+            )
+        )
+
+        # XXX: we could chose to not bother persisting this if our cache thinks
+        # this is a NOOP
+        return self.runInteraction(
+            "set_destination_retry_timings",
+            self._set_destination_retry_timings,
+            destination,
+            retry_last_ts,
+            retry_interval,
+        )
+
+    def _set_destination_retry_timings(cls, txn, destination,
+                                       retry_last_ts, retry_interval):
+
+        query = (
+            "INSERT OR REPLACE INTO %s "
+            "(destination, retry_last_ts, retry_interval) "
+            "VALUES (?, ?, ?) "
+        ) % DestinationsTable.table_name
+
+        txn.execute(query, (destination, retry_last_ts, retry_interval))
+
+    def get_destinations_needing_retry(self):
+        """Get all destinations which are due a retry for sending a transaction.
+
+        Returns:
+            list: A list of `DestinationsTable.EntryType`
+        """
+
+        return self.runInteraction(
+            "get_destinations_needing_retry",
+            self._get_destinations_needing_retry
+        )
+
+    def _get_destinations_needing_retry(cls, txn):
+        where = "retry_last_ts > 0 and retry_next_ts < now()"
+        query = DestinationsTable.select_statement(where)
+        txn.execute(query)
+        return DestinationsTable.decode_results(txn.fetchall())
+
 
 class ReceivedTransactionsTable(Table):
     table_name = "received_transactions"
@@ -247,3 +339,15 @@ class TransactionsToPduTable(Table):
     ]
 
     EntryType = namedtuple("TransactionsToPduEntry", fields)
+
+
+class DestinationsTable(Table):
+    table_name = "destinations"
+
+    fields = [
+        "destination",
+        "retry_last_ts",
+        "retry_interval",
+    ]
+
+    EntryType = namedtuple("DestinationsEntry", fields)
diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py
index 73dd289276..f6b41e2c4c 100644
--- a/tests/federation/test_federation.py
+++ b/tests/federation/test_federation.py
@@ -25,6 +25,7 @@ from synapse.server import HomeServer
 from synapse.federation import initialize_http_replication
 from synapse.api.events import SynapseEvent
 
+from synapse.storage.transactions import DestinationsTable
 
 def make_pdu(prev_pdus=[], **kwargs):
     """Provide some default fields for making a PduTuple."""
@@ -55,10 +56,14 @@ class FederationTestCase(unittest.TestCase):
             "delivered_txn",
             "get_received_txn_response",
             "set_received_txn_response",
+            "get_destination_retry_timings",
         ])
         self.mock_persistence.get_received_txn_response.return_value = (
             defer.succeed(None)
         )
+        self.mock_persistence.get_destination_retry_timings.return_value = (
+            defer.succeed(DestinationsTable.EntryType("", 0, 0))
+        )
         self.mock_config = Mock()
         self.mock_config.signing_key = [MockKey()]
         self.clock = MockClock()
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index 33016c16ef..fae33716a3 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -53,6 +53,8 @@ class FederationTestCase(unittest.TestCase):
                 "persist_event",
                 "store_room",
                 "get_room",
+                "get_destination_retry_timings",
+                "set_destination_retry_timings",
             ]),
             resource_for_federation=NonCallableMock(),
             http_client=NonCallableMock(spec_set=[]),
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index fe69ce47eb..b85a89052a 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -30,7 +30,7 @@ from synapse.api.constants import PresenceState
 from synapse.api.errors import SynapseError
 from synapse.handlers.presence import PresenceHandler, UserPresenceCache
 from synapse.streams.config import SourcePaginationConfig
-
+from synapse.storage.transactions import DestinationsTable
 
 OFFLINE = PresenceState.OFFLINE
 UNAVAILABLE = PresenceState.UNAVAILABLE
@@ -528,6 +528,7 @@ class PresencePushTestCase(unittest.TestCase):
                     "delivered_txn",
                     "get_received_txn_response",
                     "set_received_txn_response",
+                    "get_destination_retry_timings",
                 ]),
                 handlers=None,
                 resource_for_client=Mock(),
@@ -539,6 +540,9 @@ class PresencePushTestCase(unittest.TestCase):
         hs.handlers = JustPresenceHandlers(hs)
 
         self.datastore = hs.get_datastore()
+        self.datastore.get_destination_retry_timings.return_value = (
+            defer.succeed(DestinationsTable.EntryType("", 0, 0))
+        )
 
         def get_received_txn_response(*args):
             return defer.succeed(None)
@@ -1037,6 +1041,7 @@ class PresencePollingTestCase(unittest.TestCase):
                     "delivered_txn",
                     "get_received_txn_response",
                     "set_received_txn_response",
+                    "get_destination_retry_timings",
                 ]),
                 handlers=None,
                 resource_for_client=Mock(),
@@ -1048,6 +1053,9 @@ class PresencePollingTestCase(unittest.TestCase):
         hs.handlers = JustPresenceHandlers(hs)
 
         self.datastore = hs.get_datastore()
+        self.datastore.get_destination_retry_timings.return_value = (
+            defer.succeed(DestinationsTable.EntryType("", 0, 0))
+        )
 
         def get_received_txn_response(*args):
             return defer.succeed(None)
diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py
index 0279ab703a..3a71ed0aed 100644
--- a/tests/handlers/test_room.py
+++ b/tests/handlers/test_room.py
@@ -222,14 +222,52 @@ class RoomMemberHandlerTestCase(unittest.TestCase):
             user=user, room_id=room_id
         )
 
-    def _create_member(self, user_id, room_id):
+    @defer.inlineCallbacks
+    def test_simple_leave(self):
+        room_id = "!foo:red"
+        user_id = "@bob:red"
+        user = self.hs.parse_userid(user_id)
+
+        event = self._create_member(
+            user_id=user_id,
+            room_id=room_id,
+            membership=Membership.LEAVE,
+        )
+
+        prev_state = NonCallableMock()
+        prev_state.membership = Membership.JOIN
+        prev_state.sender = user_id
+        self.datastore.get_room_member.return_value = defer.succeed(prev_state)
+
+        event.state_events = {
+            (RoomMemberEvent.TYPE, user_id): event,
+        }
+
+        event.old_state_events = {
+            (RoomMemberEvent.TYPE, user_id): self._create_member(
+                user_id=user_id,
+                room_id=room_id,
+            ),
+        }
+
+        leave_signal_observer = Mock()
+        self.distributor.observe("user_left_room", leave_signal_observer)
+
+        # Actual invocation
+        yield self.room_member_handler.change_membership(event)
+
+        leave_signal_observer.assert_called_with(
+            user=user, room_id=room_id
+        )
+
+    def _create_member(self, user_id, room_id, membership=Membership.JOIN):
         return self.hs.get_event_factory().create_event(
             etype=RoomMemberEvent.TYPE,
             user_id=user_id,
             state_key=user_id,
             room_id=room_id,
-            membership=Membership.JOIN,
-            content={"membership": Membership.JOIN},
+            membership=membership,
+            content={"membership": membership},
         )
 
 
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index adb5148351..bc19db8dfa 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -22,9 +22,12 @@ import json
 
 from ..utils import MockHttpResource, MockClock, DeferredMockCallable, MockKey
 
+from synapse.api.errors import AuthError
 from synapse.server import HomeServer
 from synapse.handlers.typing import TypingNotificationHandler
 
+from synapse.storage.transactions import DestinationsTable
+
 
 def _expect_edu(destination, edu_type, content, origin="test"):
     return {
@@ -63,7 +66,13 @@ class TypingNotificationsTestCase(unittest.TestCase):
         self.mock_config = Mock()
         self.mock_config.signing_key = [MockKey()]
 
+        mock_notifier = Mock(spec=["on_new_user_event"])
+        self.on_new_user_event = mock_notifier.on_new_user_event
+
+        self.auth = Mock(spec=[])
+
         hs = HomeServer("test",
+                auth=self.auth,
                 clock=self.clock,
                 db_pool=None,
                 datastore=Mock(spec=[
@@ -72,8 +81,10 @@ class TypingNotificationsTestCase(unittest.TestCase):
                     "delivered_txn",
                     "get_received_txn_response",
                     "set_received_txn_response",
+                    "get_destination_retry_timings",
                 ]),
                 handlers=None,
+                notifier=mock_notifier,
                 resource_for_client=Mock(),
                 resource_for_federation=self.mock_federation_resource,
                 http_client=self.mock_http_client,
@@ -82,13 +93,14 @@ class TypingNotificationsTestCase(unittest.TestCase):
             )
         hs.handlers = JustTypingNotificationHandlers(hs)
 
-        self.mock_update_client = Mock()
-        self.mock_update_client.return_value = defer.succeed(None)
-
         self.handler = hs.get_handlers().typing_notification_handler
-        self.handler.push_update_to_clients = self.mock_update_client
+
+        self.event_source = hs.get_event_sources().sources["typing"]
 
         self.datastore = hs.get_datastore()
+        self.datastore.get_destination_retry_timings.return_value = (
+            defer.succeed(DestinationsTable.EntryType("", 0, 0))
+        )
 
         def get_received_txn_response(*args):
             return defer.succeed(None)
@@ -134,6 +146,12 @@ class TypingNotificationsTestCase(unittest.TestCase):
         self.room_member_handler.fetch_room_distributions_into = (
                 fetch_room_distributions_into)
 
+        def check_joined_room(room_id, user_id):
+            if user_id not in [u.to_string() for u in self.room_members]:
+                raise AuthError(401, "User is not in the room")
+
+        self.auth.check_joined_room = check_joined_room
+
         # Some local users to test with
         self.u_apple = hs.parse_userid("@apple:test")
         self.u_banana = hs.parse_userid("@banana:test")
@@ -145,6 +163,8 @@ class TypingNotificationsTestCase(unittest.TestCase):
     def test_started_typing_local(self):
         self.room_members = [self.u_apple, self.u_banana]
 
+        self.assertEquals(self.event_source.get_current_key(), 0)
+
         yield self.handler.started_typing(
             target_user=self.u_apple,
             auth_user=self.u_apple,
@@ -152,13 +172,20 @@ class TypingNotificationsTestCase(unittest.TestCase):
             timeout=20000,
         )
 
-        self.mock_update_client.assert_has_calls([
-            call(observer_user=self.u_banana,
-                observed_user=self.u_apple,
-                room_id=self.room_id,
-                typing=True),
+        self.on_new_user_event.assert_has_calls([
+            call(rooms=[self.room_id]),
         ])
 
+        self.assertEquals(self.event_source.get_current_key(), 1)
+        self.assertEquals(
+            self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0],
+            [
+                {"type": "m.typing",
+                 "room_id": self.room_id,
+                 "typing": [self.u_apple.to_string()]},
+            ]
+        )
+
     @defer.inlineCallbacks
     def test_started_typing_remote_send(self):
         self.room_members = [self.u_apple, self.u_onion]
@@ -192,6 +219,8 @@ class TypingNotificationsTestCase(unittest.TestCase):
     def test_started_typing_remote_recv(self):
         self.room_members = [self.u_apple, self.u_onion]
 
+        self.assertEquals(self.event_source.get_current_key(), 0)
+
         yield self.mock_federation_resource.trigger("PUT",
             "/_matrix/federation/v1/send/1000000/",
             _make_edu_json("farm", "m.typing",
@@ -203,13 +232,20 @@ class TypingNotificationsTestCase(unittest.TestCase):
             )
         )
 
-        self.mock_update_client.assert_has_calls([
-            call(observer_user=self.u_apple,
-                observed_user=self.u_onion,
-                room_id=self.room_id,
-                typing=True),
+        self.on_new_user_event.assert_has_calls([
+            call(rooms=[self.room_id]),
         ])
 
+        self.assertEquals(self.event_source.get_current_key(), 1)
+        self.assertEquals(
+            self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0],
+            [
+                {"type": "m.typing",
+                 "room_id": self.room_id,
+                 "typing": [self.u_onion.to_string()]},
+            ]
+        )
+
     @defer.inlineCallbacks
     def test_stopped_typing(self):
         self.room_members = [self.u_apple, self.u_banana, self.u_onion]
@@ -232,9 +268,14 @@ class TypingNotificationsTestCase(unittest.TestCase):
 
         # Gut-wrenching
         from synapse.handlers.typing import RoomMember
-        self.handler._member_typing_until[
-            RoomMember(self.room_id, self.u_apple)
-        ] = 1002000
+        member = RoomMember(self.room_id, self.u_apple)
+        self.handler._member_typing_until[member] = 1002000
+        self.handler._member_typing_timer[member] = (
+            self.clock.call_later(1002, lambda: 0)
+        )
+        self.handler._room_typing[self.room_id] = set((self.u_apple,))
+
+        self.assertEquals(self.event_source.get_current_key(), 0)
 
         yield self.handler.stopped_typing(
             target_user=self.u_apple,
@@ -242,11 +283,62 @@ class TypingNotificationsTestCase(unittest.TestCase):
             room_id=self.room_id,
         )
 
-        self.mock_update_client.assert_has_calls([
-            call(observer_user=self.u_banana,
-                observed_user=self.u_apple,
-                room_id=self.room_id,
-                typing=False),
+        self.on_new_user_event.assert_has_calls([
+            call(rooms=[self.room_id]),
         ])
 
         yield put_json.await_calls()
+
+        self.assertEquals(self.event_source.get_current_key(), 1)
+        self.assertEquals(
+            self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0],
+            [
+                {"type": "m.typing",
+                 "room_id": self.room_id,
+                 "typing": []},
+            ]
+        )
+
+    @defer.inlineCallbacks
+    def test_typing_timeout(self):
+        self.room_members = [self.u_apple, self.u_banana]
+
+        self.assertEquals(self.event_source.get_current_key(), 0)
+
+        yield self.handler.started_typing(
+            target_user=self.u_apple,
+            auth_user=self.u_apple,
+            room_id=self.room_id,
+            timeout=10000,
+        )
+
+        self.on_new_user_event.assert_has_calls([
+            call(rooms=[self.room_id]),
+        ])
+        self.on_new_user_event.reset_mock()
+
+        self.assertEquals(self.event_source.get_current_key(), 1)
+        self.assertEquals(
+            self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0],
+            [
+                {"type": "m.typing",
+                 "room_id": self.room_id,
+                 "typing": [self.u_apple.to_string()]},
+            ]
+        )
+
+        self.clock.advance_time(11)
+
+        self.on_new_user_event.assert_has_calls([
+            call(rooms=[self.room_id]),
+        ])
+
+        self.assertEquals(self.event_source.get_current_key(), 2)
+        self.assertEquals(
+            self.event_source.get_new_events_for_user(self.u_apple, 1, None)[0],
+            [
+                {"type": "m.typing",
+                 "room_id": self.room_id,
+                 "typing": []},
+            ]
+        )
diff --git a/tests/rest/test_rooms.py b/tests/rest/test_rooms.py
index ff7c9f0530..1f719beb0a 100644
--- a/tests/rest/test_rooms.py
+++ b/tests/rest/test_rooms.py
@@ -1066,7 +1066,3 @@ class RoomInitialSyncTestCase(RestTestCase):
         }
         self.assertTrue(self.user_id in presence_by_user)
         self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
-
-#        (code, response) = yield self.mock_resource.trigger("GET", path, None)
-#        self.assertEquals(200, code, msg=str(response))
-#        self.assert_dict(json.loads(content), response)
diff --git a/tests/rest/test_typing.py b/tests/rest/test_typing.py
new file mode 100644
index 0000000000..0b95d70718
--- /dev/null
+++ b/tests/rest/test_typing.py
@@ -0,0 +1,113 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests REST events for /rooms paths."""
+
+# twisted imports
+from twisted.internet import defer
+
+import synapse.rest.room
+from synapse.server import HomeServer
+
+from ..utils import MockHttpResource, SQLiteMemoryDbPool, MockKey
+from .utils import RestTestCase
+
+from mock import Mock, NonCallableMock
+
+
+PATH_PREFIX = "/_matrix/client/api/v1"
+
+
+class RoomTypingTestCase(RestTestCase):
+    """ Tests /rooms/$room_id/typing/$user_id REST API. """
+    user_id = "@sid:red"
+
+    @defer.inlineCallbacks
+    def setUp(self):
+        self.mock_resource = MockHttpResource(prefix=PATH_PREFIX)
+        self.auth_user_id = self.user_id
+
+        self.mock_config = NonCallableMock()
+        self.mock_config.signing_key = [MockKey()]
+
+        db_pool = SQLiteMemoryDbPool()
+        yield db_pool.prepare()
+
+        hs = HomeServer(
+            "red",
+            db_pool=db_pool,
+            http_client=None,
+            replication_layer=Mock(),
+            ratelimiter=NonCallableMock(spec_set=[
+                "send_message",
+            ]),
+            config=self.mock_config,
+        )
+        self.hs = hs
+
+        self.event_source = hs.get_event_sources().sources["typing"]
+
+        self.ratelimiter = hs.get_ratelimiter()
+        self.ratelimiter.send_message.return_value = (True, 0)
+
+        hs.get_handlers().federation_handler = Mock()
+
+        def _get_user_by_token(token=None):
+            return {
+                "user": hs.parse_userid(self.auth_user_id),
+                "admin": False,
+                "device_id": None,
+            }
+
+        hs.get_auth().get_user_by_token = _get_user_by_token
+
+        def _insert_client_ip(*args, **kwargs):
+            return defer.succeed(None)
+        hs.get_datastore().insert_client_ip = _insert_client_ip
+
+        synapse.rest.room.register_servlets(hs, self.mock_resource)
+
+        self.room_id = yield self.create_room_as(self.user_id)
+        # Need another user to make notifications actually work
+        yield self.join(self.room_id, user="@jim:red")
+
+    def tearDown(self):
+        self.hs.get_handlers().typing_notification_handler.tearDown()
+
+    @defer.inlineCallbacks
+    def test_set_typing(self):
+        (code, _) = yield self.mock_resource.trigger("PUT",
+            "/rooms/%s/typing/%s" % (self.room_id, self.user_id),
+            '{"typing": true, "timeout": 30000}'
+        )
+        self.assertEquals(200, code)
+
+        self.assertEquals(self.event_source.get_current_key(), 1)
+        self.assertEquals(
+            self.event_source.get_new_events_for_user(self.user_id, 0, None)[0],
+            [
+                {"type": "m.typing",
+                 "room_id": self.room_id,
+                 "typing": [self.user_id]},
+            ]
+        )
+
+    @defer.inlineCallbacks
+    def test_set_not_typing(self):
+        (code, _) = yield self.mock_resource.trigger("PUT",
+            "/rooms/%s/typing/%s" % (self.room_id, self.user_id),
+            '{"typing": false}'
+        )
+        self.assertEquals(200, code)
diff --git a/tests/test_test_utils.py b/tests/test_test_utils.py
new file mode 100644
index 0000000000..b42787dd25
--- /dev/null
+++ b/tests/test_test_utils.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from tests import unittest
+
+from tests.utils import MockClock
+
+class MockClockTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.clock = MockClock()
+
+    def test_advance_time(self):
+        start_time = self.clock.time()
+
+        self.clock.advance_time(20)
+
+        self.assertEquals(20, self.clock.time() - start_time)
+
+    def test_later(self):
+        invoked = [0, 0]
+
+        def _cb0():
+            invoked[0] = 1
+        self.clock.call_later(10, _cb0)
+
+        def _cb1():
+            invoked[1] = 1
+        self.clock.call_later(20, _cb1)
+
+        self.assertFalse(invoked[0])
+
+        self.clock.advance_time(15)
+
+        self.assertTrue(invoked[0])
+        self.assertFalse(invoked[1])
+
+        self.clock.advance_time(5)
+
+        self.assertTrue(invoked[1])
+
+    def test_cancel_later(self):
+        invoked = [0, 0]
+
+        def _cb0():
+            invoked[0] = 1
+        t0 = self.clock.call_later(10, _cb0)
+
+        def _cb1():
+            invoked[1] = 1
+        t1 = self.clock.call_later(20, _cb1)
+
+        self.clock.cancel_call_later(t0)
+
+        self.clock.advance_time(30)
+
+        self.assertFalse(invoked[0])
+        self.assertTrue(invoked[1])
diff --git a/tests/utils.py b/tests/utils.py
index d8be73dba8..f9a34748cd 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -18,6 +18,8 @@ from synapse.api.errors import cs_error, CodeMessageException, StoreError
 from synapse.api.constants import Membership
 from synapse.storage import prepare_database
 
+from synapse.util.logcontext import LoggingContext
+
 from synapse.api.events.room import (
     RoomMemberEvent, MessageEvent
 )
@@ -134,16 +136,43 @@ class MockKey(object):
 class MockClock(object):
     now = 1000
 
+    def __init__(self):
+        # list of tuples of (absolute_time, callback) in no particular order
+        self.timers = []
+
     def time(self):
         return self.now
 
     def time_msec(self):
         return self.time() * 1000
 
+    def call_later(self, delay, callback):
+        current_context = LoggingContext.current_context()
+
+        def wrapped_callback():
+            LoggingContext.thread_local.current_context = current_context
+            callback()
+
+        t = (self.now + delay, wrapped_callback)
+        self.timers.append(t)
+        return t
+
+    def cancel_call_later(self, timer):
+        self.timers = [t for t in self.timers if t != timer]
+
     # For unit testing
     def advance_time(self, secs):
         self.now += secs
 
+        timers = self.timers
+        self.timers = []
+
+        for time, callback in timers:
+            if self.now >= time:
+                callback()
+            else:
+                self.timers.append((time, callback))
+
 
 class SQLiteMemoryDbPool(ConnectionPool, object):
     def __init__(self):