summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-05-18 13:54:27 +0100
committerErik Johnston <erik@matrix.org>2017-05-18 13:54:27 +0100
commit3accee1a8c2804620713ac4ff068a4a18a7de192 (patch)
tree5bfad3c1e8653713f8e14062732f892797279a92 /synapse/handlers
parentMerge pull request #2136 from bbigras/patch-1 (diff)
parentBump changelog and version (diff)
downloadsynapse-3accee1a8c2804620713ac4ff068a4a18a7de192.tar.xz
Merge branch 'release-v0.21.0' of github.com:matrix-org/synapse v0.21.0
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py34
-rw-r--r--synapse/handlers/device.py29
-rw-r--r--synapse/handlers/e2e_keys.py86
-rw-r--r--synapse/handlers/federation.py125
-rw-r--r--synapse/handlers/identity.py11
-rw-r--r--synapse/handlers/message.py37
-rw-r--r--synapse/handlers/presence.py218
-rw-r--r--synapse/handlers/profile.py2
-rw-r--r--synapse/handlers/read_marker.py64
-rw-r--r--synapse/handlers/register.py7
-rw-r--r--synapse/handlers/room.py2
-rw-r--r--synapse/handlers/room_member.py27
-rw-r--r--synapse/handlers/typing.py7
13 files changed, 450 insertions, 199 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index e83adc8339..faa5609c0c 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -53,7 +53,20 @@ class BaseHandler(object):
 
         self.event_builder_factory = hs.get_event_builder_factory()
 
-    def ratelimit(self, requester):
+    @defer.inlineCallbacks
+    def ratelimit(self, requester, update=True):
+        """Ratelimits requests.
+
+        Args:
+            requester (Requester)
+            update (bool): Whether to record that a request is being processed.
+                Set to False when doing multiple checks for one request (e.g.
+                to check up front if we would reject the request), and set to
+                True for the last call for a given request.
+
+        Raises:
+            LimitExceededError if the request should be ratelimited
+        """
         time_now = self.clock.time()
         user_id = requester.user.to_string()
 
@@ -67,10 +80,25 @@ class BaseHandler(object):
         if requester.app_service and not requester.app_service.is_rate_limited():
             return
 
