summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2016-03-29 01:20:25 +0100
committerMatthew Hodgson <matthew@matrix.org>2016-03-29 01:20:25 +0100
commite0c2490a145550310edb3ad0a414aa1840ef3ce4 (patch)
tree3d9ba7834e859171397ac7136ccb82123b4cd8ce /synapse
parentMerge branch 'develop' into matthew/preview_urls (diff)
parenttypo (diff)
downloadsynapse-e0c2490a145550310edb3ad0a414aa1840ef3ce4.tar.xz
Merge branch 'develop' into matthew/preview_urls
Diffstat (limited to '')
-rw-r--r--synapse/api/auth.py54
-rw-r--r--synapse/events/__init__.py5
-rw-r--r--synapse/federation/federation_client.py1
-rw-r--r--synapse/federation/federation_server.py1
-rw-r--r--synapse/federation/transport/server.py28
-rw-r--r--synapse/handlers/directory.py22
-rw-r--r--synapse/handlers/federation.py40
-rw-r--r--synapse/handlers/room.py58
-rw-r--r--synapse/handlers/sync.py16
-rw-r--r--synapse/http/server.py10
-rw-r--r--synapse/push/__init__.py2
-rw-r--r--synapse/push/baserules.py41
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py4
-rw-r--r--synapse/rest/client/v1/directory.py42
-rw-r--r--synapse/rest/client/v2_alpha/sync.py3
-rw-r--r--synapse/state.py143
-rw-r--r--synapse/storage/_base.py11
-rw-r--r--synapse/storage/directory.py2
-rw-r--r--synapse/storage/event_push_actions.py2
-rw-r--r--synapse/storage/events.py51
-rw-r--r--synapse/storage/receipts.py21
-rw-r--r--synapse/storage/room.py8
-rw-r--r--synapse/storage/roommember.py10
-rw-r--r--synapse/storage/schema/delta/30/public_rooms.sql23
-rw-r--r--synapse/storage/state.py128
-rw-r--r--synapse/storage/stream.py33
-rw-r--r--synapse/util/caches/__init__.py57
-rw-r--r--synapse/util/caches/lrucache.py73
-rw-r--r--synapse/util/caches/response_cache.py46
29 files changed, 616 insertions, 319 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 3038df4ab8..4f9c3c9db8 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -814,17 +814,16 @@ class Auth(object):
 
         return auth_ids
 
-    @log_function
-    def _can_send_event(self, event, auth_events):
+    def _get_send_level(self, etype, state_key, auth_events):
         key = (EventTypes.PowerLevels, "", )
         send_level_event = auth_events.get(key)
         send_level = None
         if send_level_event:
             send_level = send_level_event.content.get("events", {}).get(
-                event.type
+                etype
             )
             if send_level is None:
-                if hasattr(event, "state_key"):
+                if state_key is not None:
                     send_level = send_level_event.content.get(
                         "state_default", 50
                     )
@@ -838,6 +837,13 @@ class Auth(object):
         else:
             send_level = 0
 
+        return send_level
+
+    @log_function
+    def _can_send_event(self, event, auth_events):
+        send_level = self._get_send_level(
+            event.type, event.get("state_key", None), auth_events
+        )
         user_level = self._get_user_power_level(event.user_id, auth_events)
 
         if user_level < send_level:
@@ -982,3 +988,43 @@ class Auth(object):
                     "You don't have permission to add ops level greater "
                     "than your own"
                 )
+
+    @defer.inlineCallbacks
+    def check_can_change_room_list(self, room_id, user):
+        """Check if the user is allowed to edit the room's entry in the
+        published room list.
+
+        Args:
+            room_id (str)
+            user (UserID)
+        """
+
+        is_admin = yield self.is_server_admin(user)
+        if is_admin:
+            defer.returnValue(True)
+
+        user_id = user.to_string()
+        yield self.check_joined_room(room_id, user_id)
+
+        # We currently require the user is a "moderator" in the room. We do this
+        # by checking if they would (theoretically) be able to change the
+        # m.room.aliases events
+        power_level_event = yield self.state.get_current_state(
+            room_id, EventTypes.PowerLevels, ""
+        )
+
+        auth_events = {}
+        if power_level_event:
+            auth_events[(EventTypes.PowerLevels, "")] = power_level_event
+
+        send_level = self._get_send_level(
+            EventTypes.Aliases, "", auth_events
+        )
+        user_level = self._get_user_power_level(user_id, auth_events)
+
+        if user_level < send_level:
+            raise AuthError(
+                403,
+                "This server requires you to be a moderator in the room to"
+                " edit its room list entry"
+            )
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index abed6b5e6b..23f8b612ae 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from synapse.util.frozenutils import freeze
+from synapse.util.caches import intern_dict
 
 
 # Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents
@@ -140,6 +141,10 @@ class FrozenEvent(EventBase):
 
         unsigned = dict(event_dict.pop("unsigned", {}))
 
+        # We intern these strings because they turn up a lot (especially when
+        # caching).
+        event_dict = intern_dict(event_dict)
+
         if USE_FROZEN_DICTS:
             frozen_dict = freeze(event_dict)
         else:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 83c1f46586..37ee469fa2 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -418,6 +418,7 @@ class FederationClient(FederationBase):
                     "Failed to make_%s via %s: %s",
                     membership, destination, e.message
                 )
+                raise
 
         raise RuntimeError("Failed to send to any server.")
 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 76820b924b..429ab6ddec 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -531,7 +531,6 @@ class FederationServer(FederationBase):
         yield self.handler.on_receive_pdu(
             origin,
             pdu,
-            backfilled=False,
             state=state,
             auth_chain=auth_chain,
         )
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 208bff8d4f..d65a7893d8 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -175,7 +175,7 @@ class BaseFederationServlet(object):
 
 
 class FederationSendServlet(BaseFederationServlet):
