diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index eb88842308..ee2153737d 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -45,9 +45,10 @@ from .search import SearchStore
from .tags import TagsStore
from .account_data import AccountDataStore
-
from util.id_generators import IdGenerator, StreamIdGenerator
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
import logging
@@ -84,6 +85,7 @@ class DataStore(RoomMemberStore, RoomStore,
def __init__(self, db_conn, hs):
self.hs = hs
+ self.database_engine = hs.database_engine
cur = db_conn.cursor()
try:
@@ -117,8 +119,57 @@ class DataStore(RoomMemberStore, RoomStore,
self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
+ events_max = self._stream_id_gen.get_max_token(None)
+ event_cache_prefill, min_event_val = self._get_cache_dict(
+ db_conn, "events",
+ entity_column="room_id",
+ stream_column="stream_ordering",
+ max_value=events_max,
+ )
+ self._events_stream_cache = StreamChangeCache(
+ "EventsRoomStreamChangeCache", min_event_val,
+ prefilled_cache=event_cache_prefill,
+ )
+
+ account_max = self._account_data_id_gen.get_max_token(None)
+ self._account_data_stream_cache = StreamChangeCache(
+ "AccountDataAndTagsChangeCache", account_max,
+ )
+
super(DataStore, self).__init__(hs)
+ def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value):
+ # Fetch a mapping of room_id -> max stream position for "recent" rooms.
+ # It doesn't really matter how many we get, the StreamChangeCache will
+ # do the right thing to ensure it respects the max size of cache.
+ sql = (
+ "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
+ " WHERE %(stream)s > ? - 100000"
+ " GROUP BY %(entity)s"
+ ) % {
+ "table": table,
+ "entity": entity_column,
+ "stream": stream_column,
+ }
+
+ sql = self.database_engine.convert_param_style(sql)
+
+ txn = db_conn.cursor()
+ txn.execute(sql, (int(max_value),))
+ rows = txn.fetchall()
+
+ cache = {
+ row[0]: int(row[1])
+ for row in rows
+ }
+
+ if cache:
+ min_val = min(cache.values())
+ else:
+ min_val = max_value
+
+ return cache, min_val
+
@defer.inlineCallbacks
def insert_client_ip(self, user, access_token, ip, user_agent):
now = int(self._clock.time_msec())
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index ed6587429b..625d062eb1 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -14,7 +14,6 @@
# limitations under the License.
from ._base import SQLBaseStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
from twisted.internet import defer
import ujson as json
@@ -24,14 +23,6 @@ logger = logging.getLogger(__name__)
class AccountDataStore(SQLBaseStore):
- def __init__(self, hs):
- super(AccountDataStore, self).__init__(hs)
-
- self._account_data_stream_cache = StreamChangeCache(
- "AccountDataAndTagsChangeCache",
- self._account_data_id_gen.get_max_token(None),
- max_size=10000,
- )
def get_account_data_for_user(self, user_id):
"""Get all the client account_data for a user.
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 6e81d46c60..e245d2f914 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -37,7 +37,6 @@ from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
-from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
from synapse.util.logutils import log_function
@@ -78,13 +77,6 @@ def upper_bound(token):
class StreamStore(SQLBaseStore):
- def __init__(self, hs):
- super(StreamStore, self).__init__(hs)
-
- self._events_stream_cache = StreamChangeCache(
- "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None)
- )
-
@defer.inlineCallbacks
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
# NB this lives here instead of appservice.py so we can reuse the
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index c673b1bdfc..891cb619fa 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -32,7 +32,7 @@ class StreamChangeCache(object):
entities that may have changed since that position. If position key is too
old then the cache will simply return all given entities.
"""
- def __init__(self, name, current_stream_pos, max_size=10000):
+ def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
self._max_size = max_size
self._entity_to_key = {}
self._cache = sorteddict()
@@ -40,6 +40,9 @@ class StreamChangeCache(object):
self.name = name
caches_by_name[self.name] = self._cache
+ for entity, stream_pos in prefilled_cache.items():
+ self.entity_has_changed(entity, stream_pos)
+
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
"""
|