diff options
author | Kegan Dougal <kegan@matrix.org> | 2015-03-26 10:07:59 +0000 |
---|---|---|
committer | Kegan Dougal <kegan@matrix.org> | 2015-03-26 10:07:59 +0000 |
commit | 4edcbcee3b9579f4b50ecb97e566edab1a7c4c8b (patch) | |
tree | 5ab27c5ebbfc762ee133223fb48e868b85474c79 /synapse/storage | |
parent | Set the service ID as soon as it is known. (diff) | |
parent | Allow a choice of LRU behaviour for Cache() by using LruCache() or OrderedDict() (diff) | |
download | synapse-4edcbcee3b9579f4b50ecb97e566edab1a7c4c8b.tar.xz |
Merge branch 'develop' into application-services-txn-reliability
Conflicts: synapse/storage/__init__.py
Diffstat (limited to '')
-rw-r--r-- | synapse/storage/__init__.py | 489 | ||||
-rw-r--r-- | synapse/storage/_base.py | 242 | ||||
-rw-r--r-- | synapse/storage/directory.py | 17 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 12 | ||||
-rw-r--r-- | synapse/storage/events.py | 395 | ||||
-rw-r--r-- | synapse/storage/feedback.py | 47 | ||||
-rw-r--r-- | synapse/storage/filtering.py | 1 | ||||
-rw-r--r-- | synapse/storage/media_repository.py | 20 | ||||
-rw-r--r-- | synapse/storage/presence.py | 12 | ||||
-rw-r--r-- | synapse/storage/profile.py | 5 | ||||
-rw-r--r-- | synapse/storage/push_rule.py | 9 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 16 | ||||
-rw-r--r-- | synapse/storage/registration.py | 16 | ||||
-rw-r--r-- | synapse/storage/rejections.py | 3 | ||||
-rw-r--r-- | synapse/storage/room.py | 76 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 3 | ||||
-rw-r--r-- | synapse/storage/state.py | 34 | ||||
-rw-r--r-- | synapse/storage/stream.py | 22 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 20 |
19 files changed, 795 insertions, 644 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index e752b035e6..f4dec70393 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -14,15 +14,12 @@ # limitations under the License. from twisted.internet import defer - -from synapse.util.logutils import log_function -from synapse.api.constants import EventTypes - from .appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) +from ._base import Cache from .directory import DirectoryStore -from .feedback import FeedbackStore +from .events import EventsStore from .presence import PresenceStore from .profile import ProfileStore from .registration import RegistrationStore @@ -41,11 +38,6 @@ from .state import StateStore from .signatures import SignatureStore from .filtering import FilteringStore -from syutil.base64util import decode_base64 -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import compute_event_reference_hash - import fnmatch import imp @@ -63,16 +55,14 @@ SCHEMA_VERSION = 15 dir_path = os.path.abspath(os.path.dirname(__file__)) - -class _RollbackButIsFineException(Exception): - """ This exception is used to rollback a transaction without implying - something went wrong. - """ - pass +# Number of msec of granularity to store the user IP 'last seen' time. Smaller +# times give more inserts into the database even for readonly API hits +# 120 seconds == 2 minutes +LAST_SEEN_GRANULARITY = 120*1000 class DataStore(RoomMemberStore, RoomStore, - RegistrationStore, StreamStore, ProfileStore, FeedbackStore, + RegistrationStore, StreamStore, ProfileStore, PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, ApplicationServiceStore, @@ -83,6 +73,7 @@ class DataStore(RoomMemberStore, RoomStore, PusherStore, PushRuleStore, ApplicationServiceTransactionStore, + EventsStore, ): def __init__(self, hs): @@ -92,424 +83,28 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token_deferred = self._get_min_token() self.min_token = None - @defer.inlineCallbacks - @log_function - def persist_event(self, event, context, backfilled=False, - is_new_state=True, current_state=None): - stream_ordering = None - if backfilled: - if not self.min_token_deferred.called: - yield self.min_token_deferred - self.min_token -= 1 - stream_ordering = self.min_token - - try: - yield self.runInteraction( - "persist_event", - self._persist_event_txn, - event=event, - context=context, - backfilled=backfilled, - stream_ordering=stream_ordering, - is_new_state=is_new_state, - current_state=current_state, - ) - except _RollbackButIsFineException: - pass - - @defer.inlineCallbacks - def get_event(self, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False, - allow_none=False): - """Get an event from the database by event_id. - - Args: - event_id (str): The event_id of the event to fetch - check_redacted (bool): If True, check if event has been redacted - and redact it. - get_prev_content (bool): If True and event is a state event, - include the previous states content in the unsigned field. - allow_rejected (bool): If True return rejected events. - allow_none (bool): If True, return None if no event found, if - False throw an exception. - - Returns: - Deferred : A FrozenEvent. - """ - event = yield self.runInteraction( - "get_event", self._get_event_txn, - event_id, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - if not event and not allow_none: - raise RuntimeError("Could not find event %s" % (event_id,)) - - defer.returnValue(event) - - @log_function - def _persist_event_txn(self, txn, event, context, backfilled, - stream_ordering=None, is_new_state=True, - current_state=None): - - # Remove the any existing cache entries for the event_id - self._get_event_cache.pop(event.event_id) - - # We purposefully do this first since if we include a `current_state` - # key, we *want* to update the `current_state_events` table - if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) - ) - - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - }, - or_replace=True, - ) - - if event.is_state() and is_new_state: - if not backfilled and not context.rejected: - self._simple_insert_txn( - txn, - table="state_forward_extremities", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for prev_state_id, _ in event.prev_state: - self._simple_delete_txn( - txn, - table="state_forward_extremities", - keyvalues={ - "event_id": prev_state_id, - } - ) - - outlier = event.internal_metadata.is_outlier() - - if not outlier: - self._store_state_groups_txn(txn, event, context) - - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth - ) - - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - - have_persisted = self._simple_select_one_onecol_txn( - txn, - table="event_json", - keyvalues={"event_id": event.event_id}, - retcol="event_id", - allow_none=True, - ) - - metadata_json = encode_canonical_json( - event.internal_metadata.get_dict() - ) - - # If we have already persisted this event, we don't need to do any - # more processing. - # The processing above must be done on every call to persist event, - # since they might not have happened on previous calls. For example, - # if we are persisting an event that we had persisted as an outlier, - # but is no longer one. - if have_persisted: - if not outlier: - sql = ( - "UPDATE event_json SET internal_metadata = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (metadata_json.decode("UTF-8"), event.event_id,) - ) - - sql = ( - "UPDATE events SET outlier = 0" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (event.event_id,) - ) - return - - if event.type == EventTypes.Member: - self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) - elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) - - event_dict = { - k: v - for k, v in event.get_dict().items() - if k not in [ - "redacted", - "redacted_because", - ] - } - - self._simple_insert_txn( - txn, - table="event_json", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "internal_metadata": metadata_json.decode("UTF-8"), - "json": encode_canonical_json(event_dict).decode("UTF-8"), - }, - or_replace=True, - ) - - content = encode_canonical_json( - event.content - ).decode("UTF-8") - - vals = { - "topological_ordering": event.depth, - "event_id": event.event_id, - "type": event.type, - "room_id": event.room_id, - "content": content, - "processed": True, - "outlier": outlier, - "depth": event.depth, - } - - if stream_ordering is not None: - vals["stream_ordering"] = stream_ordering - - unrec = { - k: v - for k, v in event.get_dict().items() - if k not in vals.keys() and k not in [ - "redacted", - "redacted_because", - "signatures", - "hashes", - "prev_events", - ] - } - - vals["unrecognized_keys"] = encode_canonical_json( - unrec - ).decode("UTF-8") - - try: - self._simple_insert_txn( - txn, - "events", - vals, - or_replace=(not outlier), - or_ignore=bool(outlier), - ) - except: - logger.warn( - "Failed to persist, probably duplicate: %s", - event.event_id, - exc_info=True, - ) - raise _RollbackButIsFineException("_persist_event") - - if context.rejected: - self._store_rejections_txn(txn, event.event_id, context.rejected) - - if event.is_state(): - vals = { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - } - - # TODO: How does this work with backfilling? - if hasattr(event, "replaces_state"): - vals["prev_state"] = event.replaces_state - - self._simple_insert_txn( - txn, - "state_events", - vals, - or_replace=True, - ) - - if is_new_state and not context.rejected: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for e_id, h in event.prev_state: - self._simple_insert_txn( - txn, - table="event_edges", - values={ - "event_id": event.event_id, - "prev_event_id": e_id, - "room_id": event.room_id, - "is_state": 1, - }, - or_ignore=True, - ) - - for hash_alg, hash_base64 in event.hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_event_content_hash_txn( - txn, event.event_id, hash_alg, hash_bytes, - ) - - for prev_event_id, prev_hashes in event.prev_events: - for alg, hash_base64 in prev_hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_prev_event_hash_txn( - txn, event.event_id, prev_event_id, alg, hash_bytes - ) - - for auth_id, _ in event.auth_events: - self._simple_insert_txn( - txn, - table="event_auth", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "auth_id": auth_id, - }, - or_ignore=True, - ) - - (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) - self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes - ) - - def _store_redaction(self, txn, event): - # invalidate the cache for the redacted event - self._get_event_cache.pop(event.redacts) - txn.execute( - "INSERT OR IGNORE INTO redactions " - "(event_id, redacts) VALUES (?,?)", - (event.event_id, event.redacts) - ) - - @defer.inlineCallbacks - def get_current_state(self, room_id, event_type=None, state_key=""): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) - - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " - "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "INNER JOIN state_events as s ON e.event_id = s.event_id " - "WHERE c.room_id = ? " - ) % { - "redacted": del_sql, - } - - if event_type and state_key is not None: - sql += " AND s.type = ? AND s.state_key = ? " - args = (room_id, event_type, state_key) - elif event_type: - sql += " AND s.type = ?" - args = (room_id, event_type) - else: - args = (room_id, ) - - results = yield self._execute_and_decode("get_current_state", sql, *args) - - events = yield self._parse_events(results) - defer.returnValue(events) - - @defer.inlineCallbacks - def get_room_name_and_aliases(self, room_id): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" + self.client_ip_last_seen = Cache( + name="client_ip_last_seen", + keylen=4, ) - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " - "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "INNER JOIN state_events as s ON e.event_id = s.event_id " - "WHERE c.room_id = ? " - ) % { - "redacted": del_sql, - } - - sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')" - sql += " OR s.type = 'm.room.aliases')" - args = (room_id,) - - results = yield self._execute_and_decode("get_current_state", sql, *args) - - events = yield self._parse_events(results) - - name = None - aliases = [] - - for e in events: - if e.type == 'm.room.name': - if 'name' in e.content: - name = e.content['name'] - elif e.type == 'm.room.aliases': - if 'aliases' in e.content: - aliases.extend(e.content['aliases']) - - defer.returnValue((name, aliases)) - @defer.inlineCallbacks - def _get_min_token(self): - row = yield self._execute( - "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" - ) + def insert_client_ip(self, user, access_token, device_id, ip, user_agent): + now = int(self._clock.time_msec()) + key = (user.to_string(), access_token, device_id, ip) - self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 - self.min_token = min(self.min_token, -1) + try: + last_seen = self.client_ip_last_seen.get(*key) + except KeyError: + last_seen = None - logger.debug("min_token is: %s", self.min_token) + # Rate-limited inserts + if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: + defer.returnValue(None) - defer.returnValue(self.min_token) + self.client_ip_last_seen.prefill(*key + (now,)) - def insert_client_ip(self, user, access_token, device_id, ip, user_agent): - return self._simple_insert( + yield self._simple_insert( "user_ips", { "user": user.to_string(), @@ -517,8 +112,9 @@ class DataStore(RoomMemberStore, RoomStore, "device_id": device_id, "ip": ip, "user_agent": user_agent, - "last_seen": int(self._clock.time_msec()), - } + "last_seen": now, + }, + desc="insert_client_ip", ) def get_user_ip_and_agents(self, user): @@ -528,38 +124,7 @@ class DataStore(RoomMemberStore, RoomStore, retcols=[ "device_id", "access_token", "ip", "user_agent", "last_seen" ], - ) - - def have_events(self, event_ids): - """Given a list of event ids, check if we have already processed them. - - Returns: - dict: Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps to - None. - """ - if not event_ids: - return defer.succeed({}) - - def f(txn): - sql = ( - "SELECT e.event_id, reason FROM events as e " - "LEFT JOIN rejections as r ON e.event_id = r.event_id " - "WHERE e.event_id = ?" - ) - - res = {} - for event_id in event_ids: - txn.execute(sql, (event_id,)) - row = txn.fetchone() - if row: - _, rejected = row - res[event_id] = rejected - - return res - - return self.runInteraction( - "have_events", f, + desc="get_user_ip_and_agents", ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 40f2fc6d76..6fa63f052e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -25,6 +25,7 @@ import synapse.metrics from twisted.internet import defer from collections import namedtuple, OrderedDict +import functools import simplejson as json import sys import time @@ -38,6 +39,8 @@ transaction_logger = logging.getLogger("synapse.storage.txn") metrics = synapse.metrics.get_metrics_for("synapse.storage") +sql_scheduling_timer = metrics.register_distribution("schedule_time") + sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"]) @@ -50,14 +53,57 @@ cache_counter = metrics.register_cache( ) -# TODO(paul): -# * more generic key management -# * consider other eviction strategies - LRU? -def cached(max_entries=1000): +class Cache(object): + + def __init__(self, name, max_entries=1000, keylen=1, lru=False): + if lru: + self.cache = LruCache(max_size=max_entries) + self.max_entries = None + else: + self.cache = OrderedDict() + self.max_entries = max_entries + + self.name = name + self.keylen = keylen + + caches_by_name[name] = self.cache + + def get(self, *keyargs): + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + if keyargs in self.cache: + cache_counter.inc_hits(self.name) + return self.cache[keyargs] + + cache_counter.inc_misses(self.name) + raise KeyError() + + def prefill(self, *args): # because I can't *keyargs, value + keyargs = args[:-1] + value = args[-1] + + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + if self.max_entries is not None: + while len(self.cache) >= self.max_entries: + self.cache.popitem(last=False) + + self.cache[keyargs] = value + + def invalidate(self, *keyargs): + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + self.cache.pop(keyargs, None) + + +def cached(max_entries=1000, num_args=1, lru=False): """ A method decorator that applies a memoizing cache around the function. - The function is presumed to take one additional argument, which is used as - the key for the cache. Cache hits are served directly from the cache; + The function is presumed to take zero or more arguments, which are used in + a tuple as the key for the cache. Hits are served directly from the cache; misses use the function body to generate the value. The wrapped function has an additional member, a callable called @@ -68,33 +114,27 @@ def cached(max_entries=1000): calling the calculation function. """ def wrap(orig): - cache = OrderedDict() - name = orig.__name__ - - caches_by_name[name] = cache - - def prefill(key, value): - while len(cache) > max_entries: - cache.popitem(last=False) - - cache[key] = value + cache = Cache( + name=orig.__name__, + max_entries=max_entries, + keylen=num_args, + lru=lru, + ) + @functools.wraps(orig) @defer.inlineCallbacks - def wrapped(self, key): - if key in cache: - cache_counter.inc_hits(name) - defer.returnValue(cache[key]) + def wrapped(self, *keyargs): + try: + defer.returnValue(cache.get(*keyargs)) + except KeyError: + ret = yield orig(self, *keyargs) - cache_counter.inc_misses(name) - ret = yield orig(self, key) - prefill(key, ret) - defer.returnValue(ret) + cache.prefill(*keyargs + (ret,)) - def invalidate(key): - cache.pop(key, None) + defer.returnValue(ret) - wrapped.invalidate = invalidate - wrapped.prefill = prefill + wrapped.invalidate = cache.invalidate + wrapped.prefill = cache.prefill return wrapped return wrap @@ -240,6 +280,8 @@ class SQLBaseStore(object): """Wraps the .runInteraction() method on the underlying db_pool.""" current_context = LoggingContext.current_context() + start_time = time.time() * 1000 + def inner_func(txn, *args, **kwargs): with LoggingContext("runInteraction") as context: current_context.copy_to(context) @@ -252,6 +294,7 @@ class SQLBaseStore(object): name = "%s-%x" % (desc, txn_id, ) + sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) transaction_logger.debug("[TXN START] {%s}", name) try: return func(LoggingTransaction(txn, name), *args, **kwargs) @@ -314,7 +357,8 @@ class SQLBaseStore(object): # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. - def _simple_insert(self, table, values, or_replace=False, or_ignore=False): + def _simple_insert(self, table, values, or_replace=False, or_ignore=False, + desc="_simple_insert"): """Executes an INSERT query on the named table. Args: @@ -323,7 +367,7 @@ class SQLBaseStore(object): or_replace : bool; if True performs an INSERT OR REPLACE """ return self.runInteraction( - "_simple_insert", + desc, self._simple_insert_txn, table, values, or_replace=or_replace, or_ignore=or_ignore, ) @@ -347,7 +391,7 @@ class SQLBaseStore(object): txn.execute(sql, values.values()) return txn.lastrowid - def _simple_upsert(self, table, keyvalues, values): + def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"): """ Args: table (str): The table to upsert into @@ -356,7 +400,7 @@ class SQLBaseStore(object): Returns: A deferred """ return self.runInteraction( - "_simple_upsert", + desc, self._simple_upsert_txn, table, keyvalues, values ) @@ -392,7 +436,7 @@ class SQLBaseStore(object): txn.execute(sql, allvalues.values()) def _simple_select_one(self, table, keyvalues, retcols, - allow_none=False): + allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to return a single row, returning a single column from it. @@ -404,12 +448,15 @@ class SQLBaseStore(object): allow_none : If true, return None instead of failing if the SELECT statement returns no rows """ - return self._simple_selectupdate_one( - table, keyvalues, retcols=retcols, allow_none=allow_none + return self.runInteraction( + desc, + self._simple_select_one_txn, + table, keyvalues, retcols, allow_none, ) def _simple_select_one_onecol(self, table, keyvalues, retcol, - allow_none=False): + allow_none=False, + desc="_simple_select_one_onecol"): """Executes a SELECT query on the named table, which is expected to return a single row, returning a single column from it." @@ -419,7 +466,7 @@ class SQLBaseStore(object): retcol : string giving the name of the column to return """ return self.runInteraction( - "_simple_select_one_onecol", + desc, self._simple_select_one_onecol_txn, table, keyvalues, retcol, allow_none=allow_none, ) @@ -455,7 +502,8 @@ class SQLBaseStore(object): return [r[0] for r in txn.fetchall()] - def _simple_select_onecol(self, table, keyvalues, retcol): + def _simple_select_onecol(self, table, keyvalues, retcol, + desc="_simple_select_onecol"): """Executes a SELECT query on the named table, which returns a list comprising of the values of the named column from the selected rows. @@ -468,12 +516,13 @@ class SQLBaseStore(object): Deferred: Results in a list """ return self.runInteraction( - "_simple_select_onecol", + desc, self._simple_select_onecol_txn, table, keyvalues, retcol ) - def _simple_select_list(self, table, keyvalues, retcols): + def _simple_select_list(self, table, keyvalues, retcols, + desc="_simple_select_list"): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -484,7 +533,7 @@ class SQLBaseStore(object): retcols : list of strings giving the names of the columns to return """ return self.runInteraction( - "_simple_select_list", + desc, self._simple_select_list_txn, table, keyvalues, retcols ) @@ -516,7 +565,7 @@ class SQLBaseStore(object): return self.cursor_to_dict(txn) def _simple_update_one(self, table, keyvalues, updatevalues, - retcols=None): + desc="_simple_update_one"): """Executes an UPDATE query on the named table, setting new values for columns in a row matching the key values. @@ -534,56 +583,76 @@ class SQLBaseStore(object): get-and-set. This can be used to implement compare-and-set by putting the update column in the 'keyvalues' dict as well. """ - return self._simple_selectupdate_one(table, keyvalues, updatevalues, - retcols=retcols) + return self.runInteraction( + desc, + self._simple_update_one_txn, + table, keyvalues, updatevalues, + ) - def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None, - retcols=None, allow_none=False): - """ Combined SELECT then UPDATE.""" - if retcols: - select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( - ", ".join(retcols), - table, - " AND ".join("%s = ?" % (k) for k in keyvalues) - ) + def _simple_update_one_txn(self, txn, table, keyvalues, updatevalues): + update_sql = "UPDATE %s SET %s WHERE %s" % ( + table, + ", ".join("%s = ?" % (k,) for k in updatevalues), + " AND ".join("%s = ?" % (k,) for k in keyvalues) + ) - if updatevalues: - update_sql = "UPDATE %s SET %s WHERE %s" % ( - table, - ", ".join("%s = ?" % (k,) for k in updatevalues), - " AND ".join("%s = ?" % (k,) for k in keyvalues) - ) + txn.execute( + update_sql, + updatevalues.values() + keyvalues.values() + ) + + if txn.rowcount == 0: + raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + + def _simple_select_one_txn(self, txn, table, keyvalues, retcols, + allow_none=False): + select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + ", ".join(retcols), + table, + " AND ".join("%s = ?" % (k) for k in keyvalues) + ) + + txn.execute(select_sql, keyvalues.values()) + row = txn.fetchone() + if not row: + if allow_none: + return None + raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + + return dict(zip(retcols, row)) + + def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None, + retcols=None, allow_none=False, + desc="_simple_selectupdate_one"): + """ Combined SELECT then UPDATE.""" def func(txn): ret = None if retcols: - txn.execute(select_sql, keyvalues.values()) - - row = txn.fetchone() - if not row: - if allow_none: - return None - raise StoreError(404, "No row found") - if txn.rowcount > 1: - raise StoreError(500, "More than one row matched") - - ret = dict(zip(retcols, row)) + ret = self._simple_select_one_txn( + txn, + table=table, + keyvalues=keyvalues, + retcols=retcols, + allow_none=allow_none, + ) if updatevalues: - txn.execute( - update_sql, - updatevalues.values() + keyvalues.values() + self._simple_update_one_txn( + txn, + table=table, + keyvalues=keyvalues, + updatevalues=updatevalues, ) - if txn.rowcount == 0: - raise StoreError(404, "No row found") - if txn.rowcount > 1: - raise StoreError(500, "More than one row matched") - return ret - return self.runInteraction("_simple_selectupdate_one", func) + return self.runInteraction(desc, func) - def _simple_delete_one(self, table, keyvalues): + def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"): """Executes a DELETE query on the named table, expecting to delete a single row. @@ -602,9 +671,9 @@ class SQLBaseStore(object): raise StoreError(404, "No row found") if txn.rowcount > 1: raise StoreError(500, "more than one row matched") - return self.runInteraction("_simple_delete_one", func) + return self.runInteraction(desc, func) - def _simple_delete(self, table, keyvalues): + def _simple_delete(self, table, keyvalues, desc="_simple_delete"): """Executes a DELETE query on the named table. Args: @@ -612,7 +681,7 @@ class SQLBaseStore(object): keyvalues : dict of column names and values to select the row with """ - return self.runInteraction("_simple_delete", self._simple_delete_txn) + return self.runInteraction(desc, self._simple_delete_txn) def _simple_delete_txn(self, txn, table, keyvalues): sql = "DELETE FROM %s WHERE %s" % ( @@ -782,6 +851,13 @@ class SQLBaseStore(object): return result[0] if result else None +class _RollbackButIsFineException(Exception): + """ This exception is used to rollback a transaction without implying + something went wrong. + """ + pass + + class Table(object): """ A base class used to store information about a particular table. """ diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 68b7d59693..0199539fea 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from synapse.api.errors import SynapseError @@ -48,6 +48,7 @@ class DirectoryStore(SQLBaseStore): {"room_alias": room_alias.to_string()}, "room_id", allow_none=True, + desc="get_association_from_room_alias", ) if not room_id: @@ -58,6 +59,7 @@ class DirectoryStore(SQLBaseStore): "room_alias_servers", {"room_alias": room_alias.to_string()}, "server", + desc="get_association_from_room_alias", ) if not servers: @@ -87,6 +89,7 @@ class DirectoryStore(SQLBaseStore): "room_alias": room_alias.to_string(), "room_id": room_id, }, + desc="create_room_alias_association", ) except sqlite3.IntegrityError: raise SynapseError( @@ -100,16 +103,22 @@ class DirectoryStore(SQLBaseStore): { "room_alias": room_alias.to_string(), "server": server, - } + }, + desc="create_room_alias_association", ) + self.get_aliases_for_room.invalidate(room_id) + @defer.inlineCallbacks def delete_room_alias(self, room_alias): - return self.runInteraction( + room_id = yield self.runInteraction( "delete_room_alias", self._delete_room_alias_txn, room_alias, ) + self.get_aliases_for_room.invalidate(room_id) + defer.returnValue(room_id) + def _delete_room_alias_txn(self, txn, room_alias): cursor = txn.execute( "SELECT room_id FROM room_aliases WHERE room_alias = ?", @@ -134,9 +143,11 @@ class DirectoryStore(SQLBaseStore): return room_id + @cached() def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", {"room_id": room_id}, "room_alias", + desc="get_aliases_for_room", ) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 2deda8ac50..032334bfd6 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -429,3 +429,15 @@ class EventFederationStore(SQLBaseStore): ) return events[:limit] + + def clean_room_for_join(self, room_id): + return self.runInteraction( + "clean_room_for_join", + self._clean_room_for_join_txn, + room_id, + ) + + def _clean_room_for_join_txn(self, txn, room_id): + query = "DELETE FROM event_forward_extremities WHERE room_id = ?" + + txn.execute(query, (room_id,)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py new file mode 100644 index 0000000000..a86230d92c --- /dev/null +++ b/synapse/storage/events.py @@ -0,0 +1,395 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from _base import SQLBaseStore, _RollbackButIsFineException + +from twisted.internet import defer + +from synapse.util.logutils import log_function +from synapse.api.constants import EventTypes +from synapse.crypto.event_signing import compute_event_reference_hash + +from syutil.base64util import decode_base64 +from syutil.jsonutil import encode_canonical_json + +import logging + +logger = logging.getLogger(__name__) + + +class EventsStore(SQLBaseStore): + @defer.inlineCallbacks + @log_function + def persist_event(self, event, context, backfilled=False, + is_new_state=True, current_state=None): + stream_ordering = None + if backfilled: + if not self.min_token_deferred.called: + yield self.min_token_deferred + self.min_token -= 1 + stream_ordering = self.min_token + + try: + yield self.runInteraction( + "persist_event", + self._persist_event_txn, + event=event, + context=context, + backfilled=backfilled, + stream_ordering=stream_ordering, + is_new_state=is_new_state, + current_state=current_state, + ) + self.get_room_events_max_id.invalidate() + except _RollbackButIsFineException: + pass + + @defer.inlineCallbacks + def get_event(self, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False, + allow_none=False): + """Get an event from the database by event_id. + + Args: + event_id (str): The event_id of the event to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + allow_none (bool): If True, return None if no event found, if + False throw an exception. + + Returns: + Deferred : A FrozenEvent. + """ + event = yield self.runInteraction( + "get_event", self._get_event_txn, + event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + if not event and not allow_none: + raise RuntimeError("Could not find event %s" % (event_id,)) + + defer.returnValue(event) + + @log_function + def _persist_event_txn(self, txn, event, context, backfilled, + stream_ordering=None, is_new_state=True, + current_state=None): + + # Remove the any existing cache entries for the event_id + self._get_event_cache.pop(event.event_id) + + # We purposefully do this first since if we include a `current_state` + # key, we *want* to update the `current_state_events` table + if current_state: + txn.execute( + "DELETE FROM current_state_events WHERE room_id = ?", + (event.room_id,) + ) + + for s in current_state: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": s.event_id, + "room_id": s.room_id, + "type": s.type, + "state_key": s.state_key, + }, + or_replace=True, + ) + + if event.is_state() and is_new_state: + if not backfilled and not context.rejected: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + or_replace=True, + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) + + outlier = event.internal_metadata.is_outlier() + + if not outlier: + self._store_state_groups_txn(txn, event, context) + + self._update_min_depth_for_room_txn( + txn, + event.room_id, + event.depth + ) + + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + + have_persisted = self._simple_select_one_onecol_txn( + txn, + table="event_json", + keyvalues={"event_id": event.event_id}, + retcol="event_id", + allow_none=True, + ) + + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ) + + # If we have already persisted this event, we don't need to do any + # more processing. + # The processing above must be done on every call to persist event, + # since they might not have happened on previous calls. For example, + # if we are persisting an event that we had persisted as an outlier, + # but is no longer one. + if have_persisted: + if not outlier: + sql = ( + "UPDATE event_json SET internal_metadata = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (metadata_json.decode("UTF-8"), event.event_id,) + ) + + sql = ( + "UPDATE events SET outlier = 0" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (event.event_id,) + ) + return + + if event.type == EventTypes.Member: + self._store_room_member_txn(txn, event) + elif event.type == EventTypes.Feedback: + self._store_feedback_txn(txn, event) + elif event.type == EventTypes.Name: + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Redaction: + self._store_redaction(txn, event) + + event_dict = { + k: v + for k, v in event.get_dict().items() + if k not in [ + "redacted", + "redacted_because", + ] + } + + self._simple_insert_txn( + txn, + table="event_json", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "internal_metadata": metadata_json.decode("UTF-8"), + "json": encode_canonical_json(event_dict).decode("UTF-8"), + }, + or_replace=True, + ) + + content = encode_canonical_json( + event.content + ).decode("UTF-8") + + vals = { + "topological_ordering": event.depth, + "event_id": event.event_id, + "type": event.type, + "room_id": event.room_id, + "content": content, + "processed": True, + "outlier": outlier, + "depth": event.depth, + } + + if stream_ordering is not None: + vals["stream_ordering"] = stream_ordering + + unrec = { + k: v + for k, v in event.get_dict().items() + if k not in vals.keys() and k not in [ + "redacted", + "redacted_because", + "signatures", + "hashes", + "prev_events", + ] + } + + vals["unrecognized_keys"] = encode_canonical_json( + unrec + ).decode("UTF-8") + + try: + self._simple_insert_txn( + txn, + "events", + vals, + or_replace=(not outlier), + or_ignore=bool(outlier), + ) + except: + logger.warn( + "Failed to persist, probably duplicate: %s", + event.event_id, + exc_info=True, + ) + raise _RollbackButIsFineException("_persist_event") + + if context.rejected: + self._store_rejections_txn(txn, event.event_id, context.rejected) + + if event.is_state(): + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + + # TODO: How does this work with backfilling? + if hasattr(event, "replaces_state"): + vals["prev_state"] = event.replaces_state + + self._simple_insert_txn( + txn, + "state_events", + vals, + ) + + if is_new_state and not context.rejected: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + ) + + for e_id, h in event.prev_state: + self._simple_insert_txn( + txn, + table="event_edges", + values={ + "event_id": event.event_id, + "prev_event_id": e_id, + "room_id": event.room_id, + "is_state": 1, + }, + ) + + for hash_alg, hash_base64 in event.hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_event_content_hash_txn( + txn, event.event_id, hash_alg, hash_bytes, + ) + + for prev_event_id, prev_hashes in event.prev_events: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_event_hash_txn( + txn, event.event_id, prev_event_id, alg, hash_bytes + ) + + for auth_id, _ in event.auth_events: + self._simple_insert_txn( + txn, + table="event_auth", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, + }, + ) + + (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) + self._store_event_reference_hash_txn( + txn, event.event_id, ref_alg, ref_hash_bytes + ) + + def _store_redaction(self, txn, event): + # invalidate the cache for the redacted event + self._get_event_cache.pop(event.redacts) + txn.execute( + "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", + (event.event_id, event.redacts) + ) + + def have_events(self, event_ids): + """Given a list of event ids, check if we have already processed them. + + Returns: + dict: Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps to + None. + """ + if not event_ids: + return defer.succeed({}) + + def f(txn): + sql = ( + "SELECT e.event_id, reason FROM events as e " + "LEFT JOIN rejections as r ON e.event_id = r.event_id " + "WHERE e.event_id = ?" + ) + + res = {} + for event_id in event_ids: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + _, rejected = row + res[event_id] = rejected + + return res + + return self.runInteraction( + "have_events", f, + ) diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py deleted file mode 100644 index 8eab769b71..0000000000 --- a/synapse/storage/feedback.py +++ /dev/null @@ -1,47 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from twisted.internet import defer - -from ._base import SQLBaseStore - - -class FeedbackStore(SQLBaseStore): - - def _store_feedback_txn(self, txn, event): - self._simple_insert_txn(txn, "feedback", { - "event_id": event.event_id, - "feedback_type": event.content["type"], - "room_id": event.room_id, - "target_event_id": event.content["target_event_id"], - "sender": event.user_id, - }) - - @defer.inlineCallbacks - def get_feedback_for_event(self, event_id): - sql = ( - "SELECT events.* FROM events INNER JOIN feedback " - "ON events.event_id = feedback.event_id " - "WHERE feedback.target_event_id = ? " - ) - - rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id) - - defer.returnValue( - [ - (yield self._parse_events(r)) - for r in rows - ] - ) diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 457a11fd02..8800116570 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -31,6 +31,7 @@ class FilteringStore(SQLBaseStore): }, retcol="filter_json", allow_none=False, + desc="get_user_filter", ) defer.returnValue(json.loads(def_json)) diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 7101d2beec..7bf57234f6 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -32,6 +32,7 @@ class MediaRepositoryStore(SQLBaseStore): {"media_id": media_id}, ("media_type", "media_length", "upload_name", "created_ts"), allow_none=True, + desc="get_local_media", ) def store_local_media(self, media_id, media_type, time_now_ms, upload_name, @@ -45,7 +46,8 @@ class MediaRepositoryStore(SQLBaseStore): "upload_name": upload_name, "media_length": media_length, "user_id": user_id.to_string(), - } + }, + desc="store_local_media", ) def get_local_media_thumbnails(self, media_id): @@ -55,7 +57,8 @@ class MediaRepositoryStore(SQLBaseStore): ( "thumbnail_width", "thumbnail_height", "thumbnail_method", "thumbnail_type", "thumbnail_length", - ) + ), + desc="get_local_media_thumbnails", ) def store_local_thumbnail(self, media_id, thumbnail_width, @@ -70,7 +73,8 @@ class MediaRepositoryStore(SQLBaseStore): "thumbnail_method": thumbnail_method, "thumbnail_type": thumbnail_type, "thumbnail_length": thumbnail_length, - } + }, + desc="store_local_thumbnail", ) def get_cached_remote_media(self, origin, media_id): @@ -82,6 +86,7 @@ class MediaRepositoryStore(SQLBaseStore): "filesystem_id", ), allow_none=True, + desc="get_cached_remote_media", ) def store_cached_remote_media(self, origin, media_id, media_type, @@ -97,7 +102,8 @@ class MediaRepositoryStore(SQLBaseStore): "created_ts": time_now_ms, "upload_name": upload_name, "filesystem_id": filesystem_id, - } + }, + desc="store_cached_remote_media", ) def get_remote_media_thumbnails(self, origin, media_id): @@ -107,7 +113,8 @@ class MediaRepositoryStore(SQLBaseStore): ( "thumbnail_width", "thumbnail_height", "thumbnail_method", "thumbnail_type", "thumbnail_length", "filesystem_id", - ) + ), + desc="get_remote_media_thumbnails", ) def store_remote_media_thumbnail(self, origin, media_id, filesystem_id, @@ -125,5 +132,6 @@ class MediaRepositoryStore(SQLBaseStore): "thumbnail_type": thumbnail_type, "thumbnail_length": thumbnail_length, "filesystem_id": filesystem_id, - } + }, + desc="store_remote_media_thumbnail", ) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 1dcd34723b..87fba55439 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -21,6 +21,7 @@ class PresenceStore(SQLBaseStore): return self._simple_insert( table="presence", values={"user_id": user_localpart}, + desc="create_presence", ) def has_presence_state(self, user_localpart): @@ -29,6 +30,7 @@ class PresenceStore(SQLBaseStore): keyvalues={"user_id": user_localpart}, retcols=["user_id"], allow_none=True, + desc="has_presence_state", ) def get_presence_state(self, user_localpart): @@ -36,6 +38,7 @@ class PresenceStore(SQLBaseStore): table="presence", keyvalues={"user_id": user_localpart}, retcols=["state", "status_msg", "mtime"], + desc="get_presence_state", ) def set_presence_state(self, user_localpart, new_state): @@ -45,7 +48,7 @@ class PresenceStore(SQLBaseStore): updatevalues={"state": new_state["state"], "status_msg": new_state["status_msg"], "mtime": self._clock.time_msec()}, - retcols=["state"], + desc="set_presence_state", ) def allow_presence_visible(self, observed_localpart, observer_userid): @@ -53,6 +56,7 @@ class PresenceStore(SQLBaseStore): table="presence_allow_inbound", values={"observed_user_id": observed_localpart, "observer_user_id": observer_userid}, + desc="allow_presence_visible", ) def disallow_presence_visible(self, observed_localpart, observer_userid): @@ -60,6 +64,7 @@ class PresenceStore(SQLBaseStore): table="presence_allow_inbound", keyvalues={"observed_user_id": observed_localpart, "observer_user_id": observer_userid}, + desc="disallow_presence_visible", ) def is_presence_visible(self, observed_localpart, observer_userid): @@ -69,6 +74,7 @@ class PresenceStore(SQLBaseStore): "observer_user_id": observer_userid}, retcols=["observed_user_id"], allow_none=True, + desc="is_presence_visible", ) def add_presence_list_pending(self, observer_localpart, observed_userid): @@ -77,6 +83,7 @@ class PresenceStore(SQLBaseStore): values={"user_id": observer_localpart, "observed_user_id": observed_userid, "accepted": False}, + desc="add_presence_list_pending", ) def set_presence_list_accepted(self, observer_localpart, observed_userid): @@ -85,6 +92,7 @@ class PresenceStore(SQLBaseStore): keyvalues={"user_id": observer_localpart, "observed_user_id": observed_userid}, updatevalues={"accepted": True}, + desc="set_presence_list_accepted", ) def get_presence_list(self, observer_localpart, accepted=None): @@ -96,6 +104,7 @@ class PresenceStore(SQLBaseStore): table="presence_list", keyvalues=keyvalues, retcols=["observed_user_id", "accepted"], + desc="get_presence_list", ) def del_presence_list(self, observer_localpart, observed_userid): @@ -103,4 +112,5 @@ class PresenceStore(SQLBaseStore): table="presence_list", keyvalues={"user_id": observer_localpart, "observed_user_id": observed_userid}, + desc="del_presence_list", ) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 153c7ad027..a6e52cb248 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -21,6 +21,7 @@ class ProfileStore(SQLBaseStore): return self._simple_insert( table="profiles", values={"user_id": user_localpart}, + desc="create_profile", ) def get_profile_displayname(self, user_localpart): @@ -28,6 +29,7 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, retcol="displayname", + desc="get_profile_displayname", ) def set_profile_displayname(self, user_localpart, new_displayname): @@ -35,6 +37,7 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, updatevalues={"displayname": new_displayname}, + desc="set_profile_displayname", ) def get_profile_avatar_url(self, user_localpart): @@ -42,6 +45,7 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, retcol="avatar_url", + desc="get_profile_avatar_url", ) def set_profile_avatar_url(self, user_localpart, new_avatar_url): @@ -49,4 +53,5 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, updatevalues={"avatar_url": new_avatar_url}, + desc="set_profile_avatar_url", ) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index d769db2c78..c47bdc2861 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -50,7 +50,8 @@ class PushRuleStore(SQLBaseStore): results = yield self._simple_select_list( PushRuleEnableTable.table_name, {'user_name': user_name}, - PushRuleEnableTable.fields + PushRuleEnableTable.fields, + desc="get_push_rules_enabled_for_user", ) defer.returnValue( {r['rule_id']: False if r['enabled'] == 0 else True for r in results} @@ -201,7 +202,8 @@ class PushRuleStore(SQLBaseStore): """ yield self._simple_delete_one( PushRuleTable.table_name, - {'user_name': user_name, 'rule_id': rule_id} + {'user_name': user_name, 'rule_id': rule_id}, + desc="delete_push_rule", ) @defer.inlineCallbacks @@ -209,7 +211,8 @@ class PushRuleStore(SQLBaseStore): yield self._simple_upsert( PushRuleEnableTable.table_name, {'user_name': user_name, 'rule_id': rule_id}, - {'enabled': enabled} + {'enabled': enabled}, + desc="set_push_rule_enabled", ) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 587dada68f..000502b4ff 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -114,7 +114,9 @@ class PusherStore(SQLBaseStore): ts=pushkey_ts, lang=lang, data=data - )) + ), + desc="add_pusher", + ) except Exception as e: logger.error("create_pusher with failed: %s", e) raise StoreError(500, "Problem creating pusher.") @@ -123,7 +125,8 @@ class PusherStore(SQLBaseStore): def delete_pusher_by_app_id_pushkey(self, app_id, pushkey): yield self._simple_delete_one( PushersTable.table_name, - dict(app_id=app_id, pushkey=pushkey) + {"app_id": app_id, "pushkey": pushkey}, + desc="delete_pusher_by_app_id_pushkey", ) @defer.inlineCallbacks @@ -131,7 +134,8 @@ class PusherStore(SQLBaseStore): yield self._simple_update_one( PushersTable.table_name, {'app_id': app_id, 'pushkey': pushkey}, - {'last_token': last_token} + {'last_token': last_token}, + desc="update_pusher_last_token", ) @defer.inlineCallbacks @@ -140,7 +144,8 @@ class PusherStore(SQLBaseStore): yield self._simple_update_one( PushersTable.table_name, {'app_id': app_id, 'pushkey': pushkey}, - {'last_token': last_token, 'last_success': last_success} + {'last_token': last_token, 'last_success': last_success}, + desc="update_pusher_last_token_and_success", ) @defer.inlineCallbacks @@ -148,7 +153,8 @@ class PusherStore(SQLBaseStore): yield self._simple_update_one( PushersTable.table_name, {'app_id': app_id, 'pushkey': pushkey}, - {'failing_since': failing_since} + {'failing_since': failing_since}, + desc="update_pusher_failing_since", ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index adc8fc0794..f24154f146 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -19,7 +19,7 @@ from sqlite3 import IntegrityError from synapse.api.errors import StoreError, Codes -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached class RegistrationStore(SQLBaseStore): @@ -39,7 +39,10 @@ class RegistrationStore(SQLBaseStore): Raises: StoreError if there was a problem adding this. """ - row = yield self._simple_select_one("users", {"name": user_id}, ["id"]) + row = yield self._simple_select_one( + "users", {"name": user_id}, ["id"], + desc="add_access_token_to_user", + ) if not row: raise StoreError(400, "Bad user ID supplied.") row_id = row["id"] @@ -48,7 +51,8 @@ class RegistrationStore(SQLBaseStore): { "user_id": row_id, "token": token - } + }, + desc="add_access_token_to_user", ) @defer.inlineCallbacks @@ -91,6 +95,11 @@ class RegistrationStore(SQLBaseStore): "get_user_by_id", self.cursor_to_dict, query, user_id ) + @cached() + # TODO(paul): Currently there's no code to invalidate this cache. That + # means if/when we ever add internal ways to invalidate access tokens or + # change whether a user is a server admin, those will need to invoke + # store.get_user_by_token.invalidate(token) def get_user_by_token(self, token): """Get a user from the given access token. @@ -115,6 +124,7 @@ class RegistrationStore(SQLBaseStore): keyvalues={"name": user.to_string()}, retcol="admin", allow_none=True, + desc="is_server_admin", ) defer.returnValue(res if res else False) diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py index 4e1a9a2783..0838eb3d12 100644 --- a/synapse/storage/rejections.py +++ b/synapse/storage/rejections.py @@ -29,7 +29,7 @@ class RejectionsStore(SQLBaseStore): "event_id": event_id, "reason": reason, "last_check": self._clock.time_msec(), - } + }, ) def get_rejection_reason(self, event_id): @@ -40,4 +40,5 @@ class RejectionsStore(SQLBaseStore): "event_id": event_id, }, allow_none=True, + desc="get_rejection_reason", ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 549c9af393..be3e28c2ea 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -15,11 +15,9 @@ from twisted.internet import defer -from sqlite3 import IntegrityError - from synapse.api.errors import StoreError -from ._base import SQLBaseStore, Table +from ._base import SQLBaseStore import collections import logging @@ -27,8 +25,9 @@ import logging logger = logging.getLogger(__name__) -OpsLevel = collections.namedtuple("OpsLevel", ( - "ban_level", "kick_level", "redact_level") +OpsLevel = collections.namedtuple( + "OpsLevel", + ("ban_level", "kick_level", "redact_level",) ) @@ -47,13 +46,15 @@ class RoomStore(SQLBaseStore): StoreError if the room could not be stored. """ try: - yield self._simple_insert(RoomsTable.table_name, dict( - room_id=room_id, - creator=room_creator_user_id, - is_public=is_public - )) - except IntegrityError: - raise StoreError(409, "Room ID in use.") + yield self._simple_insert( + RoomsTable.table_name, + { + "room_id": room_id, + "creator": room_creator_user_id, + "is_public": is_public, + }, + desc="store_room", + ) except Exception as e: logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") @@ -66,9 +67,11 @@ class RoomStore(SQLBaseStore): Returns: A namedtuple containing the room information, or an empty list. """ - query = RoomsTable.select_statement("room_id=?") - return self._execute( - "get_room", RoomsTable.decode_single_result, query, room_id, + return self._simple_select_one( + table=RoomsTable.table_name, + keyvalues={"room_id": room_id}, + retcols=RoomsTable.fields, + desc="get_room", ) @defer.inlineCallbacks @@ -143,7 +146,7 @@ class RoomStore(SQLBaseStore): "event_id": event.event_id, "room_id": event.room_id, "topic": event.content["topic"], - } + }, ) def _store_room_name_txn(self, txn, event): @@ -158,8 +161,45 @@ class RoomStore(SQLBaseStore): } ) + @defer.inlineCallbacks + def get_room_name_and_aliases(self, room_id): + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + + sql = ( + "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) % { + "redacted": del_sql, + } + + sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')" + sql += " OR s.type = 'm.room.aliases')" + args = (room_id,) -class RoomsTable(Table): + results = yield self._execute_and_decode("get_current_state", sql, *args) + + events = yield self._parse_events(results) + + name = None + aliases = [] + + for e in events: + if e.type == 'm.room.name': + if 'name' in e.content: + name = e.content['name'] + elif e.type == 'm.room.aliases': + if 'aliases' in e.content: + aliases.extend(e.content['aliases']) + + defer.returnValue((name, aliases)) + + +class RoomsTable(object): table_name = "rooms" fields = [ @@ -167,5 +207,3 @@ class RoomsTable(Table): "is_public", "creator" ] - - EntryType = collections.namedtuple("RoomEntry", fields) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 65ffb4627f..52c37c76f5 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -212,7 +212,8 @@ class RoomMemberStore(SQLBaseStore): return self._simple_select_onecol( "room_hosts", {"room_id": room_id}, - "host" + "host", + desc="get_joined_hosts_for_room", ) def _get_members_by_dict(self, where_dict): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 71db16d0e5..58dbf2802b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -15,6 +15,8 @@ from ._base import SQLBaseStore +from twisted.internet import defer + import logging logger = logging.getLogger(__name__) @@ -82,7 +84,7 @@ class StateStore(SQLBaseStore): if context.current_state is None: return - state_events = context.current_state + state_events = dict(context.current_state) if event.is_state(): state_events[(event.type, event.state_key)] = event @@ -122,3 +124,33 @@ class StateStore(SQLBaseStore): }, or_replace=True, ) + + @defer.inlineCallbacks + def get_current_state(self, room_id, event_type=None, state_key=""): + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + + sql = ( + "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) % { + "redacted": del_sql, + } + + if event_type and state_key is not None: + sql += " AND s.type = ? AND s.state_key = ? " + args = (room_id, event_type, state_key) + elif event_type: + sql += " AND s.type = ?" + args = (room_id, event_type) + else: + args = (room_id, ) + + results = yield self._execute_and_decode("get_current_state", sql, *args) + + events = yield self._parse_events(results) + defer.returnValue(events) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 09bc522210..66f307e640 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -35,7 +35,7 @@ what sort order was used: from twisted.internet import defer -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.util.logutils import log_function @@ -413,12 +413,32 @@ class StreamStore(SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) + @cached(num_args=0) def get_room_events_max_id(self): return self.runInteraction( "get_room_events_max_id", self._get_room_events_max_id_txn ) + @defer.inlineCallbacks + def _get_min_token(self): + row = yield self._execute( + "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" + ) + + self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 + self.min_token = min(self.min_token, -1) + + logger.debug("min_token is: %s", self.min_token) + + defer.returnValue(self.min_token) + + def get_next_stream_id(self): + with self._next_stream_id_lock: + i = self._next_stream_id + self._next_stream_id += 1 + return i + def _get_room_events_max_id_txn(self, txn): txn.execute( "SELECT MAX(stream_ordering) as m FROM events" diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 0b8a3b7a07..b777395e06 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -46,15 +46,19 @@ class TransactionStore(SQLBaseStore): ) def _get_received_txn_response(self, txn, transaction_id, origin): - where_clause = "transaction_id = ? AND origin = ?" - query = ReceivedTransactionsTable.select_statement(where_clause) - - txn.execute(query, (transaction_id, origin)) - - results = ReceivedTransactionsTable.decode_results(txn.fetchall()) + result = self._simple_select_one_txn( + txn, + table=ReceivedTransactionsTable.table_name, + keyvalues={ + "transaction_id": transaction_id, + "origin": origin, + }, + retcols=ReceivedTransactionsTable.fields, + allow_none=True, + ) - if results and results[0].response_code: - return (results[0].response_code, results[0].response_json) + if result and result.response_code: + return result["response_code"], result["response_json"] else: return None |