-    PATH = "/send/([^/]*)/"
+    PATH = "/send/(?P<transaction_id>[^/]*)/"
 
     def __init__(self, handler, server_name, **kwargs):
         super(FederationSendServlet, self).__init__(
@@ -250,7 +250,7 @@ class FederationPullServlet(BaseFederationServlet):
 
 
 class FederationEventServlet(BaseFederationServlet):
-    PATH = "/event/([^/]*)/"
+    PATH = "/event/(?P<event_id>[^/]*)/"
 
     # This is when someone asks for a data item for a given server data_id pair.
     def on_GET(self, origin, content, query, event_id):
@@ -258,7 +258,7 @@ class FederationEventServlet(BaseFederationServlet):
 
 
 class FederationStateServlet(BaseFederationServlet):
-    PATH = "/state/([^/]*)/"
+    PATH = "/state/(?P<context>[^/]*)/"
 
     # This is when someone asks for all data for a given context.
     def on_GET(self, origin, content, query, context):
@@ -270,7 +270,7 @@ class FederationStateServlet(BaseFederationServlet):
 
 
 class FederationBackfillServlet(BaseFederationServlet):
-    PATH = "/backfill/([^/]*)/"
+    PATH = "/backfill/(?P<context>[^/]*)/"
 
     def on_GET(self, origin, content, query, context):
         versions = query["v"]
@@ -285,7 +285,7 @@ class FederationBackfillServlet(BaseFederationServlet):
 
 
 class FederationQueryServlet(BaseFederationServlet):
-    PATH = "/query/([^/]*)"
+    PATH = "/query/(?P<query_type>[^/]*)"
 
     # This is when we receive a server-server Query
     def on_GET(self, origin, content, query, query_type):
@@ -296,7 +296,7 @@ class FederationQueryServlet(BaseFederationServlet):
 
 
 class FederationMakeJoinServlet(BaseFederationServlet):
-    PATH = "/make_join/([^/]*)/([^/]*)"
+    PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
 
     @defer.inlineCallbacks
     def on_GET(self, origin, content, query, context, user_id):
@@ -305,7 +305,7 @@ class FederationMakeJoinServlet(BaseFederationServlet):
 
 
 class FederationMakeLeaveServlet(BaseFederationServlet):
-    PATH = "/make_leave/([^/]*)/([^/]*)"
+    PATH = "/make_leave/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
 
     @defer.inlineCallbacks
     def on_GET(self, origin, content, query, context, user_id):
@@ -314,7 +314,7 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
 
 
 class FederationSendLeaveServlet(BaseFederationServlet):
-    PATH = "/send_leave/([^/]*)/([^/]*)"
+    PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)"
 
     @defer.inlineCallbacks
     def on_PUT(self, origin, content, query, room_id, txid):
@@ -323,14 +323,14 @@ class FederationSendLeaveServlet(BaseFederationServlet):
 
 
 class FederationEventAuthServlet(BaseFederationServlet):
-    PATH = "/event_auth/([^/]*)/([^/]*)"
+    PATH = "/event_auth(?P<context>[^/]*)/(?P<event_id>[^/]*)"
 
     def on_GET(self, origin, content, query, context, event_id):
         return self.handler.on_event_auth(origin, context, event_id)
 
 
 class FederationSendJoinServlet(BaseFederationServlet):
-    PATH = "/send_join/([^/]*)/([^/]*)"
+    PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
 
     @defer.inlineCallbacks
     def on_PUT(self, origin, content, query, context, event_id):
@@ -341,7 +341,7 @@ class FederationSendJoinServlet(BaseFederationServlet):
 
 
 class FederationInviteServlet(BaseFederationServlet):
-    PATH = "/invite/([^/]*)/([^/]*)"
+    PATH = "/invite/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
 
     @defer.inlineCallbacks
     def on_PUT(self, origin, content, query, context, event_id):
@@ -352,7 +352,7 @@ class FederationInviteServlet(BaseFederationServlet):
 
 
 class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
-    PATH = "/exchange_third_party_invite/([^/]*)"
+    PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)"
 
     @defer.inlineCallbacks
     def on_PUT(self, origin, content, query, room_id):
@@ -381,7 +381,7 @@ class FederationClientKeysClaimServlet(BaseFederationServlet):
 
 
 class FederationQueryAuthServlet(BaseFederationServlet):
-    PATH = "/query_auth/([^/]*)/([^/]*)"
+    PATH = "/query_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
 
     @defer.inlineCallbacks
     def on_POST(self, origin, content, query, context, event_id):
@@ -394,7 +394,7 @@ class FederationQueryAuthServlet(BaseFederationServlet):
 
 class FederationGetMissingEventsServlet(BaseFederationServlet):
     # TODO(paul): Why does this path alone end with "/?" optional?
-    PATH = "/get_missing_events/([^/]*)/?"
+    PATH = "/get_missing_events/(?P<room_id>[^/]*)/?"
 
     @defer.inlineCallbacks
     def on_POST(self, origin, content, query, room_id):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 6bcc5a5e2b..8eeb225811 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -317,3 +317,25 @@ class DirectoryHandler(BaseHandler):
 
         is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
         defer.returnValue(is_admin)
