summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/user_directory.py429
-rw-r--r--synapse/notifier.py6
-rw-r--r--synapse/push/pusher.py4
-rw-r--r--synapse/rest/__init__.py2
-rw-r--r--synapse/rest/client/v2_alpha/user_directory.py75
-rw-r--r--synapse/server.py5
-rw-r--r--synapse/state.py11
-rw-r--r--synapse/storage/__init__.py14
-rw-r--r--synapse/storage/_base.py5
-rw-r--r--synapse/storage/client_ips.py7
-rw-r--r--synapse/storage/events.py36
-rw-r--r--synapse/storage/schema/delta/42/current_state_delta.sql26
-rw-r--r--synapse/storage/schema/delta/42/user_dir.py84
-rw-r--r--synapse/storage/user_directory.py461
-rw-r--r--synapse/types.py7
-rw-r--r--synapse/util/caches/stream_change_cache.py15
16 files changed, 1175 insertions, 12 deletions
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
new file mode 100644
index 0000000000..0182cf86d6
--- /dev/null
+++ b/synapse/handlers/user_directory.py
@@ -0,0 +1,429 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.storage.roommember import ProfileInfo
+from synapse.util.metrics import Measure
+
+
+logger = logging.getLogger(__name__)
+
+
+class UserDirectoyHandler(object):
+    """Handles querying of and keeping updated the user_directory.
+
+    N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
+
+    The user directory is filled with users who this server can see are joined to a
+    world_readable or publically joinable room. We keep a database table up to date
+    by streaming changes of the current state and recalculating whether users should
+    be in the directory or not when necessary.
+
+    For each user in the directory we also store a room_id which is public and that the
+    user is joined to. This allows us to ignore history_visibility and join_rules changes
+    for that user in all other public rooms, as we know they'll still be in at least
+    one public room.
+    """
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
+        self.server_name = hs.hostname
+        self.clock = hs.get_clock()
+
+        # When start up for the first time we need to populate the user_directory.
+        # This is a set of user_id's we've inserted already
+        self.initially_handled_users = set()
+        self.initially_handled_users_in_public = set()
+
+        # The current position in the current_state_delta stream
+        self.pos = None
+
+        # Guard to ensure we only process deltas one at a time
+        self._is_processing = False
+
+        # We kick this off so that we don't have to wait for a change before
+        # we start populating the user directory
+        self.clock.call_later(0, self.notify_new_event)
+
+    def search_users(self, search_term, limit):
+        """Searches for users in directory
+
+        Returns:
+            dict of the form::
+
+                {
+                    "limited": <bool>,  # whether there were more results or not
+                    "results": [  # Ordered by best match first
+                        {
+                            "user_id": <user_id>,
+                            "display_name": <display_name>,
+                            "avatar_url": <avatar_url>
+                        }
+                    ]
+                }
+        """
+        return self.store.search_user_dir(search_term, limit)
+
+    @defer.inlineCallbacks
+    def notify_new_event(self):
+        """Called when there may be more deltas to process
+        """
+        if self._is_processing:
+            return
+
+        self._is_processing = True
+        try:
+            yield self._unsafe_process()
+        finally:
+            self._is_processing = False
+
+    @defer.inlineCallbacks
+    def _unsafe_process(self):
+        # If self.pos is None then means we haven't fetched it from DB
+        if self.pos is None:
+            self.pos = yield self.store.get_user_directory_stream_pos()
+
+        # If still None then we need to do the initial fill of directory
+        if self.pos is None:
+            yield self._do_initial_spam()
+            self.pos = yield self.store.get_user_directory_stream_pos()
+
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "user_dir_delta"):
+                deltas = yield self.store.get_current_state_deltas(self.pos)
+                if not deltas:
+                    return
+
+                yield self._handle_deltas(deltas)
+
+                self.pos = deltas[-1]["stream_id"]
+                yield self.store.update_user_directory_stream_pos(self.pos)
+
+    @defer.inlineCallbacks
+    def _do_initial_spam(self):
+        """Populates the user_directory from the current state of the DB, used
+        when synapse first starts with user_directory support
+        """
+        new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
+
+        # Delete any existing entries just in case there are any
+        yield self.store.delete_all_from_user_dir()
+
+        # We process by going through each existing room at a time.
+        room_ids = yield self.store.get_all_rooms()
+
+        for room_id in room_ids:
+            yield self._handle_intial_room(room_id)
+
+        self.initially_handled_users = None
+
+        yield self.store.update_user_directory_stream_pos(new_pos)
+
+    @defer.inlineCallbacks
+    def _handle_intial_room(self, room_id):
+        """Called when we initially fill out user_directory one room at a time
+        """
+        is_in_room = yield self.state.get_is_host_in_room(room_id, self.server_name)
+        if not is_in_room:
+            return
+
+        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
+
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+        unhandled_users = set(users_with_profile) - self.initially_handled_users
+
+        yield self.store.add_profiles_to_user_dir(
+            room_id, {
+                user_id: users_with_profile[user_id] for user_id in unhandled_users
+            }
+        )
+
+        self.initially_handled_users |= unhandled_users
+
+        if is_public:
+            yield self.store.add_users_to_public_room(
+                room_id,
+                user_ids=unhandled_users - self.initially_handled_users_in_public
+            )
+            self.initially_handled_users_in_public != unhandled_users
+
+    @defer.inlineCallbacks
+    def _handle_deltas(self, deltas):
+        """Called with the state deltas to process
+        """
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            prev_event_id = delta["prev_event_id"]
+
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+            # For join rule and visibility changes we need to check if the room
+            # may have become public or not and add/remove the users in said room
+            if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
+                yield self._handle_room_publicity_change(
+                    room_id, prev_event_id, event_id, typ,
+                )
+            elif typ == EventTypes.Member:
+                change = yield self._get_key_change(
+                    prev_event_id, event_id,
+                    key_name="membership",
+                    public_value=Membership.JOIN,
+                )
+
+                if change is None:
+                    # Handle any profile changes
+                    yield self._handle_profile_change(state_key, prev_event_id, event_id)
+                    continue
+
+                if not change:
+                    # Need to check if the server left the room entirely, if so
+                    # we might need to remove all the users in that room
+                    is_in_room = yield self.state.get_is_host_in_room(
+                        room_id, self.server_name,
+                    )
+                    if not is_in_room:
+                        logger.debug("Server left room: %r", room_id)
+                        # Fetch all the users that we marked as being in user
+                        # directory due to being in the room and then check if
+                        # need to remove those users or not
+                        user_ids = yield self.store.get_users_in_dir_due_to_room(room_id)
+                        for user_id in user_ids:
+                            yield self._handle_remove_user(room_id, user_id)
+                        return
+                    else:
+                        logger.debug("Server is still in room: %r", room_id)
+
+                if change:  # The user joined
+                    event = yield self.store.get_event(event_id)
+                    profile = ProfileInfo(
+                        avatar_url=event.content.get("avatar_url"),
+                        display_name=event.content.get("displayname"),
+                    )
+
+                    yield self._handle_new_user(room_id, state_key, profile)
+                else:  # The user left
+                    yield self._handle_remove_user(room_id, state_key)
+            else:
+                logger.debug("Ignoring irrelevant type: %r", typ)
+
+    @defer.inlineCallbacks
+    def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ):
+        """Handle a room having potentially changed from/to world_readable/publically
+        joinable.
+
+        Args:
+            room_id (str)
+            prev_event_id (str|None): The previous event before the state change
+            event_id (str|None): The new event after the state change
+            typ (str): Type of the event
+        """
+        logger.debug("Handling change for %s", typ)
+
+        if typ == EventTypes.RoomHistoryVisibility:
+            change = yield self._get_key_change(
+                prev_event_id, event_id,
+                key_name="history_visibility",
+                public_value="world_readable",
+            )
+        elif typ == EventTypes.JoinRules:
+            change = yield self._get_key_change(
+                prev_event_id, event_id,
+                key_name="join_rule",
+                public_value=JoinRules.PUBLIC,
+            )
+        else:
+            raise Exception("Invalid event type")
+        # If change is None, no change. True => become world_readable/public,
+        # False => was world_readable/public
+        if change is None:
+            logger.debug("No change")
+            return
+
+        # There's been a change to or from being world readable.
+
+        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+            room_id
+        )
+
+        logger.debug("Change: %r, is_public: %r", change, is_public)
+
+        if change and not is_public:
+            # If we became world readable but room isn't currently public then
+            # we ignore the change
+            return
+        elif not change and is_public:
+            # If we stopped being world readable but are still public,
+            # ignore the change
+            return
+
+        if change:
+            users_with_profile = yield self.state.get_current_user_in_room(room_id)
+            for user_id, profile in users_with_profile.iteritems():
+                yield self._handle_new_user(room_id, user_id, profile)
+        else:
+            users = yield self.store.get_users_in_public_due_to_room(room_id)
+            for user_id in users:
+                yield self._handle_remove_user(room_id, user_id)
+
+    @defer.inlineCallbacks
+    def _handle_new_user(self, room_id, user_id, profile):
+        """Called when we might need to add user to directory
+
+        Args:
+            room_id (str): room_id that user joined or started being public that
+            user_id (str)
+        """
+        logger.debug("Adding user to dir, %r", user_id)
+
+        row = yield self.store.get_user_in_directory(user_id)
+        if not row:
+            yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
+
+        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+            room_id
+        )
+
+        if not is_public:
+            return
+
+        row = yield self.store.get_user_in_public_room(user_id)
+        if not row:
+            yield self.store.add_users_to_public_room(room_id, [user_id])
+
+    @defer.inlineCallbacks
+    def _handle_remove_user(self, room_id, user_id):
+        """Called when we might need to remove user to directory
+
+        Args:
+            room_id (str): room_id that user left or stopped being public that
+            user_id (str)
+        """
+        logger.debug("Maybe removing user %r", user_id)
+
+        row = yield self.store.get_user_in_directory(user_id)
+        update_user_dir = row and row["room_id"] == room_id
+
+        row = yield self.store.get_user_in_public_room(user_id)
+        update_user_in_public = row and row["room_id"] == room_id
+
+        if not update_user_in_public and not update_user_dir:
+            return
+
+        # XXX: Make this faster?
+        rooms = yield self.store.get_rooms_for_user(user_id)
+        for j_room_id in rooms:
+            if not update_user_in_public and not update_user_dir:
+                break
+
+            is_in_room = yield self.state.get_is_host_in_room(
+                j_room_id, self.server_name,
+            )
+
+            if not is_in_room:
+                continue
+
+            if update_user_dir:
+                update_user_dir = False
+                yield self.store.update_user_in_user_dir(user_id, j_room_id)
+
+            if update_user_in_public:
+                is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+                    j_room_id
+                )
+
+                if is_public:
+                    yield self.store.update_user_in_public_user_list(user_id, j_room_id)
+                    update_user_in_public = False
+
+        if update_user_dir:
+            yield self.store.remove_from_user_dir(user_id)
+        elif update_user_in_public:
+            yield self.store.remove_from_user_in_public_room(user_id)
+
+    @defer.inlineCallbacks
+    def _handle_profile_change(self, user_id, prev_event_id, event_id):
+        """Check member event changes for any profile changes and update the
+        database if there are.
+        """
+        if not prev_event_id or not event_id:
+            return
+
+        prev_event = yield self.store.get_event(prev_event_id)
+        event = yield self.store.get_event(event_id)
+
+        if event.membership != Membership.JOIN:
+            return
+
+        prev_name = prev_event.content.get("displayname")
+        new_name = event.content.get("displayname")
+
+        prev_avatar = prev_event.content.get("avatar_url")
+        new_avatar = event.content.get("avatar_url")
+
+        if prev_name != new_name or prev_avatar != new_avatar:
+            yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
+
+    @defer.inlineCallbacks
+    def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
+        """Given two events check if the `key_name` field in content changed
+        from not matching `public_value` to doing so.
+
+        For example, check if `history_visibility` (`key_name`) changed from
+        `shared` to `world_readable` (`public_value`).
+
+        Returns:
+            None if the field in the events either both match `public_value`
+            or if neither do, i.e. there has been no change.
+            True if it didnt match `public_value` but now does
+            False if it did match `public_value` but now doesn't
+        """
+        prev_event = None
+        event = None
+        if prev_event_id:
+            prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+
+        if event_id:
+            event = yield self.store.get_event(event_id, allow_none=True)
+
+        if not event and not prev_event:
+            logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
+            defer.returnValue(None)
+
+        prev_value = None
+        value = None
+
+        if prev_event:
+            prev_value = prev_event.content.get(key_name)
+
+        if event:
+            value = event.content.get(key_name)
+
+        logger.debug("prev_value: %r -> value: %r", prev_value, value)
+
+        if value == public_value and prev_value != public_value:
+            defer.returnValue(True)
+        elif value != public_value and prev_value == public_value:
+            defer.returnValue(False)
+        else:
+            defer.returnValue(None)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 48566187ab..6b1709d700 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -167,6 +167,7 @@ class Notifier(object):
 
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
+        self.user_directory_handler = hs.get_user_directory_handler()
 
         if hs.should_send_federation():
             self.federation_sender = hs.get_federation_sender()
