summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-01-29 13:52:12 +0000
committerErik Johnston <erik@matrix.org>2016-01-29 13:52:12 +0000
commitfd142c29d9af53fe76dcf0cb4e1ebe7aac5d28f2 (patch)
treefd4650925097b4014e8ce32619ab4956a0a89879 /synapse/storage
parentBump version and changelog (diff)
parentBump AccountDataAndTagsChangeCache size (diff)
downloadsynapse-fd142c29d9af53fe76dcf0cb4e1ebe7aac5d28f2.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into release-v0.12.1
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py36
-rw-r--r--synapse/storage/_base.py49
-rw-r--r--synapse/storage/account_data.py21
-rw-r--r--synapse/storage/events.py20
-rw-r--r--synapse/storage/filtering.py3
-rw-r--r--synapse/storage/push_rule.py3
-rw-r--r--synapse/storage/receipts.py79
-rw-r--r--synapse/storage/roommember.py2
-rw-r--r--synapse/storage/schema/delta/28/events_room_stream.sql16
-rw-r--r--synapse/storage/stream.py171
-rw-r--r--synapse/storage/tags.py19
-rw-r--r--synapse/storage/util/id_generators.py36
12 files changed, 293 insertions, 162 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 7a3f6c4662..eb88842308 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -46,6 +46,9 @@ from .tags import TagsStore
 from .account_data import AccountDataStore
 
 
+from util.id_generators import IdGenerator, StreamIdGenerator
+
+
 import logging
 
 
@@ -79,18 +82,43 @@ class DataStore(RoomMemberStore, RoomStore,
                 EventPushActionsStore
                 ):
 
-    def __init__(self, hs):
-        super(DataStore, self).__init__(hs)
+    def __init__(self, db_conn, hs):
         self.hs = hs
 
-        self.min_token_deferred = self._get_min_token()
-        self.min_token = None
+        cur = db_conn.cursor()
+        try:
+            cur.execute("SELECT MIN(stream_ordering) FROM events",)
+            rows = cur.fetchall()
+            self.min_stream_token = rows[0][0] if rows and rows[0] and rows[0][0] else -1
+            self.min_stream_token = min(self.min_stream_token, -1)
+        finally:
+            cur.close()
 
         self.client_ip_last_seen = Cache(
             name="client_ip_last_seen",
             keylen=4,
         )
 
+        self._stream_id_gen = StreamIdGenerator(
+            db_conn, "events", "stream_ordering"
+        )
+        self._receipts_id_gen = StreamIdGenerator(
+            db_conn, "receipts_linearized", "stream_id"
+        )
+        self._account_data_id_gen = StreamIdGenerator(
+            db_conn, "account_data_max_stream_id", "stream_id"
+        )
+
+        self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
+        self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
+        self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
+        self._refresh_tokens_id_gen = IdGenerator("refresh_tokens", "id", self)
+        self._pushers_id_gen = IdGenerator("pushers", "id", self)
+        self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
+        self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
+
+        super(DataStore, self).__init__(hs)
+
     @defer.inlineCallbacks
     def insert_client_ip(self, user, access_token, ip, user_agent):
         now = int(self._clock.time_msec())
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 90d7aee94a..5e77320540 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -15,13 +15,11 @@
 import logging
 
 from synapse.api.errors import StoreError
-from synapse.util.logutils import log_function
 from synapse.util.logcontext import preserve_context_over_fn, LoggingContext
 from synapse.util.caches.dictionary_cache import DictionaryCache
 from synapse.util.caches.descriptors import Cache
 import synapse.metrics
 
-from util.id_generators import IdGenerator, StreamIdGenerator
 
 from twisted.internet import defer
 
@@ -175,16 +173,6 @@ class SQLBaseStore(object):
 
         self.database_engine = hs.database_engine
 
-        self._stream_id_gen = StreamIdGenerator("events", "stream_ordering")
-        self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
-        self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
-        self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
-        self._refresh_tokens_id_gen = IdGenerator("refresh_tokens", "id", self)
-        self._pushers_id_gen = IdGenerator("pushers", "id", self)
-        self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
-        self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
-        self._receipts_id_gen = StreamIdGenerator("receipts_linearized", "stream_id")
-
     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
 