+
+    @defer.inlineCallbacks
+    def edit_published_room_list(self, requester, room_id, visibility):
+        """Edit the entry of the room in the published room list.
+
+        requester
+        room_id (str)
+        visibility (str): "public" or "private"
+        """
+        if requester.is_guest:
+            raise AuthError(403, "Guests cannot edit the published room list")
+
+        if visibility not in ["public", "private"]:
+            raise SynapseError(400, "Invalid visibility setting")
+
+        room = yield self.store.get_room(room_id)
+        if room is None:
+            raise SynapseError(400, "Unknown room")
+
+        yield self.auth.check_can_change_room_list(room_id, requester.user)
+
+        yield self.store.set_room_is_public(room_id, visibility == "public")
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f599e817aa..267fedf114 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -102,7 +102,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, origin, pdu, backfilled, state=None,
+    def on_receive_pdu(self, origin, pdu, state=None,
                        auth_chain=None):
         """ Called by the ReplicationLayer when we have a new pdu. We need to
         do auth checks and put it through the StateHandler.
@@ -123,7 +123,6 @@ class FederationHandler(BaseHandler):
 
         # FIXME (erikj): Awful hack to make the case where we are not currently
         # in the room work
-        current_state = None
         is_in_room = yield self.auth.check_host_in_room(
             event.room_id,
             self.server_name
@@ -186,8 +185,6 @@ class FederationHandler(BaseHandler):
                     origin,
                     event,
                     state=state,
-                    backfilled=backfilled,
-                    current_state=current_state,
                 )
             except AuthError as e:
                 raise FederationError(
@@ -216,18 +213,17 @@ class FederationHandler(BaseHandler):
             except StoreError:
                 logger.exception("Failed to store room.")
 
-        if not backfilled:
-            extra_users = []
-            if event.type == EventTypes.Member:
-                target_user_id = event.state_key
-                target_user = UserID.from_string(target_user_id)
-                extra_users.append(target_user)
+        extra_users = []
+        if event.type == EventTypes.Member:
+            target_user_id = event.state_key
+            target_user = UserID.from_string(target_user_id)
+            extra_users.append(target_user)
 
-            with PreserveLoggingContext():
-                self.notifier.on_new_room_event(
-                    event, event_stream_id, max_stream_id,
-                    extra_users=extra_users
-                )
+        with PreserveLoggingContext():
+            self.notifier.on_new_room_event(
+                event, event_stream_id, max_stream_id,
+                extra_users=extra_users
+            )
 
         if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
@@ -647,7 +643,7 @@ class FederationHandler(BaseHandler):
                     continue
 
                 try:
-                    self.on_receive_pdu(origin, p, backfilled=False)
+                    self.on_receive_pdu(origin, p)
                 except:
                     logger.exception("Couldn't handle pdu")
 
@@ -779,7 +775,6 @@ class FederationHandler(BaseHandler):
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
-            backfilled=False,
         )
 
         target_user = UserID.from_string(event.state_key)
@@ -819,7 +814,6 @@ class FederationHandler(BaseHandler):
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
-            backfilled=False,
         )
 
         target_user = UserID.from_string(event.state_key)
@@ -1074,8 +1068,7 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def _handle_new_event(self, origin, event, state=None, backfilled=False,
-                          current_state=None, auth_events=None):
+    def _handle_new_event(self, origin, event, state=None, auth_events=None):
 
         outlier = event.internal_metadata.is_outlier()
 
@@ -1085,7 +1078,7 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
-        if not backfilled and not event.internal_metadata.is_outlier():
+        if not event.internal_metadata.is_outlier():
             action_generator = ActionGenerator(self.hs)
             yield action_generator.handle_push_actions_for_event(
                 event, context, self
@@ -1094,9 +1087,7 @@ class FederationHandler(BaseHandler):
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
-            backfilled=backfilled,
-            is_new_state=(not outlier and not backfilled),
-            current_state=current_state,
+            is_new_state=not outlier,
         )
 
         defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1194,7 +1185,6 @@ class FederationHandler(BaseHandler):
 
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event, new_event_context,
-            backfilled=False,
             is_new_state=True,
             current_state=state,
         )
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 051468989f..133183a257 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -25,6 +25,7 @@ from synapse.api.constants import (
 from synapse.api.errors import AuthError, StoreError, SynapseError, Codes
 from synapse.util import stringutils, unwrapFirstError
 from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util.caches.response_cache import ResponseCache
 
 from signedjson.sign import verify_signed_json
 from signedjson.key import decode_verify_key_bytes
@@ -119,7 +120,8 @@ class RoomCreationHandler(BaseHandler):
 
         invite_3pid_list = config.get("invite_3pid", [])
 
-        is_public = config.get("visibility", None) == "public"
+        visibility = config.get("visibility", None)
+        is_public = visibility == "public"
 
         # autogen room IDs and try to create it. We may clash, so just
         # try a few times till one goes through, giving up eventually.
@@ -155,9 +157,9 @@ class RoomCreationHandler(BaseHandler):
 
         preset_config = config.get(
             "preset",
-            RoomCreationPreset.PUBLIC_CHAT
-            if is_public
-            else RoomCreationPreset.PRIVATE_CHAT
+            RoomCreationPreset.PRIVATE_CHAT
+            if visibility == "private"
+            else RoomCreationPreset.PUBLIC_CHAT
         )
 
         raw_initial_state = config.get("initial_state", [])
@@ -938,61 +940,79 @@ class RoomMemberHandler(BaseHandler):
 
 
 class RoomListHandler(BaseHandler):
+    def __init__(self, hs):
+        super(RoomListHandler, self).__init__(hs)
+        self.response_cache = ResponseCache()
 
-    @defer.inlineCallbacks
     def get_public_room_list(self):
+        result = self.response_cache.get(())
+        if not result:
+            result = self.response_cache.set((), self._get_public_room_list())
+        return result
+
+    @defer.inlineCallbacks
+    def _get_public_room_list(self):
         room_ids = yield self.store.get_public_room_ids()
 
         @defer.inlineCallbacks
         def handle_room(room_id):
             aliases = yield self.store.get_aliases_for_room(room_id)
-            if not aliases:
-                defer.returnValue(None)
 
-            state = yield self.state_handler.get_current_state(room_id)
+            # We pull each bit of state out indvidually to avoid pulling the
+            # full state into memory. Due to how the caching works this should
+            # be fairly quick, even if not originally in the cache.
+            def get_state(etype, state_key):
+                return self.state_handler.get_current_state(room_id, etype, state_key)
+
+            # Double check that this is actually a public room.
+            join_rules_event = yield get_state(EventTypes.JoinRules, "")
+            if join_rules_event:
+                join_rule = join_rules_event.content.get("join_rule", None)
+                if join_rule and join_rule != JoinRules.PUBLIC:
+                    defer.returnValue(None)
 
-            result = {"aliases": aliases, "room_id": room_id}
+            result = {"room_id": room_id}
+            if aliases:
+                result["aliases"] = aliases
 
-            name_event = state.get((EventTypes.Name, ""), None)
+            name_event = yield get_state(EventTypes.Name, "")
             if name_event:
                 name = name_event.content.get("name", None)
                 if name:
                     result["name"] = name
 
-            topic_event = state.get((EventTypes.Topic, ""), None)
+            topic_event = yield get_state(EventTypes.Topic, "")
             if topic_event:
                 topic = topic_event.content.get("topic", None)
                 if topic:
                     result["topic"] = topic
 
-            canonical_event = state.get((EventTypes.CanonicalAlias, ""), None)
+            canonical_event = yield get_state(EventTypes.CanonicalAlias, "")
             if canonical_event:
                 canonical_alias = canonical_event.content.get("alias", None)
                 if canonical_alias:
                     result["canonical_alias"] = canonical_alias
 
-            visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
+            visibility_event = yield get_state(EventTypes.RoomHistoryVisibility, "")
             visibility = None
             if visibility_event:
                 visibility = visibility_event.content.get("history_visibility", None)
             result["world_readable"] = visibility == "world_readable"
 
-            guest_event = state.get((EventTypes.GuestAccess, ""), None)
+            guest_event = yield get_state(EventTypes.GuestAccess, "")
             guest = None
             if guest_event:
                 guest = guest_event.content.get("guest_access", None)
             result["guest_can_join"] = guest == "can_join"
 
-            avatar_event = state.get(("m.room.avatar", ""), None)
+            avatar_event = yield get_state("m.room.avatar", "")
             if avatar_event:
                 avatar_url = avatar_event.content.get("url", None)
                 if avatar_url:
                     result["avatar_url"] = avatar_url
 
-            result["num_joined_members"] = sum(
-                1 for (event_type, _), ev in state.items()
-                if event_type == EventTypes.Member and ev.membership == Membership.JOIN
-            )
+            joined_users = yield self.store.get_users_in_room(room_id)
+            result["num_joined_members"] = len(joined_users)
 
             defer.returnValue(result)
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 1f6fde8e8a..48ab5707e1 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.util import unwrapFirstError
 from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.metrics import Measure
+from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
 
 from twisted.internet import defer
@@ -35,6 +36,7 @@ SyncConfig = collections.namedtuple("SyncConfig", [
     "user",
     "filter_collection",
     "is_guest",
+    "request_key",
 ])
 
 
@@ -136,8 +138,8 @@ class SyncHandler(BaseHandler):
         super(SyncHandler, self).__init__(hs)
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
+        self.response_cache = ResponseCache()
 
-    @defer.inlineCallbacks
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
@@ -146,7 +148,19 @@ class SyncHandler(BaseHandler):
         Returns:
             A Deferred SyncResult.
         """
+        result = self.response_cache.get(sync_config.request_key)
+        if not result:
+            result = self.response_cache.set(
+                sync_config.request_key,
+                self._wait_for_sync_for_user(
+                    sync_config, since_token, timeout, full_state
+                )
+            )
+        return result
 
+    @defer.inlineCallbacks
+    def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
+                                full_state):
         context = LoggingContext.current_context()
         if context:
             if since_token is None:
