diff options
Diffstat (limited to 'synapse/storage')
37 files changed, 2037 insertions, 645 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4beb951b9f..a3ff995695 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.api.constants import EventTypes +from .appservice import ApplicationServiceStore from .directory import DirectoryStore from .feedback import FeedbackStore from .presence import PresenceStore @@ -29,10 +30,14 @@ from .stream import StreamStore from .transactions import TransactionStore from .keys import KeyStore from .event_federation import EventFederationStore +from .pusher import PusherStore +from .push_rule import PushRuleStore from .media_repository import MediaRepositoryStore +from .rejections import RejectionsStore from .state import StateStore from .signatures import SignatureStore +from .filtering import FilteringStore from syutil.base64util import decode_base64 from syutil.jsonutil import encode_canonical_json @@ -40,33 +45,21 @@ from syutil.jsonutil import encode_canonical_json from synapse.crypto.event_signing import compute_event_reference_hash -import json +import fnmatch +import imp import logging import os +import re logger = logging.getLogger(__name__) -SCHEMAS = [ - "transactions", - "users", - "profiles", - "presence", - "im", - "room_aliases", - "keys", - "redactions", - "state", - "event_edges", - "event_signatures", - "media_repository", -] +# Remember to update this number every time a change is made to database +# schema files, so the users will be informed on server restarts. +SCHEMA_VERSION = 14 - -# Remember to update this number every time an incompatible change is made to -# database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 11 +dir_path = os.path.abspath(os.path.dirname(__file__)) class _RollbackButIsFineException(Exception): @@ -80,8 +73,13 @@ class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, + ApplicationServiceStore, EventFederationStore, MediaRepositoryStore, + RejectionsStore, + FilteringStore, + PusherStore, + PushRuleStore ): def __init__(self, hs): @@ -117,21 +115,147 @@ class DataStore(RoomMemberStore, RoomStore, pass @defer.inlineCallbacks - def get_event(self, event_id, allow_none=False): - events = yield self._get_events([event_id]) + def get_event(self, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False, + allow_none=False): + """Get an event from the database by event_id. + + Args: + event_id (str): The event_id of the event to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + allow_none (bool): If True, return None if no event found, if + False throw an exception. + + Returns: + Deferred : A FrozenEvent. + """ + event = yield self.runInteraction( + "get_event", self._get_event_txn, + event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) - if not events: - if allow_none: - defer.returnValue(None) - else: - raise RuntimeError("Could not find event %s" % (event_id,)) + if not event and not allow_none: + raise RuntimeError("Could not find event %s" % (event_id,)) - defer.returnValue(events[0]) + defer.returnValue(event) @log_function def _persist_event_txn(self, txn, event, context, backfilled, stream_ordering=None, is_new_state=True, current_state=None): + + # Remove the any existing cache entries for the event_id + self._get_event_cache.pop(event.event_id) + + # We purposefully do this first since if we include a `current_state` + # key, we *want* to update the `current_state_events` table + if current_state: + txn.execute( + "DELETE FROM current_state_events WHERE room_id = ?", + (event.room_id,) + ) + + for s in current_state: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": s.event_id, + "room_id": s.room_id, + "type": s.type, + "state_key": s.state_key, + }, + or_replace=True, + ) + + if event.is_state() and is_new_state: + if not backfilled and not context.rejected: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + or_replace=True, + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) + + outlier = event.internal_metadata.is_outlier() + + if not outlier: + self._store_state_groups_txn(txn, event, context) + + self._update_min_depth_for_room_txn( + txn, + event.room_id, + event.depth + ) + + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + + have_persisted = self._simple_select_one_onecol_txn( + txn, + table="event_json", + keyvalues={"event_id": event.event_id}, + retcol="event_id", + allow_none=True, + ) + + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ) + + # If we have already persisted this event, we don't need to do any + # more processing. + # The processing above must be done on every call to persist event, + # since they might not have happened on previous calls. For example, + # if we are persisting an event that we had persisted as an outlier, + # but is no longer one. + if have_persisted: + if not outlier: + sql = ( + "UPDATE event_json SET internal_metadata = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (metadata_json.decode("UTF-8"), event.event_id,) + ) + + sql = ( + "UPDATE events SET outlier = 0" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (event.event_id,) + ) + return + if event.type == EventTypes.Member: self._store_room_member_txn(txn, event) elif event.type == EventTypes.Feedback: @@ -143,8 +267,6 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == EventTypes.Redaction: self._store_redaction(txn, event) - outlier = event.internal_metadata.is_outlier() - event_dict = { k: v for k, v in event.get_dict().items() @@ -154,10 +276,6 @@ class DataStore(RoomMemberStore, RoomStore, ] } - metadata_json = encode_canonical_json( - event.internal_metadata.get_dict() - ) - self._simple_insert_txn( txn, table="event_json", @@ -170,12 +288,16 @@ class DataStore(RoomMemberStore, RoomStore, or_replace=True, ) + content = encode_canonical_json( + event.content + ).decode("UTF-8") + vals = { "topological_ordering": event.depth, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, - "content": json.dumps(event.get_dict()["content"]), + "content": content, "processed": True, "outlier": outlier, "depth": event.depth, @@ -195,7 +317,10 @@ class DataStore(RoomMemberStore, RoomStore, "prev_events", ] } - vals["unrecognized_keys"] = json.dumps(unrec) + + vals["unrecognized_keys"] = encode_canonical_json( + unrec + ).decode("UTF-8") try: self._simple_insert_txn( @@ -213,38 +338,10 @@ class DataStore(RoomMemberStore, RoomStore, ) raise _RollbackButIsFineException("_persist_event") - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - - if not outlier: - self._store_state_groups_txn(txn, event, context) - - if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) - ) - - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - }, - or_replace=True, - ) + if context.rejected: + self._store_rejections_txn(txn, event.event_id, context.rejected) - is_state = hasattr(event, "state_key") and event.state_key is not None - if is_state: + if event.is_state(): vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -252,6 +349,7 @@ class DataStore(RoomMemberStore, RoomStore, "state_key": event.state_key, } + # TODO: How does this work with backfilling? if hasattr(event, "replaces_state"): vals["prev_state"] = event.replaces_state @@ -262,7 +360,7 @@ class DataStore(RoomMemberStore, RoomStore, or_replace=True, ) - if is_new_state: + if is_new_state and not context.rejected: self._simple_insert_txn( txn, "current_state_events", @@ -288,28 +386,6 @@ class DataStore(RoomMemberStore, RoomStore, or_ignore=True, ) - if not backfilled: - self._simple_insert_txn( - txn, - table="state_forward_extremities", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for prev_state_id, _ in event.prev_state: - self._simple_delete_txn( - txn, - table="state_forward_extremities", - keyvalues={ - "event_id": prev_state_id, - } - ) - for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( @@ -340,14 +416,9 @@ class DataStore(RoomMemberStore, RoomStore, txn, event.event_id, ref_alg, ref_hash_bytes ) - if not outlier: - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth - ) - def _store_redaction(self, txn, event): + # invalidate the cache for the redacted event + self._get_event_cache.pop(event.redacts) txn.execute( "INSERT OR IGNORE INTO redactions " "(event_id, redacts) VALUES (?,?)", @@ -370,9 +441,12 @@ class DataStore(RoomMemberStore, RoomStore, "redacted": del_sql, } - if event_type: + if event_type and state_key is not None: sql += " AND s.type = ? AND s.state_key = ? " args = (room_id, event_type, state_key) + elif event_type: + sql += " AND s.type = ?" + args = (room_id, event_type) else: args = (room_id, ) @@ -382,6 +456,41 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(events) @defer.inlineCallbacks + def get_room_name_and_aliases(self, room_id): + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + + sql = ( + "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) % { + "redacted": del_sql, + } + + sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')" + sql += " OR s.type = 'm.room.aliases')" + args = (room_id,) + + results = yield self._execute_and_decode(sql, *args) + + events = yield self._parse_events(results) + + name = None + aliases = [] + + for e in events: + if e.type == 'm.room.name': + name = e.content['name'] + elif e.type == 'm.room.aliases': + aliases.extend(e.content['aliases']) + + defer.returnValue((name, aliases)) + + @defer.inlineCallbacks def _get_min_token(self): row = yield self._execute( None, @@ -417,30 +526,48 @@ class DataStore(RoomMemberStore, RoomStore, ], ) + def have_events(self, event_ids): + """Given a list of event ids, check if we have already processed them. + + Returns: + dict: Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps to + None. + """ + if not event_ids: + return defer.succeed({}) + + def f(txn): + sql = ( + "SELECT e.event_id, reason FROM events as e " + "LEFT JOIN rejections as r ON e.event_id = r.event_id " + "WHERE e.event_id = ?" + ) -def schema_path(schema): - """ Get a filesystem path for the named database schema + res = {} + for event_id in event_ids: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + _, rejected = row + res[event_id] = rejected - Args: - schema: Name of the database schema. - Returns: - A filesystem path pointing at a ".sql" file. + return res - """ - dir_path = os.path.dirname(__file__) - schemaPath = os.path.join(dir_path, "schema", schema + ".sql") - return schemaPath + return self.runInteraction( + "have_events", f, + ) -def read_schema(schema): +def read_schema(path): """ Read the named database schema. Args: - schema: Name of the datbase schema. + path: Path of the database schema. Returns: A string containing the database schema. """ - with open(schema_path(schema)) as schema_file: + with open(path) as schema_file: return schema_file.read() @@ -453,46 +580,275 @@ class UpgradeDatabaseException(PrepareDatabaseException): def prepare_database(db_conn): - """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we - don't have to worry about overwriting existing content. + """Prepares a database for usage. Will either create all necessary tables + or upgrade from an older schema version. """ - c = db_conn.cursor() - c.execute("PRAGMA user_version") - row = c.fetchone() + try: + cur = db_conn.cursor() + version_info = _get_or_create_schema_state(cur) + + if version_info: + user_version, delta_files, upgraded = version_info + _upgrade_existing_database(cur, user_version, delta_files, upgraded) + else: + _setup_new_database(cur) - if row and row[0]: - user_version = row[0] + cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) - if user_version > SCHEMA_VERSION: - raise ValueError( - "Cannot use this database as it is too " + - "new for the server to understand" - ) - elif user_version < SCHEMA_VERSION: - logger.info( - "Upgrading database from version %d", - user_version + cur.close() + db_conn.commit() + except: + db_conn.rollback() + raise + + +def _setup_new_database(cur): + """Sets up the database by finding a base set of "full schemas" and then + applying any necessary deltas. + + The "full_schemas" directory has subdirectories named after versions. This + function searches for the highest version less than or equal to + `SCHEMA_VERSION` and executes all .sql files in that directory. + + The function will then apply all deltas for all versions after the base + version. + + Example directory structure: + + schema/ + delta/ + ... + full_schemas/ + 3/ + test.sql + ... + 11/ + foo.sql + bar.sql + ... + + In the example foo.sql and bar.sql would be run, and then any delta files + for versions strictly greater than 11. + """ + current_dir = os.path.join(dir_path, "schema", "full_schemas") + directory_entries = os.listdir(current_dir) + + valid_dirs = [] + pattern = re.compile(r"^\d+(\.sql)?$") + for filename in directory_entries: + match = pattern.match(filename) + abs_path = os.path.join(current_dir, filename) + if match and os.path.isdir(abs_path): + ver = int(match.group(0)) + if ver <= SCHEMA_VERSION: + valid_dirs.append((ver, abs_path)) + else: + logger.warn("Unexpected entry in 'full_schemas': %s", filename) + + if not valid_dirs: + raise PrepareDatabaseException( + "Could not find a suitable base set of full schemas" + ) + + max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0]) + + logger.debug("Initialising schema v%d", max_current_ver) + + directory_entries = os.listdir(sql_dir) + + sql_script = "BEGIN TRANSACTION;\n" + for filename in fnmatch.filter(directory_entries, "*.sql"): + sql_loc = os.path.join(sql_dir, filename) + logger.debug("Applying schema %s", sql_loc) + sql_script += read_schema(sql_loc) + sql_script += "\n" + sql_script += "COMMIT TRANSACTION;" + cur.executescript(sql_script) + + cur.execute( + "INSERT OR REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)", + (max_current_ver, False) + ) + + _upgrade_existing_database( + cur, + current_version=max_current_ver, + applied_delta_files=[], + upgraded=False + ) + + +def _upgrade_existing_database(cur, current_version, applied_delta_files, + upgraded): + """Upgrades an existing database. + + Delta files can either be SQL stored in *.sql files, or python modules + in *.py. + + There can be multiple delta files per version. Synapse will keep track of + which delta files have been applied, and will apply any that haven't been + even if there has been no version bump. This is useful for development + where orthogonal schema changes may happen on separate branches. + + Different delta files for the same version *must* be orthogonal and give + the same result when applied in any order. No guarantees are made on the + order of execution of these scripts. + + This is a no-op of current_version == SCHEMA_VERSION. + + Example directory structure: + + schema/ + delta/ + 11/ + foo.sql + ... + 12/ + foo.sql + bar.py + ... + full_schemas/ + ... + + In the example, if current_version is 11, then foo.sql will be run if and + only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in + some arbitrary order. + + Args: + cur (Cursor) + current_version (int): The current version of the schema. + applied_delta_files (list): A list of deltas that have already been + applied. + upgraded (bool): Whether the current version was generated by having + applied deltas or from full schema file. If `True` the function + will never apply delta files for the given `current_version`, since + the current_version wasn't generated by applying those delta files. + """ + + if current_version > SCHEMA_VERSION: + raise ValueError( + "Cannot use this database as it is too " + + "new for the server to understand" + ) + + start_ver = current_version + if not upgraded: + start_ver += 1 + + for v in range(start_ver, SCHEMA_VERSION + 1): + logger.debug("Upgrading schema to v%d", v) + + delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) + + try: + directory_entries = os.listdir(delta_dir) + except OSError: + logger.exception("Could not open delta dir for version %d", v) + raise UpgradeDatabaseException( + "Could not open delta dir for version %d" % (v,) ) - # Run every version since after the current version. - for v in range(user_version + 1, SCHEMA_VERSION + 1): - if v == 10: - raise UpgradeDatabaseException( - "No delta for version 10" + directory_entries.sort() + for file_name in directory_entries: + relative_path = os.path.join(str(v), file_name) + if relative_path in applied_delta_files: + continue + + absolute_path = os.path.join( + dir_path, "schema", "delta", relative_path, + ) + root_name, ext = os.path.splitext(file_name) + if ext == ".py": + # This is a python upgrade module. We need to import into some + # package and then execute its `run_upgrade` function. + module_name = "synapse.storage.v%d_%s" % ( + v, root_name + ) + with open(absolute_path) as python_file: + module = imp.load_source( + module_name, absolute_path, python_file ) - sql_script = read_schema("delta/v%d" % (v)) - c.executescript(sql_script) - - db_conn.commit() - - else: - sql_script = "BEGIN TRANSACTION;\n" - for sql_loc in SCHEMAS: - sql_script += read_schema(sql_loc) - sql_script += "\n" - sql_script += "COMMIT TRANSACTION;" - c.executescript(sql_script) - db_conn.commit() - c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION) + logger.debug("Running script %s", relative_path) + module.run_upgrade(cur) + elif ext == ".sql": + # A plain old .sql file, just read and execute it + delta_schema = read_schema(absolute_path) + logger.debug("Applying schema %s", relative_path) + cur.executescript(delta_schema) + else: + # Not a valid delta file. + logger.warn( + "Found directory entry that did not end in .py or" + " .sql: %s", + relative_path, + ) + continue + + # Mark as done. + cur.execute( + "INSERT INTO applied_schema_deltas (version, file)" + " VALUES (?,?)", + (v, relative_path) + ) - c.close() + cur.execute( + "INSERT OR REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)", + (v, True) + ) + + +def _get_or_create_schema_state(txn): + schema_path = os.path.join( + dir_path, "schema", "schema_version.sql", + ) + create_schema = read_schema(schema_path) + txn.executescript(create_schema) + + txn.execute("SELECT version, upgraded FROM schema_version") + row = txn.fetchone() + current_version = int(row[0]) if row else None + upgraded = bool(row[1]) if row else None + + if current_version: + txn.execute( + "SELECT file FROM applied_schema_deltas WHERE version >= ?", + (current_version,) + ) + return current_version, txn.fetchall(), upgraded + + return None + + +def prepare_sqlite3_database(db_conn): + """This function should be called before `prepare_database` on sqlite3 + databases. + + Since we changed the way we store the current schema version and handle + updates to schemas, we need a way to upgrade from the old method to the + new. This only affects sqlite databases since they were the only ones + supported at the time. + """ + with db_conn: + schema_path = os.path.join( + dir_path, "schema", "schema_version.sql", + ) + create_schema = read_schema(schema_path) + db_conn.executescript(create_schema) + + c = db_conn.execute("SELECT * FROM schema_version") + rows = c.fetchall() + c.close() + + if not rows: + c = db_conn.execute("PRAGMA user_version") + row = c.fetchone() + c.close() + + if row and row[0]: + db_conn.execute( + "INSERT OR REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)", + (row[0], False) + ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ce63f12008..3725c9795d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -19,11 +19,12 @@ from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext, LoggingContext +from synapse.util.lrucache import LruCache from twisted.internet import defer -import collections -import json +from collections import namedtuple, OrderedDict +import simplejson as json import sys import time @@ -34,6 +35,52 @@ sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") +# TODO(paul): +# * more generic key management +# * export monitoring stats +# * consider other eviction strategies - LRU? +def cached(max_entries=1000): + """ A method decorator that applies a memoizing cache around the function. + + The function is presumed to take one additional argument, which is used as + the key for the cache. Cache hits are served directly from the cache; + misses use the function body to generate the value. + + The wrapped function has an additional member, a callable called + "invalidate". This can be used to remove individual entries from the cache. + + The wrapped function has another additional callable, called "prefill", + which can be used to insert values into the cache specifically, without + calling the calculation function. + """ + def wrap(orig): + cache = OrderedDict() + + def prefill(key, value): + while len(cache) > max_entries: + cache.popitem(last=False) + + cache[key] = value + + @defer.inlineCallbacks + def wrapped(self, key): + if key in cache: + defer.returnValue(cache[key]) + + ret = yield orig(self, key) + prefill(key, ret) + defer.returnValue(ret) + + def invalidate(key): + cache.pop(key, None) + + wrapped.invalidate = invalidate + wrapped.prefill = prefill + return wrapped + + return wrap + + class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging to the .execute() method.""" @@ -77,6 +124,43 @@ class LoggingTransaction(object): sql_logger.debug("[SQL time] {%s} %f", self.name, end - start) +class PerformanceCounters(object): + def __init__(self): + self.current_counters = {} + self.previous_counters = {} + + def update(self, key, start_time, end_time=None): + if end_time is None: + end_time = time.time() * 1000 + duration = end_time - start_time + count, cum_time = self.current_counters.get(key, (0, 0)) + count += 1 + cum_time += duration + self.current_counters[key] = (count, cum_time) + return end_time + + def interval(self, interval_duration, limit=3): + counters = [] + for name, (count, cum_time) in self.current_counters.items(): + prev_count, prev_time = self.previous_counters.get(name, (0, 0)) + counters.append(( + (cum_time - prev_time) / interval_duration, + count - prev_count, + name + )) + + self.previous_counters = dict(self.current_counters) + + counters.sort(reverse=True) + + top_n_counters = ", ".join( + "%s(%d): %.3f%%" % (name, count, 100 * ratio) + for ratio, count, name in counters[:limit] + ) + + return top_n_counters + + class SQLBaseStore(object): _TXN_ID = 0 @@ -85,6 +169,43 @@ class SQLBaseStore(object): self._db_pool = hs.get_db_pool() self._clock = hs.get_clock() + self._previous_txn_total_time = 0 + self._current_txn_total_time = 0 + self._previous_loop_ts = 0 + self._txn_perf_counters = PerformanceCounters() + self._get_event_counters = PerformanceCounters() + + self._get_event_cache = LruCache(hs.config.event_cache_size) + + def start_profiling(self): + self._previous_loop_ts = self._clock.time_msec() + + def loop(): + curr = self._current_txn_total_time + prev = self._previous_txn_total_time + self._previous_txn_total_time = curr + + time_now = self._clock.time_msec() + time_then = self._previous_loop_ts + self._previous_loop_ts = time_now + + ratio = (curr - prev)/(time_now - time_then) + + top_three_counters = self._txn_perf_counters.interval( + time_now - time_then, limit=3 + ) + + top_3_event_counters = self._get_event_counters.interval( + time_now - time_then, limit=3 + ) + + logger.info( + "Total database time: %.3f%% {%s} {%s}", + ratio * 100, top_three_counters, top_3_event_counters + ) + + self._clock.looping_call(loop, 10000) + @defer.inlineCallbacks def runInteraction(self, desc, func, *args, **kwargs): """Wraps the .runInteraction() method on the underlying db_pool.""" @@ -94,8 +215,7 @@ class SQLBaseStore(object): with LoggingContext("runInteraction") as context: current_context.copy_to(context) start = time.time() * 1000 - txn_id = SQLBaseStore._TXN_ID - SQLBaseStore._TXN_ID += 1 + txn_id = self._TXN_ID # We don't really need these to be unique, so lets stop it from # growing really large. @@ -115,6 +235,10 @@ class SQLBaseStore(object): "[TXN END] {%s} %f", name, end - start ) + + self._current_txn_total_time += end - start + self._txn_perf_counters.update(desc, start, end) + with PreserveLoggingContext(): result = yield self._db_pool.runInteraction( inner_func, *args, **kwargs @@ -194,6 +318,50 @@ class SQLBaseStore(object): txn.execute(sql, values.values()) return txn.lastrowid + def _simple_upsert(self, table, keyvalues, values): + """ + Args: + table (str): The table to upsert into + keyvalues (dict): The unique key tables and their new values + values (dict): The nonunique columns and their new values + Returns: A deferred + """ + return self.runInteraction( + "_simple_upsert", + self._simple_upsert_txn, table, keyvalues, values + ) + + def _simple_upsert_txn(self, txn, table, keyvalues, values): + # Try to update + sql = "UPDATE %s SET %s WHERE %s" % ( + table, + ", ".join("%s = ?" % (k,) for k in values), + " AND ".join("%s = ?" % (k,) for k in keyvalues) + ) + sqlargs = values.values() + keyvalues.values() + logger.debug( + "[SQL] %s Args=%s", + sql, sqlargs, + ) + + txn.execute(sql, sqlargs) + if txn.rowcount == 0: + # We didn't update and rows so insert a new one + allvalues = {} + allvalues.update(keyvalues) + allvalues.update(values) + + sql = "INSERT INTO %s (%s) VALUES (%s)" % ( + table, + ", ".join(k for k in allvalues), + ", ".join("?" for _ in allvalues) + ) + logger.debug( + "[SQL] %s Args=%s", + sql, keyvalues.values(), + ) + txn.execute(sql, allvalues.values()) + def _simple_select_one(self, table, keyvalues, retcols, allow_none=False): """Executes a SELECT query on the named table, which is expected to @@ -282,7 +450,8 @@ class SQLBaseStore(object): Args: table : string giving the table name - keyvalues : dict of column names and values to select the rows with + keyvalues : dict of column names and values to select the rows with, + or None to not apply a WHERE clause. retcols : list of strings giving the names of the columns to return """ return self.runInteraction( @@ -301,13 +470,20 @@ class SQLBaseStore(object): keyvalues : dict of column names and values to select the rows with retcols : list of strings giving the names of the columns to return """ - sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( - ", ".join(retcols), - table, - " AND ".join("%s = ?" % (k, ) for k in keyvalues) - ) + if keyvalues: + sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + ", ".join(retcols), + table, + " AND ".join("%s = ?" % (k, ) for k in keyvalues) + ) + txn.execute(sql, keyvalues.values()) + else: + sql = "SELECT %s FROM %s ORDER BY rowid asc" % ( + ", ".join(retcols), + table + ) + txn.execute(sql) - txn.execute(sql, keyvalues.values()) return self.cursor_to_dict(txn) def _simple_update_one(self, table, keyvalues, updatevalues, @@ -345,8 +521,8 @@ class SQLBaseStore(object): if updatevalues: update_sql = "UPDATE %s SET %s WHERE %s" % ( table, - ", ".join("%s = ?" % (k) for k in updatevalues), - " AND ".join("%s = ?" % (k) for k in keyvalues) + ", ".join("%s = ?" % (k,) for k in updatevalues), + " AND ".join("%s = ?" % (k,) for k in keyvalues) ) def func(txn): @@ -459,10 +635,26 @@ class SQLBaseStore(object): return [e for e in events if e] def _get_event_txn(self, txn, event_id, check_redacted=True, - get_prev_content=False): + get_prev_content=False, allow_rejected=False): + + start_time = time.time() * 1000 + update_counter = self._get_event_counters.update + + cache = self._get_event_cache.setdefault(event_id, {}) + + try: + # Separate cache entries for each way to invoke _get_event_txn + return cache[(check_redacted, get_prev_content, allow_rejected)] + except KeyError: + pass + finally: + start_time = update_counter("event_cache", start_time) + sql = ( - "SELECT internal_metadata, json, r.event_id FROM event_json as e " + "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " + "FROM event_json as e " "LEFT JOIN redactions as r ON e.event_id = r.redacts " + "LEFT JOIN rejections as rej on rej.event_id = e.event_id " "WHERE e.event_id = ? " "LIMIT 1 " ) @@ -474,20 +666,35 @@ class SQLBaseStore(object): if not res: return None - internal_metadata, js, redacted = res + internal_metadata, js, redacted, rejected_reason = res - return self._get_event_from_row_txn( - txn, internal_metadata, js, redacted, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - ) + start_time = update_counter("select_event", start_time) + + if allow_rejected or not rejected_reason: + result = self._get_event_from_row_txn( + txn, internal_metadata, js, redacted, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + ) + cache[(check_redacted, get_prev_content, allow_rejected)] = result + return result + else: + return None def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False): + + start_time = time.time() * 1000 + update_counter = self._get_event_counters.update + d = json.loads(js) + start_time = update_counter("decode_json", start_time) + internal_metadata = json.loads(internal_metadata) + start_time = update_counter("decode_internal", start_time) ev = FrozenEvent(d, internal_metadata_dict=internal_metadata) + start_time = update_counter("build_frozen_event", start_time) if check_redacted and redacted: ev = prune_event(ev) @@ -503,6 +710,7 @@ class SQLBaseStore(object): if because: ev.unsigned["redacted_because"] = because + start_time = update_counter("redact_event", start_time) if get_prev_content and "replaces_state" in ev.unsigned: prev = self._get_event_txn( @@ -512,6 +720,7 @@ class SQLBaseStore(object): ) if prev: ev.unsigned["prev_content"] = prev.get_dict()["content"] + start_time = update_counter("get_prev_content", start_time) return ev @@ -632,7 +841,7 @@ class JoinHelper(object): for table in self.tables: res += [f for f in table.fields if f not in res] - self.EntryType = collections.namedtuple("JoinHelperEntry", res) + self.EntryType = namedtuple("JoinHelperEntry", res) def get_fields(self, **prefixes): """Get a string representing a list of fields for use in SELECT diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py new file mode 100644 index 0000000000..e30265750a --- /dev/null +++ b/synapse/storage/appservice.py @@ -0,0 +1,338 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import simplejson +from simplejson import JSONDecodeError +from twisted.internet import defer + +from synapse.api.constants import Membership +from synapse.api.errors import StoreError +from synapse.appservice import ApplicationService +from synapse.storage.roommember import RoomsForUser +from ._base import SQLBaseStore + + +logger = logging.getLogger(__name__) + + +def log_failure(failure): + logger.error("Failed to detect application services: %s", failure.value) + logger.error(failure.getTraceback()) + + +class ApplicationServiceStore(SQLBaseStore): + + def __init__(self, hs): + super(ApplicationServiceStore, self).__init__(hs) + self.services_cache = [] + self.cache_defer = self._populate_cache() + self.cache_defer.addErrback(log_failure) + + @defer.inlineCallbacks + def unregister_app_service(self, token): + """Unregisters this service. + + This removes all AS specific regex and the base URL. The token is the + only thing preserved for future registration attempts. + """ + yield self.cache_defer # make sure the cache is ready + yield self.runInteraction( + "unregister_app_service", + self._unregister_app_service_txn, + token, + ) + # update cache TODO: Should this be in the txn? + for service in self.services_cache: + if service.token == token: + service.url = None + service.namespaces = None + service.hs_token = None + + def _unregister_app_service_txn(self, txn, token): + # kill the url to prevent pushes + txn.execute( + "UPDATE application_services SET url=NULL WHERE token=?", + (token,) + ) + + # cleanup regex + as_id = self._get_as_id_txn(txn, token) + if not as_id: + logger.warning( + "unregister_app_service_txn: Failed to find as_id for token=", + token + ) + return False + + txn.execute( + "DELETE FROM application_services_regex WHERE as_id=?", + (as_id,) + ) + return True + + @defer.inlineCallbacks + def update_app_service(self, service): + """Update an application service, clobbering what was previously there. + + Args: + service(ApplicationService): The updated service. + """ + yield self.cache_defer # make sure the cache is ready + + # NB: There is no "insert" since we provide no public-facing API to + # allocate new ASes. It relies on the server admin inserting the AS + # token into the database manually. + + if not service.token or not service.url: + raise StoreError(400, "Token and url must be specified.") + + if not service.hs_token: + raise StoreError(500, "No HS token") + + yield self.runInteraction( + "update_app_service", + self._update_app_service_txn, + service + ) + + # update cache TODO: Should this be in the txn? + for (index, cache_service) in enumerate(self.services_cache): + if service.token == cache_service.token: + self.services_cache[index] = service + logger.info("Updated: %s", service) + return + # new entry + self.services_cache.append(service) + logger.info("Updated(new): %s", service) + + def _update_app_service_txn(self, txn, service): + as_id = self._get_as_id_txn(txn, service.token) + if not as_id: + logger.warning( + "update_app_service_txn: Failed to find as_id for token=", + service.token + ) + return False + + txn.execute( + "UPDATE application_services SET url=?, hs_token=?, sender=? " + "WHERE id=?", + (service.url, service.hs_token, service.sender, as_id,) + ) + # cleanup regex + txn.execute( + "DELETE FROM application_services_regex WHERE as_id=?", + (as_id,) + ) + for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST): + if ns_str in service.namespaces: + for regex_obj in service.namespaces[ns_str]: + txn.execute( + "INSERT INTO application_services_regex(" + "as_id, namespace, regex) values(?,?,?)", + (as_id, ns_int, simplejson.dumps(regex_obj)) + ) + return True + + def _get_as_id_txn(self, txn, token): + cursor = txn.execute( + "SELECT id FROM application_services WHERE token=?", + (token,) + ) + res = cursor.fetchone() + if res: + return res[0] + + @defer.inlineCallbacks + def get_app_services(self): + yield self.cache_defer # make sure the cache is ready + defer.returnValue(self.services_cache) + + @defer.inlineCallbacks + def get_app_service_by_user_id(self, user_id): + """Retrieve an application service from their user ID. + + All application services have associated with them a particular user ID. + There is no distinguishing feature on the user ID which indicates it + represents an application service. This function allows you to map from + a user ID to an application service. + + Args: + user_id(str): The user ID to see if it is an application service. + Returns: + synapse.appservice.ApplicationService or None. + """ + + yield self.cache_defer # make sure the cache is ready + + for service in self.services_cache: + if service.sender == user_id: + defer.returnValue(service) + return + defer.returnValue(None) + + @defer.inlineCallbacks + def get_app_service_by_token(self, token, from_cache=True): + """Get the application service with the given appservice token. + + Args: + token (str): The application service token. + from_cache (bool): True to get this service from the cache, False to + check the database. + Raises: + StoreError if there was a problem retrieving this service. + """ + yield self.cache_defer # make sure the cache is ready + + if from_cache: + for service in self.services_cache: + if service.token == token: + defer.returnValue(service) + return + defer.returnValue(None) + + # TODO: The from_cache=False impl + # TODO: This should be JOINed with the application_services_regex table. + + def get_app_service_rooms(self, service): + """Get a list of RoomsForUser for this application service. + + Application services may be "interested" in lots of rooms depending on + the room ID, the room aliases, or the members in the room. This function + takes all of these into account and returns a list of RoomsForUser which + represent the entire list of room IDs that this application service + wants to know about. + + Args: + service: The application service to get a room list for. + Returns: + A list of RoomsForUser. + """ + return self.runInteraction( + "get_app_service_rooms", + self._get_app_service_rooms_txn, + service, + ) + + def _get_app_service_rooms_txn(self, txn, service): + # get all rooms matching the room ID regex. + room_entries = self._simple_select_list_txn( + txn=txn, table="rooms", keyvalues=None, retcols=["room_id"] + ) + matching_room_list = set([ + r["room_id"] for r in room_entries if + service.is_interested_in_room(r["room_id"]) + ]) + + # resolve room IDs for matching room alias regex. + room_alias_mappings = self._simple_select_list_txn( + txn=txn, table="room_aliases", keyvalues=None, + retcols=["room_id", "room_alias"] + ) + matching_room_list |= set([ + r["room_id"] for r in room_alias_mappings if + service.is_interested_in_alias(r["room_alias"]) + ]) + + # get all rooms for every user for this AS. This is scoped to users on + # this HS only. + user_list = self._simple_select_list_txn( + txn=txn, table="users", keyvalues=None, retcols=["name"] + ) + user_list = [ + u["name"] for u in user_list if + service.is_interested_in_user(u["name"]) + ] + rooms_for_user_matching_user_id = set() # RoomsForUser list + for user_id in user_list: + # FIXME: This assumes this store is linked with RoomMemberStore :( + rooms_for_user = self._get_rooms_for_user_where_membership_is_txn( + txn=txn, + user_id=user_id, + membership_list=[Membership.JOIN] + ) + rooms_for_user_matching_user_id |= set(rooms_for_user) + + # make RoomsForUser tuples for room ids and aliases which are not in the + # main rooms_for_user_list - e.g. they are rooms which do not have AS + # registered users in it. + known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id] + missing_rooms_for_user = [ + RoomsForUser(r, service.sender, "join") for r in + matching_room_list if r not in known_room_ids + ] + rooms_for_user_matching_user_id |= set(missing_rooms_for_user) + + return rooms_for_user_matching_user_id + + @defer.inlineCallbacks + def _populate_cache(self): + """Populates the ApplicationServiceCache from the database.""" + sql = ("SELECT * FROM application_services LEFT JOIN " + "application_services_regex ON application_services.id = " + "application_services_regex.as_id") + # SQL results in the form: + # [ + # { + # 'regex': "something", + # 'url': "something", + # 'namespace': enum, + # 'as_id': 0, + # 'token': "something", + # 'hs_token': "otherthing", + # 'id': 0 + # } + # ] + services = {} + results = yield self._execute_and_decode(sql) + for res in results: + as_token = res["token"] + if as_token not in services: + # add the service + services[as_token] = { + "url": res["url"], + "token": as_token, + "hs_token": res["hs_token"], + "sender": res["sender"], + "namespaces": { + ApplicationService.NS_USERS: [], + ApplicationService.NS_ALIASES: [], + ApplicationService.NS_ROOMS: [] + } + } + # add the namespace regex if one exists + ns_int = res["namespace"] + if ns_int is None: + continue + try: + services[as_token]["namespaces"][ + ApplicationService.NS_LIST[ns_int]].append( + simplejson.loads(res["regex"]) + ) + except IndexError: + logger.error("Bad namespace enum '%s'. %s", ns_int, res) + except JSONDecodeError: + logger.error("Bad regex object '%s'", res["regex"]) + + # TODO get last successful txn id f.e. service + for service in services.values(): + logger.info("Found application service: %s", service) + self.services_cache.append(ApplicationService( + token=service["token"], + url=service["url"], + namespaces=service["namespaces"], + hs_token=service["hs_token"], + sender=service["sender"] + )) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 0cbcdd1b55..2deda8ac50 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -55,17 +55,19 @@ class EventFederationStore(SQLBaseStore): results = set() base_sql = ( - "SELECT auth_id FROM event_auth WHERE %s" + "SELECT auth_id FROM event_auth WHERE event_id = ?" ) front = set(event_ids) while front: - sql = base_sql % ( - " OR ".join(["event_id=?"] * len(front)), - ) + new_front = set() + for f in front: + txn.execute(base_sql, (f,)) + new_front.update([r[0] for r in txn.fetchall()]) + + new_front -= results - txn.execute(sql, list(front)) - front = [r[0] for r in txn.fetchall()] + front = new_front results.update(front) return list(results) @@ -379,3 +381,51 @@ class EventFederationStore(SQLBaseStore): event_results += new_front return self._get_events_txn(txn, event_results) + + def get_missing_events(self, room_id, earliest_events, latest_events, + limit, min_depth): + return self.runInteraction( + "get_missing_events", + self._get_missing_events, + room_id, earliest_events, latest_events, limit, min_depth + ) + + def _get_missing_events(self, txn, room_id, earliest_events, latest_events, + limit, min_depth): + + earliest_events = set(earliest_events) + front = set(latest_events) - earliest_events + + event_results = set() + + query = ( + "SELECT prev_event_id FROM event_edges " + "WHERE room_id = ? AND event_id = ? AND is_state = 0 " + "LIMIT ?" + ) + + while front and len(event_results) < limit: + new_front = set() + for event_id in front: + txn.execute( + query, + (room_id, event_id, limit - len(event_results)) + ) + + for e_id, in txn.fetchall(): + new_front.add(e_id) + + new_front -= earliest_events + new_front -= event_results + + front = new_front + event_results |= new_front + + events = self._get_events_txn(txn, event_results) + + events = sorted( + [ev for ev in events if ev.depth >= min_depth], + key=lambda e: e.depth, + ) + + return events[:limit] diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py new file mode 100644 index 0000000000..457a11fd02 --- /dev/null +++ b/synapse/storage/filtering.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from ._base import SQLBaseStore + +import simplejson as json + + +class FilteringStore(SQLBaseStore): + @defer.inlineCallbacks + def get_user_filter(self, user_localpart, filter_id): + def_json = yield self._simple_select_one_onecol( + table="user_filters", + keyvalues={ + "user_id": user_localpart, + "filter_id": filter_id, + }, + retcol="filter_json", + allow_none=False, + ) + + defer.returnValue(json.loads(def_json)) + + def add_user_filter(self, user_localpart, user_filter): + def_json = json.dumps(user_filter) + + # Need an atomic transaction to SELECT the maximal ID so far then + # INSERT a new one + def _do_txn(txn): + sql = ( + "SELECT MAX(filter_id) FROM user_filters " + "WHERE user_id = ?" + ) + txn.execute(sql, (user_localpart,)) + max_id = txn.fetchone()[0] + if max_id is None: + filter_id = 0 + else: + filter_id = max_id + 1 + + sql = ( + "INSERT INTO user_filters (user_id, filter_id, filter_json)" + "VALUES(?, ?, ?)" + ) + txn.execute(sql, (user_localpart, filter_id, def_json)) + + return filter_id + + return self.runInteraction("add_user_filter", _do_txn) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py new file mode 100644 index 0000000000..bbf322cc84 --- /dev/null +++ b/synapse/storage/push_rule.py @@ -0,0 +1,264 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections + +from ._base import SQLBaseStore, Table +from twisted.internet import defer + +import logging +import copy +import simplejson as json + +logger = logging.getLogger(__name__) + + +class PushRuleStore(SQLBaseStore): + @defer.inlineCallbacks + def get_push_rules_for_user(self, user_name): + sql = ( + "SELECT "+",".join(PushRuleTable.fields)+" " + "FROM "+PushRuleTable.table_name+" " + "WHERE user_name = ? " + "ORDER BY priority_class DESC, priority DESC" + ) + rows = yield self._execute(None, sql, user_name) + + dicts = [] + for r in rows: + d = {} + for i, f in enumerate(PushRuleTable.fields): + d[f] = r[i] + dicts.append(d) + + defer.returnValue(dicts) + + @defer.inlineCallbacks + def get_push_rules_enabled_for_user(self, user_name): + results = yield self._simple_select_list( + PushRuleEnableTable.table_name, + {'user_name': user_name}, + PushRuleEnableTable.fields + ) + defer.returnValue( + {r['rule_id']: False if r['enabled'] == 0 else True for r in results} + ) + + @defer.inlineCallbacks + def get_push_rule_enabled_by_user_rule_id(self, user_name, rule_id): + results = yield self._simple_select_list( + PushRuleEnableTable.table_name, + {'user_name': user_name, 'rule_id': rule_id}, + ['enabled'] + ) + if not results: + defer.returnValue(True) + defer.returnValue(results[0]) + + @defer.inlineCallbacks + def add_push_rule(self, before, after, **kwargs): + vals = copy.copy(kwargs) + if 'conditions' in vals: + vals['conditions'] = json.dumps(vals['conditions']) + if 'actions' in vals: + vals['actions'] = json.dumps(vals['actions']) + # we could check the rest of the keys are valid column names + # but sqlite will do that anyway so I think it's just pointless. + if 'id' in vals: + del vals['id'] + + if before or after: + ret = yield self.runInteraction( + "_add_push_rule_relative_txn", + self._add_push_rule_relative_txn, + before=before, + after=after, + **vals + ) + defer.returnValue(ret) + else: + ret = yield self.runInteraction( + "_add_push_rule_highest_priority_txn", + self._add_push_rule_highest_priority_txn, + **vals + ) + defer.returnValue(ret) + + def _add_push_rule_relative_txn(self, txn, user_name, **kwargs): + after = None + relative_to_rule = None + if 'after' in kwargs and kwargs['after']: + after = kwargs['after'] + relative_to_rule = after + if 'before' in kwargs and kwargs['before']: + relative_to_rule = kwargs['before'] + + # get the priority of the rule we're inserting after/before + sql = ( + "SELECT priority_class, priority FROM ? " + "WHERE user_name = ? and rule_id = ?" % (PushRuleTable.table_name,) + ) + txn.execute(sql, (user_name, relative_to_rule)) + res = txn.fetchall() + if not res: + raise RuleNotFoundException( + "before/after rule not found: %s" % (relative_to_rule,) + ) + priority_class, base_rule_priority = res[0] + + if 'priority_class' in kwargs and kwargs['priority_class'] != priority_class: + raise InconsistentRuleException( + "Given priority class does not match class of relative rule" + ) + + new_rule = copy.copy(kwargs) + if 'before' in new_rule: + del new_rule['before'] + if 'after' in new_rule: + del new_rule['after'] + new_rule['priority_class'] = priority_class + new_rule['user_name'] = user_name + + # check if the priority before/after is free + new_rule_priority = base_rule_priority + if after: + new_rule_priority -= 1 + else: + new_rule_priority += 1 + + new_rule['priority'] = new_rule_priority + + sql = ( + "SELECT COUNT(*) FROM " + PushRuleTable.table_name + + " WHERE user_name = ? AND priority_class = ? AND priority = ?" + ) + txn.execute(sql, (user_name, priority_class, new_rule_priority)) + res = txn.fetchall() + num_conflicting = res[0][0] + + # if there are conflicting rules, bump everything + if num_conflicting: + sql = "UPDATE "+PushRuleTable.table_name+" SET priority = priority " + if after: + sql += "-1" + else: + sql += "+1" + sql += " WHERE user_name = ? AND priority_class = ? AND priority " + if after: + sql += "<= ?" + else: + sql += ">= ?" + + txn.execute(sql, (user_name, priority_class, new_rule_priority)) + + # now insert the new rule + sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" (" + sql += ",".join(new_rule.keys())+") VALUES (" + sql += ", ".join(["?" for _ in new_rule.keys()])+")" + + txn.execute(sql, new_rule.values()) + + def _add_push_rule_highest_priority_txn(self, txn, user_name, + priority_class, **kwargs): + # find the highest priority rule in that class + sql = ( + "SELECT COUNT(*), MAX(priority) FROM " + PushRuleTable.table_name + + " WHERE user_name = ? and priority_class = ?" + ) + txn.execute(sql, (user_name, priority_class)) + res = txn.fetchall() + (how_many, highest_prio) = res[0] + + new_prio = 0 + if how_many > 0: + new_prio = highest_prio + 1 + + # and insert the new rule + new_rule = copy.copy(kwargs) + if 'id' in new_rule: + del new_rule['id'] + new_rule['user_name'] = user_name + new_rule['priority_class'] = priority_class + new_rule['priority'] = new_prio + + sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" (" + sql += ",".join(new_rule.keys())+") VALUES (" + sql += ", ".join(["?" for _ in new_rule.keys()])+")" + + txn.execute(sql, new_rule.values()) + + @defer.inlineCallbacks + def delete_push_rule(self, user_name, rule_id): + """ + Delete a push rule. Args specify the row to be deleted and can be + any of the columns in the push_rule table, but below are the + standard ones + + Args: + user_name (str): The matrix ID of the push rule owner + rule_id (str): The rule_id of the rule to be deleted + """ + yield self._simple_delete_one( + PushRuleTable.table_name, + {'user_name': user_name, 'rule_id': rule_id} + ) + + @defer.inlineCallbacks + def set_push_rule_enabled(self, user_name, rule_id, enabled): + if enabled: + yield self._simple_delete_one( + PushRuleEnableTable.table_name, + {'user_name': user_name, 'rule_id': rule_id} + ) + else: + yield self._simple_upsert( + PushRuleEnableTable.table_name, + {'user_name': user_name, 'rule_id': rule_id}, + {'enabled': False} + ) + + +class RuleNotFoundException(Exception): + pass + + +class InconsistentRuleException(Exception): + pass + + +class PushRuleTable(Table): + table_name = "push_rules" + + fields = [ + "id", + "user_name", + "rule_id", + "priority_class", + "priority", + "conditions", + "actions", + ] + + EntryType = collections.namedtuple("PushRuleEntry", fields) + + +class PushRuleEnableTable(Table): + table_name = "push_rules_enable" + + fields = [ + "user_name", + "rule_id", + "enabled" + ] diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py new file mode 100644 index 0000000000..6622b4d18a --- /dev/null +++ b/synapse/storage/pusher.py @@ -0,0 +1,173 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections + +from ._base import SQLBaseStore, Table +from twisted.internet import defer + +from synapse.api.errors import StoreError + +import logging + +logger = logging.getLogger(__name__) + + +class PusherStore(SQLBaseStore): + @defer.inlineCallbacks + def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey): + sql = ( + "SELECT id, user_name, kind, profile_tag, app_id," + "app_display_name, device_display_name, pushkey, ts, data, " + "last_token, last_success, failing_since " + "FROM pushers " + "WHERE app_id = ? AND pushkey = ?" + ) + + rows = yield self._execute( + None, sql, app_id_and_pushkey[0], app_id_and_pushkey[1] + ) + + ret = [ + { + "id": r[0], + "user_name": r[1], + "kind": r[2], + "profile_tag": r[3], + "app_id": r[4], + "app_display_name": r[5], + "device_display_name": r[6], + "pushkey": r[7], + "pushkey_ts": r[8], + "data": r[9], + "last_token": r[10], + "last_success": r[11], + "failing_since": r[12] + } + for r in rows + ] + + defer.returnValue(ret[0]) + + @defer.inlineCallbacks + def get_all_pushers(self): + sql = ( + "SELECT id, user_name, kind, profile_tag, app_id," + "app_display_name, device_display_name, pushkey, ts, data, " + "last_token, last_success, failing_since " + "FROM pushers" + ) + + rows = yield self._execute(None, sql) + + ret = [ + { + "id": r[0], + "user_name": r[1], + "kind": r[2], + "profile_tag": r[3], + "app_id": r[4], + "app_display_name": r[5], + "device_display_name": r[6], + "pushkey": r[7], + "pushkey_ts": r[8], + "data": r[9], + "last_token": r[10], + "last_success": r[11], + "failing_since": r[12] + } + for r in rows + ] + + defer.returnValue(ret) + + @defer.inlineCallbacks + def add_pusher(self, user_name, profile_tag, kind, app_id, + app_display_name, device_display_name, + pushkey, pushkey_ts, lang, data): + try: + yield self._simple_upsert( + PushersTable.table_name, + dict( + app_id=app_id, + pushkey=pushkey, + ), + dict( + user_name=user_name, + kind=kind, + profile_tag=profile_tag, + app_display_name=app_display_name, + device_display_name=device_display_name, + ts=pushkey_ts, + lang=lang, + data=data + )) + except Exception as e: + logger.error("create_pusher with failed: %s", e) + raise StoreError(500, "Problem creating pusher.") + + @defer.inlineCallbacks + def delete_pusher_by_app_id_pushkey(self, app_id, pushkey): + yield self._simple_delete_one( + PushersTable.table_name, + dict(app_id=app_id, pushkey=pushkey) + ) + + @defer.inlineCallbacks + def update_pusher_last_token(self, app_id, pushkey, last_token): + yield self._simple_update_one( + PushersTable.table_name, + {'app_id': app_id, 'pushkey': pushkey}, + {'last_token': last_token} + ) + + @defer.inlineCallbacks + def update_pusher_last_token_and_success(self, app_id, pushkey, + last_token, last_success): + yield self._simple_update_one( + PushersTable.table_name, + {'app_id': app_id, 'pushkey': pushkey}, + {'last_token': last_token, 'last_success': last_success} + ) + + @defer.inlineCallbacks + def update_pusher_failing_since(self, app_id, pushkey, failing_since): + yield self._simple_update_one( + PushersTable.table_name, + {'app_id': app_id, 'pushkey': pushkey}, + {'failing_since': failing_since} + ) + + +class PushersTable(Table): + table_name = "pushers" + + fields = [ + "id", + "user_name", + "kind", + "profile_tag", + "app_id", + "app_display_name", + "device_display_name", + "pushkey", + "pushkey_ts", + "data", + "last_token", + "last_success", + "failing_since" + ] + + EntryType = collections.namedtuple("PusherEntry", fields) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 75dffa4db2..029b07cc66 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -122,7 +122,8 @@ class RegistrationStore(SQLBaseStore): def _query_for_auth(self, txn, token): sql = ( - "SELECT users.name, users.admin, access_tokens.device_id" + "SELECT users.name, users.admin," + " access_tokens.device_id, access_tokens.id as token_id" " FROM users" " INNER JOIN access_tokens on users.id = access_tokens.user_id" " WHERE token = ?" diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py new file mode 100644 index 0000000000..4e1a9a2783 --- /dev/null +++ b/synapse/storage/rejections.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore + +import logging + +logger = logging.getLogger(__name__) + + +class RejectionsStore(SQLBaseStore): + def _store_rejections_txn(self, txn, event_id, reason): + self._simple_insert_txn( + txn, + table="rejections", + values={ + "event_id": event_id, + "reason": reason, + "last_check": self._clock.time_msec(), + } + ) + + def get_rejection_reason(self, event_id): + return self._simple_select_one_onecol( + table="rejections", + retcol="reason", + keyvalues={ + "event_id": event_id, + }, + allow_none=True, + ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 6542f8e4f8..750b17a45f 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -82,38 +82,45 @@ class RoomStore(SQLBaseStore): "topic" key if one is set, and a "name" key if one is set """ - topic_subquery = ( - "SELECT topics.event_id as event_id, " - "topics.room_id as room_id, topic " - "FROM topics " - "INNER JOIN current_state_events as c " - "ON c.event_id = topics.event_id " - ) + def f(txn): + topic_subquery = ( + "SELECT topics.event_id as event_id, " + "topics.room_id as room_id, topic " + "FROM topics " + "INNER JOIN current_state_events as c " + "ON c.event_id = topics.event_id " + ) - name_subquery = ( - "SELECT room_names.event_id as event_id, " - "room_names.room_id as room_id, name " - "FROM room_names " - "INNER JOIN current_state_events as c " - "ON c.event_id = room_names.event_id " - ) + name_subquery = ( + "SELECT room_names.event_id as event_id, " + "room_names.room_id as room_id, name " + "FROM room_names " + "INNER JOIN current_state_events as c " + "ON c.event_id = room_names.event_id " + ) - # We use non printing ascii character US () as a seperator - sql = ( - "SELECT r.room_id, n.name, t.topic, " - "group_concat(a.room_alias, '') " - "FROM rooms AS r " - "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " - "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " - "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " - "WHERE r.is_public = ? " - "GROUP BY r.room_id " - ) % { - "topic": topic_subquery, - "name": name_subquery, - } - - rows = yield self._execute(None, sql, is_public) + # We use non printing ascii character US () as a seperator + sql = ( + "SELECT r.room_id, n.name, t.topic, " + "group_concat(a.room_alias, '') " + "FROM rooms AS r " + "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " + "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " + "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " + "WHERE r.is_public = ? " + "GROUP BY r.room_id " + ) % { + "topic": topic_subquery, + "name": name_subquery, + } + + c = txn.execute(sql, (is_public,)) + + return c.fetchall() + + rows = yield self.runInteraction( + "get_rooms", f + ) ret = [ { diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index e59e65529b..65ffb4627f 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -17,9 +17,10 @@ from twisted.internet import defer from collections import namedtuple -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from synapse.api.constants import Membership +from synapse.types import UserID import logging @@ -39,7 +40,7 @@ class RoomMemberStore(SQLBaseStore): """ try: target_user_id = event.state_key - domain = self.hs.parse_userid(target_user_id).domain + domain = UserID.from_string(target_user_id).domain except: logger.exception( "Failed to parse target_user_id=%s", target_user_id @@ -84,7 +85,7 @@ class RoomMemberStore(SQLBaseStore): for e in member_events: try: joined_domains.add( - self.hs.parse_userid(e.state_key).domain + UserID.from_string(e.state_key).domain ) except: # FIXME: How do we deal with invalid user ids in the db? @@ -97,6 +98,8 @@ class RoomMemberStore(SQLBaseStore): txn.execute(sql, (event.room_id, domain)) + self.get_rooms_for_user.invalidate(target_user_id) + @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -177,6 +180,14 @@ class RoomMemberStore(SQLBaseStore): if not membership_list: return defer.succeed(None) + return self.runInteraction( + "get_rooms_for_user_where_membership_is", + self._get_rooms_for_user_where_membership_is_txn, + user_id, membership_list + ) + + def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id, + membership_list): where_clause = "user_id = ? AND (%s)" % ( " OR ".join(["membership = ?" for _ in membership_list]), ) @@ -184,24 +195,18 @@ class RoomMemberStore(SQLBaseStore): args = [user_id] args.extend(membership_list) - def f(txn): - sql = ( - "SELECT m.room_id, m.sender, m.membership" - " FROM room_memberships as m" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" - " WHERE %s" - ) % (where_clause,) - - txn.execute(sql, args) - return [ - RoomsForUser(**r) for r in self.cursor_to_dict(txn) - ] + sql = ( + "SELECT m.room_id, m.sender, m.membership" + " FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id" + " WHERE %s" + ) % (where_clause,) - return self.runInteraction( - "get_rooms_for_user_where_membership_is", - f - ) + txn.execute(sql, args) + return [ + RoomsForUser(**r) for r in self.cursor_to_dict(txn) + ] def get_joined_hosts_for_room(self, room_id): return self._simple_select_onecol( @@ -239,28 +244,32 @@ class RoomMemberStore(SQLBaseStore): results = self._parse_events_txn(txn, rows) return results + @cached() + def get_rooms_for_user(self, user_id): + return self.get_rooms_for_user_where_membership_is( + user_id, membership_list=[Membership.JOIN], + ) + + @defer.inlineCallbacks def user_rooms_intersect(self, user_id_list): """ Checks whether all the users whose IDs are given in a list share a room. + + This is a "hot path" function that's called a lot, e.g. by presence for + generating the event stream. As such, it is implemented locally by + wrapping logic around heavily-cached database queries. """ - def interaction(txn): - user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_id_list)) - sql = ( - "SELECT m.room_id FROM room_memberships as m " - "INNER JOIN current_state_events as c " - "ON m.event_id = c.event_id " - "WHERE m.membership = 'join' " - "AND (%(clause)s) " - # TODO(paul): We've got duplicate rows in the database somewhere - # so we have to DISTINCT m.user_id here - "GROUP BY m.room_id HAVING COUNT(DISTINCT m.user_id) = ?" - ) % {"clause": user_list_clause} + if len(user_id_list) < 2: + defer.returnValue(True) + + deferreds = [self.get_rooms_for_user(u) for u in user_id_list] - args = list(user_id_list) - args.append(len(user_id_list)) + results = yield defer.DeferredList(deferreds, consumeErrors=True) - txn.execute(sql, args) + # A list of sets of strings giving room IDs for each user + room_id_lists = [set([r.room_id for r in result[1]]) for result in results] - return len(txn.fetchall()) > 0 + # There isn't a setintersection(*list_of_sets) + ret = len(room_id_lists.pop(0).intersection(*room_id_lists)) > 0 - return self.runInteraction("user_rooms_intersect", interaction) + defer.returnValue(ret) diff --git a/synapse/storage/schema/delta/v11.sql b/synapse/storage/schema/delta/11/v11.sql index 313592221b..313592221b 100644 --- a/synapse/storage/schema/delta/v11.sql +++ b/synapse/storage/schema/delta/11/v11.sql diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql new file mode 100644 index 0000000000..b87ef1fe79 --- /dev/null +++ b/synapse/storage/schema/delta/12/v12.sql @@ -0,0 +1,67 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS rejections( + event_id TEXT NOT NULL, + reason TEXT NOT NULL, + last_check TEXT NOT NULL, + CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE +); + +-- Push notification endpoints that users have configured +CREATE TABLE IF NOT EXISTS pushers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_name TEXT NOT NULL, + profile_tag varchar(32) NOT NULL, + kind varchar(8) NOT NULL, + app_id varchar(64) NOT NULL, + app_display_name varchar(64) NOT NULL, + device_display_name varchar(128) NOT NULL, + pushkey blob NOT NULL, + ts BIGINT NOT NULL, + lang varchar(8), + data blob, + last_token TEXT, + last_success BIGINT, + failing_since BIGINT, + FOREIGN KEY(user_name) REFERENCES users(name), + UNIQUE (app_id, pushkey) +); + +CREATE TABLE IF NOT EXISTS push_rules ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_name TEXT NOT NULL, + rule_id TEXT NOT NULL, + priority_class TINYINT NOT NULL, + priority INTEGER NOT NULL DEFAULT 0, + conditions TEXT NOT NULL, + actions TEXT NOT NULL, + UNIQUE(user_name, rule_id) +); + +CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name); + +CREATE TABLE IF NOT EXISTS user_filters( + user_id TEXT, + filter_id INTEGER, + filter_json TEXT, + FOREIGN KEY(user_id) REFERENCES users(id) +); + +CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters( + user_id, filter_id +); + +PRAGMA user_version = 12; diff --git a/synapse/storage/schema/delta/v3.sql b/synapse/storage/schema/delta/13/v13.sql index c67e38ff52..e491ad5aec 100644 --- a/synapse/storage/schema/delta/v3.sql +++ b/synapse/storage/schema/delta/13/v13.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2015 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,15 +13,22 @@ * limitations under the License. */ +CREATE TABLE IF NOT EXISTS application_services( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT, + token TEXT, + hs_token TEXT, + sender TEXT, + UNIQUE(token) ON CONFLICT ROLLBACK +); -CREATE INDEX IF NOT EXISTS room_aliases_alias ON room_aliases(room_alias); -CREATE INDEX IF NOT EXISTS room_aliases_id ON room_aliases(room_id); +CREATE TABLE IF NOT EXISTS application_services_regex( + id INTEGER PRIMARY KEY AUTOINCREMENT, + as_id INTEGER NOT NULL, + namespace INTEGER, /* enum[room_id|room_alias|user_id] */ + regex TEXT, + FOREIGN KEY(as_id) REFERENCES application_services(id) +); -CREATE INDEX IF NOT EXISTS room_alias_servers_alias ON room_alias_servers(room_alias); -DELETE FROM room_aliases WHERE rowid NOT IN (SELECT max(rowid) FROM room_aliases GROUP BY room_alias, room_id); - -CREATE UNIQUE INDEX IF NOT EXISTS room_aliases_uniq ON room_aliases(room_alias, room_id); - -PRAGMA user_version = 3; diff --git a/synapse/storage/schema/delta/14/upgrade_appservice_db.py b/synapse/storage/schema/delta/14/upgrade_appservice_db.py new file mode 100644 index 0000000000..847b1c5b89 --- /dev/null +++ b/synapse/storage/schema/delta/14/upgrade_appservice_db.py @@ -0,0 +1,23 @@ +import json +import logging + +logger = logging.getLogger(__name__) + + +def run_upgrade(cur): + cur.execute("SELECT id, regex FROM application_services_regex") + for row in cur.fetchall(): + try: + logger.debug("Checking %s..." % row[0]) + json.loads(row[1]) + except ValueError: + # row isn't in json, make it so. + string_regex = row[1] + new_regex = json.dumps({ + "regex": string_regex, + "exclusive": True + }) + cur.execute( + "UPDATE application_services_regex SET regex=? WHERE id=?", + (new_regex, row[0]) + ) diff --git a/synapse/storage/schema/delta/14/v14.sql b/synapse/storage/schema/delta/14/v14.sql new file mode 100644 index 0000000000..0212726448 --- /dev/null +++ b/synapse/storage/schema/delta/14/v14.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS push_rules_enable ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_name TEXT NOT NULL, + rule_id TEXT NOT NULL, + enabled TINYINT, + UNIQUE(user_name, rule_id) +); + +CREATE INDEX IF NOT EXISTS push_rules_enable_user_name on push_rules_enable (user_name); diff --git a/synapse/storage/schema/delta/v2.sql b/synapse/storage/schema/delta/v2.sql deleted file mode 100644 index f740f6dd5d..0000000000 --- a/synapse/storage/schema/delta/v2.sql +++ /dev/null @@ -1,168 +0,0 @@ -/* Copyright 2014, 2015 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE TABLE IF NOT EXISTS events( - stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT, - topological_ordering INTEGER NOT NULL, - event_id TEXT NOT NULL, - type TEXT NOT NULL, - room_id TEXT NOT NULL, - content TEXT NOT NULL, - unrecognized_keys TEXT, - processed BOOL NOT NULL, - outlier BOOL NOT NULL, - CONSTRAINT ev_uniq UNIQUE (event_id) -); - -CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id); -CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering); -CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering); -CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id); - -CREATE TABLE IF NOT EXISTS state_events( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - prev_state TEXT -); - -CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id); -CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id); -CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type); -CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key); - - -CREATE TABLE IF NOT EXISTS current_state_events( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE -); - -CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id); -CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id); -CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type); -CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key); - -CREATE TABLE IF NOT EXISTS room_memberships( - event_id TEXT NOT NULL, - user_id TEXT NOT NULL, - sender TEXT NOT NULL, - room_id TEXT NOT NULL, - membership TEXT NOT NULL -); - -CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id); -CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id); -CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id); - -CREATE TABLE IF NOT EXISTS feedback( - event_id TEXT NOT NULL, - feedback_type TEXT, - target_event_id TEXT, - sender TEXT, - room_id TEXT -); - -CREATE TABLE IF NOT EXISTS topics( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - topic TEXT NOT NULL -); - -CREATE TABLE IF NOT EXISTS room_names( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - name TEXT NOT NULL -); - -CREATE TABLE IF NOT EXISTS rooms( - room_id TEXT PRIMARY KEY NOT NULL, - is_public INTEGER, - creator TEXT -); - -CREATE TABLE IF NOT EXISTS room_join_rules( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - join_rule TEXT NOT NULL -); -CREATE INDEX IF NOT EXISTS room_join_rules_event_id ON room_join_rules(event_id); -CREATE INDEX IF NOT EXISTS room_join_rules_room_id ON room_join_rules(room_id); - - -CREATE TABLE IF NOT EXISTS room_power_levels( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - user_id TEXT NOT NULL, - level INTEGER NOT NULL -); -CREATE INDEX IF NOT EXISTS room_power_levels_event_id ON room_power_levels(event_id); -CREATE INDEX IF NOT EXISTS room_power_levels_room_id ON room_power_levels(room_id); -CREATE INDEX IF NOT EXISTS room_power_levels_room_user ON room_power_levels(room_id, user_id); - - -CREATE TABLE IF NOT EXISTS room_default_levels( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - level INTEGER NOT NULL -); - -CREATE INDEX IF NOT EXISTS room_default_levels_event_id ON room_default_levels(event_id); -CREATE INDEX IF NOT EXISTS room_default_levels_room_id ON room_default_levels(room_id); - - -CREATE TABLE IF NOT EXISTS room_add_state_levels( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - level INTEGER NOT NULL -); - -CREATE INDEX IF NOT EXISTS room_add_state_levels_event_id ON room_add_state_levels(event_id); -CREATE INDEX IF NOT EXISTS room_add_state_levels_room_id ON room_add_state_levels(room_id); - - -CREATE TABLE IF NOT EXISTS room_send_event_levels( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - level INTEGER NOT NULL -); - -CREATE INDEX IF NOT EXISTS room_send_event_levels_event_id ON room_send_event_levels(event_id); -CREATE INDEX IF NOT EXISTS room_send_event_levels_room_id ON room_send_event_levels(room_id); - - -CREATE TABLE IF NOT EXISTS room_ops_levels( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - ban_level INTEGER, - kick_level INTEGER -); - -CREATE INDEX IF NOT EXISTS room_ops_levels_event_id ON room_ops_levels(event_id); -CREATE INDEX IF NOT EXISTS room_ops_levels_room_id ON room_ops_levels(room_id); - - -CREATE TABLE IF NOT EXISTS room_hosts( - room_id TEXT NOT NULL, - host TEXT NOT NULL, - CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE -); - -CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id); - -PRAGMA user_version = 2; diff --git a/synapse/storage/schema/delta/v4.sql b/synapse/storage/schema/delta/v4.sql deleted file mode 100644 index d3807b7686..0000000000 --- a/synapse/storage/schema/delta/v4.sql +++ /dev/null @@ -1,26 +0,0 @@ -/* Copyright 2014, 2015 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -CREATE TABLE IF NOT EXISTS redactions ( - event_id TEXT NOT NULL, - redacts TEXT NOT NULL, - CONSTRAINT ev_uniq UNIQUE (event_id) -); - -CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id); -CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts); - -ALTER TABLE room_ops_levels ADD COLUMN redact_level INTEGER; - -PRAGMA user_version = 4; diff --git a/synapse/storage/schema/delta/v5.sql b/synapse/storage/schema/delta/v5.sql deleted file mode 100644 index 0874a15431..0000000000 --- a/synapse/storage/schema/delta/v5.sql +++ /dev/null @@ -1,30 +0,0 @@ -/* Copyright 2014, 2015 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE TABLE IF NOT EXISTS user_ips ( - user TEXT NOT NULL, - access_token TEXT NOT NULL, - device_id TEXT, - ip TEXT NOT NULL, - user_agent TEXT NOT NULL, - last_seen INTEGER NOT NULL, - CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE -); - -CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user); - -ALTER TABLE users ADD COLUMN admin BOOL DEFAULT 0 NOT NULL; - -PRAGMA user_version = 5; diff --git a/synapse/storage/schema/delta/v6.sql b/synapse/storage/schema/delta/v6.sql deleted file mode 100644 index a9e0a4fe0d..0000000000 --- a/synapse/storage/schema/delta/v6.sql +++ /dev/null @@ -1,31 +0,0 @@ -/* Copyright 2014, 2015 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -CREATE TABLE IF NOT EXISTS server_tls_certificates( - server_name TEXT, -- Server name. - fingerprint TEXT, -- Certificate fingerprint. - from_server TEXT, -- Which key server the certificate was fetched from. - ts_added_ms INTEGER, -- When the certifcate was added. - tls_certificate BLOB, -- DER encoded x509 certificate. - CONSTRAINT uniqueness UNIQUE (server_name, fingerprint) -); - -CREATE TABLE IF NOT EXISTS server_signature_keys( - server_name TEXT, -- Server name. - key_id TEXT, -- Key version. - from_server TEXT, -- Which key server the key was fetched form. - ts_added_ms INTEGER, -- When the key was added. - verify_key BLOB, -- NACL verification key. - CONSTRAINT uniqueness UNIQUE (server_name, key_id) -); diff --git a/synapse/storage/schema/delta/v8.sql b/synapse/storage/schema/delta/v8.sql deleted file mode 100644 index 1e9f8b18cb..0000000000 --- a/synapse/storage/schema/delta/v8.sql +++ /dev/null @@ -1,34 +0,0 @@ -/* Copyright 2014, 2015 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - CREATE TABLE IF NOT EXISTS event_signatures_2 ( - event_id TEXT, - signature_name TEXT, - key_id TEXT, - signature BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id) -); - -INSERT INTO event_signatures_2 (event_id, signature_name, key_id, signature) -SELECT event_id, signature_name, key_id, signature FROM event_signatures; - -DROP TABLE event_signatures; -ALTER TABLE event_signatures_2 RENAME TO event_signatures; - -CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures ( - event_id -); - -PRAGMA user_version = 8; \ No newline at end of file diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql deleted file mode 100644 index 455d51a70c..0000000000 --- a/synapse/storage/schema/delta/v9.sql +++ /dev/null @@ -1,79 +0,0 @@ -/* Copyright 2014, 2015 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- To track destination health -CREATE TABLE IF NOT EXISTS destinations( - destination TEXT PRIMARY KEY, - retry_last_ts INTEGER, - retry_interval INTEGER -); - - -CREATE TABLE IF NOT EXISTS local_media_repository ( - media_id TEXT, -- The id used to refer to the media. - media_type TEXT, -- The MIME-type of the media. - media_length INTEGER, -- Length of the media in bytes. - created_ts INTEGER, -- When the content was uploaded in ms. - upload_name TEXT, -- The name the media was uploaded with. - user_id TEXT, -- The user who uploaded the file. - CONSTRAINT uniqueness UNIQUE (media_id) -); - -CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( - media_id TEXT, -- The id used to refer to the media. - thumbnail_width INTEGER, -- The width of the thumbnail in pixels. - thumbnail_height INTEGER, -- The height of the thumbnail in pixels. - thumbnail_type TEXT, -- The MIME-type of the thumbnail. - thumbnail_method TEXT, -- The method used to make the thumbnail. - thumbnail_length INTEGER, -- The length of the thumbnail in bytes. - CONSTRAINT uniqueness UNIQUE ( - media_id, thumbnail_width, thumbnail_height, thumbnail_type - ) -); - -CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id - ON local_media_repository_thumbnails (media_id); - -CREATE TABLE IF NOT EXISTS remote_media_cache ( - media_origin TEXT, -- The remote HS the media came from. - media_id TEXT, -- The id used to refer to the media on that server. - media_type TEXT, -- The MIME-type of the media. - created_ts INTEGER, -- When the content was uploaded in ms. - upload_name TEXT, -- The name the media was uploaded with. - media_length INTEGER, -- Length of the media in bytes. - filesystem_id TEXT, -- The name used to store the media on disk. - CONSTRAINT uniqueness UNIQUE (media_origin, media_id) -); - -CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( - media_origin TEXT, -- The remote HS the media came from. - media_id TEXT, -- The id used to refer to the media. - thumbnail_width INTEGER, -- The width of the thumbnail in pixels. - thumbnail_height INTEGER, -- The height of the thumbnail in pixels. - thumbnail_method TEXT, -- The method used to make the thumbnail - thumbnail_type TEXT, -- The MIME-type of the thumbnail. - thumbnail_length INTEGER, -- The length of the thumbnail in bytes. - filesystem_id TEXT, -- The name used to store the media on disk. - CONSTRAINT uniqueness UNIQUE ( - media_origin, media_id, thumbnail_width, thumbnail_height, - thumbnail_type, thumbnail_type - ) -); - -CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id - ON local_media_repository_thumbnails (media_id); - - -PRAGMA user_version = 9; diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql index 1e766d6db2..1e766d6db2 100644 --- a/synapse/storage/schema/event_edges.sql +++ b/synapse/storage/schema/full_schemas/11/event_edges.sql diff --git a/synapse/storage/schema/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql index c28c39c48a..c28c39c48a 100644 --- a/synapse/storage/schema/event_signatures.sql +++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/full_schemas/11/im.sql index dd00c1cd2f..dd00c1cd2f 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/full_schemas/11/im.sql diff --git a/synapse/storage/schema/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql index a9e0a4fe0d..a9e0a4fe0d 100644 --- a/synapse/storage/schema/keys.sql +++ b/synapse/storage/schema/full_schemas/11/keys.sql diff --git a/synapse/storage/schema/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql index afdf48cbfb..afdf48cbfb 100644 --- a/synapse/storage/schema/media_repository.sql +++ b/synapse/storage/schema/full_schemas/11/media_repository.sql diff --git a/synapse/storage/schema/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql index f9f8db9697..f9f8db9697 100644 --- a/synapse/storage/schema/presence.sql +++ b/synapse/storage/schema/full_schemas/11/presence.sql diff --git a/synapse/storage/schema/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql index f06a528b4d..f06a528b4d 100644 --- a/synapse/storage/schema/profiles.sql +++ b/synapse/storage/schema/full_schemas/11/profiles.sql diff --git a/synapse/storage/schema/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql index 5011d95db8..5011d95db8 100644 --- a/synapse/storage/schema/redactions.sql +++ b/synapse/storage/schema/full_schemas/11/redactions.sql diff --git a/synapse/storage/schema/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql index 0d2df01603..0d2df01603 100644 --- a/synapse/storage/schema/room_aliases.sql +++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql diff --git a/synapse/storage/schema/state.sql b/synapse/storage/schema/full_schemas/11/state.sql index 1fe8f1e430..1fe8f1e430 100644 --- a/synapse/storage/schema/state.sql +++ b/synapse/storage/schema/full_schemas/11/state.sql diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql index 2d30f99b06..2d30f99b06 100644 --- a/synapse/storage/schema/transactions.sql +++ b/synapse/storage/schema/full_schemas/11/transactions.sql diff --git a/synapse/storage/schema/users.sql b/synapse/storage/schema/full_schemas/11/users.sql index 08ccfdac0a..08ccfdac0a 100644 --- a/synapse/storage/schema/users.sql +++ b/synapse/storage/schema/full_schemas/11/users.sql diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql new file mode 100644 index 0000000000..0431e2d051 --- /dev/null +++ b/synapse/storage/schema/schema_version.sql @@ -0,0 +1,30 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS schema_version( + Lock char(1) NOT NULL DEFAULT 'X', -- Makes sure this table only has one row. + version INTEGER NOT NULL, + upgraded BOOL NOT NULL, -- Whether we reached this version from an upgrade or an initial schema. + CONSTRAINT schema_version_lock_x CHECK (Lock='X') + CONSTRAINT schema_version_lock_uniq UNIQUE (Lock) +); + +CREATE TABLE IF NOT EXISTS applied_schema_deltas( + version INTEGER NOT NULL, + file TEXT NOT NULL, + CONSTRAINT schema_deltas_ver_file UNIQUE (version, file) ON CONFLICT IGNORE +); + +CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 8ac2adab05..09bc522210 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -36,6 +36,7 @@ what sort order was used: from twisted.internet import defer from ._base import SQLBaseStore +from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.util.logutils import log_function @@ -82,10 +83,10 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): def parse(cls, string): try: if string[0] == 's': - return cls(None, int(string[1:])) + return cls(topological=None, stream=int(string[1:])) if string[0] == 't': parts = string[1:].split('-', 1) - return cls(int(parts[1]), int(parts[0])) + return cls(topological=int(parts[0]), stream=int(parts[1])) except: pass raise SynapseError(400, "Invalid token %r" % (string,)) @@ -94,7 +95,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): def parse_stream_token(cls, string): try: if string[0] == 's': - return cls(None, int(string[1:])) + return cls(topological=None, stream=int(string[1:])) except: pass raise SynapseError(400, "Invalid token %r" % (string,)) @@ -127,6 +128,85 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): class StreamStore(SQLBaseStore): + + @defer.inlineCallbacks + def get_appservice_room_stream(self, service, from_key, to_key, limit=0): + # NB this lives here instead of appservice.py so we can reuse the + # 'private' StreamToken class in this file. + if limit: + limit = max(limit, MAX_STREAM_SIZE) + else: + limit = MAX_STREAM_SIZE + + # From and to keys should be integers from ordering. + from_id = _StreamToken.parse_stream_token(from_key) + to_id = _StreamToken.parse_stream_token(to_key) + + if from_key == to_key: + defer.returnValue(([], to_key)) + return + + # select all the events between from/to with a sensible limit + sql = ( + "SELECT e.event_id, e.room_id, e.type, s.state_key, " + "e.stream_ordering FROM events AS e LEFT JOIN state_events as s ON " + "e.event_id = s.event_id " + "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " + "ORDER BY stream_ordering ASC LIMIT %(limit)d " + ) % { + "limit": limit + } + + def f(txn): + # pull out all the events between the tokens + txn.execute(sql, (from_id.stream, to_id.stream,)) + rows = self.cursor_to_dict(txn) + + # Logic: + # - We want ALL events which match the AS room_id regex + # - We want ALL events which match the rooms represented by the AS + # room_alias regex + # - We want ALL events for rooms that AS users have joined. + # This is currently supported via get_app_service_rooms (which is + # used for the Notifier listener rooms). We can't reasonably make a + # SQL query for these room IDs, so we'll pull all the events between + # from/to and filter in python. + rooms_for_as = self._get_app_service_rooms_txn(txn, service) + room_ids_for_as = [r.room_id for r in rooms_for_as] + + def app_service_interested(row): + if row["room_id"] in room_ids_for_as: + return True + + if row["type"] == EventTypes.Member: + if service.is_interested_in_user(row.get("state_key")): + return True + return False + + ret = self._get_events_txn( + txn, + # apply the filter on the room id list + [ + r["event_id"] for r in rows + if app_service_interested(r) + ], + get_prev_content=True + ) + + self._set_before_and_after(ret, rows) + + if rows: + key = "s%d" % max(r["stream_ordering"] for r in rows) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = to_key + + return ret, key + + results = yield self.runInteraction("get_appservice_room_stream", f) + defer.returnValue(results) + @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): @@ -181,8 +261,10 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + self._set_before_and_after(ret, rows) + if rows: - key = "s%d" % max([r["stream_ordering"] for r in rows]) + key = "s%d" % max(r["stream_ordering"] for r in rows) else: # Assume we didn't get anything because there was nothing to # get. @@ -260,22 +342,44 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + self._set_before_and_after(events, rows) + return events, next_token, return self.runInteraction("paginate_room_events", f) def get_recent_events_for_room(self, room_id, limit, end_token, - with_feedback=False): + with_feedback=False, from_token=None): # TODO (erikj): Handle compressed feedback - sql = ( - "SELECT stream_ordering, topological_ordering, event_id FROM events " - "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 " - "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " - ) + end_token = _StreamToken.parse_stream_token(end_token) - def f(txn): - txn.execute(sql, (room_id, end_token, limit,)) + if from_token is None: + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT ?" + ) + else: + from_token = _StreamToken.parse_stream_token(from_token) + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering > ?" + " AND stream_ordering <= ? AND outlier = 0" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT ?" + ) + + def get_recent_events_for_room_txn(txn): + if from_token is None: + txn.execute(sql, (room_id, end_token.stream, limit,)) + else: + txn.execute(sql, ( + room_id, from_token.stream, end_token.stream, limit + )) rows = self.cursor_to_dict(txn) @@ -291,9 +395,9 @@ class StreamStore(SQLBaseStore): toke = rows[0]["stream_ordering"] - 1 start_token = str(_StreamToken(topo, toke)) - token = (start_token, end_token) + token = (start_token, str(end_token)) else: - token = (end_token, end_token) + token = (str(end_token), str(end_token)) events = self._get_events_txn( txn, @@ -301,9 +405,13 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + self._set_before_and_after(events, rows) + return events, token - return self.runInteraction("get_recent_events_for_room", f) + return self.runInteraction( + "get_recent_events_for_room", get_recent_events_for_room_txn + ) def get_room_events_max_id(self): return self.runInteraction( @@ -325,3 +433,12 @@ class StreamStore(SQLBaseStore): key = res[0]["m"] return "s%d" % (key,) + + @staticmethod + def _set_before_and_after(events, rows): + for event, row in zip(events, rows): + stream = row["stream_ordering"] + topo = event.depth + internal = event.internal_metadata + internal.before = str(_StreamToken(topo, stream - 1)) + internal.after = str(_StreamToken(topo, stream)) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index e06ef35690..0b8a3b7a07 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -13,12 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, Table +from ._base import SQLBaseStore, Table, cached from collections import namedtuple -from twisted.internet import defer - import logging logger = logging.getLogger(__name__) @@ -28,10 +26,6 @@ class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ - # a write-through cache of DestinationsTable.EntryType indexed by - # destination string - destination_retry_cache = {} - def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -211,6 +205,7 @@ class TransactionStore(SQLBaseStore): return ReceivedTransactionsTable.decode_results(txn.fetchall()) + @cached() def get_destination_retry_timings(self, destination): """Gets the current retry timings (if any) for a given destination. @@ -221,9 +216,6 @@ class TransactionStore(SQLBaseStore): None if not retrying Otherwise a DestinationsTable.EntryType for the retry scheme """ - if destination in self.destination_retry_cache: - return defer.succeed(self.destination_retry_cache[destination]) - return self.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination) @@ -250,7 +242,9 @@ class TransactionStore(SQLBaseStore): retry_interval (int) - how long until next retry in ms """ - self.destination_retry_cache[destination] = ( + # As this is the new value, we might as well prefill the cache + self.get_destination_retry_timings.prefill( + destination, DestinationsTable.EntryType( destination, retry_last_ts, |