diff options
author | Erik Johnston <erik@matrix.org> | 2016-01-29 15:52:42 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-01-29 15:52:42 +0000 |
commit | c1de91aca466bf8e5e711c2042535505bb02a08d (patch) | |
tree | 370b9c0a378d168511d132919e381d1f97133463 | |
parent | Merge pull request #541 from matrix-org/erikj/fixsomeofpush (diff) | |
parent | Comment. Remove superfluous order by (diff) | |
download | synapse-c1de91aca466bf8e5e711c2042535505bb02a08d.tar.xz |
Merge pull request #540 from matrix-org/erikj/sync
Prefill stream change caches
-rw-r--r-- | synapse/storage/__init__.py | 53 | ||||
-rw-r--r-- | synapse/storage/account_data.py | 9 | ||||
-rw-r--r-- | synapse/storage/stream.py | 8 | ||||
-rw-r--r-- | synapse/util/caches/stream_change_cache.py | 5 |
4 files changed, 56 insertions, 19 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index eb88842308..ee2153737d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -45,9 +45,10 @@ from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore - from util.id_generators import IdGenerator, StreamIdGenerator +from synapse.util.caches.stream_change_cache import StreamChangeCache + import logging @@ -84,6 +85,7 @@ class DataStore(RoomMemberStore, RoomStore, def __init__(self, db_conn, hs): self.hs = hs + self.database_engine = hs.database_engine cur = db_conn.cursor() try: @@ -117,8 +119,57 @@ class DataStore(RoomMemberStore, RoomStore, self._push_rule_id_gen = IdGenerator("push_rules", "id", self) self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) + events_max = self._stream_id_gen.get_max_token(None) + event_cache_prefill, min_event_val = self._get_cache_dict( + db_conn, "events", + entity_column="room_id", + stream_column="stream_ordering", + max_value=events_max, + ) + self._events_stream_cache = StreamChangeCache( + "EventsRoomStreamChangeCache", min_event_val, + prefilled_cache=event_cache_prefill, + ) + + account_max = self._account_data_id_gen.get_max_token(None) + self._account_data_stream_cache = StreamChangeCache( + "AccountDataAndTagsChangeCache", account_max, + ) + super(DataStore, self).__init__(hs) + def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value): + # Fetch a mapping of room_id -> max stream position for "recent" rooms. + # It doesn't really matter how many we get, the StreamChangeCache will + # do the right thing to ensure it respects the max size of cache. + sql = ( + "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s" + " WHERE %(stream)s > ? - 100000" + " GROUP BY %(entity)s" + ) % { + "table": table, + "entity": entity_column, + "stream": stream_column, + } + + sql = self.database_engine.convert_param_style(sql) + + txn = db_conn.cursor() + txn.execute(sql, (int(max_value),)) + rows = txn.fetchall() + + cache = { + row[0]: int(row[1]) + for row in rows + } + + if cache: + min_val = min(cache.values()) + else: + min_val = max_value + + return cache, min_val + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent): now = int(self._clock.time_msec()) diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index ed6587429b..625d062eb1 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -14,7 +14,6 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.stream_change_cache import StreamChangeCache from twisted.internet import defer import ujson as json @@ -24,14 +23,6 @@ logger = logging.getLogger(__name__) class AccountDataStore(SQLBaseStore): - def __init__(self, hs): - super(AccountDataStore, self).__init__(hs) - - self._account_data_stream_cache = StreamChangeCache( - "AccountDataAndTagsChangeCache", - self._account_data_id_gen.get_max_token(None), - max_size=10000, - ) def get_account_data_for_user(self, user_id): """Get all the client account_data for a user. diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6e81d46c60..e245d2f914 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -37,7 +37,6 @@ from twisted.internet import defer from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks -from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logutils import log_function @@ -78,13 +77,6 @@ def upper_bound(token): class StreamStore(SQLBaseStore): - def __init__(self, hs): - super(StreamStore, self).__init__(hs) - - self._events_stream_cache = StreamChangeCache( - "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None) - ) - @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): # NB this lives here instead of appservice.py so we can reuse the diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index c673b1bdfc..891cb619fa 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -32,7 +32,7 @@ class StreamChangeCache(object): entities that may have changed since that position. If position key is too old then the cache will simply return all given entities. """ - def __init__(self, name, current_stream_pos, max_size=10000): + def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}): self._max_size = max_size self._entity_to_key = {} self._cache = sorteddict() @@ -40,6 +40,9 @@ class StreamChangeCache(object): self.name = name caches_by_name[self.name] = self._cache + for entity, stream_pos in prefilled_cache.items(): + self.entity_has_changed(entity, stream_pos) + def has_entity_changed(self, entity, stream_pos): """Returns True if the entity may have been updated since stream_pos """ |