summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py59
-rw-r--r--synapse/storage/_base.py25
-rw-r--r--synapse/storage/account_data.py13
-rw-r--r--synapse/storage/appservice.py3
-rw-r--r--synapse/storage/engines/sqlite3.py2
-rw-r--r--synapse/storage/event_federation.py2
-rw-r--r--synapse/storage/event_push_actions.py94
-rw-r--r--synapse/storage/events.py66
-rw-r--r--synapse/storage/keys.py1
-rw-r--r--synapse/storage/prepare_database.py4
-rw-r--r--synapse/storage/presence.py5
-rw-r--r--synapse/storage/receipts.py19
-rw-r--r--synapse/storage/registration.py37
-rw-r--r--synapse/storage/room.py84
-rw-r--r--synapse/storage/roommember.py4
-rw-r--r--synapse/storage/schema/delta/28/public_roms_index.sql16
-rw-r--r--synapse/storage/schema/delta/29/push_actions.sql31
-rw-r--r--synapse/storage/state.py92
-rw-r--r--synapse/storage/stream.py93
19 files changed, 379 insertions, 271 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index eb88842308..5a9e7720d9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -45,9 +45,10 @@ from .search import SearchStore
 from .tags import TagsStore
 from .account_data import AccountDataStore
 
-
 from util.id_generators import IdGenerator, StreamIdGenerator
 
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
 
 import logging
 
@@ -58,7 +59,7 @@ logger = logging.getLogger(__name__)
 # Number of msec of granularity to store the user IP 'last seen' time. Smaller
 # times give more inserts into the database even for readonly API hits
 # 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120*1000