@@ -345,7 +333,8 @@ class SQLBaseStore(object):
 
         defer.returnValue(result)
 
-    def cursor_to_dict(self, cursor):
+    @staticmethod
+    def cursor_to_dict(cursor):
         """Converts a SQL cursor into an list of dicts.
 
         Args:
@@ -402,8 +391,8 @@ class SQLBaseStore(object):
             if not or_ignore:
                 raise
 
-    @log_function
-    def _simple_insert_txn(self, txn, table, values):
+    @staticmethod
+    def _simple_insert_txn(txn, table, values):
         keys, vals = zip(*values.items())
 
         sql = "INSERT INTO %s (%s) VALUES(%s)" % (
@@ -414,7 +403,8 @@ class SQLBaseStore(object):
 
         txn.execute(sql, vals)
 
-    def _simple_insert_many_txn(self, txn, table, values):
+    @staticmethod
+    def _simple_insert_many_txn(txn, table, values):
         if not values:
             return
 
@@ -537,9 +527,10 @@ class SQLBaseStore(object):
             table, keyvalues, retcol, allow_none=allow_none,
         )
 
-    def _simple_select_one_onecol_txn(self, txn, table, keyvalues, retcol,
+    @classmethod
+    def _simple_select_one_onecol_txn(cls, txn, table, keyvalues, retcol,
                                       allow_none=False):
-        ret = self._simple_select_onecol_txn(
+        ret = cls._simple_select_onecol_txn(
             txn,
             table=table,
             keyvalues=keyvalues,
@@ -554,7 +545,8 @@ class SQLBaseStore(object):
             else:
                 raise StoreError(404, "No row found")
 
-    def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol):
+    @staticmethod
+    def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
         sql = (
             "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
         ) % {
@@ -603,7 +595,8 @@ class SQLBaseStore(object):
             table, keyvalues, retcols
         )
 
-    def _simple_select_list_txn(self, txn, table, keyvalues, retcols):
+    @classmethod
+    def _simple_select_list_txn(cls, txn, table, keyvalues, retcols):
         """Executes a SELECT query on the named table, which may return zero or
         more rows, returning the result as a list of dicts.
 
@@ -627,7 +620,7 @@ class SQLBaseStore(object):
             )
             txn.execute(sql)
 
-        return self.cursor_to_dict(txn)
+        return cls.cursor_to_dict(txn)
 
     @defer.inlineCallbacks
     def _simple_select_many_batch(self, table, column, iterable, retcols,
@@ -662,7 +655,8 @@ class SQLBaseStore(object):
 
         defer.returnValue(results)
 
-    def _simple_select_many_txn(self, txn, table, column, iterable, keyvalues, retcols):
+    @classmethod
+    def _simple_select_many_txn(cls, txn, table, column, iterable, keyvalues, retcols):
         """Executes a SELECT query on the named table, which may return zero or
         more rows, returning the result as a list of dicts.
 
@@ -699,7 +693,7 @@ class SQLBaseStore(object):
             )
 
         txn.execute(sql, values)
-        return self.cursor_to_dict(txn)
+        return cls.cursor_to_dict(txn)
 
     def _simple_update_one(self, table, keyvalues, updatevalues,
                            desc="_simple_update_one"):
@@ -726,7 +720,8 @@ class SQLBaseStore(object):
             table, keyvalues, updatevalues,
         )
 
-    def _simple_update_one_txn(self, txn, table, keyvalues, updatevalues):
+    @staticmethod
+    def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
         update_sql = "UPDATE %s SET %s WHERE %s" % (
             table,
             ", ".join("%s = ?" % (k,) for k in updatevalues),
@@ -743,7 +738,8 @@ class SQLBaseStore(object):
         if txn.rowcount > 1:
             raise StoreError(500, "More than one row matched")
 
-    def _simple_select_one_txn(self, txn, table, keyvalues, retcols,
+    @staticmethod
+    def _simple_select_one_txn(txn, table, keyvalues, retcols,
                                allow_none=False):
         select_sql = "SELECT %s FROM %s WHERE %s" % (
             ", ".join(retcols),
@@ -784,7 +780,8 @@ class SQLBaseStore(object):
                 raise StoreError(500, "more than one row matched")
         return self.runInteraction(desc, func)
 
-    def _simple_delete_txn(self, txn, table, keyvalues):
+    @staticmethod
+    def _simple_delete_txn(txn, table, keyvalues):
         sql = "DELETE FROM %s WHERE %s" % (
             table,
             " AND ".join("%s = ?" % (k, ) for k in keyvalues)
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 9c6597e012..ed6587429b 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from twisted.internet import defer
 
 import ujson as json
@@ -23,6 +24,14 @@ logger = logging.getLogger(__name__)
 
 
 class AccountDataStore(SQLBaseStore):
+    def __init__(self, hs):
+        super(AccountDataStore, self).__init__(hs)
+
+        self._account_data_stream_cache = StreamChangeCache(
+            "AccountDataAndTagsChangeCache",
+            self._account_data_id_gen.get_max_token(None),
+            max_size=10000,
+        )
 
     def get_account_data_for_user(self, user_id):
         """Get all the client account_data for a user.
@@ -83,7 +92,7 @@ class AccountDataStore(SQLBaseStore):
             "get_account_data_for_room", get_account_data_for_room_txn
         )
 
