summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py107
-rw-r--r--synapse/handlers/events.py7
-rw-r--r--synapse/handlers/federation.py12
-rw-r--r--synapse/handlers/message.py57
-rw-r--r--synapse/handlers/register.py7
-rw-r--r--synapse/handlers/room.py104
-rw-r--r--synapse/handlers/sync.py109
7 files changed, 307 insertions, 96 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index b474042e84..2d1167296a 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -19,6 +19,7 @@ from synapse.api.errors import LimitExceededError, SynapseError, AuthError
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.api.constants import Membership, EventTypes
 from synapse.types import UserID, RoomAlias
+from synapse.push.action_generator import ActionGenerator
 
 from synapse.util.logcontext import PreserveLoggingContext
 
@@ -52,16 +53,54 @@ class BaseHandler(object):
         self.event_builder_factory = hs.get_event_builder_factory()
 
     @defer.inlineCallbacks
-    def _filter_events_for_client(self, user_id, events, is_guest=False):
-        # Assumes that user has at some point joined the room if not is_guest.
+    def _filter_events_for_clients(self, users, events):
+        """ Returns dict of user_id -> list of events that user is allowed to
+        see.
+        """
+        event_id_to_state = yield self.store.get_state_for_events(
+            frozenset(e.event_id for e in events),
+            types=(
+                (EventTypes.RoomHistoryVisibility, ""),
+                (EventTypes.Member, None),
+            )
+        )
+
+        forgotten = yield defer.gatherResults([
+            self.store.who_forgot_in_room(
+                room_id,
+            )
+            for room_id in frozenset(e.room_id for e in events)
+        ], consumeErrors=True)
+
+        # Set of membership event_ids that have been forgotten
+        event_id_forgotten = frozenset(
+            row["event_id"] for rows in forgotten for row in rows
+        )
+
+        def allowed(event, user_id, is_guest):
+            state = event_id_to_state[event.event_id]
+
+            visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
+            if visibility_event:
+                visibility = visibility_event.content.get("history_visibility", "shared")
+            else:
+                visibility = "shared"
 
-        def allowed(event, membership, visibility):
             if visibility == "world_readable":
                 return True
 
             if is_guest:
                 return False
 
+            membership_event = state.get((EventTypes.Member, user_id), None)
+            if membership_event:
+                if membership_event.event_id in event_id_forgotten:
+                    membership = None
+                else:
+                    membership = membership_event.membership
+            else:
+                membership = None
+
             if membership == Membership.JOIN:
                 return True
 
@@ -77,43 +116,20 @@ class BaseHandler(object):
 
             return True
 
-        event_id_to_state = yield self.store.get_state_for_events(
-            frozenset(e.event_id for e in events),
-            types=(
-                (EventTypes.RoomHistoryVisibility, ""),
-                (EventTypes.Member, user_id),
-            )
-        )
-
-        events_to_return = []
-        for event in events:
-            state = event_id_to_state[event.event_id]
-
-            membership_event = state.get((EventTypes.Member, user_id), None)
-            if membership_event:
-                was_forgotten_at_event = yield self.store.was_forgotten_at(
-                    membership_event.state_key,
-                    membership_event.room_id,
-                    membership_event.event_id
-                )
-                if was_forgotten_at_event:
-                    membership = None
-                else:
-                    membership = membership_event.membership
-            else:
-                membership = None
-
-            visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
-            if visibility_event:
-                visibility = visibility_event.content.get("history_visibility", "shared")
-            else:
-                visibility = "shared"
-
-            should_include = allowed(event, membership, visibility)
-            if should_include:
-                events_to_return.append(event)
+        defer.returnValue({
+            user_id: [
+                event
+                for event in events
+                if allowed(event, user_id, is_guest)
+            ]
+            for user_id, is_guest in users
+        })
 
-        defer.returnValue(events_to_return)
+    @defer.inlineCallbacks
+    def _filter_events_for_client(self, user_id, events, is_guest=False):
+        # Assumes that user has at some point joined the room if not is_guest.
+        res = yield self._filter_events_for_clients([(user_id, is_guest)], events)
+        defer.returnValue(res.get(user_id, []))
 
     def ratelimit(self, user_id):
         time_now = self.clock.time()