diff --git a/synapse/http/server.py b/synapse/http/server.py
index b17b190ee5..b82196fd5e 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -18,6 +18,7 @@ from synapse.api.errors import (
     cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes
 )
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.caches import intern_dict
 import synapse.metrics
 import synapse.events
 
@@ -229,11 +230,12 @@ class JsonResource(HttpServer, resource.Resource):
             else:
                 servlet_classname = "%r" % callback
 
-            args = [
-                urllib.unquote(u).decode("UTF-8") if u else u for u in m.groups()
-            ]
+            kwargs = intern_dict({
+                name: urllib.unquote(value).decode("UTF-8") if value else value
+                for name, value in m.groupdict().items()
+            })
 
-            callback_return = yield callback(request, *args)
+            callback_return = yield callback(request, **kwargs)
             if callback_return is not None:
                 code, response = callback_return
                 self._send_response(request, code, response)
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 65ef1b68a3..296c4447ec 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -317,7 +317,7 @@ class Pusher(object):
     @defer.inlineCallbacks
     def _get_badge_count(self):
         invites, joins = yield defer.gatherResults([
-            self.store.get_invites_for_user(self.user_id),
+            self.store.get_invited_rooms_for_user(self.user_id),
             self.store.get_rooms_for_user(self.user_id),
         ], consumeErrors=True)
 
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 86a2998bcc..792af70eb7 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -160,7 +160,27 @@ BASE_APPEND_OVRRIDE_RULES = [
         'actions': [
             'dont_notify',
         ]
-    }
+    },
+    # Will we sometimes want to know about people joining and leaving?
+    # Perhaps: if so, this could be expanded upon. Seems the most usual case
+    # is that we don't though. We add this override rule so that even if
+    # the room rule is set to notify, we don't get notifications about
+    # join/leave/avatar/displayname events.
+    # See also: https://matrix.org/jira/browse/SYN-607
+    {
+        'rule_id': 'global/override/.m.rule.member_event',
+        'conditions': [
+            {
+                'kind': 'event_match',
+                'key': 'type',
+                'pattern': 'm.room.member',
+                '_id': '_member',
+            }
+        ],
+        'actions': [
+            'dont_notify'
+        ]
+    },
 ]
 
 
@@ -261,25 +281,6 @@ BASE_APPEND_UNDERRIDE_RULES = [
             }
         ]
     },
-    # This is too simple: https://matrix.org/jira/browse/SYN-607
-    # Removing for now
-    # {
-    #     'rule_id': 'global/underride/.m.rule.member_event',
-    #     'conditions': [
-    #         {
-    #             'kind': 'event_match',
-    #             'key': 'type',
-    #             'pattern': 'm.room.member',
-    #             '_id': '_member',
-    #         }
-    #     ],
-    #     'actions': [
-    #         'notify', {
-    #             'set_tweak': 'highlight',
-    #             'value': False
-    #         }
-    #     ]
-    # },
     {
         'rule_id': 'global/underride/.m.rule.message',
         'conditions': [
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 87d5061fb0..76d7eb7ce0 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -107,7 +107,9 @@ class BulkPushRuleEvaluator:
             users_dict.items(), [event], {event.event_id: current_state}
         )
 
-        evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room))
+        room_members = yield self.store.get_users_in_room(self.room_id)
+
+        evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
 
         condition_cache = {}
 
diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py
index 59a23d6cb6..8ac09419dc 100644
--- a/synapse/rest/client/v1/directory.py
+++ b/synapse/rest/client/v1/directory.py
@@ -30,6 +30,7 @@ logger = logging.getLogger(__name__)
 
 def register_servlets(hs, http_server):
     ClientDirectoryServer(hs).register(http_server)
+    ClientDirectoryListServer(hs).register(http_server)
 
 
 class ClientDirectoryServer(ClientV1RestServlet):
@@ -137,3 +138,44 @@ class ClientDirectoryServer(ClientV1RestServlet):
         )
 
         defer.returnValue((200, {}))
+
+
+class ClientDirectoryListServer(ClientV1RestServlet):
+    PATTERNS = client_path_patterns("/directory/list/room/(?P<room_id>[^/]*)$")
+
+    def __init__(self, hs):
+        super(ClientDirectoryListServer, self).__init__(hs)
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, room_id):
+        room = yield self.store.get_room(room_id)
+        if room is None:
+            raise SynapseError(400, "Unknown room")
+
+        defer.returnValue((200, {
+            "visibility": "public" if room["is_public"] else "private"
+        }))
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request, room_id):
+        requester = yield self.auth.get_user_by_req(request)
+
+        content = parse_json_object_from_request(request)
+        visibility = content.get("visibility", "public")
+
+        yield self.handlers.directory_handler.edit_published_room_list(
+            requester, room_id, visibility,
+        )
+
+        defer.returnValue((200, {}))
+
+    @defer.inlineCallbacks
+    def on_DELETE(self, request, room_id):
+        requester = yield self.auth.get_user_by_req(request)
+
+        yield self.handlers.directory_handler.edit_published_room_list(
+            requester, room_id, "private",
+        )
+
+        defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index de4a020ad4..c5785d7074 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -115,6 +115,8 @@ class SyncRestServlet(RestServlet):
             )
         )
 
+        request_key = (user, timeout, since, filter_id, full_state)
+
         if filter_id:
             if filter_id.startswith('{'):
                 try:
@@ -134,6 +136,7 @@ class SyncRestServlet(RestServlet):
             user=user,
             filter_collection=filter,
             is_guest=requester.is_guest,
+            request_key=request_key,
         )
 
         if since is not None:
diff --git a/synapse/state.py b/synapse/state.py
index b9a1387520..41d32e664a 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 
 from synapse.util.logutils import log_function
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError
 from synapse.api.auth import AuthEventTypes
@@ -27,6 +28,7 @@ from collections import namedtuple
 
 import logging
 import hashlib
+import os
 
 logger = logging.getLogger(__name__)
 
@@ -34,8 +36,11 @@ logger = logging.getLogger(__name__)
 KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
 
 
-SIZE_OF_CACHE = 1000
-EVICTION_TIMEOUT_SECONDS = 20
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+
+
+SIZE_OF_CACHE = int(1000 * CACHE_SIZE_FACTOR)
+EVICTION_TIMEOUT_SECONDS = 60 * 60
 
 
 class _StateCacheEntry(object):
@@ -85,16 +90,8 @@ class StateHandler(object):
         """
         event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
 
-        cache = None
-        if self._state_cache is not None:
-            cache = self._state_cache.get(frozenset(event_ids), None)
-
-        if cache:
-            cache.ts = self.clock.time_msec()
-            state = cache.state
-        else:
-            res = yield self.resolve_state_groups(room_id, event_ids)
-            state = res[1]
+        res = yield self.resolve_state_groups(room_id, event_ids)
+        state = res[1]
 
         if event_type:
             defer.returnValue(state.get((event_type, state_key)))
@@ -186,20 +183,6 @@ class StateHandler(object):
         """
         logger.debug("resolve_state_groups event_ids %s", event_ids)
 