+LAST_SEEN_GRANULARITY = 120 * 1000
 
 
 class DataStore(RoomMemberStore, RoomStore,
@@ -84,6 +85,7 @@ class DataStore(RoomMemberStore, RoomStore,
 
     def __init__(self, db_conn, hs):
         self.hs = hs
+        self.database_engine = hs.database_engine
 
         cur = db_conn.cursor()
         try:
@@ -117,8 +119,61 @@ class DataStore(RoomMemberStore, RoomStore,
         self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
         self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
 
+        events_max = self._stream_id_gen.get_max_token(None)
+        event_cache_prefill, min_event_val = self._get_cache_dict(
+            db_conn, "events",
+            entity_column="room_id",
+            stream_column="stream_ordering",
+            max_value=events_max,
+        )
+        self._events_stream_cache = StreamChangeCache(
+            "EventsRoomStreamChangeCache", min_event_val,
+            prefilled_cache=event_cache_prefill,
+        )
+
+        self._membership_stream_cache = StreamChangeCache(
+            "MembershipStreamChangeCache", events_max,
+        )
+
+        account_max = self._account_data_id_gen.get_max_token(None)
+        self._account_data_stream_cache = StreamChangeCache(
+            "AccountDataAndTagsChangeCache", account_max,
+        )
+
         super(DataStore, self).__init__(hs)
 
+    def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value):
+        # Fetch a mapping of room_id -> max stream position for "recent" rooms.
+        # It doesn't really matter how many we get, the StreamChangeCache will
+        # do the right thing to ensure it respects the max size of cache.
+        sql = (
+            "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
+            " WHERE %(stream)s > ? - 100000"
+            " GROUP BY %(entity)s"
+        ) % {
+            "table": table,
+            "entity": entity_column,
+            "stream": stream_column,
+        }
+
+        sql = self.database_engine.convert_param_style(sql)
+
+        txn = db_conn.cursor()
+        txn.execute(sql, (int(max_value),))
+        rows = txn.fetchall()
+
+        cache = {
+            row[0]: int(row[1])
+            for row in rows
+        }
+
+        if cache:
+            min_val = min(cache.values())
+        else:
+            min_val = max_value
+
+        return cache, min_val
+
     @defer.inlineCallbacks
     def insert_client_ip(self, user, access_token, ip, user_agent):
         now = int(self._clock.time_msec())
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 5e77320540..2e97ac84a8 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -15,7 +15,7 @@
 import logging
 
 from synapse.api.errors import StoreError
-from synapse.util.logcontext import preserve_context_over_fn, LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.caches.dictionary_cache import DictionaryCache
 from synapse.util.caches.descriptors import Cache
 import synapse.metrics
@@ -185,7 +185,7 @@ class SQLBaseStore(object):
             time_then = self._previous_loop_ts
             self._previous_loop_ts = time_now
 
-            ratio = (curr - prev)/(time_now - time_then)
+            ratio = (curr - prev) / (time_now - time_then)
 
             top_three_counters = self._txn_perf_counters.interval(
                 time_now - time_then, limit=3
@@ -298,10 +298,10 @@ class SQLBaseStore(object):
                     func, *args, **kwargs
                 )
 
-        result = yield preserve_context_over_fn(
-            self._db_pool.runWithConnection,
-            inner_func, *args, **kwargs
-        )
+        with PreserveLoggingContext():
+            result = yield self._db_pool.runWithConnection(
+                inner_func, *args, **kwargs
+            )
 
         for after_callback, after_args in after_callbacks:
             after_callback(*after_args)
@@ -326,10 +326,10 @@ class SQLBaseStore(object):
 
                 return func(conn, *args, **kwargs)
 
-        result = yield preserve_context_over_fn(
-            self._db_pool.runWithConnection,
-            inner_func, *args, **kwargs
-        )
+        with PreserveLoggingContext():
+            result = yield self._db_pool.runWithConnection(
+                inner_func, *args, **kwargs
+            )
 
         defer.returnValue(result)
 
@@ -643,7 +643,10 @@ class SQLBaseStore(object):
         if not iterable:
             defer.returnValue(results)
 
-        chunks = [iterable[i:i+batch_size] for i in xrange(0, len(iterable), batch_size)]
+        chunks = [
+            iterable[i:i + batch_size]
+            for i in xrange(0, len(iterable), batch_size)
+        ]
         for chunk in chunks:
             rows = yield self.runInteraction(
                 desc,
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index ed6587429b..b8387fc500 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
 from twisted.internet import defer
 
 import ujson as json
@@ -24,14 +23,6 @@ logger = logging.getLogger(__name__)
 
 
 class AccountDataStore(SQLBaseStore):
-    def __init__(self, hs):
-        super(AccountDataStore, self).__init__(hs)
-
-        self._account_data_stream_cache = StreamChangeCache(
-            "AccountDataAndTagsChangeCache",
-            self._account_data_id_gen.get_max_token(None),
-            max_size=10000,
-        )
 
     def get_account_data_for_user(self, user_id):
         """Get all the client account_data for a user.
@@ -166,6 +157,10 @@ class AccountDataStore(SQLBaseStore):
                     "content": content_json,
                 }
             )
+            txn.call_after(
+                self._account_data_stream_cache.entity_has_changed,
+                user_id, next_id,
+            )
             self._update_max_stream_id(txn, next_id)
 
         with (yield self._account_data_id_gen.get_next(self)) as next_id:
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index b5aa55c0a3..1100c67714 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -276,7 +276,8 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
             "application_services_state",
             dict(as_id=service.id),
             ["state"],
-            allow_none=True
+            allow_none=True,
+            desc="get_appservice_state",
         )
         if result:
             defer.returnValue(result.get("state"))
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 400c10103c..91fac33b8b 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -54,7 +54,7 @@ class Sqlite3Engine(object):
 
 def _parse_match_info(buf):
     bufsize = len(buf)
-    return [struct.unpack('@I', buf[i:i+4])[0] for i in range(0, bufsize, 4)]
+    return [struct.unpack('@I', buf[i:i + 4])[0] for i in range(0, bufsize, 4)]
 
 
 def _rank(raw_match_info):
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 5f32eec6f8..ce2c794025 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -58,7 +58,7 @@ class EventFederationStore(SQLBaseStore):
             new_front = set()
             front_list = list(front)
             chunks = [
-                front_list[x:x+100]
+                front_list[x:x + 100]
                 for x in xrange(0, len(front), 100)
             ]
             for chunk in chunks:
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index a05c4f84cf..d77a817682 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -24,8 +24,7 @@ logger = logging.getLogger(__name__)
 
 
 class EventPushActionsStore(SQLBaseStore):
-    @defer.inlineCallbacks
-    def set_push_actions_for_event_and_users(self, event, tuples):
+    def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
         """
         :param event: the event set actions for
         :param tuples: list of tuples of (user_id, profile_tag, actions)
@@ -37,21 +36,19 @@ class EventPushActionsStore(SQLBaseStore):
                 'event_id': event.event_id,
                 'user_id': uid,
                 'profile_tag': profile_tag,
-                'actions': json.dumps(actions)
+                'actions': json.dumps(actions),
+                'stream_ordering': event.internal_metadata.stream_ordering,
+                'topological_ordering': event.depth,
+                'notif': 1,
+                'highlight': 1 if _action_has_highlight(actions) else 0,
             })
 
-        def f(txn):
-            for uid, _, __ in tuples:
-                txn.call_after(
-                    self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
-                    (event.room_id, uid)
-                )
-            return self._simple_insert_many_txn(txn, "event_push_actions", values)
-
-        yield self.runInteraction(
-            "set_actions_for_event_and_users",
-            f,
-        )
+        for uid, _, __ in tuples:
+            txn.call_after(
+                self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+                (event.room_id, uid)
+            )
+        self._simple_insert_many_txn(txn, "event_push_actions", values)
 
     @cachedInlineCallbacks(num_args=3, lru=True, tree=True)
     def get_unread_event_push_actions_by_room_for_user(
@@ -68,32 +65,34 @@ class EventPushActionsStore(SQLBaseStore):
             )
             results = txn.fetchall()
             if len(results) == 0:
-                return []
+                return {"notify_count": 0, "highlight_count": 0}
 
             stream_ordering = results[0][0]
             topological_ordering = results[0][1]
 
             sql = (
-                "SELECT ea.event_id, ea.actions"
-                " FROM event_push_actions ea, events e"
-                " WHERE ea.room_id = e.room_id"
-                " AND ea.event_id = e.event_id"
-                " AND ea.user_id = ?"
-                " AND ea.room_id = ?"
+                "SELECT sum(notif), sum(highlight)"
+                " FROM event_push_actions ea"
+                " WHERE"
+                " user_id = ?"
+                " AND room_id = ?"
                 " AND ("
-                "       e.topological_ordering > ?"
-                "       OR (e.topological_ordering = ? AND e.stream_ordering > ?)"
+                "       topological_ordering > ?"
+                "       OR (topological_ordering = ? AND stream_ordering > ?)"
                 ")"
             )
             txn.execute(sql, (
                 user_id, room_id,
                 topological_ordering, topological_ordering, stream_ordering
-            )
-            )
-            return [
-                {"event_id": row[0], "actions": json.loads(row[1])}
-                for row in txn.fetchall()
-            ]
+            ))
+            row = txn.fetchone()
+            if row:
+                return {
+                    "notify_count": row[0] or 0,
+                    "highlight_count": row[1] or 0,
+                }
+            else:
+                return {"notify_count": 0, "highlight_count": 0}
 
         ret = yield self.runInteraction(
             "get_unread_event_push_actions_by_room",
@@ -101,19 +100,24 @@ class EventPushActionsStore(SQLBaseStore):
         )
         defer.returnValue(ret)
 