@@ -170,12 +186,10 @@ class BaseHandler(object):
         )
 
     @defer.inlineCallbacks
-    def handle_new_client_event(self, event, context, extra_destinations=[],
-                                extra_users=[], suppress_auth=False):
+    def handle_new_client_event(self, event, context, extra_users=[]):
         # We now need to go and hit out to wherever we need to hit out to.
 
-        if not suppress_auth:
-            self.auth.check(event, auth_events=context.current_state)
+        self.auth.check(event, auth_events=context.current_state)
 
         yield self.maybe_kick_guest_users(event, context.current_state.values())
 
@@ -252,7 +266,12 @@ class BaseHandler(object):
             event, context=context
         )
 
-        destinations = set(extra_destinations)
+        action_generator = ActionGenerator(self.store)
+        yield action_generator.handle_push_actions_for_event(
+            event, self
+        )
+
+        destinations = set()
         for k, s in context.current_state.items():
             try:
                 if k[0] == EventTypes.Member:
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 28c674730e..c73eec2b91 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -36,10 +36,6 @@ def stopped_user_eventstream(distributor, user):
     return distributor.fire("stopped_user_eventstream", user)
 
 
-def user_joined_room(distributor, user, room_id):
-    return distributor.fire("user_joined_room", user, room_id)
-
-
 class EventStreamHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -136,9 +132,6 @@ class EventStreamHandler(BaseHandler):
                 # thundering herds on restart.
                 timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
 
-            if is_guest:
-                yield user_joined_room(self.distributor, auth_user, room_id)
-
             events, tokens = yield self.notifier.get_events_for(
                 auth_user, pagin_config, timeout,
                 only_room_events=only_room_events,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c1704afc53..4b94940e99 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -36,6 +36,8 @@ from synapse.events.utils import prune_event
 
 from synapse.util.retryutils import NotRetryingDestination
 
+from synapse.push.action_generator import ActionGenerator
+
 from twisted.internet import defer
 
 import itertools
@@ -242,6 +244,12 @@ class FederationHandler(BaseHandler):
                     user = UserID.from_string(event.state_key)
                     yield user_joined_room(self.distributor, user, event.room_id)
 
+        if not backfilled and not event.internal_metadata.is_outlier():
+            action_generator = ActionGenerator(self.store)
+            yield action_generator.handle_push_actions_for_event(
+                event, self
+            )
+
     @defer.inlineCallbacks
     def _filter_events_for_server(self, server_name, room_id, events):
         event_to_state = yield self.store.get_state_for_events(
@@ -1684,7 +1692,7 @@ class FederationHandler(BaseHandler):
             self.auth.check(event, context.current_state)
             yield self._validate_keyserver(event, auth_events=context.current_state)
             member_handler = self.hs.get_handlers().room_member_handler
-            yield member_handler.change_membership(event, context)
+            yield member_handler.send_membership_event(event, context)
         else:
             destinations = set([x.split(":", 1)[-1] for x in (sender, room_id)])
             yield self.replication_layer.forward_third_party_invite(
@@ -1713,7 +1721,7 @@ class FederationHandler(BaseHandler):
         # TODO: Make sure the signatures actually are correct.
         event.signatures.update(returned_invite.signatures)
         member_handler = self.hs.get_handlers().room_member_handler
-        yield member_handler.change_membership(event, context)
+        yield member_handler.send_membership_event(event, context)
 
     @defer.inlineCallbacks
     def add_display_name_to_third_party_invite(self, event_dict, event, context):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5805190ce8..4c7bf2bef3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -174,30 +174,25 @@ class MessageHandler(BaseHandler):
         defer.returnValue(chunk)
 
     @defer.inlineCallbacks
-    def create_and_send_event(self, event_dict, ratelimit=True,
-                              token_id=None, txn_id=None, is_guest=False):
-        """ Given a dict from a client, create and handle a new event.
+    def create_event(self, event_dict, token_id=None, txn_id=None):
+        """
+        Given a dict from a client, create a new event.
 
         Creates an FrozenEvent object, filling out auth_events, prev_events,
         etc.
 
         Adds display names to Join membership events.
 
-        Persists and notifies local clients and federation.
-
         Args:
             event_dict (dict): An entire event
+
+        Returns:
+            Tuple of created event (FrozenEvent), Context
         """
         builder = self.event_builder_factory.new(event_dict)
 
         self.validator.validate_new(builder)
 
-        if ratelimit:
-            self.ratelimit(builder.user_id)
-        # TODO(paul): Why does 'event' not have a 'user' object?
-        user = UserID.from_string(builder.user_id)
-        assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
-
         if builder.type == EventTypes.Member:
             membership = builder.content.get("membership", None)
             if membership == Membership.JOIN:
@@ -216,6 +211,25 @@ class MessageHandler(BaseHandler):
         event, context = yield self._create_new_client_event(
             builder=builder,
         )
+        defer.returnValue((event, context))
+
+    @defer.inlineCallbacks
+    def send_event(self, event, context, ratelimit=True, is_guest=False):
+        """
+        Persists and notifies local clients and federation of an event.
+
+        Args:
+            event (FrozenEvent) the event to send.
+            context (Context) the context of the event.
+            ratelimit (bool): Whether to rate limit this send.
+            is_guest (bool): Whether the sender is a guest.
+        """
+        user = UserID.from_string(event.sender)
+
+        assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
+
+        if ratelimit:
+            self.ratelimit(event.sender)
 
         if event.is_state():
             prev_state = context.current_state.get((event.type, event.state_key))
@@ -229,7 +243,7 @@ class MessageHandler(BaseHandler):
 
         if event.type == EventTypes.Member:
             member_handler = self.hs.get_handlers().room_member_handler
-            yield member_handler.change_membership(event, context, is_guest=is_guest)
+            yield member_handler.send_membership_event(event, context, is_guest=is_guest)
         else:
             yield self.handle_new_client_event(
                 event=event,
@@ -241,6 +255,25 @@ class MessageHandler(BaseHandler):
             with PreserveLoggingContext():
                 presence.bump_presence_active_time(user)
 
+    @defer.inlineCallbacks
+    def create_and_send_event(self, event_dict, ratelimit=True,
+                              token_id=None, txn_id=None, is_guest=False):
+        """
+        Creates an event, then sends it.
+
+        See self.create_event and self.send_event.
+        """
+        event, context = yield self.create_event(
+            event_dict,
+            token_id=token_id,
+            txn_id=txn_id
+        )
+        yield self.send_event(
+            event,
+            context,
+            ratelimit=ratelimit,
+            is_guest=is_guest
+        )
         defer.returnValue(event)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 6f111ff63e..8e601b052b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -53,7 +53,8 @@ class RegistrationHandler(BaseHandler):
             raise SynapseError(
                 400,
                 "User ID must only contain characters which do not"
-                " require URL encoding."
+                " require URL encoding.",
+                Codes.INVALID_USERNAME
             )
 
         user = UserID(localpart, self.hs.hostname)
@@ -84,7 +85,8 @@ class RegistrationHandler(BaseHandler):
         localpart=None,
         password=None,
         generate_token=True,
-        guest_access_token=None
+        guest_access_token=None,
+        make_guest=False
     ):
         """Registers a new client on the server.
 
@@ -118,6 +120,7 @@ class RegistrationHandler(BaseHandler):
                 token=token,
                 password_hash=password_hash,
                 was_guest=guest_access_token is not None,
+                make_guest=make_guest,
             )
 
             yield registered_user(self.distributor, user)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 48a07e4e35..a1baf9d200 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -22,7 +22,7 @@ from synapse.types import UserID, RoomAlias, RoomID
 from synapse.api.constants import (
     EventTypes, Membership, JoinRules, RoomCreationPreset,
 )
-from synapse.api.errors import AuthError, StoreError, SynapseError
+from synapse.api.errors import AuthError, StoreError, SynapseError, Codes
 from synapse.util import stringutils, unwrapFirstError
 from synapse.util.async import run_on_reactor
 
@@ -397,7 +397,58 @@ class RoomMemberHandler(BaseHandler):
                     remotedomains.add(member.domain)
 
     @defer.inlineCallbacks
-    def change_membership(self, event, context, do_auth=True, is_guest=False):
+    def update_membership(self, requester, target, room_id, action, txn_id=None):
+        effective_membership_state = action
+        if action in ["kick", "unban"]:
+            effective_membership_state = "leave"
+        elif action == "forget":
+            effective_membership_state = "leave"
+
+        msg_handler = self.hs.get_handlers().message_handler
+
+        content = {"membership": unicode(effective_membership_state)}
+        if requester.is_guest:
+            content["kind"] = "guest"
+
+        event, context = yield msg_handler.create_event(
+            {
+                "type": EventTypes.Member,
+                "content": content,
+                "room_id": room_id,
+                "sender": requester.user.to_string(),
+                "state_key": target.to_string(),
+            },
+            token_id=requester.access_token_id,
+            txn_id=txn_id,
+        )
+
+        old_state = context.current_state.get((EventTypes.Member, event.state_key))
+        old_membership = old_state.content.get("membership") if old_state else None
+        if action == "unban" and old_membership != "ban":
+            raise SynapseError(
+                403,
+                "Cannot unban user who was not banned (membership=%s)" % old_membership,
+                errcode=Codes.BAD_STATE
+            )
+        if old_membership == "ban" and action != "unban":
+            raise SynapseError(
+                403,
+                "Cannot %s user who was is banned" % (action,),
+                errcode=Codes.BAD_STATE
+            )
+
+        yield msg_handler.send_event(
+            event,
+            context,
+            ratelimit=True,
+            is_guest=requester.is_guest
+        )
+
+        if action == "forget":
+            yield self.forget(requester.user, room_id)
+
+    @defer.inlineCallbacks
+    def send_membership_event(self, event, context, is_guest=False):
         """ Change the membership status of a user in a room.
 
         Args:
@@ -432,7 +483,7 @@ class RoomMemberHandler(BaseHandler):
                 if not is_guest_access_allowed:
                     raise AuthError(403, "Guest access not allowed")
 
-            yield self._do_join(event, context, do_auth=do_auth)
+            yield self._do_join(event, context)
         else:
             if event.membership == Membership.LEAVE:
                 is_host_in_room = yield self.is_host_in_room(room_id, context)
@@ -459,9 +510,7 @@ class RoomMemberHandler(BaseHandler):
 
             yield self._do_local_membership_update(
                 event,
-                membership=event.content["membership"],
                 context=context,
-                do_auth=do_auth,
             )
 
             if prev_state and prev_state.membership == Membership.JOIN:
@@ -497,12 +546,12 @@ class RoomMemberHandler(BaseHandler):
         })
         event, context = yield self._create_new_client_event(builder)
 
-        yield self._do_join(event, context, room_hosts=hosts, do_auth=True)
+        yield self._do_join(event, context, room_hosts=hosts)
 
         defer.returnValue({"room_id": room_id})
 
     @defer.inlineCallbacks
-    def _do_join(self, event, context, room_hosts=None, do_auth=True):
+    def _do_join(self, event, context, room_hosts=None):
         room_id = event.room_id
 
         # XXX: We don't do an auth check if we are doing an invite
@@ -536,9 +585,7 @@ class RoomMemberHandler(BaseHandler):
 
             yield self._do_local_membership_update(
                 event,
-                membership=event.content["membership"],
                 context=context,
-                do_auth=do_auth,
             )
 
         prev_state = context.current_state.get((event.type, event.state_key))
@@ -603,8 +650,7 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue(room_ids)
 
     @defer.inlineCallbacks
-    def _do_local_membership_update(self, event, membership, context,
-                                    do_auth):
+    def _do_local_membership_update(self, event, context):
         yield run_on_reactor()
 
         target_user = UserID.from_string(event.state_key)
@@ -613,7 +659,6 @@ class RoomMemberHandler(BaseHandler):
             event,
             context,
             extra_users=[target_user],
-            suppress_auth=(not do_auth),
         )
 
     @defer.inlineCallbacks
@@ -880,28 +925,39 @@ class RoomContextHandler(BaseHandler):
                 (excluding state).
 
         Returns:
-            dict
+            dict, or None if the event isn't found
         """
         before_limit = math.floor(limit/2.)
         after_limit = limit - before_limit
 
         now_token = yield self.hs.get_event_sources().get_current_token()
 
+        def filter_evts(events):
+            return self._filter_events_for_client(
+                user.to_string(),
+                events,
+                is_guest=is_guest)
+
+        event = yield self.store.get_event(event_id, get_prev_content=True,
+                                           allow_none=True)
+        if not event:
+            defer.returnValue(None)
+            return
+
+        filtered = yield(filter_evts([event]))
+        if not filtered:
+            raise AuthError(
+                403,
+                "You don't have permission to access that event."
+            )
+
         results = yield self.store.get_events_around(
             room_id, event_id, before_limit, after_limit
         )
 
-        results["events_before"] = yield self._filter_events_for_client(
-            user.to_string(),
-            results["events_before"],
-            is_guest=is_guest,
-        )
-
-        results["events_after"] = yield self._filter_events_for_client(
-            user.to_string(),
-            results["events_after"],
-            is_guest=is_guest,
-        )
+        results["events_before"] = yield filter_evts(results["events_before"])
+        results["events_after"] = yield filter_evts(results["events_after"])
+        results["event"] = event
 
         if results["events_after"]:
             last_event_id = results["events_after"][-1].event_id
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 3bc18b4338..d2864977b0 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -54,6 +54,8 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
     "state",             # dict[(str, str), FrozenEvent]
     "ephemeral",
     "account_data",
+    "unread_notification_count",
+    "unread_highlight_count",
 ])):
     __slots__ = []
 
@@ -66,6 +68,8 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
             or self.state
             or self.ephemeral
             or self.account_data
+            # nb the notification count does not, er, count: if there's nothing
+            # else in the result, we don't need to send it.
         )
 
 
@@ -163,6 +167,18 @@ class SyncHandler(BaseHandler):
         else:
             return self.incremental_sync_with_gap(sync_config, since_token)
 
+    def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room):
+        if room_id not in ephemeral_by_room:
+            return None
+        for e in ephemeral_by_room[room_id]:
+            if e['type'] != 'm.receipt':
+                continue
+            for receipt_event_id, val in e['content'].items():
+                if 'm.read' in val:
+                    if user_id in val['m.read']:
+                        return receipt_event_id
+        return None
+
     @defer.inlineCallbacks
     def full_state_sync(self, sync_config, timeline_since_token):
         """Get a sync for a client which is starting without any state.
@@ -274,6 +290,18 @@ class SyncHandler(BaseHandler):
             room_id, sync_config, now_token, since_token=timeline_since_token
         )
 
+        notifs = yield self.unread_notifs_for_room_id(
+            room_id, sync_config, ephemeral_by_room
+        )
+
+        notif_count = None
+        highlight_count = None
+        if notifs is not None:
+            notif_count = len(notifs)
+            highlight_count = len([
+                1 for notif in notifs if _action_has_highlight(notif["actions"])
+            ])
+
         current_state = yield self.get_state_at(room_id, now_token)
 
         defer.returnValue(JoinedSyncResult(
@@ -284,6 +312,8 @@ class SyncHandler(BaseHandler):
             account_data=self.account_data_for_room(
                 room_id, tags_by_room, account_data_by_room
             ),
+            unread_notification_count=notif_count,
+            unread_highlight_count=highlight_count,
         ))
 
     def account_data_for_user(self, account_data):
@@ -423,6 +453,13 @@ class SyncHandler(BaseHandler):
         )
         now_token = now_token.copy_and_replace("presence_key", presence_key)
 
+        # We now fetch all ephemeral events for this room in order to get
+        # this users current read receipt. This could almost certainly be
+        # optimised.
+        _, all_ephemeral_by_room = yield self.ephemeral_by_room(
+            sync_config, now_token
+        )
+
         now_token, ephemeral_by_room = yield self.ephemeral_by_room(
             sync_config, now_token, since_token
         )
@@ -496,6 +533,18 @@ class SyncHandler(BaseHandler):
                 else:
                     prev_batch = now_token
 
+                notifs = yield self.unread_notifs_for_room_id(
+                    room_id, sync_config, all_ephemeral_by_room
+                )
+
+                notif_count = None
+                highlight_count = None
+                if notifs is not None:
+                    notif_count = len(notifs)
+                    highlight_count = len([
+                        1 for notif in notifs if _action_has_highlight(notif["actions"])
+                    ])
+
                 just_joined = yield self.check_joined_room(sync_config, state)
                 if just_joined:
                     logger.debug("User has just joined %s: needs full state",
@@ -516,6 +565,8 @@ class SyncHandler(BaseHandler):
                     account_data=self.account_data_for_room(
                         room_id, tags_by_room, account_data_by_room
                     ),
+                    unread_notification_count=notif_count,
+                    unread_highlight_count=highlight_count,
                 )
                 logger.debug("Result for room %s: %r", room_id, room_sync)
 
@@ -537,7 +588,8 @@ class SyncHandler(BaseHandler):
             for room_id in joined_room_ids:
                 room_sync = yield self.incremental_sync_with_gap_for_room(
                     room_id, sync_config, since_token, now_token,
-                    ephemeral_by_room, tags_by_room, account_data_by_room
+                    ephemeral_by_room, tags_by_room, account_data_by_room,
+                    all_ephemeral_by_room=all_ephemeral_by_room,
                 )
                 if room_sync:
                     joined.append(room_sync)
@@ -547,7 +599,8 @@ class SyncHandler(BaseHandler):
                 sync_config, leave_event, since_token, tags_by_room,
                 account_data_by_room
             )
-            archived.append(room_sync)
+            if room_sync:
+                archived.append(room_sync)
 
         invited = [
             InvitedSyncResult(room_id=event.room_id, invite=event)
@@ -616,7 +669,8 @@ class SyncHandler(BaseHandler):
     def incremental_sync_with_gap_for_room(self, room_id, sync_config,
                                            since_token, now_token,
                                            ephemeral_by_room, tags_by_room,
-                                           account_data_by_room):
+                                           account_data_by_room,
+                                           all_ephemeral_by_room):
         """ Get the incremental delta needed to bring the client up to date for
         the room. Gives the client the most recent events and the changes to
         state.