-        if self._state_cache is not None:
-            cache = self._state_cache.get(frozenset(event_ids), None)
-            if cache and cache.state_group:
-                cache.ts = self.clock.time_msec()
-                prev_state = cache.state.get((event_type, state_key), None)
-                if prev_state:
-                    prev_state = prev_state.event_id
-                    prev_states = [prev_state]
-                else:
-                    prev_states = []
-                defer.returnValue(
-                    (cache.state_group, cache.state, prev_states)
-                )
-
         state_groups = yield self.store.get_state_groups(
             room_id, event_ids
         )
@@ -209,7 +192,7 @@ class StateHandler(object):
             state_groups.keys()
         )
 
-        group_names = set(state_groups.keys())
+        group_names = frozenset(state_groups.keys())
         if len(group_names) == 1:
             name, state_list = state_groups.items().pop()
             state = {
@@ -223,16 +206,25 @@ class StateHandler(object):
             else:
                 prev_states = []
 
-            if self._state_cache is not None:
-                cache = _StateCacheEntry(
-                    state=state,
-                    state_group=name,
-                    ts=self.clock.time_msec()
-                )
+            defer.returnValue((name, state, prev_states))
 
-                self._state_cache[frozenset(event_ids)] = cache
+        if self._state_cache is not None:
+            cache = self._state_cache.get(group_names, None)
+            if cache and cache.state_group:
+                cache.ts = self.clock.time_msec()
 
-            defer.returnValue((name, state, prev_states))
+                event_dict = yield self.store.get_events(cache.state.values())
+                state = {(e.type, e.state_key): e for e in event_dict.values()}
+
+                prev_state = state.get((event_type, state_key), None)
+                if prev_state:
+                    prev_state = prev_state.event_id
+                    prev_states = [prev_state]
+                else:
+                    prev_states = []
+                defer.returnValue(
+                    (cache.state_group, state, prev_states)
+                )
 
         new_state, prev_states = self._resolve_events(
             state_groups.values(), event_type, state_key
@@ -240,12 +232,12 @@ class StateHandler(object):
 
         if self._state_cache is not None:
             cache = _StateCacheEntry(
-                state=new_state,
+                state={key: event.event_id for key, event in new_state.items()},
                 state_group=None,
                 ts=self.clock.time_msec()
             )
 
-            self._state_cache[frozenset(event_ids)] = cache
+            self._state_cache[group_names] = cache
 
         defer.returnValue((None, new_state, prev_states))
 
@@ -263,48 +255,49 @@ class StateHandler(object):
         from (type, state_key) to event. prev_states is a list of event_ids.
         :rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str])
         """
-        state = {}
-        for st in state_sets:
-            for e in st:
-                state.setdefault(
-                    (e.type, e.state_key),
-                    {}
-                )[e.event_id] = e
-
-        unconflicted_state = {
-            k: v.values()[0] for k, v in state.items()
-            if len(v.values()) == 1
-        }
-
-        conflicted_state = {
-            k: v.values()
-            for k, v in state.items()
-            if len(v.values()) > 1
-        }
+        with Measure(self.clock, "state._resolve_events"):
+            state = {}
+            for st in state_sets:
+                for e in st:
+                    state.setdefault(
+                        (e.type, e.state_key),
+                        {}
+                    )[e.event_id] = e
+
+            unconflicted_state = {
+                k: v.values()[0] for k, v in state.items()
+                if len(v.values()) == 1
+            }
 
-        if event_type:
-            prev_states_events = conflicted_state.get(
-                (event_type, state_key), []
-            )
-            prev_states = [s.event_id for s in prev_states_events]
-        else:
-            prev_states = []
+            conflicted_state = {
+                k: v.values()
+                for k, v in state.items()
+                if len(v.values()) > 1
+            }
+
+            if event_type:
+                prev_states_events = conflicted_state.get(
+                    (event_type, state_key), []
+                )
+                prev_states = [s.event_id for s in prev_states_events]
+            else:
+                prev_states = []
 
-        auth_events = {
-            k: e for k, e in unconflicted_state.items()
-            if k[0] in AuthEventTypes
-        }
+            auth_events = {
+                k: e for k, e in unconflicted_state.items()
+                if k[0] in AuthEventTypes
+            }
 
-        try:
-            resolved_state = self._resolve_state_events(
-                conflicted_state, auth_events
-            )
-        except:
-            logger.exception("Failed to resolve state")
-            raise
+            try:
+                resolved_state = self._resolve_state_events(
+                    conflicted_state, auth_events
+                )
+            except:
+                logger.exception("Failed to resolve state")
+                raise
 
-        new_state = unconflicted_state
-        new_state.update(resolved_state)
+            new_state = unconflicted_state
+            new_state.update(resolved_state)
 
         return new_state, prev_states
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 7dc67ecd57..b75b79df36 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,6 +18,7 @@ from synapse.api.errors import StoreError
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.caches.dictionary_cache import DictionaryCache
 from synapse.util.caches.descriptors import Cache
+from synapse.util.caches import intern_dict
 import synapse.metrics
 
 
@@ -26,6 +27,10 @@ from twisted.internet import defer
 import sys
 import time
 import threading
+import os
+
+
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
 
 
 logger = logging.getLogger(__name__)
@@ -163,7 +168,9 @@ class SQLBaseStore(object):
         self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
                                       max_entries=hs.config.event_cache_size)
 
-        self._state_group_cache = DictionaryCache("*stateGroupCache*", 2000)
+        self._state_group_cache = DictionaryCache(
+            "*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR
+        )
 
         self._event_fetch_lock = threading.Condition()
         self._event_fetch_list = []
@@ -344,7 +351,7 @@ class SQLBaseStore(object):
         """
         col_headers = list(column[0] for column in cursor.description)
         results = list(
-            dict(zip(col_headers, row)) for row in cursor.fetchall()
+            intern_dict(dict(zip(col_headers, row))) for row in cursor.fetchall()
         )
         return results
 
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 012a0b414a..ef231a04dc 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -155,7 +155,7 @@ class DirectoryStore(SQLBaseStore):
 
         return room_id
 
-    @cached()
+    @cached(max_entries=5000)
     def get_aliases_for_room(self, room_id):
         return self._simple_select_onecol(
             "room_aliases",
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 5820539a92..dc5830450a 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -49,7 +49,7 @@ class EventPushActionsStore(SQLBaseStore):
             )
         self._simple_insert_many_txn(txn, "event_push_actions", values)
 
-    @cachedInlineCallbacks(num_args=3, lru=True, tree=True)
+    @cachedInlineCallbacks(num_args=3, lru=True, tree=True, max_entries=5000)
     def get_unread_event_push_actions_by_room_for_user(
             self, room_id, user_id, last_read_event_id
     ):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 285c586cfe..5233430028 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -101,30 +101,16 @@ class EventsStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, context, backfilled=False,
+    def persist_event(self, event, context,
                       is_new_state=True, current_state=None):
-        stream_ordering = None
-        if backfilled:
-            self.min_stream_token -= 1
-            stream_ordering = self.min_stream_token
-
-        if stream_ordering is None:
-            stream_ordering_manager = self._stream_id_gen.get_next()
-        else:
-            @contextmanager
-            def stream_ordering_manager():
-                yield stream_ordering
-            stream_ordering_manager = stream_ordering_manager()
-
         try:
