summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py108
-rw-r--r--synapse/api/constants.py1
-rw-r--r--synapse/api/errors.py9
-rw-r--r--synapse/api/filtering.py196
-rw-r--r--synapse/appservice/scheduler.py4
-rw-r--r--synapse/config/registration.py6
-rw-r--r--synapse/events/utils.py4
-rw-r--r--synapse/federation/federation_client.py73
-rw-r--r--synapse/federation/federation_server.py33
-rw-r--r--synapse/federation/transaction_queue.py28
-rw-r--r--synapse/federation/transport/client.py27
-rw-r--r--synapse/federation/transport/server.py22
-rw-r--r--synapse/handlers/__init__.py5
-rw-r--r--synapse/handlers/_base.py57
-rw-r--r--synapse/handlers/auth.py3
-rw-r--r--synapse/handlers/federation.py227
-rw-r--r--synapse/handlers/message.py79
-rw-r--r--synapse/handlers/private_user_data.py46
-rw-r--r--synapse/handlers/room.py311
-rw-r--r--synapse/handlers/search.py154
-rw-r--r--synapse/handlers/sync.py352
-rw-r--r--synapse/http/client.py4
-rw-r--r--synapse/notifier.py2
-rw-r--r--synapse/rest/client/v1/login.py32
-rw-r--r--synapse/rest/client/v1/room.py89
-rw-r--r--synapse/rest/client/v2_alpha/__init__.py2
-rw-r--r--synapse/rest/client/v2_alpha/sync.py44
-rw-r--r--synapse/rest/client/v2_alpha/tags.py106
-rw-r--r--synapse/storage/__init__.py4
-rw-r--r--synapse/storage/_base.py2
-rw-r--r--synapse/storage/engines/sqlite3.py27
-rw-r--r--synapse/storage/events.py2
-rw-r--r--synapse/storage/filtering.py4
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/room.py32
-rw-r--r--synapse/storage/roommember.py13
-rw-r--r--synapse/storage/schema/delta/25/fts.py127
-rw-r--r--synapse/storage/schema/delta/25/tags.sql38
-rw-r--r--synapse/storage/search.py111
-rw-r--r--synapse/storage/state.py1
-rw-r--r--synapse/storage/stream.py137
-rw-r--r--synapse/storage/tags.py216
-rw-r--r--synapse/storage/transactions.py48
-rw-r--r--synapse/streams/events.py5
-rw-r--r--synapse/types.py26
-rw-r--r--synapse/util/emailutils.py71
-rw-r--r--synapse/util/third_party_invites.py69
47 files changed, 2443 insertions, 516 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index e3b8c3099a..88445fe999 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -14,13 +14,18 @@
 # limitations under the License.
 
 """This module contains classes for authenticating the user."""
+from canonicaljson import encode_canonical_json
+from signedjson.key import decode_verify_key_bytes
+from signedjson.sign import verify_signed_json, SignatureVerifyException
 
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership, JoinRules
-from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.util.logutils import log_function
+from synapse.api.errors import AuthError, Codes, SynapseError, EventSizeError
 from synapse.types import RoomID, UserID, EventID
+from synapse.util.logutils import log_function
+from synapse.util import third_party_invites
+from unpaddedbase64 import decode_base64
 
 import logging
 import pymacaroons
@@ -31,6 +36,7 @@ logger = logging.getLogger(__name__)
 AuthEventTypes = (
     EventTypes.Create, EventTypes.Member, EventTypes.PowerLevels,
     EventTypes.JoinRules, EventTypes.RoomHistoryVisibility,
+    EventTypes.ThirdPartyInvite,
 )
 
 
@@ -59,6 +65,8 @@ class Auth(object):
         Returns:
             True if the auth checks pass.
         """
+        self.check_size_limits(event)
+
         try:
             if not hasattr(event, "room_id"):
                 raise AuthError(500, "Event has no room_id: %s" % event)
@@ -126,6 +134,23 @@ class Auth(object):
             logger.info("Denying! %s", event)
             raise
 
+    def check_size_limits(self, event):
+        def too_big(field):
+            raise EventSizeError("%s too large" % (field,))
+
+        if len(event.user_id) > 255:
+            too_big("user_id")
+        if len(event.room_id) > 255:
+            too_big("room_id")
+        if event.is_state() and len(event.state_key) > 255:
+            too_big("state_key")
+        if len(event.type) > 255:
+            too_big("type")
+        if len(event.event_id) > 255:
+            too_big("event_id")
+        if len(encode_canonical_json(event.get_pdu_json())) > 65536:
+            too_big("event")
+
     @defer.inlineCallbacks
     def check_joined_room(self, room_id, user_id, current_state=None):
         """Check if the user is currently joined in the room
@@ -303,7 +328,11 @@ class Auth(object):
         )
 
         if Membership.JOIN != membership:
-            # JOIN is the only action you can perform if you're not in the room
+            if (caller_invited
+                    and Membership.LEAVE == membership
+                    and target_user_id == event.user_id):
+                return True
+
             if not caller_in_room:  # caller isn't joined
                 raise AuthError(
                     403,
@@ -341,7 +370,8 @@ class Auth(object):
                 pass
             elif join_rule == JoinRules.INVITE:
                 if not caller_in_room and not caller_invited:
-                    raise AuthError(403, "You are not invited to this room.")
+                    if not self._verify_third_party_invite(event, auth_events):
+                        raise AuthError(403, "You are not invited to this room.")
             else:
                 # TODO (erikj): may_join list
                 # TODO (erikj): private rooms
@@ -367,6 +397,68 @@ class Auth(object):
 
         return True
 
+    def _verify_third_party_invite(self, event, auth_events):
+        """
+        Validates that the join event is authorized by a previous third-party invite.
+
+        Checks that the public key, and keyserver, match those in the invite,
+        and that the join event has a signature issued using that public key.
+
+        Args:
+            event: The m.room.member join event being validated.
+            auth_events: All relevant previous context events which may be used
+                for authorization decisions.
+
+        Return:
+            True if the event fulfills the expectations of a previous third party
+            invite event.
+        """
+        if not third_party_invites.join_has_third_party_invite(event.content):
+            return False
+        join_third_party_invite = event.content["third_party_invite"]
+        token = join_third_party_invite["token"]
+        invite_event = auth_events.get(
+            (EventTypes.ThirdPartyInvite, token,)
+        )
+        if not invite_event:
+            logger.info("Failing 3pid invite because no invite found for token %s", token)
+            return False
+        try:
+            public_key = join_third_party_invite["public_key"]
+            key_validity_url = join_third_party_invite["key_validity_url"]
+            if invite_event.content["public_key"] != public_key:
+                logger.info(
+                    "Failing 3pid invite because public key invite: %s != join: %s",
+                    invite_event.content["public_key"],
+                    public_key
+                )
+                return False
+            if invite_event.content["key_validity_url"] != key_validity_url:
+                logger.info(
+                    "Failing 3pid invite because key_validity_url invite: %s != join: %s",
+                    invite_event.content["key_validity_url"],
+                    key_validity_url
+                )
+                return False
+            signed = join_third_party_invite["signed"]
+            if signed["mxid"] != event.user_id:
+                return False
+            if signed["token"] != token:
+                return False
+            for server, signature_block in signed["signatures"].items():
+                for key_name, encoded_signature in signature_block.items():
+                    if not key_name.startswith("ed25519:"):
+                        return False
+                    verify_key = decode_verify_key_bytes(
+                        key_name,
+                        decode_base64(public_key)
+                    )
+                    verify_signed_json(signed, server, verify_key)
+                    return True
+            return False
+        except (KeyError, SignatureVerifyException,):
+            return False
+
     def _get_power_level_event(self, auth_events):
         key = (EventTypes.PowerLevels, "", )
         return auth_events.get(key)
@@ -646,6 +738,14 @@ class Auth(object):
             if e_type == Membership.JOIN:
                 if member_event and not is_public:
                     auth_ids.append(member_event.event_id)
+                if third_party_invites.join_has_third_party_invite(event.content):
+                    key = (
+                        EventTypes.ThirdPartyInvite,
+                        event.content["third_party_invite"]["token"]
+                    )
+                    invite = current_state.get(key)
+                    if invite:
+                        auth_ids.append(invite.event_id)
             else:
                 if member_event:
                     auth_ids.append(member_event.event_id)
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 008ee64727..41125e8719 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -63,6 +63,7 @@ class EventTypes(object):
     PowerLevels = "m.room.power_levels"
     Aliases = "m.room.aliases"
     Redaction = "m.room.redaction"
+    ThirdPartyInvite = "m.room.third_party_invite"
 
     RoomHistoryVisibility = "m.room.history_visibility"
     CanonicalAlias = "m.room.canonical_alias"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index d1356eb4d9..b3fea27d0e 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -119,6 +119,15 @@ class AuthError(SynapseError):
         super(AuthError, self).__init__(*args, **kwargs)
 
 
+class EventSizeError(SynapseError):
+    """An error raised when an event is too big."""
+
+    def __init__(self, *args, **kwargs):
+        if "errcode" not in kwargs:
+            kwargs["errcode"] = Codes.TOO_LARGE
+        super(EventSizeError, self).__init__(413, *args, **kwargs)
+
+
 class EventStreamError(SynapseError):
     """An error raised when there a problem with the event stream."""
     def __init__(self, *args, **kwargs):
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index e79e91e7eb..e4e3d1c59d 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -24,7 +24,7 @@ class Filtering(object):
 
     def get_user_filter(self, user_localpart, filter_id):
         result = self.store.get_user_filter(user_localpart, filter_id)
-        result.addCallback(Filter)
+        result.addCallback(FilterCollection)
         return result
 
     def add_user_filter(self, user_localpart, user_filter):
@@ -131,125 +131,133 @@ class Filtering(object):
             raise SynapseError(400, "Bad bundle_updates: expected bool.")
 
 
-class Filter(object):
+class FilterCollection(object):
     def __init__(self, filter_json):
         self.filter_json = filter_json
 
+        self.room_timeline_filter = Filter(
+            self.filter_json.get("room", {}).get("timeline", {})
+        )
+
+        self.room_state_filter = Filter(
+            self.filter_json.get("room", {}).get("state", {})
+        )
+
+        self.room_ephemeral_filter = Filter(
+            self.filter_json.get("room", {}).get("ephemeral", {})
+        )
+
+        self.room_private_user_data = Filter(
+            self.filter_json.get("room", {}).get("private_user_data", {})
+        )
+
+        self.presence_filter = Filter(
+            self.filter_json.get("presence", {})
+        )
+
     def timeline_limit(self):
-        return self.filter_json.get("room", {}).get("timeline", {}).get("limit", 10)
+        return self.room_timeline_filter.limit()
 
     def presence_limit(self):
-        return self.filter_json.get("presence", {}).get("limit", 10)
+        return self.presence_filter.limit()
 
     def ephemeral_limit(self):
-        return self.filter_json.get("room", {}).get("ephemeral", {}).get("limit", 10)
+        return self.room_ephemeral_filter.limit()
 
     def filter_presence(self, events):
-        return self._filter_on_key(events, ["presence"])
+        return self.presence_filter.filter(events)
 
     def filter_room_state(self, events):
-        return self._filter_on_key(events, ["room", "state"])
+        return self.room_state_filter.filter(events)
 
     def filter_room_timeline(self, events):
-        return self._filter_on_key(events, ["room", "timeline"])
+        return self.room_timeline_filter.filter(events)
 
     def filter_room_ephemeral(self, events):
-        return self._filter_on_key(events, ["room", "ephemeral"])
-
-    def _filter_on_key(self, events, keys):
-        filter_json = self.filter_json
-        if not filter_json:
-            return events
-
-        try:
-            # extract the right definition from the filter
-            definition = filter_json
-            for key in keys:
-                definition = definition[key]
-            return self._filter_with_definition(events, definition)
-        except KeyError:
-            # return all events if definition isn't specified.
-            return events
-
-    def _filter_with_definition(self, events, definition):
-        return [e for e in events if self._passes_definition(definition, e)]
-
-    def _passes_definition(self, definition, event):
-        """Check if the event passes the filter definition
-        Args:
-            definition(dict): The filter definition to check against
-            event(dict or Event): The event to check
+        return self.room_ephemeral_filter.filter(events)
+
+    def filter_room_private_user_data(self, events):
+        return self.room_private_user_data.filter(events)
+
+
+class Filter(object):
+    def __init__(self, filter_json):
+        self.filter_json = filter_json
+
+    def check(self, event):
+        """Checks whether the filter matches the given event.
+
         Returns:
-            True if the event passes the filter in the definition
+            bool: True if the event matches
         """
-        if type(event) is dict:
-            room_id = event.get("room_id")
-            sender = event.get("sender")
-            event_type = event["type"]
+        if isinstance(event, dict):
+            return self.check_fields(
+                event.get("room_id", None),
+                event.get("sender", None),
+                event.get("type", None),
+            )
         else:
-            room_id = getattr(event, "room_id", None)
-            sender = getattr(event, "sender", None)
-            event_type = event.type
-        return self._event_passes_definition(
-            definition, room_id, sender, event_type
-        )
+            return self.check_fields(
+                getattr(event, "room_id", None),
+                getattr(event, "sender", None),
+                event.type,
+            )
 
-    def _event_passes_definition(self, definition, room_id, sender,
-                                 event_type):
-        """Check if the event passes through the given definition.
+    def check_fields(self, room_id, sender, event_type):
+        """Checks whether the filter matches the given event fields.
 
-        Args:
-            definition(dict): The definition to check against.
-            room_id(str): The id of the room this event is in or None.
-            sender(str): The sender of the event
-            event_type(str): The type of the event.
         Returns:
-            True if the event passes through the filter.
+            bool: True if the event fields match
         """
-        # Algorithm notes:
-        # For each key in the definition, check the event meets the criteria:
-        #   * For types: Literal match or prefix match (if ends with wildcard)
-        #   * For senders/rooms: Literal match only
-        #   * "not_" checks take presedence (e.g. if "m.*" is in both 'types'
-        #     and 'not_types' then it is treated as only being in 'not_types')
-
-        # room checks
-        if room_id is not None:
-            allow_rooms = definition.get("rooms", None)
-            reject_rooms = definition.get("not_rooms", None)
-            if reject_rooms and room_id in reject_rooms:
-                return False
-            if allow_rooms and room_id not in allow_rooms:
+        literal_keys = {
+            "rooms": lambda v: room_id == v,
+            "senders": lambda v: sender == v,
+            "types": lambda v: _matches_wildcard(event_type, v)
+        }
+
+        for name, match_func in literal_keys.items():
+            not_name = "not_%s" % (name,)
+            disallowed_values = self.filter_json.get(not_name, [])
+            if any(map(match_func, disallowed_values)):
                 return False
 
-        # sender checks
-        if sender is not None:
-            allow_senders = definition.get("senders", None)
-            reject_senders = definition.get("not_senders", None)
-            if reject_senders and sender in reject_senders:
-                return False
-            if allow_senders and sender not in allow_senders:
-                return False
-
-        # type checks
-        if "not_types" in definition:
-            for def_type in definition["not_types"]:
-                if self._event_matches_type(event_type, def_type):
+            allowed_values = self.filter_json.get(name, None)
+            if allowed_values is not None:
+                if not any(map(match_func, allowed_values)):
                     return False
-        if "types" in definition:
-            included = False
-            for def_type in definition["types"]:
-                if self._event_matches_type(event_type, def_type):
-                    included = True
-                    break
-            if not included:
-                return False
 
         return True
 
-    def _event_matches_type(self, event_type, def_type):
-        if def_type.endswith("*"):
-            type_prefix = def_type[:-1]
-            return event_type.startswith(type_prefix)
-        else:
-            return event_type == def_type
+    def filter_rooms(self, room_ids):
+        """Apply the 'rooms' filter to a given list of rooms.
+
+        Args:
+            room_ids (list): A list of room_ids.
+
+        Returns:
+            list: A list of room_ids that match the filter
+        """
+        room_ids = set(room_ids)
+
+        disallowed_rooms = set(self.filter_json.get("not_rooms", []))
+        room_ids -= disallowed_rooms
+
+        allowed_rooms = self.filter_json.get("rooms", None)
+        if allowed_rooms is not None:
+            room_ids &= set(allowed_rooms)
+
+        return room_ids
+
+    def filter(self, events):
+        return filter(self.check, events)
+
+    def limit(self):
+        return self.filter_json.get("limit", 10)
+
+
+def _matches_wildcard(actual_value, filter_value):
+    if filter_value.endswith("*"):
+        type_prefix = filter_value[:-1]
+        return actual_value.startswith(type_prefix)
+    else:
+        return actual_value == filter_value
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 59b0b1f4ac..44dc2c4744 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -224,8 +224,8 @@ class _Recoverer(object):
         self.clock.call_later((2 ** self.backoff_counter), self.retry)
 
     def _backoff(self):
-        # cap the backoff to be around 18h => (2^16) = 65536 secs
-        if self.backoff_counter < 16:
+        # cap the backoff to be around 8.5min => (2^9) = 512 secs
+        if self.backoff_counter < 9:
             self.backoff_counter += 1
         self.recover()
 
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index fa98eced34..f5ef36a9f4 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -33,6 +33,7 @@ class RegistrationConfig(Config):
 
         self.registration_shared_secret = config.get("registration_shared_secret")
         self.macaroon_secret_key = config.get("macaroon_secret_key")
+        self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
 
     def default_config(self, **kwargs):
         registration_shared_secret = random_string_with_symbols(50)
@@ -48,6 +49,11 @@ class RegistrationConfig(Config):
         registration_shared_secret: "%(registration_shared_secret)s"
 
         macaroon_secret_key: "%(macaroon_secret_key)s"
+
+        # Set the number of bcrypt rounds used to generate password hash.
+        # Larger numbers increase the work factor needed to generate the hash.
+        # The default number of rounds is 12.
+        bcrypt_rounds: 12
         """ % locals()
 
     def add_arguments(self, parser):
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index b36eec0993..9989b76591 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -66,7 +66,6 @@ def prune_event(event):
             "users_default",
             "events",
             "events_default",
-            "events_default",
             "state_default",
             "ban",
             "kick",
@@ -155,7 +154,8 @@ def serialize_event(e, time_now_ms, as_client_event=True,
 
     if "redacted_because" in e.unsigned:
         d["unsigned"]["redacted_because"] = serialize_event(
-            e.unsigned["redacted_because"], time_now_ms
+            e.unsigned["redacted_because"], time_now_ms,
+            event_format=event_format
         )
 
     if token_id is not None:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f5e346cdbc..723f571284 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -17,6 +17,7 @@
 from twisted.internet import defer
 
 from .federation_base import FederationBase
+from synapse.api.constants import Membership
 from .units import Edu
 
 from synapse.api.errors import (
@@ -25,6 +26,7 @@ from synapse.api.errors import (
 from synapse.util import unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
+from synapse.util import third_party_invites
 from synapse.events import FrozenEvent
 import synapse.metrics
 
@@ -356,19 +358,51 @@ class FederationClient(FederationBase):
         defer.returnValue(signed_auth)
 
     @defer.inlineCallbacks
-    def make_join(self, destinations, room_id, user_id):
+    def make_membership_event(self, destinations, room_id, user_id, membership, content):
+        """
+        Creates an m.room.member event, with context, without participating in the room.
+
+        Does so by asking one of the already participating servers to create an
+        event with proper context.
+
+        Note that this does not append any events to any graphs.
+
+        Args:
+            destinations (str): Candidate homeservers which are probably
+                participating in the room.
+            room_id (str): The room in which the event will happen.
+            user_id (str): The user whose membership is being evented.
+            membership (str): The "membership" property of the event. Must be
+                one of "join" or "leave".
+            content (object): Any additional data to put into the content field
+                of the event.
+        Return:
+            A tuple of (origin (str), event (object)) where origin is the remote
+            homeserver which generated the event.
+        """
+        valid_memberships = {Membership.JOIN, Membership.LEAVE}
+        if membership not in valid_memberships:
+            raise RuntimeError(
+                "make_membership_event called with membership='%s', must be one of %s" %
+                (membership, ",".join(valid_memberships))
+            )
         for destination in destinations:
             if destination == self.server_name:
                 continue
 
+            args = {}
+            if third_party_invites.join_has_third_party_invite(content):
+                args = third_party_invites.extract_join_keys(
+                    content["third_party_invite"]
+                )
             try:
-                ret = yield self.transport_layer.make_join(
-                    destination, room_id, user_id
+                ret = yield self.transport_layer.make_membership_event(
+                    destination, room_id, user_id, membership, args
                 )
 
                 pdu_dict = ret["event"]
 
-                logger.debug("Got response to make_join: %s", pdu_dict)
+                logger.debug("Got response to make_%s: %s", membership, pdu_dict)
 
                 defer.returnValue(
                     (destination, self.event_from_pdu_json(pdu_dict))
@@ -378,8 +412,8 @@ class FederationClient(FederationBase):
                 raise
             except Exception as e:
                 logger.warn(
-                    "Failed to make_join via %s: %s",
-                    destination, e.message
+                    "Failed to make_%s via %s: %s",
+                    membership, destination, e.message
                 )
 
         raise RuntimeError("Failed to send to any server.")
@@ -486,6 +520,33 @@ class FederationClient(FederationBase):
         defer.returnValue(pdu)
 
     @defer.inlineCallbacks
+    def send_leave(self, destinations, pdu):
+        for destination in destinations:
+            if destination == self.server_name:
+                continue
+
+            try:
+                time_now = self._clock.time_msec()
+                _, content = yield self.transport_layer.send_leave(
+                    destination=destination,
+                    room_id=pdu.room_id,
+                    event_id=pdu.event_id,
+                    content=pdu.get_pdu_json(time_now),
+                )
+
+                logger.debug("Got content: %s", content)
+                defer.returnValue(None)
+            except CodeMessageException:
+                raise
+            except Exception as e:
+                logger.exception(
+                    "Failed to send_leave via %s: %s",
+                    destination, e.message
+                )
+
+        raise RuntimeError("Failed to send to any server.")
+
+    @defer.inlineCallbacks
     def query_auth(self, destination, room_id, event_id, local_auth):
         """
         Params:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 725c6f3fa5..9e2d9ee74c 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -23,10 +23,12 @@ from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
 import synapse.metrics
 
-from synapse.api.errors import FederationError, SynapseError
+from synapse.api.errors import FederationError, SynapseError, Codes
 
 from synapse.crypto.event_signing import compute_event_signature
 
+from synapse.util import third_party_invites
+
 import simplejson as json
 import logging
 
@@ -228,8 +230,19 @@ class FederationServer(FederationBase):
             )
 
     @defer.inlineCallbacks