-    def get_updated_account_data_for_user(self, user_id, stream_id):
+    def get_updated_account_data_for_user(self, user_id, stream_id, room_ids=None):
         """Get all the client account_data for a that's changed.
 
         Args:
@@ -120,6 +129,12 @@ class AccountDataStore(SQLBaseStore):
 
             return (global_account_data, account_data_by_room)
 
+        changed = self._account_data_stream_cache.has_entity_changed(
+            user_id, int(stream_id)
+        )
+        if not changed:
+            return ({}, {})
+
         return self.runInteraction(
             "get_updated_account_data_for_user", get_updated_account_data_for_user_txn
         )
@@ -186,6 +201,10 @@ class AccountDataStore(SQLBaseStore):
                     "content": content_json,
                 }
             )
+            txn.call_after(
+                self._account_data_stream_cache.entity_has_changed,
+                user_id, next_id,
+            )
             self._update_max_stream_id(txn, next_id)
 
         with (yield self._account_data_id_gen.get_next(self)) as next_id:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ba368a3eca..5e85552029 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -66,11 +66,9 @@ class EventsStore(SQLBaseStore):
             return
 
         if backfilled:
-            if not self.min_token_deferred.called:
-                yield self.min_token_deferred
-            start = self.min_token - 1
-            self.min_token -= len(events_and_contexts) + 1
-            stream_orderings = range(start, self.min_token, -1)
+            start = self.min_stream_token - 1
+            self.min_stream_token -= len(events_and_contexts) + 1
+            stream_orderings = range(start, self.min_stream_token, -1)
 
             @contextmanager
             def stream_ordering_manager():
@@ -107,10 +105,8 @@ class EventsStore(SQLBaseStore):
                       is_new_state=True, current_state=None):
         stream_ordering = None
         if backfilled:
-            if not self.min_token_deferred.called:
-                yield self.min_token_deferred
-            self.min_token -= 1
-            stream_ordering = self.min_token
+            self.min_stream_token -= 1
+            stream_ordering = self.min_stream_token
 
         if stream_ordering is None:
             stream_ordering_manager = yield self._stream_id_gen.get_next(self)
@@ -214,6 +210,12 @@ class EventsStore(SQLBaseStore):
         for event, _ in events_and_contexts:
             txn.call_after(self._invalidate_get_event_cache, event.event_id)
 
+            if not backfilled:
+                txn.call_after(
+                    self._events_stream_cache.entity_has_changed,
+                    event.room_id, event.internal_metadata.stream_ordering,
+                )
+
         depth_updates = {}
         for event, _ in events_and_contexts:
             if event.internal_metadata.is_outlier():
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index f8fc9bdddc..5248736816 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -16,12 +16,13 @@
 from twisted.internet import defer
 
 from ._base import SQLBaseStore
+from synapse.util.caches.descriptors import cachedInlineCallbacks
 
 import simplejson as json
 
 
 class FilteringStore(SQLBaseStore):
-    @defer.inlineCallbacks
+    @cachedInlineCallbacks(num_args=2)
     def get_user_filter(self, user_localpart, filter_id):
         def_json = yield self._simple_select_one_onecol(
             table="user_filters",
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 1f51c90ee5..f9a48171ba 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -130,7 +130,8 @@ class PushRuleStore(SQLBaseStore):
 
     def _add_push_rule_relative_txn(self, txn, user_id, **kwargs):
         after = kwargs.pop("after", None)
-        relative_to_rule = kwargs.pop("before", after)
+        before = kwargs.pop("before", None)
+        relative_to_rule = before or after
 
         res = self._simple_select_one_txn(
             txn,
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index c4232bdc65..8068c73740 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -15,11 +15,10 @@
 
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 from twisted.internet import defer
 
-from blist import sorteddict
 import logging
 import ujson as json
 
@@ -31,7 +30,9 @@ class ReceiptsStore(SQLBaseStore):
     def __init__(self, hs):
         super(ReceiptsStore, self).__init__(hs)
 
-        self._receipts_stream_cache = _RoomStreamChangeCache()
+        self._receipts_stream_cache = StreamChangeCache(
+            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None)
+        )
 
     @cached(num_args=2)
     def get_receipts_for_room(self, room_id, receipt_type):
@@ -76,8 +77,8 @@ class ReceiptsStore(SQLBaseStore):
         room_ids = set(room_ids)
 
         if from_key:
-            room_ids = yield self._receipts_stream_cache.get_rooms_changed(
-                self, room_ids, from_key
+            room_ids = yield self._receipts_stream_cache.get_entities_changed(
+                room_ids, from_key
             )
 
         results = yield self._get_linearized_receipts_for_rooms(
@@ -220,6 +221,11 @@ class ReceiptsStore(SQLBaseStore):
         # FIXME: This shouldn't invalidate the whole cache
         txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
 
+        txn.call_after(
+            self._receipts_stream_cache.entity_has_changed,
+            room_id, stream_id
+        )
+
         # We don't want to clobber receipts for more recent events, so we
         # have to compare orderings of existing receipts
         sql = (
@@ -307,9 +313,6 @@ class ReceiptsStore(SQLBaseStore):
 
         stream_id_manager = yield self._receipts_id_gen.get_next(self)
         with stream_id_manager as stream_id:
-            yield self._receipts_stream_cache.room_has_changed(
-                self, room_id, stream_id
-            )
             have_persisted = yield self.runInteraction(
                 "insert_linearized_receipt",
                 self.insert_linearized_receipt_txn,
@@ -368,63 +371,3 @@ class ReceiptsStore(SQLBaseStore):
                 "data": json.dumps(data),
             }
         )
-
-
-class _RoomStreamChangeCache(object):
-    """Keeps track of the stream_id of the latest change in rooms.
-
-    Given a list of rooms and stream key, it will give a subset of rooms that
-    may have changed since that key. If the key is too old then the cache
-    will simply return all rooms.
-    """
-    def __init__(self, size_of_cache=10000):
-        self._size_of_cache = size_of_cache
-        self._room_to_key = {}
-        self._cache = sorteddict()
-        self._earliest_key = None
-        self.name = "ReceiptsRoomChangeCache"
-        caches_by_name[self.name] = self._cache
-
-    @defer.inlineCallbacks
-    def get_rooms_changed(self, store, room_ids, key):
-        """Returns subset of room ids that have had new receipts since the
-        given key. If the key is too old it will just return the given list.
-        """
-        if key > (yield self._get_earliest_key(store)):
-            keys = self._cache.keys()
-            i = keys.bisect_right(key)
-
-            result = set(
-                self._cache[k] for k in keys[i:]
-            ).intersection(room_ids)
-
-            cache_counter.inc_hits(self.name)
-        else:
-            result = room_ids
-            cache_counter.inc_misses(self.name)
-
-        defer.returnValue(result)
-
-    @defer.inlineCallbacks
-    def room_has_changed(self, store, room_id, key):
-        """Informs the cache that the room has been changed at the given key.
-        """
-        if key > (yield self._get_earliest_key(store)):
-            old_key = self._room_to_key.get(room_id, None)
-            if old_key:
-                key = max(key, old_key)
-                self._cache.pop(old_key, None)
-            self._cache[key] = room_id
-
-            while len(self._cache) > self._size_of_cache:
-                k, r = self._cache.popitem()
-                self._earliest_key = max(k, self._earliest_key)
-                self._room_to_key.pop(r, None)
-
-    @defer.inlineCallbacks
-    def _get_earliest_key(self, store):
-        if self._earliest_key is None:
-            self._earliest_key = yield store.get_max_receipt_stream_id()
-            self._earliest_key = int(self._earliest_key)
-
-        defer.returnValue(self._earliest_key)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index edfecced05..1d3e004c90 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -241,7 +241,7 @@ class RoomMemberStore(SQLBaseStore):
 
         return rows
 
-    @cached()
+    @cached(max_entries=5000)
     def get_rooms_for_user(self, user_id):
         return self.get_rooms_for_user_where_membership_is(
             user_id, membership_list=[Membership.JOIN],
diff --git a/synapse/storage/schema/delta/28/events_room_stream.sql b/synapse/storage/schema/delta/28/events_room_stream.sql
new file mode 100644
index 0000000000..200c35e6e2
--- /dev/null
+++ b/synapse/storage/schema/delta/28/events_room_stream.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+CREATE INDEX events_room_stream on events(room_id, stream_ordering);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 02b1913e26..6e81d46c60 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -37,6 +37,7 @@ from twisted.internet import defer
 
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
 from synapse.util.logutils import log_function
@@ -77,6 +78,12 @@ def upper_bound(token):
 
 
 class StreamStore(SQLBaseStore):
+    def __init__(self, hs):
+        super(StreamStore, self).__init__(hs)
+
+        self._events_stream_cache = StreamChangeCache(
+            "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None)
+        )
 
     @defer.inlineCallbacks
     def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
@@ -157,6 +164,135 @@ class StreamStore(SQLBaseStore):
         results = yield self.runInteraction("get_appservice_room_stream", f)
         defer.returnValue(results)
 
+    @defer.inlineCallbacks
+    def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0):
+        from_id = RoomStreamToken.parse_stream_token(from_key).stream
+
+        room_ids = yield self._events_stream_cache.get_entities_changed(
+            room_ids, from_id
+        )
+
+        if not room_ids:
+            defer.returnValue({})
+
+        results = {}
+        room_ids = list(room_ids)
+        for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)):
+            res = yield defer.gatherResults([
+                self.get_room_events_stream_for_room(
+                    room_id, from_key, to_key, limit
+                ).addCallback(lambda r, rm: (rm, r), room_id)
+                for room_id in room_ids
+            ])
+            results.update(dict(res))
+
+        defer.returnValue(results)
+
+    @defer.inlineCallbacks
+    def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0):
+        if from_key is not None:
+            from_id = RoomStreamToken.parse_stream_token(from_key).stream
+        else:
+            from_id = None
+        to_id = RoomStreamToken.parse_stream_token(to_key).stream
+
+        if from_key == to_key:
+            defer.returnValue(([], from_key))
+
+        if from_id:
+            has_changed = yield self._events_stream_cache.has_entity_changed(
+                room_id, from_id
+            )
+
+            if not has_changed:
+                defer.returnValue(([], from_key))
+
+        def f(txn):
+            if from_id is not None:
+                sql = (
+                    "SELECT event_id, stream_ordering FROM events WHERE"
+                    " room_id = ?"
+                    " AND not outlier"
+                    " AND stream_ordering > ? AND stream_ordering <= ?"
+                    " ORDER BY stream_ordering DESC LIMIT ?"
+                )
+                txn.execute(sql, (room_id, from_id, to_id, limit))
+            else:
+                sql = (
+                    "SELECT event_id, stream_ordering FROM events WHERE"
+                    " room_id = ?"
+                    " AND not outlier"
+                    " AND stream_ordering <= ?"
+                    " ORDER BY stream_ordering DESC LIMIT ?"
+                )
+                txn.execute(sql, (room_id, to_id, limit))
+
+            rows = self.cursor_to_dict(txn)
+
+            ret = self._get_events_txn(
+                txn,
+                [r["event_id"] for r in rows],
+                get_prev_content=True
+            )
+
+            self._set_before_and_after(ret, rows, topo_order=False)
+
+            ret.reverse()
+
+            if rows:
+                key = "s%d" % min(r["stream_ordering"] for r in rows)
+            else:
+                # Assume we didn't get anything because there was nothing to
+                # get.
+                key = from_key
+
+            return ret, key
+        res = yield self.runInteraction("get_room_events_stream_for_room", f)
+        defer.returnValue(res)
+
+    def get_room_changes_for_user(self, user_id, from_key, to_key):
+        if from_key is not None:
+            from_id = RoomStreamToken.parse_stream_token(from_key).stream
+        else:
+            from_id = None
+        to_id = RoomStreamToken.parse_stream_token(to_key).stream
+
+        if from_key == to_key:
+            return defer.succeed([])
+
+        def f(txn):
+            if from_id is not None:
+                sql = (
+                    "SELECT m.event_id, stream_ordering FROM events AS e,"
+                    " room_memberships AS m"
+                    " WHERE e.event_id = m.event_id"
+                    " AND m.user_id = ?"
+                    " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
+                    " ORDER BY e.stream_ordering ASC"
+                )
+                txn.execute(sql, (user_id, from_id, to_id,))
+            else:
+                sql = (
+                    "SELECT m.event_id, stream_ordering FROM events AS e,"
+                    " room_memberships AS m"
+                    " WHERE e.event_id = m.event_id"
+                    " AND m.user_id = ?"
+                    " AND stream_ordering <= ?"
+                    " ORDER BY stream_ordering ASC"
+                )
+                txn.execute(sql, (user_id, to_id,))
+            rows = self.cursor_to_dict(txn)
+
+            ret = self._get_events_txn(
+                txn,
+                [r["event_id"] for r in rows],
+                get_prev_content=True
+            )
+
+            return ret
+
+        return self.runInteraction("get_room_changes_for_user", f)
+
     @log_function
     def get_room_events_stream(
         self,
@@ -174,7 +310,8 @@ class StreamStore(SQLBaseStore):
                 "SELECT c.room_id FROM history_visibility AS h"
                 " INNER JOIN current_state_events AS c"
                 " ON h.event_id = c.event_id"
-                " WHERE c.room_id IN (%s) AND h.history_visibility = 'world_readable'" % (
+                " WHERE c.room_id IN (%s)"
+                " AND h.history_visibility = 'world_readable'" % (
                     ",".join(map(lambda _: "?", room_ids))
                 )
             )
@@ -434,6 +571,18 @@ class StreamStore(SQLBaseStore):
             row["topological_ordering"], row["stream_ordering"],)
         )
 
+    def get_max_topological_token_for_stream_and_room(self, room_id, stream_key):
+        sql = (
+            "SELECT max(topological_ordering) FROM events"
+            " WHERE room_id = ? AND stream_ordering < ?"
+        )
+        return self._execute(
+            "get_max_topological_token_for_stream_and_room", None,
+            sql, room_id, stream_key,
+        ).addCallback(
+            lambda r: r[0][0] if r else 0
+        )
+
     def _get_max_topological_txn(self, txn):
         txn.execute(
             "SELECT MAX(topological_ordering) FROM events"
@@ -444,24 +593,14 @@ class StreamStore(SQLBaseStore):
         rows = txn.fetchall()
         return rows[0][0] if rows else 0
 
-    @defer.inlineCallbacks
-    def _get_min_token(self):
-        row = yield self._execute(
-            "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
-        )
-
-        self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
-        self.min_token = min(self.min_token, -1)
-
-        logger.debug("min_token is: %s", self.min_token)
-
-        defer.returnValue(self.min_token)
-
     @staticmethod
-    def _set_before_and_after(events, rows):
+    def _set_before_and_after(events, rows, topo_order=True):
         for event, row in zip(events, rows):
             stream = row["stream_ordering"]
-            topo = event.depth
+            if topo_order:
+                topo = event.depth
+            else:
+                topo = None
             internal = event.internal_metadata
             internal.before = str(RoomStreamToken(topo, stream - 1))
             internal.after = str(RoomStreamToken(topo, stream))
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index ed9c91e5ea..e1a9c0c261 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -16,7 +16,6 @@
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached
 from twisted.internet import defer
-from .util.id_generators import StreamIdGenerator
 
 import ujson as json
 import logging
@@ -25,13 +24,6 @@ logger = logging.getLogger(__name__)
 
 
 class TagsStore(SQLBaseStore):
-    def __init__(self, hs):
-        super(TagsStore, self).__init__(hs)
-
-        self._account_data_id_gen = StreamIdGenerator(
-            "account_data_max_stream_id", "stream_id"
-        )
-
     def get_max_account_data_stream_id(self):
         """Get the current max stream id for the private user data stream
 
