summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-08-12 17:07:22 +0100
committerMark Haines <mark.haines@matrix.org>2015-08-12 17:21:14 +0100
commit998a72d4d9ec6e73000888dcdf51437ec427fbee (patch)
treef8fa9d5deb820b49eb3a216e194ee83e14fb9eda /synapse/handlers
parentBump the version of twisted needed for setup_requires to 15.2.1 (diff)
parentMerge pull request #220 from matrix-org/markjh/generate_keys (diff)
downloadsynapse-998a72d4d9ec6e73000888dcdf51437ec427fbee.tar.xz
Merge branch 'develop' into markjh/twisted-15
Conflicts:
	synapse/http/matrixfederationclient.py
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/__init__.py2
-rw-r--r--synapse/handlers/_base.py4
-rw-r--r--synapse/handlers/appservice.py2
-rw-r--r--synapse/handlers/auth.py6
-rw-r--r--synapse/handlers/federation.py292
-rw-r--r--synapse/handlers/identity.py2
-rw-r--r--synapse/handlers/message.py132
-rw-r--r--synapse/handlers/presence.py30
-rw-r--r--synapse/handlers/receipts.py212
-rw-r--r--synapse/handlers/register.py32
-rw-r--r--synapse/handlers/room.py97
-rw-r--r--synapse/handlers/sync.py48
-rw-r--r--synapse/handlers/typing.py2
13 files changed, 708 insertions, 153 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 685792dbdc..dc5b6ef79d 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -32,6 +32,7 @@ from .appservice import ApplicationServicesHandler
 from .sync import SyncHandler
 from .auth import AuthHandler
 from .identity import IdentityHandler
+from .receipts import ReceiptsHandler
 
 
 class Handlers(object):
@@ -57,6 +58,7 @@ class Handlers(object):
         self.directory_handler = DirectoryHandler(hs)
         self.typing_notification_handler = TypingNotificationHandler(hs)
         self.admin_handler = AdminHandler(hs)