-    def on_make_join_request(self, room_id, user_id):
-        pdu = yield self.handler.on_make_join_request(room_id, user_id)
+    def on_make_join_request(self, room_id, user_id, query):
+        threepid_details = {}
+        if third_party_invites.has_join_keys(query):
+            for k in third_party_invites.JOIN_KEYS:
+                if not isinstance(query[k], list) or len(query[k]) != 1:
+                    raise FederationError(
+                        "FATAL",
+                        Codes.MISSING_PARAM,
+                        "key %s value %s" % (k, query[k],),
+                        None
+                    )
+                threepid_details[k] = query[k][0]
+        pdu = yield self.handler.on_make_join_request(room_id, user_id, threepid_details)
         time_now = self._clock.time_msec()
         defer.returnValue({"event": pdu.get_pdu_json(time_now)})
 
@@ -255,6 +268,20 @@ class FederationServer(FederationBase):
         }))
 
     @defer.inlineCallbacks
+    def on_make_leave_request(self, room_id, user_id):
+        pdu = yield self.handler.on_make_leave_request(room_id, user_id)
+        time_now = self._clock.time_msec()
+        defer.returnValue({"event": pdu.get_pdu_json(time_now)})
+
+    @defer.inlineCallbacks
+    def on_send_leave_request(self, origin, content):
+        logger.debug("on_send_leave_request: content: %s", content)
+        pdu = self.event_from_pdu_json(content)
+        logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
+        yield self.handler.on_send_leave_request(origin, pdu)
+        defer.returnValue((200, {}))
+
+    @defer.inlineCallbacks
     def on_event_auth(self, origin, room_id, event_id):
         time_now = self._clock.time_msec()
         auth_pdus = yield self.handler.on_event_auth(event_id)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 32fa5e8c15..aac6f1c167 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -202,6 +202,7 @@ class TransactionQueue(object):
     @defer.inlineCallbacks
     @log_function
     def _attempt_new_transaction(self, destination):
+        # list of (pending_pdu, deferred, order)
         if destination in self.pending_transactions:
             # XXX: pending_transactions can get stuck on by a never-ending
             # request at which point pending_pdus_by_dest just keeps growing.
@@ -213,9 +214,6 @@ class TransactionQueue(object):
             )
             return
 
-        logger.debug("TX [%s] _attempt_new_transaction", destination)
-
-        # list of (pending_pdu, deferred, order)
         pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
         pending_edus = self.pending_edus_by_dest.pop(destination, [])
         pending_failures = self.pending_failures_by_dest.pop(destination, [])
@@ -228,20 +226,22 @@ class TransactionQueue(object):
             logger.debug("TX [%s] Nothing to send", destination)
             return
 
-        # Sort based on the order field
-        pending_pdus.sort(key=lambda t: t[2])
-
-        pdus = [x[0] for x in pending_pdus]
-        edus = [x[0] for x in pending_edus]
-        failures = [x[0].get_dict() for x in pending_failures]
-        deferreds = [
-            x[1]
-            for x in pending_pdus + pending_edus + pending_failures
-        ]
-
         try:
             self.pending_transactions[destination] = 1
 
+            logger.debug("TX [%s] _attempt_new_transaction", destination)
+
+            # Sort based on the order field
+            pending_pdus.sort(key=lambda t: t[2])
+
+            pdus = [x[0] for x in pending_pdus]
+            edus = [x[0] for x in pending_edus]
+            failures = [x[0].get_dict() for x in pending_failures]
+            deferreds = [
+                x[1]
+                for x in pending_pdus + pending_edus + pending_failures
+            ]
+
             txn_id = str(self._next_txn_id)
 
             limiter = yield get_retry_limiter(
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index ced703364b..a81b3c4345 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from twisted.internet import defer
+from synapse.api.constants import Membership
 
 from synapse.api.urls import FEDERATION_PREFIX as PREFIX
 from synapse.util.logutils import log_function
@@ -160,13 +161,20 @@ class TransportLayerClient(object):
 
     @defer.inlineCallbacks
     @log_function
-    def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True):
-        path = PREFIX + "/make_join/%s/%s" % (room_id, user_id)
+    def make_membership_event(self, destination, room_id, user_id, membership, args={}):
+        valid_memberships = {Membership.JOIN, Membership.LEAVE}
+        if membership not in valid_memberships:
+            raise RuntimeError(
+                "make_membership_event called with membership='%s', must be one of %s" %
+                (membership, ",".join(valid_memberships))
+            )
+        path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id)
 
         content = yield self.client.get_json(
             destination=destination,
             path=path,
-            retry_on_dns_fail=retry_on_dns_fail,
+            args=args,
+            retry_on_dns_fail=True,
         )
 
         defer.returnValue(content)
@@ -186,6 +194,19 @@ class TransportLayerClient(object):
 
     @defer.inlineCallbacks
     @log_function
+    def send_leave(self, destination, room_id, event_id, content):
+        path = PREFIX + "/send_leave/%s/%s" % (room_id, event_id)
+
+        response = yield self.client.put_json(
+            destination=destination,
+            path=path,
+            data=content,
+        )
+
+        defer.returnValue(response)
+
+    @defer.inlineCallbacks
+    @log_function
     def send_invite(self, destination, room_id, event_id, content):
         path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 36f250e1a3..8184159210 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -292,7 +292,25 @@ class FederationMakeJoinServlet(BaseFederationServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, origin, content, query, context, user_id):
-        content = yield self.handler.on_make_join_request(context, user_id)
+        content = yield self.handler.on_make_join_request(context, user_id, query)
+        defer.returnValue((200, content))
+
+
+class FederationMakeLeaveServlet(BaseFederationServlet):
+    PATH = "/make_leave/([^/]*)/([^/]*)"
+
+    @defer.inlineCallbacks
+    def on_GET(self, origin, content, query, context, user_id):
+        content = yield self.handler.on_make_leave_request(context, user_id)
+        defer.returnValue((200, content))
+
+
+class FederationSendLeaveServlet(BaseFederationServlet):
+    PATH = "/send_leave/([^/]*)/([^/]*)"
+
+    @defer.inlineCallbacks
+    def on_PUT(self, origin, content, query, room_id, txid):
+        content = yield self.handler.on_send_leave_request(origin, content)
         defer.returnValue((200, content))
 
 
@@ -385,8 +403,10 @@ SERVLET_CLASSES = (
     FederationBackfillServlet,
     FederationQueryServlet,
     FederationMakeJoinServlet,
+    FederationMakeLeaveServlet,
     FederationEventServlet,
     FederationSendJoinServlet,
+    FederationSendLeaveServlet,
     FederationInviteServlet,
     FederationQueryAuthServlet,
     FederationGetMissingEventsServlet,
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 8725c3c420..6a2339f2eb 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,7 +17,7 @@ from synapse.appservice.scheduler import AppServiceScheduler
 from synapse.appservice.api import ApplicationServiceApi
 from .register import RegistrationHandler
 from .room import (
-    RoomCreationHandler, RoomMemberHandler, RoomListHandler
+    RoomCreationHandler, RoomMemberHandler, RoomListHandler, RoomContextHandler,
 )
 from .message import MessageHandler
 from .events import EventStreamHandler, EventHandler
@@ -32,6 +32,7 @@ from .sync import SyncHandler
 from .auth import AuthHandler
 from .identity import IdentityHandler
 from .receipts import ReceiptsHandler
+from .search import SearchHandler
 
 
 class Handlers(object):
@@ -68,3 +69,5 @@ class Handlers(object):
         self.sync_handler = SyncHandler(hs)
         self.auth_handler = AuthHandler(hs)
         self.identity_handler = IdentityHandler(hs)
+        self.search_handler = SearchHandler(hs)
+        self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index c488ee0f6d..6a26cb1879 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -21,6 +21,7 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.types import UserID, RoomAlias
 
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util import third_party_invites
 
 import logging
 
@@ -45,6 +46,52 @@ class BaseHandler(object):
 
         self.event_builder_factory = hs.get_event_builder_factory()
 
+    @defer.inlineCallbacks
+    def _filter_events_for_client(self, user_id, events):
+        event_id_to_state = yield self.store.get_state_for_events(
+            frozenset(e.event_id for e in events),
+            types=(
+                (EventTypes.RoomHistoryVisibility, ""),
+                (EventTypes.Member, user_id),
+            )
+        )
+
+        def allowed(event, state):
+            if event.type == EventTypes.RoomHistoryVisibility:
+                return True
+
+            membership_ev = state.get((EventTypes.Member, user_id), None)
+            if membership_ev:
+                membership = membership_ev.membership
+            else:
+                membership = Membership.LEAVE
+
+            if membership == Membership.JOIN:
+                return True
+
+            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
+            if history:
+                visibility = history.content.get("history_visibility", "shared")
+            else:
+                visibility = "shared"
+
+            if visibility == "public":
+                return True
+            elif visibility == "shared":
+                return True
+            elif visibility == "joined":
+                return membership == Membership.JOIN
+            elif visibility == "invited":
+                return membership == Membership.INVITE
+
+            return True
+
+        defer.returnValue([
+            event
+            for event in events
+            if allowed(event, event_id_to_state[event.event_id])
+        ])
+
     def ratelimit(self, user_id):
         time_now = self.clock.time()
         allowed, time_allowed = self.ratelimiter.send_message(
@@ -123,6 +170,16 @@ class BaseHandler(object):
                         )
                     )
 
+        if (
+            event.type == EventTypes.Member and
+            event.content["membership"] == Membership.JOIN and
+            third_party_invites.join_has_third_party_invite(event.content)
+        ):
+            yield third_party_invites.check_key_valid(
+                self.hs.get_simple_http_client(),
+                event
+            )
+
         federation_handler = self.hs.get_handlers().federation_handler
 
         if event.type == EventTypes.Member:
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 484f719253..055d395b20 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -44,6 +44,7 @@ class AuthHandler(BaseHandler):
             LoginType.EMAIL_IDENTITY: self._check_email_identity,
             LoginType.DUMMY: self._check_dummy_auth,
         }
+        self.bcrypt_rounds = hs.config.bcrypt_rounds
         self.sessions = {}
 
     @defer.inlineCallbacks
@@ -432,7 +433,7 @@ class AuthHandler(BaseHandler):
         Returns:
             Hashed password (str).
         """
-        return bcrypt.hashpw(password, bcrypt.gensalt())
+        return bcrypt.hashpw(password, bcrypt.gensalt(self.bcrypt_rounds))
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a710bdcfdb..ae9d227586 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -39,7 +39,7 @@ from twisted.internet import defer
 
 import itertools
 import logging
-
+from synapse.util import third_party_invites
 
 logger = logging.getLogger(__name__)
 
@@ -565,7 +565,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot):
+    def do_invite_join(self, target_hosts, room_id, joinee, content):
         """ Attempts to join the `joinee` to the room `room_id` via the
         server `target_host`.
 
@@ -581,49 +581,19 @@ class FederationHandler(BaseHandler):
 
         yield self.store.clean_room_for_join(room_id)
 
-        origin, pdu = yield self.replication_layer.make_join(
+        origin, event = yield self._make_and_verify_event(
             target_hosts,
             room_id,
-            joinee
+            joinee,
+            "join",
+            content
         )
 
-        logger.debug("Got response to make_join: %s", pdu)
-
-        event = pdu
-
-        # We should assert some things.
-        # FIXME: Do this in a nicer way
-        assert(event.type == EventTypes.Member)
-        assert(event.user_id == joinee)
-        assert(event.state_key == joinee)
-        assert(event.room_id == room_id)
-
-        event.internal_metadata.outlier = False
-
         self.room_queues[room_id] = []
-
-        builder = self.event_builder_factory.new(
-            unfreeze(event.get_pdu_json())
-        )
-
         handled_events = set()
 
         try:
-            builder.event_id = self.event_builder_factory.create_event_id()
-            builder.origin = self.hs.hostname
-            builder.content = content
-
-            if not hasattr(event, "signatures"):
-                builder.signatures = {}
-
-            add_hashes_and_signatures(
-                builder,
-                self.hs.hostname,
-                self.hs.config.signing_key[0],
-            )
-
-            new_event = builder.build()
-
+            new_event = self._sign_event(event)
             # Try the host we successfully got a response to /make_join/
             # request first.
             try:
@@ -631,11 +601,7 @@ class FederationHandler(BaseHandler):
                 target_hosts.insert(0, origin)
             except ValueError:
                 pass
-
-            ret = yield self.replication_layer.send_join(
-                target_hosts,
-                new_event
-            )
+            ret = yield self.replication_layer.send_join(target_hosts, new_event)
 
             origin = ret["origin"]
             state = ret["state"]
@@ -697,14 +663,20 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def on_make_join_request(self, room_id, user_id):
+    def on_make_join_request(self, room_id, user_id, query):
         """ We've received a /make_join/ request, so we create a partial
-        join event for the room and return that. We don *not* persist or
+        join event for the room and return that. We do *not* persist or
         process it until the other server has signed it and sent it back.
         """
+        event_content = {"membership": Membership.JOIN}
+        if third_party_invites.has_join_keys(query):
+            event_content["third_party_invite"] = (
+                third_party_invites.extract_join_keys(query)
+            )
+
         builder = self.event_builder_factory.new({
             "type": EventTypes.Member,
-            "content": {"membership": Membership.JOIN},
+            "content": event_content,
             "room_id": room_id,
             "sender": user_id,
             "state_key": user_id,
@@ -716,6 +688,9 @@ class FederationHandler(BaseHandler):
 
         self.auth.check(event, auth_events=context.current_state)
 
+        if third_party_invites.join_has_third_party_invite(event.content):
+            third_party_invites.check_key_valid(self.hs.get_simple_http_client(), event)
+
         defer.returnValue(event)
 
     @defer.inlineCallbacks
@@ -850,6 +825,168 @@ class FederationHandler(BaseHandler):
         defer.returnValue(event)
 
     @defer.inlineCallbacks
+    def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
+        origin, event = yield self._make_and_verify_event(
+            target_hosts,
+            room_id,
+            user_id,
+            "leave",
+            {}
+        )
+        signed_event = self._sign_event(event)
+
+        # Try the host we successfully got a response to /make_join/
+        # request first.
+        try:
+            target_hosts.remove(origin)
+            target_hosts.insert(0, origin)
+        except ValueError:
+            pass
+
+        yield self.replication_layer.send_leave(
+            target_hosts,
+            signed_event
+        )
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, content):
+        origin, pdu = yield self.replication_layer.make_membership_event(
+            target_hosts,
+            room_id,
+            user_id,
+            membership,
+            content
+        )
+
+        logger.debug("Got response to make_%s: %s", membership, pdu)
+
+        event = pdu
+
+        # We should assert some things.
+        # FIXME: Do this in a nicer way
+        assert(event.type == EventTypes.Member)
+        assert(event.user_id == user_id)
+        assert(event.state_key == user_id)
+        assert(event.room_id == room_id)
+        defer.returnValue((origin, event))
+
+    def _sign_event(self, event):
+        event.internal_metadata.outlier = False
+
+        builder = self.event_builder_factory.new(
+            unfreeze(event.get_pdu_json())
+        )
+
+        builder.event_id = self.event_builder_factory.create_event_id()
+        builder.origin = self.hs.hostname
+
+        if not hasattr(event, "signatures"):
+            builder.signatures = {}
+
+        add_hashes_and_signatures(
+            builder,
+            self.hs.hostname,
+            self.hs.config.signing_key[0],
+        )
+
+        return builder.build()
+
+    @defer.inlineCallbacks
+    @log_function
+    def on_make_leave_request(self, room_id, user_id):
+        """ We've received a /make_leave/ request, so we create a partial
+        join event for the room and return that. We do *not* persist or
+        process it until the other server has signed it and sent it back.
+        """
+        builder = self.event_builder_factory.new({
+            "type": EventTypes.Member,
+            "content": {"membership": Membership.LEAVE},
+            "room_id": room_id,
+            "sender": user_id,
+            "state_key": user_id,
+        })
+
+        event, context = yield self._create_new_client_event(
+            builder=builder,
+        )
+
+        self.auth.check(event, auth_events=context.current_state)
+
+        defer.returnValue(event)
+
+    @defer.inlineCallbacks
+    @log_function
+    def on_send_leave_request(self, origin, pdu):
+        """ We have received a leave event for a room. Fully process it."""
+        event = pdu
+
+        logger.debug(
+            "on_send_leave_request: Got event: %s, signatures: %s",
+            event.event_id,
+            event.signatures,
+        )
+
+        event.internal_metadata.outlier = False
+
+        context, event_stream_id, max_stream_id = yield self._handle_new_event(
+            origin, event
+        )
+
+        logger.debug(
+            "on_send_leave_request: After _handle_new_event: %s, sigs: %s",
+            event.event_id,
+            event.signatures,
+        )
+
+        extra_users = []
+        if event.type == EventTypes.Member:
+            target_user_id = event.state_key
+            target_user = UserID.from_string(target_user_id)
+            extra_users.append(target_user)
+
+        with PreserveLoggingContext():
+            d = self.notifier.on_new_room_event(
+                event, event_stream_id, max_stream_id, extra_users=extra_users
+            )
+
+        def log_failure(f):
+            logger.warn(
+                "Failed to notify about %s: %s",
+                event.event_id, f.value
+            )
+
+        d.addErrback(log_failure)
+
+        new_pdu = event
+
+        destinations = set()
+
+        for k, s in context.current_state.items():
+            try:
+                if k[0] == EventTypes.Member:
+                    if s.content["membership"] == Membership.LEAVE:
+                        destinations.add(
+                            UserID.from_string(s.state_key).domain
+                        )
+            except:
+                logger.warn(
+                    "Failed to get destination from event %s", s.event_id
+                )
+
+        destinations.discard(origin)
+
+        logger.debug(
+            "on_send_leave_request: Sending event: %s, signatures: %s",
+            event.event_id,
+            event.signatures,
+        )
+
+        self.replication_layer.send_pdu(new_pdu, destinations)
+
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
     def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True):
         yield run_on_reactor()
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index dfeeae76db..0f947993d1 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -146,7 +146,7 @@ class MessageHandler(BaseHandler):
                 "end": next_token.to_string(),
             })
 
-        events = yield self._filter_events_for_client(user_id, room_id, events)
+        events = yield self._filter_events_for_client(user_id, events)
 
         time_now = self.clock.time_msec()
 
@@ -162,52 +162,6 @@ class MessageHandler(BaseHandler):
         defer.returnValue(chunk)
 
     @defer.inlineCallbacks