@@ -87,6 +79,12 @@ class TagsStore(SQLBaseStore):
             room_ids = [row[0] for row in txn.fetchall()]
             return room_ids
 
+        changed = self._account_data_stream_cache.has_entity_changed(
+            user_id, int(stream_id)
+        )
+        if not changed:
+            defer.returnValue({})
+
         room_ids = yield self.runInteraction(
             "get_updated_tags", get_updated_tags_txn
         )
@@ -184,6 +182,11 @@ class TagsStore(SQLBaseStore):
             next_id(int): The the revision to advance to.
         """
 
+        txn.call_after(
+            self._account_data_stream_cache.entity_has_changed,
+            user_id, next_id
+        )
+
         update_max_id_sql = (
             "UPDATE account_data_max_stream_id"
             " SET stream_id = ?"
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index f58bf7fd2c..5c522f4ab9 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -72,28 +72,24 @@ class StreamIdGenerator(object):
         with stream_id_gen.get_next_txn(txn) as stream_id:
             # ... persist event ...
     """
-    def __init__(self, table, column):
+    def __init__(self, db_conn, table, column):
         self.table = table
         self.column = column
 
         self._lock = threading.Lock()
 
-        self._current_max = None
+        cur = db_conn.cursor()
+        self._current_max = self._get_or_compute_current_max(cur)
+        cur.close()
+
         self._unfinished_ids = deque()
 