+        self.receipts_handler = ReceiptsHandler(hs)
         asapi = ApplicationServiceApi(hs)
         self.appservice_handler = ApplicationServicesHandler(
             hs, asapi, AppServiceScheduler(
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 833ff41377..d6c064b398 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -78,7 +78,9 @@ class BaseHandler(object):
         context = yield state_handler.compute_event_context(builder)
 
         if builder.is_state():
-            builder.prev_state = context.prev_state_events
+            builder.prev_state = yield self.store.add_event_hashes(
+                context.prev_state_events
+            )
 
         yield self.auth.add_auth_events(builder, context)
 
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 8269482e47..1240e51649 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -177,7 +177,7 @@ class ApplicationServicesHandler(object):
             return
 
         user_info = yield self.store.get_user_by_id(user_id)
-        if not user_info:
+        if user_info:
             defer.returnValue(False)
             return
 
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 63071653a3..1ecf7fef17 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -85,8 +85,10 @@ class AuthHandler(BaseHandler):
             # email auth link on there). It's probably too open to abuse
             # because it lets unauthenticated clients store arbitrary objects
             # on a home server.
-            # sess['clientdict'] = clientdict
-            # self._save_session(sess)
+            # Revisit: Assumimg the REST APIs do sensible validation, the data
+            # isn't arbintrary.
+            sess['clientdict'] = clientdict
+            self._save_session(sess)
             pass
         elif 'clientdict' in sess:
             clientdict = sess['clientdict']
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 46ce3699d7..f7155fd8d3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -31,6 +31,8 @@ from synapse.crypto.event_signing import (
 )
 from synapse.types import UserID
 
+from synapse.events.utils import prune_event
+
 from synapse.util.retryutils import NotRetryingDestination
 
 from twisted.internet import defer
@@ -138,26 +140,29 @@ class FederationHandler(BaseHandler):
         if state and auth_chain is not None:
             # If we have any state or auth_chain given to us by the replication
             # layer, then we should handle them (if we haven't before.)
+
+            event_infos = []
+
             for e in itertools.chain(auth_chain, state):
                 if e.event_id in seen_ids:
                     continue
-
                 e.internal_metadata.outlier = True
-                try:
-                    auth_ids = [e_id for e_id, _ in e.auth_events]
-                    auth = {
-                        (e.type, e.state_key): e for e in auth_chain
-                        if e.event_id in auth_ids
-                    }
-                    yield self._handle_new_event(
-                        origin, e, auth_events=auth
-                    )
-                    seen_ids.add(e.event_id)
-                except:
-                    logger.exception(
-                        "Failed to handle state event %s",
-                        e.event_id,
-                    )
+                auth_ids = [e_id for e_id, _ in e.auth_events]
+                auth = {
+                    (e.type, e.state_key): e for e in auth_chain
+                    if e.event_id in auth_ids
+                }
+                event_infos.append({
+                    "event": e,
+                    "auth_events": auth,
+                })
+                seen_ids.add(e.event_id)
+
+            yield self._handle_new_events(
+                origin,
+                event_infos,
+                outliers=True
+            )
 
         try:
             _, event_stream_id, max_stream_id = yield self._handle_new_event(
@@ -222,6 +227,56 @@ class FederationHandler(BaseHandler):
                     "user_joined_room", user=user, room_id=event.room_id
                 )
 
+    @defer.inlineCallbacks
+    def _filter_events_for_server(self, server_name, room_id, events):
+        states = yield self.store.get_state_for_events(
+            room_id, [e.event_id for e in events],
+        )
+
+        events_and_states = zip(events, states)
+
+        def redact_disallowed(event_and_state):
+            event, state = event_and_state
+
+            if not state:
+                return event
+
+            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
+            if history:
+                visibility = history.content.get("history_visibility", "shared")
+                if visibility in ["invited", "joined"]:
+                    # We now loop through all state events looking for
+                    # membership states for the requesting server to determine
+                    # if the server is either in the room or has been invited
+                    # into the room.
+                    for ev in state.values():
+                        if ev.type != EventTypes.Member:
+                            continue
+                        try:
+                            domain = UserID.from_string(ev.state_key).domain
+                        except:
+                            continue
+
+                        if domain != server_name:
+                            continue
+
+                        memtype = ev.membership
+                        if memtype == Membership.JOIN:
+                            return event
+                        elif memtype == Membership.INVITE:
+                            if visibility == "invited":
+                                return event
+                    else:
+                        return prune_event(event)
+
+            return event
+
+        res = map(redact_disallowed, events_and_states)
+
+        logger.info("_filter_events_for_server %r", res)
+
+        defer.returnValue(res)
+
     @log_function
     @defer.inlineCallbacks
     def backfill(self, dest, room_id, limit, extremities=[]):
@@ -247,9 +302,15 @@ class FederationHandler(BaseHandler):
             if set(e_id for e_id, _ in ev.prev_events) - event_ids
         ]
 
+        logger.info(
+            "backfill: Got %d events with %d edges",
+            len(events), len(edges),
+        )
+
         # For each edge get the current state.
 
         auth_events = {}
+        state_events = {}
         events_to_state = {}
         for e_id in edges:
             state, auth = yield self.replication_layer.get_state_for_room(
@@ -258,27 +319,57 @@ class FederationHandler(BaseHandler):
                 event_id=e_id
             )
             auth_events.update({a.event_id: a for a in auth})
+            auth_events.update({s.event_id: s for s in state})
+            state_events.update({s.event_id: s for s in state})
             events_to_state[e_id] = state
 
-        yield defer.gatherResults(
-            [
-                self._handle_new_event(dest, a)
-                for a in auth_events.values()
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        seen_events = yield self.store.have_events(
+            set(auth_events.keys()) | set(state_events.keys())
+        )
+
+        all_events = events + state_events.values() + auth_events.values()
+        required_auth = set(
+            a_id for event in all_events for a_id, _ in event.auth_events
+        )
 
-        yield defer.gatherResults(
+        missing_auth = required_auth - set(auth_events)
+        results = yield defer.gatherResults(
             [
-                self._handle_new_event(
-                    dest, event_map[e_id],
-                    state=events_to_state[e_id],
-                    backfilled=True,
+                self.replication_layer.get_pdu(
+                    [dest],
+                    event_id,
+                    outlier=True,
+                    timeout=10000,
                 )
-                for e_id in events_to_state
+                for event_id in missing_auth
             ],
             consumeErrors=True
         ).addErrback(unwrapFirstError)
+        auth_events.update({a.event_id: a for a in results})
+
+        ev_infos = []
+        for a in auth_events.values():
+            if a.event_id in seen_events:
+                continue
+            ev_infos.append({
+                "event": a,
+                "auth_events": {
+                    (auth_events[a_id].type, auth_events[a_id].state_key):
+                    auth_events[a_id]
+                    for a_id, _ in a.auth_events
+                }
+            })
+
+        for e_id in events_to_state:
+            ev_infos.append({
+                "event": event_map[e_id],
+                "state": events_to_state[e_id],
+                "auth_events": {
+                    (auth_events[a_id].type, auth_events[a_id].state_key):
+                    auth_events[a_id]
+                    for a_id, _ in event_map[e_id].auth_events
+                }
+            })
 
         events.sort(key=lambda e: e.depth)
 
@@ -286,10 +377,14 @@ class FederationHandler(BaseHandler):
             if event in events_to_state:
                 continue
 
-            yield self._handle_new_event(
-                dest, event,
-                backfilled=True,
-            )
+            ev_infos.append({
+                "event": event,
+            })
+
+        yield self._handle_new_events(
+            dest, ev_infos,
+            backfilled=True,
+        )
 
         defer.returnValue(events)
 
@@ -555,32 +650,22 @@ class FederationHandler(BaseHandler):
                 # FIXME
                 pass
 
-            yield self._handle_auth_events(
-                origin, [e for e in auth_chain if e.event_id != event.event_id]
-            )
-
-            @defer.inlineCallbacks
-            def handle_state(e):
+            ev_infos = []
+            for e in itertools.chain(state, auth_chain):
                 if e.event_id == event.event_id:
-                    return
+                    continue
 
                 e.internal_metadata.outlier = True
-                try:
-                    auth_ids = [e_id for e_id, _ in e.auth_events]
-                    auth = {
+                auth_ids = [e_id for e_id, _ in e.auth_events]
+                ev_infos.append({
+                    "event": e,
+                    "auth_events": {
                         (e.type, e.state_key): e for e in auth_chain
                         if e.event_id in auth_ids
                     }
-                    yield self._handle_new_event(
-                        origin, e, auth_events=auth
-                    )
-                except:
-                    logger.exception(
-                        "Failed to handle state event %s",
-                        e.event_id,
-                    )
+                })
 
-            yield defer.DeferredList([handle_state(e) for e in state])
+            yield self._handle_new_events(origin, ev_infos, outliers=True)
 
             auth_ids = [e_id for e_id, _ in event.auth_events]
             auth_events = {
@@ -837,6 +922,8 @@ class FederationHandler(BaseHandler):
             limit
         )
 
+        events = yield self._filter_events_for_server(origin, room_id, events)
+
         defer.returnValue(events)
 
     @defer.inlineCallbacks
@@ -895,24 +982,62 @@ class FederationHandler(BaseHandler):
     def _handle_new_event(self, origin, event, state=None, backfilled=False,
                           current_state=None, auth_events=None):
 
-        logger.debug(
-            "_handle_new_event: %s, sigs: %s",
-            event.event_id, event.signatures,
+        outlier = event.internal_metadata.is_outlier()
+
+        context = yield self._prep_event(
+            origin, event,
+            state=state,
+            backfilled=backfilled,
+            current_state=current_state,
+            auth_events=auth_events,
         )
 
-        context = yield self.state_handler.compute_event_context(
-            event, old_state=state
+        event_stream_id, max_stream_id = yield self.store.persist_event(
+            event,
+            context=context,
+            backfilled=backfilled,
+            is_new_state=(not outlier and not backfilled),
+            current_state=current_state,
         )
 
-        if not auth_events:
-            auth_events = context.current_state
+        defer.returnValue((context, event_stream_id, max_stream_id))
 
-        logger.debug(
-            "_handle_new_event: %s, auth_events: %s",
-            event.event_id, auth_events,
+    @defer.inlineCallbacks
+    def _handle_new_events(self, origin, event_infos, backfilled=False,
+                           outliers=False):
+        contexts = yield defer.gatherResults(
+            [
+                self._prep_event(
+                    origin,
+                    ev_info["event"],
+                    state=ev_info.get("state"),
+                    backfilled=backfilled,
+                    auth_events=ev_info.get("auth_events"),
+                )
+                for ev_info in event_infos
+            ]
         )
 
-        is_new_state = not event.internal_metadata.is_outlier()
+        yield self.store.persist_events(
+            [
+                (ev_info["event"], context)
+                for ev_info, context in itertools.izip(event_infos, contexts)
+            ],
+            backfilled=backfilled,
+            is_new_state=(not outliers and not backfilled),
+        )
+
+    @defer.inlineCallbacks
+    def _prep_event(self, origin, event, state=None, backfilled=False,
+                    current_state=None, auth_events=None):
+        outlier = event.internal_metadata.is_outlier()
+
+        context = yield self.state_handler.compute_event_context(
+            event, old_state=state, outlier=outlier,
+        )
+
+        if not auth_events:
+            auth_events = context.current_state
 
         # This is a hack to fix some old rooms where the initial join event
         # didn't reference the create event in its auth events.
@@ -937,26 +1062,7 @@ class FederationHandler(BaseHandler):
 
             context.rejected = RejectedReason.AUTH_ERROR
 
-            # FIXME: Don't store as rejected with AUTH_ERROR if we haven't
-            # seen all the auth events.
-            yield self.store.persist_event(
-                event,
-                context=context,
-                backfilled=backfilled,
-                is_new_state=False,
-                current_state=current_state,
-            )
-            raise
-
-        event_stream_id, max_stream_id = yield self.store.persist_event(
-            event,
-            context=context,
-            backfilled=backfilled,
-            is_new_state=(is_new_state and not backfilled),
-            current_state=current_state,
-        )
-
-        defer.returnValue((context, event_stream_id, max_stream_id))
+        defer.returnValue(context)
 
     @defer.inlineCallbacks
     def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
@@ -1019,14 +1125,24 @@ class FederationHandler(BaseHandler):
     @log_function
     def do_auth(self, origin, event, context, auth_events):
         # Check if we have all the auth events.
-        have_events = yield self.store.have_events(
-            [e_id for e_id, _ in event.auth_events]
-        )
-
+        current_state = set(e.event_id for e in auth_events.values())
         event_auth_events = set(e_id for e_id, _ in event.auth_events)
+
+        if event_auth_events - current_state:
+            have_events = yield self.store.have_events(
+                event_auth_events - current_state
+            )
+        else:
+            have_events = {}
+
+        have_events.update({
+            e.event_id: ""
+            for e in auth_events.values()
+        })
+
         seen_events = set(have_events.keys())
 
-        missing_auth = event_auth_events - seen_events
+        missing_auth = event_auth_events - seen_events - current_state
 
         if missing_auth:
             logger.info("Missing auth: %s", missing_auth)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 6200e10775..c1095708a0 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -44,7 +44,7 @@ class IdentityHandler(BaseHandler):
         http_client = SimpleHttpClient(self.hs)
         # XXX: make this configurable!
         # trustedIdServers = ['matrix.org', 'localhost:8090']
-        trustedIdServers = ['matrix.org']
+        trustedIdServers = ['matrix.org', 'vector.im']
 
         if 'id_server' in creds:
             id_server = creds['id_server']
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 867fdbefb0..9d6d4f0978 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -113,11 +113,21 @@ class MessageHandler(BaseHandler):
             "room_key", next_key
         )
 
+        if not events:
+            defer.returnValue({
+                "chunk": [],
+                "start": pagin_config.from_token.to_string(),
+                "end": next_token.to_string(),
+            })
+
+        events = yield self._filter_events_for_client(user_id, room_id, events)
+
         time_now = self.clock.time_msec()
 
         chunk = {
             "chunk": [
-                serialize_event(e, time_now, as_client_event) for e in events
+                serialize_event(e, time_now, as_client_event)
+                for e in events
             ],
             "start": pagin_config.from_token.to_string(),
             "end": next_token.to_string(),
@@ -126,6 +136,52 @@ class MessageHandler(BaseHandler):
         defer.returnValue(chunk)
 
     @defer.inlineCallbacks
+    def _filter_events_for_client(self, user_id, room_id, events):
+        states = yield self.store.get_state_for_events(
+            room_id, [e.event_id for e in events],
+        )
+
+        events_and_states = zip(events, states)
+
+        def allowed(event_and_state):
+            event, state = event_and_state
+
+            if event.type == EventTypes.RoomHistoryVisibility:
+                return True
+
+            membership_ev = state.get((EventTypes.Member, user_id), None)
+            if membership_ev:
+                membership = membership_ev.membership
+            else:
+                membership = Membership.LEAVE
+
+            if membership == Membership.JOIN:
+                return True
+
+            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
+            if history:
+                visibility = history.content.get("history_visibility", "shared")
+            else:
+                visibility = "shared"
+
+            if visibility == "public":
+                return True
+            elif visibility == "shared":
+                return True
+            elif visibility == "joined":
+                return membership == Membership.JOIN
+            elif visibility == "invited":
+                return membership == Membership.INVITE
+
+            return True
+
+        events_and_states = filter(allowed, events_and_states)
+        defer.returnValue([
+            ev
+            for ev, _ in events_and_states
+        ])
+
+    @defer.inlineCallbacks
     def create_and_send_event(self, event_dict, ratelimit=True,
                               client=None, txn_id=None):
         """ Given a dict from a client, create and handle a new event.
@@ -278,6 +334,11 @@ class MessageHandler(BaseHandler):
             user, pagination_config.get_source_config("presence"), None
         )
 
+        receipt_stream = self.hs.get_event_sources().sources["receipt"]
+        receipt, _ = yield receipt_stream.get_pagination_rows(
+            user, pagination_config.get_source_config("receipt"), None
+        )
+
         public_room_ids = yield self.store.get_public_room_ids()
 
         limit = pagin_config.limit
@@ -316,6 +377,10 @@ class MessageHandler(BaseHandler):
                     ]
                 ).addErrback(unwrapFirstError)
 
+                messages = yield self._filter_events_for_client(
+                    user_id, event.room_id, messages
+                )
+
                 start_token = now_token.copy_and_replace("room_key", token[0])
                 end_token = now_token.copy_and_replace("room_key", token[1])
                 time_now = self.clock.time_msec()
@@ -344,7 +409,8 @@ class MessageHandler(BaseHandler):
         ret = {
             "rooms": rooms_ret,
             "presence": presence,
-            "end": now_token.to_string()
+            "receipts": receipt,
+            "end": now_token.to_string(),
         }
 
         defer.returnValue(ret)
@@ -380,15 +446,6 @@ class MessageHandler(BaseHandler):
         if limit is None:
             limit = 10
 
-        messages, token = yield self.store.get_recent_events_for_room(
-            room_id,
-            limit=limit,
-            end_token=now_token.room_key,
-        )
-
-        start_token = now_token.copy_and_replace("room_key", token[0])
-        end_token = now_token.copy_and_replace("room_key", token[1])
-
         room_members = [
             m for m in current_state.values()
             if m.type == EventTypes.Member
@@ -396,19 +453,45 @@ class MessageHandler(BaseHandler):
         ]
 
         presence_handler = self.hs.get_handlers().presence_handler
-        presence = []
-        for m in room_members:
-            try:
-                member_presence = yield presence_handler.get_state(
-                    target_user=UserID.from_string(m.user_id),
-                    auth_user=auth_user,
-                    as_event=True,
-                )
-                presence.append(member_presence)
-            except SynapseError:
-                logger.exception(
-                    "Failed to get member presence of %r", m.user_id
+
+        @defer.inlineCallbacks
+        def get_presence():
+            presence_defs = yield defer.DeferredList(
+                [
+                    presence_handler.get_state(
+                        target_user=UserID.from_string(m.user_id),
+                        auth_user=auth_user,
+                        as_event=True,
+                        check_auth=False,
+                    )
+                    for m in room_members
+                ],
+                consumeErrors=True,
+            )
+
+            defer.returnValue([p for success, p in presence_defs if success])
+
+        receipts_handler = self.hs.get_handlers().receipts_handler
+
+        presence, receipts, (messages, token) = yield defer.gatherResults(
+            [
+                get_presence(),
+                receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key),
+                self.store.get_recent_events_for_room(
+                    room_id,
+                    limit=limit,
+                    end_token=now_token.room_key,
                 )
+            ],
+            consumeErrors=True,
+        ).addErrback(unwrapFirstError)
+
+        messages = yield self._filter_events_for_client(
+            user_id, room_id, messages
+        )
+
+        start_token = now_token.copy_and_replace("room_key", token[0])
+        end_token = now_token.copy_and_replace("room_key", token[1])
 
         time_now = self.clock.time_msec()
 
@@ -421,5 +504,6 @@ class MessageHandler(BaseHandler):
                 "end": end_token.to_string(),
             },
             "state": state,
-            "presence": presence
+            "presence": presence,
+            "receipts": receipts,
         })
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 023ad33ab0..341a516da2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -191,24 +191,24 @@ class PresenceHandler(BaseHandler):
         defer.returnValue(False)
 
     @defer.inlineCallbacks
-    def get_state(self, target_user, auth_user, as_event=False):
+    def get_state(self, target_user, auth_user, as_event=False, check_auth=True):
         if self.hs.is_mine(target_user):
-            visible = yield self.is_presence_visible(
-                observer_user=auth_user,
-                observed_user=target_user
-            )
+            if check_auth:
+                visible = yield self.is_presence_visible(
+                    observer_user=auth_user,
+                    observed_user=target_user
+                )
 
-            if not visible:
-                raise SynapseError(404, "Presence information not visible")
-            state = yield self.store.get_presence_state(target_user.localpart)
-            if "mtime" in state:
-                del state["mtime"]
-            state["presence"] = state.pop("state")
+                if not visible:
+                    raise SynapseError(404, "Presence information not visible")
 
             if target_user in self._user_cachemap:
-                cached_state = self._user_cachemap[target_user].get_state()
-                if "last_active" in cached_state:
-                    state["last_active"] = cached_state["last_active"]
+                state = self._user_cachemap[target_user].get_state()
+            else:
+                state = yield self.store.get_presence_state(target_user.localpart)
+                if "mtime" in state:
+                    del state["mtime"]
+                state["presence"] = state.pop("state")
         else:
             # TODO(paul): Have remote server send us permissions set
             state = self._get_or_offline_usercache(target_user).get_state()
@@ -992,7 +992,7 @@ class PresenceHandler(BaseHandler):
             room_ids([str]): List of room_ids to notify.
         """
         with PreserveLoggingContext():
-            self.notifier.on_new_user_event(
+            self.notifier.on_new_event(
                 "presence_key",
                 self._user_cachemap_latest_serial,
                 users_to_push,
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
new file mode 100644
index 0000000000..415dd339f6
--- /dev/null
+++ b/synapse/handlers/receipts.py
@@ -0,0 +1,212 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseHandler
+
+from twisted.internet import defer
+
+from synapse.util.logcontext import PreserveLoggingContext
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class ReceiptsHandler(BaseHandler):
+    def __init__(self, hs):
+        super(ReceiptsHandler, self).__init__(hs)
+
+        self.hs = hs
+        self.federation = hs.get_replication_layer()
+        self.federation.register_edu_handler(
+            "m.receipt", self._received_remote_receipt
+        )
+        self.clock = self.hs.get_clock()
+
+        self._receipt_cache = None
+
+    @defer.inlineCallbacks
+    def received_client_receipt(self, room_id, receipt_type, user_id,
+                                event_id):
+        """Called when a client tells us a local user has read up to the given
+        event_id in the room.
+        """
+        receipt = {
+            "room_id": room_id,
+            "receipt_type": receipt_type,
+            "user_id": user_id,
+            "event_ids": [event_id],
+            "data": {
+                "ts": int(self.clock.time_msec()),
+            }
+        }
+
+        is_new = yield self._handle_new_receipts([receipt])
+
+        if is_new:
+            self._push_remotes([receipt])
+
+    @defer.inlineCallbacks
+    def _received_remote_receipt(self, origin, content):
+        """Called when we receive an EDU of type m.receipt from a remote HS.
+        """
+        receipts = [
+            {
+                "room_id": room_id,
+                "receipt_type": receipt_type,
+                "user_id": user_id,
+                "event_ids": user_values["event_ids"],
+                "data": user_values.get("data", {}),
+            }
+            for room_id, room_values in content.items()
+            for receipt_type, users in room_values.items()
+            for user_id, user_values in users.items()
+        ]
+
+        yield self._handle_new_receipts(receipts)
+
+    @defer.inlineCallbacks
+    def _handle_new_receipts(self, receipts):
+        """Takes a list of receipts, stores them and informs the notifier.
+        """
+        for receipt in receipts:
+            room_id = receipt["room_id"]
+            receipt_type = receipt["receipt_type"]
+            user_id = receipt["user_id"]
+            event_ids = receipt["event_ids"]
+            data = receipt["data"]
+
+            res = yield self.store.insert_receipt(
+                room_id, receipt_type, user_id, event_ids, data
+            )
+
+            if not res:
+                # res will be None if this read receipt is 'old'
+                defer.returnValue(False)
+
+            stream_id, max_persisted_id = res
+
+            with PreserveLoggingContext():
+                self.notifier.on_new_event(
+                    "receipt_key", max_persisted_id, rooms=[room_id]
+                )
+
+            defer.returnValue(True)
+
+    @defer.inlineCallbacks
+    def _push_remotes(self, receipts):
+        """Given a list of receipts, works out which remote servers should be
+        poked and pokes them.
+        """
+        # TODO: Some of this stuff should be coallesced.
+        for receipt in receipts:
+            room_id = receipt["room_id"]
+            receipt_type = receipt["receipt_type"]
+            user_id = receipt["user_id"]
+            event_ids = receipt["event_ids"]
+            data = receipt["data"]
+
+            remotedomains = set()
+
+            rm_handler = self.hs.get_handlers().room_member_handler
+            yield rm_handler.fetch_room_distributions_into(
+                room_id, localusers=None, remotedomains=remotedomains
+            )
+
+            logger.debug("Sending receipt to: %r", remotedomains)
+
+            for domain in remotedomains:
+                self.federation.send_edu(
+                    destination=domain,
+                    edu_type="m.receipt",
+                    content={
+                        room_id: {
+                            receipt_type: {
+                                user_id: {
+                                    "event_ids": event_ids,
+                                    "data": data,
+                                }
+                            }
+                        },
+                    },
+                )
+
+    @defer.inlineCallbacks
+    def get_receipts_for_room(self, room_id, to_key):
+        """Gets all receipts for a room, upto the given key.
+        """
+        result = yield self.store.get_linearized_receipts_for_room(
+            room_id,
+            to_key=to_key,
+        )
+
+        if not result:
+            defer.returnValue([])
+
+        event = {
+            "type": "m.receipt",
+            "room_id": room_id,
+            "content": result,
+        }
+
+        defer.returnValue([event])
+
+
+class ReceiptEventSource(object):
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def get_new_events_for_user(self, user, from_key, limit):
+        defer.returnValue(([], from_key))
+        from_key = int(from_key)
+        to_key = yield self.get_current_key()
+
+        if from_key == to_key:
+            defer.returnValue(([], to_key))
+
+        rooms = yield self.store.get_rooms_for_user(user.to_string())
+        rooms = [room.room_id for room in rooms]
+        events = yield self.store.get_linearized_receipts_for_rooms(
+            rooms,
+            from_key=from_key,
+            to_key=to_key,
+        )
+
+        defer.returnValue((events, to_key))
+
+    def get_current_key(self, direction='f'):
+        return self.store.get_max_receipt_stream_id()
+
+    @defer.inlineCallbacks
+    def get_pagination_rows(self, user, config, key):
+        to_key = int(config.from_key)
+        defer.returnValue(([], to_key))
+
+        if config.to_key:
+            from_key = int(config.to_key)
+        else:
+            from_key = None
+
+        rooms = yield self.store.get_rooms_for_user(user.to_string())
+        rooms = [room.room_id for room in rooms]
+        events = yield self.store.get_linearized_receipts_for_rooms(
+            rooms,
+            from_key=from_key,
+            to_key=to_key,
+        )
+
+        defer.returnValue((events, to_key))
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7b68585a17..f81d75017d 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -73,7 +73,8 @@ class RegistrationHandler(BaseHandler):
             localpart : The local part of the user ID to register. If None,
               one will be randomly generated.
             password (str) : The password to assign to this user so they can
-            login again.
+            login again. This can be None which means they cannot login again
+            via a password (e.g. the user is an application service user).
         Returns:
             A tuple of (user_id, access_token).
         Raises:
@@ -193,6 +194,35 @@ class RegistrationHandler(BaseHandler):
             logger.info("Valid captcha entered from %s", ip)
 
     @defer.inlineCallbacks
+    def register_saml2(self, localpart):
+        """
+        Registers email_id as SAML2 Based Auth.
+        """
+        if urllib.quote(localpart) != localpart:
+            raise SynapseError(
+                400,
+                "User ID must only contain characters which do not"
+                " require URL encoding."
+                )
+        user = UserID(localpart, self.hs.hostname)
+        user_id = user.to_string()
+
+        yield self.check_user_id_is_valid(user_id)
+        token = self._generate_token(user_id)
+        try:
+            yield self.store.register(
+                user_id=user_id,
+                token=token,
+                password_hash=None
+            )
+            yield self.distributor.fire("registered_user", user)
+        except Exception, e:
+            yield self.store.add_access_token_to_user(user_id, token)
+            # Ignore Registration errors
+            logger.exception(e)
+        defer.returnValue((user_id, token))
+
+    @defer.inlineCallbacks
     def register_email(self, threepidCreds):
         """
         Registers emails with an identity server.
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 4bd027d9bb..7511d294f3 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -19,12 +19,15 @@ from twisted.internet import defer
 from ._base import BaseHandler
 
 from synapse.types import UserID, RoomAlias, RoomID
-from synapse.api.constants import EventTypes, Membership, JoinRules
+from synapse.api.constants import (
+    EventTypes, Membership, JoinRules, RoomCreationPreset,
+)
 from synapse.api.errors import StoreError, SynapseError
 from synapse.util import stringutils, unwrapFirstError
 from synapse.util.async import run_on_reactor
 from synapse.events.utils import serialize_event
 
+from collections import OrderedDict
 import logging
 import string
 
@@ -33,6 +36,19 @@ logger = logging.getLogger(__name__)
 
 class RoomCreationHandler(BaseHandler):
 
+    PRESETS_DICT = {
+        RoomCreationPreset.PRIVATE_CHAT: {
+            "join_rules": JoinRules.INVITE,
+            "history_visibility": "invited",
+            "original_invitees_have_ops": False,
+        },
+        RoomCreationPreset.PUBLIC_CHAT: {
+            "join_rules": JoinRules.PUBLIC,
+            "history_visibility": "shared",
+            "original_invitees_have_ops": False,
+        },
+    }
+
     @defer.inlineCallbacks
     def create_room(self, user_id, room_id, config):
         """ Creates a new room.
@@ -121,9 +137,25 @@ class RoomCreationHandler(BaseHandler):
                 servers=[self.hs.hostname],
             )
 
+        preset_config = config.get(
+            "preset",
+            RoomCreationPreset.PUBLIC_CHAT
+            if is_public
+            else RoomCreationPreset.PRIVATE_CHAT
+        )
+
+        raw_initial_state = config.get("initial_state", [])
+
+        initial_state = OrderedDict()
+        for val in raw_initial_state:
+            initial_state[(val["type"], val.get("state_key", ""))] = val["content"]
+
         user = UserID.from_string(user_id)
         creation_events = self._create_events_for_new_room(
-            user, room_id, is_public=is_public
+            user, room_id,
+            preset_config=preset_config,
+            invite_list=invite_list,
+            initial_state=initial_state,
         )
 
         msg_handler = self.hs.get_handlers().message_handler
@@ -170,7 +202,10 @@ class RoomCreationHandler(BaseHandler):
 
         defer.returnValue(result)
 
-    def _create_events_for_new_room(self, creator, room_id, is_public=False):
+    def _create_events_for_new_room(self, creator, room_id, preset_config,
+                                    invite_list, initial_state):
+        config = RoomCreationHandler.PRESETS_DICT[preset_config]
+
         creator_id = creator.to_string()
 
         event_keys = {
@@ -203,9 +238,10 @@ class RoomCreationHandler(BaseHandler):
             },
         )
 
-        power_levels_event = create(
-            etype=EventTypes.PowerLevels,
-            content={
+        returned_events = [creation_event, join_event]
+
+        if (EventTypes.PowerLevels, '') not in initial_state:
+            power_level_content = {
                 "users": {
                     creator.to_string(): 100,
                 },
@@ -213,6 +249,7 @@ class RoomCreationHandler(BaseHandler):
                 "events": {
                     EventTypes.Name: 100,
                     EventTypes.PowerLevels: 100,
+                    EventTypes.RoomHistoryVisibility: 100,
                 },
                 "events_default": 0,
                 "state_default": 50,
@@ -220,21 +257,43 @@ class RoomCreationHandler(BaseHandler):
                 "kick": 50,
                 "redact": 50,
                 "invite": 0,
-            },
-        )
+            }
 
-        join_rule = JoinRules.PUBLIC if is_public else JoinRules.INVITE
-        join_rules_event = create(
-            etype=EventTypes.JoinRules,
-            content={"join_rule": join_rule},
-        )
+            if config["original_invitees_have_ops"]:
+                for invitee in invite_list:
+                    power_level_content["users"][invitee] = 100
 
-        return [
-            creation_event,
-            join_event,
-            power_levels_event,
-            join_rules_event,
-        ]
+            power_levels_event = create(
+                etype=EventTypes.PowerLevels,
+                content=power_level_content,
+            )
+
+            returned_events.append(power_levels_event)
+
+        if (EventTypes.JoinRules, '') not in initial_state:
+            join_rules_event = create(
+                etype=EventTypes.JoinRules,
+                content={"join_rule": config["join_rules"]},
+            )
+
+            returned_events.append(join_rules_event)
+
+        if (EventTypes.RoomHistoryVisibility, '') not in initial_state:
+            history_event = create(
+                etype=EventTypes.RoomHistoryVisibility,
+                content={"history_visibility": config["history_visibility"]}
+            )
+
+            returned_events.append(history_event)
+
+        for (etype, state_key), content in initial_state.items():
+            returned_events.append(create(
+                etype=etype,
+                state_key=state_key,
+                content=content,
+            ))
+
+        return returned_events
 
 
 class RoomMemberHandler(BaseHandler):
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bd8c603681..6cff6230c1 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -293,6 +293,51 @@ class SyncHandler(BaseHandler):
         ))
 
     @defer.inlineCallbacks
+    def _filter_events_for_client(self, user_id, room_id, events):
+        states = yield self.store.get_state_for_events(
+            room_id, [e.event_id for e in events],
+        )
+
+        events_and_states = zip(events, states)
+
+        def allowed(event_and_state):
+            event, state = event_and_state
+
+            if event.type == EventTypes.RoomHistoryVisibility:
+                return True
+
+            membership_ev = state.get((EventTypes.Member, user_id), None)
+            if membership_ev:
+                membership = membership_ev.membership
+            else:
+                membership = Membership.LEAVE
+
+            if membership == Membership.JOIN:
+                return True
+
+            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
+            if history:
+                visibility = history.content.get("history_visibility", "shared")
+            else:
+                visibility = "shared"
+
+            if visibility == "public":
+                return True
+            elif visibility == "shared":
+                return True
+            elif visibility == "joined":
+                return membership == Membership.JOIN
+            elif visibility == "invited":
+                return membership == Membership.INVITE
+
+            return True
+        events_and_states = filter(allowed, events_and_states)
+        defer.returnValue([
+            ev
+            for ev, _ in events_and_states
+        ])
+
+    @defer.inlineCallbacks
     def load_filtered_recents(self, room_id, sync_config, now_token,
                               since_token=None):
         limited = True
@@ -313,6 +358,9 @@ class SyncHandler(BaseHandler):
             (room_key, _) = keys
             end_key = "s" + room_key.split('-')[-1]
             loaded_recents = sync_config.filter.filter_room_events(events)
+            loaded_recents = yield self._filter_events_for_client(
+                sync_config.user.to_string(), room_id, loaded_recents,
+            )
             loaded_recents.extend(recents)
             recents = loaded_recents
             if len(events) <= load_limit:
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index a9895292c2..026bd2b9d4 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -218,7 +218,7 @@ class TypingNotificationHandler(BaseHandler):
         self._room_serials[room_id] = self._latest_room_serial
 
         with PreserveLoggingContext():
-            self.notifier.on_new_user_event(
+            self.notifier.on_new_event(
                 "typing_key", self._latest_room_serial, rooms=[room_id]
             )