-    def _filter_events_for_client(self, user_id, room_id, events):
-        event_id_to_state = yield self.store.get_state_for_events(
-            frozenset(e.event_id for e in events),
-            types=(
-                (EventTypes.RoomHistoryVisibility, ""),
-                (EventTypes.Member, user_id),
-            )
-        )
-
-        def allowed(event, state):
-            if event.type == EventTypes.RoomHistoryVisibility:
-                return True
-
-            membership_ev = state.get((EventTypes.Member, user_id), None)
-            if membership_ev:
-                membership = membership_ev.membership
-            else:
-                membership = Membership.LEAVE
-
-            if membership == Membership.JOIN:
-                return True
-
-            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
-            if history:
-                visibility = history.content.get("history_visibility", "shared")
-            else:
-                visibility = "shared"
-
-            if visibility == "public":
-                return True
-            elif visibility == "shared":
-                return True
-            elif visibility == "joined":
-                return membership == Membership.JOIN
-            elif visibility == "invited":
-                return membership == Membership.INVITE
-
-            return True
-
-        defer.returnValue([
-            event
-            for event in events
-            if allowed(event, event_id_to_state[event.event_id])
-        ])
-
-    @defer.inlineCallbacks
     def create_and_send_event(self, event_dict, ratelimit=True,
                               token_id=None, txn_id=None):
         """ Given a dict from a client, create and handle a new event.
@@ -368,6 +322,8 @@ class MessageHandler(BaseHandler):
             user, pagination_config.get_source_config("receipt"), None
         )
 
+        tags_by_room = yield self.store.get_tags_for_user(user_id)
+
         public_room_ids = yield self.store.get_public_room_ids()
 
         limit = pagin_config.limit
@@ -424,7 +380,7 @@ class MessageHandler(BaseHandler):
                 ).addErrback(unwrapFirstError)
 
                 messages = yield self._filter_events_for_client(
-                    user_id, event.room_id, messages
+                    user_id, messages
                 )
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
@@ -444,6 +400,15 @@ class MessageHandler(BaseHandler):
                     serialize_event(c, time_now, as_client_event)
                     for c in current_state.values()
                 ]
+
+                private_user_data = []
+                tags = tags_by_room.get(event.room_id)
+                if tags:
+                    private_user_data.append({
+                        "type": "m.tag",
+                        "content": {"tags": tags},
+                    })
+                d["private_user_data"] = private_user_data
             except:
                 logger.exception("Failed to get snapshot")
 
@@ -493,6 +458,16 @@ class MessageHandler(BaseHandler):
             result = yield self._room_initial_sync_parted(
                 user_id, room_id, pagin_config, member_event
             )
+
+        private_user_data = []
+        tags = yield self.store.get_tags_for_room(user_id, room_id)
+        if tags:
+            private_user_data.append({
+                "type": "m.tag",
+                "content": {"tags": tags},
+            })
+        result["private_user_data"] = private_user_data
+
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -519,11 +494,11 @@ class MessageHandler(BaseHandler):
         )
 
         messages = yield self._filter_events_for_client(
-            user_id, room_id, messages
+            user_id, messages
         )
 
-        start_token = StreamToken(token[0], 0, 0, 0)
-        end_token = StreamToken(token[1], 0, 0, 0)
+        start_token = StreamToken(token[0], 0, 0, 0, 0)
+        end_token = StreamToken(token[1], 0, 0, 0, 0)
 
         time_now = self.clock.time_msec()
 
@@ -599,7 +574,7 @@ class MessageHandler(BaseHandler):
         ).addErrback(unwrapFirstError)
 
         messages = yield self._filter_events_for_client(
-            user_id, room_id, messages
+            user_id, messages
         )
 
         start_token = now_token.copy_and_replace("room_key", token[0])
diff --git a/synapse/handlers/private_user_data.py b/synapse/handlers/private_user_data.py
new file mode 100644
index 0000000000..1778c71325
--- /dev/null
+++ b/synapse/handlers/private_user_data.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+
+class PrivateUserDataEventSource(object):
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+
+    def get_current_key(self, direction='f'):
+        return self.store.get_max_private_user_data_stream_id()
+
+    @defer.inlineCallbacks
+    def get_new_events_for_user(self, user, from_key, limit):
+        user_id = user.to_string()
+        last_stream_id = from_key
+
+        current_stream_id = yield self.store.get_max_private_user_data_stream_id()
+        tags = yield self.store.get_updated_tags(user_id, last_stream_id)
+
+        results = []
+        for room_id, room_tags in tags.items():
+            results.append({
+                "type": "m.tag",
+                "content": {"tags": room_tags},
+                "room_id": room_id,
+            })
+
+        defer.returnValue((results, current_stream_id))
+
+    @defer.inlineCallbacks
+    def get_pagination_rows(self, user, config, key):
+        defer.returnValue(([], config.to_id))
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3364a5de14..36878a6c20 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -22,12 +22,18 @@ from synapse.types import UserID, RoomAlias, RoomID
 from synapse.api.constants import (
     EventTypes, Membership, JoinRules, RoomCreationPreset,
 )
-from synapse.api.errors import StoreError, SynapseError
+from synapse.api.errors import AuthError, StoreError, SynapseError
 from synapse.util import stringutils, unwrapFirstError
 from synapse.util.async import run_on_reactor
 
+from signedjson.sign import verify_signed_json
+from signedjson.key import decode_verify_key_bytes
+
 from collections import OrderedDict
+from unpaddedbase64 import decode_base64
+
 import logging
+import math
 import string
 
 logger = logging.getLogger(__name__)
@@ -384,7 +390,22 @@ class RoomMemberHandler(BaseHandler):
         if event.membership == Membership.JOIN:
             yield self._do_join(event, context, do_auth=do_auth)
         else:
-            # This is not a JOIN, so we can handle it normally.
+            if event.membership == Membership.LEAVE:
+                is_host_in_room = yield self.is_host_in_room(room_id, context)
+                if not is_host_in_room:
+                    # Rejecting an invite, rather than leaving a joined room
+                    handler = self.hs.get_handlers().federation_handler
+                    inviter = yield self.get_inviter(event)
+                    if not inviter:
+                        # return the same error as join_room_alias does
+                        raise SynapseError(404, "No known servers")
+                    yield handler.do_remotely_reject_invite(
+                        [inviter.domain],
+                        room_id,
+                        event.user_id
+                    )
+                    defer.returnValue({"room_id": room_id})
+                    return
 
             # FIXME: This isn't idempotency.
             if prev_state and prev_state.membership == event.membership:
@@ -408,7 +429,7 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue({"room_id": room_id})
 
     @defer.inlineCallbacks
-    def join_room_alias(self, joinee, room_alias, do_auth=True, content={}):
+    def join_room_alias(self, joinee, room_alias, content={}):
         directory_handler = self.hs.get_handlers().directory_handler
         mapping = yield directory_handler.get_association(room_alias)
 
@@ -442,8 +463,6 @@ class RoomMemberHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _do_join(self, event, context, room_hosts=None, do_auth=True):
-        joinee = UserID.from_string(event.state_key)
-        # room_id = RoomID.from_string(event.room_id, self.hs)
         room_id = event.room_id
 
         # XXX: We don't do an auth check if we are doing an invite
@@ -451,41 +470,18 @@ class RoomMemberHandler(BaseHandler):
         # that we are allowed to join when we decide whether or not we
         # need to do the invite/join dance.
 
-        is_host_in_room = yield self.auth.check_host_in_room(
-            event.room_id,
-            self.hs.hostname
-        )
-        if not is_host_in_room:
-            # is *anyone* in the room?
-            room_member_keys = [
-                v for (k, v) in context.current_state.keys() if (
-                    k == "m.room.member"
-                )
-            ]
-            if len(room_member_keys) == 0:
-                # has the room been created so we can join it?
-                create_event = context.current_state.get(("m.room.create", ""))
-                if create_event:
-                    is_host_in_room = True
-
+        is_host_in_room = yield self.is_host_in_room(room_id, context)
         if is_host_in_room:
             should_do_dance = False
         elif room_hosts:  # TODO: Shouldn't this be remote_room_host?
             should_do_dance = True
         else:
-            # TODO(markjh): get prev_state from snapshot
-            prev_state = yield self.store.get_room_member(
-                joinee.to_string(), room_id
-            )
-
-            if prev_state and prev_state.membership == Membership.INVITE:
-                inviter = UserID.from_string(prev_state.user_id)
-
-                should_do_dance = not self.hs.is_mine(inviter)
-                room_hosts = [inviter.domain]
-            else:
+            inviter = yield self.get_inviter(event)
+            if not inviter:
                 # return the same error as join_room_alias does
                 raise SynapseError(404, "No known servers")
+            should_do_dance = not self.hs.is_mine(inviter)
+            room_hosts = [inviter.domain]
 
         if should_do_dance:
             handler = self.hs.get_handlers().federation_handler
@@ -493,8 +489,7 @@ class RoomMemberHandler(BaseHandler):
                 room_hosts,
                 room_id,
                 event.user_id,
-                event.content,  # FIXME To get a non-frozen dict
-                context
+                event.content  # FIXME To get a non-frozen dict
             )
         else:
             logger.debug("Doing normal join")
@@ -512,6 +507,44 @@ class RoomMemberHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
+    def get_inviter(self, event):
+        # TODO(markjh): get prev_state from snapshot
+        prev_state = yield self.store.get_room_member(
+            event.user_id, event.room_id
+        )
+
+        if prev_state and prev_state.membership == Membership.INVITE:
+            defer.returnValue(UserID.from_string(prev_state.user_id))
+            return
+        elif "third_party_invite" in event.content:
+            if "sender" in event.content["third_party_invite"]:
+                inviter = UserID.from_string(
+                    event.content["third_party_invite"]["sender"]
+                )
+                defer.returnValue(inviter)
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def is_host_in_room(self, room_id, context):
+        is_host_in_room = yield self.auth.check_host_in_room(
+            room_id,
+            self.hs.hostname
+        )
+        if not is_host_in_room:
+            # is *anyone* in the room?
+            room_member_keys = [
+                v for (k, v) in context.current_state.keys() if (
+                    k == "m.room.member"
+                )
+            ]
+            if len(room_member_keys) == 0:
+                # has the room been created so we can join it?
+                create_event = context.current_state.get(("m.room.create", ""))
+                if create_event:
+                    is_host_in_room = True
+        defer.returnValue(is_host_in_room)
+
+    @defer.inlineCallbacks
     def get_joined_rooms_for_user(self, user):
         """Returns a list of roomids that the user has any of the given
         membership states in."""
@@ -540,6 +573,160 @@ class RoomMemberHandler(BaseHandler):
             suppress_auth=(not do_auth),
         )
 
+    @defer.inlineCallbacks
+    def do_3pid_invite(
+            self,
+            room_id,
+            inviter,
+            medium,
+            address,
+            id_server,
+            display_name,
+            token_id,
+            txn_id
+    ):
+        invitee = yield self._lookup_3pid(
+            id_server, medium, address
+        )
+
+        if invitee:
+            # make sure it looks like a user ID; it'll throw if it's invalid.
+            UserID.from_string(invitee)
+            yield self.hs.get_handlers().message_handler.create_and_send_event(
+                {
+                    "type": EventTypes.Member,
+                    "content": {
+                        "membership": unicode("invite")
+                    },
+                    "room_id": room_id,
+                    "sender": inviter.to_string(),
+                    "state_key": invitee,
+                },
+                token_id=token_id,
+                txn_id=txn_id,
+            )
+        else:
+            yield self._make_and_store_3pid_invite(
+                id_server,
+                display_name,
+                medium,
+                address,
+                room_id,
+                inviter,
+                token_id,
+                txn_id=txn_id
+            )
+
+    @defer.inlineCallbacks
+    def _lookup_3pid(self, id_server, medium, address):
+        """Looks up a 3pid in the passed identity server.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
+
+        Returns:
+            (str) the matrix ID of the 3pid, or None if it is not recognized.
+        """
+        try:
+            data = yield self.hs.get_simple_http_client().get_json(
+                "https://%s/_matrix/identity/api/v1/lookup" % (id_server,),
+                {
+                    "medium": medium,
+                    "address": address,
+                }
+            )
+
+            if "mxid" in data:
+                if "signatures" not in data:
+                    raise AuthError(401, "No signatures on 3pid binding")
+                self.verify_any_signature(data, id_server)
+                defer.returnValue(data["mxid"])
+
+        except IOError as e:
+            logger.warn("Error from identity server lookup: %s" % (e,))
+            defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def verify_any_signature(self, data, server_hostname):
+        if server_hostname not in data["signatures"]:
+            raise AuthError(401, "No signature from server %s" % (server_hostname,))
+        for key_name, signature in data["signatures"][server_hostname].items():
+            key_data = yield self.hs.get_simple_http_client().get_json(
+                "https://%s/_matrix/identity/api/v1/pubkey/%s" %
+                (server_hostname, key_name,),
+            )
+            if "public_key" not in key_data:
+                raise AuthError(401, "No public key named %s from %s" %
+                                (key_name, server_hostname,))
+            verify_signed_json(
+                data,
+                server_hostname,
+                decode_verify_key_bytes(key_name, decode_base64(key_data["public_key"]))
+            )
+            return
+
+    @defer.inlineCallbacks
+    def _make_and_store_3pid_invite(
+            self,
+            id_server,
+            display_name,
+            medium,
+            address,
+            room_id,
+            user,
+            token_id,
+            txn_id
+    ):
+        token, public_key, key_validity_url = (
+            yield self._ask_id_server_for_third_party_invite(
+                id_server,
+                medium,
+                address,
+                room_id,
+                user.to_string()
+            )
+        )
+        msg_handler = self.hs.get_handlers().message_handler
+        yield msg_handler.create_and_send_event(
+            {
+                "type": EventTypes.ThirdPartyInvite,
+                "content": {
+                    "display_name": display_name,
+                    "key_validity_url": key_validity_url,
+                    "public_key": public_key,
+                },
+                "room_id": room_id,
+                "sender": user.to_string(),
+                "state_key": token,
+            },
+            token_id=token_id,
+            txn_id=txn_id,
+        )
+
+    @defer.inlineCallbacks
+    def _ask_id_server_for_third_party_invite(
+            self, id_server, medium, address, room_id, sender):
+        is_url = "https://%s/_matrix/identity/api/v1/store-invite" % (id_server,)
+        data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
+            is_url,
+            {
+                "medium": medium,
+                "address": address,
+                "room_id": room_id,
+                "sender": sender,
+            }
+        )
+        # TODO: Check for success
+        token = data["token"]
+        public_key = data["public_key"]
+        key_validity_url = "https://%s/_matrix/identity/api/v1/pubkey/isvalid" % (
+            id_server,
+        )
+        defer.returnValue((token, public_key, key_validity_url))
+
 
 class RoomListHandler(BaseHandler):
 
@@ -561,6 +748,60 @@ class RoomListHandler(BaseHandler):
         defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
 
 
+class RoomContextHandler(BaseHandler):
+    @defer.inlineCallbacks
+    def get_event_context(self, user, room_id, event_id, limit):
+        """Retrieves events, pagination tokens and state around a given event
+        in a room.
+
+        Args:
+            user (UserID)
+            room_id (str)
+            event_id (str)
+            limit (int): The maximum number of events to return in total
+                (excluding state).
+
+        Returns:
+            dict
+        """
+        before_limit = math.floor(limit/2.)
+        after_limit = limit - before_limit
+
+        now_token = yield self.hs.get_event_sources().get_current_token()
+
+        results = yield self.store.get_events_around(
+            room_id, event_id, before_limit, after_limit
+        )
+
+        results["events_before"] = yield self._filter_events_for_client(
+            user.to_string(), results["events_before"]
+        )
+
+        results["events_after"] = yield self._filter_events_for_client(
+            user.to_string(), results["events_after"]
+        )
+
+        if results["events_after"]:
+            last_event_id = results["events_after"][-1].event_id
+        else:
+            last_event_id = event_id
+
+        state = yield self.store.get_state_for_events(
+            [last_event_id], None
+        )
+        results["state"] = state[last_event_id].values()
+
+        results["start"] = now_token.copy_and_replace(
+            "room_key", results["start"]
+        ).to_string()
+
+        results["end"] = now_token.copy_and_replace(
+            "room_key", results["end"]
+        ).to_string()
+
+        defer.returnValue(results)
+
+
 class RoomEventSource(object):
     def __init__(self, hs):
         self.store = hs.get_datastore()
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
new file mode 100644
index 0000000000..2718e9482e
--- /dev/null
+++ b/synapse/handlers/search.py
@@ -0,0 +1,154 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from ._base import BaseHandler
+
+from synapse.api.constants import Membership
+from synapse.api.filtering import Filter
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class SearchHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(SearchHandler, self).__init__(hs)
+
+    @defer.inlineCallbacks
+    def search(self, user, content):
+        """Performs a full text search for a user.
+
+        Args:
+            user (UserID)
+            content (dict): Search parameters
+
+        Returns:
+            dict to be returned to the client with results of search
+        """
+
+        try:
+            search_term = content["search_categories"]["room_events"]["search_term"]
+            keys = content["search_categories"]["room_events"].get("keys", [
+                "content.body", "content.name", "content.topic",
+            ])
+            filter_dict = content["search_categories"]["room_events"].get("filter", {})
+            event_context = content["search_categories"]["room_events"].get(
+                "event_context", None
+            )
+
+            if event_context is not None:
+                before_limit = int(event_context.get(
+                    "before_limit", 5
+                ))
+                after_limit = int(event_context.get(
+                    "after_limit", 5
+                ))
+        except KeyError:
+            raise SynapseError(400, "Invalid search query")
+
+        search_filter = Filter(filter_dict)
+
+        # TODO: Search through left rooms too
+        rooms = yield self.store.get_rooms_for_user_where_membership_is(
+            user.to_string(),
+            membership_list=[Membership.JOIN],
+            # membership_list=[Membership.JOIN, Membership.LEAVE, Membership.Ban],
+        )
+        room_ids = set(r.room_id for r in rooms)
+
+        room_ids = search_filter.filter_rooms(room_ids)
+
+        rank_map, event_map, _ = yield self.store.search_msgs(
+            room_ids, search_term, keys
+        )
+
+        filtered_events = search_filter.filter(event_map.values())
+
+        allowed_events = yield self._filter_events_for_client(
+            user.to_string(), filtered_events
+        )
+
+        allowed_events.sort(key=lambda e: -rank_map[e.event_id])
+        allowed_events = allowed_events[:search_filter.limit()]
+
+        if event_context is not None:
+            now_token = yield self.hs.get_event_sources().get_current_token()
+
+            contexts = {}
+            for event in allowed_events:
+                res = yield self.store.get_events_around(
+                    event.room_id, event.event_id, before_limit, after_limit
+                )
+
+                res["events_before"] = yield self._filter_events_for_client(
+                    user.to_string(), res["events_before"]
+                )
+
+                res["events_after"] = yield self._filter_events_for_client(
+                    user.to_string(), res["events_after"]
+                )
+
+                res["start"] = now_token.copy_and_replace(
+                    "room_key", res["start"]
+                ).to_string()
+
+                res["end"] = now_token.copy_and_replace(
+                    "room_key", res["end"]
+                ).to_string()
+
+                contexts[event.event_id] = res
+        else:
+            contexts = {}
+
+        # TODO: Add a limit
+
+        time_now = self.clock.time_msec()
+
+        for context in contexts.values():
+            context["events_before"] = [
+                serialize_event(e, time_now)
+                for e in context["events_before"]
+            ]
+            context["events_after"] = [
+                serialize_event(e, time_now)
+                for e in context["events_after"]
+            ]
+
+        results = {
+            e.event_id: {
+                "rank": rank_map[e.event_id],
+                "result": serialize_event(e, time_now),
+                "context": contexts.get(e.event_id, {}),
+            }
+            for e in allowed_events
+        }
+
+        logger.info("Found %d results", len(results))
+
+        defer.returnValue({
+            "search_categories": {
+                "room_events": {
+                    "results": results,
+                    "count": len(results)
+                }
+            }
+        })
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 21cf50101a..d6527c1ae8 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -51,6 +51,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
     "timeline",
     "state",
     "ephemeral",
+    "private_user_data",
 ])):
     __slots__ = []
 
@@ -58,7 +59,31 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
         """Make the result appear empty if there are no updates. This is used
         to tell if room needs to be part of the sync result.
         """
-        return bool(self.timeline or self.state or self.ephemeral)
+        return bool(
+            self.timeline
+            or self.state
+            or self.ephemeral
+            or self.private_user_data
+        )
+
+
+class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
+    "room_id",
+    "timeline",
+    "state",
+    "private_user_data",
+])):
+    __slots__ = []
+
+    def __nonzero__(self):
+        """Make the result appear empty if there are no updates. This is used
+        to tell if room needs to be part of the sync result.
+        """
+        return bool(
+            self.timeline
+            or self.state
+            or self.private_user_data
+        )
 
 
 class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
@@ -67,12 +92,17 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
 ])):
     __slots__ = []
 
+    def __nonzero__(self):
+        """Invited rooms should always be reported to the client"""
+        return True
+
 
 class SyncResult(collections.namedtuple("SyncResult", [
     "next_batch",  # Token for the next sync
     "presence",  # List of presence events for the user.
     "joined",  # JoinedSyncResult for each joined room.
     "invited",  # InvitedSyncResult for each invited room.
+    "archived",  # ArchivedSyncResult for each archived room.
 ])):
     __slots__ = []
 
@@ -94,15 +124,20 @@ class SyncHandler(BaseHandler):
         self.clock = hs.get_clock()
 
     @defer.inlineCallbacks
-    def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
+    def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
+                               full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
         Returns:
             A Deferred SyncResult.
         """