-    @defer.inlineCallbacks
     def get_next(self, store):
         """
         Usage:
             with yield stream_id_gen.get_next as stream_id:
                 # ... persist event ...
         """
-        if not self._current_max:
-            yield store.runInteraction(
-                "_compute_current_max",
-                self._get_or_compute_current_max,
-            )
-
         with self._lock:
             self._current_max += 1
             next_id = self._current_max
@@ -108,21 +104,14 @@ class StreamIdGenerator(object):
                 with self._lock:
                     self._unfinished_ids.remove(next_id)
 
-        defer.returnValue(manager())
+        return manager()
 
-    @defer.inlineCallbacks
     def get_next_mult(self, store, n):
         """
         Usage:
             with yield stream_id_gen.get_next(store, n) as stream_ids:
                 # ... persist events ...
         """
-        if not self._current_max:
-            yield store.runInteraction(
-                "_compute_current_max",
-                self._get_or_compute_current_max,
-            )
-
         with self._lock:
             next_ids = range(self._current_max + 1, self._current_max + n + 1)
             self._current_max += n
@@ -139,24 +128,17 @@ class StreamIdGenerator(object):
                     for next_id in next_ids:
                         self._unfinished_ids.remove(next_id)
 
-        defer.returnValue(manager())
+        return manager()
 
-    @defer.inlineCallbacks
     def get_max_token(self, store):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
         """
-        if not self._current_max:
-            yield store.runInteraction(
-                "_compute_current_max",
-                self._get_or_compute_current_max,
-            )
-
         with self._lock:
             if self._unfinished_ids:
-                defer.returnValue(self._unfinished_ids[0] - 1)
+                return self._unfinished_ids[0] - 1
 
-            defer.returnValue(self._current_max)
+            return self._current_max
 
     def _get_or_compute_current_max(self, txn):
         with self._lock: