diff options
author | Erik Johnston <erik@matrix.org> | 2015-09-03 09:54:08 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-09-03 09:54:08 +0100 |
commit | efeeff29f66f14fd4744413a3fd211d01e6c302a (patch) | |
tree | db5be4d096fdd045623017abffc7b67806c55c32 /synapse/storage | |
parent | hacky support for video for FS CC DD (diff) | |
parent | Change log level to info (diff) | |
download | synapse-efeeff29f66f14fd4744413a3fd211d01e6c302a.tar.xz |
Merge branch 'release-v0.10.0' v0.10.0
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 16 | ||||
-rw-r--r-- | synapse/storage/_base.py | 165 | ||||
-rw-r--r-- | synapse/storage/directory.py | 7 | ||||
-rw-r--r-- | synapse/storage/end_to_end_keys.py | 125 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 141 | ||||
-rw-r--r-- | synapse/storage/events.py | 420 | ||||
-rw-r--r-- | synapse/storage/keys.py | 48 | ||||
-rw-r--r-- | synapse/storage/presence.py | 37 | ||||
-rw-r--r-- | synapse/storage/push_rule.py | 25 | ||||
-rw-r--r-- | synapse/storage/receipts.py | 406 | ||||
-rw-r--r-- | synapse/storage/registration.py | 31 | ||||
-rw-r--r-- | synapse/storage/room.py | 6 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 51 | ||||
-rw-r--r-- | synapse/storage/schema/delta/21/end_to_end_keys.sql | 34 | ||||
-rw-r--r-- | synapse/storage/schema/delta/21/receipts.sql | 38 | ||||
-rw-r--r-- | synapse/storage/schema/delta/22/receipts_index.sql | 18 | ||||
-rw-r--r-- | synapse/storage/schema/delta/22/user_threepids_unique.sql | 19 | ||||
-rw-r--r-- | synapse/storage/signatures.py | 28 | ||||
-rw-r--r-- | synapse/storage/state.py | 357 | ||||
-rw-r--r-- | synapse/storage/stream.py | 6 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 3 | ||||
-rw-r--r-- | synapse/storage/util/id_generators.py | 38 |
22 files changed, 1466 insertions, 553 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index c137f47820..f154b1c8ae 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -37,6 +37,9 @@ from .rejections import RejectionsStore from .state import StateStore from .signatures import SignatureStore from .filtering import FilteringStore +from .end_to_end_keys import EndToEndKeyStore + +from .receipts import ReceiptsStore import fnmatch @@ -51,7 +54,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 20 +SCHEMA_VERSION = 22 dir_path = os.path.abspath(os.path.dirname(__file__)) @@ -74,6 +77,8 @@ class DataStore(RoomMemberStore, RoomStore, PushRuleStore, ApplicationServiceTransactionStore, EventsStore, + ReceiptsStore, + EndToEndKeyStore, ): def __init__(self, hs): @@ -94,7 +99,7 @@ class DataStore(RoomMemberStore, RoomStore, key = (user.to_string(), access_token, device_id, ip) try: - last_seen = self.client_ip_last_seen.get(*key) + last_seen = self.client_ip_last_seen.get(key) except KeyError: last_seen = None @@ -102,7 +107,7 @@ class DataStore(RoomMemberStore, RoomStore, if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: defer.returnValue(None) - self.client_ip_last_seen.prefill(*key + (now,)) + self.client_ip_last_seen.prefill(key, now) # It's safe not to lock here: a) no unique constraint, # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely @@ -349,6 +354,11 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, ) logger.debug("Running script %s", relative_path) module.run_upgrade(cur, database_engine) + elif ext == ".pyc": + # Sometimes .pyc files turn up anyway even though we've + # disabled their generation; e.g. from distribution package + # installers. Silently skip it + pass elif ext == ".sql": # A plain old .sql file, just read and execute it logger.debug("Applying schema %s", relative_path) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 8d33def6c6..d976e17786 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -17,21 +17,20 @@ import logging from synapse.api.errors import StoreError from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext -from synapse.util.lrucache import LruCache +from synapse.util.caches.dictionary_cache import DictionaryCache +from synapse.util.caches.descriptors import Cache import synapse.metrics from util.id_generators import IdGenerator, StreamIdGenerator from twisted.internet import defer -from collections import namedtuple, OrderedDict +from collections import namedtuple -import functools import sys import time import threading -DEBUG_CACHES = False logger = logging.getLogger(__name__) @@ -47,159 +46,6 @@ sql_scheduling_timer = metrics.register_distribution("schedule_time") sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) -caches_by_name = {} -cache_counter = metrics.register_cache( - "cache", - lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, - labels=["name"], -) - - -class Cache(object): - - def __init__(self, name, max_entries=1000, keylen=1, lru=False): - if lru: - self.cache = LruCache(max_size=max_entries) - self.max_entries = None - else: - self.cache = OrderedDict() - self.max_entries = max_entries - - self.name = name - self.keylen = keylen - self.sequence = 0 - self.thread = None - caches_by_name[name] = self.cache - - def check_thread(self): - expected_thread = self.thread - if expected_thread is None: - self.thread = threading.current_thread() - else: - if expected_thread is not threading.current_thread(): - raise ValueError( - "Cache objects can only be accessed from the main thread" - ) - - def get(self, *keyargs): - if len(keyargs) != self.keylen: - raise ValueError("Expected a key to have %d items", self.keylen) - - if keyargs in self.cache: - cache_counter.inc_hits(self.name) - return self.cache[keyargs] - - cache_counter.inc_misses(self.name) - raise KeyError() - - def update(self, sequence, *args): - self.check_thread() - if self.sequence == sequence: - # Only update the cache if the caches sequence number matches the - # number that the cache had before the SELECT was started (SYN-369) - self.prefill(*args) - - def prefill(self, *args): # because I can't *keyargs, value - keyargs = args[:-1] - value = args[-1] - - if len(keyargs) != self.keylen: - raise ValueError("Expected a key to have %d items", self.keylen) - - if self.max_entries is not None: - while len(self.cache) >= self.max_entries: - self.cache.popitem(last=False) - - self.cache[keyargs] = value - - def invalidate(self, *keyargs): - self.check_thread() - if len(keyargs) != self.keylen: - raise ValueError("Expected a key to have %d items", self.keylen) - # Increment the sequence number so that any SELECT statements that - # raced with the INSERT don't update the cache (SYN-369) - self.sequence += 1 - self.cache.pop(keyargs, None) - - def invalidate_all(self): - self.check_thread() - self.sequence += 1 - self.cache.clear() - - -class CacheDescriptor(object): - """ A method decorator that applies a memoizing cache around the function. - - The function is presumed to take zero or more arguments, which are used in - a tuple as the key for the cache. Hits are served directly from the cache; - misses use the function body to generate the value. - - The wrapped function has an additional member, a callable called - "invalidate". This can be used to remove individual entries from the cache. - - The wrapped function has another additional callable, called "prefill", - which can be used to insert values into the cache specifically, without - calling the calculation function. - """ - def __init__(self, orig, max_entries=1000, num_args=1, lru=False): - self.orig = orig - - self.max_entries = max_entries - self.num_args = num_args - self.lru = lru - - def __get__(self, obj, objtype=None): - cache = Cache( - name=self.orig.__name__, - max_entries=self.max_entries, - keylen=self.num_args, - lru=self.lru, - ) - - @functools.wraps(self.orig) - @defer.inlineCallbacks - def wrapped(*keyargs): - try: - cached_result = cache.get(*keyargs[:self.num_args]) - if DEBUG_CACHES: - actual_result = yield self.orig(obj, *keyargs) - if actual_result != cached_result: - logger.error( - "Stale cache entry %s%r: cached: %r, actual %r", - self.orig.__name__, keyargs, - cached_result, actual_result, - ) - raise ValueError("Stale cache entry") - defer.returnValue(cached_result) - except KeyError: - # Get the sequence number of the cache before reading from the - # database so that we can tell if the cache is invalidated - # while the SELECT is executing (SYN-369) - sequence = cache.sequence - - ret = yield self.orig(obj, *keyargs) - - cache.update(sequence, *keyargs[:self.num_args] + (ret,)) - - defer.returnValue(ret) - - wrapped.invalidate = cache.invalidate - wrapped.invalidate_all = cache.invalidate_all - wrapped.prefill = cache.prefill - - obj.__dict__[self.orig.__name__] = wrapped - - return wrapped - - -def cached(max_entries=1000, num_args=1, lru=False): - return lambda orig: CacheDescriptor( - orig, - max_entries=max_entries, - num_args=num_args, - lru=lru - ) - class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object @@ -321,6 +167,8 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) + self._state_group_cache = DictionaryCache("*stateGroupCache*", 2000) + self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 @@ -329,13 +177,14 @@ class SQLBaseStore(object): self.database_engine = hs.database_engine - self._stream_id_gen = StreamIdGenerator() + self._stream_id_gen = StreamIdGenerator("events", "stream_ordering") self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) self._state_groups_id_gen = IdGenerator("state_groups", "id", self) self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) self._pushers_id_gen = IdGenerator("pushers", "id", self) self._push_rule_id_gen = IdGenerator("push_rules", "id", self) self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) + self._receipts_id_gen = StreamIdGenerator("receipts_linearized", "stream_id") def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 2b2bdf8615..d92028ea43 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached from synapse.api.errors import SynapseError @@ -104,7 +105,7 @@ class DirectoryStore(SQLBaseStore): }, desc="create_room_alias_association", ) - self.get_aliases_for_room.invalidate(room_id) + self.get_aliases_for_room.invalidate((room_id,)) @defer.inlineCallbacks def delete_room_alias(self, room_alias): @@ -114,7 +115,7 @@ class DirectoryStore(SQLBaseStore): room_alias, ) - self.get_aliases_for_room.invalidate(room_id) + self.get_aliases_for_room.invalidate((room_id,)) defer.returnValue(room_id) def _delete_room_alias_txn(self, txn, room_alias): diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py new file mode 100644 index 0000000000..325740d7d0 --- /dev/null +++ b/synapse/storage/end_to_end_keys.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from _base import SQLBaseStore + + +class EndToEndKeyStore(SQLBaseStore): + def set_e2e_device_keys(self, user_id, device_id, time_now, json_bytes): + return self._simple_upsert( + table="e2e_device_keys_json", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + values={ + "ts_added_ms": time_now, + "key_json": json_bytes, + } + ) + + def get_e2e_device_keys(self, query_list): + """Fetch a list of device keys. + Args: + query_list(list): List of pairs of user_ids and device_ids. + Returns: + Dict mapping from user-id to dict mapping from device_id to + key json byte strings. + """ + def _get_e2e_device_keys(txn): + result = {} + for user_id, device_id in query_list: + user_result = result.setdefault(user_id, {}) + keyvalues = {"user_id": user_id} + if device_id: + keyvalues["device_id"] = device_id + rows = self._simple_select_list_txn( + txn, table="e2e_device_keys_json", + keyvalues=keyvalues, + retcols=["device_id", "key_json"] + ) + for row in rows: + user_result[row["device_id"]] = row["key_json"] + return result + return self.runInteraction("get_e2e_device_keys", _get_e2e_device_keys) + + def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list): + def _add_e2e_one_time_keys(txn): + for (algorithm, key_id, json_bytes) in key_list: + self._simple_upsert_txn( + txn, table="e2e_one_time_keys_json", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + "algorithm": algorithm, + "key_id": key_id, + }, + values={ + "ts_added_ms": time_now, + "key_json": json_bytes, + } + ) + return self.runInteraction( + "add_e2e_one_time_keys", _add_e2e_one_time_keys + ) + + def count_e2e_one_time_keys(self, user_id, device_id): + """ Count the number of one time keys the server has for a device + Returns: + Dict mapping from algorithm to number of keys for that algorithm. + """ + def _count_e2e_one_time_keys(txn): + sql = ( + "SELECT algorithm, COUNT(key_id) FROM e2e_one_time_keys_json" + " WHERE user_id = ? AND device_id = ?" + " GROUP BY algorithm" + ) + txn.execute(sql, (user_id, device_id)) + result = {} + for algorithm, key_count in txn.fetchall(): + result[algorithm] = key_count + return result + return self.runInteraction( + "count_e2e_one_time_keys", _count_e2e_one_time_keys + ) + + def claim_e2e_one_time_keys(self, query_list): + """Take a list of one time keys out of the database""" + def _claim_e2e_one_time_keys(txn): + sql = ( + "SELECT key_id, key_json FROM e2e_one_time_keys_json" + " WHERE user_id = ? AND device_id = ? AND algorithm = ?" + " LIMIT 1" + ) + result = {} + delete = [] + for user_id, device_id, algorithm in query_list: + user_result = result.setdefault(user_id, {}) + device_result = user_result.setdefault(device_id, {}) + txn.execute(sql, (user_id, device_id, algorithm)) + for key_id, key_json in txn.fetchall(): + device_result[algorithm + ":" + key_id] = key_json + delete.append((user_id, device_id, algorithm, key_id)) + sql = ( + "DELETE FROM e2e_one_time_keys_json" + " WHERE user_id = ? AND device_id = ? AND algorithm = ?" + " AND key_id = ?" + ) + for user_id, device_id, algorithm, key_id in delete: + txn.execute(sql, (user_id, device_id, algorithm, key_id)) + return result + return self.runInteraction( + "claim_e2e_one_time_keys", _claim_e2e_one_time_keys + ) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 1ba073884b..dda3027b61 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -15,7 +15,8 @@ from twisted.internet import defer -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached from syutil.base64util import encode_base64 import logging @@ -49,14 +50,22 @@ class EventFederationStore(SQLBaseStore): results = set() base_sql = ( - "SELECT auth_id FROM event_auth WHERE event_id = ?" + "SELECT auth_id FROM event_auth WHERE event_id IN (%s)" ) front = set(event_ids) while front: new_front = set() - for f in front: - txn.execute(base_sql, (f,)) + front_list = list(front) + chunks = [ + front_list[x:x+100] + for x in xrange(0, len(front), 100) + ] + for chunk in chunks: + txn.execute( + base_sql % (",".join(["?"] * len(chunk)),), + chunk + ) new_front.update([r[0] for r in txn.fetchall()]) new_front -= results @@ -274,8 +283,7 @@ class EventFederationStore(SQLBaseStore): }, ) - def _handle_prev_events(self, txn, outlier, event_id, prev_events, - room_id): + def _handle_mult_prev_events(self, txn, events): """ For the given event, update the event edges table and forward and backward extremities tables. @@ -285,70 +293,83 @@ class EventFederationStore(SQLBaseStore): table="event_edges", values=[ { - "event_id": event_id, + "event_id": ev.event_id, "prev_event_id": e_id, - "room_id": room_id, + "room_id": ev.room_id, "is_state": False, } - for e_id, _ in prev_events + for ev in events + for e_id, _ in ev.prev_events ], ) - # Update the extremities table if this is not an outlier. - if not outlier: - for e_id, _ in prev_events: - # TODO (erikj): This could be done as a bulk insert - self._simple_delete_txn( - txn, - table="event_forward_extremities", - keyvalues={ - "event_id": e_id, - "room_id": room_id, - } - ) + events_by_room = {} + for ev in events: + events_by_room.setdefault(ev.room_id, []).append(ev) - # We only insert as a forward extremity the new event if there are - # no other events that reference it as a prev event - query = ( - "SELECT 1 FROM event_edges WHERE prev_event_id = ?" - ) + for room_id, room_events in events_by_room.items(): + prevs = [ + e_id for ev in room_events for e_id, _ in ev.prev_events + if not ev.internal_metadata.is_outlier() + ] + if prevs: + txn.execute( + "DELETE FROM event_forward_extremities" + " WHERE room_id = ?" + " AND event_id in (%s)" % ( + ",".join(["?"] * len(prevs)), + ), + [room_id] + prevs, + ) - txn.execute(query, (event_id,)) + query = ( + "INSERT INTO event_forward_extremities (event_id, room_id)" + " SELECT ?, ? WHERE NOT EXISTS (" + " SELECT 1 FROM event_edges WHERE prev_event_id = ?" + " )" + ) - if not txn.fetchone(): - query = ( - "INSERT INTO event_forward_extremities" - " (event_id, room_id)" - " VALUES (?, ?)" - ) + txn.executemany( + query, + [ + (ev.event_id, ev.room_id, ev.event_id) for ev in events + if not ev.internal_metadata.is_outlier() + ] + ) - txn.execute(query, (event_id, room_id)) - - query = ( - "INSERT INTO event_backward_extremities (event_id, room_id)" - " SELECT ?, ? WHERE NOT EXISTS (" - " SELECT 1 FROM event_backward_extremities" - " WHERE event_id = ? AND room_id = ?" - " )" - " AND NOT EXISTS (" - " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " - " AND outlier = ?" - " )" - ) + query = ( + "INSERT INTO event_backward_extremities (event_id, room_id)" + " SELECT ?, ? WHERE NOT EXISTS (" + " SELECT 1 FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" + " )" + " AND NOT EXISTS (" + " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " + " AND outlier = ?" + " )" + ) - txn.executemany(query, [ - (e_id, room_id, e_id, room_id, e_id, room_id, False) - for e_id, _ in prev_events - ]) + txn.executemany(query, [ + (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) + for ev in events for e_id, _ in ev.prev_events + if not ev.internal_metadata.is_outlier() + ]) - query = ( - "DELETE FROM event_backward_extremities" - " WHERE event_id = ? AND room_id = ?" - ) - txn.execute(query, (event_id, room_id)) + query = ( + "DELETE FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" + ) + txn.executemany( + query, + [ + (ev.event_id, ev.room_id) for ev in events + if not ev.internal_metadata.is_outlier() + ] + ) + for room_id in events_by_room: txn.call_after( - self.get_latest_event_ids_in_room.invalidate, room_id + self.get_latest_event_ids_in_room.invalidate, (room_id,) ) def get_backfill_events(self, room_id, event_list, limit): @@ -400,10 +421,12 @@ class EventFederationStore(SQLBaseStore): keyvalues={ "event_id": event_id, }, - retcol="depth" + retcol="depth", + allow_none=True, ) - queue.put((-depth, event_id)) + if depth: + queue.put((-depth, event_id)) while not queue.empty() and len(event_results) < limit: try: @@ -489,4 +512,4 @@ class EventFederationStore(SQLBaseStore): query = "DELETE FROM event_forward_extremities WHERE room_id = ?" txn.execute(query, (room_id,)) - txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id) + txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2caf0aae80..e3eabab13d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -23,9 +23,7 @@ from synapse.events.utils import prune_event from synapse.util.logcontext import preserve_context_over_deferred from synapse.util.logutils import log_function from synapse.api.constants import EventTypes -from synapse.crypto.event_signing import compute_event_reference_hash -from syutil.base64util import decode_base64 from syutil.jsonutil import encode_json from contextlib import contextmanager @@ -47,6 +45,48 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events class EventsStore(SQLBaseStore): @defer.inlineCallbacks + def persist_events(self, events_and_contexts, backfilled=False, + is_new_state=True): + if not events_and_contexts: + return + + if backfilled: + if not self.min_token_deferred.called: + yield self.min_token_deferred + start = self.min_token - 1 + self.min_token -= len(events_and_contexts) + 1 + stream_orderings = range(start, self.min_token, -1) + + @contextmanager + def stream_ordering_manager(): + yield stream_orderings + stream_ordering_manager = stream_ordering_manager() + else: + stream_ordering_manager = yield self._stream_id_gen.get_next_mult( + self, len(events_and_contexts) + ) + + with stream_ordering_manager as stream_orderings: + for (event, _), stream in zip(events_and_contexts, stream_orderings): + event.internal_metadata.stream_ordering = stream + + chunks = [ + events_and_contexts[x:x+100] + for x in xrange(0, len(events_and_contexts), 100) + ] + + for chunk in chunks: + # We can't easily parallelize these since different chunks + # might contain the same event. :( + yield self.runInteraction( + "persist_events", + self._persist_events_txn, + events_and_contexts=chunk, + backfilled=backfilled, + is_new_state=is_new_state, + ) + + @defer.inlineCallbacks @log_function def persist_event(self, event, context, backfilled=False, is_new_state=True, current_state=None): @@ -67,13 +107,13 @@ class EventsStore(SQLBaseStore): try: with stream_ordering_manager as stream_ordering: + event.internal_metadata.stream_ordering = stream_ordering yield self.runInteraction( "persist_event", self._persist_event_txn, event=event, context=context, backfilled=backfilled, - stream_ordering=stream_ordering, is_new_state=is_new_state, current_state=current_state, ) @@ -116,19 +156,14 @@ class EventsStore(SQLBaseStore): @log_function def _persist_event_txn(self, txn, event, context, backfilled, - stream_ordering=None, is_new_state=True, - current_state=None): - - # Remove the any existing cache entries for the event_id - txn.call_after(self._invalidate_get_event_cache, event.event_id) - + is_new_state=True, current_state=None): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: txn.call_after(self.get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) - txn.call_after(self.get_users_in_room.invalidate, event.room_id) - txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id) + txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) + txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases, event.room_id) self._simple_delete_txn( @@ -149,37 +184,78 @@ class EventsStore(SQLBaseStore): } ) - outlier = event.internal_metadata.is_outlier() + return self._persist_events_txn( + txn, + [(event, context)], + backfilled=backfilled, + is_new_state=is_new_state, + ) - if not outlier: - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth + @log_function + def _persist_events_txn(self, txn, events_and_contexts, backfilled, + is_new_state=True): + + # Remove the any existing cache entries for the event_ids + for event, _ in events_and_contexts: + txn.call_after(self._invalidate_get_event_cache, event.event_id) + + depth_updates = {} + for event, _ in events_and_contexts: + if event.internal_metadata.is_outlier(): + continue + depth_updates[event.room_id] = max( + event.depth, depth_updates.get(event.room_id, event.depth) ) - have_persisted = self._simple_select_one_txn( - txn, - table="events", - keyvalues={"event_id": event.event_id}, - retcols=["event_id", "outlier"], - allow_none=True, + for room_id, depth in depth_updates.items(): + self._update_min_depth_for_room_txn(txn, room_id, depth) + + txn.execute( + "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % ( + ",".join(["?"] * len(events_and_contexts)), + ), + [event.event_id for event, _ in events_and_contexts] ) + have_persisted = { + event_id: outlier + for event_id, outlier in txn.fetchall() + } + + event_map = {} + to_remove = set() + for event, context in events_and_contexts: + # Handle the case of the list including the same event multiple + # times. The tricky thing here is when they differ by whether + # they are an outlier. + if event.event_id in event_map: + other = event_map[event.event_id] + + if not other.internal_metadata.is_outlier(): + to_remove.add(event) + continue + elif not event.internal_metadata.is_outlier(): + to_remove.add(event) + continue + else: + to_remove.add(other) - metadata_json = encode_json( - event.internal_metadata.get_dict(), - using_frozen_dicts=USE_FROZEN_DICTS - ).decode("UTF-8") - - # If we have already persisted this event, we don't need to do any - # more processing. - # The processing above must be done on every call to persist event, - # since they might not have happened on previous calls. For example, - # if we are persisting an event that we had persisted as an outlier, - # but is no longer one. - if have_persisted: - if not outlier and have_persisted["outlier"]: - self._store_state_groups_txn(txn, event, context) + event_map[event.event_id] = event + + if event.event_id not in have_persisted: + continue + + to_remove.add(event) + + outlier_persisted = have_persisted[event.event_id] + if not event.internal_metadata.is_outlier() and outlier_persisted: + self._store_state_groups_txn( + txn, event, context, + ) + + metadata_json = encode_json( + event.internal_metadata.get_dict(), + using_frozen_dicts=USE_FROZEN_DICTS + ).decode("UTF-8") sql = ( "UPDATE event_json SET internal_metadata = ?" @@ -198,94 +274,91 @@ class EventsStore(SQLBaseStore): sql, (False, event.event_id,) ) - return - - if not outlier: - self._store_state_groups_txn(txn, event, context) - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, + events_and_contexts = filter( + lambda ec: ec[0] not in to_remove, + events_and_contexts ) - if event.type == EventTypes.Member: - self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) - - event_dict = { - k: v - for k, v in event.get_dict().items() - if k not in [ - "redacted", - "redacted_because", - ] - } + if not events_and_contexts: + return - self._simple_insert_txn( + self._store_mult_state_groups_txn(txn, [ + (event, context) + for event, context in events_and_contexts + if not event.internal_metadata.is_outlier() + ]) + + self._handle_mult_prev_events( txn, - table="event_json", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "internal_metadata": metadata_json, - "json": encode_json( - event_dict, using_frozen_dicts=USE_FROZEN_DICTS - ).decode("UTF-8"), - }, + events=[event for event, _ in events_and_contexts], ) - content = encode_json( - event.content, using_frozen_dicts=USE_FROZEN_DICTS - ).decode("UTF-8") - - vals = { - "topological_ordering": event.depth, - "event_id": event.event_id, - "type": event.type, - "room_id": event.room_id, - "content": content, - "processed": True, - "outlier": outlier, - "depth": event.depth, - } + for event, _ in events_and_contexts: + if event.type == EventTypes.Name: + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Redaction: + self._store_redaction(txn, event) - unrec = { - k: v - for k, v in event.get_dict().items() - if k not in vals.keys() and k not in [ - "redacted", - "redacted_because", - "signatures", - "hashes", - "prev_events", + self._store_room_members_txn( + txn, + [ + event + for event, _ in events_and_contexts + if event.type == EventTypes.Member ] - } + ) - vals["unrecognized_keys"] = encode_json( - unrec, using_frozen_dicts=USE_FROZEN_DICTS - ).decode("UTF-8") + def event_dict(event): + return { + k: v + for k, v in event.get_dict().items() + if k not in [ + "redacted", + "redacted_because", + ] + } - sql = ( - "INSERT INTO events" - " (stream_ordering, topological_ordering, event_id, type," - " room_id, content, processed, outlier, depth)" - " VALUES (?,?,?,?,?,?,?,?,?)" + self._simple_insert_many_txn( + txn, + table="event_json", + values=[ + { + "event_id": event.event_id, + "room_id": event.room_id, + "internal_metadata": encode_json( + event.internal_metadata.get_dict(), + using_frozen_dicts=USE_FROZEN_DICTS + ).decode("UTF-8"), + "json": encode_json( + event_dict(event), using_frozen_dicts=USE_FROZEN_DICTS + ).decode("UTF-8"), + } + for event, _ in events_and_contexts + ], ) - txn.execute( - sql, - ( - stream_ordering, event.depth, event.event_id, event.type, - event.room_id, content, True, outlier, event.depth - ) + self._simple_insert_many_txn( + txn, + table="events", + values=[ + { + "stream_ordering": event.internal_metadata.stream_ordering, + "topological_ordering": event.depth, + "depth": event.depth, + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "processed": True, + "outlier": event.internal_metadata.is_outlier(), + "content": encode_json( + event.content, using_frozen_dicts=USE_FROZEN_DICTS + ).decode("UTF-8"), + } + for event, _ in events_and_contexts + ], ) if context.rejected: @@ -293,20 +366,6 @@ class EventsStore(SQLBaseStore): txn, event.event_id, context.rejected ) - for hash_alg, hash_base64 in event.hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_event_content_hash_txn( - txn, event.event_id, hash_alg, hash_bytes, - ) - - for prev_event_id, prev_hashes in event.prev_events: - for alg, hash_base64 in prev_hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_prev_event_hash_txn( - txn, event.event_id, prev_event_id, alg, - hash_bytes - ) - self._simple_insert_many_txn( txn, table="event_auth", @@ -316,16 +375,22 @@ class EventsStore(SQLBaseStore): "room_id": event.room_id, "auth_id": auth_id, } + for event, _ in events_and_contexts for auth_id, _ in event.auth_events ], ) - (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) - self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes + self._store_event_reference_hashes_txn( + txn, [event for event, _ in events_and_contexts] ) - if event.is_state(): + state_events_and_contexts = filter( + lambda i: i[0].is_state(), + events_and_contexts, + ) + + state_values = [] + for event, context in state_events_and_contexts: vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -337,51 +402,55 @@ class EventsStore(SQLBaseStore): if hasattr(event, "replaces_state"): vals["prev_state"] = event.replaces_state - self._simple_insert_txn( - txn, - "state_events", - vals, - ) + state_values.append(vals) - self._simple_insert_many_txn( - txn, - table="event_edges", - values=[ - { - "event_id": event.event_id, - "prev_event_id": e_id, - "room_id": event.room_id, - "is_state": True, - } - for e_id, h in event.prev_state - ], - ) + self._simple_insert_many_txn( + txn, + table="state_events", + values=state_values, + ) - if is_new_state and not context.rejected: - txn.call_after( - self.get_current_state_for_key.invalidate, - event.room_id, event.type, event.state_key - ) + self._simple_insert_many_txn( + txn, + table="event_edges", + values=[ + { + "event_id": event.event_id, + "prev_event_id": prev_id, + "room_id": event.room_id, + "is_state": True, + } + for event, _ in state_events_and_contexts + for prev_id, _ in event.prev_state + ], + ) - if (event.type == EventTypes.Name - or event.type == EventTypes.Aliases): + if is_new_state: + for event, _ in state_events_and_contexts: + if not context.rejected: txn.call_after( - self.get_room_name_and_aliases.invalidate, - event.room_id + self.get_current_state_for_key.invalidate, + (event.room_id, event.type, event.state_key,) ) - self._simple_upsert_txn( - txn, - "current_state_events", - keyvalues={ - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - values={ - "event_id": event.event_id, - } - ) + if event.type in [EventTypes.Name, EventTypes.Aliases]: + txn.call_after( + self.get_room_name_and_aliases.invalidate, + (event.room_id,) + ) + + self._simple_upsert_txn( + txn, + "current_state_events", + keyvalues={ + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + values={ + "event_id": event.event_id, + } + ) return @@ -498,8 +567,9 @@ class EventsStore(SQLBaseStore): def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): for get_prev_content in (False, True): - self._get_event_cache.invalidate(event_id, check_redacted, - get_prev_content) + self._get_event_cache.invalidate( + (event_id, check_redacted, get_prev_content) + ) def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): @@ -520,7 +590,7 @@ class EventsStore(SQLBaseStore): for event_id in events: try: ret = self._get_event_cache.get( - event_id, check_redacted, get_prev_content + (event_id, check_redacted, get_prev_content,) ) if allow_rejected or not ret.rejected_reason: @@ -741,6 +811,8 @@ class EventsStore(SQLBaseStore): ) if because: + # It's fine to do add the event directly, since get_pdu_json + # will serialise this field correctly ev.unsigned["redacted_because"] = because if get_prev_content and "replaces_state" in ev.unsigned: @@ -753,7 +825,7 @@ class EventsStore(SQLBaseStore): ev.unsigned["prev_content"] = prev.get_dict()["content"] self._get_event_cache.prefill( - ev.event_id, check_redacted, get_prev_content, ev + (ev.event_id, check_redacted, get_prev_content), ev ) defer.returnValue(ev) @@ -810,7 +882,7 @@ class EventsStore(SQLBaseStore): ev.unsigned["prev_content"] = prev.get_dict()["content"] self._get_event_cache.prefill( - ev.event_id, check_redacted, get_prev_content, ev + (ev.event_id, check_redacted, get_prev_content), ev ) return ev diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 5bdf497b93..ffd6daa880 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -14,6 +14,7 @@ # limitations under the License. from _base import SQLBaseStore +from synapse.util.caches.descriptors import cachedInlineCallbacks from twisted.internet import defer @@ -71,6 +72,24 @@ class KeyStore(SQLBaseStore): desc="store_server_certificate", ) + @cachedInlineCallbacks() + def get_all_server_verify_keys(self, server_name): + rows = yield self._simple_select_list( + table="server_signature_keys", + keyvalues={ + "server_name": server_name, + }, + retcols=["key_id", "verify_key"], + desc="get_all_server_verify_keys", + ) + + defer.returnValue({ + row["key_id"]: decode_verify_key_bytes( + row["key_id"], str(row["verify_key"]) + ) + for row in rows + }) + @defer.inlineCallbacks def get_server_verify_keys(self, server_name, key_ids): """Retrieve the NACL verification key for a given server for the given @@ -81,24 +100,14 @@ class KeyStore(SQLBaseStore): Returns: (list of VerifyKey): The verification keys. """ - sql = ( - "SELECT key_id, verify_key FROM server_signature_keys" - " WHERE server_name = ?" - " AND key_id in (" + ",".join("?" for key_id in key_ids) + ")" - ) - - rows = yield self._execute_and_decode( - "get_server_verify_keys", sql, server_name, *key_ids - ) - - keys = [] - for row in rows: - key_id = row["key_id"] - key_bytes = row["verify_key"] - key = decode_verify_key_bytes(key_id, str(key_bytes)) - keys.append(key) - defer.returnValue(keys) + keys = yield self.get_all_server_verify_keys(server_name) + defer.returnValue({ + k: keys[k] + for k in key_ids + if k in keys and keys[k] + }) + @defer.inlineCallbacks def store_server_verify_key(self, server_name, from_server, time_now_ms, verify_key): """Stores a NACL verification key for the given server. @@ -109,7 +118,7 @@ class KeyStore(SQLBaseStore): ts_now_ms (int): The time now in milliseconds verification_key (VerifyKey): The NACL verify key. """ - return self._simple_upsert( + yield self._simple_upsert( table="server_signature_keys", keyvalues={ "server_name": server_name, @@ -123,6 +132,8 @@ class KeyStore(SQLBaseStore): desc="store_server_verify_key", ) + self.get_all_server_verify_keys.invalidate((server_name,)) + def store_server_keys_json(self, server_name, key_id, from_server, ts_now_ms, ts_expires_ms, key_json_bytes): """Stores the JSON bytes for a set of keys from a server @@ -152,6 +163,7 @@ class KeyStore(SQLBaseStore): "ts_valid_until_ms": ts_expires_ms, "key_json": buffer(key_json_bytes), }, + desc="store_server_keys_json", ) def get_server_keys_json(self, server_keys): diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index fefcf6bce0..34ca3b9a54 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -13,19 +13,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached, cachedList from twisted.internet import defer class PresenceStore(SQLBaseStore): def create_presence(self, user_localpart): - return self._simple_insert( + res = self._simple_insert( table="presence", values={"user_id": user_localpart}, desc="create_presence", ) + self.get_presence_state.invalidate((user_localpart,)) + return res + def has_presence_state(self, user_localpart): return self._simple_select_one( table="presence", @@ -35,6 +39,7 @@ class PresenceStore(SQLBaseStore): desc="has_presence_state", ) + @cached(max_entries=2000) def get_presence_state(self, user_localpart): return self._simple_select_one( table="presence", @@ -43,8 +48,27 @@ class PresenceStore(SQLBaseStore): desc="get_presence_state", ) + @cachedList(get_presence_state.cache, list_name="user_localparts") + def get_presence_states(self, user_localparts): + def f(txn): + results = {} + for user_localpart in user_localparts: + res = self._simple_select_one_txn( + txn, + table="presence", + keyvalues={"user_id": user_localpart}, + retcols=["state", "status_msg", "mtime"], + allow_none=True, + ) + if res: + results[user_localpart] = res + + return results + + return self.runInteraction("get_presence_states", f) + def set_presence_state(self, user_localpart, new_state): - return self._simple_update_one( + res = self._simple_update_one( table="presence", keyvalues={"user_id": user_localpart}, updatevalues={"state": new_state["state"], @@ -53,6 +77,9 @@ class PresenceStore(SQLBaseStore): desc="set_presence_state", ) + self.get_presence_state.invalidate((user_localpart,)) + return res + def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( table="presence_allow_inbound", @@ -98,7 +125,7 @@ class PresenceStore(SQLBaseStore): updatevalues={"accepted": True}, desc="set_presence_list_accepted", ) - self.get_presence_list_accepted.invalidate(observer_localpart) + self.get_presence_list_accepted.invalidate((observer_localpart,)) defer.returnValue(result) def get_presence_list(self, observer_localpart, accepted=None): @@ -133,4 +160,4 @@ class PresenceStore(SQLBaseStore): "observed_user_id": observed_userid}, desc="del_presence_list", ) - self.get_presence_list_accepted.invalidate(observer_localpart) + self.get_presence_list_accepted.invalidate((observer_localpart,)) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 4cac118d17..5305b7e122 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cachedInlineCallbacks from twisted.internet import defer import logging @@ -23,8 +24,7 @@ logger = logging.getLogger(__name__) class PushRuleStore(SQLBaseStore): - @cached() - @defer.inlineCallbacks + @cachedInlineCallbacks() def get_push_rules_for_user(self, user_name): rows = yield self._simple_select_list( table=PushRuleTable.table_name, @@ -41,8 +41,7 @@ class PushRuleStore(SQLBaseStore): defer.returnValue(rows) - @cached() - @defer.inlineCallbacks + @cachedInlineCallbacks() def get_push_rules_enabled_for_user(self, user_name): results = yield self._simple_select_list( table=PushRuleEnableTable.table_name, @@ -153,11 +152,11 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_name, priority_class, new_rule_priority)) txn.call_after( - self.get_push_rules_for_user.invalidate, user_name + self.get_push_rules_for_user.invalidate, (user_name,) ) txn.call_after( - self.get_push_rules_enabled_for_user.invalidate, user_name + self.get_push_rules_enabled_for_user.invalidate, (user_name,) ) self._simple_insert_txn( @@ -189,10 +188,10 @@ class PushRuleStore(SQLBaseStore): new_rule['priority'] = new_prio txn.call_after( - self.get_push_rules_for_user.invalidate, user_name + self.get_push_rules_for_user.invalidate, (user_name,) ) txn.call_after( - self.get_push_rules_enabled_for_user.invalidate, user_name + self.get_push_rules_enabled_for_user.invalidate, (user_name,) ) self._simple_insert_txn( @@ -218,8 +217,8 @@ class PushRuleStore(SQLBaseStore): desc="delete_push_rule", ) - self.get_push_rules_for_user.invalidate(user_name) - self.get_push_rules_enabled_for_user.invalidate(user_name) + self.get_push_rules_for_user.invalidate((user_name,)) + self.get_push_rules_enabled_for_user.invalidate((user_name,)) @defer.inlineCallbacks def set_push_rule_enabled(self, user_name, rule_id, enabled): @@ -240,10 +239,10 @@ class PushRuleStore(SQLBaseStore): {'id': new_id}, ) txn.call_after( - self.get_push_rules_for_user.invalidate, user_name + self.get_push_rules_for_user.invalidate, (user_name,) ) txn.call_after( - self.get_push_rules_enabled_for_user.invalidate, user_name + self.get_push_rules_enabled_for_user.invalidate, (user_name,) ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py new file mode 100644 index 0000000000..a535063547 --- /dev/null +++ b/synapse/storage/receipts.py @@ -0,0 +1,406 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList +from synapse.util.caches import cache_counter, caches_by_name + +from twisted.internet import defer + +from blist import sorteddict +import logging +import ujson as json + + +logger = logging.getLogger(__name__) + + +class ReceiptsStore(SQLBaseStore): + def __init__(self, hs): + super(ReceiptsStore, self).__init__(hs) + + self._receipts_stream_cache = _RoomStreamChangeCache() + + @defer.inlineCallbacks + def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): + """Get receipts for multiple rooms for sending to clients. + + Args: + room_ids (list): List of room_ids. + to_key (int): Max stream id to fetch receipts upto. + from_key (int): Min stream id to fetch receipts from. None fetches + from the start. + + Returns: + list: A list of receipts. + """ + room_ids = set(room_ids) + + if from_key: + room_ids = yield self._receipts_stream_cache.get_rooms_changed( + self, room_ids, from_key + ) + + results = yield self._get_linearized_receipts_for_rooms( + room_ids, to_key, from_key=from_key + ) + + defer.returnValue([ev for res in results.values() for ev in res]) + + @cachedInlineCallbacks(num_args=3, max_entries=5000) + def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): + """Get receipts for a single room for sending to clients. + + Args: + room_ids (str): The room id. + to_key (int): Max stream id to fetch receipts upto. + from_key (int): Min stream id to fetch receipts from. None fetches + from the start. + + Returns: + list: A list of receipts. + """ + def f(txn): + if from_key: + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id = ? AND stream_id > ? AND stream_id <= ?" + ) + + txn.execute( + sql, + (room_id, from_key, to_key) + ) + else: + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id = ? AND stream_id <= ?" + ) + + txn.execute( + sql, + (room_id, to_key) + ) + + rows = self.cursor_to_dict(txn) + + return rows + + rows = yield self.runInteraction( + "get_linearized_receipts_for_room", f + ) + + if not rows: + defer.returnValue([]) + + content = {} + for row in rows: + content.setdefault( + row["event_id"], {} + ).setdefault( + row["receipt_type"], {} + )[row["user_id"]] = json.loads(row["data"]) + + defer.returnValue([{ + "type": "m.receipt", + "room_id": room_id, + "content": content, + }]) + + @cachedList(cache=get_linearized_receipts_for_room.cache, list_name="room_ids", + num_args=3, inlineCallbacks=True) + def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): + if not room_ids: + defer.returnValue({}) + + def f(txn): + if from_key: + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id IN (%s) AND stream_id > ? AND stream_id <= ?" + ) % ( + ",".join(["?"] * len(room_ids)) + ) + args = list(room_ids) + args.extend([from_key, to_key]) + + txn.execute(sql, args) + else: + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id IN (%s) AND stream_id <= ?" + ) % ( + ",".join(["?"] * len(room_ids)) + ) + + args = list(room_ids) + args.append(to_key) + + txn.execute(sql, args) + + return self.cursor_to_dict(txn) + + txn_results = yield self.runInteraction( + "_get_linearized_receipts_for_rooms", f + ) + + results = {} + for row in txn_results: + # We want a single event per room, since we want to batch the + # receipts by room, event and type. + room_event = results.setdefault(row["room_id"], { + "type": "m.receipt", + "room_id": row["room_id"], + "content": {}, + }) + + # The content is of the form: + # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. } + event_entry = room_event["content"].setdefault(row["event_id"], {}) + receipt_type = event_entry.setdefault(row["receipt_type"], {}) + + receipt_type[row["user_id"]] = json.loads(row["data"]) + + results = { + room_id: [results[room_id]] if room_id in results else [] + for room_id in room_ids + } + defer.returnValue(results) + + def get_max_receipt_stream_id(self): + return self._receipts_id_gen.get_max_token(self) + + @cachedInlineCallbacks() + def get_graph_receipts_for_room(self, room_id): + """Get receipts for sending to remote servers. + """ + rows = yield self._simple_select_list( + table="receipts_graph", + keyvalues={"room_id": room_id}, + retcols=["receipt_type", "user_id", "event_id"], + desc="get_linearized_receipts_for_room", + ) + + result = {} + for row in rows: + result.setdefault( + row["user_id"], {} + ).setdefault( + row["receipt_type"], [] + ).append(row["event_id"]) + + defer.returnValue(result) + + def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, + user_id, event_id, data, stream_id): + + # We don't want to clobber receipts for more recent events, so we + # have to compare orderings of existing receipts + sql = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " INNER JOIN receipts_linearized as r USING (event_id, room_id)" + " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" + ) + + txn.execute(sql, (room_id, receipt_type, user_id)) + results = txn.fetchall() + + if results: + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + ) + topological_ordering = int(res["topological_ordering"]) + stream_ordering = int(res["stream_ordering"]) + + for to, so, _ in results: + if int(to) > topological_ordering: + return False + elif int(to) == topological_ordering and int(so) >= stream_ordering: + return False + + self._simple_delete_txn( + txn, + table="receipts_linearized", + keyvalues={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + } + ) + + self._simple_insert_txn( + txn, + table="receipts_linearized", + values={ + "stream_id": stream_id, + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_id": event_id, + "data": json.dumps(data), + } + ) + + return True + + @defer.inlineCallbacks + def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data): + """Insert a receipt, either from local client or remote server. + + Automatically does conversion between linearized and graph + representations. + """ + if not event_ids: + return + + if len(event_ids) == 1: + linearized_event_id = event_ids[0] + else: + # we need to points in graph -> linearized form. + # TODO: Make this better. + def graph_to_linear(txn): + query = ( + "SELECT event_id WHERE room_id = ? AND stream_ordering IN (" + " SELECT max(stream_ordering) WHERE event_id IN (%s)" + ")" + ) % (",".join(["?"] * len(event_ids))) + + txn.execute(query, [room_id] + event_ids) + rows = txn.fetchall() + if rows: + return rows[0][0] + else: + raise RuntimeError("Unrecognized event_ids: %r" % (event_ids,)) + + linearized_event_id = yield self.runInteraction( + "insert_receipt_conv", graph_to_linear + ) + + stream_id_manager = yield self._receipts_id_gen.get_next(self) + with stream_id_manager as stream_id: + yield self._receipts_stream_cache.room_has_changed( + self, room_id, stream_id + ) + have_persisted = yield self.runInteraction( + "insert_linearized_receipt", + self.insert_linearized_receipt_txn, + room_id, receipt_type, user_id, linearized_event_id, + data, + stream_id=stream_id, + ) + + if not have_persisted: + defer.returnValue(None) + + yield self.insert_graph_receipt( + room_id, receipt_type, user_id, event_ids, data + ) + + max_persisted_id = yield self._stream_id_gen.get_max_token(self) + defer.returnValue((stream_id, max_persisted_id)) + + def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, + data): + return self.runInteraction( + "insert_graph_receipt", + self.insert_graph_receipt_txn, + room_id, receipt_type, user_id, event_ids, data + ) + + def insert_graph_receipt_txn(self, txn, room_id, receipt_type, + user_id, event_ids, data): + self._simple_delete_txn( + txn, + table="receipts_graph", + keyvalues={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + } + ) + self._simple_insert_txn( + txn, + table="receipts_graph", + values={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": json.dumps(event_ids), + "data": json.dumps(data), + } + ) + + +class _RoomStreamChangeCache(object): + """Keeps track of the stream_id of the latest change in rooms. + + Given a list of rooms and stream key, it will give a subset of rooms that + may have changed since that key. If the key is too old then the cache + will simply return all rooms. + """ + def __init__(self, size_of_cache=10000): + self._size_of_cache = size_of_cache + self._room_to_key = {} + self._cache = sorteddict() + self._earliest_key = None + self.name = "ReceiptsRoomChangeCache" + caches_by_name[self.name] = self._cache + + @defer.inlineCallbacks + def get_rooms_changed(self, store, room_ids, key): + """Returns subset of room ids that have had new receipts since the + given key. If the key is too old it will just return the given list. + """ + if key > (yield self._get_earliest_key(store)): + keys = self._cache.keys() + i = keys.bisect_right(key) + + result = set( + self._cache[k] for k in keys[i:] + ).intersection(room_ids) + + cache_counter.inc_hits(self.name) + else: + result = room_ids + cache_counter.inc_misses(self.name) + + defer.returnValue(result) + + @defer.inlineCallbacks + def room_has_changed(self, store, room_id, key): + """Informs the cache that the room has been changed at the given key. + """ + if key > (yield self._get_earliest_key(store)): + old_key = self._room_to_key.get(room_id, None) + if old_key: + key = max(key, old_key) + self._cache.pop(old_key, None) + self._cache[key] = room_id + + while len(self._cache) > self._size_of_cache: + k, r = self._cache.popitem() + self._earliest_key = max(k, self._earliest_key) + self._room_to_key.pop(r, None) + + @defer.inlineCallbacks + def _get_earliest_key(self, store): + if self._earliest_key is None: + self._earliest_key = yield store.get_max_receipt_stream_id() + self._earliest_key = int(self._earliest_key) + + defer.returnValue(self._earliest_key) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 90e2606be2..586628579d 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -17,7 +17,8 @@ from twisted.internet import defer from synapse.api.errors import StoreError, Codes -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached class RegistrationStore(SQLBaseStore): @@ -97,6 +98,20 @@ class RegistrationStore(SQLBaseStore): allow_none=True, ) + def get_users_by_id_case_insensitive(self, user_id): + """Gets users that match user_id case insensitively. + Returns a mapping of user_id -> password_hash. + """ + def f(txn): + sql = ( + "SELECT name, password_hash FROM users" + " WHERE lower(name) = lower(?)" + ) + txn.execute(sql, (user_id,)) + return dict(txn.fetchall()) + + return self.runInteraction("get_users_by_id_case_insensitive", f) + @defer.inlineCallbacks def user_set_password_hash(self, user_id, password_hash): """ @@ -111,16 +126,16 @@ class RegistrationStore(SQLBaseStore): }) @defer.inlineCallbacks - def user_delete_access_tokens_apart_from(self, user_id, token_id): + def user_delete_access_tokens(self, user_id): yield self.runInteraction( - "user_delete_access_tokens_apart_from", - self._user_delete_access_tokens_apart_from, user_id, token_id + "user_delete_access_tokens", + self._user_delete_access_tokens, user_id ) - def _user_delete_access_tokens_apart_from(self, txn, user_id, token_id): + def _user_delete_access_tokens(self, txn, user_id): txn.execute( - "DELETE FROM access_tokens WHERE user_id = ? AND id != ?", - (user_id, token_id) + "DELETE FROM access_tokens WHERE user_id = ?", + (user_id, ) ) @defer.inlineCallbacks @@ -131,7 +146,7 @@ class RegistrationStore(SQLBaseStore): user_id ) for r in rows: - self.get_user_by_token.invalidate(r) + self.get_user_by_token.invalidate((r,)) @cached() def get_user_by_token(self, token): diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 4612a8aa83..5e07b7e0e5 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -17,7 +17,8 @@ from twisted.internet import defer from synapse.api.errors import StoreError -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cachedInlineCallbacks import collections import logging @@ -186,8 +187,7 @@ class RoomStore(SQLBaseStore): } ) - @cached() - @defer.inlineCallbacks + @cachedInlineCallbacks() def get_room_name_and_aliases(self, room_id): def f(txn): sql = ( diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index d36a6c18a8..8eee2dfbcc 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -17,7 +17,8 @@ from twisted.internet import defer from collections import namedtuple -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached from synapse.api.constants import Membership from synapse.types import UserID @@ -35,38 +36,28 @@ RoomsForUser = namedtuple( class RoomMemberStore(SQLBaseStore): - def _store_room_member_txn(self, txn, event): + def _store_room_members_txn(self, txn, events): """Store a room member in the database. """ - try: - target_user_id = event.state_key - except: - logger.exception( - "Failed to parse target_user_id=%s", target_user_id - ) - raise - - logger.debug( - "_store_room_member_txn: target_user_id=%s, membership=%s", - target_user_id, - event.membership, - ) - - self._simple_insert_txn( + self._simple_insert_many_txn( txn, - "room_memberships", - { - "event_id": event.event_id, - "user_id": target_user_id, - "sender": event.user_id, - "room_id": event.room_id, - "membership": event.membership, - } + table="room_memberships", + values=[ + { + "event_id": event.event_id, + "user_id": event.state_key, + "sender": event.user_id, + "room_id": event.room_id, + "membership": event.membership, + } + for event in events + ] ) - txn.call_after(self.get_rooms_for_user.invalidate, target_user_id) - txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id) - txn.call_after(self.get_users_in_room.invalidate, event.room_id) + for event in events: + txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,)) + txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) + txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -88,7 +79,7 @@ class RoomMemberStore(SQLBaseStore): lambda events: events[0] if events else None ) - @cached() + @cached(max_entries=5000) def get_users_in_room(self, room_id): def f(txn): @@ -164,7 +155,7 @@ class RoomMemberStore(SQLBaseStore): RoomsForUser(**r) for r in self.cursor_to_dict(txn) ] - @cached() + @cached(max_entries=5000) def get_joined_hosts_for_room(self, room_id): return self.runInteraction( "get_joined_hosts_for_room", diff --git a/synapse/storage/schema/delta/21/end_to_end_keys.sql b/synapse/storage/schema/delta/21/end_to_end_keys.sql new file mode 100644 index 0000000000..8b4a380d11 --- /dev/null +++ b/synapse/storage/schema/delta/21/end_to_end_keys.sql @@ -0,0 +1,34 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE IF NOT EXISTS e2e_device_keys_json ( + user_id TEXT NOT NULL, -- The user these keys are for. + device_id TEXT NOT NULL, -- Which of the user's devices these keys are for. + ts_added_ms BIGINT NOT NULL, -- When the keys were uploaded. + key_json TEXT NOT NULL, -- The keys for the device as a JSON blob. + CONSTRAINT e2e_device_keys_json_uniqueness UNIQUE (user_id, device_id) +); + + +CREATE TABLE IF NOT EXISTS e2e_one_time_keys_json ( + user_id TEXT NOT NULL, -- The user this one-time key is for. + device_id TEXT NOT NULL, -- The device this one-time key is for. + algorithm TEXT NOT NULL, -- Which algorithm this one-time key is for. + key_id TEXT NOT NULL, -- An id for suppressing duplicate uploads. + ts_added_ms BIGINT NOT NULL, -- When this key was uploaded. + key_json TEXT NOT NULL, -- The key as a JSON blob. + CONSTRAINT e2e_one_time_keys_json_uniqueness UNIQUE (user_id, device_id, algorithm, key_id) +); diff --git a/synapse/storage/schema/delta/21/receipts.sql b/synapse/storage/schema/delta/21/receipts.sql new file mode 100644 index 0000000000..2f64d609fc --- /dev/null +++ b/synapse/storage/schema/delta/21/receipts.sql @@ -0,0 +1,38 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE IF NOT EXISTS receipts_graph( + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_ids TEXT NOT NULL, + data TEXT NOT NULL, + CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id) +); + +CREATE TABLE IF NOT EXISTS receipts_linearized ( + stream_id BIGINT NOT NULL, + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_id TEXT NOT NULL, + data TEXT NOT NULL, + CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id) +); + +CREATE INDEX receipts_linearized_id ON receipts_linearized( + stream_id +); diff --git a/synapse/storage/schema/delta/22/receipts_index.sql b/synapse/storage/schema/delta/22/receipts_index.sql new file mode 100644 index 0000000000..b182b2b661 --- /dev/null +++ b/synapse/storage/schema/delta/22/receipts_index.sql @@ -0,0 +1,18 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( + room_id, stream_id +); diff --git a/synapse/storage/schema/delta/22/user_threepids_unique.sql b/synapse/storage/schema/delta/22/user_threepids_unique.sql new file mode 100644 index 0000000000..87edfa454c --- /dev/null +++ b/synapse/storage/schema/delta/22/user_threepids_unique.sql @@ -0,0 +1,19 @@ +CREATE TABLE IF NOT EXISTS user_threepids2 ( + user_id TEXT NOT NULL, + medium TEXT NOT NULL, + address TEXT NOT NULL, + validated_at BIGINT NOT NULL, + added_at BIGINT NOT NULL, + CONSTRAINT medium_address UNIQUE (medium, address) +); + +INSERT INTO user_threepids2 + SELECT * FROM user_threepids WHERE added_at IN ( + SELECT max(added_at) FROM user_threepids GROUP BY medium, address + ) +; + +DROP TABLE user_threepids; +ALTER TABLE user_threepids2 RENAME TO user_threepids; + +CREATE INDEX user_threepids_user_id ON user_threepids(user_id); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index f051828630..4f15e534b4 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -18,6 +18,7 @@ from twisted.internet import defer from _base import SQLBaseStore from syutil.base64util import encode_base64 +from synapse.crypto.event_signing import compute_event_reference_hash class SignatureStore(SQLBaseStore): @@ -101,23 +102,26 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return {k: v for k, v in txn.fetchall()} - def _store_event_reference_hash_txn(self, txn, event_id, algorithm, - hash_bytes): + def _store_event_reference_hashes_txn(self, txn, events): """Store a hash for a PDU Args: txn (cursor): - event_id (str): Id for the Event. - algorithm (str): Hashing algorithm. - hash_bytes (bytes): Hash function output bytes. + events (list): list of Events. """ - self._simple_insert_txn( + + vals = [] + for event in events: + ref_alg, ref_hash_bytes = compute_event_reference_hash(event) + vals.append({ + "event_id": event.event_id, + "algorithm": ref_alg, + "hash": buffer(ref_hash_bytes), + }) + + self._simple_insert_many_txn( txn, - "event_reference_hashes", - { - "event_id": event_id, - "algorithm": algorithm, - "hash": buffer(hash_bytes), - }, + table="event_reference_hashes", + values=vals, ) def _get_event_signatures_txn(self, txn, event_id): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f2b17f29ea..9630efcfcc 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -13,7 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import ( + cached, cachedInlineCallbacks, cachedList +) from twisted.internet import defer @@ -44,72 +47,44 @@ class StateStore(SQLBaseStore): """ @defer.inlineCallbacks - def get_state_groups(self, event_ids): + def get_state_groups(self, room_id, event_ids): """ Get the state groups for the given list of event_ids The return value is a dict mapping group names to lists of events. """ + if not event_ids: + defer.returnValue({}) - def f(txn): - groups = set() - for event_id in event_ids: - group = self._simple_select_one_onecol_txn( - txn, - table="event_to_state_groups", - keyvalues={"event_id": event_id}, - retcol="state_group", - allow_none=True, - ) - if group: - groups.add(group) - - res = {} - for group in groups: - state_ids = self._simple_select_onecol_txn( - txn, - table="state_groups_state", - keyvalues={"state_group": group}, - retcol="event_id", - ) - - res[group] = state_ids - - return res - - states = yield self.runInteraction( - "get_state_groups", - f, + event_to_groups = yield self._get_state_group_for_events( + room_id, event_ids, ) - state_list = yield defer.gatherResults( - [ - self._fetch_events_for_group(group, vals) - for group, vals in states.items() - ], - consumeErrors=True, - ) + groups = set(event_to_groups.values()) + group_to_state = yield self._get_state_for_groups(groups) - defer.returnValue(dict(state_list)) - - @cached(num_args=1) - def _fetch_events_for_group(self, state_group, events): - return self._get_events( - events, get_prev_content=False - ).addCallback( - lambda evs: (state_group, evs) - ) + defer.returnValue({ + group: state_map.values() + for group, state_map in group_to_state.items() + }) def _store_state_groups_txn(self, txn, event, context): - if context.current_state is None: - return + return self._store_mult_state_groups_txn(txn, [(event, context)]) + + def _store_mult_state_groups_txn(self, txn, events_and_contexts): + state_groups = {} + for event, context in events_and_contexts: + if context.current_state is None: + continue + + if context.state_group is not None: + state_groups[event.event_id] = context.state_group + continue - state_events = dict(context.current_state) + state_events = dict(context.current_state) - if event.is_state(): - state_events[(event.type, event.state_key)] = event + if event.is_state(): + state_events[(event.type, event.state_key)] = event - state_group = context.state_group - if not state_group: state_group = self._state_groups_id_gen.get_next_txn(txn) self._simple_insert_txn( txn, @@ -135,14 +110,19 @@ class StateStore(SQLBaseStore): for state in state_events.values() ], ) + state_groups[event.event_id] = state_group - self._simple_insert_txn( + self._simple_insert_many_txn( txn, table="event_to_state_groups", - values={ - "state_group": state_group, - "event_id": event.event_id, - }, + values=[ + { + "state_group": state_groups[event.event_id], + "event_id": event.event_id, + } + for event, context in events_and_contexts + if context.current_state is not None + ], ) @defer.inlineCallbacks @@ -177,8 +157,7 @@ class StateStore(SQLBaseStore): events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) - @cached(num_args=3) - @defer.inlineCallbacks + @cachedInlineCallbacks(num_args=3) def get_current_state_for_key(self, room_id, event_type, state_key): def f(txn): sql = ( @@ -194,6 +173,262 @@ class StateStore(SQLBaseStore): events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) + def _get_state_groups_from_groups(self, groups_and_types): + """Returns dictionary state_group -> state event ids + + Args: + groups_and_types (list): list of 2-tuple (`group`, `types`) + """ + def f(txn): + results = {} + for group, types in groups_and_types: + if types is not None: + where_clause = "AND (%s)" % ( + " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), + ) + else: + where_clause = "" + + sql = ( + "SELECT event_id FROM state_groups_state WHERE" + " state_group = ? %s" + ) % (where_clause,) + + args = [group] + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + + results[group] = [r[0] for r in txn.fetchall()] + + return results + + return self.runInteraction( + "_get_state_groups_from_groups", + f, + ) + + @defer.inlineCallbacks + def get_state_for_events(self, room_id, event_ids, types): + """Given a list of event_ids and type tuples, return a list of state + dicts for each event. The state dicts will only have the type/state_keys + that are in the `types` list. + + Args: + room_id (str) + event_ids (list) + types (list): List of (type, state_key) tuples which are used to + filter the state fetched. `state_key` may be None, which matches + any `state_key` + + Returns: + deferred: A list of dicts corresponding to the event_ids given. + The dicts are mappings from (type, state_key) -> state_events + """ + event_to_groups = yield self._get_state_group_for_events( + room_id, event_ids, + ) + + groups = set(event_to_groups.values()) + group_to_state = yield self._get_state_for_groups(groups, types) + + event_to_state = { + event_id: group_to_state[group] + for event_id, group in event_to_groups.items() + } + + defer.returnValue({event: event_to_state[event] for event in event_ids}) + + @cached(num_args=2, lru=True, max_entries=10000) + def _get_state_group_for_event(self, room_id, event_id): + return self._simple_select_one_onecol( + table="event_to_state_groups", + keyvalues={ + "event_id": event_id, + }, + retcol="state_group", + allow_none=True, + desc="_get_state_group_for_event", + ) + + @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids", + num_args=2) + def _get_state_group_for_events(self, room_id, event_ids): + """Returns mapping event_id -> state_group + """ + def f(txn): + results = {} + for event_id in event_ids: + results[event_id] = self._simple_select_one_onecol_txn( + txn, + table="event_to_state_groups", + keyvalues={ + "event_id": event_id, + }, + retcol="state_group", + allow_none=True, + ) + + return results + + return self.runInteraction("_get_state_group_for_events", f) + + def _get_some_state_from_cache(self, group, types): + """Checks if group is in cache. See `_get_state_for_groups` + + Returns 3-tuple (`state_dict`, `missing_types`, `got_all`). + `missing_types` is the list of types that aren't in the cache for that + group. `got_all` is a bool indicating if we successfully retrieved all + requests state from the cache, if False we need to query the DB for the + missing state. + + Args: + group: The state group to lookup + types (list): List of 2-tuples of the form (`type`, `state_key`), + where a `state_key` of `None` matches all state_keys for the + `type`. + """ + is_all, state_dict = self._state_group_cache.get(group) + + type_to_key = {} + missing_types = set() + for typ, state_key in types: + if state_key is None: + type_to_key[typ] = None + missing_types.add((typ, state_key)) + else: + if type_to_key.get(typ, object()) is not None: + type_to_key.setdefault(typ, set()).add(state_key) + + if (typ, state_key) not in state_dict: + missing_types.add((typ, state_key)) + + sentinel = object() + + def include(typ, state_key): + valid_state_keys = type_to_key.get(typ, sentinel) + if valid_state_keys is sentinel: + return False + if valid_state_keys is None: + return True + if state_key in valid_state_keys: + return True + return False + + got_all = not (missing_types or types is None) + + return { + k: v for k, v in state_dict.items() + if include(k[0], k[1]) + }, missing_types, got_all + + def _get_all_state_from_cache(self, group): + """Checks if group is in cache. See `_get_state_for_groups` + + Returns 2-tuple (`state_dict`, `got_all`). `got_all` is a bool + indicating if we successfully retrieved all requests state from the + cache, if False we need to query the DB for the missing state. + + Args: + group: The state group to lookup + """ + is_all, state_dict = self._state_group_cache.get(group) + return state_dict, is_all + + @defer.inlineCallbacks + def _get_state_for_groups(self, groups, types=None): + """Given list of groups returns dict of group -> list of state events + with matching types. `types` is a list of `(type, state_key)`, where + a `state_key` of None matches all state_keys. If `types` is None then + all events are returned. + """ + results = {} + missing_groups_and_types = [] + if types is not None: + for group in set(groups): + state_dict, missing_types, got_all = self._get_some_state_from_cache( + group, types + ) + results[group] = state_dict + + if not got_all: + missing_groups_and_types.append((group, missing_types)) + else: + for group in set(groups): + state_dict, got_all = self._get_all_state_from_cache( + group + ) + results[group] = state_dict + + if not got_all: + missing_groups_and_types.append((group, None)) + + if not missing_groups_and_types: + defer.returnValue({ + group: { + type_tuple: event + for type_tuple, event in state.items() + if event + } + for group, state in results.items() + }) + + # Okay, so we have some missing_types, lets fetch them. + cache_seq_num = self._state_group_cache.sequence + + group_state_dict = yield self._get_state_groups_from_groups( + missing_groups_and_types + ) + + state_events = yield self._get_events( + [e_id for l in group_state_dict.values() for e_id in l], + get_prev_content=False + ) + + state_events = {e.event_id: e for e in state_events} + + # Now we want to update the cache with all the things we fetched + # from the database. + for group, state_ids in group_state_dict.items(): + if types: + # We delibrately put key -> None mappings into the cache to + # cache absence of the key, on the assumption that if we've + # explicitly asked for some types then we will probably ask + # for them again. + state_dict = {key: None for key in types} + state_dict.update(results[group]) + results[group] = state_dict + else: + state_dict = results[group] + + for event_id in state_ids: + try: + state_event = state_events[event_id] + state_dict[(state_event.type, state_event.state_key)] = state_event + except KeyError: + # Hmm. So we do don't have that state event? Interesting. + logger.warn( + "Can't find state event %r for state group %r", + event_id, group, + ) + + self._state_group_cache.update( + cache_seq_num, + key=group, + value=state_dict, + full=(types is None), + ) + + # Remove all the entries with None values. The None values were just + # used for bookkeeping in the cache. + for group, state_dict in results.items(): + results[group] = { + key: event for key, event in state_dict.items() if event + } + + defer.returnValue(results) + def _make_group_id(clock): return str(int(clock.time_msec())) + random_string(5) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index af45fc5619..d7fe423f5a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -36,6 +36,7 @@ what sort order was used: from twisted.internet import defer from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logutils import log_function @@ -299,9 +300,8 @@ class StreamStore(SQLBaseStore): defer.returnValue((events, token)) - @defer.inlineCallbacks - def get_recent_events_for_room(self, room_id, limit, end_token, - with_feedback=False, from_token=None): + @cachedInlineCallbacks(num_args=4) + def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): # TODO (erikj): Handle compressed feedback end_token = RoomStreamToken.parse_stream_token(end_token) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 624da4a9dc..c8c7e6591a 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached from collections import namedtuple diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 89d1643f10..e956df62c7 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -72,7 +72,10 @@ class StreamIdGenerator(object): with stream_id_gen.get_next_txn(txn) as stream_id: # ... persist event ... """ - def __init__(self): + def __init__(self, table, column): + self.table = table + self.column = column + self._lock = threading.Lock() self._current_max = None @@ -108,6 +111,37 @@ class StreamIdGenerator(object): defer.returnValue(manager()) @defer.inlineCallbacks + def get_next_mult(self, store, n): + """ + Usage: + with yield stream_id_gen.get_next(store, n) as stream_ids: + # ... persist events ... + """ + if not self._current_max: + yield store.runInteraction( + "_compute_current_max", + self._get_or_compute_current_max, + ) + + with self._lock: + next_ids = range(self._current_max + 1, self._current_max + n + 1) + self._current_max += n + + for next_id in next_ids: + self._unfinished_ids.append(next_id) + + @contextlib.contextmanager + def manager(): + try: + yield next_ids + finally: + with self._lock: + for next_id in next_ids: + self._unfinished_ids.remove(next_id) + + defer.returnValue(manager()) + + @defer.inlineCallbacks def get_max_token(self, store): """Returns the maximum stream id such that all stream ids less than or equal to it have been successfully persisted. @@ -126,7 +160,7 @@ class StreamIdGenerator(object): def _get_or_compute_current_max(self, txn): with self._lock: - txn.execute("SELECT MAX(stream_ordering) FROM events") + txn.execute("SELECT MAX(%s) FROM %s" % (self.column, self.table)) rows = txn.fetchall() val, = rows[0] |