-        if timeout == 0 or since_token is None:
-            result = yield self.current_sync_for_user(sync_config, since_token)
+
+        if timeout == 0 or since_token is None or full_state:
+            # we are going to return immediately, so don't bother calling
+            # notifier.wait_for_events.
+            result = yield self.current_sync_for_user(sync_config, since_token,
+                                                      full_state=full_state)
             defer.returnValue(result)
         else:
             def current_sync_callback(before_token, after_token):
@@ -127,24 +162,33 @@ class SyncHandler(BaseHandler):
             )
             defer.returnValue(result)
 
-    def current_sync_for_user(self, sync_config, since_token=None):
+    def current_sync_for_user(self, sync_config, since_token=None,
+                              full_state=False):
         """Get the sync for client needed to match what the server has now.
         Returns:
             A Deferred SyncResult.
         """
-        if since_token is None:
-            return self.initial_sync(sync_config)
+        if since_token is None or full_state:
+            return self.full_state_sync(sync_config, since_token)
         else:
             return self.incremental_sync_with_gap(sync_config, since_token)
 
     @defer.inlineCallbacks
-    def initial_sync(self, sync_config):
-        """Get a sync for a client which is starting without any state
+    def full_state_sync(self, sync_config, timeline_since_token):
+        """Get a sync for a client which is starting without any state.
+
+        If a 'message_since_token' is given, only timeline events which have
+        happened since that token will be returned.
+
         Returns:
             A Deferred SyncResult.
         """
         now_token = yield self.event_sources.get_current_token()
 
+        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+            sync_config, now_token
+        )
+
         presence_stream = self.event_sources.sources["presence"]
         # TODO (mjark): This looks wrong, shouldn't we be getting the presence
         # UP to the present rather than after the present?
@@ -156,15 +200,30 @@ class SyncHandler(BaseHandler):
         )
         room_list = yield self.store.get_rooms_for_user_where_membership_is(
             user_id=sync_config.user.to_string(),
-            membership_list=[Membership.INVITE, Membership.JOIN]
+            membership_list=(
+                Membership.INVITE,
+                Membership.JOIN,
+                Membership.LEAVE,
+                Membership.BAN
+            )
+        )
+
+        tags_by_room = yield self.store.get_tags_for_user(
+            sync_config.user.to_string()
         )
 
         joined = []
         invited = []
+        archived = []
         for event in room_list:
             if event.membership == Membership.JOIN:
-                room_sync = yield self.initial_sync_for_joined_room(
-                    event.room_id, sync_config, now_token,
+                room_sync = yield self.full_state_sync_for_joined_room(
+                    room_id=event.room_id,
+                    sync_config=sync_config,
+                    now_token=now_token,
+                    timeline_since_token=timeline_since_token,
+                    ephemeral_by_room=ephemeral_by_room,
+                    tags_by_room=tags_by_room,
                 )
                 joined.append(room_sync)
             elif event.membership == Membership.INVITE:
@@ -173,23 +232,39 @@ class SyncHandler(BaseHandler):
                     room_id=event.room_id,
                     invite=invite,
                 ))
+            elif event.membership in (Membership.LEAVE, Membership.BAN):
+                leave_token = now_token.copy_and_replace(
+                    "room_key", "s%d" % (event.stream_ordering,)
+                )
+                room_sync = yield self.full_state_sync_for_archived_room(
+                    sync_config=sync_config,
+                    room_id=event.room_id,
+                    leave_event_id=event.event_id,
+                    leave_token=leave_token,
+                    timeline_since_token=timeline_since_token,
+                    tags_by_room=tags_by_room,
+                )
+                archived.append(room_sync)
 
         defer.returnValue(SyncResult(
             presence=presence,
             joined=joined,
             invited=invited,
+            archived=archived,
             next_batch=now_token,
         ))
 
     @defer.inlineCallbacks
-    def initial_sync_for_joined_room(self, room_id, sync_config, now_token):
+    def full_state_sync_for_joined_room(self, room_id, sync_config,
+                                        now_token, timeline_since_token,
+                                        ephemeral_by_room, tags_by_room):
         """Sync a room for a client which is starting without any state
         Returns:
             A Deferred JoinedSyncResult.
         """
 
         batch = yield self.load_filtered_recents(
-            room_id, sync_config, now_token,
+            room_id, sync_config, now_token, since_token=timeline_since_token
         )
 
         current_state = yield self.state_handler.get_current_state(
@@ -201,7 +276,92 @@ class SyncHandler(BaseHandler):
             room_id=room_id,
             timeline=batch,
             state=current_state_events,
-            ephemeral=[],
+            ephemeral=ephemeral_by_room.get(room_id, []),
+            private_user_data=self.private_user_data_for_room(
+                room_id, tags_by_room
+            ),
+        ))
+
+    def private_user_data_for_room(self, room_id, tags_by_room):
+        private_user_data = []
+        tags = tags_by_room.get(room_id)
+        if tags:
+            private_user_data.append({
+                "type": "m.tag",
+                "content": {"tags": tags},
+            })
+        return private_user_data
+
+    @defer.inlineCallbacks
+    def ephemeral_by_room(self, sync_config, now_token, since_token=None):
+        """Get the ephemeral events for each room the user is in
+        Args:
+            sync_config (SyncConfig): The flags, filters and user for the sync.
+            now_token (StreamToken): Where the server is currently up to.
+            since_token (StreamToken): Where the server was when the client
+                last synced.
+        Returns:
+            A tuple of the now StreamToken, updated to reflect the which typing
+            events are included, and a dict mapping from room_id to a list of
+            typing events for that room.
+        """
+
+        typing_key = since_token.typing_key if since_token else "0"
+
+        typing_source = self.event_sources.sources["typing"]
+        typing, typing_key = yield typing_source.get_new_events_for_user(
+            user=sync_config.user,
+            from_key=typing_key,
+            limit=sync_config.filter.ephemeral_limit(),
+        )
+        now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+        ephemeral_by_room = {}
+
+        for event in typing:
+            room_id = event.pop("room_id")
+            ephemeral_by_room.setdefault(room_id, []).append(event)
+
+        receipt_key = since_token.receipt_key if since_token else "0"
+
+        receipt_source = self.event_sources.sources["receipt"]
+        receipts, receipt_key = yield receipt_source.get_new_events_for_user(
+            user=sync_config.user,
+            from_key=receipt_key,
+            limit=sync_config.filter.ephemeral_limit(),
+        )
+        now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+
+        for event in receipts:
+            room_id = event.pop("room_id")
+            ephemeral_by_room.setdefault(room_id, []).append(event)
+
+        defer.returnValue((now_token, ephemeral_by_room))
+
+    @defer.inlineCallbacks
+    def full_state_sync_for_archived_room(self, room_id, sync_config,
+                                          leave_event_id, leave_token,
+                                          timeline_since_token, tags_by_room):
+        """Sync a room for a client which is starting without any state
+        Returns:
+            A Deferred JoinedSyncResult.
+        """
+
+        batch = yield self.load_filtered_recents(
+            room_id, sync_config, leave_token, since_token=timeline_since_token
+        )
+
+        leave_state = yield self.store.get_state_for_events(
+            [leave_event_id], None
+        )
+
+        defer.returnValue(ArchivedSyncResult(
+            room_id=room_id,
+            timeline=batch,
+            state=leave_state[leave_event_id].values(),
+            private_user_data=self.private_user_data_for_room(
+                room_id, tags_by_room
+            ),
         ))
 
     @defer.inlineCallbacks
@@ -221,18 +381,9 @@ class SyncHandler(BaseHandler):
         )
         now_token = now_token.copy_and_replace("presence_key", presence_key)
 
-        typing_source = self.event_sources.sources["typing"]
-        typing, typing_key = yield typing_source.get_new_events_for_user(
-            user=sync_config.user,
-            from_key=since_token.typing_key,
-            limit=sync_config.filter.ephemeral_limit(),
+        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+            sync_config, now_token, since_token
         )
-        now_token = now_token.copy_and_replace("typing_key", typing_key)
-
-        typing_by_room = {event["room_id"]: [event] for event in typing}
-        for event in typing:
-            event.pop("room_id")
-        logger.debug("Typing %r", typing_by_room)
 
         rm_handler = self.hs.get_handlers().room_member_handler
         app_service = yield self.store.get_app_service_by_user_id(
@@ -256,19 +407,28 @@ class SyncHandler(BaseHandler):
             limit=timeline_limit + 1,
         )
 
+        tags_by_room = yield self.store.get_updated_tags(
+            sync_config.user.to_string(),
+            since_token.private_user_data_key,
+        )
+
         joined = []
+        archived = []
         if len(room_events) <= timeline_limit:
             # There is no gap in any of the rooms. Therefore we can just
             # partition the new events by room and return them.
             invite_events = []
+            leave_events = []
             events_by_room_id = {}
             for event in room_events:
                 events_by_room_id.setdefault(event.room_id, []).append(event)
                 if event.room_id not in joined_room_ids:
                     if (event.type == EventTypes.Member
-                            and event.membership == Membership.INVITE
                             and event.state_key == sync_config.user.to_string()):
-                        invite_events.append(event)
+                        if event.membership == Membership.INVITE:
+                            invite_events.append(event)
+                        elif event.membership in (Membership.LEAVE, Membership.BAN):
+                            leave_events.append(event)
 
             for room_id in joined_room_ids:
                 recents = events_by_room_id.get(room_id, [])
@@ -280,7 +440,7 @@ class SyncHandler(BaseHandler):
                 else:
                     prev_batch = now_token
 
-                state = yield self.check_joined_room(
+                state, limited = yield self.check_joined_room(
                     sync_config, room_id, state
                 )
 
@@ -289,26 +449,40 @@ class SyncHandler(BaseHandler):
                     timeline=TimelineBatch(
                         events=recents,
                         prev_batch=prev_batch,
-                        limited=False,
+                        limited=limited,
                     ),
                     state=state,
-                    ephemeral=typing_by_room.get(room_id, [])
+                    ephemeral=ephemeral_by_room.get(room_id, []),
+                    private_user_data=self.private_user_data_for_room(
+                        room_id, tags_by_room
+                    ),
                 )
                 if room_sync:
                     joined.append(room_sync)
+
         else:
             invite_events = yield self.store.get_invites_for_user(
                 sync_config.user.to_string()
             )
 
+            leave_events = yield self.store.get_leave_and_ban_events_for_user(
+                sync_config.user.to_string()
+            )
+
             for room_id in joined_room_ids:
                 room_sync = yield self.incremental_sync_with_gap_for_room(
                     room_id, sync_config, since_token, now_token,
-                    typing_by_room
+                    ephemeral_by_room, tags_by_room
                 )
                 if room_sync:
                     joined.append(room_sync)
 
+        for leave_event in leave_events:
+            room_sync = yield self.incremental_sync_for_archived_room(
+                sync_config, leave_event, since_token, tags_by_room
+            )
+            archived.append(room_sync)
+
         invited = [
             InvitedSyncResult(room_id=event.room_id, invite=event)
             for event in invite_events
@@ -318,56 +492,11 @@ class SyncHandler(BaseHandler):
             presence=presence,
             joined=joined,
             invited=invited,
+            archived=archived,
             next_batch=now_token,
         ))
 
     @defer.inlineCallbacks
-    def _filter_events_for_client(self, user_id, room_id, events):
-        event_id_to_state = yield self.store.get_state_for_events(
-            frozenset(e.event_id for e in events),
-            types=(
-                (EventTypes.RoomHistoryVisibility, ""),
-                (EventTypes.Member, user_id),
-            )
-        )
-
-        def allowed(event, state):
-            if event.type == EventTypes.RoomHistoryVisibility:
-                return True
-
-            membership_ev = state.get((EventTypes.Member, user_id), None)
-            if membership_ev:
-                membership = membership_ev.membership
-            else:
-                membership = Membership.LEAVE
-
-            if membership == Membership.JOIN:
-                return True
-
-            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
-            if history:
-                visibility = history.content.get("history_visibility", "shared")
-            else:
-                visibility = "shared"
-
-            if visibility == "public":
-                return True
-            elif visibility == "shared":
-                return True
-            elif visibility == "joined":
-                return membership == Membership.JOIN
-            elif visibility == "invited":
-                return membership == Membership.INVITE
-
-            return True
-
-        defer.returnValue([
-            event
-            for event in events
-            if allowed(event, event_id_to_state[event.event_id])
-        ])
-
-    @defer.inlineCallbacks
     def load_filtered_recents(self, room_id, sync_config, now_token,
                               since_token=None):
         limited = True
@@ -390,7 +519,7 @@ class SyncHandler(BaseHandler):
             end_key = "s" + room_key.split('-')[-1]
             loaded_recents = sync_config.filter.filter_room_timeline(events)
             loaded_recents = yield self._filter_events_for_client(
-                sync_config.user.to_string(), room_id, loaded_recents,
+                sync_config.user.to_string(), loaded_recents,
             )
             loaded_recents.extend(recents)
             recents = loaded_recents
@@ -414,7 +543,7 @@ class SyncHandler(BaseHandler):
     @defer.inlineCallbacks
     def incremental_sync_with_gap_for_room(self, room_id, sync_config,
                                            since_token, now_token,
-                                           typing_by_room):
+                                           ephemeral_by_room, tags_by_room):
         """ Get the incremental delta needed to bring the client up to date for
         the room. Gives the client the most recent events and the changes to
         state.
@@ -447,7 +576,7 @@ class SyncHandler(BaseHandler):
             current_state=current_state_events,
         )
 
-        state_events_delta = yield self.check_joined_room(
+        state_events_delta, _ = yield self.check_joined_room(
             sync_config, room_id, state_events_delta
         )
 
@@ -455,7 +584,62 @@ class SyncHandler(BaseHandler):
             room_id=room_id,
             timeline=batch,
             state=state_events_delta,
-            ephemeral=typing_by_room.get(room_id, [])
+            ephemeral=ephemeral_by_room.get(room_id, []),
+            private_user_data=self.private_user_data_for_room(
+                room_id, tags_by_room
+            ),
+        )
+
+        logging.debug("Room sync: %r", room_sync)
+
+        defer.returnValue(room_sync)
+
+    @defer.inlineCallbacks
+    def incremental_sync_for_archived_room(self, sync_config, leave_event,
+                                           since_token, tags_by_room):
+        """ Get the incremental delta needed to bring the client up to date for
+        the archived room.
+        Returns:
+            A Deferred ArchivedSyncResult
+        """
+
+        stream_token = yield self.store.get_stream_token_for_event(
+            leave_event.event_id
+        )
+
+        leave_token = since_token.copy_and_replace("room_key", stream_token)
+
+        batch = yield self.load_filtered_recents(
+            leave_event.room_id, sync_config, leave_token, since_token,
+        )
+
+        logging.debug("Recents %r", batch)
+
+        # TODO(mjark): This seems racy since this isn't being passed a
+        # token to indicate what point in the stream this is
+        leave_state = yield self.store.get_state_for_events(
+            [leave_event.event_id], None
+        )
+
+        state_events_at_leave = leave_state[leave_event.event_id].values()
+
+        state_at_previous_sync = yield self.get_state_at_previous_sync(
+            leave_event.room_id, since_token=since_token
+        )
+
+        state_events_delta = yield self.compute_state_delta(
+            since_token=since_token,
+            previous_state=state_at_previous_sync,
+            current_state=state_events_at_leave,
+        )
+
+        room_sync = ArchivedSyncResult(
+            room_id=leave_event.room_id,
+            timeline=batch,
+            state=state_events_delta,
+            private_user_data=self.private_user_data_for_room(
+                leave_event.room_id, tags_by_room
+            ),
         )
 
         logging.debug("Room sync: %r", room_sync)
@@ -505,6 +689,7 @@ class SyncHandler(BaseHandler):
     @defer.inlineCallbacks
     def check_joined_room(self, sync_config, room_id, state_delta):
         joined = False
+        limited = False
         for event in state_delta:
             if (
                 event.type == EventTypes.Member
@@ -516,5 +701,6 @@ class SyncHandler(BaseHandler):
         if joined:
             res = yield self.state_handler.get_current_state(room_id)
             state_delta = res.values()
+            limited = True
 
-        defer.returnValue(state_delta)
+        defer.returnValue((state_delta, limited))
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 9a5869abee..27e5190224 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -24,7 +24,6 @@ from canonicaljson import encode_canonical_json
 from twisted.internet import defer, reactor, ssl
 from twisted.web.client import (
     Agent, readBody, FileBodyProducer, PartialDownloadError,
-    HTTPConnectionPool,
 )
 from twisted.web.http_headers import Headers
 
@@ -59,11 +58,8 @@ class SimpleHttpClient(object):
         # The default context factory in Twisted 14.0.0 (which we require) is
         # BrowserLikePolicyForHTTPS which will do regular cert validation
         # 'like a browser'
-        pool = HTTPConnectionPool(reactor)
-        pool.maxPersistentPerHost = 10
         self.agent = Agent(
             reactor,
-            pool=pool,
             connectTimeout=15,
             contextFactory=hs.get_http_client_context_factory()
         )
diff --git a/synapse/notifier.py b/synapse/notifier.py
index f998fc83bf..a78ee3c1e7 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -270,7 +270,7 @@ class Notifier(object):
 
     @defer.inlineCallbacks
     def wait_for_events(self, user, rooms, timeout, callback,
-                        from_token=StreamToken("s0", "0", "0", "0")):
+                        from_token=StreamToken("s0", "0", "0", "0", "0")):
         """Wait until the callback returns a non empty response or the
         timeout fires.
         """
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 00ec8fcd74..4ea06c1434 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -107,6 +107,8 @@ class LoginRestServlet(ClientV1RestServlet):
             user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
                 login_submission['medium'], login_submission['address']
             )
+            if not user_id:
+                raise LoginError(403, "", errcode=Codes.FORBIDDEN)
         else:
             user_id = login_submission['user']
 
@@ -198,36 +200,6 @@ class LoginRestServlet(ClientV1RestServlet):
         return (user, attributes)
 
 
-class LoginFallbackRestServlet(ClientV1RestServlet):
-    PATTERN = client_path_pattern("/login/fallback$")
-
-    def on_GET(self, request):
-        # TODO(kegan): This should be returning some HTML which is capable of
-        # hitting LoginRestServlet
-        return (200, {})
-
-
-class PasswordResetRestServlet(ClientV1RestServlet):
-    PATTERN = client_path_pattern("/login/reset")
-
-    @defer.inlineCallbacks
-    def on_POST(self, request):
-        reset_info = _parse_json(request)
-        try:
-            email = reset_info["email"]
-            user_id = reset_info["user_id"]
-            handler = self.handlers.login_handler
-            yield handler.reset_password(user_id, email)
-            # purposefully give no feedback to avoid people hammering different
-            # combinations.
-            defer.returnValue((200, {}))
-        except KeyError:
-            raise SynapseError(
-                400,
-                "Missing keys. Requires 'email' and 'user_id'."
-            )
-
-
 class SAML2RestServlet(ClientV1RestServlet):
     PATTERN = client_path_pattern("/login/saml2")
 
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 23871f161e..2dcaee86cd 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -26,7 +26,7 @@ from synapse.events.utils import serialize_event
 import simplejson as json
 import logging
 import urllib
-
+from synapse.util import third_party_invites
 
 logger = logging.getLogger(__name__)
 
@@ -397,6 +397,41 @@ class RoomTriggerBackfill(ClientV1RestServlet):
         defer.returnValue((200, res))
 
 
+class RoomEventContext(ClientV1RestServlet):
+    PATTERN = client_path_pattern(
+        "/rooms/(?P<room_id>[^/]*)/context/(?P<event_id>[^/]*)$"
+    )
+
+    def __init__(self, hs):
+        super(RoomEventContext, self).__init__(hs)
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, room_id, event_id):
+        user, _ = yield self.auth.get_user_by_req(request)
+
+        limit = int(request.args.get("limit", [10])[0])
+
+        results = yield self.handlers.room_context_handler.get_event_context(
+            user, room_id, event_id, limit,
+        )
+
+        time_now = self.clock.time_msec()
+        results["events_before"] = [
+            serialize_event(event, time_now) for event in results["events_before"]
+        ]
+        results["events_after"] = [
+            serialize_event(event, time_now) for event in results["events_after"]
+        ]
+        results["state"] = [
+            serialize_event(event, time_now) for event in results["state"]
+        ]
+
+        logger.info("Responding with %r", results)
+
+        defer.returnValue((200, results))
+
+
 # TODO: Needs unit testing
 class RoomMembershipRestServlet(ClientV1RestServlet):
 
@@ -414,10 +449,26 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
 
         # target user is you unless it is an invite
         state_key = user.to_string()
-        if membership_action in ["invite", "ban", "kick"]:
-            if "user_id" not in content:
+
+        if membership_action == "invite" and third_party_invites.has_invite_keys(content):
+            yield self.handlers.room_member_handler.do_3pid_invite(
+                room_id,
+                user,
+                content["medium"],
+                content["address"],
+                content["id_server"],
+                content["display_name"],
+                token_id,
+                txn_id
+            )
+            defer.returnValue((200, {}))
+            return
+        elif membership_action in ["invite", "ban", "kick"]:
+            if "user_id" in content:
+                state_key = content["user_id"]
+            else:
                 raise SynapseError(400, "Missing user_id key.")
-            state_key = content["user_id"]
+
             # make sure it looks like a user ID; it'll throw if it's invalid.
             UserID.from_string(state_key)
 
@@ -425,10 +476,20 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
                 membership_action = "leave"
 
         msg_handler = self.handlers.message_handler