@@ -632,7 +686,7 @@ class SyncHandler(BaseHandler):
             room_id, sync_config, now_token, since_token,
         )
 
-        logging.debug("Recents %r", batch)
+        logger.debug("Recents %r", batch)
 
         current_state = yield self.get_state_at(room_id, now_token)
 
@@ -650,6 +704,18 @@ class SyncHandler(BaseHandler):
         if just_joined:
             state = yield self.get_state_at(room_id, now_token)
 
+        notifs = yield self.unread_notifs_for_room_id(
+            room_id, sync_config, all_ephemeral_by_room
+        )
+
+        notif_count = None
+        highlight_count = None
+        if notifs is not None:
+            notif_count = len(notifs)
+            highlight_count = len([
+                1 for notif in notifs if _action_has_highlight(notif["actions"])
+            ])
+
         room_sync = JoinedSyncResult(
             room_id=room_id,
             timeline=batch,
@@ -658,6 +724,8 @@ class SyncHandler(BaseHandler):
             account_data=self.account_data_for_room(
                 room_id, tags_by_room, account_data_by_room
             ),
+            unread_notification_count=notif_count,
+            unread_highlight_count=highlight_count,
         )
 
         logger.debug("Room sync: %r", room_sync)
@@ -680,11 +748,14 @@ class SyncHandler(BaseHandler):
 
         leave_token = since_token.copy_and_replace("room_key", stream_token)
 
+        if since_token.is_after(leave_token):
+            defer.returnValue(None)
+
         batch = yield self.load_filtered_recents(
             leave_event.room_id, sync_config, leave_token, since_token,
         )
 
-        logging.debug("Recents %r", batch)
+        logger.debug("Recents %r", batch)
 
         state_events_at_leave = yield self.store.get_state_for_event(
             leave_event.event_id
@@ -788,3 +859,31 @@ class SyncHandler(BaseHandler):
             if join_event.content["membership"] == Membership.JOIN:
                 return True
         return False
+
+    @defer.inlineCallbacks
+    def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
+        last_unread_event_id = self.last_read_event_id_for_room_and_user(
+            room_id, sync_config.user.to_string(), ephemeral_by_room
+        )
+
+        notifs = []
+        if last_unread_event_id:
+            notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
+                room_id, sync_config.user.to_string(), last_unread_event_id
+            )
+            defer.returnValue(notifs)
+
+        # There is no new information in this period, so your notification
+        # count is whatever it was last time.
+        defer.returnValue(None)
+
+
+def _action_has_highlight(actions):
+    for action in actions:
+        try:
+            if action.get("set_tweak", None) == "highlight":
+                return action.get("value", True)
+        except AttributeError:
+            pass
+
+    return False