+        # Check if there is a per user override in the DB.
+        override = yield self.store.get_ratelimit_for_user(user_id)
+        if override:
+            # If overriden with a null Hz then ratelimiting has been entirely
+            # disabled for the user
+            if not override.messages_per_second:
+                return
+
+            messages_per_second = override.messages_per_second
+            burst_count = override.burst_count
+        else:
+            messages_per_second = self.hs.config.rc_messages_per_second
+            burst_count = self.hs.config.rc_message_burst_count
+
         allowed, time_allowed = self.ratelimiter.send_message(
             user_id, time_now,
-            msg_rate_hz=self.hs.config.rc_messages_per_second,
-            burst_count=self.hs.config.rc_message_burst_count,
+            msg_rate_hz=messages_per_second,
+            burst_count=burst_count,
+            update=update,
         )
         if not allowed:
             raise LimitExceededError(
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c22f65ce5d..982cda3edf 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -17,6 +17,7 @@ from synapse.api.constants import EventTypes
 from synapse.util import stringutils
 from synapse.util.async import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.metrics import measure_func
 from synapse.types import get_domain_from_id, RoomStreamToken
 from twisted.internet import defer
@@ -425,12 +426,38 @@ class DeviceListEduUpdater(object):
                 # This can happen since we batch updates
                 return
 
+            # Given a list of updates we check if we need to resync. This
+            # happens if we've missed updates.
             resync = yield self._need_to_do_resync(user_id, pending_updates)
 
             if resync:
                 # Fetch all devices for the user.
                 origin = get_domain_from_id(user_id)
-                result = yield self.federation.query_user_devices(origin, user_id)
+                try:
+                    result = yield self.federation.query_user_devices(origin, user_id)
+                except NotRetryingDestination:
+                    # TODO: Remember that we are now out of sync and try again
+                    # later
+                    logger.warn(
+                        "Failed to handle device list update for %s,"
+                        " we're not retrying the remote",
+                        user_id,
+                    )
+                    # We abort on exceptions rather than accepting the update
+                    # as otherwise synapse will 'forget' that its device list
+                    # is out of date. If we bail then we will retry the resync
+                    # next time we get a device list update for this user_id.
+                    # This makes it more likely that the device lists will
+                    # eventually become consistent.
+                    return
+                except Exception:
+                    # TODO: Remember that we are now out of sync and try again
+                    # later
+                    logger.exception(
+                        "Failed to handle device list update for %s", user_id
+                    )
+                    return
+
                 stream_id = result["stream_id"]
                 devices = result["devices"]
                 yield self.store.update_remote_device_list_cache(
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index c2b38d72a9..668a90e495 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -21,7 +21,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, CodeMessageException
 from synapse.types import get_domain_from_id
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
 from synapse.util.retryutils import NotRetryingDestination
 
 logger = logging.getLogger(__name__)
@@ -145,7 +145,7 @@ class E2eKeysHandler(object):
                     "status": 503, "message": e.message
                 }
 
-        yield preserve_context_over_deferred(defer.gatherResults([
+        yield make_deferred_yieldable(defer.gatherResults([
             preserve_fn(do_remote_query)(destination)
             for destination in remote_queries_not_in_cache
         ]))
@@ -257,11 +257,21 @@ class E2eKeysHandler(object):
                     "status": 503, "message": e.message
                 }
 
-        yield preserve_context_over_deferred(defer.gatherResults([
+        yield make_deferred_yieldable(defer.gatherResults([
             preserve_fn(claim_client_keys)(destination)
             for destination in remote_queries
         ]))
 
+        logger.info(
+            "Claimed one-time-keys: %s",
+            ",".join((
+                "%s for %s:%s" % (key_id, user_id, device_id)
+                for user_id, user_keys in json_result.iteritems()
+                for device_id, device_keys in user_keys.iteritems()
+                for key_id, _ in device_keys.iteritems()
+            )),
+        )
+
         defer.returnValue({
             "one_time_keys": json_result,
             "failures": failures
@@ -288,19 +298,8 @@ class E2eKeysHandler(object):
 
         one_time_keys = keys.get("one_time_keys", None)
         if one_time_keys:
-            logger.info(
-                "Adding %d one_time_keys for device %r for user %r at %d",
-                len(one_time_keys), device_id, user_id, time_now
-            )
-            key_list = []
-            for key_id, key_json in one_time_keys.items():
-                algorithm, key_id = key_id.split(":")
-                key_list.append((
-                    algorithm, key_id, encode_canonical_json(key_json)
-                ))
-
-            yield self.store.add_e2e_one_time_keys(
-                user_id, device_id, time_now, key_list
+            yield self._upload_one_time_keys_for_user(
+                user_id, device_id, time_now, one_time_keys,
             )
 
         # the device should have been registered already, but it may have been
@@ -313,3 +312,58 @@ class E2eKeysHandler(object):
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
         defer.returnValue({"one_time_key_counts": result})
+
+    @defer.inlineCallbacks
+    def _upload_one_time_keys_for_user(self, user_id, device_id, time_now,
+                                       one_time_keys):
+        logger.info(
+            "Adding one_time_keys %r for device %r for user %r at %d",
+            one_time_keys.keys(), device_id, user_id, time_now,
+        )
+
+        # make a list of (alg, id, key) tuples
+        key_list = []
+        for key_id, key_obj in one_time_keys.items():
+            algorithm, key_id = key_id.split(":")
+            key_list.append((
+                algorithm, key_id, key_obj
+            ))
+
+        # First we check if we have already persisted any of the keys.
+        existing_key_map = yield self.store.get_e2e_one_time_keys(
+            user_id, device_id, [k_id for _, k_id, _ in key_list]
+        )
+
+        new_keys = []  # Keys that we need to insert. (alg, id, json) tuples.
+        for algorithm, key_id, key in key_list:
+            ex_json = existing_key_map.get((algorithm, key_id), None)
+            if ex_json:
+                if not _one_time_keys_match(ex_json, key):
+                    raise SynapseError(
+                        400,
+                        ("One time key %s:%s already exists. "
+                         "Old key: %s; new key: %r") %
+                        (algorithm, key_id, ex_json, key)
+                    )
+            else:
+                new_keys.append((algorithm, key_id, encode_canonical_json(key)))
+
+        yield self.store.add_e2e_one_time_keys(
+            user_id, device_id, time_now, new_keys
+        )
+
+
+def _one_time_keys_match(old_key_json, new_key):
+    old_key = json.loads(old_key_json)
+
+    # if either is a string rather than an object, they must match exactly
+    if not isinstance(old_key, dict) or not isinstance(new_key, dict):
+        return old_key == new_key
+
+    # otherwise, we strip off the 'signatures' if any, because it's legitimate
+    # for different upload attempts to have different signatures.
+    old_key.pop("signatures", None)
+    new_key_copy = dict(new_key)
+    new_key_copy.pop("signatures", None)
+
+    return old_key == new_key_copy
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 53f9296399..52d97dfbf3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -28,7 +28,7 @@ from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError
 from synapse.util.logcontext import (
-    PreserveLoggingContext, preserve_fn, preserve_context_over_deferred
+    preserve_fn, preserve_context_over_deferred
 )
 from synapse.util.metrics import measure_func
 from synapse.util.logutils import log_function
@@ -172,8 +172,22 @@ class FederationHandler(BaseHandler):
                             origin, pdu, prevs, min_depth
                         )
 
-            prevs = {e_id for e_id, _ in pdu.prev_events}
-            seen = set(have_seen.keys())
+                        # Update the set of things we've seen after trying to
+                        # fetch the missing stuff
+                        have_seen = yield self.store.have_events(prevs)
+                        seen = set(have_seen.iterkeys())
+
+                        if not prevs - seen:
+                            logger.info(
+                                "Found all missing prev events for %s", pdu.event_id
+                            )
+                elif prevs - seen:
+                    logger.info(
+                        "Not fetching %d missing events for room %r,event %s: %r...",
+                        len(prevs - seen), pdu.room_id, pdu.event_id,
+                        list(prevs - seen)[:5],
+                    )
+
             if prevs - seen:
                 logger.info(
                     "Still missing %d events for room %r: %r...",
@@ -208,19 +222,15 @@ class FederationHandler(BaseHandler):
         Args:
             origin (str): Origin of the pdu. Will be called to get the missing events
             pdu: received pdu
-            prevs (str[]): List of event ids which we are missing
+            prevs (set(str)): List of event ids which we are missing
             min_depth (int): Minimum depth of events to return.
-
-        Returns:
-            Deferred<dict(str, str?)>: updated have_seen dictionary
         """
         # We recalculate seen, since it may have changed.
         have_seen = yield self.store.have_events(prevs)
         seen = set(have_seen.keys())
 
         if not prevs - seen:
-            # nothing left to do
-            defer.returnValue(have_seen)
+            return
 
         latest = yield self.store.get_latest_event_ids_in_room(
             pdu.room_id
@@ -232,8 +242,8 @@ class FederationHandler(BaseHandler):
         latest |= seen
 
         logger.info(
-            "Missing %d events for room %r: %r...",
-            len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+            "Missing %d events for room %r pdu %s: %r...",
+            len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
         )
 
         # XXX: we set timeout to 10s to help workaround
@@ -265,22 +275,23 @@ class FederationHandler(BaseHandler):
             timeout=10000,
         )
 
+        logger.info(
+            "Got %d events: %r...",
+            len(missing_events), [e.event_id for e in missing_events[:5]]
+        )
+
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
         missing_events.sort(key=lambda x: x.depth)
 
         for e in missing_events:
+            logger.info("Handling found event %s", e.event_id)
             yield self.on_receive_pdu(
                 origin,
                 e,
                 get_missing=False
             )
 
-        have_seen = yield self.store.have_events(
-            [ev for ev, _ in pdu.prev_events]
-        )
-        defer.returnValue(have_seen)
-
     @log_function
     @defer.inlineCallbacks
     def _process_received_pdu(self, origin, pdu, state, auth_chain):
@@ -369,13 +380,6 @@ class FederationHandler(BaseHandler):
                     affected=event.event_id,
                 )
 
-        # if we're receiving valid events from an origin,
-        # it's probably a good idea to mark it as not in retry-state
-        # for sending (although this is a bit of a leap)
-        retry_timings = yield self.store.get_destination_retry_timings(origin)
-        if retry_timings and retry_timings["retry_last_ts"]:
-            self.store.set_destination_retry_timings(origin, 0, 0)
-
         room = yield self.store.get_room(event.room_id)
 
         if not room:
@@ -394,11 +398,10 @@ class FederationHandler(BaseHandler):
             target_user = UserID.from_string(target_user_id)
             extra_users.append(target_user)
 
-        with PreserveLoggingContext():
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id,
+            extra_users=extra_users
+        )
 
         if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
@@ -916,11 +919,10 @@ class FederationHandler(BaseHandler):
                 origin, auth_chain, state, event
             )
 
-            with PreserveLoggingContext():
-                self.notifier.on_new_room_event(
-                    event, event_stream_id, max_stream_id,
-                    extra_users=[joinee]
-                )
+            self.notifier.on_new_room_event(
+                event, event_stream_id, max_stream_id,
+                extra_users=[joinee]
+            )
 
             logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
@@ -1035,10 +1037,9 @@ class FederationHandler(BaseHandler):
             target_user = UserID.from_string(target_user_id)
             extra_users.append(target_user)
 
-        with PreserveLoggingContext():
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id, extra_users=extra_users
-            )
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id, extra_users=extra_users
+        )
 
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.JOIN:
@@ -1084,29 +1085,22 @@ class FederationHandler(BaseHandler):
         )
 
         target_user = UserID.from_string(event.state_key)
-        with PreserveLoggingContext():
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=[target_user],
-            )
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id,
+            extra_users=[target_user],
+        )
 
         defer.returnValue(event)
 
     @defer.inlineCallbacks
     def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
-        try:
-            origin, event = yield self._make_and_verify_event(
-                target_hosts,
-                room_id,
-                user_id,
-                "leave"
-            )
-            event = self._sign_event(event)
-        except SynapseError:
-            raise
-        except CodeMessageException as e:
-            logger.warn("Failed to reject invite: %s", e)
-            raise SynapseError(500, "Failed to reject invite")
+        origin, event = yield self._make_and_verify_event(
+            target_hosts,
+            room_id,
+            user_id,
+            "leave"
+        )
+        event = self._sign_event(event)
 
         # Try the host that we succesfully called /make_leave/ on first for
         # the /send_leave/ request.
@@ -1116,16 +1110,10 @@ class FederationHandler(BaseHandler):
         except ValueError:
             pass
 
-        try:
-            yield self.replication_layer.send_leave(
-                target_hosts,
-                event
-            )
-        except SynapseError:
-            raise
-        except CodeMessageException as e:
-            logger.warn("Failed to reject invite: %s", e)
-            raise SynapseError(500, "Failed to reject invite")
+        yield self.replication_layer.send_leave(
+            target_hosts,
+            event
+        )
 
         context = yield self.state_handler.compute_event_context(event)
 
@@ -1246,10 +1234,9 @@ class FederationHandler(BaseHandler):
             target_user = UserID.from_string(target_user_id)
             extra_users.append(target_user)
 
-        with PreserveLoggingContext():
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id, extra_users=extra_users
-            )
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id, extra_users=extra_users
+        )
 
         defer.returnValue(None)
 
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 6a53c5eb47..9efcdff1d6 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -18,7 +18,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import (
-    CodeMessageException
+    MatrixCodeMessageException, CodeMessageException
 )
 from ._base import BaseHandler
 from synapse.util.async import run_on_reactor
@@ -90,6 +90,9 @@ class IdentityHandler(BaseHandler):
                 ),
                 {'sid': creds['sid'], 'client_secret': client_secret}
             )
+        except MatrixCodeMessageException as e:
+            logger.info("getValidated3pid failed with Matrix error: %r", e)
+            raise SynapseError(e.code, e.msg, e.errcode)
         except CodeMessageException as e:
             data = json.loads(e.msg)
 
@@ -159,6 +162,9 @@ class IdentityHandler(BaseHandler):
                 params
             )
             defer.returnValue(data)
+        except MatrixCodeMessageException as e:
+            logger.info("Proxied requestToken failed with Matrix error: %r", e)
+            raise SynapseError(e.code, e.msg, e.errcode)
         except CodeMessageException as e:
             logger.info("Proxied requestToken failed: %r", e)
             raise e
@@ -193,6 +199,9 @@ class IdentityHandler(BaseHandler):
                 params
             )
             defer.returnValue(data)
+        except MatrixCodeMessageException as e:
+            logger.info("Proxied requestToken failed with Matrix error: %r", e)
+            raise SynapseError(e.code, e.msg, e.errcode)
         except CodeMessageException as e:
             logger.info("Proxied requestToken failed: %r", e)
             raise e
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7a498af5a2..196925edad 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,7 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError, LimitExceededError
+from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
@@ -175,7 +175,8 @@ class MessageHandler(BaseHandler):
         defer.returnValue(chunk)
 
     @defer.inlineCallbacks
-    def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None):
+    def create_event(self, requester, event_dict, token_id=None, txn_id=None,
+                     prev_event_ids=None):
         """
         Given a dict from a client, create a new event.
 
@@ -185,6 +186,7 @@ class MessageHandler(BaseHandler):
         Adds display names to Join membership events.
 
         Args:
+            requester
             event_dict (dict): An entire event
             token_id (str)
             txn_id (str)
@@ -226,6 +228,7 @@ class MessageHandler(BaseHandler):
 
             event, context = yield self._create_new_client_event(
                 builder=builder,
+                requester=requester,
                 prev_event_ids=prev_event_ids,
             )
 
@@ -251,17 +254,7 @@ class MessageHandler(BaseHandler):
         # We check here if we are currently being rate limited, so that we
         # don't do unnecessary work. We check again just before we actually
         # send the event.
-        time_now = self.clock.time()
-        allowed, time_allowed = self.ratelimiter.send_message(
-            event.sender, time_now,
-            msg_rate_hz=self.hs.config.rc_messages_per_second,
-            burst_count=self.hs.config.rc_message_burst_count,
-            update=False,
-        )
-        if not allowed:
-            raise LimitExceededError(
-                retry_after_ms=int(1000 * (time_allowed - time_now)),
-            )
+        yield self.ratelimit(requester, update=False)
 
         user = UserID.from_string(event.sender)
 
@@ -319,6 +312,7 @@ class MessageHandler(BaseHandler):
         See self.create_event and self.send_nonmember_event.
         """
         event, context = yield self.create_event(
+            requester,
             event_dict,
             token_id=requester.access_token_id,
             txn_id=txn_id
@@ -416,7 +410,7 @@ class MessageHandler(BaseHandler):
 
     @measure_func("_create_new_client_event")
     @defer.inlineCallbacks
-    def _create_new_client_event(self, builder, prev_event_ids=None):
+    def _create_new_client_event(self, builder, requester=None, prev_event_ids=None):
         if prev_event_ids:
             prev_events = yield self.store.add_event_hashes(prev_event_ids)
             prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
@@ -456,6 +450,8 @@ class MessageHandler(BaseHandler):
         state_handler = self.state_handler
 
         context = yield state_handler.compute_event_context(builder)
+        if requester:
+            context.app_service = requester.app_service
 
         if builder.is_state():
             builder.prev_state = yield self.store.add_event_hashes(
@@ -493,7 +489,7 @@ class MessageHandler(BaseHandler):
         # We now need to go and hit out to wherever we need to hit out to.
 
         if ratelimit:
-            self.ratelimit(requester)
+            yield self.ratelimit(requester)
 
         try:
             yield self.auth.check_from_context(event, context)
@@ -531,9 +527,9 @@ class MessageHandler(BaseHandler):
 
                 state_to_include_ids = [
                     e_id
-                    for k, e_id in context.current_state_ids.items()
+                    for k, e_id in context.current_state_ids.iteritems()
                     if k[0] in self.hs.config.room_invite_state_types
-                    or k[0] == EventTypes.Member and k[1] == event.sender
+                    or k == (EventTypes.Member, event.sender)
                 ]
 
                 state_to_include = yield self.store.get_events(state_to_include_ids)
@@ -545,7 +541,7 @@ class MessageHandler(BaseHandler):
                         "content": e.content,
                         "sender": e.sender,
                     }
-                    for e in state_to_include.values()
+                    for e in state_to_include.itervalues()
                 ]
 
                 invitee = UserID.from_string(event.state_key)
@@ -612,12 +608,9 @@ class MessageHandler(BaseHandler):
         @defer.inlineCallbacks
         def _notify():
             yield run_on_reactor()
-            yield self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id,
                 extra_users=extra_users
             )
 
         preserve_fn(_notify)()
-
-        # If invite, remove room_state from unsigned before sending.
-        event.unsigned.pop("invite_room_state", None)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 1ede117c79..c7c0b0a1e2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -30,6 +30,7 @@ from synapse.api.constants import PresenceState
 from synapse.storage.presence import UserPresenceState
 
 from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.async import Linearizer
 from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
@@ -187,6 +188,7 @@ class PresenceHandler(object):
         # process_id to millisecond timestamp last updated.
         self.external_process_to_current_syncs = {}
         self.external_process_last_updated_ms = {}
+        self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
 
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
@@ -316,11 +318,7 @@ class PresenceHandler(object):
             if to_federation_ping:
                 federation_presence_out_counter.inc_by(len(to_federation_ping))
 
-                _, _, hosts_to_states = yield self._get_interested_parties(
-                    to_federation_ping.values()
-                )
-
-                self._push_to_remotes(hosts_to_states)
+                self._push_to_remotes(to_federation_ping.values())
 
     def _handle_timeouts(self):
         """Checks the presence of users that have timed out and updates as
@@ -509,6 +507,73 @@ class PresenceHandler(object):
         self.external_process_to_current_syncs[process_id] = syncing_user_ids
 
     @defer.inlineCallbacks
+    def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
+        """Update the syncing users for an external process as a delta.
+
+        Args:
+            process_id (str): An identifier for the process the users are
+                syncing against. This allows synapse to process updates
+                as user start and stop syncing against a given process.
+            user_id (str): The user who has started or stopped syncing
+            is_syncing (bool): Whether or not the user is now syncing
+            sync_time_msec(int): Time in ms when the user was last syncing
+        """
+        with (yield self.external_sync_linearizer.queue(process_id)):
+            prev_state = yield self.current_state_for_user(user_id)
+
+            process_presence = self.external_process_to_current_syncs.setdefault(
+                process_id, set()
+            )
+
+            updates = []
+            if is_syncing and user_id not in process_presence:
+                if prev_state.state == PresenceState.OFFLINE:
+                    updates.append(prev_state.copy_and_replace(
+                        state=PresenceState.ONLINE,
+                        last_active_ts=sync_time_msec,
+                        last_user_sync_ts=sync_time_msec,
+                    ))
+                else:
+                    updates.append(prev_state.copy_and_replace(
+                        last_user_sync_ts=sync_time_msec,
+                    ))
+                process_presence.add(user_id)
+            elif user_id in process_presence:
+                updates.append(prev_state.copy_and_replace(
+                    last_user_sync_ts=sync_time_msec,
+                ))
+
+            if not is_syncing:
+                process_presence.discard(user_id)
+
+            if updates:
+                yield self._update_states(updates)
+
+            self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+
+    @defer.inlineCallbacks
+    def update_external_syncs_clear(self, process_id):
+        """Marks all users that had been marked as syncing by a given process
+        as offline.
+
+        Used when the process has stopped/disappeared.
+        """
+        with (yield self.external_sync_linearizer.queue(process_id)):
+            process_presence = self.external_process_to_current_syncs.pop(
+                process_id, set()
+            )
+            prev_states = yield self.current_state_for_users(process_presence)
+            time_now_ms = self.clock.time_msec()
+
+            yield self._update_states([
+                prev_state.copy_and_replace(
+                    last_user_sync_ts=time_now_ms,
+                )
+                for prev_state in prev_states.itervalues()
+            ])
+            self.external_process_last_updated_ms.pop(process_id, None)
+
+    @defer.inlineCallbacks
     def current_state_for_user(self, user_id):
         """Get the current presence state for a user.
         """
@@ -527,14 +592,14 @@ class PresenceHandler(object):
             for user_id in user_ids
         }
 
-        missing = [user_id for user_id, state in states.items() if not state]
+        missing = [user_id for user_id, state in states.iteritems() if not state]
         if missing:
             # There are things not in our in memory cache. Lets pull them out of
             # the database.
             res = yield self.store.get_presence_for_users(missing)
             states.update(res)
 
-            missing = [user_id for user_id, state in states.items() if not state]
+            missing = [user_id for user_id, state in states.iteritems() if not state]
             if missing:
                 new = {
                     user_id: UserPresenceState.default(user_id)
@@ -546,88 +611,39 @@ class PresenceHandler(object):
         defer.returnValue(states)
 
     @defer.inlineCallbacks
-    def _get_interested_parties(self, states, calculate_remote_hosts=True):
-        """Given a list of states return which entities (rooms, users, servers)
-        are interested in the given states.
-
-        Returns:
-            3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`,
-            with each item being a dict of `entity_name` -> `[UserPresenceState]`
-        """
-        room_ids_to_states = {}
-        users_to_states = {}
-        for state in states:
-            room_ids = yield self.store.get_rooms_for_user(state.user_id)
-            for room_id in room_ids:
-                room_ids_to_states.setdefault(room_id, []).append(state)
-
-            plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
-            for u in plist:
-                users_to_states.setdefault(u, []).append(state)
-
-            # Always notify self
-            users_to_states.setdefault(state.user_id, []).append(state)
-
-        hosts_to_states = {}
-        if calculate_remote_hosts:
-            for room_id, states in room_ids_to_states.items():
-                local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-                if not local_states:
-                    continue
-
-                hosts = yield self.store.get_hosts_in_room(room_id)
-
-                for host in hosts:
-                    hosts_to_states.setdefault(host, []).extend(local_states)
-
-        for user_id, states in users_to_states.items():
-            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-            if not local_states:
-                continue
-
-            host = get_domain_from_id(user_id)
-            hosts_to_states.setdefault(host, []).extend(local_states)
-
-        # TODO: de-dup hosts_to_states, as a single host might have multiple
-        # of same presence
-
-        defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states))
-
-    @defer.inlineCallbacks
     def _persist_and_notify(self, states):
         """Persist states in the database, poke the notifier and send to
         interested remote servers
         """
         stream_id, max_token = yield self.store.update_presence(states)
 
-        parties = yield self._get_interested_parties(states)
-        room_ids_to_states, users_to_states, hosts_to_states = parties
+        parties = yield get_interested_parties(self.store, states)
+        room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
             "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states.keys()]
+            users=[UserID.from_string(u) for u in users_to_states]
         )
 
-        self._push_to_remotes(hosts_to_states)
+        self._push_to_remotes(states)
 
     @defer.inlineCallbacks
     def notify_for_states(self, state, stream_id):
-        parties = yield self._get_interested_parties([state])
-        room_ids_to_states, users_to_states, hosts_to_states = parties
+        parties = yield get_interested_parties(self.store, [state])
+        room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
             "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states.keys()]
+            users=[UserID.from_string(u) for u in users_to_states]
         )
 
-    def _push_to_remotes(self, hosts_to_states):
+    def _push_to_remotes(self, states):
         """Sends state updates to remote servers.
 
         Args:
-            hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
+            states (list(UserPresenceState))
         """
-        for host, states in hosts_to_states.items():
-            self.federation.send_presence(host, states)
+        self.federation.send_presence(states)
 
     @defer.inlineCallbacks
     def incoming_presence(self, origin, content):
@@ -764,18 +780,17 @@ class PresenceHandler(object):
         # don't need to send to local clients here, as that is done as part
         # of the event stream/sync.
         # TODO: Only send to servers not already in the room.
-        user_ids = yield self.store.get_users_in_room(room_id)
         if self.is_mine(user):
             state = yield self.current_state_for_user(user.to_string())
 
-            hosts = set(get_domain_from_id(u) for u in user_ids)
-            self._push_to_remotes({host: (state,) for host in hosts})
+            self._push_to_remotes([state])
         else:
+            user_ids = yield self.store.get_users_in_room(room_id)
             user_ids = filter(self.is_mine_id, user_ids)
 
             states = yield self.current_state_for_users(user_ids)
 
-            self._push_to_remotes({user.domain: states.values()})
+            self._push_to_remotes(states.values())
 
     @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):
@@ -1275,3 +1290,66 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
         persist_and_notify = True
 
     return new_state, persist_and_notify, federation_ping
+
+
+@defer.inlineCallbacks
+def get_interested_parties(store, states):
+    """Given a list of states return which entities (rooms, users)
+    are interested in the given states.
+
+    Args:
+        states (list(UserPresenceState))
+
+    Returns:
+        2-tuple: `(room_ids_to_states, users_to_states)`,
+        with each item being a dict of `entity_name` -> `[UserPresenceState]`
+    """
+    room_ids_to_states = {}
+    users_to_states = {}
+    for state in states:
+        room_ids = yield store.get_rooms_for_user(state.user_id)
+        for room_id in room_ids:
+            room_ids_to_states.setdefault(room_id, []).append(state)
+
+        plist = yield store.get_presence_list_observers_accepted(state.user_id)
+        for u in plist:
+            users_to_states.setdefault(u, []).append(state)
+
+        # Always notify self
+        users_to_states.setdefault(state.user_id, []).append(state)
+
+    defer.returnValue((room_ids_to_states, users_to_states))
+
+
+@defer.inlineCallbacks
+def get_interested_remotes(store, states, state_handler):
+    """Given a list of presence states figure out which remote servers
+    should be sent which.
+
+    All the presence states should be for local users only.
+
+    Args:
+        store (DataStore)
+        states (list(UserPresenceState))
+
+    Returns:
+        Deferred list of ([destinations], [UserPresenceState]), where for
+        each row the list of UserPresenceState should be sent to each
+        destination
+    """
+    hosts_and_states = []
+
+    # First we look up the rooms each user is in (as well as any explicit
+    # subscriptions), then for each distinct room we look up the remote
+    # hosts in those rooms.
+    room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
+
+    for room_id, states in room_ids_to_states.iteritems():
+        hosts = yield state_handler.get_current_hosts_in_room(room_id)
+        hosts_and_states.append((hosts, states))
+
+    for user_id, states in users_to_states.iteritems():
+        host = get_domain_from_id(user_id)
+        hosts_and_states.append(([host], states))
+
+    defer.returnValue(hosts_and_states)
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 9bf638f818..7abee98dea 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -156,7 +156,7 @@ class ProfileHandler(BaseHandler):
         if not self.hs.is_mine(user):
             return
 
-        self.ratelimit(requester)
+        yield self.ratelimit(requester)
 
         room_ids = yield self.store.get_rooms_for_user(
             user.to_string(),
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
new file mode 100644
index 0000000000..b5b0303d54
--- /dev/null
+++ b/synapse/handlers/read_marker.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseHandler
+
+from twisted.internet import defer
+
+from synapse.util.async import Linearizer
+
+import logging
+logger = logging.getLogger(__name__)
+
+
+class ReadMarkerHandler(BaseHandler):
+    def __init__(self, hs):
+        super(ReadMarkerHandler, self).__init__(hs)
+        self.server_name = hs.config.server_name
+        self.store = hs.get_datastore()
+        self.read_marker_linearizer = Linearizer(name="read_marker")
+        self.notifier = hs.get_notifier()
+
+    @defer.inlineCallbacks
+    def received_client_read_marker(self, room_id, user_id, event_id):
+        """Updates the read marker for a given user in a given room if the event ID given
+        is ahead in the stream relative to the current read marker.
+
+        This uses a notifier to indicate that account data should be sent down /sync if
+        the read marker has changed.
+        """
+
+        with (yield self.read_marker_linearizer.queue((room_id, user_id))):
+            account_data = yield self.store.get_account_data_for_room(user_id, room_id)
+
+            existing_read_marker = account_data.get("m.fully_read", None)
+
+            should_update = True
+
+            if existing_read_marker:
+                # Only update if the new marker is ahead in the stream
+                should_update = yield self.store.is_event_after(
+                    event_id,
+                    existing_read_marker['event_id']
+                )
+
+            if should_update:
+                content = {
+                    "event_id": event_id
+                }
+                max_id = yield self.store.add_account_data_to_room(
+                    user_id, room_id, "m.fully_read", content
+                )
+                self.notifier.on_new_event("account_data_key", max_id, users=[user_id])
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 03c6a85fc6..ee3a2269a8 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -54,6 +54,13 @@ class RegistrationHandler(BaseHandler):
                 Codes.INVALID_USERNAME
             )
 
+        if not localpart:
+            raise SynapseError(
+                400,
+                "User ID cannot be empty",
+                Codes.INVALID_USERNAME
+            )
+
         if localpart[0] == '_':
             raise SynapseError(
                 400,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 99cb7db0db..d2a0d6520a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -75,7 +75,7 @@ class RoomCreationHandler(BaseHandler):
         """
         user_id = requester.user.to_string()
 
-        self.ratelimit(requester)
+        yield self.ratelimit(requester)
 
         if "room_alias_name" in config:
             for wchar in string.whitespace:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 2052d6d05f..1ca88517a2 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -70,6 +70,7 @@ class RoomMemberHandler(BaseHandler):
             content["kind"] = "guest"
 
         event, context = yield msg_handler.create_event(
+            requester,
             {
                 "type": EventTypes.Member,
                 "content": content,
@@ -139,13 +140,6 @@ class RoomMemberHandler(BaseHandler):
         )
         yield user_joined_room(self.distributor, user, room_id)
 
-    def reject_remote_invite(self, user_id, room_id, remote_room_hosts):
-        return self.hs.get_handlers().federation_handler.do_remotely_reject_invite(
-            remote_room_hosts,
-            room_id,
-            user_id
-        )
-
     @defer.inlineCallbacks
     def update_membership(
             self,
@@ -286,13 +280,21 @@ class RoomMemberHandler(BaseHandler):
                 else:
                     # send the rejection to the inviter's HS.
                     remote_room_hosts = remote_room_hosts + [inviter.domain]
-
+                    fed_handler = self.hs.get_handlers().federation_handler
                     try:
-                        ret = yield self.reject_remote_invite(
-                            target.to_string(), room_id, remote_room_hosts
+                        ret = yield fed_handler.do_remotely_reject_invite(
+                            remote_room_hosts,
+                            room_id,
+                            target.to_string(),
                         )
                         defer.returnValue(ret)
-                    except SynapseError as e:
+                    except Exception as e:
+                        # if we were unable to reject the exception, just mark
+                        # it as rejected on our end and plough ahead.
+                        #
+                        # The 'except' clause is very broad, but we need to
+                        # capture everything from DNS failures upwards
+                        #
                         logger.warn("Failed to reject invite: %s", e)
 
                         yield self.store.locally_reject_invite(
@@ -737,10 +739,11 @@ class RoomMemberHandler(BaseHandler):
         if len(current_state_ids) == 1 and create_event_id:
             defer.returnValue(self.hs.is_mine_id(create_event_id))
 
-        for (etype, state_key), event_id in current_state_ids.items():
+        for etype, state_key in current_state_ids:
             if etype != EventTypes.Member or not self.hs.is_mine_id(state_key):
                 continue
 
+            event_id = current_state_ids[(etype, state_key)]
             event = yield self.store.get_event(event_id, allow_none=True)
             if not event:
                 continue
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 0eea7f8f9c..3b7818af5c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -24,7 +24,6 @@ from synapse.types import UserID, get_domain_from_id
 import logging
 
 from collections import namedtuple
-import ujson as json
 
 logger = logging.getLogger(__name__)
 
@@ -288,11 +287,13 @@ class TypingHandler(object):
         for room_id, serial in self._room_serials.items():
             if last_id < serial and serial <= current_id:
                 typing = self._room_typing[room_id]
-                typing_bytes = json.dumps(list(typing), ensure_ascii=False)
-                rows.append((serial, room_id, typing_bytes))
+                rows.append((serial, room_id, list(typing)))
         rows.sort()
         return rows
 
+    def get_current_token(self):
+        return self._latest_room_serial
+
 
 class TypingNotificationEventSource(object):
     def __init__(self, hs):