+
+        event_content = {
+            "membership": unicode(membership_action),
+        }
+
+        if membership_action == "join" and third_party_invites.has_join_keys(content):
+            event_content["third_party_invite"] = (
+                third_party_invites.extract_join_keys(content)
+            )
+
         yield msg_handler.create_and_send_event(
             {
                 "type": EventTypes.Member,
-                "content": {"membership": unicode(membership_action)},
+                "content": event_content,
                 "room_id": room_id,
                 "sender": user.to_string(),
                 "state_key": state_key,
@@ -529,6 +590,22 @@ class RoomTypingRestServlet(ClientV1RestServlet):
         defer.returnValue((200, {}))
 
 
+class SearchRestServlet(ClientV1RestServlet):
+    PATTERN = client_path_pattern(
+        "/search$"
+    )
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        auth_user, _ = yield self.auth.get_user_by_req(request)
+
+        content = _parse_json(request)
+
+        results = yield self.handlers.search_handler.search(auth_user, content)
+
+        defer.returnValue((200, results))
+
+
 def _parse_json(request):
     try:
         content = json.loads(request.content.read())
@@ -585,3 +662,5 @@ def register_servlets(hs, http_server):
     RoomInitialSyncRestServlet(hs).register(http_server)
     RoomRedactEventRestServlet(hs).register(http_server)
     RoomTypingRestServlet(hs).register(http_server)
+    SearchRestServlet(hs).register(http_server)
+    RoomEventContext(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py
index 5831ff0e62..a108132346 100644
--- a/synapse/rest/client/v2_alpha/__init__.py
+++ b/synapse/rest/client/v2_alpha/__init__.py
@@ -22,6 +22,7 @@ from . import (
     receipts,
     keys,
     tokenrefresh,
+    tags,
 )
 
 from synapse.http.server import JsonResource
@@ -44,3 +45,4 @@ class ClientV2AlphaRestResource(JsonResource):
         receipts.register_servlets(hs, client_resource)
         keys.register_servlets(hs, client_resource)
         tokenrefresh.register_servlets(hs, client_resource)
+        tags.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index fffecb24f5..32a1087c91 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -16,14 +16,14 @@
 from twisted.internet import defer
 
 from synapse.http.servlet import (
-    RestServlet, parse_string, parse_integer
+    RestServlet, parse_string, parse_integer, parse_boolean
 )
 from synapse.handlers.sync import SyncConfig
 from synapse.types import StreamToken
 from synapse.events.utils import (
     serialize_event, format_event_for_client_v2_without_event_id,
 )
-from synapse.api.filtering import Filter
+from synapse.api.filtering import FilterCollection
 from ._base import client_v2_pattern
 
 import copy
@@ -90,6 +90,7 @@ class SyncRestServlet(RestServlet):
             allowed_values=self.ALLOWED_PRESENCE
         )
         filter_id = parse_string(request, "filter", default=None)
+        full_state = parse_boolean(request, "full_state", default=False)
 
         logger.info(
             "/sync: user=%r, timeout=%r, since=%r,"
@@ -103,7 +104,7 @@ class SyncRestServlet(RestServlet):
                 user.localpart, filter_id
             )
         except:
-            filter = Filter({})
+            filter = FilterCollection({})
 
         sync_config = SyncConfig(
             user=user,
@@ -120,7 +121,8 @@ class SyncRestServlet(RestServlet):
 
         try:
             sync_result = yield self.sync_handler.wait_for_sync_for_user(
-                sync_config, since_token=since_token, timeout=timeout
+                sync_config, since_token=since_token, timeout=timeout,
+                full_state=full_state
             )
         finally:
             if set_presence == "online":
@@ -136,6 +138,10 @@ class SyncRestServlet(RestServlet):
             sync_result.invited, filter, time_now, token_id
         )
 
+        archived = self.encode_archived(
+            sync_result.archived, filter, time_now, token_id
+        )
+
         response_content = {
             "presence": self.encode_presence(
                 sync_result.presence, filter, time_now
@@ -143,7 +149,7 @@ class SyncRestServlet(RestServlet):
             "rooms": {
                 "joined": joined,
                 "invited": invited,
-                "archived": {},
+                "archived": archived,
             },
             "next_batch": sync_result.next_batch.to_string(),
         }
@@ -182,14 +188,20 @@ class SyncRestServlet(RestServlet):
 
         return invited
 
+    def encode_archived(self, rooms, filter, time_now, token_id):
+        joined = {}
+        for room in rooms:
+            joined[room.room_id] = self.encode_room(
+                room, filter, time_now, token_id, joined=False
+            )
+
+        return joined
+
     @staticmethod
-    def encode_room(room, filter, time_now, token_id):
+    def encode_room(room, filter, time_now, token_id, joined=True):
         event_map = {}
         state_events = filter.filter_room_state(room.state)
-        timeline_events = filter.filter_room_timeline(room.timeline.events)
-        ephemeral_events = filter.filter_room_ephemeral(room.ephemeral)
         state_event_ids = []
-        timeline_event_ids = []
         for event in state_events:
             # TODO(mjark): Respect formatting requirements in the filter.
             event_map[event.event_id] = serialize_event(
@@ -198,6 +210,8 @@ class SyncRestServlet(RestServlet):
             )
             state_event_ids.append(event.event_id)
 
+        timeline_events = filter.filter_room_timeline(room.timeline.events)
+        timeline_event_ids = []
         for event in timeline_events:
             # TODO(mjark): Respect formatting requirements in the filter.
             event_map[event.event_id] = serialize_event(
@@ -205,6 +219,11 @@ class SyncRestServlet(RestServlet):
                 event_format=format_event_for_client_v2_without_event_id,
             )
             timeline_event_ids.append(event.event_id)
+
+        private_user_data = filter.filter_room_private_user_data(
+            room.private_user_data
+        )
+
         result = {
             "event_map": event_map,
             "timeline": {
@@ -213,8 +232,13 @@ class SyncRestServlet(RestServlet):
                 "limited": room.timeline.limited,
             },
             "state": {"events": state_event_ids},
-            "ephemeral": {"events": ephemeral_events},
+            "private_user_data": {"events": private_user_data},
         }
+
+        if joined:
+            ephemeral_events = filter.filter_room_ephemeral(room.ephemeral)
+            result["ephemeral"] = {"events": ephemeral_events}
+
         return result
 
 
diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py
new file mode 100644
index 0000000000..dcfe6bd20e
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/tags.py
@@ -0,0 +1,106 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import client_v2_pattern
+
+from synapse.http.servlet import RestServlet
+from synapse.api.errors import AuthError, SynapseError
+
+from twisted.internet import defer
+
+import logging
+
+import simplejson as json
+
+logger = logging.getLogger(__name__)
+
+
+class TagListServlet(RestServlet):
+    """
+    GET /user/{user_id}/rooms/{room_id}/tags HTTP/1.1
+    """
+    PATTERN = client_v2_pattern(
+        "/user/(?P<user_id>[^/]*)/rooms/(?P<room_id>[^/]*)/tags"
+    )
+
+    def __init__(self, hs):
+        super(TagListServlet, self).__init__()
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, user_id, room_id):
+        auth_user, _ = yield self.auth.get_user_by_req(request)
+        if user_id != auth_user.to_string():
+            raise AuthError(403, "Cannot get tags for other users.")
+
+        tags = yield self.store.get_tags_for_room(user_id, room_id)
+
+        defer.returnValue((200, {"tags": tags}))
+
+
+class TagServlet(RestServlet):
+    """
+    PUT /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1
+    DELETE /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1
+    """
+    PATTERN = client_v2_pattern(
+        "/user/(?P<user_id>[^/]*)/rooms/(?P<room_id>[^/]*)/tags/(?P<tag>[^/]*)"
+    )
+
+    def __init__(self, hs):
+        super(TagServlet, self).__init__()
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.notifier = hs.get_notifier()
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request, user_id, room_id, tag):
+        auth_user, _ = yield self.auth.get_user_by_req(request)
+        if user_id != auth_user.to_string():
+            raise AuthError(403, "Cannot add tags for other users.")
+
+        try:
+            content_bytes = request.content.read()
+            body = json.loads(content_bytes)
+        except:
+            raise SynapseError(400, "Invalid tag JSON")
+
+        max_id = yield self.store.add_tag_to_room(user_id, room_id, tag, body)
+
+        yield self.notifier.on_new_event(
+            "private_user_data_key", max_id, users=[user_id]
+        )
+
+        defer.returnValue((200, {}))
+
+    @defer.inlineCallbacks
+    def on_DELETE(self, request, user_id, room_id, tag):
+        auth_user, _ = yield self.auth.get_user_by_req(request)
+        if user_id != auth_user.to_string():
+            raise AuthError(403, "Cannot add tags for other users.")
+
+        max_id = yield self.store.remove_tag_from_room(user_id, room_id, tag)
+
+        yield self.notifier.on_new_event(
+            "private_user_data_key", max_id, users=[user_id]
+        )
+
+        defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+    TagListServlet(hs).register(http_server)
+    TagServlet(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 48a0633746..e7443f2838 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -40,6 +40,8 @@ from .filtering import FilteringStore
 from .end_to_end_keys import EndToEndKeyStore
 
 from .receipts import ReceiptsStore
+from .search import SearchStore
+from .tags import TagsStore
 
 
 import logging
@@ -69,6 +71,8 @@ class DataStore(RoomMemberStore, RoomStore,
                 EventsStore,
                 ReceiptsStore,
                 EndToEndKeyStore,
+                SearchStore,
+                TagsStore,
                 ):
 
     def __init__(self, hs):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 693784ad38..218e708054 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -519,7 +519,7 @@ class SQLBaseStore(object):
                                   allow_none=False,
                                   desc="_simple_select_one_onecol"):
         """Executes a SELECT query on the named table, which is expected to
-        return a single row, returning a single column from it."
+        return a single row, returning a single column from it.
 
         Args:
             table : string giving the table name
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index bad3b5c5ac..a5a54ec011 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -17,6 +17,8 @@ from synapse.storage.prepare_database import (
     prepare_database, prepare_sqlite3_database
 )
 
+import struct
+
 
 class Sqlite3Engine(object):
     single_threaded = True
@@ -32,6 +34,7 @@ class Sqlite3Engine(object):
 
     def on_new_connection(self, db_conn):
         self.prepare_database(db_conn)
+        db_conn.create_function("rank", 1, _rank)
 
     def prepare_database(self, db_conn):
         prepare_sqlite3_database(db_conn)
@@ -45,3 +48,27 @@ class Sqlite3Engine(object):
 
     def lock_table(self, txn, table):
         return
+
+
+# Following functions taken from: https://github.com/coleifer/peewee
+
+def _parse_match_info(buf):
+    bufsize = len(buf)
+    return [struct.unpack('@I', buf[i:i+4])[0] for i in range(0, bufsize, 4)]
+
+
+def _rank(raw_match_info):
+    """Handle match_info called w/default args 'pcx' - based on the example rank
+    function http://sqlite.org/fts3.html#appendix_a
+    """
+    match_info = _parse_match_info(raw_match_info)
+    score = 0.0
+    p, c = match_info[:2]
+    for phrase_num in range(p):
+        phrase_info_idx = 2 + (phrase_num * c * 3)
+        for col_num in range(c):
+            col_idx = phrase_info_idx + (col_num * 3)
+            x1, x2 = match_info[col_idx:col_idx + 2]
+            if x1 > 0:
+                score += float(x1) / x2
+    return score
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 416ef6af93..e6c1abfc27 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -307,6 +307,8 @@ class EventsStore(SQLBaseStore):
                 self._store_room_name_txn(txn, event)
             elif event.type == EventTypes.Topic:
                 self._store_room_topic_txn(txn, event)
+            elif event.type == EventTypes.Message:
+                self._store_room_message_txn(txn, event)
             elif event.type == EventTypes.Redaction:
                 self._store_redaction(txn, event)
 
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 8800116570..fcd43c7fdd 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -34,10 +34,10 @@ class FilteringStore(SQLBaseStore):
             desc="get_user_filter",
         )
 
-        defer.returnValue(json.loads(def_json))
+        defer.returnValue(json.loads(str(def_json).decode("utf-8")))
 
     def add_user_filter(self, user_localpart, user_filter):
-        def_json = json.dumps(user_filter)
+        def_json = json.dumps(user_filter).encode("utf-8")
 
         # Need an atomic transaction to SELECT the maximal ID so far then
         # INSERT a new one
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 1ddf55be4d..1a74d6e360 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 24
+SCHEMA_VERSION = 25
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 5e07b7e0e5..13441fcdce 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -19,6 +19,7 @@ from synapse.api.errors import StoreError
 
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
+from .engines import PostgresEngine, Sqlite3Engine
 
 import collections
 import logging
@@ -175,6 +176,10 @@ class RoomStore(SQLBaseStore):
                 },
             )
 
+            self._store_event_search_txn(
+                txn, event, "content.topic", event.content["topic"]
+            )
+
     def _store_room_name_txn(self, txn, event):
         if hasattr(event, "content") and "name" in event.content:
             self._simple_insert_txn(
@@ -187,6 +192,33 @@ class RoomStore(SQLBaseStore):
                 }
             )
 
+            self._store_event_search_txn(
+                txn, event, "content.name", event.content["name"]
+            )
+
+    def _store_room_message_txn(self, txn, event):
+        if hasattr(event, "content") and "body" in event.content:
+            self._store_event_search_txn(
+                txn, event, "content.body", event.content["body"]
+            )
+
+    def _store_event_search_txn(self, txn, event, key, value):
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = (
+                "INSERT INTO event_search (event_id, room_id, key, vector)"
+                " VALUES (?,?,?,to_tsvector('english', ?))"
+            )
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            sql = (
+                "INSERT INTO event_search (event_id, room_id, key, value)"
+                " VALUES (?,?,?,?)"
+            )
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
+        txn.execute(sql, (event.event_id, event.room_id, key, value,))
+
     @cachedInlineCallbacks()
     def get_room_name_and_aliases(self, room_id):
         def f(txn):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index dd98dcfda8..ae1ad56d9a 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -124,6 +124,19 @@ class RoomMemberStore(SQLBaseStore):
             invites.event_id for invite in invites
         ]))
 
+    def get_leave_and_ban_events_for_user(self, user_id):
+        """ Get all the leave events for a user
+        Args:
+            user_id (str): The user ID.
+        Returns:
+            A deferred list of event objects.
+        """
+        return self.get_rooms_for_user_where_membership_is(
+            user_id, (Membership.LEAVE, Membership.BAN)
+        ).addCallback(lambda leaves: self._get_events([
+            leave.event_id for leave in leaves
+        ]))
+
     def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
         """ Get all the rooms for this user where the membership for this user
         matches one in the membership list.
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
new file mode 100644
index 0000000000..b7cd0ce3b8
--- /dev/null
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -0,0 +1,127 @@
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage.prepare_database import get_statements
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+import ujson
+
+logger = logging.getLogger(__name__)
+
+
+POSTGRES_SQL = """
+CREATE TABLE IF NOT EXISTS event_search (
+    event_id TEXT,
+    room_id TEXT,
+    sender TEXT,
+    key TEXT,
+    vector tsvector
+);
+
+INSERT INTO event_search SELECT
+    event_id, room_id, json::json->>'sender', 'content.body',
+    to_tsvector('english', json::json->'content'->>'body')
+    FROM events NATURAL JOIN event_json WHERE type = 'm.room.message';
+
+INSERT INTO event_search SELECT
+    event_id, room_id, json::json->>'sender', 'content.name',
+    to_tsvector('english', json::json->'content'->>'name')
+    FROM events NATURAL JOIN event_json WHERE type = 'm.room.name';
+
+INSERT INTO event_search SELECT
+    event_id, room_id, json::json->>'sender', 'content.topic',
+    to_tsvector('english', json::json->'content'->>'topic')
+    FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic';
+
+
+CREATE INDEX event_search_fts_idx ON event_search USING gin(vector);
+CREATE INDEX event_search_ev_idx ON event_search(event_id);
+CREATE INDEX event_search_ev_ridx ON event_search(room_id);
+"""
+
+
+SQLITE_TABLE = (
+    "CREATE VIRTUAL TABLE IF NOT EXISTS event_search"
+    " USING fts4 ( event_id, room_id, sender, key, value )"
+)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    if isinstance(database_engine, PostgresEngine):
+        run_postgres_upgrade(cur)
+        return
+
+    if isinstance(database_engine, Sqlite3Engine):
+        run_sqlite_upgrade(cur)
+        return
+
+
+def run_postgres_upgrade(cur):
+    for statement in get_statements(POSTGRES_SQL.splitlines()):
+        cur.execute(statement)
+
+
+def run_sqlite_upgrade(cur):
+        cur.execute(SQLITE_TABLE)
+
+        rowid = -1
+        while True:
+            cur.execute(
+                "SELECT rowid, json FROM event_json"
+                " WHERE rowid > ?"
+                " ORDER BY rowid ASC LIMIT 100",
+                (rowid,)
+            )
+
+            res = cur.fetchall()
+
+            if not res:
+                break
+
+            events = [
+                ujson.loads(js)
+                for _, js in res
+            ]
+
+            rowid = max(rid for rid, _ in res)
+
+            rows = []
+            for ev in events:
+                content = ev.get("content", {})
+                body = content.get("body", None)
+                name = content.get("name", None)
+                topic = content.get("topic", None)
+                sender = ev.get("sender", None)
+                if ev["type"] == "m.room.message" and body:
+                    rows.append((
+                        ev["event_id"], ev["room_id"], sender, "content.body", body
+                    ))
+                if ev["type"] == "m.room.name" and name:
+                    rows.append((
+                        ev["event_id"], ev["room_id"], sender, "content.name", name
+                    ))
+                if ev["type"] == "m.room.topic" and topic:
+                    rows.append((
+                        ev["event_id"], ev["room_id"], sender, "content.topic", topic
+                    ))
+
+            if rows:
+                logger.info(rows)
+                cur.executemany(
+                    "INSERT INTO event_search (event_id, room_id, sender, key, value)"
+                    " VALUES (?,?,?,?,?)",
+                    rows
+                )
diff --git a/synapse/storage/schema/delta/25/tags.sql b/synapse/storage/schema/delta/25/tags.sql
new file mode 100644
index 0000000000..527424c998
--- /dev/null
+++ b/synapse/storage/schema/delta/25/tags.sql
@@ -0,0 +1,38 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE IF NOT EXISTS room_tags(
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    tag     TEXT NOT NULL,  -- The name of the tag.
+    content TEXT NOT NULL,  -- The JSON content of the tag.
+    CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag)
+);
+
+CREATE TABLE IF NOT EXISTS room_tags_revisions (
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    stream_id BIGINT NOT NULL, -- The current version of the room tags.
+    CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id)
+);
+
+CREATE TABLE IF NOT EXISTS private_user_data_max_stream_id(
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    stream_id  BIGINT NOT NULL,
+    CHECK (Lock='X')
+);
+
+INSERT INTO private_user_data_max_stream_id (stream_id) VALUES (0);
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
new file mode 100644
index 0000000000..cdf003502f
--- /dev/null
+++ b/synapse/storage/search.py
@@ -0,0 +1,111 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from _base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+from collections import namedtuple
+
+"""The result of a search.
+
+Fields:
+    rank_map (dict): Mapping event_id -> rank
+    event_map (dict): Mapping event_id -> event
+    pagination_token (str): Pagination token
+"""
+SearchResult = namedtuple("SearchResult", ("rank_map", "event_map", "pagination_token"))
+
+
+class SearchStore(SQLBaseStore):
+    @defer.inlineCallbacks
+    def search_msgs(self, room_ids, search_term, keys):
+        """Performs a full text search over events with given keys.
+
+        Args:
+            room_ids (list): List of room ids to search in
+            search_term (str): Search term to search for
+            keys (list): List of keys to search in, currently supports
+                "content.body", "content.name", "content.topic"
+
+        Returns:
+            SearchResult
+        """
+        clauses = []
+        args = []
+
+        # Make sure we don't explode because the person is in too many rooms.
+        # We filter the results below regardless.
+        if len(room_ids) < 500:
+            clauses.append(
+                "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
+            )
+            args.extend(room_ids)
+
+        local_clauses = []
+        for key in keys:
+            local_clauses.append("key = ?")
+            args.append(key)
+
+        clauses.append(
+            "(%s)" % (" OR ".join(local_clauses),)
+        )
+
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = (
+                "SELECT ts_rank_cd(vector, query) AS rank, room_id, event_id"
+                " FROM plainto_tsquery('english', ?) as query, event_search"
+                " WHERE vector @@ query"
+            )
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            sql = (
+                "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id"
+                " FROM event_search"
+                " WHERE value MATCH ?"
+            )
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
+        for clause in clauses:
+            sql += " AND " + clause
+
+        # We add an arbitrary limit here to ensure we don't try to pull the
+        # entire table from the database.
+        sql += " ORDER BY rank DESC LIMIT 500"
+
+        results = yield self._execute(
+            "search_msgs", self.cursor_to_dict, sql, *([search_term] + args)
+        )
+
+        results = filter(lambda row: row["room_id"] in room_ids, results)
+
+        events = yield self._get_events([r["event_id"] for r in results])
+
+        event_map = {
+            ev.event_id: ev
+            for ev in events
+        }
+
+        defer.returnValue(SearchResult(
+            {
+                r["event_id"]: r["rank"]
+                for r in results
+                if r["event_id"] in event_map
+            },
+            event_map,
+            None
+        ))
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 6f2a50d585..acfb322a53 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -214,7 +214,6 @@ class StateStore(SQLBaseStore):
         that are in the `types` list.
 
         Args:
-            room_id (str)
             event_ids (list)
             types (list): List of (type, state_key) tuples which are used to
                 filter the state fetched. `state_key` may be None, which matches
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 3cab06fdef..15d4c2bf68 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -23,7 +23,7 @@ paginate bacwards.
 
 This is implemented by keeping two ordering columns: stream_ordering and
 topological_ordering. Stream ordering is basically insertion/received order
-(except for events from backfill requests). The topolgical_ordering is a
+(except for events from backfill requests). The topological_ordering is a
 weak ordering of events based on the pdu graph.
 
 This means that we have to have two different types of tokens, depending on
@@ -436,3 +436,138 @@ class StreamStore(SQLBaseStore):
             internal = event.internal_metadata
             internal.before = str(RoomStreamToken(topo, stream - 1))
             internal.after = str(RoomStreamToken(topo, stream))
