diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index eb88842308..95ae97d507 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -45,9 +45,10 @@ from .search import SearchStore
from .tags import TagsStore
from .account_data import AccountDataStore
-
from util.id_generators import IdGenerator, StreamIdGenerator
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
import logging
@@ -117,8 +118,54 @@ class DataStore(RoomMemberStore, RoomStore,
self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
+ events_max = self._stream_id_gen.get_max_token(None)
+ event_cache_prefill = self._get_cache_dict(
+ db_conn, "events",
+ entity_column="room_id",
+ stream_column="stream_ordering",
+ max_value=events_max,
+ )
+ self._events_stream_cache = StreamChangeCache(
+ "EventsRoomStreamChangeCache", events_max,
+ prefilled_cache=event_cache_prefill,
+ )
+
+ account_max = self._account_data_id_gen.get_max_token(None)
+ account_cache_prefill = self._get_cache_dict(
+ db_conn, "account_data",
+ entity_column="user_id",
+ stream_column="stream_id",
+ max_value=account_max,
+ )
+ self._account_data_stream_cache = StreamChangeCache(
+ "AccountDataAndTagsChangeCache", account_max,
+ prefilled_cache=account_cache_prefill,
+ )
+
super(DataStore, self).__init__(hs)
+ def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value):
+ sql = (
+ "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
+ " WHERE %(stream)s > max(? - 100000, 0)"
+ " GROUP BY %(entity)s"
+ " ORDER BY MAX(%(stream)s) DESC"
+ " LIMIT 10000"
+ ) % {
+ "table": table,
+ "entity": entity_column,
+ "stream": stream_column,
+ }
+
+ txn = db_conn.cursor()
+ txn.execute(sql, (int(max_value),))
+ rows = txn.fetchall()
+
+ return {
+ row[0]: row[1]
+ for row in rows
+ }
+
@defer.inlineCallbacks
def insert_client_ip(self, user, access_token, ip, user_agent):
now = int(self._clock.time_msec())
|