@@ -251,7 +252,10 @@ class Notifier(object):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
         preserve_fn(self.appservice_handler.notify_interested_services)(
-            room_stream_id)
+            room_stream_id
+        )
+
+        preserve_fn(self.user_directory_handler.notify_new_event)()
 
         if self.federation_sender:
             preserve_fn(self.federation_sender.notify_new_events)(
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 9385c80ce3..491f27bded 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -57,8 +57,8 @@ class PusherFactory(object):
             logger.info("found pusher")
             return self.pusher_types[pusherdict['kind']](self.hs, pusherdict)
 
-    def _create_email_pusher(self, pusherdict):
-        app_name = self._brand_from_pusherdict
+    def _create_email_pusher(self, _hs, pusherdict):
+        app_name = self._app_name_from_pusherdict(pusherdict)
         mailer = self.mailers.get(app_name)
         if not mailer:
             mailer = Mailer(
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index aa8d874f96..3d809d181b 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -51,6 +51,7 @@ from synapse.rest.client.v2_alpha import (
     devices,
     thirdparty,
     sendtodevice,
+    user_directory,
 )
 
 from synapse.http.server import JsonResource
@@ -100,3 +101,4 @@ class ClientRestResource(JsonResource):
         devices.register_servlets(hs, client_resource)
         thirdparty.register_servlets(hs, client_resource)
         sendtodevice.register_servlets(hs, client_resource)
+        user_directory.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py
new file mode 100644
index 0000000000..17d3dffc8f
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/user_directory.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from ._base import client_v2_patterns
+
+logger = logging.getLogger(__name__)
+
+
+class UserDirectorySearchRestServlet(RestServlet):
+    PATTERNS = client_v2_patterns("/user_directory/search$")
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        super(UserDirectorySearchRestServlet, self).__init__()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.user_directory_handler = hs.get_user_directory_handler()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        """Searches for users in directory
+
+        Returns:
+            dict of the form::
+
+                {
+                    "limited": <bool>,  # whether there were more results or not
+                    "results": [  # Ordered by best match first
+                        {
+                            "user_id": <user_id>,
+                            "display_name": <display_name>,
+                            "avatar_url": <avatar_url>
+                        }
+                    ]
+                }
+        """
+        yield self.auth.get_user_by_req(request, allow_guest=False)
+        body = parse_json_object_from_request(request)
+
+        limit = body.get("limit", 10)
+        limit = min(limit, 50)
+
+        try:
+            search_term = body["search_term"]
+        except:
+            raise SynapseError(400, "`search_term` is required field")
+
+        results = yield self.user_directory_handler.search_users(search_term, limit)
+
+        defer.returnValue((200, results))
+
+
+def register_servlets(hs, http_server):
+    UserDirectorySearchRestServlet(hs).register(http_server)
diff --git a/synapse/server.py b/synapse/server.py
index e400e278c6..a38e5179e0 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -49,6 +49,7 @@ from synapse.handlers.events import EventHandler, EventStreamHandler
 from synapse.handlers.initial_sync import InitialSyncHandler
 from synapse.handlers.receipts import ReceiptsHandler
 from synapse.handlers.read_marker import ReadMarkerHandler
+from synapse.handlers.user_directory import UserDirectoyHandler
 from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.notifier import Notifier
@@ -137,6 +138,7 @@ class HomeServer(object):
         'tcp_replication',
         'read_marker_handler',
         'action_generator',
+        'user_directory_handler',
     ]
 
     def __init__(self, hostname, **kwargs):
@@ -304,6 +306,9 @@ class HomeServer(object):
     def build_action_generator(self):
         return ActionGenerator(self)
 
+    def build_user_directory_handler(self):
+        return UserDirectoyHandler(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
diff --git a/synapse/state.py b/synapse/state.py
index d1b1a70a91..a98145598f 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -183,6 +183,17 @@ class StateHandler(object):
         defer.returnValue(joined_hosts)
 
     @defer.inlineCallbacks
+    def get_is_host_in_room(self, room_id, host, latest_event_ids=None):
+        if not latest_event_ids:
+            latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+        logger.debug("calling resolve_state_groups from get_is_host_in_room")
+        entry = yield self.resolve_state_groups(room_id, latest_event_ids)
+        is_host_joined = yield self.store.is_host_joined(
+            room_id, host, entry.state_id, entry.state
+        )
+        defer.returnValue(is_host_joined)
+
+    @defer.inlineCallbacks
     def compute_event_context(self, event, old_state=None):
         """Build an EventContext structure for the event.
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index d604e7668f..3c88ba9860 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -49,6 +49,7 @@ from .tags import TagsStore
 from .account_data import AccountDataStore
 from .openid import OpenIdStore
 from .client_ips import ClientIpStore
+from .user_directory import UserDirectoryStore
 
 from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
 from .engines import PostgresEngine
@@ -86,6 +87,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 ClientIpStore,
                 DeviceStore,
                 DeviceInboxStore,
+                UserDirectoryStore,
                 ):
 
     def __init__(self, db_conn, hs):
@@ -221,6 +223,18 @@ class DataStore(RoomMemberStore, RoomStore,
             "DeviceListFederationStreamChangeCache", device_list_max,
         )
 
+        curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
+            db_conn, "current_state_delta_stream",
+            entity_column="room_id",
+            stream_column="stream_id",
+            max_value=events_max,  # As we share the stream id with events token
+            limit=1000,
+        )
+        self._curr_state_delta_stream_cache = StreamChangeCache(
+            "_curr_state_delta_stream_cache", min_curr_state_delta_id,
+            prefilled_cache=curr_state_delta_prefill,
+        )
+
         cur = LoggingTransaction(
             db_conn.cursor(),
             name="_find_stream_orderings_for_times_txn",
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 58b73af7d2..db816346f5 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -425,6 +425,11 @@ class SQLBaseStore(object):
 
         txn.execute(sql, vals)
 
+    def _simple_insert_many(self, table, values, desc):
+        return self.runInteraction(
+            desc, self._simple_insert_many_txn, table, values
+        )
+
     @staticmethod
     def _simple_insert_many_txn(txn, table, values):
         if not values:
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 747d2df622..014ab635b7 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -20,6 +20,8 @@ from twisted.internet import defer
 from ._base import Cache
 from . import background_updates
 
+import os
+
 logger = logging.getLogger(__name__)
 
 # Number of msec of granularity to store the user IP 'last seen' time. Smaller
@@ -28,12 +30,15 @@ logger = logging.getLogger(__name__)
 LAST_SEEN_GRANULARITY = 120 * 1000
 
 
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+
+
 class ClientIpStore(background_updates.BackgroundUpdateStore):
     def __init__(self, hs):
         self.client_ip_last_seen = Cache(
             name="client_ip_last_seen",
             keylen=4,
-            max_entries=5000,
+            max_entries=50000 * CACHE_SIZE_FACTOR,
         )
 
         super(ClientIpStore, self).__init__(hs)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3d4f53ea55..528f19eb87 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -648,9 +648,10 @@ class EventsStore(SQLBaseStore):
                 list of the event ids which are the forward extremities.
 
         """
-        self._update_current_state_txn(txn, current_state_for_room)
-
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
+
+        self._update_current_state_txn(txn, current_state_for_room, max_stream_order)
+
         self._update_forward_extremities_txn(
             txn,
             new_forward_extremities=new_forward_extremeties,
@@ -713,7 +714,7 @@ class EventsStore(SQLBaseStore):
             backfilled=backfilled,
         )
 
-    def _update_current_state_txn(self, txn, state_delta_by_room):
+    def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
         for room_id, current_state_tuple in state_delta_by_room.iteritems():
                 to_delete, to_insert, _ = current_state_tuple
                 txn.executemany(
@@ -735,6 +736,29 @@ class EventsStore(SQLBaseStore):
                     ],
                 )
 
+                state_deltas = {key: None for key in to_delete}
+                state_deltas.update(to_insert)
+
+                self._simple_insert_many_txn(
+                    txn,
+                    table="current_state_delta_stream",
+                    values=[
+                        {
+                            "stream_id": max_stream_order,
+                            "room_id": room_id,
+                            "type": key[0],
+                            "state_key": key[1],
+                            "event_id": ev_id,
+                            "prev_event_id": to_delete.get(key, None),
+                        }
+                        for key, ev_id in state_deltas.iteritems()
+                    ]
+                )
+
+                self._curr_state_delta_stream_cache.entity_has_changed(
+                    room_id, max_stream_order,
+                )
+
                 # Invalidate the various caches
 
                 # Figure out the changes of membership to invalidate the
@@ -743,11 +767,7 @@ class EventsStore(SQLBaseStore):
                 # and which we have added, then we invlidate the caches for all
                 # those users.
                 members_changed = set(
-                    state_key for ev_type, state_key in to_delete.iterkeys()
-                    if ev_type == EventTypes.Member
-                )
-                members_changed.update(
-                    state_key for ev_type, state_key in to_insert.iterkeys()
+                    state_key for ev_type, state_key in state_deltas
                     if ev_type == EventTypes.Member
                 )
 
diff --git a/synapse/storage/schema/delta/42/current_state_delta.sql b/synapse/storage/schema/delta/42/current_state_delta.sql
new file mode 100644
index 0000000000..d28851aff8
--- /dev/null
+++ b/synapse/storage/schema/delta/42/current_state_delta.sql
@@ -0,0 +1,26 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE current_state_delta_stream (
+    stream_id BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    event_id TEXT,  -- Is null if the key was removed
+    prev_event_id TEXT  -- Is null if the key was added
+);
+
+CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream(stream_id);
diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py
new file mode 100644
index 0000000000..ea6a18196d
--- /dev/null
+++ b/synapse/storage/schema/delta/42/user_dir.py
@@ -0,0 +1,84 @@
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage.prepare_database import get_statements
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+logger = logging.getLogger(__name__)
+
+
+BOTH_TABLES = """
+CREATE TABLE user_directory_stream_pos (
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    stream_id BIGINT,
+    CHECK (Lock='X')
+);
+
+INSERT INTO user_directory_stream_pos (stream_id) VALUES (null);
+
+CREATE TABLE user_directory (
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,  -- A room_id that we know the user is joined to
+    display_name TEXT,
+    avatar_url TEXT
+);
+
+CREATE INDEX user_directory_room_idx ON user_directory(room_id);
+CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id);
+
+CREATE TABLE users_in_pubic_room (
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL  -- A room_id that we know is public
+);
+
+CREATE INDEX users_in_pubic_room_room_idx ON users_in_pubic_room(room_id);
+CREATE UNIQUE INDEX users_in_pubic_room_user_idx ON users_in_pubic_room(user_id);
+"""
+
+
+POSTGRES_TABLE = """
+CREATE TABLE user_directory_search (
+    user_id TEXT NOT NULL,
+    vector tsvector
+);
+
+CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin(vector);
+CREATE UNIQUE INDEX user_directory_search_user_idx ON user_directory_search(user_id);
+"""
+
+
+SQLITE_TABLE = """
+CREATE VIRTUAL TABLE user_directory_search
+    USING fts4 ( user_id, value );
+"""
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    for statement in get_statements(BOTH_TABLES.splitlines()):
+        cur.execute(statement)
+
+    if isinstance(database_engine, PostgresEngine):
+        for statement in get_statements(POSTGRES_TABLE.splitlines()):
+            cur.execute(statement)
+    elif isinstance(database_engine, Sqlite3Engine):
+        for statement in get_statements(SQLITE_TABLE.splitlines()):
+            cur.execute(statement)
+    else:
+        raise Exception("Unrecognized database engine")
+
+
+def run_upgrade(*args, **kwargs):
+    pass
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
new file mode 100644
index 0000000000..6a4bf63f0d
--- /dev/null
+++ b/synapse/storage/user_directory.py
@@ -0,0 +1,461 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from ._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.api.constants import EventTypes, JoinRules
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.types import get_domain_from_id, get_localpart_from_id
+
+import re
+
+
+class UserDirectoryStore(SQLBaseStore):
+
+    @cachedInlineCallbacks(cache_context=True)
+    def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context):
+        """Check if the room is either world_readable or publically joinable
+        """
+        current_state_ids = yield self.get_current_state_ids(
+            room_id, on_invalidate=cache_context.invalidate
+        )
+
+        join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
+        if join_rules_id:
+            join_rule_ev = yield self.get_event(join_rules_id, allow_none=True)
+            if join_rule_ev:
+                if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC:
+                    defer.returnValue(True)
+
+        hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, ""))
+        if hist_vis_id:
+            hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True)
+            if hist_vis_ev:
+                if hist_vis_ev.content.get("history_visibility") == "world_readable":
+                    defer.returnValue(True)
+
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
+    def add_users_to_public_room(self, room_id, user_ids):
+        """Add user to the list of users in public rooms
+
+        Args:
+            room_id (str): A room_id that all users are in that is world_readable
+                or publically joinable
+            user_ids (list(str)): Users to add
+        """
+        yield self._simple_insert_many(
+            table="users_in_pubic_room",
+            values=[
+                {
+                    "user_id": user_id,
+                    "room_id": room_id,
+                }
+                for user_id in user_ids
+            ],
+            desc="add_users_to_public_room"
+        )
+        for user_id in user_ids:
+            self.get_user_in_public_room.invalidate((user_id,))
+
+    def add_profiles_to_user_dir(self, room_id, users_with_profile):
+        """Add profiles to the user directory
+
+        Args:
+            room_id (str): A room_id that all users are joined to
+            users_with_profile (dict): Users to add to directory in the form of
+                mapping of user_id -> ProfileInfo
+        """
+        if isinstance(self.database_engine, PostgresEngine):
+            # We weight the loclpart most highly, then display name and finally
+            # server name
+            sql = """
+                INSERT INTO user_directory_search(user_id, vector)
+                VALUES (?,
+                    setweight(to_tsvector('english', ?), 'A')
+                    || setweight(to_tsvector('english', ?), 'D')
+                    || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+                )
+            """
+            args = (
+                (
+                    user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id),
+                    profile.display_name,
+                )
+                for user_id, profile in users_with_profile.iteritems()
+            )
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            sql = """
+                INSERT INTO user_directory_search(user_id, value)
+                VALUES (?,?)
+            """
+            args = (
+                (
+                    user_id,
+                    "%s %s" % (user_id, p.display_name,) if p.display_name else user_id
+                )
+                for user_id, p in users_with_profile.iteritems()
+            )
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
+        def _add_profiles_to_user_dir_txn(txn):
+            txn.executemany(sql, args)
+            self._simple_insert_many_txn(
+                txn,
+                table="user_directory",
+                values=[
+                    {
+                        "user_id": user_id,
+                        "room_id": room_id,
+                        "display_name": profile.display_name,
+                        "avatar_url": profile.avatar_url,
+                    }
+                    for user_id, profile in users_with_profile.iteritems()
+                ]
+            )
+            for user_id in users_with_profile:
+                txn.call_after(
+                    self.get_user_in_directory.invalidate, (user_id,)
+                )
+
+        return self.runInteraction(
+            "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn
+        )
+
+    @defer.inlineCallbacks
+    def update_user_in_user_dir(self, user_id, room_id):
+        yield self._simple_update_one(
+            table="user_directory",
+            keyvalues={"user_id": user_id},
+            updatevalues={"room_id": room_id},
+            desc="update_user_in_user_dir",
+        )
+        self.get_user_in_directory.invalidate((user_id,))
+
+    def update_profile_in_user_dir(self, user_id, display_name, avatar_url):
+        def _update_profile_in_user_dir_txn(txn):
+            self._simple_update_one_txn(
+                txn,
+                table="user_directory",
+                keyvalues={"user_id": user_id},
+                updatevalues={"display_name": display_name, "avatar_url": avatar_url},
+            )
+
+            if isinstance(self.database_engine, PostgresEngine):
+                # We weight the loclpart most highly, then display name and finally
+                # server name
+                sql = """
+                    UPDATE user_directory_search
+                    SET vector = setweight(to_tsvector('english', ?), 'A')
+                        || setweight(to_tsvector('english', ?), 'D')
+                        || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+                    WHERE user_id = ?
+                """
+                args = (
+                    get_localpart_from_id(user_id), get_domain_from_id(user_id),
+                    display_name,
+                    user_id,
+                )
+            elif isinstance(self.database_engine, Sqlite3Engine):
+                sql = """
+                    UPDATE user_directory_search
+                    set value = ?
+                    WHERE user_id = ?
+                """
+                args = (
+                    "%s %s" % (user_id, display_name,) if display_name else user_id,
+                    user_id,
+                )
+            else:
+                # This should be unreachable.
+                raise Exception("Unrecognized database engine")
+
+            txn.execute(sql, args)
+
+            txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
+
+        return self.runInteraction(
+            "update_profile_in_user_dir", _update_profile_in_user_dir_txn
+        )
+
+    @defer.inlineCallbacks
+    def update_user_in_public_user_list(self, user_id, room_id):
+        yield self._simple_update_one(
+            table="users_in_pubic_room",
+            keyvalues={"user_id": user_id},
+            updatevalues={"room_id": room_id},
+            desc="update_user_in_public_user_list",
+        )
+        self.get_user_in_public_room.invalidate((user_id,))
+
+    def remove_from_user_dir(self, user_id):
+        def _remove_from_user_dir_txn(txn):
+            self._simple_delete_txn(
+                txn,
+                table="user_directory",
+                keyvalues={"user_id": user_id},
+            )
+            self._simple_delete_txn(
+                txn,
+                table="user_directory_search",
+                keyvalues={"user_id": user_id},
+            )
+            self._simple_delete_txn(
+                txn,
+                table="users_in_pubic_room",
+                keyvalues={"user_id": user_id},
+            )
+            txn.call_after(
+                self.get_user_in_directory.invalidate, (user_id,)
+            )
+            txn.call_after(
+                self.get_user_in_public_room.invalidate, (user_id,)
+            )
+        return self.runInteraction(
+            "remove_from_user_dir", _remove_from_user_dir_txn,
+        )
+
+    @defer.inlineCallbacks
+    def remove_from_user_in_public_room(self, user_id):
+        yield self._simple_delete(
+            table="users_in_pubic_room",
+            keyvalues={"user_id": user_id},
+            desc="remove_from_user_in_public_room",
+        )
+        self.get_user_in_public_room.invalidate((user_id,))
+
+    def get_users_in_public_due_to_room(self, room_id):
+        """Get all user_ids that are in the room directory becuase they're
+        in the given room_id
+        """
+        return self._simple_select_onecol(
+            table="users_in_pubic_room",
+            keyvalues={"room_id": room_id},
+            retcol="user_id",
+            desc="get_users_in_public_due_to_room",
+        )
+
+    def get_users_in_dir_due_to_room(self, room_id):
+        """Get all user_ids that are in the room directory becuase they're
+        in the given room_id
+        """
+        return self._simple_select_onecol(
+            table="user_directory",
+            keyvalues={"room_id": room_id},
+            retcol="user_id",
+            desc="get_users_in_dir_due_to_room",
+        )
+
+    def get_all_rooms(self):
+        """Get all room_ids we've ever known about
+        """
+        return self._simple_select_onecol(
+            table="current_state_events",
+            keyvalues={},
+            retcol="DISTINCT room_id",
+            desc="get_all_rooms",
+        )
+
+    def delete_all_from_user_dir(self):
+        """Delete the entire user directory
+        """
+        def _delete_all_from_user_dir_txn(txn):
+            txn.execute("DELETE FROM user_directory")
+            txn.execute("DELETE FROM user_directory_search")
+            txn.execute("DELETE FROM users_in_pubic_room")
+            txn.call_after(self.get_user_in_directory.invalidate_all)
+            txn.call_after(self.get_user_in_public_room.invalidate_all)
+        return self.runInteraction(
+            "delete_all_from_user_dir", _delete_all_from_user_dir_txn
+        )
+
+    @cached()
+    def get_user_in_directory(self, user_id):
+        return self._simple_select_one(
+            table="user_directory",
+            keyvalues={"user_id": user_id},
+            retcols=("room_id", "display_name", "avatar_url",),
+            allow_none=True,
+            desc="get_user_in_directory",
+        )
+
+    @cached()
+    def get_user_in_public_room(self, user_id):
+        return self._simple_select_one(
+            table="users_in_pubic_room",
+            keyvalues={"user_id": user_id},
+            retcols=("room_id",),
+            allow_none=True,
+            desc="get_user_in_public_room",
+        )
+
+    def get_user_directory_stream_pos(self):
+        return self._simple_select_one_onecol(
+            table="user_directory_stream_pos",
+            keyvalues={},
+            retcol="stream_id",
+            desc="get_user_directory_stream_pos",
+        )
+
+    def update_user_directory_stream_pos(self, stream_id):
+        return self._simple_update_one(
+            table="user_directory_stream_pos",
+            keyvalues={},
+            updatevalues={"stream_id": stream_id},
+            desc="update_user_directory_stream_pos",
+        )
+
+    def get_current_state_deltas(self, prev_stream_id):
+        prev_stream_id = int(prev_stream_id)
+        if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id):
+            return []
+
+        def get_current_state_deltas_txn(txn):
+            # First we calculate the max stream id that will give us less than
+            # N results.
+            # We arbitarily limit to 100 stream_id entries to ensure we don't
+            # select toooo many.
+            sql = """
+                SELECT stream_id, count(*)
+                FROM current_state_delta_stream
+                WHERE stream_id > ?
+                GROUP BY stream_id
+                ORDER BY stream_id ASC
+                LIMIT 100
+            """
+            txn.execute(sql, (prev_stream_id,))
+
+            total = 0
+            max_stream_id = prev_stream_id
+            for max_stream_id, count in txn:
+                total += count
+                if total > 100:
+                    # We arbitarily limit to 100 entries to ensure we don't
+                    # select toooo many.
+                    break
+
+            # Now actually get the deltas
+            sql = """
+                SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
+                FROM current_state_delta_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+            """
+            txn.execute(sql, (prev_stream_id, max_stream_id,))
+            return self.cursor_to_dict(txn)
+
+        return self.runInteraction(
+            "get_current_state_deltas", get_current_state_deltas_txn
+        )
+
+    def get_max_stream_id_in_current_state_deltas(self):
+        return self._simple_select_one_onecol(
+            table="current_state_delta_stream",
+            keyvalues={},
+            retcol="COALESCE(MAX(stream_id), -1)",
+            desc="get_max_stream_id_in_current_state_deltas",
+        )
+
+    @defer.inlineCallbacks
+    def search_user_dir(self, search_term, limit):
+        """Searches for users in directory
+
+        Returns:
+            dict of the form::
+
+                {
+                    "limited": <bool>,  # whether there were more results or not
+                    "results": [  # Ordered by best match first
+                        {
+                            "user_id": <user_id>,
+                            "display_name": <display_name>,
+                            "avatar_url": <avatar_url>
+                        }
+                    ]
+                }
+        """
+
+        search_query = _parse_query(self.database_engine, search_term)
+
+        if isinstance(self.database_engine, PostgresEngine):
+            # We order by rank and then if they have profile info
+            sql = """
+                SELECT user_id, display_name, avatar_url
+                FROM user_directory_search
+                INNER JOIN user_directory USING (user_id)
+                INNER JOIN users_in_pubic_room USING (user_id)
+                WHERE vector @@ to_tsquery('english', ?)
+                ORDER BY
+                    ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC,
+                    display_name IS NULL,
+                    avatar_url IS NULL
+                LIMIT ?
+            """
+            args = (search_query, search_query, limit + 1,)
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            sql = """
+                SELECT user_id, display_name, avatar_url
+                FROM user_directory_search
+                INNER JOIN user_directory USING (user_id)
+                INNER JOIN users_in_pubic_room USING (user_id)
+                WHERE value MATCH ?
+                ORDER BY
+                    rank(matchinfo(user_directory_search)) DESC,
+                    display_name IS NULL,
+                    avatar_url IS NULL
+                LIMIT ?
+            """
+            args = (search_query, limit + 1)
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
+        results = yield self._execute(
+            "search_user_dir", self.cursor_to_dict, sql, *args
+        )
+
+        limited = len(results) > limit
+
+        defer.returnValue({
+            "limited": limited,
+            "results": results,
+        })
+
+
+def _parse_query(database_engine, search_term):
+    """Takes a plain unicode string from the user and converts it into a form
+    that can be passed to database.
+    We use this so that we can add prefix matching, which isn't something
+    that is supported by default.
+
+    We specifically add both a prefix and non prefix matching term so that
+    exact matches get ranked higher.
+    """
+
+    # Pull out the individual words, discarding any non-word characters.
+    results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
+
+    if isinstance(database_engine, PostgresEngine):
+        return " & ".join("(%s:* | %s)" % (result, result,) for result in results)
+    elif isinstance(database_engine, Sqlite3Engine):
+        return " & ".join("(%s* | %s)" % (result, result,) for result in results)
+    else:
+        # This should be unreachable.
+        raise Exception("Unrecognized database engine")
diff --git a/synapse/types.py b/synapse/types.py
index 445bdcb4d7..111948540d 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -62,6 +62,13 @@ def get_domain_from_id(string):
     return string[idx + 1:]
 
 
+def get_localpart_from_id(string):
+    idx = string.find(":")
+    if idx == -1:
+        raise SynapseError(400, "Invalid ID: %r" % (string,))
+    return string[1:idx]
+
+
 class DomainSpecificString(
         namedtuple("DomainSpecificString", ("localpart", "domain"))
 ):
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 70fe00ce0b..c498aee46c 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -89,6 +89,21 @@ class StreamChangeCache(object):
 
         return result
 
+    def has_any_entity_changed(self, stream_pos):
+        """Returns if any entity has changed
+        """
+        assert type(stream_pos) is int
+
+        if stream_pos >= self._earliest_known_stream_pos:
+            self.metrics.inc_hits()
+            if stream_pos >= max(self._cache):
+                return False
+            else:
+                return True
+        else:
+            self.metrics.inc_misses()
+            return True
+
     def get_all_entities_changed(self, stream_pos):
         """Returns all entites that have had new things since the given
         position. If the position is too old it will return None.