-    @defer.inlineCallbacks
-    def remove_push_actions_for_event_id(self, room_id, event_id):
-        def f(txn):
-            # Sad that we have to blow away the cache for the whole room here
-            txn.call_after(
-                self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
-                (room_id,)
-            )
-            txn.execute(
-                "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
-                (room_id, event_id)
-            )
-        yield self.runInteraction(
-            "remove_push_actions_for_event_id",
-            f
+    def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
+        # Sad that we have to blow away the cache for the whole room here
+        txn.call_after(
+            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+            (room_id,)
         )
+        txn.execute(
+            "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
+            (room_id, event_id)
+        )
+
+
+def _action_has_highlight(actions):
+    for action in actions:
+        try:
+            if action.get("set_tweak", None) == "highlight":
+                return action.get("value", True)
+        except AttributeError:
+            pass
+
+    return False
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5e85552029..3a5c6ee4b1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,7 @@ from twisted.internet import defer, reactor
 from synapse.events import FrozenEvent, USE_FROZEN_DICTS
 from synapse.events.utils import prune_event
 
-from synapse.util.logcontext import preserve_context_over_deferred
+from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 
@@ -84,7 +84,7 @@ class EventsStore(SQLBaseStore):
                 event.internal_metadata.stream_ordering = stream
 
             chunks = [
-                events_and_contexts[x:x+100]
+                events_and_contexts[x:x + 100]
                 for x in xrange(0, len(events_and_contexts), 100)
             ]
 
@@ -205,23 +205,29 @@ class EventsStore(SQLBaseStore):
     @log_function
     def _persist_events_txn(self, txn, events_and_contexts, backfilled,
                             is_new_state=True):
-
-        # Remove the any existing cache entries for the event_ids
-        for event, _ in events_and_contexts:
+        depth_updates = {}
+        for event, context in events_and_contexts:
+            # Remove the any existing cache entries for the event_ids
             txn.call_after(self._invalidate_get_event_cache, event.event_id)
-
             if not backfilled:
                 txn.call_after(
                     self._events_stream_cache.entity_has_changed,
                     event.room_id, event.internal_metadata.stream_ordering,
                 )
 
-        depth_updates = {}
-        for event, _ in events_and_contexts:
-            if event.internal_metadata.is_outlier():
-                continue
-            depth_updates[event.room_id] = max(
-                event.depth, depth_updates.get(event.room_id, event.depth)
+            if not event.internal_metadata.is_outlier():
+                depth_updates[event.room_id] = max(
+                    event.depth, depth_updates.get(event.room_id, event.depth)
+                )
+
+            if context.push_actions:
+                self._set_push_actions_for_event_and_users_txn(
+                    txn, event, context.push_actions
+                )
+
+        if event.type == EventTypes.Redaction and event.redacts is not None:
+            self._remove_push_actions_for_event_id_txn(
+                txn, event.room_id, event.redacts
             )
 
         for room_id, depth in depth_updates.items():
@@ -664,14 +670,16 @@ class EventsStore(SQLBaseStore):
                     for ids, d in lst:
                         if not d.called:
                             try:
-                                d.callback([
-                                    res[i]
-                                    for i in ids
-                                    if i in res
-                                ])
+                                with PreserveLoggingContext():
+                                    d.callback([
+                                        res[i]
+                                        for i in ids
+                                        if i in res
+                                    ])
                             except:
                                 logger.exception("Failed to callback")
-                reactor.callFromThread(fire, event_list, row_dict)
+                with PreserveLoggingContext():
+                    reactor.callFromThread(fire, event_list, row_dict)
             except Exception as e:
                 logger.exception("do_fetch")
 
@@ -679,10 +687,12 @@ class EventsStore(SQLBaseStore):
                 def fire(evs):
                     for _, d in evs:
                         if not d.called:
-                            d.errback(e)
+                            with PreserveLoggingContext():
+                                d.errback(e)
 
                 if event_list:
-                    reactor.callFromThread(fire, event_list)
+                    with PreserveLoggingContext():
+                        reactor.callFromThread(fire, event_list)
 
     @defer.inlineCallbacks
     def _enqueue_events(self, events, check_redacted=True,
@@ -709,18 +719,20 @@ class EventsStore(SQLBaseStore):
                 should_start = False
 
         if should_start:
-            self.runWithConnection(
-                self._do_fetch
-            )
+            with PreserveLoggingContext():
+                self.runWithConnection(
+                    self._do_fetch
+                )
 
-        rows = yield preserve_context_over_deferred(events_d)
+        with PreserveLoggingContext():
+            rows = yield events_d
 
         if not allow_rejected:
             rows[:] = [r for r in rows if not r["rejects"]]
 
         res = yield defer.gatherResults(
             [
-                self._get_event_from_row(
+                preserve_fn(self._get_event_from_row)(
                     row["internal_metadata"], row["json"], row["redacts"],
                     check_redacted=check_redacted,
                     get_prev_content=get_prev_content,
@@ -740,7 +752,7 @@ class EventsStore(SQLBaseStore):
         rows = []
         N = 200
         for i in range(1 + len(events) / N):
-            evs = events[i*N:(i + 1)*N]
+            evs = events[i * N:(i + 1) * N]
             if not evs:
                 break
 
@@ -755,7 +767,7 @@ class EventsStore(SQLBaseStore):
                 " LEFT JOIN rejections as rej USING (event_id)"
                 " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                 " WHERE e.event_id IN (%s)"
-            ) % (",".join(["?"]*len(evs)),)
+            ) % (",".join(["?"] * len(evs)),)
 
             txn.execute(sql, evs)
             rows.extend(self.cursor_to_dict(txn))
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 8022b8cfc6..fd05bfe54e 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -39,6 +39,7 @@ class KeyStore(SQLBaseStore):
             table="server_tls_certificates",
             keyvalues={"server_name": server_name},
             retcols=("tls_certificate",),
+            desc="get_server_certificate",
         )
         tls_certificate = OpenSSL.crypto.load_certificate(
             OpenSSL.crypto.FILETYPE_ASN1, tls_certificate_bytes,
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index c1f5f99789..850736c85e 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 28
+SCHEMA_VERSION = 29
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
@@ -211,7 +211,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
     logger.debug("applied_delta_files: %s", applied_delta_files)
 
     for v in range(start_ver, SCHEMA_VERSION + 1):
-        logger.debug("Upgrading schema to v%d", v)
+        logger.info("Upgrading schema to v%d", v)
 
         delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
 
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 9b3aecaf8c..ef525f34c5 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -68,8 +68,9 @@ class PresenceStore(SQLBaseStore):
             for row in rows
         })
 
+    @defer.inlineCallbacks
     def set_presence_state(self, user_localpart, new_state):
-        res = self._simple_update_one(
+        res = yield self._simple_update_one(
             table="presence",
             keyvalues={"user_id": user_localpart},
             updatevalues={"state": new_state["state"],
@@ -79,7 +80,7 @@ class PresenceStore(SQLBaseStore):
         )
 
         self.get_presence_state.invalidate((user_localpart,))
-        return res
+        defer.returnValue(res)
 
     def allow_presence_visible(self, observed_localpart, observer_userid):
         return self._simple_insert(
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 8068c73740..4202a6b3dc 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -46,6 +46,20 @@ class ReceiptsStore(SQLBaseStore):
             desc="get_receipts_for_room",
         )
 
+    @cached(num_args=3)
+    def get_last_receipt_event_id_for_user(self, user_id, room_id, receipt_type):
+        return self._simple_select_one_onecol(
+            table="receipts_linearized",
+            keyvalues={
+                "room_id": room_id,
+                "receipt_type": receipt_type,
+                "user_id": user_id
+            },
+            retcol="event_id",
+            desc="get_own_receipt_for_user",
+            allow_none=True,
+        )
+
     @cachedInlineCallbacks(num_args=2)
     def get_receipts_for_user(self, user_id, receipt_type):
         def f(txn):
@@ -226,6 +240,11 @@ class ReceiptsStore(SQLBaseStore):
             room_id, stream_id
         )
 
+        txn.call_after(
+            self.get_last_receipt_event_id_for_user.invalidate,
+            (user_id, room_id, receipt_type)
+        )
+
         # We don't want to clobber receipts for more recent events, so we
         # have to compare orderings of existing receipts
         sql = (
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 70cde0d04d..967c732bda 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import re
+
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError, Codes
@@ -134,6 +136,7 @@ class RegistrationStore(SQLBaseStore):
             },
             retcols=["name", "password_hash", "is_guest"],
             allow_none=True,
+            desc="get_user_by_id",
         )
 
     def get_users_by_id_case_insensitive(self, user_id):
@@ -350,3 +353,37 @@ class RegistrationStore(SQLBaseStore):
 
         ret = yield self.runInteraction("count_users", _count_users)
         defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def find_next_generated_user_id_localpart(self):
+        """
+        Gets the localpart of the next generated user ID.
+
+        Generated user IDs are integers, and we aim for them to be as small as
+        we can. Unfortunately, it's possible some of them are already taken by
+        existing users, and there may be gaps in the already taken range. This
+        function returns the start of the first allocatable gap. This is to
+        avoid the case of ID 10000000 being pre-allocated, so us wasting the
+        first (and shortest) many generated user IDs.
+        """
+        def _find_next_generated_user_id(txn):
+            txn.execute("SELECT name FROM users")
+            rows = self.cursor_to_dict(txn)
+
+            regex = re.compile("^@(\d+):")
+
+            found = set()
+
+            for r in rows:
+                user_id = r["name"]
+                match = regex.search(user_id)
+                if match:
+                    found.add(int(match.group(1)))
+            for i in xrange(len(found) + 1):
+                if i not in found:
+                    return i
+
+        defer.returnValue((yield self.runInteraction(
+            "find_next_generated_user_id",
+            _find_next_generated_user_id
+        )))
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index dc09a3aaba..46ab38a313 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -87,90 +87,20 @@ class RoomStore(SQLBaseStore):
             desc="get_public_room_ids",
         )
 
-    @defer.inlineCallbacks
-    def get_rooms(self, is_public):
-        """Retrieve a list of all public rooms.
-
-        Args:
-            is_public (bool): True if the rooms returned should be public.
-        Returns:
-            A list of room dicts containing at least a "room_id" key, a
-            "topic" key if one is set, and a "name" key if one is set
+    def get_room_count(self):
+        """Retrieve a list of all rooms
         """
 
         def f(txn):
-            def subquery(table_name, column_name=None):
-                column_name = column_name or table_name
-                return (
-                    "SELECT %(table_name)s.event_id as event_id, "
-                    "%(table_name)s.room_id as room_id, %(column_name)s "
-                    "FROM %(table_name)s "
-                    "INNER JOIN current_state_events as c "
-                    "ON c.event_id = %(table_name)s.event_id " % {
-                        "column_name": column_name,
-                        "table_name": table_name,
-                    }
-                )
-
-            sql = (
-                "SELECT"
-                "    r.room_id,"
-                "    max(n.name),"
-                "    max(t.topic),"
-                "    max(v.history_visibility),"
-                "    max(g.guest_access)"
-                " FROM rooms AS r"
-                " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id"
-                " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id"
-                " LEFT JOIN (%(history_visibility)s) AS v ON v.room_id = r.room_id"
-                " LEFT JOIN (%(guest_access)s) AS g ON g.room_id = r.room_id"
-                " WHERE r.is_public = ?"
-                " GROUP BY r.room_id" % {
-                    "topic": subquery("topics", "topic"),
-                    "name": subquery("room_names", "name"),
-                    "history_visibility": subquery("history_visibility"),
-                    "guest_access": subquery("guest_access"),
-                }
-            )
-
-            txn.execute(sql, (is_public,))
-
-            rows = txn.fetchall()
-
-            for i, row in enumerate(rows):
-                room_id = row[0]
-                aliases = self._simple_select_onecol_txn(
-                    txn,
-                    table="room_aliases",
-                    keyvalues={
-                        "room_id": room_id
-                    },
-                    retcol="room_alias",
-                )
+            sql = "SELECT count(*)  FROM rooms"
+            txn.execute(sql)
+            row = txn.fetchone()
+            return row[0] or 0
 
-                rows[i] = list(row) + [aliases]
-
-            return rows
-
-        rows = yield self.runInteraction(
+        return self.runInteraction(
             "get_rooms", f
         )
 
-        ret = [
-            {
-                "room_id": r[0],
-                "name": r[1],
-                "topic": r[2],
-                "world_readable": r[3] == "world_readable",
-                "guest_can_join": r[4] == "can_join",
-                "aliases": r[5],
-            }
-            for r in rows
-            if r[5]  # We only return rooms that have at least one alias.
-        ]
-
-        defer.returnValue(ret)
-
     def _store_room_topic_txn(self, txn, event):
         if hasattr(event, "content") and "topic" in event.content:
             self._simple_insert_txn(
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 1d3e004c90..3065b0c1a5 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -58,6 +58,10 @@ class RoomMemberStore(SQLBaseStore):
             txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
+            txn.call_after(
+                self._membership_stream_cache.entity_has_changed,
+                event.state_key, event.internal_metadata.stream_ordering
+            )
 
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
diff --git a/synapse/storage/schema/delta/28/public_roms_index.sql b/synapse/storage/schema/delta/28/public_roms_index.sql
new file mode 100644
index 0000000000..ba62a974a4
--- /dev/null
+++ b/synapse/storage/schema/delta/28/public_roms_index.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+CREATE INDEX public_room_index on rooms(is_public);
diff --git a/synapse/storage/schema/delta/29/push_actions.sql b/synapse/storage/schema/delta/29/push_actions.sql
new file mode 100644
index 0000000000..7e7b09820a
--- /dev/null
+++ b/synapse/storage/schema/delta/29/push_actions.sql
@@ -0,0 +1,31 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE event_push_actions ADD COLUMN topological_ordering BIGINT;
+ALTER TABLE event_push_actions ADD COLUMN stream_ordering BIGINT;
+ALTER TABLE event_push_actions ADD COLUMN notif SMALLINT;
+ALTER TABLE event_push_actions ADD COLUMN highlight SMALLINT;
+
+UPDATE event_push_actions SET stream_ordering = (
+    SELECT stream_ordering FROM events WHERE event_id = event_push_actions.event_id
+), topological_ordering = (
+    SELECT topological_ordering FROM events WHERE event_id = event_push_actions.event_id
+);
+
+UPDATE event_push_actions SET notif = 1, highlight = 0;
+
+CREATE INDEX event_push_actions_rm_tokens on event_push_actions(
+    user_id, room_id, topological_ordering, stream_ordering
+);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 6c32e8f7b3..372b540002 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -171,41 +171,43 @@ class StateStore(SQLBaseStore):
         events = yield self._get_events(event_ids, get_prev_content=False)
         defer.returnValue(events)
 
-    def _get_state_groups_from_groups(self, groups_and_types):
+    def _get_state_groups_from_groups(self, groups, types):
         """Returns dictionary state_group -> state event ids
-
-        Args:
-            groups_and_types (list): list of 2-tuple (`group`, `types`)
         """
-        def f(txn):
-            results = {}
-            for group, types in groups_and_types:
-                if types is not None:
-                    where_clause = "AND (%s)" % (
-                        " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
-                    )
-                else:
-                    where_clause = ""
-
-                sql = (
-                    "SELECT event_id FROM state_groups_state WHERE"
-                    " state_group = ? %s"
-                ) % (where_clause,)
+        def f(txn, groups):
+            if types is not None:
+                where_clause = "AND (%s)" % (
+                    " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
+                )
+            else:
+                where_clause = ""
 
-                args = [group]
-                if types is not None:
-                    args.extend([i for typ in types for i in typ])
+            sql = (
+                "SELECT state_group, event_id FROM state_groups_state WHERE"
+                " state_group IN (%s) %s" % (
+                    ",".join("?" for _ in groups),
+                    where_clause,
+                )
+            )
 
-                txn.execute(sql, args)
+            args = list(groups)
+            if types is not None:
+                args.extend([i for typ in types for i in typ])
 
-                results[group] = [r[0] for r in txn.fetchall()]
+            txn.execute(sql, args)
+            rows = self.cursor_to_dict(txn)
 
+            results = {}
+            for row in rows:
+                results.setdefault(row["state_group"], []).append(row["event_id"])
             return results
 
-        return self.runInteraction(
-            "_get_state_groups_from_groups",
-            f,
-        )
+        chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
+        for chunk in chunks:
+            return self.runInteraction(
+                "_get_state_groups_from_groups",
+                f, chunk
+            )
 
     @defer.inlineCallbacks
     def get_state_for_events(self, event_ids, types):
@@ -264,26 +266,20 @@ class StateStore(SQLBaseStore):
         )
 
     @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids",
-                num_args=1)
+                num_args=1, inlineCallbacks=True)
     def _get_state_group_for_events(self, event_ids):
         """Returns mapping event_id -> state_group
         """
-        def f(txn):
-            results = {}
-            for event_id in event_ids:
-                results[event_id] = self._simple_select_one_onecol_txn(
-                    txn,
-                    table="event_to_state_groups",
-                    keyvalues={
-                        "event_id": event_id,
-                    },
-                    retcol="state_group",
-                    allow_none=True,
-                )
-
-            return results
+        rows = yield self._simple_select_many_batch(
+            table="event_to_state_groups",
+            column="event_id",
+            iterable=event_ids,
+            keyvalues={},
+            retcols=("event_id", "state_group",),
+            desc="_get_state_group_for_events",
+        )
 
-        return self.runInteraction("_get_state_group_for_events", f)
+        defer.returnValue({row["event_id"]: row["state_group"] for row in rows})
 
     def _get_some_state_from_cache(self, group, types):
         """Checks if group is in cache. See `_get_state_for_groups`
@@ -355,7 +351,7 @@ class StateStore(SQLBaseStore):
         all events are returned.
         """
         results = {}
-        missing_groups_and_types = []
+        missing_groups = []
         if types is not None:
             for group in set(groups):
                 state_dict, missing_types, got_all = self._get_some_state_from_cache(
@@ -364,7 +360,7 @@ class StateStore(SQLBaseStore):
                 results[group] = state_dict
 
                 if not got_all:
-                    missing_groups_and_types.append((group, missing_types))
+                    missing_groups.append(group)
         else:
             for group in set(groups):
                 state_dict, got_all = self._get_all_state_from_cache(
@@ -373,9 +369,9 @@ class StateStore(SQLBaseStore):
                 results[group] = state_dict
 
                 if not got_all:
-                    missing_groups_and_types.append((group, None))
+                    missing_groups.append(group)
 
-        if not missing_groups_and_types:
+        if not missing_groups:
             defer.returnValue({
                 group: {
                     type_tuple: event
@@ -389,7 +385,7 @@ class StateStore(SQLBaseStore):
         cache_seq_num = self._state_group_cache.sequence
 
         group_state_dict = yield self._get_state_groups_from_groups(
-            missing_groups_and_types
+            missing_groups, types
         )
 
         state_events = yield self._get_events(
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 6e81d46c60..367ffc9543 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -37,10 +37,9 @@ from twisted.internet import defer
 
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
-from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
-from synapse.util.logutils import log_function
+from synapse.util.logcontext import preserve_fn
 
 import logging
 
@@ -78,13 +77,6 @@ def upper_bound(token):
 
 
 class StreamStore(SQLBaseStore):
-    def __init__(self, hs):
-        super(StreamStore, self).__init__(hs)
-
-        self._events_stream_cache = StreamChangeCache(
-            "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None)
-        )
-
     @defer.inlineCallbacks
     def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
         # NB this lives here instead of appservice.py so we can reuse the
@@ -177,14 +169,14 @@ class StreamStore(SQLBaseStore):
 
         results = {}
         room_ids = list(room_ids)
-        for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)):
+        for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
             res = yield defer.gatherResults([
-                self.get_room_events_stream_for_room(
-                    room_id, from_key, to_key, limit
-                ).addCallback(lambda r, rm: (rm, r), room_id)
+                preserve_fn(self.get_room_events_stream_for_room)(
+                    room_id, from_key, to_key, limit,
+                )
                 for room_id in room_ids
             ])
-            results.update(dict(res))
+            results.update(dict(zip(rm_ids, res)))
 
         defer.returnValue(results)
 
@@ -229,28 +221,30 @@ class StreamStore(SQLBaseStore):
 
             rows = self.cursor_to_dict(txn)
 
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows
+
+        rows = yield self.runInteraction("get_room_events_stream_for_room", f)
 
-            self._set_before_and_after(ret, rows, topo_order=False)
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            ret.reverse()
+        self._set_before_and_after(ret, rows, topo_order=False)
 
-            if rows:
-                key = "s%d" % min(r["stream_ordering"] for r in rows)
-            else:
-                # Assume we didn't get anything because there was nothing to
-                # get.
-                key = from_key
+        ret.reverse()
 
-            return ret, key
-        res = yield self.runInteraction("get_room_events_stream_for_room", f)
-        defer.returnValue(res)
+        if rows:
+            key = "s%d" % min(r["stream_ordering"] for r in rows)
+        else:
+            # Assume we didn't get anything because there was nothing to
+            # get.
+            key = from_key
 
-    def get_room_changes_for_user(self, user_id, from_key, to_key):
+        defer.returnValue((ret, key))
+
+    @defer.inlineCallbacks
+    def get_membership_changes_for_user(self, user_id, from_key, to_key):
         if from_key is not None:
             from_id = RoomStreamToken.parse_stream_token(from_key).stream
         else:
@@ -258,7 +252,14 @@ class StreamStore(SQLBaseStore):
         to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
         if from_key == to_key:
-            return defer.succeed([])
+            defer.returnValue([])
+
+        if from_id:
+            has_changed = self._membership_stream_cache.has_entity_changed(
+                user_id, int(from_id)
+            )
+            if not has_changed:
+                defer.returnValue([])
 
         def f(txn):
             if from_id is not None:
@@ -283,17 +284,19 @@ class StreamStore(SQLBaseStore):
                 txn.execute(sql, (user_id, to_id,))
             rows = self.cursor_to_dict(txn)
 
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows
+
+        rows = yield self.runInteraction("get_membership_changes_for_user", f)
 
-            return ret
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-        return self.runInteraction("get_room_changes_for_user", f)
+        self._set_before_and_after(ret, rows, topo_order=False)
+
+        defer.returnValue(ret)
 
-    @log_function
     def get_room_events_stream(
         self,
         user_id,
@@ -324,11 +327,6 @@ class StreamStore(SQLBaseStore):
                 " WHERE m.user_id = ? AND m.membership = 'join'"
             )
             current_room_membership_args = [user_id]
-            if room_ids:
-                current_room_membership_sql += " AND m.room_id in (%s)" % (
-                    ",".join(map(lambda _: "?", room_ids))
-                )
-                current_room_membership_args = [user_id] + room_ids
 
         # We also want to get any membership events about that user, e.g.
         # invites or leave notifications.
@@ -567,6 +565,7 @@ class StreamStore(SQLBaseStore):
             table="events",
             keyvalues={"event_id": event_id},
             retcols=("stream_ordering", "topological_ordering"),
+            desc="get_topological_token_for_event",
         ).addCallback(lambda row: "t%d-%d" % (
             row["topological_ordering"], row["stream_ordering"],)
         )
@@ -604,6 +603,10 @@ class StreamStore(SQLBaseStore):
             internal = event.internal_metadata
             internal.before = str(RoomStreamToken(topo, stream - 1))
             internal.after = str(RoomStreamToken(topo, stream))
+            internal.order = (
+                int(topo) if topo else 0,
+                int(stream),
+            )
 
     @defer.inlineCallbacks
     def get_events_around(self, room_id, event_id, before_limit, after_limit):