-            with stream_ordering_manager as stream_ordering:
+            with self._stream_id_gen.get_next() as stream_ordering:
                 event.internal_metadata.stream_ordering = stream_ordering
                 yield self.runInteraction(
                     "persist_event",
                     self._persist_event_txn,
                     event=event,
                     context=context,
-                    backfilled=backfilled,
                     is_new_state=is_new_state,
                     current_state=current_state,
                 )
@@ -165,13 +151,38 @@ class EventsStore(SQLBaseStore):
 
         defer.returnValue(events[0] if events else None)
 
+    @defer.inlineCallbacks
+    def get_events(self, event_ids, check_redacted=True,
+                   get_prev_content=False, allow_rejected=False):
+        """Get events from the database
+
+        Args:
+            event_ids (list): The event_ids of the events to fetch
+            check_redacted (bool): If True, check if event has been redacted
+                and redact it.
+            get_prev_content (bool): If True and event is a state event,
+                include the previous states content in the unsigned field.
+            allow_rejected (bool): If True return rejected events.
+
+        Returns:
+            Deferred : Dict from event_id to event.
+        """
+        events = yield self._get_events(
+            event_ids,
+            check_redacted=check_redacted,
+            get_prev_content=get_prev_content,
+            allow_rejected=allow_rejected,
+        )
+
+        defer.returnValue({e.event_id: e for e in events})
+
     @log_function
-    def _persist_event_txn(self, txn, event, context, backfilled,
+    def _persist_event_txn(self, txn, event, context,
                            is_new_state=True, current_state=None):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
-            txn.call_after(self.get_current_state_for_key.invalidate_all)
+            txn.call_after(self._get_current_state_for_key.invalidate_all)
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
@@ -198,7 +209,7 @@ class EventsStore(SQLBaseStore):
         return self._persist_events_txn(
             txn,
             [(event, context)],
-            backfilled=backfilled,
+            backfilled=False,
             is_new_state=is_new_state,
         )
 
@@ -455,7 +466,7 @@ class EventsStore(SQLBaseStore):
             for event, _ in state_events_and_contexts:
                 if not context.rejected:
                     txn.call_after(
-                        self.get_current_state_for_key.invalidate,
+                        self._get_current_state_for_key.invalidate,
                         (event.room_id, event.type, event.state_key,)
                     )
 
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index dbc074d6b5..6b9d848eaa 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -62,18 +62,17 @@ class ReceiptsStore(SQLBaseStore):
 
     @cachedInlineCallbacks(num_args=2)
     def get_receipts_for_user(self, user_id, receipt_type):
-        def f(txn):
-            sql = (
-                "SELECT room_id,event_id "
-                "FROM receipts_linearized "
-                "WHERE user_id = ? AND receipt_type = ? "
-            )
-            txn.execute(sql, (user_id, receipt_type))
-            return txn.fetchall()
+        rows = yield self._simple_select_list(
+            table="receipts_linearized",
+            keyvalues={
+                "user_id": user_id,
+                "receipt_type": receipt_type,
+            },
+            retcols=("room_id", "event_id"),
+            desc="get_receipts_for_user",
+        )
 
-        defer.returnValue(dict(
-            (yield self.runInteraction("get_receipts_for_user", f))
-        ))
+        defer.returnValue({row["room_id"]: row["event_id"] for row in rows})
 
     @defer.inlineCallbacks
     def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 46ab38a313..9be977f387 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -77,6 +77,14 @@ class RoomStore(SQLBaseStore):
             allow_none=True,
         )
 
+    def set_room_is_public(self, room_id, is_public):
+        return self._simple_update_one(
+            table="rooms",
+            keyvalues={"room_id": room_id},
+            updatevalues={"is_public": is_public},
+            desc="set_room_is_public",
+        )
+
     def get_public_room_ids(self):
         return self._simple_select_onecol(
             table="rooms",
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0cd89260f2..430b49c12e 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -115,19 +115,17 @@ class RoomMemberStore(SQLBaseStore):
         ).addCallback(self._get_events)
 
     @cached()
-    def get_invites_for_user(self, user_id):
-        """ Get all the invite events for a user
+    def get_invited_rooms_for_user(self, user_id):
+        """ Get all the rooms the user is invited to
         Args:
             user_id (str): The user ID.
         Returns:
-            A deferred list of event objects.
+            A deferred list of RoomsForUser.
         """
 
         return self.get_rooms_for_user_where_membership_is(
             user_id, [Membership.INVITE]
-        ).addCallback(lambda invites: self._get_events([
-            invite.event_id for invite in invites
-        ]))
+        )
 
     def get_leave_and_ban_events_for_user(self, user_id):
         """ Get all the leave events for a user
diff --git a/synapse/storage/schema/delta/30/public_rooms.sql b/synapse/storage/schema/delta/30/public_rooms.sql
new file mode 100644
index 0000000000..f09db4faa6
--- /dev/null
+++ b/synapse/storage/schema/delta/30/public_rooms.sql
@@ -0,0 +1,23 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/* This release removes the restriction that published rooms must have an alias,
+ * so we go back and ensure the only 'public' rooms are ones with an alias.
+ * We use (1 = 0) and (1 = 1) so that it works in both postgres and sqlite
+ */
+UPDATE rooms SET is_public = (1 = 0) WHERE is_public = (1 = 1) AND room_id not in (
+    SELECT room_id FROM room_aliases
+);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 8ed8a21b0a..02cefdff26 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -14,9 +14,8 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import (
-    cached, cachedInlineCallbacks, cachedList
-)
+from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches import intern_string
 
 from twisted.internet import defer
 
@@ -155,8 +154,14 @@ class StateStore(SQLBaseStore):
         events = yield self._get_events(event_ids, get_prev_content=False)
         defer.returnValue(events)
 
-    @cachedInlineCallbacks(num_args=3)
+    @defer.inlineCallbacks
     def get_current_state_for_key(self, room_id, event_type, state_key):