+
+    @defer.inlineCallbacks
+    def get_events_around(self, room_id, event_id, before_limit, after_limit):
+        """Retrieve events and pagination tokens around a given event in a
+        room.
+
+        Args:
+            room_id (str)
+            event_id (str)
+            before_limit (int)
+            after_limit (int)
+
+        Returns:
+            dict
+        """
+
+        results = yield self.runInteraction(
+            "get_events_around", self._get_events_around_txn,
+            room_id, event_id, before_limit, after_limit
+        )
+
+        events_before = yield self._get_events(
+            [e for e in results["before"]["event_ids"]],
+            get_prev_content=True
+        )
+
+        events_after = yield self._get_events(
+            [e for e in results["after"]["event_ids"]],
+            get_prev_content=True
+        )
+
+        defer.returnValue({
+            "events_before": events_before,
+            "events_after": events_after,
+            "start": results["before"]["token"],
+            "end": results["after"]["token"],
+        })
+
+    def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
+        """Retrieves event_ids and pagination tokens around a given event in a
+        room.
+
+        Args:
+            room_id (str)
+            event_id (str)
+            before_limit (int)
+            after_limit (int)
+
+        Returns:
+            dict
+        """
+
+        results = self._simple_select_one_txn(
+            txn,
+            "events",
+            keyvalues={
+                "event_id": event_id,
+                "room_id": room_id,
+            },
+            retcols=["stream_ordering", "topological_ordering"],
+        )
+
+        stream_ordering = results["stream_ordering"]
+        topological_ordering = results["topological_ordering"]
+
+        query_before = (
+            "SELECT topological_ordering, stream_ordering, event_id FROM events"
+            " WHERE room_id = ? AND (topological_ordering < ?"
+            " OR (topological_ordering = ? AND stream_ordering < ?))"
+            " ORDER BY topological_ordering DESC, stream_ordering DESC"
+            " LIMIT ?"
+        )
+
+        query_after = (
+            "SELECT topological_ordering, stream_ordering, event_id FROM events"
+            " WHERE room_id = ? AND (topological_ordering > ?"
+            " OR (topological_ordering = ? AND stream_ordering > ?))"
+            " ORDER BY topological_ordering ASC, stream_ordering ASC"
+            " LIMIT ?"
+        )
+
+        txn.execute(
+            query_before,
+            (
+                room_id, topological_ordering, topological_ordering,
+                stream_ordering, before_limit,
+            )
+        )
+
+        rows = self.cursor_to_dict(txn)
+        events_before = [r["event_id"] for r in rows]
+
+        if rows:
+            start_token = str(RoomStreamToken(
+                rows[0]["topological_ordering"],
+                rows[0]["stream_ordering"] - 1,
+            ))
+        else:
+            start_token = str(RoomStreamToken(
+                topological_ordering,
+                stream_ordering - 1,
+            ))
+
+        txn.execute(
+            query_after,
+            (
+                room_id, topological_ordering, topological_ordering,
+                stream_ordering, after_limit,
+            )
+        )
+
+        rows = self.cursor_to_dict(txn)
+        events_after = [r["event_id"] for r in rows]
+
+        if rows:
+            end_token = str(RoomStreamToken(
+                rows[-1]["topological_ordering"],
+                rows[-1]["stream_ordering"],
+            ))
+        else:
+            end_token = str(RoomStreamToken(
+                topological_ordering,
+                stream_ordering,
+            ))
+
+        return {
+            "before": {
+                "event_ids": events_before,
+                "token": start_token,
+            },
+            "after": {
+                "event_ids": events_after,
+                "token": end_token,
+            },
+        }
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
new file mode 100644
index 0000000000..641ea250f0
--- /dev/null
+++ b/synapse/storage/tags.py
@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached
+from twisted.internet import defer
+from .util.id_generators import StreamIdGenerator
+
+import ujson as json
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class TagsStore(SQLBaseStore):
+    def __init__(self, hs):
+        super(TagsStore, self).__init__(hs)
+
+        self._private_user_data_id_gen = StreamIdGenerator(
+            "private_user_data_max_stream_id", "stream_id"
+        )
+
+    def get_max_private_user_data_stream_id(self):
+        """Get the current max stream id for the private user data stream
+
+        Returns:
+            A deferred int.
+        """
+        return self._private_user_data_id_gen.get_max_token(self)
+
+    @cached()
+    def get_tags_for_user(self, user_id):
+        """Get all the tags for a user.
+
+
+        Args:
+            user_id(str): The user to get the tags for.
+        Returns:
+            A deferred dict mapping from room_id strings to lists of tag
+            strings.
+        """
+
+        deferred = self._simple_select_list(
+            "room_tags", {"user_id": user_id}, ["room_id", "tag", "content"]
+        )
+
+        @deferred.addCallback
+        def tags_by_room(rows):
+            tags_by_room = {}
+            for row in rows:
+                room_tags = tags_by_room.setdefault(row["room_id"], {})
+                room_tags[row["tag"]] = json.loads(row["content"])
+            return tags_by_room
+
+        return deferred
+
+    @defer.inlineCallbacks
+    def get_updated_tags(self, user_id, stream_id):
+        """Get all the tags for the rooms where the tags have changed since the
+        given version
+
+        Args:
+            user_id(str): The user to get the tags for.
+            stream_id(int): The earliest update to get for the user.
+        Returns:
+            A deferred dict mapping from room_id strings to lists of tag
+            strings for all the rooms that changed since the stream_id token.
+        """
+        def get_updated_tags_txn(txn):
+            sql = (
+                "SELECT room_id from room_tags_revisions"
+                " WHERE user_id = ? AND stream_id > ?"
+            )
+            txn.execute(sql, (user_id, stream_id))
+            room_ids = [row[0] for row in txn.fetchall()]
+            return room_ids
+
+        room_ids = yield self.runInteraction(
+            "get_updated_tags", get_updated_tags_txn
+        )
+
+        results = {}
+        if room_ids:
+            tags_by_room = yield self.get_tags_for_user(user_id)
+            for room_id in room_ids:
+                results[room_id] = tags_by_room[room_id]
+
+        defer.returnValue(results)
+
+    def get_tags_for_room(self, user_id, room_id):
+        """Get all the tags for the given room
+        Args:
+            user_id(str): The user to get tags for
+            room_id(str): The room to get tags for
+        Returns:
+            A deferred list of string tags.
+        """
+        return self._simple_select_list(
+            table="room_tags",
+            keyvalues={"user_id": user_id, "room_id": room_id},
+            retcols=("tag", "content"),
+            desc="get_tags_for_room",
+        ).addCallback(lambda rows: {
+            row["tag"]: json.loads(row["content"]) for row in rows
+        })
+
+    @defer.inlineCallbacks
+    def add_tag_to_room(self, user_id, room_id, tag, content):
+        """Add a tag to a room for a user.
+        Args:
+            user_id(str): The user to add a tag for.
+            room_id(str): The room to add a tag for.
+            tag(str): The tag name to add.
+            content(dict): A json object to associate with the tag.
+        Returns:
+            A deferred that completes once the tag has been added.
+        """
+        content_json = json.dumps(content)
+
+        def add_tag_txn(txn, next_id):
+            self._simple_upsert_txn(
+                txn,
+                table="room_tags",
+                keyvalues={
+                    "user_id": user_id,
+                    "room_id": room_id,
+                    "tag": tag,
+                },
+                values={
+                    "content": content_json,
+                }
+            )
+            self._update_revision_txn(txn, user_id, room_id, next_id)
+
+        with (yield self._private_user_data_id_gen.get_next(self)) as next_id:
+            yield self.runInteraction("add_tag", add_tag_txn, next_id)
+
+        self.get_tags_for_user.invalidate((user_id,))
+
+        result = yield self._private_user_data_id_gen.get_max_token(self)
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def remove_tag_from_room(self, user_id, room_id, tag):
+        """Remove a tag from a room for a user.
+        Returns:
+            A deferred that completes once the tag has been removed
+        """
+        def remove_tag_txn(txn, next_id):
+            sql = (
+                "DELETE FROM room_tags "
+                " WHERE user_id = ? AND room_id = ? AND tag = ?"
+            )
+            txn.execute(sql, (user_id, room_id, tag))
+            self._update_revision_txn(txn, user_id, room_id, next_id)
+
+        with (yield self._private_user_data_id_gen.get_next(self)) as next_id:
+            yield self.runInteraction("remove_tag", remove_tag_txn, next_id)
+
+        self.get_tags_for_user.invalidate((user_id,))
+
+        result = yield self._private_user_data_id_gen.get_max_token(self)
+        defer.returnValue(result)
+
+    def _update_revision_txn(self, txn, user_id, room_id, next_id):
+        """Update the latest revision of the tags for the given user and room.
+
+        Args:
+            txn: The database cursor
+            user_id(str): The ID of the user.
+            room_id(str): The ID of the room.
+            next_id(int): The the revision to advance to.
+        """
+
+        update_max_id_sql = (
+            "UPDATE private_user_data_max_stream_id"
+            " SET stream_id = ?"
+            " WHERE stream_id < ?"
+        )
+        txn.execute(update_max_id_sql, (next_id, next_id))
+
+        update_sql = (
+            "UPDATE room_tags_revisions"
+            " SET stream_id = ?"
+            " WHERE user_id = ?"
+            " AND room_id = ?"
+        )
+        txn.execute(update_sql, (next_id, user_id, room_id))
+
+        if txn.rowcount == 0:
+            insert_sql = (
+                "INSERT INTO room_tags_revisions (user_id, room_id, stream_id)"
+                " VALUES (?, ?, ?)"
+            )
+            try:
+                txn.execute(insert_sql, (user_id, room_id, next_id))
+            except self.database_engine.module.IntegrityError:
+                # Ignore insertion errors. It doesn't matter if the row wasn't
+                # inserted because if two updates happend concurrently the one
+                # with the higher stream_id will not be reported to a client
+                # unless the previous update has completed. It doesn't matter
+                # which stream_id ends up in the table, as long as it is higher
+                # than the id that the client has.
+                pass
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 15695e9831..4e0d7c9774 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -253,16 +253,6 @@ class TransactionStore(SQLBaseStore):
             retry_interval (int) - how long until next retry in ms
         """
 
-        # As this is the new value, we might as well prefill the cache
-        self.get_destination_retry_timings.prefill(
-            destination,
-            {
-                "destination": destination,
-                "retry_last_ts": retry_last_ts,
-                "retry_interval": retry_interval
-            },
-        )
-
         # XXX: we could chose to not bother persisting this if our cache thinks
         # this is a NOOP
         return self.runInteraction(
@@ -275,31 +265,25 @@ class TransactionStore(SQLBaseStore):
 
     def _set_destination_retry_timings(self, txn, destination,
                                        retry_last_ts, retry_interval):
-        query = (
-            "UPDATE destinations"
-            " SET retry_last_ts = ?, retry_interval = ?"
-            " WHERE destination = ?"
-        )
+        txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
 
-        txn.execute(
-            query,
-            (
-                retry_last_ts, retry_interval, destination,
-            )
+        self._simple_upsert_txn(
+            txn,
+            "destinations",
+            keyvalues={
+                "destination": destination,
+            },
+            values={
+                "retry_last_ts": retry_last_ts,
+                "retry_interval": retry_interval,
+            },
+            insertion_values={
+                "destination": destination,
+                "retry_last_ts": retry_last_ts,
+                "retry_interval": retry_interval,
+            }
         )
 
-        if txn.rowcount == 0:
-            # destination wasn't already in table. Insert it.
-            self._simple_insert_txn(
-                txn,
-                table="destinations",
-                values={
-                    "destination": destination,
-                    "retry_last_ts": retry_last_ts,
-                    "retry_interval": retry_interval,
-                }
-            )
-
     def get_destinations_needing_retry(self):
         """Get all destinations which are due a retry for sending a transaction.
 
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 699083ae12..f0d68b5bf2 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -21,6 +21,7 @@ from synapse.handlers.presence import PresenceEventSource
 from synapse.handlers.room import RoomEventSource
 from synapse.handlers.typing import TypingNotificationEventSource
 from synapse.handlers.receipts import ReceiptEventSource
+from synapse.handlers.private_user_data import PrivateUserDataEventSource
 
 
 class EventSources(object):
@@ -29,6 +30,7 @@ class EventSources(object):
         "presence": PresenceEventSource,
         "typing": TypingNotificationEventSource,
         "receipt": ReceiptEventSource,
+        "private_user_data": PrivateUserDataEventSource,
     }
 
     def __init__(self, hs):
@@ -52,5 +54,8 @@ class EventSources(object):
             receipt_key=(
                 yield self.sources["receipt"].get_current_key()
             ),
+            private_user_data_key=(
+                yield self.sources["private_user_data"].get_current_key()
+            ),
         )
         defer.returnValue(token)
diff --git a/synapse/types.py b/synapse/types.py
index 9cffc33d27..28344d8b36 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -47,7 +47,7 @@ class DomainSpecificString(
     @classmethod
     def from_string(cls, s):
         """Parse the string given by 's' into a structure object."""
-        if s[0] != cls.SIGIL:
+        if len(s) < 1 or s[0] != cls.SIGIL:
             raise SynapseError(400, "Expected %s string to start with '%s'" % (
                 cls.__name__, cls.SIGIL,
             ))
@@ -98,10 +98,13 @@ class EventID(DomainSpecificString):
 
 
 class StreamToken(
-    namedtuple(
-        "Token",
-        ("room_key", "presence_key", "typing_key", "receipt_key")
-    )
+    namedtuple("Token", (
+        "room_key",
+        "presence_key",
+        "typing_key",
+        "receipt_key",
+        "private_user_data_key",
+    ))
 ):
     _SEPARATOR = "_"
 
@@ -109,7 +112,7 @@ class StreamToken(
     def from_string(cls, string):
         try:
             keys = string.split(cls._SEPARATOR)
-            if len(keys) == len(cls._fields) - 1:
+            while len(keys) < len(cls._fields):
                 # i.e. old token from before receipt_key
                 keys.append("0")
             return cls(*keys)
@@ -128,13 +131,14 @@ class StreamToken(
         else:
             return int(self.room_key[1:].split("-")[-1])
 
-    def is_after(self, other_token):
+    def is_after(self, other):
         """Does this token contain events that the other doesn't?"""
         return (
-            (other_token.room_stream_id < self.room_stream_id)
-            or (int(other_token.presence_key) < int(self.presence_key))
-            or (int(other_token.typing_key) < int(self.typing_key))
-            or (int(other_token.receipt_key) < int(self.receipt_key))
+            (other.room_stream_id < self.room_stream_id)
+            or (int(other.presence_key) < int(self.presence_key))
+            or (int(other.typing_key) < int(self.typing_key))
+            or (int(other.receipt_key) < int(self.receipt_key))
+            or (int(other.private_user_data_key) < int(self.private_user_data_key))
         )
 
     def copy_and_advance(self, key, new_value):
diff --git a/synapse/util/emailutils.py b/synapse/util/emailutils.py
deleted file mode 100644
index 7f9a77bf44..0000000000
--- a/synapse/util/emailutils.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-""" This module allows you to send out emails.
-"""
-import email.utils
-import smtplib
-import twisted.python.log
-from email.mime.text import MIMEText
-from email.mime.multipart import MIMEMultipart
-
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class EmailException(Exception):
-    pass
-
-
-def send_email(smtp_server, from_addr, to_addr, subject, body):
-    """Sends an email.
-
-    Args:
-        smtp_server(str): The SMTP server to use.
-        from_addr(str): The address to send from.
-        to_addr(str): The address to send to.
-        subject(str): The subject of the email.
-        body(str): The plain text body of the email.
-    Raises:
-        EmailException if there was a problem sending the mail.
-    """
-    if not smtp_server or not from_addr or not to_addr:
-        raise EmailException("Need SMTP server, from and to addresses. Check"
-                             " the config to set these.")
-
-    msg = MIMEMultipart('alternative')
-    msg['Subject'] = subject
-    msg['From'] = from_addr
-    msg['To'] = to_addr
-    plain_part = MIMEText(body)
-    msg.attach(plain_part)
-
-    raw_from = email.utils.parseaddr(from_addr)[1]
-    raw_to = email.utils.parseaddr(to_addr)[1]
-    if not raw_from or not raw_to:
-        raise EmailException("Couldn't parse from/to address.")
-
-    logger.info("Sending email to %s on server %s with subject %s",
-                to_addr, smtp_server, subject)
-
-    try:
-        smtp = smtplib.SMTP(smtp_server)
-        smtp.sendmail(raw_from, raw_to, msg.as_string())
-        smtp.quit()
-    except Exception as origException:
-        twisted.python.log.err()
-        ese = EmailException()
-        ese.cause = origException
-        raise ese
diff --git a/synapse/util/third_party_invites.py b/synapse/util/third_party_invites.py
new file mode 100644
index 0000000000..31d186740d
--- /dev/null
+++ b/synapse/util/third_party_invites.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+from synapse.api.errors import AuthError
+
+
+INVITE_KEYS = {"id_server", "medium", "address", "display_name"}
+
+JOIN_KEYS = {
+    "token",
+    "public_key",
+    "key_validity_url",
+    "sender",
+    "signed",
+}
+
+
+def has_invite_keys(content):
+    for key in INVITE_KEYS:
+        if key not in content:
+            return False
+    return True
+
+
+def has_join_keys(content):
+    for key in JOIN_KEYS:
+        if key not in content:
+            return False
+    return True
+
+
+def join_has_third_party_invite(content):
+    if "third_party_invite" not in content:
+        return False
+    return has_join_keys(content["third_party_invite"])
+
+
+def extract_join_keys(src):
+    return {
+        key: value
+        for key, value in src.items()
+        if key in JOIN_KEYS
+    }
+
+
+@defer.inlineCallbacks
+def check_key_valid(http_client, event):
+    try:
+        response = yield http_client.get_json(
+            event.content["third_party_invite"]["key_validity_url"],
+            {"public_key": event.content["third_party_invite"]["public_key"]}
+        )
+    except Exception:
+        raise AuthError(502, "Third party certificate could not be checked")
+    if "valid" not in response or not response["valid"]:
+        raise AuthError(403, "Third party certificate was invalid")