path: root/synapse/handlers
Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/_base.py       |  51
-rw-r--r--  synapse/handlers/directory.py   |   4
-rw-r--r--  synapse/handlers/events.py      |  13
-rw-r--r--  synapse/handlers/federation.py  |  62
-rw-r--r--  synapse/handlers/identity.py    |  26
-rw-r--r--  synapse/handlers/presence.py    |  22
-rw-r--r--  synapse/handlers/register.py    |  63
-rw-r--r--  synapse/handlers/room.py        | 139
-rw-r--r--  synapse/handlers/sync.py        | 508
-rw-r--r--  synapse/handlers/typing.py      |  23
10 files changed, 499 insertions, 412 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 744a9ee507..064e8723c8 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -53,25 +53,10 @@ class BaseHandler(object):
         self.event_builder_factory = hs.get_event_builder_factory()
 
     @defer.inlineCallbacks
-    def _filter_events_for_clients(self, user_tuples, events):
+    def _filter_events_for_clients(self, user_tuples, events, event_id_to_state):
         """ Returns dict of user_id -> list of events that user is allowed to
         see.
         """
-        # If there is only one user, just get the state for that one user,
-        # otherwise just get all the state.
-        if len(user_tuples) == 1:
-            types = (
-                (EventTypes.RoomHistoryVisibility, ""),
-                (EventTypes.Member, user_tuples[0][0]),
-            )
-        else:
-            types = None
-
-        event_id_to_state = yield self.store.get_state_for_events(
-            frozenset(e.event_id for e in events),
-            types=types
-        )
-
         forgotten = yield defer.gatherResults([
             self.store.who_forgot_in_room(
                 room_id,
@@ -135,7 +120,17 @@ class BaseHandler(object):
     @defer.inlineCallbacks
     def _filter_events_for_client(self, user_id, events, is_peeking=False):
         # Assumes that user has at some point joined the room if not is_guest.
-        res = yield self._filter_events_for_clients([(user_id, is_peeking)], events)
+        types = (
+            (EventTypes.RoomHistoryVisibility, ""),
+            (EventTypes.Member, user_id),
+        )
+        event_id_to_state = yield self.store.get_state_for_events(
+            frozenset(e.event_id for e in events),
+            types=types
+        )
+        res = yield self._filter_events_for_clients(
+            [(user_id, is_peeking)], events, event_id_to_state
+        )
         defer.returnValue(res.get(user_id, []))
 
     def ratelimit(self, user_id):
@@ -147,7 +142,7 @@ class BaseHandler(object):
         )
         if not allowed:
             raise LimitExceededError(
-                retry_after_ms=int(1000*(time_allowed - time_now)),
+                retry_after_ms=int(1000 * (time_allowed - time_now)),
             )
 
     @defer.inlineCallbacks
@@ -269,13 +264,13 @@ class BaseHandler(object):
                         "You don't have permission to redact events"
                     )
 
-        (event_stream_id, max_stream_id) = yield self.store.persist_event(
-            event, context=context
-        )
-
         action_generator = ActionGenerator(self.hs)
         yield action_generator.handle_push_actions_for_event(
-            event, self
+            event, context, self
+        )
+
+        (event_stream_id, max_stream_id) = yield self.store.persist_event(
+            event, context=context
         )
 
         destinations = set()
@@ -293,19 +288,11 @@ class BaseHandler(object):
 
         with PreserveLoggingContext():
             # Don't block waiting on waking up all the listeners.
-            notify_d = self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id,
                 extra_users=extra_users
             )
 
-        def log_failure(f):
-            logger.warn(
-                "Failed to notify about %s: %s",
-                event.event_id, f.value
-            )
-
-        notify_d.addErrback(log_failure)
-
         # If invite, remove room_state from unsigned before sending.
         event.unsigned.pop("invite_room_state", None)
 
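
In _base.py the state lookup moves out of _filter_events_for_clients: callers now pass in an event_id_to_state map, and the single-user wrapper _filter_events_for_client fetches only the RoomHistoryVisibility and Member state it needs. Push actions are also generated before the event is persisted, and the errback on notifier.on_new_room_event goes away because that call no longer returns a deferred. A minimal sketch, assuming the usual handler and store objects, of how a multi-user caller might now prefetch state for the bulk filter (filter_for_many is an invented name, not part of the handler):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def filter_for_many(handler, store, user_tuples, events):
        # With more than one user there is no cheap per-user state filter,
        # so fetch the full state for each event once and share it.
        event_id_to_state = yield store.get_state_for_events(
            frozenset(e.event_id for e in events),
            types=None,
        )
        res = yield handler._filter_events_for_clients(
            user_tuples, events, event_id_to_state
        )
        defer.returnValue(res)  # dict of user_id -> visible events
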
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 691564c651..4efecb1ffd 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -175,8 +175,8 @@ class DirectoryHandler(BaseHandler):
         # If this server is in the list of servers, return it first.
         if self.server_name in servers:
             servers = (
-                [self.server_name]
-                + [s for s in servers if s != self.server_name]
+                [self.server_name] +
+                [s for s in servers if s != self.server_name]
             )
         else:
             servers = list(servers)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 254b483da6..4933c31c19 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.util.logutils import log_function
 from synapse.types import UserID
 from synapse.events.utils import serialize_event
+from synapse.util.logcontext import preserve_context_over_fn
 
 from ._base import BaseHandler
 
@@ -29,11 +30,17 @@ logger = logging.getLogger(__name__)
 
 
 def started_user_eventstream(distributor, user):
-    return distributor.fire("started_user_eventstream", user)
+    return preserve_context_over_fn(
+        distributor.fire,
+        "started_user_eventstream", user
+    )
 
 
 def stopped_user_eventstream(distributor, user):
-    return distributor.fire("stopped_user_eventstream", user)
+    return preserve_context_over_fn(
+        distributor.fire,
+        "stopped_user_eventstream", user
+    )
 
 
 class EventStreamHandler(BaseHandler):
@@ -130,7 +137,7 @@ class EventStreamHandler(BaseHandler):
 
                 # Add some randomness to this value to try and mitigate against
                 # thundering herds on restart.
-                timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
+                timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
 
             events, tokens = yield self.notifier.get_events_for(
                 auth_user, pagin_config, timeout,
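
events.py wraps the started/stopped_user_eventstream distributor.fire calls in preserve_context_over_fn so that the caller's logcontext is restored when the fired observers complete (room.py below does the same for user_joined_room and user_left_room). The generic shape of the wrapper, with an invented signal name:

    from synapse.util.logcontext import preserve_context_over_fn

    def fire_some_signal(distributor, user):
        # preserve_context_over_fn invokes distributor.fire(...) and makes
        # sure the current logcontext is reinstated when the returned
        # deferred resolves.
        return preserve_context_over_fn(
            distributor.fire,
            "some_signal", user
        )
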
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2ce1e9d6c7..da55d43541 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -221,19 +221,11 @@ class FederationHandler(BaseHandler):
                 extra_users.append(target_user)
 
             with PreserveLoggingContext():
-                d = self.notifier.on_new_room_event(
+                self.notifier.on_new_room_event(
                     event, event_stream_id, max_stream_id,
                     extra_users=extra_users
                 )
 
-            def log_failure(f):
-                logger.warn(
-                    "Failed to notify about %s: %s",
-                    event.event_id, f.value
-                )
-
-            d.addErrback(log_failure)
-
         if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
                 prev_state = context.current_state.get((event.type, event.state_key))
@@ -244,12 +236,6 @@ class FederationHandler(BaseHandler):
                     user = UserID.from_string(event.state_key)
                     yield user_joined_room(self.distributor, user, event.room_id)
 
-        if not backfilled and not event.internal_metadata.is_outlier():
-            action_generator = ActionGenerator(self.hs)
-            yield action_generator.handle_push_actions_for_event(
-                event, self
-            )
-
     @defer.inlineCallbacks
     def _filter_events_for_server(self, server_name, room_id, events):
         event_to_state = yield self.store.get_state_for_events(
@@ -643,19 +629,11 @@ class FederationHandler(BaseHandler):
             )
 
             with PreserveLoggingContext():
-                d = self.notifier.on_new_room_event(
+                self.notifier.on_new_room_event(
                     event, event_stream_id, max_stream_id,
                     extra_users=[joinee]
                 )
 
-            def log_failure(f):
-                logger.warn(
-                    "Failed to notify about %s: %s",
-                    event.event_id, f.value
-                )
-
-            d.addErrback(log_failure)
-
             logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
             room_queue = self.room_queues[room_id]
@@ -730,18 +708,10 @@ class FederationHandler(BaseHandler):
             extra_users.append(target_user)
 
         with PreserveLoggingContext():
-            d = self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id, extra_users=extra_users
             )
 
-        def log_failure(f):
-            logger.warn(
-                "Failed to notify about %s: %s",
-                event.event_id, f.value
-            )
-
-        d.addErrback(log_failure)
-
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.JOIN:
                 user = UserID.from_string(event.state_key)
@@ -811,19 +781,11 @@ class FederationHandler(BaseHandler):
 
         target_user = UserID.from_string(event.state_key)
         with PreserveLoggingContext():
-            d = self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id,
                 extra_users=[target_user],
             )
 
-        def log_failure(f):
-            logger.warn(
-                "Failed to notify about %s: %s",
-                event.event_id, f.value
-            )
-
-        d.addErrback(log_failure)
-
         defer.returnValue(event)
 
     @defer.inlineCallbacks
@@ -948,18 +910,10 @@ class FederationHandler(BaseHandler):
             extra_users.append(target_user)
 
         with PreserveLoggingContext():
-            d = self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id, extra_users=extra_users
             )
 
-        def log_failure(f):
-            logger.warn(
-                "Failed to notify about %s: %s",
-                event.event_id, f.value
-            )
-
-        d.addErrback(log_failure)
-
         new_pdu = event
 
         destinations = set()
@@ -1113,6 +1067,12 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
+        if not backfilled and not event.internal_metadata.is_outlier():
+            action_generator = ActionGenerator(self.hs)
+            yield action_generator.handle_push_actions_for_event(
+                event, context, self
+            )
+
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
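
federation.py mirrors the _base.py changes: the repeated log_failure errbacks disappear along with the deferred from notifier.on_new_room_event, and push-action generation moves from the post-notification path to just before persist_event, guarded so it only runs for events that are neither backfilled nor outliers. Paraphrasing the two hunks into one picture (not a verbatim excerpt), the ordering is now:

    # Compute push actions from the event's context first, then persist,
    # presumably so unread-notification data is in place before listeners
    # are woken.
    if not backfilled and not event.internal_metadata.is_outlier():
        action_generator = ActionGenerator(self.hs)
        yield action_generator.handle_push_actions_for_event(
            event, context, self
        )

    event_stream_id, max_stream_id = yield self.store.persist_event(
        event, context=context,
    )
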
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 819ec57c4f..656ce124f9 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -36,14 +36,15 @@ class IdentityHandler(BaseHandler):
 
         self.http_client = hs.get_simple_http_client()
 
+        self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
+        self.trust_any_id_server_just_for_testing_do_not_use = (
+            hs.config.use_insecure_ssl_client_just_for_testing_do_not_use
+        )
+
     @defer.inlineCallbacks
     def threepid_from_creds(self, creds):
         yield run_on_reactor()
 
-        # XXX: make this configurable!
-        # trustedIdServers = ['matrix.org', 'localhost:8090']
-        trustedIdServers = ['matrix.org', 'vector.im']
-
         if 'id_server' in creds:
             id_server = creds['id_server']
         elif 'idServer' in creds:
@@ -58,10 +59,19 @@ class IdentityHandler(BaseHandler):
         else:
             raise SynapseError(400, "No client_secret in creds")
 
-        if id_server not in trustedIdServers:
-            logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
-                        'credentials', id_server)
-            defer.returnValue(None)
+        if id_server not in self.trusted_id_servers:
+            if self.trust_any_id_server_just_for_testing_do_not_use:
+                logger.warn(
+                    "Trusting untrustworthy ID server %r even though it isn't"
+                    " in the trusted id list for testing because"
+                    " 'use_insecure_ssl_client_just_for_testing_do_not_use'"
+                    " is set in the config",
+                    id_server,
+                )
+            else:
+                logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
+                            'credentials', id_server)
+                defer.returnValue(None)
 
         data = {}
         try:
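
identity.py stops hard-coding the trusted identity servers and reads them from hs.config.trusted_third_party_id_servers instead, with an explicit testing-only override that trusts anything but logs loudly. The check condenses to roughly the following (is_trusted is a hypothetical helper; the attribute names are the ones introduced in the diff):

    import logging

    logger = logging.getLogger(__name__)

    def is_trusted(self, id_server):
        # Normal case: only servers listed in the homeserver config are
        # trusted with 3pid credentials.
        if id_server in self.trusted_id_servers:
            return True
        # Testing escape hatch: accept anything, but warn about it.
        if self.trust_any_id_server_just_for_testing_do_not_use:
            logger.warn(
                "Trusting untrusted ID server %r for testing only", id_server
            )
            return True
        return False
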
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d36eb3b8d7..b61394f2b5 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -34,7 +34,7 @@ metrics = synapse.metrics.get_metrics_for(__name__)
 
 
 # Don't bother bumping "last active" time if it differs by less than 60 seconds
-LAST_ACTIVE_GRANULARITY = 60*1000
+LAST_ACTIVE_GRANULARITY = 60 * 1000
 
 # Keep no more than this number of offline serial revisions
 MAX_OFFLINE_SERIALS = 1000
@@ -378,9 +378,9 @@ class PresenceHandler(BaseHandler):
         was_polling = target_user in self._user_cachemap
 
         if now_online and not was_polling:
-            self.start_polling_presence(target_user, state=state)
+            yield self.start_polling_presence(target_user, state=state)
         elif not now_online and was_polling:
-            self.stop_polling_presence(target_user)
+            yield self.stop_polling_presence(target_user)
 
         # TODO(paul): perform a presence push as part of start/stop poll so
         #   we don't have to do this all the time
@@ -394,7 +394,8 @@ class PresenceHandler(BaseHandler):
         if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY:
             return
 
-        self.changed_presencelike_data(user, {"last_active": now})
+        with PreserveLoggingContext():
+            self.changed_presencelike_data(user, {"last_active": now})
 
     def get_joined_rooms_for_user(self, user):
         """Get the list of rooms a user is joined to.
@@ -466,11 +467,12 @@ class PresenceHandler(BaseHandler):
                 local_user, room_ids=[room_id], add_to_cache=False
             )
 
-            self.push_update_to_local_and_remote(
-                observed_user=local_user,
-                users_to_push=[user],
-                statuscache=statuscache,
-            )
+            with PreserveLoggingContext():
+                self.push_update_to_local_and_remote(
+                    observed_user=local_user,
+                    users_to_push=[user],
+                    statuscache=statuscache,
+                )
 
     @defer.inlineCallbacks
     def send_presence_invite(self, observer_user, observed_user):
@@ -556,7 +558,7 @@ class PresenceHandler(BaseHandler):
             observer_user.localpart, observed_user.to_string()
         )
 
-        self.start_polling_presence(
+        yield self.start_polling_presence(
             observer_user, target_user=observed_user
         )
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index c11b98d0b7..24c850ae9b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -21,7 +21,6 @@ from synapse.api.errors import (
     AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
 )
 from ._base import BaseHandler
-import synapse.util.stringutils as stringutils
 from synapse.util.async import run_on_reactor
 from synapse.http.client import CaptchaServerHttpClient
 
@@ -45,6 +44,8 @@ class RegistrationHandler(BaseHandler):
         self.distributor.declare("registered_user")
         self.captcha_client = CaptchaServerHttpClient(hs)
 
+        self._next_generated_user_id = None
+
     @defer.inlineCallbacks
     def check_username(self, localpart, guest_access_token=None):
         yield run_on_reactor()
@@ -91,7 +92,7 @@ class RegistrationHandler(BaseHandler):
 
         Args:
             localpart : The local part of the user ID to register. If None,
-              one will be randomly generated.
+              one will be generated.
             password (str) : The password to assign to this user so they can
             login again. This can be None which means they cannot login again
             via a password (e.g. the user is an application service user).
@@ -108,6 +109,18 @@ class RegistrationHandler(BaseHandler):
         if localpart:
             yield self.check_username(localpart, guest_access_token=guest_access_token)
 
+            was_guest = guest_access_token is not None
+
+            if not was_guest:
+                try:
+                    int(localpart)
+                    raise RegistrationError(
+                        400,
+                        "Numeric user IDs are reserved for guest users."
+                    )
+                except ValueError:
+                    pass
+
             user = UserID(localpart, self.hs.hostname)
             user_id = user.to_string()
 
@@ -118,38 +131,36 @@ class RegistrationHandler(BaseHandler):
                 user_id=user_id,
                 token=token,
                 password_hash=password_hash,
-                was_guest=guest_access_token is not None,
+                was_guest=was_guest,
                 make_guest=make_guest,
             )
 
             yield registered_user(self.distributor, user)
         else:
-            # autogen a random user ID
+            # autogen a sequential user ID
             attempts = 0
-            user_id = None
             token = None
-            while not user_id:
+            user = None
+            while not user:
+                localpart = yield self._generate_user_id(attempts > 0)
+                user = UserID(localpart, self.hs.hostname)
+                user_id = user.to_string()
+                yield self.check_user_id_is_valid(user_id)
+                if generate_token:
+                    token = self.auth_handler().generate_access_token(user_id)
                 try:
-                    localpart = self._generate_user_id()
-                    user = UserID(localpart, self.hs.hostname)
-                    user_id = user.to_string()
-                    yield self.check_user_id_is_valid(user_id)
-                    if generate_token:
-                        token = self.auth_handler().generate_access_token(user_id)
                     yield self.store.register(
                         user_id=user_id,
                         token=token,
-                        password_hash=password_hash)
-
-                    yield registered_user(self.distributor, user)
+                        password_hash=password_hash,
+                        make_guest=make_guest
+                    )
                 except SynapseError:
                     # if user id is taken, just generate another
                     user_id = None
                     token = None
                     attempts += 1
-                    if attempts > 5:
-                        raise RegistrationError(
-                            500, "Cannot generate user ID.")
+            yield registered_user(self.distributor, user)
 
         # We used to generate default identicons here, but nowadays
         # we want clients to generate their own as part of their branding
@@ -175,7 +186,7 @@ class RegistrationHandler(BaseHandler):
             token=token,
             password_hash=""
         )
-        registered_user(self.distributor, user)
+        yield registered_user(self.distributor, user)
         defer.returnValue((user_id, token))
 
     @defer.inlineCallbacks
@@ -211,7 +222,7 @@ class RegistrationHandler(BaseHandler):
                 400,
                 "User ID must only contain characters which do not"
                 " require URL encoding."
-                )
+            )
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
 
@@ -281,8 +292,16 @@ class RegistrationHandler(BaseHandler):
                     errcode=Codes.EXCLUSIVE
                 )
 
-    def _generate_user_id(self):
-        return "-" + stringutils.random_string(18)
+    @defer.inlineCallbacks
+    def _generate_user_id(self, reseed=False):
+        if reseed or self._next_generated_user_id is None:
+            self._next_generated_user_id = (
+                yield self.store.find_next_generated_user_id_localpart()
+            )
+
+        id = self._next_generated_user_id
+        self._next_generated_user_id += 1
+        defer.returnValue(str(id))
 
     @defer.inlineCallbacks
     def _validate_captcha(self, ip_addr, private_key, challenge, response):
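
Two things change in register.py: auto-generated user IDs become sequential numeric localparts seeded from the store via find_next_generated_user_id_localpart (with a reseed-and-retry on collision, replacing the old random "-"-prefixed strings and the five-attempt cap), and purely numeric localparts are rejected for ordinary registrations because they are now reserved for guests. The guard reads slightly backwards, since int(localpart) succeeding is the failure case; an equivalent, more explicit form (assert_not_numeric is a made-up name):

    from synapse.api.errors import RegistrationError

    def assert_not_numeric(localpart):
        # Numeric localparts are handed out sequentially to generated/guest
        # accounts, so explicit registrations may not claim them.
        try:
            int(localpart)
        except ValueError:
            return  # not purely numeric: fine
        raise RegistrationError(
            400, "Numeric user IDs are reserved for guest users."
        )
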
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 58e2d25f97..a8e3a9029c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -18,13 +18,14 @@ from twisted.internet import defer
 
 from ._base import BaseHandler
 
-from synapse.types import UserID, RoomAlias, RoomID
+from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
 from synapse.api.constants import (
     EventTypes, Membership, JoinRules, RoomCreationPreset,
 )
 from synapse.api.errors import AuthError, StoreError, SynapseError, Codes
 from synapse.util import stringutils, unwrapFirstError
 from synapse.util.async import run_on_reactor
+from synapse.util.logcontext import preserve_context_over_fn
 
 from signedjson.sign import verify_signed_json
 from signedjson.key import decode_verify_key_bytes
@@ -46,11 +47,17 @@ def collect_presencelike_data(distributor, user, content):
 
 
 def user_left_room(distributor, user, room_id):
-    return distributor.fire("user_left_room", user=user, room_id=room_id)
+    return preserve_context_over_fn(
+        distributor.fire,
+        "user_left_room", user=user, room_id=room_id
+    )
 
 
 def user_joined_room(distributor, user, room_id):
-    return distributor.fire("user_joined_room", user=user, room_id=room_id)
+    return preserve_context_over_fn(
+        distributor.fire,
+        "user_joined_room", user=user, room_id=room_id
+    )
 
 
 class RoomCreationHandler(BaseHandler):
@@ -876,39 +883,71 @@ class RoomListHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_public_room_list(self):
-        chunk = yield self.store.get_rooms(is_public=True)
-
-        room_members = yield defer.gatherResults(
-            [
-                self.store.get_users_in_room(room["room_id"])
-                for room in chunk
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
-
-        avatar_urls = yield defer.gatherResults(
-            [
-                self.get_room_avatar_url(room["room_id"])
-                for room in chunk
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
-
-        for i, room in enumerate(chunk):
-            room["num_joined_members"] = len(room_members[i])
-            if avatar_urls[i]:
-                room["avatar_url"] = avatar_urls[i]
+        room_ids = yield self.store.get_public_room_ids()
+
+        @defer.inlineCallbacks
+        def handle_room(room_id):
+            aliases = yield self.store.get_aliases_for_room(room_id)
+            if not aliases:
+                defer.returnValue(None)
+
+            state = yield self.state_handler.get_current_state(room_id)
+
+            result = {"aliases": aliases, "room_id": room_id}
+
+            name_event = state.get((EventTypes.Name, ""), None)
+            if name_event:
+                name = name_event.content.get("name", None)
+                if name:
+                    result["name"] = name
+
+            topic_event = state.get((EventTypes.Topic, ""), None)
+            if topic_event:
+                topic = topic_event.content.get("topic", None)
+                if topic:
+                    result["topic"] = topic
+
+            canonical_event = state.get((EventTypes.CanonicalAlias, ""), None)
+            if canonical_event:
+                canonical_alias = canonical_event.content.get("alias", None)
+                if canonical_alias:
+                    result["canonical_alias"] = canonical_alias
+
+            visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
+            visibility = None
+            if visibility_event:
+                visibility = visibility_event.content.get("history_visibility", None)
+            result["world_readable"] = visibility == "world_readable"
+
+            guest_event = state.get((EventTypes.GuestAccess, ""), None)
+            guest = None
+            if guest_event:
+                guest = guest_event.content.get("guest_access", None)
+            result["guest_can_join"] = guest == "can_join"
+
+            avatar_event = state.get(("m.room.avatar", ""), None)
+            if avatar_event:
+                avatar_url = avatar_event.content.get("url", None)
+                if avatar_url:
+                    result["avatar_url"] = avatar_url
+
+            result["num_joined_members"] = sum(
+                1 for (event_type, _), ev in state.items()
+                if event_type == EventTypes.Member and ev.membership == Membership.JOIN
+            )
 
-        # FIXME (erikj): START is no longer a valid value
-        defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
+            defer.returnValue(result)
 
-    @defer.inlineCallbacks
-    def get_room_avatar_url(self, room_id):
-        event = yield self.hs.get_state_handler().get_current_state(
-            room_id, "m.room.avatar"
-        )
-        if event and "url" in event.content:
-            defer.returnValue(event.content["url"])
+        result = []
+        for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)):
+            chunk_result = yield defer.gatherResults([
+                handle_room(room_id)
+                for room_id in chunk
+            ], consumeErrors=True).addErrback(unwrapFirstError)
+            result.extend(v for v in chunk_result if v)
+
+        # FIXME (erikj): START is no longer a valid value
+        defer.returnValue({"start": "START", "end": "END", "chunk": result})
 
 
 class RoomContextHandler(BaseHandler):
@@ -927,7 +966,7 @@ class RoomContextHandler(BaseHandler):
         Returns:
             dict, or None if the event isn't found
         """
-        before_limit = math.floor(limit/2.)
+        before_limit = math.floor(limit / 2.)
         after_limit = limit - before_limit
 
         now_token = yield self.hs.get_event_sources().get_current_token()
@@ -997,6 +1036,11 @@ class RoomEventSource(object):
 
         to_key = yield self.get_current_key()
 
+        from_token = RoomStreamToken.parse(from_key)
+        if from_token.topological:
+            logger.warn("Stream has topological part!!!! %r", from_key)
+            from_key = "s%s" % (from_token.stream,)
+
         app_service = yield self.store.get_app_service_by_user_id(
             user.to_string()
         )
@@ -1008,15 +1052,30 @@ class RoomEventSource(object):
                 limit=limit,
             )
         else:
-            events, end_key = yield self.store.get_room_events_stream(
-                user_id=user.to_string(),
+            room_events = yield self.store.get_membership_changes_for_user(
+                user.to_string(), from_key, to_key
+            )
+
+            room_to_events = yield self.store.get_room_events_stream_for_rooms(
+                room_ids=room_ids,
                 from_key=from_key,
                 to_key=to_key,
-                limit=limit,
-                room_ids=room_ids,
-                is_guest=is_guest,
+                limit=limit or 10,
             )
 
+            events = list(room_events)
+            events.extend(e for evs, _ in room_to_events.values() for e in evs)
+
+            events.sort(key=lambda e: e.internal_metadata.order)
+
+            if limit:
+                events[:] = events[:limit]
+
+            if events:
+                end_key = events[-1].internal_metadata.after
+            else:
+                end_key = to_key
+
         defer.returnValue((events, end_key))
 
     def get_current_key(self, direction='f'):
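
room.py has two substantive changes. get_public_room_list now builds each directory entry (aliases, name, topic, canonical alias, world-readability, guest access, avatar URL, joined member count) from the room's current state, skips rooms with no aliases, and walks the room list in chunks of ten so it does not fire one deferred per room all at once. RoomEventSource likewise switches to get_membership_changes_for_user plus get_room_events_stream_for_rooms, merging and sorting the two streams itself. The chunked-gather pattern in isolation (gather_in_chunks is not a real synapse helper; Python 2 idiom, hence xrange):

    from twisted.internet import defer
    from synapse.util import unwrapFirstError

    @defer.inlineCallbacks
    def gather_in_chunks(items, handle_one, chunk_size=10):
        # Run handle_one over items with at most chunk_size deferreds in
        # flight at a time, dropping falsy results (rooms without aliases
        # return None above).
        results = []
        for i in xrange(0, len(items), chunk_size):
            chunk = items[i:i + chunk_size]
            chunk_results = yield defer.gatherResults(
                [handle_one(item) for item in chunk],
                consumeErrors=True,
            ).addErrback(unwrapFirstError)
            results.extend(r for r in chunk_results if r)
        defer.returnValue(results)
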
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 075566417f..1d0f0058a2 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -18,11 +18,14 @@ from ._base import BaseHandler
 from synapse.streams.config import PaginationConfig
 from synapse.api.constants import Membership, EventTypes
 from synapse.util import unwrapFirstError
+from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.metrics import Measure
 
 from twisted.internet import defer
 
 import collections
 import logging
+import itertools
 
 logger = logging.getLogger(__name__)
 
@@ -139,6 +142,15 @@ class SyncHandler(BaseHandler):
             A Deferred SyncResult.
         """
 
+        context = LoggingContext.current_context()
+        if context:
+            if since_token is None:
+                context.tag = "initial_sync"
+            elif full_state:
+                context.tag = "full_state_sync"
+            else:
+                context.tag = "incremental_sync"
+
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
@@ -167,18 +179,6 @@ class SyncHandler(BaseHandler):
         else:
             return self.incremental_sync_with_gap(sync_config, since_token)
 
-    def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room):
-        if room_id not in ephemeral_by_room:
-            return None
-        for e in ephemeral_by_room[room_id]:
-            if e['type'] != 'm.receipt':
-                continue
-            for receipt_event_id, val in e['content'].items():
-                if 'm.read' in val:
-                    if user_id in val['m.read']:
-                        return receipt_event_id
-        return None
-
     @defer.inlineCallbacks
     def full_state_sync(self, sync_config, timeline_since_token):
         """Get a sync for a client which is starting without any state.
@@ -228,44 +228,51 @@ class SyncHandler(BaseHandler):
         invited = []
         archived = []
         deferreds = []
-        for event in room_list:
-            if event.membership == Membership.JOIN:
-                room_sync_deferred = self.full_state_sync_for_joined_room(
-                    room_id=event.room_id,
-                    sync_config=sync_config,
-                    now_token=now_token,
-                    timeline_since_token=timeline_since_token,
-                    ephemeral_by_room=ephemeral_by_room,
-                    tags_by_room=tags_by_room,
-                    account_data_by_room=account_data_by_room,
-                )
-                room_sync_deferred.addCallback(joined.append)
-                deferreds.append(room_sync_deferred)
-            elif event.membership == Membership.INVITE:
-                invite = yield self.store.get_event(event.event_id)
-                invited.append(InvitedSyncResult(
-                    room_id=event.room_id,
-                    invite=invite,
-                ))
-            elif event.membership in (Membership.LEAVE, Membership.BAN):
-                leave_token = now_token.copy_and_replace(
-                    "room_key", "s%d" % (event.stream_ordering,)
-                )
-                room_sync_deferred = self.full_state_sync_for_archived_room(
-                    sync_config=sync_config,
-                    room_id=event.room_id,
-                    leave_event_id=event.event_id,
-                    leave_token=leave_token,
-                    timeline_since_token=timeline_since_token,
-                    tags_by_room=tags_by_room,
-                    account_data_by_room=account_data_by_room,
-                )
-                room_sync_deferred.addCallback(archived.append)
-                deferreds.append(room_sync_deferred)
 
-        yield defer.gatherResults(
-            deferreds, consumeErrors=True
-        ).addErrback(unwrapFirstError)
+        room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
+        for room_list_chunk in room_list_chunks:
+            for event in room_list_chunk:
+                if event.membership == Membership.JOIN:
+                    room_sync_deferred = preserve_fn(
+                        self.full_state_sync_for_joined_room
+                    )(
+                        room_id=event.room_id,
+                        sync_config=sync_config,
+                        now_token=now_token,
+                        timeline_since_token=timeline_since_token,
+                        ephemeral_by_room=ephemeral_by_room,
+                        tags_by_room=tags_by_room,
+                        account_data_by_room=account_data_by_room,
+                    )
+                    room_sync_deferred.addCallback(joined.append)
+                    deferreds.append(room_sync_deferred)
+                elif event.membership == Membership.INVITE:
+                    invite = yield self.store.get_event(event.event_id)
+                    invited.append(InvitedSyncResult(
+                        room_id=event.room_id,
+                        invite=invite,
+                    ))
+                elif event.membership in (Membership.LEAVE, Membership.BAN):
+                    leave_token = now_token.copy_and_replace(
+                        "room_key", "s%d" % (event.stream_ordering,)
+                    )
+                    room_sync_deferred = preserve_fn(
+                        self.full_state_sync_for_archived_room
+                    )(
+                        sync_config=sync_config,
+                        room_id=event.room_id,
+                        leave_event_id=event.event_id,
+                        leave_token=leave_token,
+                        timeline_since_token=timeline_since_token,
+                        tags_by_room=tags_by_room,
+                        account_data_by_room=account_data_by_room,
+                    )
+                    room_sync_deferred.addCallback(archived.append)
+                    deferreds.append(room_sync_deferred)
+
+            yield defer.gatherResults(
+                deferreds, consumeErrors=True
+            ).addErrback(unwrapFirstError)
 
         account_data_for_user = sync_config.filter_collection.filter_account_data(
             self.account_data_for_user(account_data)
@@ -305,7 +312,6 @@ class SyncHandler(BaseHandler):
             ephemeral_by_room=ephemeral_by_room,
             tags_by_room=tags_by_room,
             account_data_by_room=account_data_by_room,
-            all_ephemeral_by_room=ephemeral_by_room,
             batch=batch,
             full_state=True,
         )
@@ -355,50 +361,51 @@ class SyncHandler(BaseHandler):
             typing events for that room.
         """
 
-        typing_key = since_token.typing_key if since_token else "0"
-
-        rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
-        room_ids = [room.room_id for room in rooms]
-
-        typing_source = self.event_sources.sources["typing"]
-        typing, typing_key = yield typing_source.get_new_events(
-            user=sync_config.user,
-            from_key=typing_key,
-            limit=sync_config.filter_collection.ephemeral_limit(),
-            room_ids=room_ids,
-            is_guest=sync_config.is_guest,
-        )
-        now_token = now_token.copy_and_replace("typing_key", typing_key)
-
-        ephemeral_by_room = {}
-
-        for event in typing:
-            # we want to exclude the room_id from the event, but modifying the
-            # result returned by the event source is poor form (it might cache
-            # the object)
-            room_id = event["room_id"]
-            event_copy = {k: v for (k, v) in event.iteritems()
-                          if k != "room_id"}
-            ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+        with Measure(self.clock, "ephemeral_by_room"):
+            typing_key = since_token.typing_key if since_token else "0"
 
-        receipt_key = since_token.receipt_key if since_token else "0"
+            rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+            room_ids = [room.room_id for room in rooms]
 
-        receipt_source = self.event_sources.sources["receipt"]
-        receipts, receipt_key = yield receipt_source.get_new_events(
-            user=sync_config.user,
-            from_key=receipt_key,
-            limit=sync_config.filter_collection.ephemeral_limit(),
-            room_ids=room_ids,
-            is_guest=sync_config.is_guest,
-        )
-        now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+            typing_source = self.event_sources.sources["typing"]
+            typing, typing_key = yield typing_source.get_new_events(
+                user=sync_config.user,
+                from_key=typing_key,
+                limit=sync_config.filter_collection.ephemeral_limit(),
+                room_ids=room_ids,
+                is_guest=sync_config.is_guest,
+            )
+            now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+            ephemeral_by_room = {}
+
+            for event in typing:
+                # we want to exclude the room_id from the event, but modifying the
+                # result returned by the event source is poor form (it might cache
+                # the object)
+                room_id = event["room_id"]
+                event_copy = {k: v for (k, v) in event.iteritems()
+                              if k != "room_id"}
+                ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+
+            receipt_key = since_token.receipt_key if since_token else "0"
+
+            receipt_source = self.event_sources.sources["receipt"]
+            receipts, receipt_key = yield receipt_source.get_new_events(
+                user=sync_config.user,
+                from_key=receipt_key,
+                limit=sync_config.filter_collection.ephemeral_limit(),
+                room_ids=room_ids,
+                is_guest=sync_config.is_guest,
+            )
+            now_token = now_token.copy_and_replace("receipt_key", receipt_key)
 
-        for event in receipts:
-            room_id = event["room_id"]
-            # exclude room id, as above
-            event_copy = {k: v for (k, v) in event.iteritems()
-                          if k != "room_id"}
-            ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+            for event in receipts:
+                room_id = event["room_id"]
+                # exclude room id, as above
+                event_copy = {k: v for (k, v) in event.iteritems()
+                              if k != "room_id"}
+                ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
         defer.returnValue((now_token, ephemeral_by_room))
 
@@ -438,13 +445,6 @@ class SyncHandler(BaseHandler):
         )
         now_token = now_token.copy_and_replace("presence_key", presence_key)
 
-        # We now fetch all ephemeral events for this room in order to get
-        # this users current read receipt. This could almost certainly be
-        # optimised.
-        _, all_ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_config, now_token
-        )
-
         now_token, ephemeral_by_room = yield self.ephemeral_by_room(
             sync_config, now_token, since_token
         )
@@ -478,7 +478,7 @@ class SyncHandler(BaseHandler):
         )
 
         # Get a list of membership change events that have happened.
-        rooms_changed = yield self.store.get_room_changes_for_user(
+        rooms_changed = yield self.store.get_membership_changes_for_user(
             user_id, since_token.room_key, now_token.room_key
         )
 
@@ -576,7 +576,6 @@ class SyncHandler(BaseHandler):
                 ephemeral_by_room=ephemeral_by_room,
                 tags_by_room=tags_by_room,
                 account_data_by_room=account_data_by_room,
-                all_ephemeral_by_room=all_ephemeral_by_room,
                 batch=batch,
                 full_state=full_state,
             )
@@ -606,58 +605,64 @@ class SyncHandler(BaseHandler):
         """
         :returns a Deferred TimelineBatch
         """
-        filtering_factor = 2
-        timeline_limit = sync_config.filter_collection.timeline_limit()
-        load_limit = max(timeline_limit * filtering_factor, 10)
-        max_repeat = 5  # Only try a few times per room, otherwise
-        room_key = now_token.room_key
-        end_key = room_key
-
-        limited = recents is None or newly_joined_room or timeline_limit < len(recents)
-
-        if recents is not None:
-            recents = sync_config.filter_collection.filter_room_timeline(recents)
-            recents = yield self._filter_events_for_client(
-                sync_config.user.to_string(),
-                recents,
-                is_peeking=sync_config.is_guest,
-            )
-        else:
-            recents = []
-
-        since_key = None
-        if since_token and not newly_joined_room:
-            since_key = since_token.room_key
-
-        while limited and len(recents) < timeline_limit and max_repeat:
-            events, end_key = yield self.store.get_room_events_stream_for_room(
-                room_id,
-                limit=load_limit + 1,
-                from_key=since_key,
-                to_key=end_key,
-            )
-            loaded_recents = sync_config.filter_collection.filter_room_timeline(events)
-            loaded_recents = yield self._filter_events_for_client(
-                sync_config.user.to_string(),
-                loaded_recents,
-                is_peeking=sync_config.is_guest,
-            )
-            loaded_recents.extend(recents)
-            recents = loaded_recents
-
-            if len(events) <= load_limit:
+        with Measure(self.clock, "load_filtered_recents"):
+            filtering_factor = 2
+            timeline_limit = sync_config.filter_collection.timeline_limit()
+            load_limit = max(timeline_limit * filtering_factor, 10)
+            max_repeat = 5  # Only try a few times per room, otherwise
+            room_key = now_token.room_key
+            end_key = room_key
+
+            if recents is None or newly_joined_room or timeline_limit < len(recents):
+                limited = True
+            else:
                 limited = False
-                break
-            max_repeat -= 1
 
-        if len(recents) > timeline_limit:
-            limited = True
-            recents = recents[-timeline_limit:]
-            room_key = recents[0].internal_metadata.before
+            if recents is not None:
+                recents = sync_config.filter_collection.filter_room_timeline(recents)
+                recents = yield self._filter_events_for_client(
+                    sync_config.user.to_string(),
+                    recents,
+                    is_peeking=sync_config.is_guest,
+                )
+            else:
+                recents = []
+
+            since_key = None
+            if since_token and not newly_joined_room:
+                since_key = since_token.room_key
+
+            while limited and len(recents) < timeline_limit and max_repeat:
+                events, end_key = yield self.store.get_room_events_stream_for_room(
+                    room_id,
+                    limit=load_limit + 1,
+                    from_key=since_key,
+                    to_key=end_key,
+                )
+                loaded_recents = sync_config.filter_collection.filter_room_timeline(
+                    events
+                )
+                loaded_recents = yield self._filter_events_for_client(
+                    sync_config.user.to_string(),
+                    loaded_recents,
+                    is_peeking=sync_config.is_guest,
+                )
+                loaded_recents.extend(recents)
+                recents = loaded_recents
 
-        prev_batch_token = now_token.copy_and_replace(
-            "room_key", room_key
-        )
+                if len(events) <= load_limit:
+                    limited = False
+                    break
+                max_repeat -= 1
+
+            if len(recents) > timeline_limit:
+                limited = True
+                recents = recents[-timeline_limit:]
+                room_key = recents[0].internal_metadata.before
+
+            prev_batch_token = now_token.copy_and_replace(
+                "room_key", room_key
+            )
 
         defer.returnValue(TimelineBatch(
             events=recents,
@@ -670,37 +675,11 @@ class SyncHandler(BaseHandler):
                                            since_token, now_token,
                                            ephemeral_by_room, tags_by_room,
                                            account_data_by_room,
-                                           all_ephemeral_by_room,
                                            batch, full_state=False):
-        if full_state:
-            state = yield self.get_state_at(room_id, now_token)
-
-        elif batch.limited:
-            current_state = yield self.get_state_at(room_id, now_token)
-
-            state_at_previous_sync = yield self.get_state_at(
-                room_id, stream_position=since_token
-            )
-
-            state = yield self.compute_state_delta(
-                since_token=since_token,
-                previous_state=state_at_previous_sync,
-                current_state=current_state,
-            )
-        else:
-            state = {
-                (event.type, event.state_key): event
-                for event in batch.events if event.is_state()
-            }
-
-        just_joined = yield self.check_joined_room(sync_config, state)
-        if just_joined:
-            state = yield self.get_state_at(room_id, now_token)
-
-        state = {
-            (e.type, e.state_key): e
-            for e in sync_config.filter_collection.filter_room_state(state.values())
-        }
+        state = yield self.compute_state_delta(
+            room_id, batch, sync_config, since_token, now_token,
+            full_state=full_state
+        )
 
         account_data = self.account_data_for_room(
             room_id, tags_by_room, account_data_by_room
@@ -726,14 +705,12 @@ class SyncHandler(BaseHandler):
 
         if room_sync:
             notifs = yield self.unread_notifs_for_room_id(
-                room_id, sync_config, all_ephemeral_by_room
+                room_id, sync_config
             )
 
             if notifs is not None:
-                unread_notifications["notification_count"] = len(notifs)
-                unread_notifications["highlight_count"] = len([
-                    1 for notif in notifs if _action_has_highlight(notif["actions"])
-                ])
+                unread_notifications["notification_count"] = notifs["notify_count"]
+                unread_notifications["highlight_count"] = notifs["highlight_count"]
 
         logger.debug("Room sync: %r", room_sync)
 
@@ -766,30 +743,11 @@ class SyncHandler(BaseHandler):
 
         logger.debug("Recents %r", batch)
 
-        state_events_at_leave = yield self.store.get_state_for_event(
-            leave_event_id
+        state_events_delta = yield self.compute_state_delta(
+            room_id, batch, sync_config, since_token, leave_token,
+            full_state=full_state
         )
 
-        if not full_state:
-            state_at_previous_sync = yield self.get_state_at(
-                room_id, stream_position=since_token
-            )
-
-            state_events_delta = yield self.compute_state_delta(
-                since_token=since_token,
-                previous_state=state_at_previous_sync,
-                current_state=state_events_at_leave,
-            )
-        else:
-            state_events_delta = state_events_at_leave
-
-        state_events_delta = {
-            (e.type, e.state_key): e
-            for e in sync_config.filter_collection.filter_room_state(
-                state_events_delta.values()
-            )
-        }
-
         account_data = self.account_data_for_room(
             room_id, tags_by_room, account_data_by_room
         )
@@ -843,15 +801,19 @@ class SyncHandler(BaseHandler):
             state = {}
         defer.returnValue(state)
 
-    def compute_state_delta(self, since_token, previous_state, current_state):
-        """ Works out the differnce in state between the current state and the
-        state the client got when it last performed a sync.
-
-        :param str since_token: the point we are comparing against
-        :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the
-            state to compare to
-        :param dict[(str,str), synapse.events.FrozenEvent] current_state: the
-            new state
+    @defer.inlineCallbacks
+    def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
+                            full_state):
+        """ Works out the differnce in state between the start of the timeline
+        and the previous sync.
+
+        :param str room_id
+        :param TimelineBatch batch: The timeline batch for the room that will
+            be sent to the user.
+        :param sync_config
+        :param str since_token: Token of the end of the previous batch. May be None.
+        :param str now_token: Token of the end of the current batch.
+        :param bool full_state: Whether to force returning the full state.
 
         :returns A new event dictionary
         """
@@ -860,12 +822,53 @@ class SyncHandler(BaseHandler):
         # updates even if they occured logically before the previous event.
         # TODO(mjark) Check for new redactions in the state events.
 
-        state_delta = {}
-        for key, event in current_state.iteritems():
-            if (key not in previous_state or
-                    previous_state[key].event_id != event.event_id):
-                state_delta[key] = event
-        return state_delta
+        with Measure(self.clock, "compute_state_delta"):
+            if full_state:
+                if batch:
+                    state = yield self.store.get_state_for_event(
+                        batch.events[0].event_id
+                    )
+                else:
+                    state = yield self.get_state_at(
+                        room_id, stream_position=now_token
+                    )
+
+                timeline_state = {
+                    (event.type, event.state_key): event
+                    for event in batch.events if event.is_state()
+                }
+
+                state = _calculate_state(
+                    timeline_contains=timeline_state,
+                    timeline_start=state,
+                    previous={},
+                )
+            elif batch.limited:
+                state_at_previous_sync = yield self.get_state_at(
+                    room_id, stream_position=since_token
+                )
+
+                state_at_timeline_start = yield self.store.get_state_for_event(
+                    batch.events[0].event_id
+                )
+
+                timeline_state = {
+                    (event.type, event.state_key): event
+                    for event in batch.events if event.is_state()
+                }
+
+                state = _calculate_state(
+                    timeline_contains=timeline_state,
+                    timeline_start=state_at_timeline_start,
+                    previous=state_at_previous_sync,
+                )
+            else:
+                state = {}
+
+            defer.returnValue({
+                (e.type, e.state_key): e
+                for e in sync_config.filter_collection.filter_room_state(state.values())
+            })
 
     def check_joined_room(self, sync_config, state_delta):
         """
@@ -886,21 +889,24 @@ class SyncHandler(BaseHandler):
         return False
 
     @defer.inlineCallbacks
-    def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
-        last_unread_event_id = self.last_read_event_id_for_room_and_user(
-            room_id, sync_config.user.to_string(), ephemeral_by_room
-        )
-
-        notifs = []
-        if last_unread_event_id:
-            notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
-                room_id, sync_config.user.to_string(), last_unread_event_id
+    def unread_notifs_for_room_id(self, room_id, sync_config):
+        with Measure(self.clock, "unread_notifs_for_room_id"):
+            last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
+                user_id=sync_config.user.to_string(),
+                room_id=room_id,
+                receipt_type="m.read"
             )
-            defer.returnValue(notifs)
 
-        # There is no new information in this period, so your notification
-        # count is whatever it was last time.
-        defer.returnValue(None)
+            notifs = []
+            if last_unread_event_id:
+                notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
+                    room_id, sync_config.user.to_string(), last_unread_event_id
+                )
+                defer.returnValue(notifs)
+
+            # There is no new information in this period, so your notification
+            # count is whatever it was last time.
+            defer.returnValue(None)
 
 
 def _action_has_highlight(actions):
@@ -912,3 +918,37 @@ def _action_has_highlight(actions):
             pass
 
     return False
+
+
+def _calculate_state(timeline_contains, timeline_start, previous):
+    """Works out what state to include in a sync response.
+
+    Args:
+        timeline_contains (dict): state in the timeline
+        timeline_start (dict): state at the start of the timeline
+        previous (dict): state at the end of the previous sync (or empty dict
+            if this is an initial sync)
+
+    Returns:
+        dict
+    """
+    event_id_to_state = {
+        e.event_id: e
+        for e in itertools.chain(
+            timeline_contains.values(),
+            previous.values(),
+            timeline_start.values(),
+        )
+    }
+
+    tc_ids = set(e.event_id for e in timeline_contains.values())
+    p_ids = set(e.event_id for e in previous.values())
+    ts_ids = set(e.event_id for e in timeline_start.values())
+
+    state_ids = (ts_ids - p_ids) - tc_ids
+
+    evs = (event_id_to_state[e] for e in state_ids)
+    return {
+        (e.type, e.state_key): e
+        for e in evs
+    }
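
The sync.py rewrite is the bulk of this changeset. Sync requests tag their logging context as initial_sync, full_state_sync or incremental_sync; full-state syncs process the room list in chunks of ten with the per-room work wrapped in preserve_fn; the scan of ephemeral receipts is dropped in favour of asking the store directly for the user's last m.read receipt; and compute_state_delta now takes the room, timeline batch and tokens and works out the state itself via the new _calculate_state helper: state at the start of the timeline, minus what the client already had at the previous sync, minus anything already visible in the timeline. A toy walk-through of that set arithmetic, using _calculate_state from the hunk above and minimal stand-in events with only the attributes it reads:

    import collections

    Ev = collections.namedtuple("Ev", ["event_id", "type", "state_key"])

    name_v1 = Ev("$1", "m.room.name", "")   # the name the client saw last sync
    name_v2 = Ev("$2", "m.room.name", "")   # a rename, present in the timeline
    topic = Ev("$3", "m.room.topic", "")    # unchanged topic

    timeline_contains = {("m.room.name", ""): name_v2}
    timeline_start = {("m.room.name", ""): name_v2, ("m.room.topic", ""): topic}
    previous = {("m.room.name", ""): name_v1, ("m.room.topic", ""): topic}

    # (ts_ids - p_ids) - tc_ids is empty: the topic is unchanged and the new
    # name is already in the timeline, so no extra state needs to be sent.
    assert _calculate_state(timeline_contains, timeline_start, previous) == {}
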
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 43bf600913..b16d0017df 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -19,6 +19,7 @@ from ._base import BaseHandler
 
 from synapse.api.errors import SynapseError, AuthError
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.metrics import Measure
 from synapse.types import UserID
 
 import logging
@@ -222,6 +223,7 @@ class TypingNotificationHandler(BaseHandler):
 class TypingNotificationEventSource(object):
     def __init__(self, hs):
         self.hs = hs
+        self.clock = hs.get_clock()
         self._handler = None
         self._room_member_handler = None
 
@@ -247,19 +249,20 @@ class TypingNotificationEventSource(object):
         }
 
     def get_new_events(self, from_key, room_ids, **kwargs):
-        from_key = int(from_key)
-        handler = self.handler()
+        with Measure(self.clock, "typing.get_new_events"):
+            from_key = int(from_key)
+            handler = self.handler()
 
-        events = []
-        for room_id in room_ids:
-            if room_id not in handler._room_serials:
-                continue
-            if handler._room_serials[room_id] <= from_key:
-                continue
+            events = []
+            for room_id in room_ids:
+                if room_id not in handler._room_serials:
+                    continue
+                if handler._room_serials[room_id] <= from_key:
+                    continue
 
-            events.append(self._make_event_for(room_id))
+                events.append(self._make_event_for(room_id))
 
-        return events, handler._latest_room_serial
+            return events, handler._latest_room_serial
 
     def get_current_key(self):
         return self.handler()._latest_room_serial
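
Finally, typing.py wraps get_new_events in a Measure block (as sync.py does for its expensive steps) so the time spent there shows up in the server's metrics. The idiom in isolation (timed_call is invented for illustration; Measure(clock, name) is the context manager used throughout this changeset):

    from synapse.util.metrics import Measure

    def timed_call(clock, name, fn, *args, **kwargs):
        # Everything executed inside the with-block is timed and reported
        # under the given block name.
        with Measure(clock, name):
            return fn(*args, **kwargs)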