+        event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key)
+        events = yield self._get_events(event_ids, get_prev_content=False)
+        defer.returnValue(events)
+
+    @cached(num_args=3)
+    def _get_current_state_for_key(self, room_id, event_type, state_key):
         def f(txn):
             sql = (
                 "SELECT event_id FROM current_state_events"
@@ -167,12 +172,10 @@ class StateStore(SQLBaseStore):
             txn.execute(sql, args)
             results = txn.fetchall()
             return [r[0] for r in results]
-        event_ids = yield self.runInteraction("get_current_state_for_key", f)
-        events = yield self._get_events(event_ids, get_prev_content=False)
-        defer.returnValue(events)
+        return self.runInteraction("get_current_state_for_key", f)
 
     def _get_state_groups_from_groups(self, groups, types):
-        """Returns dictionary state_group -> state event ids
+        """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
         """
         def f(txn, groups):
             if types is not None:
@@ -183,7 +186,8 @@ class StateStore(SQLBaseStore):
                 where_clause = ""
 
             sql = (
-                "SELECT state_group, event_id FROM state_groups_state WHERE"
+                "SELECT state_group, event_id, type, state_key"
+                " FROM state_groups_state WHERE"
                 " state_group IN (%s) %s" % (
                     ",".join("?" for _ in groups),
                     where_clause,
@@ -199,7 +203,8 @@ class StateStore(SQLBaseStore):
 
             results = {}
             for row in rows:
-                results.setdefault(row["state_group"], []).append(row["event_id"])
+                key = (row["type"], row["state_key"])
+                results.setdefault(row["state_group"], {})[key] = row["event_id"]
             return results
 
         chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
@@ -296,7 +301,7 @@ class StateStore(SQLBaseStore):
                 where a `state_key` of `None` matches all state_keys for the
                 `type`.
         """
-        is_all, state_dict = self._state_group_cache.get(group)
+        is_all, state_dict_ids = self._state_group_cache.get(group)
 
         type_to_key = {}
         missing_types = set()
@@ -308,7 +313,7 @@ class StateStore(SQLBaseStore):
                 if type_to_key.get(typ, object()) is not None:
                     type_to_key.setdefault(typ, set()).add(state_key)
 
-                if (typ, state_key) not in state_dict:
+                if (typ, state_key) not in state_dict_ids:
                     missing_types.add((typ, state_key))
 
         sentinel = object()
@@ -326,7 +331,7 @@ class StateStore(SQLBaseStore):
         got_all = not (missing_types or types is None)
 
         return {
-            k: v for k, v in state_dict.items()
+            k: v for k, v in state_dict_ids.items()
             if include(k[0], k[1])
         }, missing_types, got_all
 
@@ -340,8 +345,9 @@ class StateStore(SQLBaseStore):
         Args:
             group: The state group to lookup
         """
-        is_all, state_dict = self._state_group_cache.get(group)
-        return state_dict, is_all
+        is_all, state_dict_ids = self._state_group_cache.get(group)
+
+        return state_dict_ids, is_all
 
     @defer.inlineCallbacks
     def _get_state_for_groups(self, groups, types=None):
@@ -354,84 +360,72 @@ class StateStore(SQLBaseStore):
         missing_groups = []
         if types is not None:
             for group in set(groups):
-                state_dict, missing_types, got_all = self._get_some_state_from_cache(
+                state_dict_ids, missing_types, got_all = self._get_some_state_from_cache(
                     group, types
                 )
-                results[group] = state_dict
+                results[group] = state_dict_ids
 
                 if not got_all:
                     missing_groups.append(group)
         else:
             for group in set(groups):
-                state_dict, got_all = self._get_all_state_from_cache(
+                state_dict_ids, got_all = self._get_all_state_from_cache(
                     group
                 )
-                results[group] = state_dict
+
+                results[group] = state_dict_ids
 
                 if not got_all:
                     missing_groups.append(group)
 
-        if not missing_groups:
-            defer.returnValue({
-                group: {
-                    type_tuple: event
-                    for type_tuple, event in state.items()
-                    if event
-                }
-                for group, state in results.items()
-            })
+        if missing_groups:
+            # Okay, so we have some missing_types, lets fetch them.
+            cache_seq_num = self._state_group_cache.sequence
 
-        # Okay, so we have some missing_types, lets fetch them.
-        cache_seq_num = self._state_group_cache.sequence
+            group_to_state_dict = yield self._get_state_groups_from_groups(
+                missing_groups, types
+            )
 
-        group_state_dict = yield self._get_state_groups_from_groups(
-            missing_groups, types
-        )
+            # Now we want to update the cache with all the things we fetched
+            # from the database.
+            for group, group_state_dict in group_to_state_dict.items():
+                if types:
+                    # We delibrately put key -> None mappings into the cache to
+                    # cache absence of the key, on the assumption that if we've
+                    # explicitly asked for some types then we will probably ask
+                    # for them again.
+                    state_dict = {
+                        (intern_string(etype), intern_string(state_key)): None
+                        for (etype, state_key) in types
+                    }
+                    state_dict.update(results[group])
+                    results[group] = state_dict
+                else:
+                    state_dict = results[group]
+
+                state_dict.update(group_state_dict)
+
+                self._state_group_cache.update(
+                    cache_seq_num,
+                    key=group,
+                    value=state_dict,
+                    full=(types is None),
+                )
 
         state_events = yield self._get_events(
-            [e_id for l in group_state_dict.values() for e_id in l],
+            [ev_id for sd in results.values() for ev_id in sd.values()],
             get_prev_content=False
         )
 
         state_events = {e.event_id: e for e in state_events}
 
-        # Now we want to update the cache with all the things we fetched
-        # from the database.
-        for group, state_ids in group_state_dict.items():
-            if types:
-                # We delibrately put key -> None mappings into the cache to
-                # cache absence of the key, on the assumption that if we've
-                # explicitly asked for some types then we will probably ask
-                # for them again.
-                state_dict = {key: None for key in types}
-                state_dict.update(results[group])
-                results[group] = state_dict
-            else:
-                state_dict = results[group]
-
-            for event_id in state_ids:
-                try:
-                    state_event = state_events[event_id]
-                    state_dict[(state_event.type, state_event.state_key)] = state_event
-                except KeyError:
-                    # Hmm. So we do don't have that state event? Interesting.
-                    logger.warn(
-                        "Can't find state event %r for state group %r",
-                        event_id, group,
-                    )
-
-            self._state_group_cache.update(
-                cache_seq_num,
-                key=group,
-                value=state_dict,
-                full=(types is None),
-            )
-
         # Remove all the entries with None values. The None values were just
         # used for bookkeeping in the cache.
         for group, state_dict in results.items():
             results[group] = {
-                key: event for key, event in state_dict.items() if event
+                key: state_events[event_id]
+                for key, event_id in state_dict.items()
+                if event_id and event_id in state_events
             }
 
         defer.returnValue(results)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 7f4a827528..cf84938be5 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -36,7 +36,7 @@ what sort order was used:
 from twisted.internet import defer
 
 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached
 from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
 from synapse.util.logcontext import preserve_fn
@@ -465,9 +465,25 @@ class StreamStore(SQLBaseStore):
 
         defer.returnValue((events, token))
 
-    @cachedInlineCallbacks(num_args=4)
+    @defer.inlineCallbacks
     def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
+        rows, token = yield self.get_recent_event_ids_for_room(
+            room_id, limit, end_token, from_token
+        )
+
+        logger.debug("stream before")
+        events = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
+        logger.debug("stream after")
+
+        self._set_before_and_after(events, rows)
+
+        defer.returnValue((events, token))
 
+    @cached(num_args=4)
+    def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
         end_token = RoomStreamToken.parse_stream_token(end_token)
 
         if from_token is None:
@@ -517,21 +533,10 @@ class StreamStore(SQLBaseStore):
 
             return rows, token
 
-        rows, token = yield self.runInteraction(
+        return self.runInteraction(
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )
 
-        logger.debug("stream before")
-        events = yield self._get_events(
-            [r["event_id"] for r in rows],
-            get_prev_content=True
-        )
-        logger.debug("stream after")
-
-        self._set_before_and_after(events, rows)
-
-        defer.returnValue((events, token))
-
     @defer.inlineCallbacks
     def get_room_events_max_id(self, direction='f'):
         token = yield self._stream_id_gen.get_max_token()
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 1a14904194..d53569ca49 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -14,6 +14,10 @@
 # limitations under the License.
 
 import synapse.metrics
+from lrucache import LruCache
+import os
+
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
 
 DEBUG_CACHES = False
 
@@ -25,3 +29,56 @@ cache_counter = metrics.register_cache(
     lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
     labels=["name"],
 )
+
+_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
+caches_by_name["string_cache"] = _string_cache
+
+
+KNOWN_KEYS = {
+    key: key for key in
+    (
+        "auth_events",
+        "content",
+        "depth",
+        "event_id",
+        "hashes",
+        "origin",
+        "origin_server_ts",
+        "prev_events",
+        "room_id",
+        "sender",
+        "signatures",
+        "state_key",
+        "type",
+        "unsigned",
+        "user_id",
+    )
+}
+
+
+def intern_string(string):
+    """Takes a (potentially) unicode string and interns using custom cache
+    """
+    return _string_cache.setdefault(string, string)
+
+
+def intern_dict(dictionary):
+    """Takes a dictionary and interns well known keys and their values
+    """
+    return {
+        KNOWN_KEYS.get(key, key): _intern_known_values(key, value)
+        for key, value in dictionary.items()
+    }
+
+
+def _intern_known_values(key, value):
+    intern_str_keys = ("event_id", "room_id")
+    intern_unicode_keys = ("sender", "user_id", "type", "state_key")
+
+    if key in intern_str_keys:
+        return intern(value.encode('ascii'))
+
+    if key in intern_unicode_keys:
+        return intern_string(value)
+
+    return value
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index f7423f2fab..f9df445a8d 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -29,6 +29,16 @@ def enumerate_leaves(node, depth):
                 yield m
 
 
+class _Node(object):
+    __slots__ = ["prev_node", "next_node", "key", "value"]
+
+    def __init__(self, prev_node, next_node, key, value):
+        self.prev_node = prev_node
+        self.next_node = next_node
+        self.key = key
+        self.value = value
+
+
 class LruCache(object):
     """
     Least-recently-used cache.
@@ -38,10 +48,9 @@ class LruCache(object):
     def __init__(self, max_size, keylen=1, cache_type=dict):
         cache = cache_type()
         self.cache = cache  # Used for introspection.
-        list_root = []
-        list_root[:] = [list_root, list_root, None, None]
-
-        PREV, NEXT, KEY, VALUE = 0, 1, 2, 3
+        list_root = _Node(None, None, None, None)
+        list_root.next_node = list_root
+        list_root.prev_node = list_root
 
         lock = threading.Lock()
 
@@ -55,36 +64,36 @@ class LruCache(object):
 
         def add_node(key, value):
             prev_node = list_root
-            next_node = prev_node[NEXT]
-            node = [prev_node, next_node, key, value]
-            prev_node[NEXT] = node
-            next_node[PREV] = node
+            next_node = prev_node.next_node
+            node = _Node(prev_node, next_node, key, value)
+            prev_node.next_node = node
+            next_node.prev_node = node
             cache[key] = node
 
         def move_node_to_front(node):
-            prev_node = node[PREV]
-            next_node = node[NEXT]
-            prev_node[NEXT] = next_node
-            next_node[PREV] = prev_node
+            prev_node = node.prev_node
+            next_node = node.next_node
+            prev_node.next_node = next_node
+            next_node.prev_node = prev_node
             prev_node = list_root
-            next_node = prev_node[NEXT]
-            node[PREV] = prev_node
-            node[NEXT] = next_node
-            prev_node[NEXT] = node
-            next_node[PREV] = node
+            next_node = prev_node.next_node
+            node.prev_node = prev_node
+            node.next_node = next_node
+            prev_node.next_node = node
+            next_node.prev_node = node
 
         def delete_node(node):
-            prev_node = node[PREV]
-            next_node = node[NEXT]
-            prev_node[NEXT] = next_node
-            next_node[PREV] = prev_node
+            prev_node = node.prev_node
+            next_node = node.next_node
+            prev_node.next_node = next_node
+            next_node.prev_node = prev_node
 
         @synchronized
         def cache_get(key, default=None):
             node = cache.get(key, None)
             if node is not None:
                 move_node_to_front(node)
-                return node[VALUE]
+                return node.value
             else:
                 return default
 
@@ -93,25 +102,25 @@ class LruCache(object):
             node = cache.get(key, None)
             if node is not None:
                 move_node_to_front(node)
-                node[VALUE] = value
+                node.value = value
             else:
                 add_node(key, value)
                 if len(cache) > max_size:
-                    todelete = list_root[PREV]
+                    todelete = list_root.prev_node
                     delete_node(todelete)
-                    cache.pop(todelete[KEY], None)
+                    cache.pop(todelete.key, None)
 
         @synchronized
         def cache_set_default(key, value):
             node = cache.get(key, None)
             if node is not None:
-                return node[VALUE]
+                return node.value
             else:
                 add_node(key, value)
                 if len(cache) > max_size:
-                    todelete = list_root[PREV]
+                    todelete = list_root.prev_node
                     delete_node(todelete)
-                    cache.pop(todelete[KEY], None)
+                    cache.pop(todelete.key, None)
                 return value
 
         @synchronized
@@ -119,8 +128,8 @@ class LruCache(object):
             node = cache.get(key, None)
             if node:
                 delete_node(node)
-                cache.pop(node[KEY], None)
-                return node[VALUE]
+                cache.pop(node.key, None)
+                return node.value
             else:
                 return default
 
@@ -137,8 +146,8 @@ class LruCache(object):
 
         @synchronized
         def cache_clear():
-            list_root[NEXT] = list_root
-            list_root[PREV] = list_root
+            list_root.next_node = list_root
+            list_root.prev_node = list_root
             cache.clear()
 
         @synchronized
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
new file mode 100644
index 0000000000..be310ba320
--- /dev/null
+++ b/synapse/util/caches/response_cache.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.util.async import ObservableDeferred
+
+
+class ResponseCache(object):
+    """
+    This caches a deferred response. Until the deferred completes it will be
+    returned from the cache. This means that if the client retries the request
+    while the response is still being computed, that original response will be
+    used rather than trying to compute a new response.
+    """
+
+    def __init__(self):
+        self.pending_result_cache = {}  # Requests that haven't finished yet.
+
+    def get(self, key):
+        result = self.pending_result_cache.get(key)
+        if result is not None:
+            return result.observe()
+        else:
+            return None
+
+    def set(self, key, deferred):
+        result = ObservableDeferred(deferred)
+        self.pending_result_cache[key] = result
+
+        def remove(r):
+            self.pending_result_cache.pop(key, None)
+            return r
+
+        result.addBoth(remove)
+        return result.observe()