diff options
Diffstat (limited to 'synapse/storage')
71 files changed, 3428 insertions, 1491 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4b16f445d6..0cc14fb692 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -14,13 +14,12 @@ # limitations under the License. from twisted.internet import defer - -from synapse.util.logutils import log_function -from synapse.api.constants import EventTypes - -from .appservice import ApplicationServiceStore +from .appservice import ( + ApplicationServiceStore, ApplicationServiceTransactionStore +) +from ._base import Cache from .directory import DirectoryStore -from .feedback import FeedbackStore +from .events import EventsStore from .presence import PresenceStore from .profile import ProfileStore from .registration import RegistrationStore @@ -39,11 +38,6 @@ from .state import StateStore from .signatures import SignatureStore from .filtering import FilteringStore -from syutil.base64util import decode_base64 -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import compute_event_reference_hash - import fnmatch import imp @@ -57,20 +51,18 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 14 +SCHEMA_VERSION = 17 dir_path = os.path.abspath(os.path.dirname(__file__)) - -class _RollbackButIsFineException(Exception): - """ This exception is used to rollback a transaction without implying - something went wrong. - """ - pass +# Number of msec of granularity to store the user IP 'last seen' time. Smaller +# times give more inserts into the database even for readonly API hits +# 120 seconds == 2 minutes +LAST_SEEN_GRANULARITY = 120*1000 class DataStore(RoomMemberStore, RoomStore, - RegistrationStore, StreamStore, ProfileStore, FeedbackStore, + RegistrationStore, StreamStore, ProfileStore, PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, ApplicationServiceStore, @@ -79,7 +71,9 @@ class DataStore(RoomMemberStore, RoomStore, RejectionsStore, FilteringStore, PusherStore, - PushRuleStore + PushRuleStore, + ApplicationServiceTransactionStore, + EventsStore, ): def __init__(self, hs): @@ -89,474 +83,53 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token_deferred = self._get_min_token() self.min_token = None - @defer.inlineCallbacks - @log_function - def persist_event(self, event, context, backfilled=False, - is_new_state=True, current_state=None): - stream_ordering = None - if backfilled: - if not self.min_token_deferred.called: - yield self.min_token_deferred - self.min_token -= 1 - stream_ordering = self.min_token - - try: - yield self.runInteraction( - "persist_event", - self._persist_event_txn, - event=event, - context=context, - backfilled=backfilled, - stream_ordering=stream_ordering, - is_new_state=is_new_state, - current_state=current_state, - ) - except _RollbackButIsFineException: - pass - - @defer.inlineCallbacks - def get_event(self, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False, - allow_none=False): - """Get an event from the database by event_id. - - Args: - event_id (str): The event_id of the event to fetch - check_redacted (bool): If True, check if event has been redacted - and redact it. - get_prev_content (bool): If True and event is a state event, - include the previous states content in the unsigned field. - allow_rejected (bool): If True return rejected events. - allow_none (bool): If True, return None if no event found, if - False throw an exception. - - Returns: - Deferred : A FrozenEvent. - """ - event = yield self.runInteraction( - "get_event", self._get_event_txn, - event_id, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - if not event and not allow_none: - raise RuntimeError("Could not find event %s" % (event_id,)) - - defer.returnValue(event) - - @log_function - def _persist_event_txn(self, txn, event, context, backfilled, - stream_ordering=None, is_new_state=True, - current_state=None): - - # Remove the any existing cache entries for the event_id - self._get_event_cache.pop(event.event_id) - - # We purposefully do this first since if we include a `current_state` - # key, we *want* to update the `current_state_events` table - if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) - ) - - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - }, - or_replace=True, - ) - - if event.is_state() and is_new_state: - if not backfilled and not context.rejected: - self._simple_insert_txn( - txn, - table="state_forward_extremities", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for prev_state_id, _ in event.prev_state: - self._simple_delete_txn( - txn, - table="state_forward_extremities", - keyvalues={ - "event_id": prev_state_id, - } - ) - - outlier = event.internal_metadata.is_outlier() - - if not outlier: - self._store_state_groups_txn(txn, event, context) - - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth - ) - - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - - have_persisted = self._simple_select_one_onecol_txn( - txn, - table="event_json", - keyvalues={"event_id": event.event_id}, - retcol="event_id", - allow_none=True, - ) - - metadata_json = encode_canonical_json( - event.internal_metadata.get_dict() - ) - - # If we have already persisted this event, we don't need to do any - # more processing. - # The processing above must be done on every call to persist event, - # since they might not have happened on previous calls. For example, - # if we are persisting an event that we had persisted as an outlier, - # but is no longer one. - if have_persisted: - if not outlier: - sql = ( - "UPDATE event_json SET internal_metadata = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (metadata_json.decode("UTF-8"), event.event_id,) - ) - - sql = ( - "UPDATE events SET outlier = 0" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (event.event_id,) - ) - return - - if event.type == EventTypes.Member: - self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) - elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) - - event_dict = { - k: v - for k, v in event.get_dict().items() - if k not in [ - "redacted", - "redacted_because", - ] - } - - self._simple_insert_txn( - txn, - table="event_json", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "internal_metadata": metadata_json.decode("UTF-8"), - "json": encode_canonical_json(event_dict).decode("UTF-8"), - }, - or_replace=True, - ) - - content = encode_canonical_json( - event.content - ).decode("UTF-8") - - vals = { - "topological_ordering": event.depth, - "event_id": event.event_id, - "type": event.type, - "room_id": event.room_id, - "content": content, - "processed": True, - "outlier": outlier, - "depth": event.depth, - } - - if stream_ordering is not None: - vals["stream_ordering"] = stream_ordering - - unrec = { - k: v - for k, v in event.get_dict().items() - if k not in vals.keys() and k not in [ - "redacted", - "redacted_because", - "signatures", - "hashes", - "prev_events", - ] - } - - vals["unrecognized_keys"] = encode_canonical_json( - unrec - ).decode("UTF-8") - - try: - self._simple_insert_txn( - txn, - "events", - vals, - or_replace=(not outlier), - or_ignore=bool(outlier), - ) - except: - logger.warn( - "Failed to persist, probably duplicate: %s", - event.event_id, - exc_info=True, - ) - raise _RollbackButIsFineException("_persist_event") - - if context.rejected: - self._store_rejections_txn(txn, event.event_id, context.rejected) - - if event.is_state(): - vals = { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - } - - # TODO: How does this work with backfilling? - if hasattr(event, "replaces_state"): - vals["prev_state"] = event.replaces_state - - self._simple_insert_txn( - txn, - "state_events", - vals, - or_replace=True, - ) - - if is_new_state and not context.rejected: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for e_id, h in event.prev_state: - self._simple_insert_txn( - txn, - table="event_edges", - values={ - "event_id": event.event_id, - "prev_event_id": e_id, - "room_id": event.room_id, - "is_state": 1, - }, - or_ignore=True, - ) - - for hash_alg, hash_base64 in event.hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_event_content_hash_txn( - txn, event.event_id, hash_alg, hash_bytes, - ) - - for prev_event_id, prev_hashes in event.prev_events: - for alg, hash_base64 in prev_hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_prev_event_hash_txn( - txn, event.event_id, prev_event_id, alg, hash_bytes - ) - - for auth_id, _ in event.auth_events: - self._simple_insert_txn( - txn, - table="event_auth", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "auth_id": auth_id, - }, - or_ignore=True, - ) - - (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) - self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes - ) - - def _store_redaction(self, txn, event): - # invalidate the cache for the redacted event - self._get_event_cache.pop(event.redacts) - txn.execute( - "INSERT OR IGNORE INTO redactions " - "(event_id, redacts) VALUES (?,?)", - (event.event_id, event.redacts) + self.client_ip_last_seen = Cache( + name="client_ip_last_seen", + keylen=4, ) @defer.inlineCallbacks - def get_current_state(self, room_id, event_type=None, state_key=""): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) - - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " - "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "INNER JOIN state_events as s ON e.event_id = s.event_id " - "WHERE c.room_id = ? " - ) % { - "redacted": del_sql, - } - - if event_type and state_key is not None: - sql += " AND s.type = ? AND s.state_key = ? " - args = (room_id, event_type, state_key) - elif event_type: - sql += " AND s.type = ?" - args = (room_id, event_type) - else: - args = (room_id, ) - - results = yield self._execute_and_decode("get_current_state", sql, *args) - - events = yield self._parse_events(results) - defer.returnValue(events) - - @defer.inlineCallbacks - def get_room_name_and_aliases(self, room_id): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) - - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " - "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "INNER JOIN state_events as s ON e.event_id = s.event_id " - "WHERE c.room_id = ? " - ) % { - "redacted": del_sql, - } - - sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')" - sql += " OR s.type = 'm.room.aliases')" - args = (room_id,) - - results = yield self._execute_and_decode("get_current_state", sql, *args) - - events = yield self._parse_events(results) - - name = None - aliases = [] - - for e in events: - if e.type == 'm.room.name': - if 'name' in e.content: - name = e.content['name'] - elif e.type == 'm.room.aliases': - if 'aliases' in e.content: - aliases.extend(e.content['aliases']) - - defer.returnValue((name, aliases)) - - @defer.inlineCallbacks - def _get_min_token(self): - row = yield self._execute( - "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" - ) + def insert_client_ip(self, user, access_token, device_id, ip, user_agent): + now = int(self._clock.time_msec()) + key = (user.to_string(), access_token, device_id, ip) - self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 - self.min_token = min(self.min_token, -1) + try: + last_seen = self.client_ip_last_seen.get(*key) + except KeyError: + last_seen = None - logger.debug("min_token is: %s", self.min_token) + # Rate-limited inserts + if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: + defer.returnValue(None) - defer.returnValue(self.min_token) + self.client_ip_last_seen.prefill(*key + (now,)) - def insert_client_ip(self, user, access_token, device_id, ip, user_agent): - return self._simple_insert( + # It's safe not to lock here: a) no unique constraint, + # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely + yield self._simple_upsert( "user_ips", - { - "user": user.to_string(), + keyvalues={ + "user_id": user.to_string(), "access_token": access_token, - "device_id": device_id, "ip": ip, "user_agent": user_agent, - "last_seen": int(self._clock.time_msec()), - } + }, + values={ + "device_id": device_id, + "last_seen": now, + }, + desc="insert_client_ip", + lock=False, ) def get_user_ip_and_agents(self, user): return self._simple_select_list( table="user_ips", - keyvalues={"user": user.to_string()}, + keyvalues={"user_id": user.to_string()}, retcols=[ "device_id", "access_token", "ip", "user_agent", "last_seen" ], - ) - - def have_events(self, event_ids): - """Given a list of event ids, check if we have already processed them. - - Returns: - dict: Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps to - None. - """ - if not event_ids: - return defer.succeed({}) - - def f(txn): - sql = ( - "SELECT e.event_id, reason FROM events as e " - "LEFT JOIN rejections as r ON e.event_id = r.event_id " - "WHERE e.event_id = ?" - ) - - res = {} - for event_id in event_ids: - txn.execute(sql, (event_id,)) - row = txn.fetchone() - if row: - _, rejected = row - res[event_id] = rejected - - return res - - return self.runInteraction( - "have_events", f, + desc="get_user_ip_and_agents", ) @@ -580,21 +153,23 @@ class UpgradeDatabaseException(PrepareDatabaseException): pass -def prepare_database(db_conn): +def prepare_database(db_conn, database_engine): """Prepares a database for usage. Will either create all necessary tables or upgrade from an older schema version. """ try: cur = db_conn.cursor() - version_info = _get_or_create_schema_state(cur) + version_info = _get_or_create_schema_state(cur, database_engine) if version_info: user_version, delta_files, upgraded = version_info - _upgrade_existing_database(cur, user_version, delta_files, upgraded) + _upgrade_existing_database( + cur, user_version, delta_files, upgraded, database_engine + ) else: - _setup_new_database(cur) + _setup_new_database(cur, database_engine) - cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) + # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) cur.close() db_conn.commit() @@ -603,7 +178,7 @@ def prepare_database(db_conn): raise -def _setup_new_database(cur): +def _setup_new_database(cur, database_engine): """Sets up the database by finding a base set of "full schemas" and then applying any necessary deltas. @@ -657,31 +232,30 @@ def _setup_new_database(cur): directory_entries = os.listdir(sql_dir) - sql_script = "BEGIN TRANSACTION;\n" for filename in fnmatch.filter(directory_entries, "*.sql"): sql_loc = os.path.join(sql_dir, filename) logger.debug("Applying schema %s", sql_loc) - sql_script += read_schema(sql_loc) - sql_script += "\n" - sql_script += "COMMIT TRANSACTION;" - cur.executescript(sql_script) + executescript(cur, sql_loc) cur.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", - (max_current_ver, False) + database_engine.convert_param_style( + "INSERT INTO schema_version (version, upgraded)" + " VALUES (?,?)" + ), + (max_current_ver, False,) ) _upgrade_existing_database( cur, current_version=max_current_ver, applied_delta_files=[], - upgraded=False + upgraded=False, + database_engine=database_engine, ) def _upgrade_existing_database(cur, current_version, applied_delta_files, - upgraded): + upgraded, database_engine): """Upgrades an existing database. Delta files can either be SQL stored in *.sql files, or python modules @@ -737,6 +311,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, if not upgraded: start_ver += 1 + logger.debug("applied_delta_files: %s", applied_delta_files) + for v in range(start_ver, SCHEMA_VERSION + 1): logger.debug("Upgrading schema to v%d", v) @@ -753,6 +329,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, directory_entries.sort() for file_name in directory_entries: relative_path = os.path.join(str(v), file_name) + logger.debug("Found file: %s", relative_path) if relative_path in applied_delta_files: continue @@ -774,9 +351,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, module.run_upgrade(cur) elif ext == ".sql": # A plain old .sql file, just read and execute it - delta_schema = read_schema(absolute_path) logger.debug("Applying schema %s", relative_path) - cur.executescript(delta_schema) + executescript(cur, absolute_path) else: # Not a valid delta file. logger.warn( @@ -788,24 +364,83 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, # Mark as done. cur.execute( - "INSERT INTO applied_schema_deltas (version, file)" - " VALUES (?,?)", + database_engine.convert_param_style( + "INSERT INTO applied_schema_deltas (version, file)" + " VALUES (?,?)", + ), (v, relative_path) ) + cur.execute("DELETE FROM schema_version") cur.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", + database_engine.convert_param_style( + "INSERT INTO schema_version (version, upgraded)" + " VALUES (?,?)", + ), (v, True) ) -def _get_or_create_schema_state(txn): +def get_statements(f): + statement_buffer = "" + in_comment = False # If we're in a /* ... */ style comment + + for line in f: + line = line.strip() + + if in_comment: + # Check if this line contains an end to the comment + comments = line.split("*/", 1) + if len(comments) == 1: + continue + line = comments[1] + in_comment = False + + # Remove inline block comments + line = re.sub(r"/\*.*\*/", " ", line) + + # Does this line start a comment? + comments = line.split("/*", 1) + if len(comments) > 1: + line = comments[0] + in_comment = True + + # Deal with line comments + line = line.split("--", 1)[0] + line = line.split("//", 1)[0] + + # Find *all* semicolons. We need to treat first and last entry + # specially. + statements = line.split(";") + + # We must prepend statement_buffer to the first statement + first_statement = "%s %s" % ( + statement_buffer.strip(), + statements[0].strip() + ) + statements[0] = first_statement + + # Every entry, except the last, is a full statement + for statement in statements[:-1]: + yield statement.strip() + + # The last entry did *not* end in a semicolon, so we store it for the + # next semicolon we find + statement_buffer = statements[-1].strip() + + +def executescript(txn, schema_path): + with open(schema_path, 'r') as f: + for statement in get_statements(f): + txn.execute(statement) + + +def _get_or_create_schema_state(txn, database_engine): + # Bluntly try creating the schema_version tables. schema_path = os.path.join( dir_path, "schema", "schema_version.sql", ) - create_schema = read_schema(schema_path) - txn.executescript(create_schema) + executescript(txn, schema_path) txn.execute("SELECT version, upgraded FROM schema_version") row = txn.fetchone() @@ -814,10 +449,13 @@ def _get_or_create_schema_state(txn): if current_version: txn.execute( - "SELECT file FROM applied_schema_deltas WHERE version >= ?", + database_engine.convert_param_style( + "SELECT file FROM applied_schema_deltas WHERE version >= ?" + ), (current_version,) ) - return current_version, txn.fetchall(), upgraded + applied_deltas = [d for d, in txn.fetchall()] + return current_version, applied_deltas, upgraded return None @@ -849,7 +487,19 @@ def prepare_sqlite3_database(db_conn): if row and row[0]: db_conn.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" + "REPLACE INTO schema_version (version, upgraded)" " VALUES (?,?)", (row[0], False) ) + + +def are_all_users_on_domain(txn, database_engine, domain): + sql = database_engine.convert_param_style( + "SELECT COUNT(*) FROM users WHERE name NOT LIKE ?" + ) + pat = "%:" + domain + txn.execute(sql, (pat,)) + num_not_matching = txn.fetchall()[0][0] + if num_not_matching == 0: + return True + return False diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 9125bb1198..ee5587c721 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -22,6 +22,8 @@ from synapse.util.logcontext import PreserveLoggingContext, LoggingContext from synapse.util.lrucache import LruCache import synapse.metrics +from util.id_generators import IdGenerator, StreamIdGenerator + from twisted.internet import defer from collections import namedtuple, OrderedDict @@ -29,12 +31,15 @@ import functools import simplejson as json import sys import time +import threading +DEBUG_CACHES = False logger = logging.getLogger(__name__) sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") +perf_logger = logging.getLogger("synapse.storage.TIME") metrics = synapse.metrics.get_metrics_for("synapse.storage") @@ -53,14 +58,78 @@ cache_counter = metrics.register_cache( ) -# TODO(paul): -# * more generic key management -# * consider other eviction strategies - LRU? -def cached(max_entries=1000): +class Cache(object): + + def __init__(self, name, max_entries=1000, keylen=1, lru=False): + if lru: + self.cache = LruCache(max_size=max_entries) + self.max_entries = None + else: + self.cache = OrderedDict() + self.max_entries = max_entries + + self.name = name + self.keylen = keylen + self.sequence = 0 + self.thread = None + caches_by_name[name] = self.cache + + def check_thread(self): + expected_thread = self.thread + if expected_thread is None: + self.thread = threading.current_thread() + else: + if expected_thread is not threading.current_thread(): + raise ValueError( + "Cache objects can only be accessed from the main thread" + ) + + def get(self, *keyargs): + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + if keyargs in self.cache: + cache_counter.inc_hits(self.name) + return self.cache[keyargs] + + cache_counter.inc_misses(self.name) + raise KeyError() + + def update(self, sequence, *args): + self.check_thread() + if self.sequence == sequence: + # Only update the cache if the caches sequence number matches the + # number that the cache had before the SELECT was started (SYN-369) + self.prefill(*args) + + def prefill(self, *args): # because I can't *keyargs, value + keyargs = args[:-1] + value = args[-1] + + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + if self.max_entries is not None: + while len(self.cache) >= self.max_entries: + self.cache.popitem(last=False) + + self.cache[keyargs] = value + + def invalidate(self, *keyargs): + self.check_thread() + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + # Increment the sequence number so that any SELECT statements that + # raced with the INSERT don't update the cache (SYN-369) + self.sequence += 1 + self.cache.pop(keyargs, None) + + +def cached(max_entries=1000, num_args=1, lru=False): """ A method decorator that applies a memoizing cache around the function. - The function is presumed to take one additional argument, which is used as - the key for the cache. Cache hits are served directly from the cache; + The function is presumed to take zero or more arguments, which are used in + a tuple as the key for the cache. Hits are served directly from the cache; misses use the function body to generate the value. The wrapped function has an additional member, a callable called @@ -71,34 +140,42 @@ def cached(max_entries=1000): calling the calculation function. """ def wrap(orig): - cache = OrderedDict() - name = orig.__name__ - - caches_by_name[name] = cache - - def prefill(key, value): - while len(cache) > max_entries: - cache.popitem(last=False) - - cache[key] = value + cache = Cache( + name=orig.__name__, + max_entries=max_entries, + keylen=num_args, + lru=lru, + ) @functools.wraps(orig) @defer.inlineCallbacks - def wrapped(self, key): - if key in cache: - cache_counter.inc_hits(name) - defer.returnValue(cache[key]) - - cache_counter.inc_misses(name) - ret = yield orig(self, key) - prefill(key, ret) - defer.returnValue(ret) - - def invalidate(key): - cache.pop(key, None) - - wrapped.invalidate = invalidate - wrapped.prefill = prefill + def wrapped(self, *keyargs): + try: + cached_result = cache.get(*keyargs) + if DEBUG_CACHES: + actual_result = yield orig(self, *keyargs) + if actual_result != cached_result: + logger.error( + "Stale cache entry %s%r: cached: %r, actual %r", + orig.__name__, keyargs, + cached_result, actual_result, + ) + raise ValueError("Stale cache entry") + defer.returnValue(cached_result) + except KeyError: + # Get the sequence number of the cache before reading from the + # database so that we can tell if the cache is invalidated + # while the SELECT is executing (SYN-369) + sequence = cache.sequence + + ret = yield orig(self, *keyargs) + + cache.update(sequence, *keyargs + (ret,)) + + defer.returnValue(ret) + + wrapped.invalidate = cache.invalidate + wrapped.prefill = cache.prefill return wrapped return wrap @@ -108,11 +185,20 @@ class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() method.""" - __slots__ = ["txn", "name"] + __slots__ = ["txn", "name", "database_engine", "after_callbacks"] - def __init__(self, txn, name): + def __init__(self, txn, name, database_engine, after_callbacks): object.__setattr__(self, "txn", txn) object.__setattr__(self, "name", name) + object.__setattr__(self, "database_engine", database_engine) + object.__setattr__(self, "after_callbacks", after_callbacks) + + def call_after(self, callback, *args): + """Call the given callback on the main twisted thread after the + transaction has finished. Used to invalidate the caches on the + correct thread. + """ + self.after_callbacks.append((callback, args)) def __getattr__(self, name): return getattr(self.txn, name) @@ -120,30 +206,37 @@ class LoggingTransaction(object): def __setattr__(self, name, value): setattr(self.txn, name, value) - def execute(self, sql, *args, **kwargs): + def execute(self, sql, *args): + self._do_execute(self.txn.execute, sql, *args) + + def executemany(self, sql, *args): + self._do_execute(self.txn.executemany, sql, *args) + + def _do_execute(self, func, sql, *args): # TODO(paul): Maybe use 'info' and 'debug' for values? sql_logger.debug("[SQL] {%s} %s", self.name, sql) - try: - if args and args[0]: - values = args[0] + sql = self.database_engine.convert_param_style(sql) + + if args: + try: sql_logger.debug( - "[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)), - self.name, - *values + "[SQL values] {%s} %r", + self.name, args[0] ) - except: - # Don't let logging failures stop SQL from working - pass + except: + # Don't let logging failures stop SQL from working + pass start = time.time() * 1000 + try: - return self.txn.execute( - sql, *args, **kwargs + return func( + sql, *args ) - except: - logger.exception("[SQL FAIL] {%s}", self.name) - raise + except Exception as e: + logger.debug("[SQL FAIL] {%s} %s", self.name, e) + raise finally: msecs = (time.time() * 1000) - start sql_logger.debug("[SQL time] {%s} %f", self.name, msecs) @@ -205,10 +298,16 @@ class SQLBaseStore(object): self._txn_perf_counters = PerformanceCounters() self._get_event_counters = PerformanceCounters() - self._get_event_cache = LruCache(hs.config.event_cache_size) + self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, + max_entries=hs.config.event_cache_size) + + self.database_engine = hs.database_engine - # Pretend the getEventCache is just another named cache - caches_by_name["*getEvent*"] = self._get_event_cache + self._stream_id_gen = StreamIdGenerator() + self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) + self._state_groups_id_gen = IdGenerator("state_groups", "id", self) + self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) + self._pushers_id_gen = IdGenerator("pushers", "id", self) def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -232,7 +331,7 @@ class SQLBaseStore(object): time_now - time_then, limit=3 ) - logger.info( + perf_logger.info( "Total database time: %.3f%% {%s} {%s}", ratio * 100, top_three_counters, top_3_event_counters ) @@ -246,8 +345,14 @@ class SQLBaseStore(object): start_time = time.time() * 1000 - def inner_func(txn, *args, **kwargs): + after_callbacks = [] + + def inner_func(conn, *args, **kwargs): with LoggingContext("runInteraction") as context: + if self.database_engine.is_connection_closed(conn): + logger.debug("Reconnecting closed database connection") + conn.reconnect() + current_context.copy_to(context) start = time.time() * 1000 txn_id = self._TXN_ID @@ -261,9 +366,48 @@ class SQLBaseStore(object): sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) transaction_logger.debug("[TXN START] {%s}", name) try: - return func(LoggingTransaction(txn, name), *args, **kwargs) - except: - logger.exception("[TXN FAIL] {%s}", name) + i = 0 + N = 5 + while True: + try: + txn = conn.cursor() + txn = LoggingTransaction( + txn, name, self.database_engine, after_callbacks + ) + return func(txn, *args, **kwargs) + except self.database_engine.module.OperationalError as e: + # This can happen if the database disappears mid + # transaction. + logger.warn( + "[TXN OPERROR] {%s} %s %d/%d", + name, e, i, N + ) + if i < N: + i += 1 + try: + conn.rollback() + except self.database_engine.module.Error as e1: + logger.warn( + "[TXN EROLL] {%s} %s", + name, e1, + ) + continue + except self.database_engine.module.DatabaseError as e: + if self.database_engine.is_deadlock(e): + logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N) + if i < N: + i += 1 + try: + conn.rollback() + except self.database_engine.module.Error as e1: + logger.warn( + "[TXN EROLL] {%s} %s", + name, e1, + ) + continue + raise + except Exception as e: + logger.debug("[TXN FAIL] {%s} %s", name, e) raise finally: end = time.time() * 1000 @@ -276,9 +420,11 @@ class SQLBaseStore(object): sql_txn_timer.inc_by(duration, desc) with PreserveLoggingContext(): - result = yield self._db_pool.runInteraction( + result = yield self._db_pool.runWithConnection( inner_func, *args, **kwargs ) + for after_callback, after_args in after_callbacks: + after_callback(*after_args) defer.returnValue(result) def cursor_to_dict(self, cursor): @@ -307,11 +453,11 @@ class SQLBaseStore(object): The result of decoder(results) """ def interaction(txn): - cursor = txn.execute(query, args) + txn.execute(query, args) if decoder: - return decoder(cursor) + return decoder(txn) else: - return cursor.fetchall() + return txn.fetchall() return self.runInteraction(desc, interaction) @@ -321,53 +467,94 @@ class SQLBaseStore(object): # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. - def _simple_insert(self, table, values, or_replace=False, or_ignore=False): + @defer.inlineCallbacks + def _simple_insert(self, table, values, or_ignore=False, + desc="_simple_insert"): """Executes an INSERT query on the named table. Args: table : string giving the table name values : dict of new column names and values for them - or_replace : bool; if True performs an INSERT OR REPLACE """ - return self.runInteraction( - "_simple_insert", - self._simple_insert_txn, table, values, or_replace=or_replace, - or_ignore=or_ignore, - ) + try: + yield self.runInteraction( + desc, + self._simple_insert_txn, table, values, + ) + except self.database_engine.module.IntegrityError: + # We have to do or_ignore flag at this layer, since we can't reuse + # a cursor after we receive an error from the db. + if not or_ignore: + raise @log_function - def _simple_insert_txn(self, txn, table, values, or_replace=False, - or_ignore=False): - sql = "%s INTO %s (%s) VALUES(%s)" % ( - ("INSERT OR REPLACE" if or_replace else - "INSERT OR IGNORE" if or_ignore else "INSERT"), + def _simple_insert_txn(self, txn, table, values): + keys, vals = zip(*values.items()) + + sql = "INSERT INTO %s (%s) VALUES(%s)" % ( table, - ", ".join(k for k in values), - ", ".join("?" for k in values) + ", ".join(k for k in keys), + ", ".join("?" for _ in keys) ) - logger.debug( - "[SQL] %s Args=%s", - sql, values.values(), + txn.execute(sql, vals) + + def _simple_insert_many_txn(self, txn, table, values): + if not values: + return + + # This is a *slight* abomination to get a list of tuples of key names + # and a list of tuples of value names. + # + # i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}] + # => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)] + # + # The sort is to ensure that we don't rely on dictionary iteration + # order. + keys, vals = zip(*[ + zip( + *(sorted(i.items(), key=lambda kv: kv[0])) + ) + for i in values + if i + ]) + + for k in keys: + if k != keys[0]: + raise RuntimeError( + "All items must have the same keys" + ) + + sql = "INSERT INTO %s (%s) VALUES(%s)" % ( + table, + ", ".join(k for k in keys[0]), + ", ".join("?" for _ in keys[0]) ) - txn.execute(sql, values.values()) - return txn.lastrowid + txn.executemany(sql, vals) - def _simple_upsert(self, table, keyvalues, values): + def _simple_upsert(self, table, keyvalues, values, + insertion_values={}, desc="_simple_upsert", lock=True): """ Args: table (str): The table to upsert into keyvalues (dict): The unique key tables and their new values values (dict): The nonunique columns and their new values + insertion_values (dict): key/values to use when inserting Returns: A deferred """ return self.runInteraction( - "_simple_upsert", - self._simple_upsert_txn, table, keyvalues, values + desc, + self._simple_upsert_txn, table, keyvalues, values, insertion_values, + lock ) - def _simple_upsert_txn(self, txn, table, keyvalues, values): + def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}, + lock=True): + # We need to lock the table :(, unless we're *really* careful + if lock: + self.database_engine.lock_table(txn, table) + # Try to update sql = "UPDATE %s SET %s WHERE %s" % ( table, @@ -386,6 +573,7 @@ class SQLBaseStore(object): allvalues = {} allvalues.update(keyvalues) allvalues.update(values) + allvalues.update(insertion_values) sql = "INSERT INTO %s (%s) VALUES (%s)" % ( table, @@ -399,7 +587,7 @@ class SQLBaseStore(object): txn.execute(sql, allvalues.values()) def _simple_select_one(self, table, keyvalues, retcols, - allow_none=False): + allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to return a single row, returning a single column from it. @@ -411,12 +599,15 @@ class SQLBaseStore(object): allow_none : If true, return None instead of failing if the SELECT statement returns no rows """ - return self._simple_selectupdate_one( - table, keyvalues, retcols=retcols, allow_none=allow_none + return self.runInteraction( + desc, + self._simple_select_one_txn, + table, keyvalues, retcols, allow_none, ) def _simple_select_one_onecol(self, table, keyvalues, retcol, - allow_none=False): + allow_none=False, + desc="_simple_select_one_onecol"): """Executes a SELECT query on the named table, which is expected to return a single row, returning a single column from it." @@ -426,7 +617,7 @@ class SQLBaseStore(object): retcol : string giving the name of the column to return """ return self.runInteraction( - "_simple_select_one_onecol", + desc, self._simple_select_one_onecol_txn, table, keyvalues, retcol, allow_none=allow_none, ) @@ -450,8 +641,7 @@ class SQLBaseStore(object): def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol): sql = ( - "SELECT %(retcol)s FROM %(table)s WHERE %(where)s " - "ORDER BY rowid asc" + "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" ) % { "retcol": retcol, "table": table, @@ -462,7 +652,8 @@ class SQLBaseStore(object): return [r[0] for r in txn.fetchall()] - def _simple_select_onecol(self, table, keyvalues, retcol): + def _simple_select_onecol(self, table, keyvalues, retcol, + desc="_simple_select_onecol"): """Executes a SELECT query on the named table, which returns a list comprising of the values of the named column from the selected rows. @@ -475,12 +666,13 @@ class SQLBaseStore(object): Deferred: Results in a list """ return self.runInteraction( - "_simple_select_onecol", + desc, self._simple_select_onecol_txn, table, keyvalues, retcol ) - def _simple_select_list(self, table, keyvalues, retcols): + def _simple_select_list(self, table, keyvalues, retcols, + desc="_simple_select_list"): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -491,7 +683,7 @@ class SQLBaseStore(object): retcols : list of strings giving the names of the columns to return """ return self.runInteraction( - "_simple_select_list", + desc, self._simple_select_list_txn, table, keyvalues, retcols ) @@ -507,14 +699,14 @@ class SQLBaseStore(object): retcols : list of strings giving the names of the columns to return """ if keyvalues: - sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) txn.execute(sql, keyvalues.values()) else: - sql = "SELECT %s FROM %s ORDER BY rowid asc" % ( + sql = "SELECT %s FROM %s" % ( ", ".join(retcols), table ) @@ -523,7 +715,7 @@ class SQLBaseStore(object): return self.cursor_to_dict(txn) def _simple_update_one(self, table, keyvalues, updatevalues, - retcols=None): + desc="_simple_update_one"): """Executes an UPDATE query on the named table, setting new values for columns in a row matching the key values. @@ -541,56 +733,81 @@ class SQLBaseStore(object): get-and-set. This can be used to implement compare-and-set by putting the update column in the 'keyvalues' dict as well. """ - return self._simple_selectupdate_one(table, keyvalues, updatevalues, - retcols=retcols) + return self.runInteraction( + desc, + self._simple_update_one_txn, + table, keyvalues, updatevalues, + ) - def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None, - retcols=None, allow_none=False): - """ Combined SELECT then UPDATE.""" - if retcols: - select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( - ", ".join(retcols), - table, - " AND ".join("%s = ?" % (k) for k in keyvalues) - ) + def _simple_update_one_txn(self, txn, table, keyvalues, updatevalues): + update_sql = "UPDATE %s SET %s WHERE %s" % ( + table, + ", ".join("%s = ?" % (k,) for k in updatevalues), + " AND ".join("%s = ?" % (k,) for k in keyvalues) + ) - if updatevalues: - update_sql = "UPDATE %s SET %s WHERE %s" % ( - table, - ", ".join("%s = ?" % (k,) for k in updatevalues), - " AND ".join("%s = ?" % (k,) for k in keyvalues) - ) + txn.execute( + update_sql, + updatevalues.values() + keyvalues.values() + ) + + if txn.rowcount == 0: + raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + + def _simple_select_one_txn(self, txn, table, keyvalues, retcols, + allow_none=False): + select_sql = "SELECT %s FROM %s WHERE %s" % ( + ", ".join(retcols), + table, + " AND ".join("%s = ?" % (k,) for k in keyvalues) + ) + txn.execute(select_sql, keyvalues.values()) + + row = txn.fetchone() + if not row: + if allow_none: + return None + raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + + return dict(zip(retcols, row)) + + def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None, + retcols=None, allow_none=False, + desc="_simple_selectupdate_one"): + """ Combined SELECT then UPDATE.""" def func(txn): ret = None if retcols: - txn.execute(select_sql, keyvalues.values()) - - row = txn.fetchone() - if not row: - if allow_none: - return None - raise StoreError(404, "No row found") - if txn.rowcount > 1: - raise StoreError(500, "More than one row matched") - - ret = dict(zip(retcols, row)) + ret = self._simple_select_one_txn( + txn, + table=table, + keyvalues=keyvalues, + retcols=retcols, + allow_none=allow_none, + ) if updatevalues: - txn.execute( - update_sql, - updatevalues.values() + keyvalues.values() + self._simple_update_one_txn( + txn, + table=table, + keyvalues=keyvalues, + updatevalues=updatevalues, ) - if txn.rowcount == 0: - raise StoreError(404, "No row found") + # if txn.rowcount == 0: + # raise StoreError(404, "No row found") if txn.rowcount > 1: raise StoreError(500, "More than one row matched") return ret - return self.runInteraction("_simple_selectupdate_one", func) + return self.runInteraction(desc, func) - def _simple_delete_one(self, table, keyvalues): + def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"): """Executes a DELETE query on the named table, expecting to delete a single row. @@ -609,9 +826,9 @@ class SQLBaseStore(object): raise StoreError(404, "No row found") if txn.rowcount > 1: raise StoreError(500, "more than one row matched") - return self.runInteraction("_simple_delete_one", func) + return self.runInteraction(desc, func) - def _simple_delete(self, table, keyvalues): + def _simple_delete(self, table, keyvalues, desc="_simple_delete"): """Executes a DELETE query on the named table. Args: @@ -619,7 +836,7 @@ class SQLBaseStore(object): keyvalues : dict of column names and values to select the row with """ - return self.runInteraction("_simple_delete", self._simple_delete_txn) + return self.runInteraction(desc, self._simple_delete_txn) def _simple_delete_txn(self, txn, table, keyvalues): sql = "DELETE FROM %s WHERE %s" % ( @@ -670,6 +887,12 @@ class SQLBaseStore(object): return [e for e in events if e] + def _invalidate_get_event_cache(self, event_id): + for check_redacted in (False, True): + for get_prev_content in (False, True): + self._get_event_cache.invalidate(event_id, check_redacted, + get_prev_content) + def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): @@ -680,16 +903,14 @@ class SQLBaseStore(object): sql_getevents_timer.inc_by(curr_time - last_time, desc) return curr_time - cache = self._get_event_cache.setdefault(event_id, {}) - try: - # Separate cache entries for each way to invoke _get_event_txn - ret = cache[(check_redacted, get_prev_content, allow_rejected)] + ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) - cache_counter.inc_hits("*getEvent*") - return ret + if allow_rejected or not ret.rejected_reason: + return ret + else: + return None except KeyError: - cache_counter.inc_misses("*getEvent*") pass finally: start_time = update_counter("event_cache", start_time) @@ -714,19 +935,22 @@ class SQLBaseStore(object): start_time = update_counter("select_event", start_time) + result = self._get_event_from_row_txn( + txn, internal_metadata, js, redacted, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=rejected_reason, + ) + self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result) + if allow_rejected or not rejected_reason: - result = self._get_event_from_row_txn( - txn, internal_metadata, js, redacted, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - ) - cache[(check_redacted, get_prev_content, allow_rejected)] = result return result else: return None def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False): + check_redacted=True, get_prev_content=False, + rejected_reason=None): start_time = time.time() * 1000 @@ -741,7 +965,11 @@ class SQLBaseStore(object): internal_metadata = json.loads(internal_metadata) start_time = update_counter("decode_internal", start_time) - ev = FrozenEvent(d, internal_metadata_dict=internal_metadata) + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) start_time = update_counter("build_frozen_event", start_time) if check_redacted and redacted: @@ -788,6 +1016,19 @@ class SQLBaseStore(object): result = txn.fetchone() return result[0] if result else None + def get_next_stream_id(self): + with self._next_stream_id_lock: + i = self._next_stream_id + self._next_stream_id += 1 + return i + + +class _RollbackButIsFineException(Exception): + """ This exception is used to rollback a transaction without implying + something went wrong. + """ + pass + class Table(object): """ A base class used to store information about a particular table. @@ -804,7 +1045,7 @@ class Table(object): _select_where_clause = "SELECT %s FROM %s WHERE %s" _select_clause = "SELECT %s FROM %s" - _insert_clause = "INSERT OR REPLACE INTO %s (%s) VALUES (%s)" + _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)" @classmethod def select_statement(cls, where_clause=None): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 850676ce6c..39b7881c40 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -13,154 +13,35 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import simplejson +import urllib +import yaml from simplejson import JSONDecodeError +import simplejson as json from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.errors import StoreError -from synapse.appservice import ApplicationService +from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.storage.roommember import RoomsForUser +from synapse.types import UserID from ._base import SQLBaseStore logger = logging.getLogger(__name__) -def log_failure(failure): - logger.error("Failed to detect application services: %s", failure.value) - logger.error(failure.getTraceback()) - - class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) + self.hostname = hs.hostname self.services_cache = [] - self.cache_defer = self._populate_cache() - self.cache_defer.addErrback(log_failure) - - @defer.inlineCallbacks - def unregister_app_service(self, token): - """Unregisters this service. - - This removes all AS specific regex and the base URL. The token is the - only thing preserved for future registration attempts. - """ - yield self.cache_defer # make sure the cache is ready - yield self.runInteraction( - "unregister_app_service", - self._unregister_app_service_txn, - token, - ) - # update cache TODO: Should this be in the txn? - for service in self.services_cache: - if service.token == token: - service.url = None - service.namespaces = None - service.hs_token = None - - def _unregister_app_service_txn(self, txn, token): - # kill the url to prevent pushes - txn.execute( - "UPDATE application_services SET url=NULL WHERE token=?", - (token,) - ) - - # cleanup regex - as_id = self._get_as_id_txn(txn, token) - if not as_id: - logger.warning( - "unregister_app_service_txn: Failed to find as_id for token=", - token - ) - return False - - txn.execute( - "DELETE FROM application_services_regex WHERE as_id=?", - (as_id,) + self._populate_appservice_cache( + hs.config.app_service_config_files ) - return True - @defer.inlineCallbacks - def update_app_service(self, service): - """Update an application service, clobbering what was previously there. - - Args: - service(ApplicationService): The updated service. - """ - yield self.cache_defer # make sure the cache is ready - - # NB: There is no "insert" since we provide no public-facing API to - # allocate new ASes. It relies on the server admin inserting the AS - # token into the database manually. - - if not service.token or not service.url: - raise StoreError(400, "Token and url must be specified.") - - if not service.hs_token: - raise StoreError(500, "No HS token") - - yield self.runInteraction( - "update_app_service", - self._update_app_service_txn, - service - ) - - # update cache TODO: Should this be in the txn? - for (index, cache_service) in enumerate(self.services_cache): - if service.token == cache_service.token: - self.services_cache[index] = service - logger.info("Updated: %s", service) - return - # new entry - self.services_cache.append(service) - logger.info("Updated(new): %s", service) - - def _update_app_service_txn(self, txn, service): - as_id = self._get_as_id_txn(txn, service.token) - if not as_id: - logger.warning( - "update_app_service_txn: Failed to find as_id for token=", - service.token - ) - return False - - txn.execute( - "UPDATE application_services SET url=?, hs_token=?, sender=? " - "WHERE id=?", - (service.url, service.hs_token, service.sender, as_id,) - ) - # cleanup regex - txn.execute( - "DELETE FROM application_services_regex WHERE as_id=?", - (as_id,) - ) - for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST): - if ns_str in service.namespaces: - for regex_obj in service.namespaces[ns_str]: - txn.execute( - "INSERT INTO application_services_regex(" - "as_id, namespace, regex) values(?,?,?)", - (as_id, ns_int, simplejson.dumps(regex_obj)) - ) - return True - - def _get_as_id_txn(self, txn, token): - cursor = txn.execute( - "SELECT id FROM application_services WHERE token=?", - (token,) - ) - res = cursor.fetchone() - if res: - return res[0] - - @defer.inlineCallbacks def get_app_services(self): - yield self.cache_defer # make sure the cache is ready - defer.returnValue(self.services_cache) + return defer.succeed(self.services_cache) - @defer.inlineCallbacks def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. @@ -174,37 +55,23 @@ class ApplicationServiceStore(SQLBaseStore): Returns: synapse.appservice.ApplicationService or None. """ - - yield self.cache_defer # make sure the cache is ready - for service in self.services_cache: if service.sender == user_id: - defer.returnValue(service) - return - defer.returnValue(None) + return defer.succeed(service) + return defer.succeed(None) - @defer.inlineCallbacks - def get_app_service_by_token(self, token, from_cache=True): + def get_app_service_by_token(self, token): """Get the application service with the given appservice token. Args: token (str): The application service token. - from_cache (bool): True to get this service from the cache, False to - check the database. - Raises: - StoreError if there was a problem retrieving this service. + Returns: + synapse.appservice.ApplicationService or None. """ - yield self.cache_defer # make sure the cache is ready - - if from_cache: - for service in self.services_cache: - if service.token == token: - defer.returnValue(service) - return - defer.returnValue(None) - - # TODO: The from_cache=False impl - # TODO: This should be JOINed with the application_services_regex table. + for service in self.services_cache: + if service.token == token: + return defer.succeed(service) + return defer.succeed(None) def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -277,12 +144,7 @@ class ApplicationServiceStore(SQLBaseStore): return rooms_for_user_matching_user_id - @defer.inlineCallbacks - def _populate_cache(self): - """Populates the ApplicationServiceCache from the database.""" - sql = ("SELECT * FROM application_services LEFT JOIN " - "application_services_regex ON application_services.id = " - "application_services_regex.as_id") + def _parse_services_dict(self, results): # SQL results in the form: # [ # { @@ -296,12 +158,14 @@ class ApplicationServiceStore(SQLBaseStore): # } # ] services = {} - results = yield self._execute_and_decode("_populate_cache", sql) for res in results: as_token = res["token"] + if as_token is None: + continue if as_token not in services: # add the service services[as_token] = { + "id": res["id"], "url": res["url"], "token": as_token, "hs_token": res["hs_token"], @@ -319,20 +183,289 @@ class ApplicationServiceStore(SQLBaseStore): try: services[as_token]["namespaces"][ ApplicationService.NS_LIST[ns_int]].append( - simplejson.loads(res["regex"]) + json.loads(res["regex"]) ) except IndexError: logger.error("Bad namespace enum '%s'. %s", ns_int, res) except JSONDecodeError: logger.error("Bad regex object '%s'", res["regex"]) - # TODO get last successful txn id f.e. service + service_list = [] for service in services.values(): - logger.info("Found application service: %s", service) - self.services_cache.append(ApplicationService( + service_list.append(ApplicationService( token=service["token"], url=service["url"], namespaces=service["namespaces"], hs_token=service["hs_token"], - sender=service["sender"] + sender=service["sender"], + id=service["id"] )) + return service_list + + def _load_appservice(self, as_info): + required_string_fields = [ + "url", "as_token", "hs_token", "sender_localpart" + ] + for field in required_string_fields: + if not isinstance(as_info.get(field), basestring): + raise KeyError("Required string field: '%s'", field) + + localpart = as_info["sender_localpart"] + if urllib.quote(localpart) != localpart: + raise ValueError( + "sender_localpart needs characters which are not URL encoded." + ) + user = UserID(localpart, self.hostname) + user_id = user.to_string() + + # namespace checks + if not isinstance(as_info.get("namespaces"), dict): + raise KeyError("Requires 'namespaces' object.") + for ns in ApplicationService.NS_LIST: + # specific namespaces are optional + if ns in as_info["namespaces"]: + # expect a list of dicts with exclusive and regex keys + for regex_obj in as_info["namespaces"][ns]: + if not isinstance(regex_obj, dict): + raise ValueError( + "Expected namespace entry in %s to be an object," + " but got %s", ns, regex_obj + ) + if not isinstance(regex_obj.get("regex"), basestring): + raise ValueError( + "Missing/bad type 'regex' key in %s", regex_obj + ) + if not isinstance(regex_obj.get("exclusive"), bool): + raise ValueError( + "Missing/bad type 'exclusive' key in %s", regex_obj + ) + return ApplicationService( + token=as_info["as_token"], + url=as_info["url"], + namespaces=as_info["namespaces"], + hs_token=as_info["hs_token"], + sender=user_id, + id=as_info["as_token"] # the token is the only unique thing here + ) + + def _populate_appservice_cache(self, config_files): + """Populates a cache of Application Services from the config files.""" + if not isinstance(config_files, list): + logger.warning( + "Expected %s to be a list of AS config files.", config_files + ) + return + + for config_file in config_files: + try: + with open(config_file, 'r') as f: + appservice = self._load_appservice(yaml.load(f)) + logger.info("Loaded application service: %s", appservice) + self.services_cache.append(appservice) + except Exception as e: + logger.error("Failed to load appservice from '%s'", config_file) + logger.exception(e) + + +class ApplicationServiceTransactionStore(SQLBaseStore): + + def __init__(self, hs): + super(ApplicationServiceTransactionStore, self).__init__(hs) + + @defer.inlineCallbacks + def get_appservices_by_state(self, state): + """Get a list of application services based on their state. + + Args: + state(ApplicationServiceState): The state to filter on. + Returns: + A Deferred which resolves to a list of ApplicationServices, which + may be empty. + """ + results = yield self._simple_select_list( + "application_services_state", + dict(state=state), + ["as_id"] + ) + # NB: This assumes this class is linked with ApplicationServiceStore + as_list = yield self.get_app_services() + services = [] + + for res in results: + for service in as_list: + if service.id == res["as_id"]: + services.append(service) + defer.returnValue(services) + + @defer.inlineCallbacks + def get_appservice_state(self, service): + """Get the application service state. + + Args: + service(ApplicationService): The service whose state to set. + Returns: + A Deferred which resolves to ApplicationServiceState. + """ + result = yield self._simple_select_one( + "application_services_state", + dict(as_id=service.id), + ["state"], + allow_none=True + ) + if result: + defer.returnValue(result.get("state")) + return + defer.returnValue(None) + + def set_appservice_state(self, service, state): + """Set the application service state. + + Args: + service(ApplicationService): The service whose state to set. + state(ApplicationServiceState): The connectivity state to apply. + Returns: + A Deferred which resolves when the state was set successfully. + """ + return self._simple_upsert( + "application_services_state", + dict(as_id=service.id), + dict(state=state) + ) + + def create_appservice_txn(self, service, events): + """Atomically creates a new transaction for this application service + with the given list of events. + + Args: + service(ApplicationService): The service who the transaction is for. + events(list<Event>): A list of events to put in the transaction. + Returns: + AppServiceTransaction: A new transaction. + """ + return self.runInteraction( + "create_appservice_txn", + self._create_appservice_txn, + service, events + ) + + def _create_appservice_txn(self, txn, service, events): + # work out new txn id (highest txn id for this service += 1) + # The highest id may be the last one sent (in which case it is last_txn) + # or it may be the highest in the txns list (which are waiting to be/are + # being sent) + last_txn_id = self._get_last_txn(txn, service.id) + + txn.execute( + "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + highest_txn_id = txn.fetchone()[0] + if highest_txn_id is None: + highest_txn_id = 0 + + new_txn_id = max(highest_txn_id, last_txn_id) + 1 + + # Insert new txn into txn table + event_ids = json.dumps([e.event_id for e in events]) + txn.execute( + "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " + "VALUES(?,?,?)", + (service.id, new_txn_id, event_ids) + ) + return AppServiceTransaction( + service=service, id=new_txn_id, events=events + ) + + def complete_appservice_txn(self, txn_id, service): + """Completes an application service transaction. + + Args: + txn_id(str): The transaction ID being completed. + service(ApplicationService): The application service which was sent + this transaction. + Returns: + A Deferred which resolves if this transaction was stored + successfully. + """ + return self.runInteraction( + "complete_appservice_txn", + self._complete_appservice_txn, + txn_id, service + ) + + def _complete_appservice_txn(self, txn, txn_id, service): + txn_id = int(txn_id) + + # Debugging query: Make sure the txn being completed is EXACTLY +1 from + # what was there before. If it isn't, we've got problems (e.g. the AS + # has probably missed some events), so whine loudly but still continue, + # since it shouldn't fail completion of the transaction. + last_txn_id = self._get_last_txn(txn, service.id) + if (last_txn_id + 1) != txn_id: + logger.error( + "appservice: Completing a transaction which has an ID > 1 from " + "the last ID sent to this AS. We've either dropped events or " + "sent it to the AS out of order. FIX ME. last_txn=%s " + "completing_txn=%s service_id=%s", last_txn_id, txn_id, + service.id + ) + + # Set current txn_id for AS to 'txn_id' + self._simple_upsert_txn( + txn, "application_services_state", dict(as_id=service.id), + dict(last_txn=txn_id) + ) + + # Delete txn + self._simple_delete_txn( + txn, "application_services_txns", + dict(txn_id=txn_id, as_id=service.id) + ) + + def get_oldest_unsent_txn(self, service): + """Get the oldest transaction which has not been sent for this + service. + + Args: + service(ApplicationService): The app service to get the oldest txn. + Returns: + A Deferred which resolves to an AppServiceTransaction or + None. + """ + return self.runInteraction( + "get_oldest_unsent_appservice_txn", + self._get_oldest_unsent_txn, + service + ) + + def _get_oldest_unsent_txn(self, txn, service): + # Monotonically increasing txn ids, so just select the smallest + # one in the txns table (we delete them when they are sent) + txn.execute( + "SELECT * FROM application_services_txns WHERE as_id=?" + " ORDER BY txn_id ASC LIMIT 1", + (service.id,) + ) + rows = self.cursor_to_dict(txn) + if not rows: + return None + + entry = rows[0] + + event_ids = json.loads(entry["event_ids"]) + events = self._get_events_txn(txn, event_ids) + + return AppServiceTransaction( + service=service, id=entry["txn_id"], events=events + ) + + def _get_last_txn(self, txn, service_id): + txn.execute( + "SELECT last_txn FROM application_services_state WHERE as_id=?", + (service_id,) + ) + last_txn_id = txn.fetchone() + if last_txn_id is None or last_txn_id[0] is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) # select 'last_txn' col diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 68b7d59693..2b2bdf8615 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from synapse.api.errors import SynapseError @@ -21,8 +21,6 @@ from twisted.internet import defer from collections import namedtuple -import sqlite3 - RoomAliasMapping = namedtuple( "RoomAliasMapping", @@ -48,6 +46,7 @@ class DirectoryStore(SQLBaseStore): {"room_alias": room_alias.to_string()}, "room_id", allow_none=True, + desc="get_association_from_room_alias", ) if not room_id: @@ -58,6 +57,7 @@ class DirectoryStore(SQLBaseStore): "room_alias_servers", {"room_alias": room_alias.to_string()}, "server", + desc="get_association_from_room_alias", ) if not servers: @@ -87,8 +87,9 @@ class DirectoryStore(SQLBaseStore): "room_alias": room_alias.to_string(), "room_id": room_id, }, + desc="create_room_alias_association", ) - except sqlite3.IntegrityError: + except self.database_engine.module.IntegrityError: raise SynapseError( 409, "Room alias %s already exists" % room_alias.to_string() ) @@ -100,23 +101,29 @@ class DirectoryStore(SQLBaseStore): { "room_alias": room_alias.to_string(), "server": server, - } + }, + desc="create_room_alias_association", ) + self.get_aliases_for_room.invalidate(room_id) + @defer.inlineCallbacks def delete_room_alias(self, room_alias): - return self.runInteraction( + room_id = yield self.runInteraction( "delete_room_alias", self._delete_room_alias_txn, room_alias, ) + self.get_aliases_for_room.invalidate(room_id) + defer.returnValue(room_id) + def _delete_room_alias_txn(self, txn, room_alias): - cursor = txn.execute( + txn.execute( "SELECT room_id FROM room_aliases WHERE room_alias = ?", (room_alias.to_string(),) ) - res = cursor.fetchone() + res = txn.fetchone() if res: room_id = res[0] else: @@ -134,9 +141,11 @@ class DirectoryStore(SQLBaseStore): return room_id + @cached() def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", {"room_id": room_id}, "room_alias", + desc="get_aliases_for_room", ) diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py new file mode 100644 index 0000000000..bd3c8f9452 --- /dev/null +++ b/synapse/storage/engines/__init__.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import IncorrectDatabaseSetup +from .postgres import PostgresEngine +from .sqlite3 import Sqlite3Engine + +import importlib + + +SUPPORTED_MODULE = { + "sqlite3": Sqlite3Engine, + "psycopg2": PostgresEngine, +} + + +def create_engine(name): + engine_class = SUPPORTED_MODULE.get(name, None) + + if engine_class: + module = importlib.import_module(name) + return engine_class(module) + + raise RuntimeError( + "Unsupported database engine '%s'" % (name,) + ) + + +__all__ = ["create_engine", "IncorrectDatabaseSetup"] diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py new file mode 100644 index 0000000000..0b549d314b --- /dev/null +++ b/synapse/storage/engines/_base.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class IncorrectDatabaseSetup(RuntimeError): + pass diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py new file mode 100644 index 0000000000..a323028546 --- /dev/null +++ b/synapse/storage/engines/postgres.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage import prepare_database + +from ._base import IncorrectDatabaseSetup + + +class PostgresEngine(object): + def __init__(self, database_module): + self.module = database_module + self.module.extensions.register_type(self.module.extensions.UNICODE) + + def check_database(self, txn): + txn.execute("SHOW SERVER_ENCODING") + rows = txn.fetchall() + if rows and rows[0][0] != "UTF8": + raise IncorrectDatabaseSetup( + "Database has incorrect encoding: '%s' instead of 'UTF8'\n" + "See docs/postgres.rst for more information." + % (rows[0][0],) + ) + + def convert_param_style(self, sql): + return sql.replace("?", "%s") + + def on_new_connection(self, db_conn): + db_conn.set_isolation_level( + self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ + ) + + def prepare_database(self, db_conn): + prepare_database(db_conn, self) + + def is_deadlock(self, error): + if isinstance(error, self.module.DatabaseError): + return error.pgcode in ["40001", "40P01"] + return False + + def is_connection_closed(self, conn): + return bool(conn.closed) + + def lock_table(self, txn, table): + txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,)) diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py new file mode 100644 index 0000000000..ff13d8006a --- /dev/null +++ b/synapse/storage/engines/sqlite3.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage import prepare_database, prepare_sqlite3_database + + +class Sqlite3Engine(object): + def __init__(self, database_module): + self.module = database_module + + def check_database(self, txn): + pass + + def convert_param_style(self, sql): + return sql + + def on_new_connection(self, db_conn): + self.prepare_database(db_conn) + + def prepare_database(self, db_conn): + prepare_sqlite3_database(db_conn) + prepare_database(db_conn, self) + + def is_deadlock(self, error): + return False + + def is_connection_closed(self, conn): + return False + + def lock_table(self, txn, table): + return diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 032334bfd6..74b4e23590 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from syutil.base64util import encode_base64 import logging @@ -96,11 +96,23 @@ class EventFederationStore(SQLBaseStore): room_id, ) + @cached() + def get_latest_event_ids_in_room(self, room_id): + return self._simple_select_onecol( + table="event_forward_extremities", + keyvalues={ + "room_id": room_id, + }, + retcol="event_id", + desc="get_latest_event_ids_in_room", + ) + def _get_latest_events_in_room(self, txn, room_id): sql = ( "SELECT e.event_id, e.depth FROM events as e " "INNER JOIN event_forward_extremities as f " "ON e.event_id = f.event_id " + "AND e.room_id = f.room_id " "WHERE f.room_id = ?" ) @@ -153,7 +165,7 @@ class EventFederationStore(SQLBaseStore): results = self._get_prev_events_and_state( txn, event_id, - is_state=1, + is_state=True, ) return [(e_id, h, ) for e_id, h, _ in results] @@ -164,7 +176,7 @@ class EventFederationStore(SQLBaseStore): } if is_state is not None: - keyvalues["is_state"] = is_state + keyvalues["is_state"] = bool(is_state) res = self._simple_select_list_txn( txn, @@ -242,7 +254,6 @@ class EventFederationStore(SQLBaseStore): "room_id": room_id, "min_depth": depth, }, - or_replace=True, ) def _handle_prev_events(self, txn, outlier, event_id, prev_events, @@ -251,19 +262,19 @@ class EventFederationStore(SQLBaseStore): For the given event, update the event edges table and forward and backward extremities tables. """ - for e_id, _ in prev_events: - # TODO (erikj): This could be done as a bulk insert - self._simple_insert_txn( - txn, - table="event_edges", - values={ + self._simple_insert_many_txn( + txn, + table="event_edges", + values=[ + { "event_id": event_id, "prev_event_id": e_id, "room_id": room_id, - "is_state": 0, - }, - or_ignore=True, - ) + "is_state": False, + } + for e_id, _ in prev_events + ], + ) # Update the extremities table if this is not an outlier. if not outlier: @@ -281,33 +292,33 @@ class EventFederationStore(SQLBaseStore): # We only insert as a forward extremity the new event if there are # no other events that reference it as a prev event query = ( - "INSERT OR IGNORE INTO %(table)s (event_id, room_id) " - "SELECT ?, ? WHERE NOT EXISTS (" - "SELECT 1 FROM %(event_edges)s WHERE " - "prev_event_id = ? " - ")" - ) % { - "table": "event_forward_extremities", - "event_edges": "event_edges", - } + "SELECT 1 FROM event_edges WHERE prev_event_id = ?" + ) - logger.debug("query: %s", query) + txn.execute(query, (event_id,)) + + if not txn.fetchone(): + query = ( + "INSERT INTO event_forward_extremities" + " (event_id, room_id)" + " VALUES (?, ?)" + ) - txn.execute(query, (event_id, room_id, event_id)) + txn.execute(query, (event_id, room_id)) # Insert all the prev_events as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. - for e_id, _ in prev_events: - # TODO (erikj): This could be done as a bulk insert - self._simple_insert_txn( - txn, - table="event_backward_extremities", - values={ + self._simple_insert_many_txn( + txn, + table="event_backward_extremities", + values=[ + { "event_id": e_id, "room_id": room_id, - }, - or_ignore=True, - ) + } + for e_id, _ in prev_events + ], + ) # Also delete from the backwards extremities table all ones that # reference events that we have already seen @@ -321,6 +332,10 @@ class EventFederationStore(SQLBaseStore): ) txn.execute(query) + txn.call_after( + self.get_latest_event_ids_in_room.invalidate, room_id + ) + def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and including) the events in event_list. Return a list of max size `limit` @@ -400,7 +415,7 @@ class EventFederationStore(SQLBaseStore): query = ( "SELECT prev_event_id FROM event_edges " - "WHERE room_id = ? AND event_id = ? AND is_state = 0 " + "WHERE room_id = ? AND event_id = ? AND is_state = ? " "LIMIT ?" ) @@ -409,7 +424,7 @@ class EventFederationStore(SQLBaseStore): for event_id in front: txn.execute( query, - (room_id, event_id, limit - len(event_results)) + (room_id, event_id, False, limit - len(event_results)) ) for e_id, in txn.fetchall(): diff --git a/synapse/storage/events.py b/synapse/storage/events.py new file mode 100644 index 0000000000..38395c66ab --- /dev/null +++ b/synapse/storage/events.py @@ -0,0 +1,391 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from _base import SQLBaseStore, _RollbackButIsFineException + +from twisted.internet import defer + +from synapse.util.logutils import log_function +from synapse.api.constants import EventTypes +from synapse.crypto.event_signing import compute_event_reference_hash + +from syutil.base64util import decode_base64 +from syutil.jsonutil import encode_canonical_json + +import logging + +logger = logging.getLogger(__name__) + + +class EventsStore(SQLBaseStore): + @defer.inlineCallbacks + @log_function + def persist_event(self, event, context, backfilled=False, + is_new_state=True, current_state=None): + stream_ordering = None + if backfilled: + if not self.min_token_deferred.called: + yield self.min_token_deferred + self.min_token -= 1 + stream_ordering = self.min_token + + try: + yield self.runInteraction( + "persist_event", + self._persist_event_txn, + event=event, + context=context, + backfilled=backfilled, + stream_ordering=stream_ordering, + is_new_state=is_new_state, + current_state=current_state, + ) + except _RollbackButIsFineException: + pass + + @defer.inlineCallbacks + def get_event(self, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False, + allow_none=False): + """Get an event from the database by event_id. + + Args: + event_id (str): The event_id of the event to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + allow_none (bool): If True, return None if no event found, if + False throw an exception. + + Returns: + Deferred : A FrozenEvent. + """ + event = yield self.runInteraction( + "get_event", self._get_event_txn, + event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + if not event and not allow_none: + raise RuntimeError("Could not find event %s" % (event_id,)) + + defer.returnValue(event) + + @log_function + def _persist_event_txn(self, txn, event, context, backfilled, + stream_ordering=None, is_new_state=True, + current_state=None): + + # Remove the any existing cache entries for the event_id + txn.call_after(self._invalidate_get_event_cache, event.event_id) + + if stream_ordering is None: + with self._stream_id_gen.get_next_txn(txn) as stream_ordering: + return self._persist_event_txn( + txn, event, context, backfilled, + stream_ordering=stream_ordering, + is_new_state=is_new_state, + current_state=current_state, + ) + + # We purposefully do this first since if we include a `current_state` + # key, we *want* to update the `current_state_events` table + if current_state: + self._simple_delete_txn( + txn, + table="current_state_events", + keyvalues={"room_id": event.room_id}, + ) + + for s in current_state: + if s.type == EventTypes.Member: + txn.call_after( + self.get_rooms_for_user.invalidate, s.state_key + ) + txn.call_after( + self.get_joined_hosts_for_room.invalidate, s.room_id + ) + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": s.event_id, + "room_id": s.room_id, + "type": s.type, + "state_key": s.state_key, + } + ) + + outlier = event.internal_metadata.is_outlier() + + if not outlier: + self._store_state_groups_txn(txn, event, context) + + self._update_min_depth_for_room_txn( + txn, + event.room_id, + event.depth + ) + + have_persisted = self._simple_select_one_onecol_txn( + txn, + table="event_json", + keyvalues={"event_id": event.event_id}, + retcol="event_id", + allow_none=True, + ) + + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ).decode("UTF-8") + + # If we have already persisted this event, we don't need to do any + # more processing. + # The processing above must be done on every call to persist event, + # since they might not have happened on previous calls. For example, + # if we are persisting an event that we had persisted as an outlier, + # but is no longer one. + if have_persisted: + if not outlier: + sql = ( + "UPDATE event_json SET internal_metadata = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (metadata_json, event.event_id,) + ) + + sql = ( + "UPDATE events SET outlier = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (False, event.event_id,) + ) + return + + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + + if event.type == EventTypes.Member: + self._store_room_member_txn(txn, event) + elif event.type == EventTypes.Name: + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Redaction: + self._store_redaction(txn, event) + + event_dict = { + k: v + for k, v in event.get_dict().items() + if k not in [ + "redacted", + "redacted_because", + ] + } + + self._simple_insert_txn( + txn, + table="event_json", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "internal_metadata": metadata_json, + "json": encode_canonical_json(event_dict).decode("UTF-8"), + }, + ) + + content = encode_canonical_json( + event.content + ).decode("UTF-8") + + vals = { + "topological_ordering": event.depth, + "event_id": event.event_id, + "type": event.type, + "room_id": event.room_id, + "content": content, + "processed": True, + "outlier": outlier, + "depth": event.depth, + } + + unrec = { + k: v + for k, v in event.get_dict().items() + if k not in vals.keys() and k not in [ + "redacted", + "redacted_because", + "signatures", + "hashes", + "prev_events", + ] + } + + vals["unrecognized_keys"] = encode_canonical_json( + unrec + ).decode("UTF-8") + + sql = ( + "INSERT INTO events" + " (stream_ordering, topological_ordering, event_id, type," + " room_id, content, processed, outlier, depth)" + " VALUES (?,?,?,?,?,?,?,?,?)" + ) + + txn.execute( + sql, + ( + stream_ordering, event.depth, event.event_id, event.type, + event.room_id, content, True, outlier, event.depth + ) + ) + + if context.rejected: + self._store_rejections_txn( + txn, event.event_id, context.rejected + ) + + for hash_alg, hash_base64 in event.hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_event_content_hash_txn( + txn, event.event_id, hash_alg, hash_bytes, + ) + + for prev_event_id, prev_hashes in event.prev_events: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_event_hash_txn( + txn, event.event_id, prev_event_id, alg, + hash_bytes + ) + + self._simple_insert_many_txn( + txn, + table="event_auth", + values=[ + { + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, + } + for auth_id, _ in event.auth_events + ], + ) + + (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) + self._store_event_reference_hash_txn( + txn, event.event_id, ref_alg, ref_hash_bytes + ) + + if event.is_state(): + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + + # TODO: How does this work with backfilling? + if hasattr(event, "replaces_state"): + vals["prev_state"] = event.replaces_state + + self._simple_insert_txn( + txn, + "state_events", + vals, + ) + + self._simple_insert_many_txn( + txn, + table="event_edges", + values=[ + { + "event_id": event.event_id, + "prev_event_id": e_id, + "room_id": event.room_id, + "is_state": True, + } + for e_id, h in event.prev_state + ], + ) + + if is_new_state and not context.rejected: + self._simple_upsert_txn( + txn, + "current_state_events", + keyvalues={ + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + values={ + "event_id": event.event_id, + } + ) + + return + + def _store_redaction(self, txn, event): + # invalidate the cache for the redacted event + txn.call_after(self._invalidate_get_event_cache, event.redacts) + txn.execute( + "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", + (event.event_id, event.redacts) + ) + + def have_events(self, event_ids): + """Given a list of event ids, check if we have already processed them. + + Returns: + dict: Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps to + None. + """ + if not event_ids: + return defer.succeed({}) + + def f(txn): + sql = ( + "SELECT e.event_id, reason FROM events as e " + "LEFT JOIN rejections as r ON e.event_id = r.event_id " + "WHERE e.event_id = ?" + ) + + res = {} + for event_id in event_ids: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + _, rejected = row + res[event_id] = rejected + + return res + + return self.runInteraction( + "have_events", f, + ) diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py deleted file mode 100644 index 8eab769b71..0000000000 --- a/synapse/storage/feedback.py +++ /dev/null @@ -1,47 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from twisted.internet import defer - -from ._base import SQLBaseStore - - -class FeedbackStore(SQLBaseStore): - - def _store_feedback_txn(self, txn, event): - self._simple_insert_txn(txn, "feedback", { - "event_id": event.event_id, - "feedback_type": event.content["type"], - "room_id": event.room_id, - "target_event_id": event.content["target_event_id"], - "sender": event.user_id, - }) - - @defer.inlineCallbacks - def get_feedback_for_event(self, event_id): - sql = ( - "SELECT events.* FROM events INNER JOIN feedback " - "ON events.event_id = feedback.event_id " - "WHERE feedback.target_event_id = ? " - ) - - rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id) - - defer.returnValue( - [ - (yield self._parse_events(r)) - for r in rows - ] - ) diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 457a11fd02..8800116570 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -31,6 +31,7 @@ class FilteringStore(SQLBaseStore): }, retcol="filter_json", allow_none=False, + desc="get_user_filter", ) defer.returnValue(json.loads(def_json)) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 09d1e63657..5bdf497b93 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -57,16 +57,18 @@ class KeyStore(SQLBaseStore): OpenSSL.crypto.FILETYPE_ASN1, tls_certificate ) fingerprint = hashlib.sha256(tls_certificate_bytes).hexdigest() - return self._simple_insert( + return self._simple_upsert( table="server_tls_certificates", - values={ + keyvalues={ "server_name": server_name, "fingerprint": fingerprint, + }, + values={ "from_server": from_server, "ts_added_ms": time_now_ms, "tls_certificate": buffer(tls_certificate_bytes), }, - or_ignore=True, + desc="store_server_certificate", ) @defer.inlineCallbacks @@ -107,14 +109,85 @@ class KeyStore(SQLBaseStore): ts_now_ms (int): The time now in milliseconds verification_key (VerifyKey): The NACL verify key. """ - return self._simple_insert( + return self._simple_upsert( table="server_signature_keys", - values={ + keyvalues={ "server_name": server_name, "key_id": "%s:%s" % (verify_key.alg, verify_key.version), + }, + values={ "from_server": from_server, "ts_added_ms": time_now_ms, "verify_key": buffer(verify_key.encode()), }, - or_ignore=True, + desc="store_server_verify_key", + ) + + def store_server_keys_json(self, server_name, key_id, from_server, + ts_now_ms, ts_expires_ms, key_json_bytes): + """Stores the JSON bytes for a set of keys from a server + The JSON should be signed by the originating server, the intermediate + server, and by this server. Updates the value for the + (server_name, key_id, from_server) triplet if one already existed. + Args: + server_name (str): The name of the server. + key_id (str): The identifer of the key this JSON is for. + from_server (str): The server this JSON was fetched from. + ts_now_ms (int): The time now in milliseconds. + ts_valid_until_ms (int): The time when this json stops being valid. + key_json (bytes): The encoded JSON. + """ + return self._simple_upsert( + table="server_keys_json", + keyvalues={ + "server_name": server_name, + "key_id": key_id, + "from_server": from_server, + }, + values={ + "server_name": server_name, + "key_id": key_id, + "from_server": from_server, + "ts_added_ms": ts_now_ms, + "ts_valid_until_ms": ts_expires_ms, + "key_json": buffer(key_json_bytes), + }, + ) + + def get_server_keys_json(self, server_keys): + """Retrive the key json for a list of server_keys and key ids. + If no keys are found for a given server, key_id and source then + that server, key_id, and source triplet entry will be an empty list. + The JSON is returned as a byte array so that it can be efficiently + used in an HTTP response. + Args: + server_keys (list): List of (server_name, key_id, source) triplets. + Returns: + Dict mapping (server_name, key_id, source) triplets to dicts with + "ts_valid_until_ms" and "key_json" keys. + """ + def _get_server_keys_json_txn(txn): + results = {} + for server_name, key_id, from_server in server_keys: + keyvalues = {"server_name": server_name} + if key_id is not None: + keyvalues["key_id"] = key_id + if from_server is not None: + keyvalues["from_server"] = from_server + rows = self._simple_select_list_txn( + txn, + "server_keys_json", + keyvalues=keyvalues, + retcols=( + "key_id", + "from_server", + "ts_added_ms", + "ts_valid_until_ms", + "key_json", + ), + ) + results[(server_name, key_id, from_server)] = rows + return results + return self.runInteraction( + "get_server_keys_json", _get_server_keys_json_txn ) diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 7101d2beec..7bf57234f6 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -32,6 +32,7 @@ class MediaRepositoryStore(SQLBaseStore): {"media_id": media_id}, ("media_type", "media_length", "upload_name", "created_ts"), allow_none=True, + desc="get_local_media", ) def store_local_media(self, media_id, media_type, time_now_ms, upload_name, @@ -45,7 +46,8 @@ class MediaRepositoryStore(SQLBaseStore): "upload_name": upload_name, "media_length": media_length, "user_id": user_id.to_string(), - } + }, + desc="store_local_media", ) def get_local_media_thumbnails(self, media_id): @@ -55,7 +57,8 @@ class MediaRepositoryStore(SQLBaseStore): ( "thumbnail_width", "thumbnail_height", "thumbnail_method", "thumbnail_type", "thumbnail_length", - ) + ), + desc="get_local_media_thumbnails", ) def store_local_thumbnail(self, media_id, thumbnail_width, @@ -70,7 +73,8 @@ class MediaRepositoryStore(SQLBaseStore): "thumbnail_method": thumbnail_method, "thumbnail_type": thumbnail_type, "thumbnail_length": thumbnail_length, - } + }, + desc="store_local_thumbnail", ) def get_cached_remote_media(self, origin, media_id): @@ -82,6 +86,7 @@ class MediaRepositoryStore(SQLBaseStore): "filesystem_id", ), allow_none=True, + desc="get_cached_remote_media", ) def store_cached_remote_media(self, origin, media_id, media_type, @@ -97,7 +102,8 @@ class MediaRepositoryStore(SQLBaseStore): "created_ts": time_now_ms, "upload_name": upload_name, "filesystem_id": filesystem_id, - } + }, + desc="store_cached_remote_media", ) def get_remote_media_thumbnails(self, origin, media_id): @@ -107,7 +113,8 @@ class MediaRepositoryStore(SQLBaseStore): ( "thumbnail_width", "thumbnail_height", "thumbnail_method", "thumbnail_type", "thumbnail_length", "filesystem_id", - ) + ), + desc="get_remote_media_thumbnails", ) def store_remote_media_thumbnail(self, origin, media_id, filesystem_id, @@ -125,5 +132,6 @@ class MediaRepositoryStore(SQLBaseStore): "thumbnail_type": thumbnail_type, "thumbnail_length": thumbnail_length, "filesystem_id": filesystem_id, - } + }, + desc="store_remote_media_thumbnail", ) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 1dcd34723b..22ec94bc16 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -21,6 +21,7 @@ class PresenceStore(SQLBaseStore): return self._simple_insert( table="presence", values={"user_id": user_localpart}, + desc="create_presence", ) def has_presence_state(self, user_localpart): @@ -29,6 +30,7 @@ class PresenceStore(SQLBaseStore): keyvalues={"user_id": user_localpart}, retcols=["user_id"], allow_none=True, + desc="has_presence_state", ) def get_presence_state(self, user_localpart): @@ -36,6 +38,7 @@ class PresenceStore(SQLBaseStore): table="presence", keyvalues={"user_id": user_localpart}, retcols=["state", "status_msg", "mtime"], + desc="get_presence_state", ) def set_presence_state(self, user_localpart, new_state): @@ -45,7 +48,7 @@ class PresenceStore(SQLBaseStore): updatevalues={"state": new_state["state"], "status_msg": new_state["status_msg"], "mtime": self._clock.time_msec()}, - retcols=["state"], + desc="set_presence_state", ) def allow_presence_visible(self, observed_localpart, observer_userid): @@ -53,6 +56,8 @@ class PresenceStore(SQLBaseStore): table="presence_allow_inbound", values={"observed_user_id": observed_localpart, "observer_user_id": observer_userid}, + desc="allow_presence_visible", + or_ignore=True, ) def disallow_presence_visible(self, observed_localpart, observer_userid): @@ -60,6 +65,7 @@ class PresenceStore(SQLBaseStore): table="presence_allow_inbound", keyvalues={"observed_user_id": observed_localpart, "observer_user_id": observer_userid}, + desc="disallow_presence_visible", ) def is_presence_visible(self, observed_localpart, observer_userid): @@ -69,6 +75,7 @@ class PresenceStore(SQLBaseStore): "observer_user_id": observer_userid}, retcols=["observed_user_id"], allow_none=True, + desc="is_presence_visible", ) def add_presence_list_pending(self, observer_localpart, observed_userid): @@ -77,6 +84,7 @@ class PresenceStore(SQLBaseStore): values={"user_id": observer_localpart, "observed_user_id": observed_userid, "accepted": False}, + desc="add_presence_list_pending", ) def set_presence_list_accepted(self, observer_localpart, observed_userid): @@ -85,6 +93,7 @@ class PresenceStore(SQLBaseStore): keyvalues={"user_id": observer_localpart, "observed_user_id": observed_userid}, updatevalues={"accepted": True}, + desc="set_presence_list_accepted", ) def get_presence_list(self, observer_localpart, accepted=None): @@ -96,6 +105,7 @@ class PresenceStore(SQLBaseStore): table="presence_list", keyvalues=keyvalues, retcols=["observed_user_id", "accepted"], + desc="get_presence_list", ) def del_presence_list(self, observer_localpart, observed_userid): @@ -103,4 +113,5 @@ class PresenceStore(SQLBaseStore): table="presence_list", keyvalues={"user_id": observer_localpart, "observed_user_id": observed_userid}, + desc="del_presence_list", ) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 153c7ad027..a6e52cb248 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -21,6 +21,7 @@ class ProfileStore(SQLBaseStore): return self._simple_insert( table="profiles", values={"user_id": user_localpart}, + desc="create_profile", ) def get_profile_displayname(self, user_localpart): @@ -28,6 +29,7 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, retcol="displayname", + desc="get_profile_displayname", ) def set_profile_displayname(self, user_localpart, new_displayname): @@ -35,6 +37,7 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, updatevalues={"displayname": new_displayname}, + desc="set_profile_displayname", ) def get_profile_avatar_url(self, user_localpart): @@ -42,6 +45,7 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, retcol="avatar_url", + desc="get_profile_avatar_url", ) def set_profile_avatar_url(self, user_localpart, new_avatar_url): @@ -49,4 +53,5 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, updatevalues={"avatar_url": new_avatar_url}, + desc="set_profile_avatar_url", ) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index d769db2c78..ee7718d5ed 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -50,7 +50,8 @@ class PushRuleStore(SQLBaseStore): results = yield self._simple_select_list( PushRuleEnableTable.table_name, {'user_name': user_name}, - PushRuleEnableTable.fields + PushRuleEnableTable.fields, + desc="get_push_rules_enabled_for_user", ) defer.returnValue( {r['rule_id']: False if r['enabled'] == 0 else True for r in results} @@ -153,7 +154,7 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_name, priority_class, new_rule_priority)) # now insert the new rule - sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" (" + sql = "INSERT INTO "+PushRuleTable.table_name+" (" sql += ",".join(new_rule.keys())+") VALUES (" sql += ", ".join(["?" for _ in new_rule.keys()])+")" @@ -182,7 +183,7 @@ class PushRuleStore(SQLBaseStore): new_rule['priority_class'] = priority_class new_rule['priority'] = new_prio - sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" (" + sql = "INSERT INTO "+PushRuleTable.table_name+" (" sql += ",".join(new_rule.keys())+") VALUES (" sql += ", ".join(["?" for _ in new_rule.keys()])+")" @@ -201,7 +202,8 @@ class PushRuleStore(SQLBaseStore): """ yield self._simple_delete_one( PushRuleTable.table_name, - {'user_name': user_name, 'rule_id': rule_id} + {'user_name': user_name, 'rule_id': rule_id}, + desc="delete_push_rule", ) @defer.inlineCallbacks @@ -209,7 +211,8 @@ class PushRuleStore(SQLBaseStore): yield self._simple_upsert( PushRuleEnableTable.table_name, {'user_name': user_name, 'rule_id': rule_id}, - {'enabled': enabled} + {'enabled': enabled}, + desc="set_push_rule_enabled", ) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 587dada68f..08ea62681b 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -13,162 +13,141 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections - from ._base import SQLBaseStore, Table from twisted.internet import defer from synapse.api.errors import StoreError +from syutil.jsonutil import encode_canonical_json + import logging +import simplejson as json +import types logger = logging.getLogger(__name__) class PusherStore(SQLBaseStore): + def _decode_pushers_rows(self, rows): + for r in rows: + dataJson = r['data'] + r['data'] = None + try: + if isinstance(dataJson, types.BufferType): + dataJson = str(dataJson).decode("UTF8") + + r['data'] = json.loads(dataJson) + except Exception as e: + logger.warn( + "Invalid JSON in data for pusher %d: %s, %s", + r['id'], dataJson, e.message, + ) + pass + + if isinstance(r['pushkey'], types.BufferType): + r['pushkey'] = str(r['pushkey']).decode("UTF8") + + return rows + @defer.inlineCallbacks - def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey): - sql = ( - "SELECT id, user_name, kind, profile_tag, app_id," - "app_display_name, device_display_name, pushkey, ts, data, " - "last_token, last_success, failing_since " - "FROM pushers " - "WHERE app_id = ? AND pushkey = ?" - ) + def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey): + def r(txn): + sql = ( + "SELECT * FROM pushers" + " WHERE app_id = ? AND pushkey = ?" + ) - rows = yield self._execute( - "get_pushers_by_app_id_and_pushkey", None, sql, - app_id_and_pushkey[0], app_id_and_pushkey[1] + txn.execute(sql, (app_id, pushkey,)) + rows = self.cursor_to_dict(txn) + + return self._decode_pushers_rows(rows) + + rows = yield self.runInteraction( + "get_pushers_by_app_id_and_pushkey", r ) - ret = [ - { - "id": r[0], - "user_name": r[1], - "kind": r[2], - "profile_tag": r[3], - "app_id": r[4], - "app_display_name": r[5], - "device_display_name": r[6], - "pushkey": r[7], - "pushkey_ts": r[8], - "data": r[9], - "last_token": r[10], - "last_success": r[11], - "failing_since": r[12] - } - for r in rows - ] - - defer.returnValue(ret[0]) + defer.returnValue(rows) @defer.inlineCallbacks def get_all_pushers(self): - sql = ( - "SELECT id, user_name, kind, profile_tag, app_id," - "app_display_name, device_display_name, pushkey, ts, data, " - "last_token, last_success, failing_since " - "FROM pushers" - ) + def get_pushers(txn): + txn.execute("SELECT * FROM pushers") + rows = self.cursor_to_dict(txn) - rows = yield self._execute("get_all_pushers", None, sql) - - ret = [ - { - "id": r[0], - "user_name": r[1], - "kind": r[2], - "profile_tag": r[3], - "app_id": r[4], - "app_display_name": r[5], - "device_display_name": r[6], - "pushkey": r[7], - "pushkey_ts": r[8], - "data": r[9], - "last_token": r[10], - "last_success": r[11], - "failing_since": r[12] - } - for r in rows - ] - - defer.returnValue(ret) + return self._decode_pushers_rows(rows) + + rows = yield self.runInteraction("get_all_pushers", get_pushers) + defer.returnValue(rows) @defer.inlineCallbacks - def add_pusher(self, user_name, profile_tag, kind, app_id, + def add_pusher(self, user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, pushkey_ts, lang, data): try: + next_id = yield self._pushers_id_gen.get_next() yield self._simple_upsert( PushersTable.table_name, dict( app_id=app_id, pushkey=pushkey, + user_name=user_name, ), dict( - user_name=user_name, + access_token=access_token, kind=kind, profile_tag=profile_tag, app_display_name=app_display_name, device_display_name=device_display_name, ts=pushkey_ts, lang=lang, - data=data - )) + data=encode_canonical_json(data), + ), + insertion_values=dict( + id=next_id, + ), + desc="add_pusher", + ) except Exception as e: logger.error("create_pusher with failed: %s", e) raise StoreError(500, "Problem creating pusher.") @defer.inlineCallbacks - def delete_pusher_by_app_id_pushkey(self, app_id, pushkey): + def delete_pusher_by_app_id_pushkey_user_name(self, app_id, pushkey, user_name): yield self._simple_delete_one( PushersTable.table_name, - dict(app_id=app_id, pushkey=pushkey) + {"app_id": app_id, "pushkey": pushkey, 'user_name': user_name}, + desc="delete_pusher_by_app_id_pushkey_user_name", ) @defer.inlineCallbacks - def update_pusher_last_token(self, app_id, pushkey, last_token): + def update_pusher_last_token(self, app_id, pushkey, user_name, last_token): yield self._simple_update_one( PushersTable.table_name, - {'app_id': app_id, 'pushkey': pushkey}, - {'last_token': last_token} + {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name}, + {'last_token': last_token}, + desc="update_pusher_last_token", ) @defer.inlineCallbacks - def update_pusher_last_token_and_success(self, app_id, pushkey, + def update_pusher_last_token_and_success(self, app_id, pushkey, user_name, last_token, last_success): yield self._simple_update_one( PushersTable.table_name, - {'app_id': app_id, 'pushkey': pushkey}, - {'last_token': last_token, 'last_success': last_success} + {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name}, + {'last_token': last_token, 'last_success': last_success}, + desc="update_pusher_last_token_and_success", ) @defer.inlineCallbacks - def update_pusher_failing_since(self, app_id, pushkey, failing_since): + def update_pusher_failing_since(self, app_id, pushkey, user_name, + failing_since): yield self._simple_update_one( PushersTable.table_name, - {'app_id': app_id, 'pushkey': pushkey}, - {'failing_since': failing_since} + {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name}, + {'failing_since': failing_since}, + desc="update_pusher_failing_since", ) class PushersTable(Table): table_name = "pushers" - - fields = [ - "id", - "user_name", - "kind", - "profile_tag", - "app_id", - "app_display_name", - "device_display_name", - "pushkey", - "pushkey_ts", - "data", - "last_token", - "last_success", - "failing_since" - ] - - EntryType = collections.namedtuple("PusherEntry", fields) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 3c2f1d6a15..90e2606be2 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -15,8 +15,6 @@ from twisted.internet import defer -from sqlite3 import IntegrityError - from synapse.api.errors import StoreError, Codes from ._base import SQLBaseStore, cached @@ -39,16 +37,16 @@ class RegistrationStore(SQLBaseStore): Raises: StoreError if there was a problem adding this. """ - row = yield self._simple_select_one("users", {"name": user_id}, ["id"]) - if not row: - raise StoreError(400, "Bad user ID supplied.") - row_id = row["id"] + next_id = yield self._access_tokens_id_gen.get_next() + yield self._simple_insert( "access_tokens", { - "user_id": row_id, + "id": next_id, + "user_id": user_id, "token": token - } + }, + desc="add_access_token_to_user", ) @defer.inlineCallbacks @@ -70,32 +68,72 @@ class RegistrationStore(SQLBaseStore): def _register(self, txn, user_id, token, password_hash): now = int(self.clock.time()) + next_id = self._access_tokens_id_gen.get_next_txn(txn) + try: txn.execute("INSERT INTO users(name, password_hash, creation_ts) " "VALUES (?,?,?)", [user_id, password_hash, now]) - except IntegrityError: + except self.database_engine.module.IntegrityError: raise StoreError( 400, "User ID already taken.", errcode=Codes.USER_IN_USE ) # it's possible for this to get a conflict, but only for a single user # since tokens are namespaced based on their user ID - txn.execute("INSERT INTO access_tokens(user_id, token) " + - "VALUES (?,?)", [txn.lastrowid, token]) + txn.execute( + "INSERT INTO access_tokens(id, user_id, token)" + " VALUES (?,?,?)", + (next_id, user_id, token,) + ) def get_user_by_id(self, user_id): - query = ("SELECT users.name, users.password_hash FROM users" - " WHERE users.name = ?") - return self._execute( - "get_user_by_id", self.cursor_to_dict, query, user_id + return self._simple_select_one( + table="users", + keyvalues={ + "name": user_id, + }, + retcols=["name", "password_hash"], + allow_none=True, + ) + + @defer.inlineCallbacks + def user_set_password_hash(self, user_id, password_hash): + """ + NB. This does *not* evict any cache because the one use for this + removes most of the entries subsequently anyway so it would be + pointless. Use flush_user separately. + """ + yield self._simple_update_one('users', { + 'name': user_id + }, { + 'password_hash': password_hash + }) + + @defer.inlineCallbacks + def user_delete_access_tokens_apart_from(self, user_id, token_id): + yield self.runInteraction( + "user_delete_access_tokens_apart_from", + self._user_delete_access_tokens_apart_from, user_id, token_id ) + def _user_delete_access_tokens_apart_from(self, txn, user_id, token_id): + txn.execute( + "DELETE FROM access_tokens WHERE user_id = ? AND id != ?", + (user_id, token_id) + ) + + @defer.inlineCallbacks + def flush_user(self, user_id): + rows = yield self._execute( + 'flush_user', None, + "SELECT token FROM access_tokens WHERE user_id = ?", + user_id + ) + for r in rows: + self.get_user_by_token.invalidate(r) + @cached() - # TODO(paul): Currently there's no code to invalidate this cache. That - # means if/when we ever add internal ways to invalidate access tokens or - # change whether a user is a server admin, those will need to invoke - # store.get_user_by_token.invalidate(token) def get_user_by_token(self, token): """Get a user from the given access token. @@ -120,6 +158,7 @@ class RegistrationStore(SQLBaseStore): keyvalues={"name": user.to_string()}, retcol="admin", allow_none=True, + desc="is_server_admin", ) defer.returnValue(res if res else False) @@ -129,13 +168,49 @@ class RegistrationStore(SQLBaseStore): "SELECT users.name, users.admin," " access_tokens.device_id, access_tokens.id as token_id" " FROM users" - " INNER JOIN access_tokens on users.id = access_tokens.user_id" + " INNER JOIN access_tokens on users.name = access_tokens.user_id" " WHERE token = ?" ) - cursor = txn.execute(sql, (token,)) - rows = self.cursor_to_dict(cursor) + txn.execute(sql, (token,)) + rows = self.cursor_to_dict(txn) if rows: return rows[0] - raise StoreError(404, "Token not found.") + return None + + @defer.inlineCallbacks + def user_add_threepid(self, user_id, medium, address, validated_at, added_at): + yield self._simple_upsert("user_threepids", { + "user_id": user_id, + "medium": medium, + "address": address, + }, { + "validated_at": validated_at, + "added_at": added_at, + }) + + @defer.inlineCallbacks + def user_get_threepids(self, user_id): + ret = yield self._simple_select_list( + "user_threepids", { + "user_id": user_id + }, + ['medium', 'address', 'validated_at', 'added_at'], + 'user_get_threepids' + ) + defer.returnValue(ret) + + @defer.inlineCallbacks + def get_user_id_by_threepid(self, medium, address): + ret = yield self._simple_select_one( + "user_threepids", + { + "medium": medium, + "address": address + }, + ['user_id'], True, 'get_user_id_by_threepid' + ) + if ret: + defer.returnValue(ret['user_id']) + defer.returnValue(None) diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py index 4e1a9a2783..0838eb3d12 100644 --- a/synapse/storage/rejections.py +++ b/synapse/storage/rejections.py @@ -29,7 +29,7 @@ class RejectionsStore(SQLBaseStore): "event_id": event_id, "reason": reason, "last_check": self._clock.time_msec(), - } + }, ) def get_rejection_reason(self, event_id): @@ -40,4 +40,5 @@ class RejectionsStore(SQLBaseStore): "event_id": event_id, }, allow_none=True, + desc="get_rejection_reason", ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 549c9af393..f956377632 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -15,11 +15,9 @@ from twisted.internet import defer -from sqlite3 import IntegrityError - from synapse.api.errors import StoreError -from ._base import SQLBaseStore, Table +from ._base import SQLBaseStore import collections import logging @@ -27,8 +25,9 @@ import logging logger = logging.getLogger(__name__) -OpsLevel = collections.namedtuple("OpsLevel", ( - "ban_level", "kick_level", "redact_level") +OpsLevel = collections.namedtuple( + "OpsLevel", + ("ban_level", "kick_level", "redact_level",) ) @@ -47,13 +46,15 @@ class RoomStore(SQLBaseStore): StoreError if the room could not be stored. """ try: - yield self._simple_insert(RoomsTable.table_name, dict( - room_id=room_id, - creator=room_creator_user_id, - is_public=is_public - )) - except IntegrityError: - raise StoreError(409, "Room ID in use.") + yield self._simple_insert( + RoomsTable.table_name, + { + "room_id": room_id, + "creator": room_creator_user_id, + "is_public": is_public, + }, + desc="store_room", + ) except Exception as e: logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") @@ -66,9 +67,22 @@ class RoomStore(SQLBaseStore): Returns: A namedtuple containing the room information, or an empty list. """ - query = RoomsTable.select_statement("room_id=?") - return self._execute( - "get_room", RoomsTable.decode_single_result, query, room_id, + return self._simple_select_one( + table=RoomsTable.table_name, + keyvalues={"room_id": room_id}, + retcols=RoomsTable.fields, + desc="get_room", + allow_none=True, + ) + + def get_public_room_ids(self): + return self._simple_select_onecol( + table="rooms", + keyvalues={ + "is_public": True, + }, + retcol="room_id", + desc="get_public_room_ids", ) @defer.inlineCallbacks @@ -99,24 +113,37 @@ class RoomStore(SQLBaseStore): "ON c.event_id = room_names.event_id " ) - # We use non printing ascii character US () as a seperator + # We use non printing ascii character US (\x1F) as a separator sql = ( - "SELECT r.room_id, n.name, t.topic, " - "group_concat(a.room_alias, '') " - "FROM rooms AS r " - "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " - "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " - "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " - "WHERE r.is_public = ? " - "GROUP BY r.room_id " + "SELECT r.room_id, max(n.name), max(t.topic)" + " FROM rooms AS r" + " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id" + " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id" + " WHERE r.is_public = ?" + " GROUP BY r.room_id" ) % { "topic": topic_subquery, "name": name_subquery, } - c = txn.execute(sql, (is_public,)) + txn.execute(sql, (is_public,)) + + rows = txn.fetchall() - return c.fetchall() + for i, row in enumerate(rows): + room_id = row[0] + aliases = self._simple_select_onecol_txn( + txn, + table="room_aliases", + keyvalues={ + "room_id": room_id + }, + retcol="room_alias", + ) + + rows[i] = list(row) + [aliases] + + return rows rows = yield self.runInteraction( "get_rooms", f @@ -127,9 +154,10 @@ class RoomStore(SQLBaseStore): "room_id": r[0], "name": r[1], "topic": r[2], - "aliases": r[3].split(""), + "aliases": r[3], } for r in rows + if r[3] # We only return rooms that have at least one alias. ] defer.returnValue(ret) @@ -143,7 +171,7 @@ class RoomStore(SQLBaseStore): "event_id": event.event_id, "room_id": event.room_id, "topic": event.content["topic"], - } + }, ) def _store_room_name_txn(self, txn, event): @@ -158,8 +186,39 @@ class RoomStore(SQLBaseStore): } ) + @defer.inlineCallbacks + def get_room_name_and_aliases(self, room_id): + def f(txn): + sql = ( + "SELECT event_id FROM current_state_events " + "WHERE room_id = ? " + ) + + sql += " AND ((type = 'm.room.name' AND state_key = '')" + sql += " OR type = 'm.room.aliases')" + + txn.execute(sql, (room_id,)) + results = self.cursor_to_dict(txn) + + return self._parse_events_txn(txn, results) + + events = yield self.runInteraction("get_room_name_and_aliases", f) -class RoomsTable(Table): + name = None + aliases = [] + + for e in events: + if e.type == 'm.room.name': + if 'name' in e.content: + name = e.content['name'] + elif e.type == 'm.room.aliases': + if 'aliases' in e.content: + aliases.extend(e.content['aliases']) + + defer.returnValue((name, aliases)) + + +class RoomsTable(object): table_name = "rooms" fields = [ @@ -167,5 +226,3 @@ class RoomsTable(Table): "is_public", "creator" ] - - EntryType = collections.namedtuple("RoomEntry", fields) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 65ffb4627f..839c74f63a 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -40,7 +40,6 @@ class RoomMemberStore(SQLBaseStore): """ try: target_user_id = event.state_key - domain = UserID.from_string(target_user_id).domain except: logger.exception( "Failed to parse target_user_id=%s", target_user_id @@ -65,42 +64,9 @@ class RoomMemberStore(SQLBaseStore): } ) - # Update room hosts table - if event.membership == Membership.JOIN: - sql = ( - "INSERT OR IGNORE INTO room_hosts (room_id, host) " - "VALUES (?, ?)" - ) - txn.execute(sql, (event.room_id, domain)) - elif event.membership != Membership.INVITE: - # Check if this was the last person to have left. - member_events = self._get_members_query_txn( - txn, - where_clause=("c.room_id = ? AND m.membership = ?" - " AND m.user_id != ?"), - where_values=(event.room_id, Membership.JOIN, target_user_id,) - ) - - joined_domains = set() - for e in member_events: - try: - joined_domains.add( - UserID.from_string(e.state_key).domain - ) - except: - # FIXME: How do we deal with invalid user ids in the db? - logger.exception("Invalid user_id: %s", event.state_key) - - if domain not in joined_domains: - sql = ( - "DELETE FROM room_hosts WHERE room_id = ? AND host = ?" - ) + txn.call_after(self.get_rooms_for_user.invalidate, target_user_id) + txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id) - txn.execute(sql, (event.room_id, domain)) - - self.get_rooms_for_user.invalidate(target_user_id) - - @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -110,41 +76,27 @@ class RoomMemberStore(SQLBaseStore): Returns: Deferred: Results in a MembershipEvent or None. """ - rows = yield self._get_members_by_dict({ - "e.room_id": room_id, - "m.user_id": user_id, - }) + def f(txn): + events = self._get_members_events_txn( + txn, + room_id, + user_id=user_id, + ) - defer.returnValue(rows[0] if rows else None) + return events[0] if events else None - def _get_room_member(self, txn, user_id, room_id): - sql = ( - "SELECT e.* FROM events as e" - " INNER JOIN room_memberships as m" - " ON e.event_id = m.event_id" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" - " WHERE m.user_id = ? and e.room_id = ?" - " LIMIT 1" - ) - txn.execute(sql, (user_id, room_id)) - rows = self.cursor_to_dict(txn) - if rows: - return self._parse_events_txn(txn, rows)[0] - else: - return None + return self.runInteraction("get_room_member", f) def get_users_in_room(self, room_id): def f(txn): - sql = ( - "SELECT m.user_id FROM room_memberships as m" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" - " WHERE m.membership = ? AND m.room_id = ?" + + rows = self._get_members_rows_txn( + txn, + room_id=room_id, + membership=Membership.JOIN, ) - txn.execute(sql, (Membership.JOIN, room_id)) - return [r[0] for r in txn.fetchall()] + return [r["user_id"] for r in rows] return self.runInteraction("get_users_in_room", f) def get_room_members(self, room_id, membership=None): @@ -159,11 +111,14 @@ class RoomMemberStore(SQLBaseStore): list of namedtuples representing the members in this room. """ - where = {"m.room_id": room_id} - if membership: - where["m.membership"] = membership + def f(txn): + return self._get_members_events_txn( + txn, + room_id, + membership=membership, + ) - return self._get_members_by_dict(where) + return self.runInteraction("get_room_members", f) def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user @@ -199,7 +154,9 @@ class RoomMemberStore(SQLBaseStore): "SELECT m.room_id, m.sender, m.membership" " FROM room_memberships as m" " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" + " ON m.event_id = c.event_id " + " AND m.room_id = c.room_id " + " AND m.user_id = c.state_key" " WHERE %s" ) % (where_clause,) @@ -208,32 +165,59 @@ class RoomMemberStore(SQLBaseStore): RoomsForUser(**r) for r in self.cursor_to_dict(txn) ] + @cached() def get_joined_hosts_for_room(self, room_id): - return self._simple_select_onecol( - "room_hosts", - {"room_id": room_id}, - "host" + return self.runInteraction( + "get_joined_hosts_for_room", + self._get_joined_hosts_for_room_txn, + room_id, + ) + + def _get_joined_hosts_for_room_txn(self, txn, room_id): + rows = self._get_members_rows_txn( + txn, + room_id, membership=Membership.JOIN + ) + + joined_domains = set( + UserID.from_string(r["user_id"]).domain + for r in rows ) - def _get_members_by_dict(self, where_dict): - clause = " AND ".join("%s = ?" % k for k in where_dict.keys()) - vals = where_dict.values() - return self._get_members_query(clause, vals) + return joined_domains def _get_members_query(self, where_clause, where_values): return self.runInteraction( - "get_members_query", self._get_members_query_txn, + "get_members_query", self._get_members_events_txn, where_clause, where_values ) - def _get_members_query_txn(self, txn, where_clause, where_values): + def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None): + rows = self._get_members_rows_txn( + txn, + room_id, membership, user_id, + ) + return self._get_events_txn(txn, [r["event_id"] for r in rows]) + + def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None): + where_clause = "c.room_id = ?" + where_values = [room_id] + + if membership: + where_clause += " AND m.membership = ?" + where_values.append(membership) + + if user_id: + where_clause += " AND m.user_id = ?" + where_values.append(user_id) + sql = ( - "SELECT e.* FROM events as e " - "INNER JOIN room_memberships as m " - "ON e.event_id = m.event_id " - "INNER JOIN current_state_events as c " - "ON m.event_id = c.event_id " - "WHERE %(where)s " + "SELECT m.* FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id " + " AND m.room_id = c.room_id " + " AND m.user_id = c.state_key" + " WHERE %(where)s" ) % { "where": where_clause, } @@ -241,8 +225,7 @@ class RoomMemberStore(SQLBaseStore): txn.execute(sql, where_values) rows = self.cursor_to_dict(txn) - results = self._parse_events_txn(txn, rows) - return results + return rows @cached() def get_rooms_for_user(self, user_id): diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql index b87ef1fe79..878c36260a 100644 --- a/synapse/storage/schema/delta/12/v12.sql +++ b/synapse/storage/schema/delta/12/v12.sql @@ -17,26 +17,25 @@ CREATE TABLE IF NOT EXISTS rejections( event_id TEXT NOT NULL, reason TEXT NOT NULL, last_check TEXT NOT NULL, - CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE + UNIQUE (event_id) ); -- Push notification endpoints that users have configured CREATE TABLE IF NOT EXISTS pushers ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_name TEXT NOT NULL, - profile_tag varchar(32) NOT NULL, - kind varchar(8) NOT NULL, - app_id varchar(64) NOT NULL, - app_display_name varchar(64) NOT NULL, - device_display_name varchar(128) NOT NULL, - pushkey blob NOT NULL, - ts BIGINT NOT NULL, - lang varchar(8), - data blob, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey VARBINARY(512) NOT NULL, + ts BIGINT UNSIGNED NOT NULL, + lang VARCHAR(8), + data LONGBLOB, last_token TEXT, - last_success BIGINT, - failing_since BIGINT, - FOREIGN KEY(user_name) REFERENCES users(name), + last_success BIGINT UNSIGNED, + failing_since BIGINT UNSIGNED, UNIQUE (app_id, pushkey) ); @@ -55,13 +54,10 @@ CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name); CREATE TABLE IF NOT EXISTS user_filters( user_id TEXT, - filter_id INTEGER, - filter_json TEXT, - FOREIGN KEY(user_id) REFERENCES users(id) + filter_id BIGINT UNSIGNED, + filter_json LONGBLOB ); CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters( - user_id, filter_id + user_id, filter_id ); - -PRAGMA user_version = 12; diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql index e491ad5aec..3265924013 100644 --- a/synapse/storage/schema/delta/13/v13.sql +++ b/synapse/storage/schema/delta/13/v13.sql @@ -19,16 +19,13 @@ CREATE TABLE IF NOT EXISTS application_services( token TEXT, hs_token TEXT, sender TEXT, - UNIQUE(token) ON CONFLICT ROLLBACK + UNIQUE(token) ); CREATE TABLE IF NOT EXISTS application_services_regex( id INTEGER PRIMARY KEY AUTOINCREMENT, - as_id INTEGER NOT NULL, + as_id BIGINT UNSIGNED NOT NULL, namespace INTEGER, /* enum[room_id|room_alias|user_id] */ regex TEXT, FOREIGN KEY(as_id) REFERENCES application_services(id) ); - - - diff --git a/synapse/storage/schema/delta/14/upgrade_appservice_db.py b/synapse/storage/schema/delta/14/upgrade_appservice_db.py index 847b1c5b89..9f3a4dd4c5 100644 --- a/synapse/storage/schema/delta/14/upgrade_appservice_db.py +++ b/synapse/storage/schema/delta/14/upgrade_appservice_db.py @@ -1,3 +1,17 @@ +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import json import logging diff --git a/synapse/storage/schema/delta/14/v14.sql b/synapse/storage/schema/delta/14/v14.sql index 0212726448..1d09ad7a15 100644 --- a/synapse/storage/schema/delta/14/v14.sql +++ b/synapse/storage/schema/delta/14/v14.sql @@ -1,3 +1,17 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ CREATE TABLE IF NOT EXISTS push_rules_enable ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_name TEXT NOT NULL, diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql new file mode 100644 index 0000000000..db2e720393 --- /dev/null +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -0,0 +1,31 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS application_services_state( + as_id TEXT PRIMARY KEY, + state VARCHAR(5), + last_txn INTEGER +); + +CREATE TABLE IF NOT EXISTS application_services_txns( + as_id TEXT NOT NULL, + txn_id INTEGER NOT NULL, + event_ids TEXT NOT NULL, + UNIQUE(as_id, txn_id) +); + +CREATE INDEX IF NOT EXISTS application_services_txns_id ON application_services_txns ( + as_id +); diff --git a/synapse/storage/schema/delta/15/presence_indices.sql b/synapse/storage/schema/delta/15/presence_indices.sql new file mode 100644 index 0000000000..6b8d0f1ca7 --- /dev/null +++ b/synapse/storage/schema/delta/15/presence_indices.sql @@ -0,0 +1,2 @@ + +CREATE INDEX IF NOT EXISTS presence_list_user_id ON presence_list (user_id); diff --git a/synapse/storage/schema/delta/15/v15.sql b/synapse/storage/schema/delta/15/v15.sql new file mode 100644 index 0000000000..f5b2a08ca4 --- /dev/null +++ b/synapse/storage/schema/delta/15/v15.sql @@ -0,0 +1,25 @@ +-- Drop, copy & recreate pushers table to change unique key +-- Also add access_token column at the same time +CREATE TABLE IF NOT EXISTS pushers2 ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_name TEXT NOT NULL, + access_token INTEGER DEFAULT NULL, + profile_tag varchar(32) NOT NULL, + kind varchar(8) NOT NULL, + app_id varchar(64) NOT NULL, + app_display_name varchar(64) NOT NULL, + device_display_name varchar(128) NOT NULL, + pushkey blob NOT NULL, + ts BIGINT NOT NULL, + lang varchar(8), + data blob, + last_token TEXT, + last_success BIGINT, + failing_since BIGINT, + FOREIGN KEY(user_name) REFERENCES users(name), + UNIQUE (app_id, pushkey, user_name) +); +INSERT INTO pushers2 (id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since) + SELECT id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers; +DROP TABLE pushers; +ALTER TABLE pushers2 RENAME TO pushers; diff --git a/synapse/storage/schema/delta/16/events_order_index.sql b/synapse/storage/schema/delta/16/events_order_index.sql new file mode 100644 index 0000000000..a48f215170 --- /dev/null +++ b/synapse/storage/schema/delta/16/events_order_index.sql @@ -0,0 +1,4 @@ +CREATE INDEX events_order ON events (topological_ordering, stream_ordering); +CREATE INDEX events_order_room ON events ( + room_id, topological_ordering, stream_ordering +); diff --git a/synapse/storage/schema/delta/16/remote_media_cache_index.sql b/synapse/storage/schema/delta/16/remote_media_cache_index.sql new file mode 100644 index 0000000000..7a15265cb1 --- /dev/null +++ b/synapse/storage/schema/delta/16/remote_media_cache_index.sql @@ -0,0 +1,2 @@ +CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id + ON remote_media_cache_thumbnails (media_id); \ No newline at end of file diff --git a/synapse/storage/schema/delta/16/remove_duplicates.sql b/synapse/storage/schema/delta/16/remove_duplicates.sql new file mode 100644 index 0000000000..65c97b5e2f --- /dev/null +++ b/synapse/storage/schema/delta/16/remove_duplicates.sql @@ -0,0 +1,9 @@ + + +DELETE FROM event_to_state_groups WHERE state_group not in ( + SELECT MAX(state_group) FROM event_to_state_groups GROUP BY event_id +); + +DELETE FROM event_to_state_groups WHERE rowid not in ( + SELECT MIN(rowid) FROM event_to_state_groups GROUP BY event_id +); diff --git a/synapse/storage/schema/delta/16/room_alias_index.sql b/synapse/storage/schema/delta/16/room_alias_index.sql new file mode 100644 index 0000000000..f82486132b --- /dev/null +++ b/synapse/storage/schema/delta/16/room_alias_index.sql @@ -0,0 +1,3 @@ + +CREATE INDEX IF NOT EXISTS room_aliases_id ON room_aliases(room_id); +CREATE INDEX IF NOT EXISTS room_alias_servers_alias ON room_alias_servers(room_alias); diff --git a/synapse/storage/schema/delta/16/unique_constraints.sql b/synapse/storage/schema/delta/16/unique_constraints.sql new file mode 100644 index 0000000000..fecf11118c --- /dev/null +++ b/synapse/storage/schema/delta/16/unique_constraints.sql @@ -0,0 +1,80 @@ + +-- We can use SQLite features here, since other db support was only added in v16 + +-- +DELETE FROM current_state_events WHERE rowid not in ( + SELECT MIN(rowid) FROM current_state_events GROUP BY event_id +); + +DROP INDEX IF EXISTS current_state_events_event_id; +CREATE UNIQUE INDEX current_state_events_event_id ON current_state_events(event_id); + +-- +DELETE FROM room_memberships WHERE rowid not in ( + SELECT MIN(rowid) FROM room_memberships GROUP BY event_id +); + +DROP INDEX IF EXISTS room_memberships_event_id; +CREATE UNIQUE INDEX room_memberships_event_id ON room_memberships(event_id); + +-- +DELETE FROM feedback WHERE rowid not in ( + SELECT MIN(rowid) FROM feedback GROUP BY event_id +); + +DROP INDEX IF EXISTS feedback_event_id; +CREATE UNIQUE INDEX feedback_event_id ON feedback(event_id); + +-- +DELETE FROM topics WHERE rowid not in ( + SELECT MIN(rowid) FROM topics GROUP BY event_id +); + +DROP INDEX IF EXISTS topics_event_id; +CREATE UNIQUE INDEX topics_event_id ON topics(event_id); + +-- +DELETE FROM room_names WHERE rowid not in ( + SELECT MIN(rowid) FROM room_names GROUP BY event_id +); + +DROP INDEX IF EXISTS room_names_id; +CREATE UNIQUE INDEX room_names_id ON room_names(event_id); + +-- +DELETE FROM presence WHERE rowid not in ( + SELECT MIN(rowid) FROM presence GROUP BY user_id +); + +DROP INDEX IF EXISTS presence_id; +CREATE UNIQUE INDEX presence_id ON presence(user_id); + +-- +DELETE FROM presence_allow_inbound WHERE rowid not in ( + SELECT MIN(rowid) FROM presence_allow_inbound + GROUP BY observed_user_id, observer_user_id +); + +DROP INDEX IF EXISTS presence_allow_inbound_observers; +CREATE UNIQUE INDEX presence_allow_inbound_observers ON presence_allow_inbound( + observed_user_id, observer_user_id +); + +-- +DELETE FROM presence_list WHERE rowid not in ( + SELECT MIN(rowid) FROM presence_list + GROUP BY user_id, observed_user_id +); + +DROP INDEX IF EXISTS presence_list_observers; +CREATE UNIQUE INDEX presence_list_observers ON presence_list( + user_id, observed_user_id +); + +-- +DELETE FROM room_aliases WHERE rowid not in ( + SELECT MIN(rowid) FROM room_aliases GROUP BY room_alias +); + +DROP INDEX IF EXISTS room_aliases_id; +CREATE INDEX room_aliases_id ON room_aliases(room_id); diff --git a/synapse/storage/schema/delta/16/users.sql b/synapse/storage/schema/delta/16/users.sql new file mode 100644 index 0000000000..cd0709250d --- /dev/null +++ b/synapse/storage/schema/delta/16/users.sql @@ -0,0 +1,56 @@ +-- Convert `access_tokens`.user from rowids to user strings. +-- MUST BE DONE BEFORE REMOVING ID COLUMN FROM USERS TABLE BELOW +CREATE TABLE IF NOT EXISTS new_access_tokens( + id BIGINT UNSIGNED PRIMARY KEY, + user_id TEXT NOT NULL, + device_id TEXT, + token TEXT NOT NULL, + last_used BIGINT UNSIGNED, + UNIQUE(token) +); + +INSERT INTO new_access_tokens + SELECT a.id, u.name, a.device_id, a.token, a.last_used + FROM access_tokens as a + INNER JOIN users as u ON u.id = a.user_id; + +DROP TABLE access_tokens; + +ALTER TABLE new_access_tokens RENAME TO access_tokens; + +-- Remove ID column from `users` table +CREATE TABLE IF NOT EXISTS new_users( + name TEXT, + password_hash TEXT, + creation_ts BIGINT UNSIGNED, + admin BOOL DEFAULT 0 NOT NULL, + UNIQUE(name) +); + +INSERT INTO new_users SELECT name, password_hash, creation_ts, admin FROM users; + +DROP TABLE users; + +ALTER TABLE new_users RENAME TO users; + + +-- Remove UNIQUE constraint from `user_ips` table +CREATE TABLE IF NOT EXISTS new_user_ips ( + user_id TEXT NOT NULL, + access_token TEXT NOT NULL, + device_id TEXT, + ip TEXT NOT NULL, + user_agent TEXT NOT NULL, + last_seen BIGINT UNSIGNED NOT NULL +); + +INSERT INTO new_user_ips + SELECT user, access_token, device_id, ip, user_agent, last_seen FROM user_ips; + +DROP TABLE user_ips; + +ALTER TABLE new_user_ips RENAME TO user_ips; + +CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user_id); +CREATE INDEX IF NOT EXISTS user_ips_user_ip ON user_ips(user_id, access_token, ip); + diff --git a/synapse/storage/schema/delta/17/drop_indexes.sql b/synapse/storage/schema/delta/17/drop_indexes.sql new file mode 100644 index 0000000000..8eb3325a6b --- /dev/null +++ b/synapse/storage/schema/delta/17/drop_indexes.sql @@ -0,0 +1,18 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +DROP INDEX IF EXISTS sent_transaction_dest; +DROP INDEX IF EXISTS sent_transaction_sent; +DROP INDEX IF EXISTS user_ips_user; diff --git a/synapse/storage/schema/delta/17/server_keys.sql b/synapse/storage/schema/delta/17/server_keys.sql new file mode 100644 index 0000000000..513c30a717 --- /dev/null +++ b/synapse/storage/schema/delta/17/server_keys.sql @@ -0,0 +1,24 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS server_keys_json ( + server_name TEXT, -- Server name. + key_id TEXT, -- Requested key id. + from_server TEXT, -- Which server the keys were fetched from. + ts_added_ms INTEGER, -- When the keys were fetched + ts_valid_until_ms INTEGER, -- When this version of the keys exipires. + key_json bytea, -- JSON certificate for the remote server. + CONSTRAINT uniqueness UNIQUE (server_name, key_id, from_server) +); diff --git a/synapse/storage/schema/delta/17/user_threepids.sql b/synapse/storage/schema/delta/17/user_threepids.sql new file mode 100644 index 0000000000..c17715ac80 --- /dev/null +++ b/synapse/storage/schema/delta/17/user_threepids.sql @@ -0,0 +1,9 @@ +CREATE TABLE user_threepids ( + user_id TEXT NOT NULL, + medium TEXT NOT NULL, + address TEXT NOT NULL, + validated_at BIGINT NOT NULL, + added_at BIGINT NOT NULL, + CONSTRAINT user_medium_address UNIQUE (user_id, medium, address) +); +CREATE INDEX user_threepids_user_id ON user_threepids(user_id); diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql index 1e766d6db2..f7020f7793 100644 --- a/synapse/storage/schema/full_schemas/11/event_edges.sql +++ b/synapse/storage/schema/full_schemas/11/event_edges.sql @@ -16,52 +16,52 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities( event_id TEXT NOT NULL, room_id TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE + UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id); -CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); +CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id); +CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_backward_extremities( event_id TEXT NOT NULL, room_id TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE + UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id); -CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id); +CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id); +CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_edges( event_id TEXT NOT NULL, prev_event_id TEXT NOT NULL, room_id TEXT NOT NULL, - is_state INTEGER NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id, is_state) + is_state BOOL NOT NULL, + UNIQUE (event_id, prev_event_id, room_id, is_state) ); -CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); -CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); +CREATE INDEX ev_edges_id ON event_edges(event_id); +CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id); CREATE TABLE IF NOT EXISTS room_depth( room_id TEXT NOT NULL, min_depth INTEGER NOT NULL, - CONSTRAINT uniqueness UNIQUE (room_id) + UNIQUE (room_id) ); -CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); +CREATE INDEX room_depth_room ON room_depth(room_id); create TABLE IF NOT EXISTS event_destinations( event_id TEXT NOT NULL, destination TEXT NOT NULL, - delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered - CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE + delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered + UNIQUE (event_id, destination) ); -CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); +CREATE INDEX event_destinations_id ON event_destinations(event_id); CREATE TABLE IF NOT EXISTS state_forward_extremities( @@ -69,21 +69,21 @@ CREATE TABLE IF NOT EXISTS state_forward_extremities( room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE + UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities( +CREATE INDEX st_extrem_keys ON state_forward_extremities( room_id, type, state_key ); -CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id); +CREATE INDEX st_extrem_id ON state_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_auth( event_id TEXT NOT NULL, auth_id TEXT NOT NULL, room_id TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, auth_id, room_id) + UNIQUE (event_id, auth_id, room_id) ); -CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id); -CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id); \ No newline at end of file +CREATE INDEX evauth_edges_id ON event_auth(event_id); +CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id); diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql index c28c39c48a..636b2d3353 100644 --- a/synapse/storage/schema/full_schemas/11/event_signatures.sql +++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql @@ -16,50 +16,40 @@ CREATE TABLE IF NOT EXISTS event_content_hashes ( event_id TEXT, algorithm TEXT, - hash BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, algorithm) + hash bytea, + UNIQUE (event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes( - event_id -); +CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id); CREATE TABLE IF NOT EXISTS event_reference_hashes ( event_id TEXT, algorithm TEXT, - hash BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, algorithm) + hash bytea, + UNIQUE (event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes ( - event_id -); +CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id); CREATE TABLE IF NOT EXISTS event_signatures ( event_id TEXT, signature_name TEXT, key_id TEXT, - signature BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id) + signature bytea, + UNIQUE (event_id, signature_name, key_id) ); -CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures ( - event_id -); +CREATE INDEX event_signatures_id ON event_signatures(event_id); CREATE TABLE IF NOT EXISTS event_edge_hashes( event_id TEXT, prev_event_id TEXT, algorithm TEXT, - hash BLOB, - CONSTRAINT uniqueness UNIQUE ( - event_id, prev_event_id, algorithm - ) + hash bytea, + UNIQUE (event_id, prev_event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes( - event_id -); +CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id); diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql index dd00c1cd2f..1901654ac2 100644 --- a/synapse/storage/schema/full_schemas/11/im.sql +++ b/synapse/storage/schema/full_schemas/11/im.sql @@ -15,7 +15,7 @@ CREATE TABLE IF NOT EXISTS events( stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT, - topological_ordering INTEGER NOT NULL, + topological_ordering BIGINT NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, room_id TEXT NOT NULL, @@ -23,26 +23,24 @@ CREATE TABLE IF NOT EXISTS events( unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, - depth INTEGER DEFAULT 0 NOT NULL, - CONSTRAINT ev_uniq UNIQUE (event_id) + depth BIGINT DEFAULT 0 NOT NULL, + UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id); -CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering); -CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering); -CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id); +CREATE INDEX events_stream_ordering ON events (stream_ordering); +CREATE INDEX events_topological_ordering ON events (topological_ordering); +CREATE INDEX events_room_id ON events (room_id); CREATE TABLE IF NOT EXISTS event_json( event_id TEXT NOT NULL, room_id TEXT NOT NULL, - internal_metadata NOT NULL, - json BLOB NOT NULL, - CONSTRAINT ev_j_uniq UNIQUE (event_id) + internal_metadata TEXT NOT NULL, + json TEXT NOT NULL, + UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id); -CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id); +CREATE INDEX event_json_room_id ON event_json(room_id); CREATE TABLE IF NOT EXISTS state_events( @@ -50,13 +48,13 @@ CREATE TABLE IF NOT EXISTS state_events( room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, - prev_state TEXT + prev_state TEXT, + UNIQUE (event_id) ); -CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id); -CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id); -CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type); -CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key); +CREATE INDEX state_events_room_id ON state_events (room_id); +CREATE INDEX state_events_type ON state_events (type); +CREATE INDEX state_events_state_key ON state_events (state_key); CREATE TABLE IF NOT EXISTS current_state_events( @@ -64,13 +62,13 @@ CREATE TABLE IF NOT EXISTS current_state_events( room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, - CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE + UNIQUE (room_id, type, state_key) ); -CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id); -CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id); -CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type); -CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key); +CREATE INDEX curr_events_event_id ON current_state_events (event_id); +CREATE INDEX current_state_events_room_id ON current_state_events (room_id); +CREATE INDEX current_state_events_type ON current_state_events (type); +CREATE INDEX current_state_events_state_key ON current_state_events (state_key); CREATE TABLE IF NOT EXISTS room_memberships( event_id TEXT NOT NULL, @@ -80,9 +78,9 @@ CREATE TABLE IF NOT EXISTS room_memberships( membership TEXT NOT NULL ); -CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id); -CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id); -CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id); +CREATE INDEX room_memberships_event_id ON room_memberships (event_id); +CREATE INDEX room_memberships_room_id ON room_memberships (room_id); +CREATE INDEX room_memberships_user_id ON room_memberships (user_id); CREATE TABLE IF NOT EXISTS feedback( event_id TEXT NOT NULL, @@ -98,8 +96,8 @@ CREATE TABLE IF NOT EXISTS topics( topic TEXT NOT NULL ); -CREATE INDEX IF NOT EXISTS topics_event_id ON topics(event_id); -CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id); +CREATE INDEX topics_event_id ON topics(event_id); +CREATE INDEX topics_room_id ON topics(room_id); CREATE TABLE IF NOT EXISTS room_names( event_id TEXT NOT NULL, @@ -107,19 +105,19 @@ CREATE TABLE IF NOT EXISTS room_names( name TEXT NOT NULL ); -CREATE INDEX IF NOT EXISTS room_names_event_id ON room_names(event_id); -CREATE INDEX IF NOT EXISTS room_names_room_id ON room_names(room_id); +CREATE INDEX room_names_event_id ON room_names(event_id); +CREATE INDEX room_names_room_id ON room_names(room_id); CREATE TABLE IF NOT EXISTS rooms( room_id TEXT PRIMARY KEY NOT NULL, - is_public INTEGER, + is_public BOOL, creator TEXT ); CREATE TABLE IF NOT EXISTS room_hosts( room_id TEXT NOT NULL, host TEXT NOT NULL, - CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE + UNIQUE (room_id, host) ); -CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id); +CREATE INDEX room_hosts_room_id ON room_hosts (room_id); diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql index a9e0a4fe0d..afc142045e 100644 --- a/synapse/storage/schema/full_schemas/11/keys.sql +++ b/synapse/storage/schema/full_schemas/11/keys.sql @@ -16,16 +16,16 @@ CREATE TABLE IF NOT EXISTS server_tls_certificates( server_name TEXT, -- Server name. fingerprint TEXT, -- Certificate fingerprint. from_server TEXT, -- Which key server the certificate was fetched from. - ts_added_ms INTEGER, -- When the certifcate was added. - tls_certificate BLOB, -- DER encoded x509 certificate. - CONSTRAINT uniqueness UNIQUE (server_name, fingerprint) + ts_added_ms BIGINT, -- When the certifcate was added. + tls_certificate bytea, -- DER encoded x509 certificate. + UNIQUE (server_name, fingerprint) ); CREATE TABLE IF NOT EXISTS server_signature_keys( server_name TEXT, -- Server name. key_id TEXT, -- Key version. from_server TEXT, -- Which key server the key was fetched form. - ts_added_ms INTEGER, -- When the key was added. - verify_key BLOB, -- NACL verification key. - CONSTRAINT uniqueness UNIQUE (server_name, key_id) + ts_added_ms BIGINT, -- When the key was added. + verify_key bytea, -- NACL verification key. + UNIQUE (server_name, key_id) ); diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql index afdf48cbfb..e927e581d1 100644 --- a/synapse/storage/schema/full_schemas/11/media_repository.sql +++ b/synapse/storage/schema/full_schemas/11/media_repository.sql @@ -17,10 +17,10 @@ CREATE TABLE IF NOT EXISTS local_media_repository ( media_id TEXT, -- The id used to refer to the media. media_type TEXT, -- The MIME-type of the media. media_length INTEGER, -- Length of the media in bytes. - created_ts INTEGER, -- When the content was uploaded in ms. + created_ts BIGINT, -- When the content was uploaded in ms. upload_name TEXT, -- The name the media was uploaded with. user_id TEXT, -- The user who uploaded the file. - CONSTRAINT uniqueness UNIQUE (media_id) + UNIQUE (media_id) ); CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( @@ -30,23 +30,23 @@ CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( thumbnail_type TEXT, -- The MIME-type of the thumbnail. thumbnail_method TEXT, -- The method used to make the thumbnail. thumbnail_length INTEGER, -- The length of the thumbnail in bytes. - CONSTRAINT uniqueness UNIQUE ( + UNIQUE ( media_id, thumbnail_width, thumbnail_height, thumbnail_type ) ); -CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id +CREATE INDEX local_media_repository_thumbnails_media_id ON local_media_repository_thumbnails (media_id); CREATE TABLE IF NOT EXISTS remote_media_cache ( media_origin TEXT, -- The remote HS the media came from. media_id TEXT, -- The id used to refer to the media on that server. media_type TEXT, -- The MIME-type of the media. - created_ts INTEGER, -- When the content was uploaded in ms. + created_ts BIGINT, -- When the content was uploaded in ms. upload_name TEXT, -- The name the media was uploaded with. media_length INTEGER, -- Length of the media in bytes. filesystem_id TEXT, -- The name used to store the media on disk. - CONSTRAINT uniqueness UNIQUE (media_origin, media_id) + UNIQUE (media_origin, media_id) ); CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( @@ -58,11 +58,8 @@ CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( thumbnail_type TEXT, -- The MIME-type of the thumbnail. thumbnail_length INTEGER, -- The length of the thumbnail in bytes. filesystem_id TEXT, -- The name used to store the media on disk. - CONSTRAINT uniqueness UNIQUE ( + UNIQUE ( media_origin, media_id, thumbnail_width, thumbnail_height, - thumbnail_type, thumbnail_type - ) + thumbnail_type + ) ); - -CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id - ON local_media_repository_thumbnails (media_id); diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql index f9f8db9697..d8d82e9fe3 100644 --- a/synapse/storage/schema/full_schemas/11/presence.sql +++ b/synapse/storage/schema/full_schemas/11/presence.sql @@ -13,26 +13,23 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS presence( - user_id INTEGER NOT NULL, - state INTEGER, + user_id TEXT NOT NULL, + state VARCHAR(20), status_msg TEXT, - mtime INTEGER, -- miliseconds since last state change - FOREIGN KEY(user_id) REFERENCES users(id) + mtime BIGINT -- miliseconds since last state change ); -- For each of /my/ users which possibly-remote users are allowed to see their -- presence state CREATE TABLE IF NOT EXISTS presence_allow_inbound( - observed_user_id INTEGER NOT NULL, - observer_user_id TEXT, -- a UserID, - FOREIGN KEY(observed_user_id) REFERENCES users(id) + observed_user_id TEXT NOT NULL, + observer_user_id TEXT NOT NULL -- a UserID, ); -- For each of /my/ users (watcher), which possibly-remote users are they -- watching? CREATE TABLE IF NOT EXISTS presence_list( - user_id INTEGER NOT NULL, - observed_user_id TEXT, -- a UserID, - accepted BOOLEAN, - FOREIGN KEY(user_id) REFERENCES users(id) + user_id TEXT NOT NULL, + observed_user_id TEXT NOT NULL, -- a UserID, + accepted BOOLEAN NOT NULL ); diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql index f06a528b4d..26e4204437 100644 --- a/synapse/storage/schema/full_schemas/11/profiles.sql +++ b/synapse/storage/schema/full_schemas/11/profiles.sql @@ -13,8 +13,7 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS profiles( - user_id INTEGER NOT NULL, + user_id TEXT NOT NULL, displayname TEXT, - avatar_url TEXT, - FOREIGN KEY(user_id) REFERENCES users(id) + avatar_url TEXT ); diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql index 5011d95db8..69621955d4 100644 --- a/synapse/storage/schema/full_schemas/11/redactions.sql +++ b/synapse/storage/schema/full_schemas/11/redactions.sql @@ -15,8 +15,8 @@ CREATE TABLE IF NOT EXISTS redactions ( event_id TEXT NOT NULL, redacts TEXT NOT NULL, - CONSTRAINT ev_uniq UNIQUE (event_id) + UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id); -CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts); +CREATE INDEX redactions_event_id ON redactions (event_id); +CREATE INDEX redactions_redacts ON redactions (redacts); diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql index 0d2df01603..5027b1e3f6 100644 --- a/synapse/storage/schema/full_schemas/11/room_aliases.sql +++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql @@ -22,6 +22,3 @@ CREATE TABLE IF NOT EXISTS room_alias_servers( room_alias TEXT NOT NULL, server TEXT NOT NULL ); - - - diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql index 1fe8f1e430..ffd164ab71 100644 --- a/synapse/storage/schema/full_schemas/11/state.sql +++ b/synapse/storage/schema/full_schemas/11/state.sql @@ -30,18 +30,11 @@ CREATE TABLE IF NOT EXISTS state_groups_state( CREATE TABLE IF NOT EXISTS event_to_state_groups( event_id TEXT NOT NULL, state_group INTEGER NOT NULL, - CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id) + UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id); +CREATE INDEX state_groups_id ON state_groups(id); -CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state( - state_group -); -CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state( - room_id, type, state_key -); - -CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups( - event_id -); \ No newline at end of file +CREATE INDEX state_groups_state_id ON state_groups_state(state_group); +CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key); +CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id); diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql index 2d30f99b06..cc5b54f5aa 100644 --- a/synapse/storage/schema/full_schemas/11/transactions.sql +++ b/synapse/storage/schema/full_schemas/11/transactions.sql @@ -14,17 +14,16 @@ */ -- Stores what transaction ids we have received and what our response was CREATE TABLE IF NOT EXISTS received_transactions( - transaction_id TEXT, - origin TEXT, - ts INTEGER, + transaction_id TEXT, + origin TEXT, + ts BIGINT, response_code INTEGER, - response_json TEXT, - has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx - CONSTRAINT uniquesss UNIQUE (transaction_id, origin) ON CONFLICT REPLACE + response_json bytea, + has_been_referenced SMALLINT DEFAULT 0, -- Whether thishas been referenced by a prev_tx + UNIQUE (transaction_id, origin) ); -CREATE UNIQUE INDEX IF NOT EXISTS transactions_txid ON received_transactions(transaction_id, origin); -CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; +CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; -- Stores what transactions we've sent, what their response was (if we got one) and whether we have @@ -35,17 +34,14 @@ CREATE TABLE IF NOT EXISTS sent_transactions( destination TEXT, response_code INTEGER DEFAULT 0, response_json TEXT, - ts INTEGER + ts BIGINT ); -CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination); -CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions( - destination -); -CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id); +CREATE INDEX sent_transaction_dest ON sent_transactions(destination); +CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id); -- So that we can do an efficient look up of all transactions that have yet to be successfully -- sent. -CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_code); +CREATE INDEX sent_transaction_sent ON sent_transactions(response_code); -- For sent transactions only. @@ -56,13 +52,12 @@ CREATE TABLE IF NOT EXISTS transaction_id_to_pdu( pdu_origin TEXT ); -CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination); -CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); -CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination); +CREATE INDEX transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination); +CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); -- To track destination health CREATE TABLE IF NOT EXISTS destinations( destination TEXT PRIMARY KEY, - retry_last_ts INTEGER, + retry_last_ts BIGINT, retry_interval INTEGER ); diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql index 08ccfdac0a..eec3da3c35 100644 --- a/synapse/storage/schema/full_schemas/11/users.sql +++ b/synapse/storage/schema/full_schemas/11/users.sql @@ -16,19 +16,18 @@ CREATE TABLE IF NOT EXISTS users( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, password_hash TEXT, - creation_ts INTEGER, - admin BOOL DEFAULT 0 NOT NULL, - UNIQUE(name) ON CONFLICT ROLLBACK + creation_ts BIGINT, + admin SMALLINT DEFAULT 0 NOT NULL, + UNIQUE(name) ); CREATE TABLE IF NOT EXISTS access_tokens( id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL, + user_id TEXT NOT NULL, device_id TEXT, token TEXT NOT NULL, - last_used INTEGER, - FOREIGN KEY(user_id) REFERENCES users(id), - UNIQUE(token) ON CONFLICT ROLLBACK + last_used BIGINT, + UNIQUE(token) ); CREATE TABLE IF NOT EXISTS user_ips ( @@ -37,9 +36,8 @@ CREATE TABLE IF NOT EXISTS user_ips ( device_id TEXT, ip TEXT NOT NULL, user_agent TEXT NOT NULL, - last_seen INTEGER NOT NULL, - CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE + last_seen BIGINT NOT NULL, + UNIQUE (user, access_token, ip, user_agent) ); -CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user); - +CREATE INDEX user_ips_user ON user_ips(user); diff --git a/synapse/storage/schema/full_schemas/16/application_services.sql b/synapse/storage/schema/full_schemas/16/application_services.sql new file mode 100644 index 0000000000..d382d63fbd --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/application_services.sql @@ -0,0 +1,48 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS application_services( + id BIGINT PRIMARY KEY, + url TEXT, + token TEXT, + hs_token TEXT, + sender TEXT, + UNIQUE(token) +); + +CREATE TABLE IF NOT EXISTS application_services_regex( + id BIGINT PRIMARY KEY, + as_id BIGINT NOT NULL, + namespace INTEGER, /* enum[room_id|room_alias|user_id] */ + regex TEXT, + FOREIGN KEY(as_id) REFERENCES application_services(id) +); + +CREATE TABLE IF NOT EXISTS application_services_state( + as_id TEXT PRIMARY KEY, + state VARCHAR(5), + last_txn INTEGER +); + +CREATE TABLE IF NOT EXISTS application_services_txns( + as_id TEXT NOT NULL, + txn_id INTEGER NOT NULL, + event_ids TEXT NOT NULL, + UNIQUE(as_id, txn_id) +); + +CREATE INDEX application_services_txns_id ON application_services_txns ( + as_id +); diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql new file mode 100644 index 0000000000..f7020f7793 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/event_edges.sql @@ -0,0 +1,89 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_forward_extremities( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + UNIQUE (event_id, room_id) +); + +CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id); +CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id); + + +CREATE TABLE IF NOT EXISTS event_backward_extremities( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + UNIQUE (event_id, room_id) +); + +CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id); +CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id); + + +CREATE TABLE IF NOT EXISTS event_edges( + event_id TEXT NOT NULL, + prev_event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + is_state BOOL NOT NULL, + UNIQUE (event_id, prev_event_id, room_id, is_state) +); + +CREATE INDEX ev_edges_id ON event_edges(event_id); +CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id); + + +CREATE TABLE IF NOT EXISTS room_depth( + room_id TEXT NOT NULL, + min_depth INTEGER NOT NULL, + UNIQUE (room_id) +); + +CREATE INDEX room_depth_room ON room_depth(room_id); + + +create TABLE IF NOT EXISTS event_destinations( + event_id TEXT NOT NULL, + destination TEXT NOT NULL, + delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered + UNIQUE (event_id, destination) +); + +CREATE INDEX event_destinations_id ON event_destinations(event_id); + + +CREATE TABLE IF NOT EXISTS state_forward_extremities( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + UNIQUE (event_id, room_id) +); + +CREATE INDEX st_extrem_keys ON state_forward_extremities( + room_id, type, state_key +); +CREATE INDEX st_extrem_id ON state_forward_extremities(event_id); + + +CREATE TABLE IF NOT EXISTS event_auth( + event_id TEXT NOT NULL, + auth_id TEXT NOT NULL, + room_id TEXT NOT NULL, + UNIQUE (event_id, auth_id, room_id) +); + +CREATE INDEX evauth_edges_id ON event_auth(event_id); +CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id); diff --git a/synapse/storage/schema/full_schemas/16/event_signatures.sql b/synapse/storage/schema/full_schemas/16/event_signatures.sql new file mode 100644 index 0000000000..636b2d3353 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/event_signatures.sql @@ -0,0 +1,55 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_content_hashes ( + event_id TEXT, + algorithm TEXT, + hash bytea, + UNIQUE (event_id, algorithm) +); + +CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id); + + +CREATE TABLE IF NOT EXISTS event_reference_hashes ( + event_id TEXT, + algorithm TEXT, + hash bytea, + UNIQUE (event_id, algorithm) +); + +CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id); + + +CREATE TABLE IF NOT EXISTS event_signatures ( + event_id TEXT, + signature_name TEXT, + key_id TEXT, + signature bytea, + UNIQUE (event_id, signature_name, key_id) +); + +CREATE INDEX event_signatures_id ON event_signatures(event_id); + + +CREATE TABLE IF NOT EXISTS event_edge_hashes( + event_id TEXT, + prev_event_id TEXT, + algorithm TEXT, + hash bytea, + UNIQUE (event_id, prev_event_id, algorithm) +); + +CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id); diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql new file mode 100644 index 0000000000..576653a3c9 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/im.sql @@ -0,0 +1,128 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS events( + stream_ordering INTEGER PRIMARY KEY, + topological_ordering BIGINT NOT NULL, + event_id TEXT NOT NULL, + type TEXT NOT NULL, + room_id TEXT NOT NULL, + content TEXT NOT NULL, + unrecognized_keys TEXT, + processed BOOL NOT NULL, + outlier BOOL NOT NULL, + depth BIGINT DEFAULT 0 NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX events_stream_ordering ON events (stream_ordering); +CREATE INDEX events_topological_ordering ON events (topological_ordering); +CREATE INDEX events_order ON events (topological_ordering, stream_ordering); +CREATE INDEX events_room_id ON events (room_id); +CREATE INDEX events_order_room ON events ( + room_id, topological_ordering, stream_ordering +); + + +CREATE TABLE IF NOT EXISTS event_json( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + internal_metadata TEXT NOT NULL, + json TEXT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX event_json_room_id ON event_json(room_id); + + +CREATE TABLE IF NOT EXISTS state_events( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + prev_state TEXT, + UNIQUE (event_id) +); + +CREATE INDEX state_events_room_id ON state_events (room_id); +CREATE INDEX state_events_type ON state_events (type); +CREATE INDEX state_events_state_key ON state_events (state_key); + + +CREATE TABLE IF NOT EXISTS current_state_events( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + UNIQUE (event_id), + UNIQUE (room_id, type, state_key) +); + +CREATE INDEX current_state_events_room_id ON current_state_events (room_id); +CREATE INDEX current_state_events_type ON current_state_events (type); +CREATE INDEX current_state_events_state_key ON current_state_events (state_key); + +CREATE TABLE IF NOT EXISTS room_memberships( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + sender TEXT NOT NULL, + room_id TEXT NOT NULL, + membership TEXT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX room_memberships_room_id ON room_memberships (room_id); +CREATE INDEX room_memberships_user_id ON room_memberships (user_id); + +CREATE TABLE IF NOT EXISTS feedback( + event_id TEXT NOT NULL, + feedback_type TEXT, + target_event_id TEXT, + sender TEXT, + room_id TEXT, + UNIQUE (event_id) +); + +CREATE TABLE IF NOT EXISTS topics( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + topic TEXT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX topics_room_id ON topics(room_id); + +CREATE TABLE IF NOT EXISTS room_names( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + name TEXT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX room_names_room_id ON room_names(room_id); + +CREATE TABLE IF NOT EXISTS rooms( + room_id TEXT PRIMARY KEY NOT NULL, + is_public BOOL, + creator TEXT +); + +CREATE TABLE IF NOT EXISTS room_hosts( + room_id TEXT NOT NULL, + host TEXT NOT NULL, + UNIQUE (room_id, host) +); + +CREATE INDEX room_hosts_room_id ON room_hosts (room_id); diff --git a/synapse/storage/schema/full_schemas/16/keys.sql b/synapse/storage/schema/full_schemas/16/keys.sql new file mode 100644 index 0000000000..afc142045e --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/keys.sql @@ -0,0 +1,31 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS server_tls_certificates( + server_name TEXT, -- Server name. + fingerprint TEXT, -- Certificate fingerprint. + from_server TEXT, -- Which key server the certificate was fetched from. + ts_added_ms BIGINT, -- When the certifcate was added. + tls_certificate bytea, -- DER encoded x509 certificate. + UNIQUE (server_name, fingerprint) +); + +CREATE TABLE IF NOT EXISTS server_signature_keys( + server_name TEXT, -- Server name. + key_id TEXT, -- Key version. + from_server TEXT, -- Which key server the key was fetched form. + ts_added_ms BIGINT, -- When the key was added. + verify_key bytea, -- NACL verification key. + UNIQUE (server_name, key_id) +); diff --git a/synapse/storage/schema/full_schemas/16/media_repository.sql b/synapse/storage/schema/full_schemas/16/media_repository.sql new file mode 100644 index 0000000000..dacbda40ca --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/media_repository.sql @@ -0,0 +1,68 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS local_media_repository ( + media_id TEXT, -- The id used to refer to the media. + media_type TEXT, -- The MIME-type of the media. + media_length INTEGER, -- Length of the media in bytes. + created_ts BIGINT, -- When the content was uploaded in ms. + upload_name TEXT, -- The name the media was uploaded with. + user_id TEXT, -- The user who uploaded the file. + UNIQUE (media_id) +); + +CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( + media_id TEXT, -- The id used to refer to the media. + thumbnail_width INTEGER, -- The width of the thumbnail in pixels. + thumbnail_height INTEGER, -- The height of the thumbnail in pixels. + thumbnail_type TEXT, -- The MIME-type of the thumbnail. + thumbnail_method TEXT, -- The method used to make the thumbnail. + thumbnail_length INTEGER, -- The length of the thumbnail in bytes. + UNIQUE ( + media_id, thumbnail_width, thumbnail_height, thumbnail_type + ) +); + +CREATE INDEX local_media_repository_thumbnails_media_id + ON local_media_repository_thumbnails (media_id); + +CREATE TABLE IF NOT EXISTS remote_media_cache ( + media_origin TEXT, -- The remote HS the media came from. + media_id TEXT, -- The id used to refer to the media on that server. + media_type TEXT, -- The MIME-type of the media. + created_ts BIGINT, -- When the content was uploaded in ms. + upload_name TEXT, -- The name the media was uploaded with. + media_length INTEGER, -- Length of the media in bytes. + filesystem_id TEXT, -- The name used to store the media on disk. + UNIQUE (media_origin, media_id) +); + +CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( + media_origin TEXT, -- The remote HS the media came from. + media_id TEXT, -- The id used to refer to the media. + thumbnail_width INTEGER, -- The width of the thumbnail in pixels. + thumbnail_height INTEGER, -- The height of the thumbnail in pixels. + thumbnail_method TEXT, -- The method used to make the thumbnail + thumbnail_type TEXT, -- The MIME-type of the thumbnail. + thumbnail_length INTEGER, -- The length of the thumbnail in bytes. + filesystem_id TEXT, -- The name used to store the media on disk. + UNIQUE ( + media_origin, media_id, thumbnail_width, thumbnail_height, + thumbnail_type + ) +); + +CREATE INDEX remote_media_cache_thumbnails_media_id + ON remote_media_cache_thumbnails (media_id); diff --git a/synapse/storage/schema/full_schemas/16/presence.sql b/synapse/storage/schema/full_schemas/16/presence.sql new file mode 100644 index 0000000000..80088413ba --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/presence.sql @@ -0,0 +1,40 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS presence( + user_id TEXT NOT NULL, + state VARCHAR(20), + status_msg TEXT, + mtime BIGINT, -- miliseconds since last state change + UNIQUE (user_id) +); + +-- For each of /my/ users which possibly-remote users are allowed to see their +-- presence state +CREATE TABLE IF NOT EXISTS presence_allow_inbound( + observed_user_id TEXT NOT NULL, + observer_user_id TEXT NOT NULL, -- a UserID, + UNIQUE (observed_user_id, observer_user_id) +); + +-- For each of /my/ users (watcher), which possibly-remote users are they +-- watching? +CREATE TABLE IF NOT EXISTS presence_list( + user_id TEXT NOT NULL, + observed_user_id TEXT NOT NULL, -- a UserID, + accepted BOOLEAN NOT NULL, + UNIQUE (user_id, observed_user_id) +); + +CREATE INDEX presence_list_user_id ON presence_list (user_id); diff --git a/synapse/storage/schema/full_schemas/16/profiles.sql b/synapse/storage/schema/full_schemas/16/profiles.sql new file mode 100644 index 0000000000..934be86520 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/profiles.sql @@ -0,0 +1,20 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS profiles( + user_id TEXT NOT NULL, + displayname TEXT, + avatar_url TEXT, + UNIQUE(user_id) +); diff --git a/synapse/storage/schema/full_schemas/16/push.sql b/synapse/storage/schema/full_schemas/16/push.sql new file mode 100644 index 0000000000..9387f920f0 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/push.sql @@ -0,0 +1,74 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS rejections( + event_id TEXT NOT NULL, + reason TEXT NOT NULL, + last_check TEXT NOT NULL, + UNIQUE (event_id) +); + +-- Push notification endpoints that users have configured +CREATE TABLE IF NOT EXISTS pushers ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + access_token BIGINT DEFAULT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey bytea NOT NULL, + ts BIGINT NOT NULL, + lang VARCHAR(8), + data bytea, + last_token TEXT, + last_success BIGINT, + failing_since BIGINT, + UNIQUE (app_id, pushkey) +); + +CREATE TABLE IF NOT EXISTS push_rules ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + rule_id TEXT NOT NULL, + priority_class SMALLINT NOT NULL, + priority INTEGER NOT NULL DEFAULT 0, + conditions TEXT NOT NULL, + actions TEXT NOT NULL, + UNIQUE(user_name, rule_id) +); + +CREATE INDEX push_rules_user_name on push_rules (user_name); + +CREATE TABLE IF NOT EXISTS user_filters( + user_id TEXT, + filter_id BIGINT, + filter_json bytea +); + +CREATE INDEX user_filters_by_user_id_filter_id ON user_filters( + user_id, filter_id +); + +CREATE TABLE IF NOT EXISTS push_rules_enable ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + rule_id TEXT NOT NULL, + enabled SMALLINT, + UNIQUE(user_name, rule_id) +); + +CREATE INDEX push_rules_enable_user_name on push_rules_enable (user_name); diff --git a/synapse/storage/schema/full_schemas/16/redactions.sql b/synapse/storage/schema/full_schemas/16/redactions.sql new file mode 100644 index 0000000000..69621955d4 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/redactions.sql @@ -0,0 +1,22 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS redactions ( + event_id TEXT NOT NULL, + redacts TEXT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX redactions_event_id ON redactions (event_id); +CREATE INDEX redactions_redacts ON redactions (redacts); diff --git a/synapse/storage/schema/full_schemas/16/room_aliases.sql b/synapse/storage/schema/full_schemas/16/room_aliases.sql new file mode 100644 index 0000000000..412bb97fad --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/room_aliases.sql @@ -0,0 +1,29 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS room_aliases( + room_alias TEXT NOT NULL, + room_id TEXT NOT NULL, + UNIQUE (room_alias) +); + +CREATE INDEX room_aliases_id ON room_aliases(room_id); + +CREATE TABLE IF NOT EXISTS room_alias_servers( + room_alias TEXT NOT NULL, + server TEXT NOT NULL +); + +CREATE INDEX room_alias_servers_alias ON room_alias_servers(room_alias); diff --git a/synapse/storage/schema/full_schemas/16/state.sql b/synapse/storage/schema/full_schemas/16/state.sql new file mode 100644 index 0000000000..705cac6ce9 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/state.sql @@ -0,0 +1,40 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS state_groups( + id BIGINT PRIMARY KEY, + room_id TEXT NOT NULL, + event_id TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS state_groups_state( + state_group BIGINT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + event_id TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS event_to_state_groups( + event_id TEXT NOT NULL, + state_group BIGINT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX state_groups_id ON state_groups(id); + +CREATE INDEX state_groups_state_id ON state_groups_state(state_group); +CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key); +CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id); diff --git a/synapse/storage/schema/full_schemas/16/transactions.sql b/synapse/storage/schema/full_schemas/16/transactions.sql new file mode 100644 index 0000000000..1ab77cdb63 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/transactions.sql @@ -0,0 +1,63 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +-- Stores what transaction ids we have received and what our response was +CREATE TABLE IF NOT EXISTS received_transactions( + transaction_id TEXT, + origin TEXT, + ts BIGINT, + response_code INTEGER, + response_json bytea, + has_been_referenced smallint default 0, -- Whether thishas been referenced by a prev_tx + UNIQUE (transaction_id, origin) +); + +CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; + + +-- Stores what transactions we've sent, what their response was (if we got one) and whether we have +-- since referenced the transaction in another outgoing transaction +CREATE TABLE IF NOT EXISTS sent_transactions( + id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering + transaction_id TEXT, + destination TEXT, + response_code INTEGER DEFAULT 0, + response_json TEXT, + ts BIGINT +); + +CREATE INDEX sent_transaction_dest ON sent_transactions(destination); +CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id); +-- So that we can do an efficient look up of all transactions that have yet to be successfully +-- sent. +CREATE INDEX sent_transaction_sent ON sent_transactions(response_code); + + +-- For sent transactions only. +CREATE TABLE IF NOT EXISTS transaction_id_to_pdu( + transaction_id INTEGER, + destination TEXT, + pdu_id TEXT, + pdu_origin TEXT, + UNIQUE (transaction_id, destination) +); + +CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); + +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination TEXT PRIMARY KEY, + retry_last_ts BIGINT, + retry_interval INTEGER +); diff --git a/synapse/storage/schema/full_schemas/16/users.sql b/synapse/storage/schema/full_schemas/16/users.sql new file mode 100644 index 0000000000..d2fa3122da --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/users.sql @@ -0,0 +1,42 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS users( + name TEXT, + password_hash TEXT, + creation_ts BIGINT, + admin SMALLINT DEFAULT 0 NOT NULL, + UNIQUE(name) +); + +CREATE TABLE IF NOT EXISTS access_tokens( + id BIGINT PRIMARY KEY, + user_id TEXT NOT NULL, + device_id TEXT, + token TEXT NOT NULL, + last_used BIGINT, + UNIQUE(token) +); + +CREATE TABLE IF NOT EXISTS user_ips ( + user_id TEXT NOT NULL, + access_token TEXT NOT NULL, + device_id TEXT, + ip TEXT NOT NULL, + user_agent TEXT NOT NULL, + last_seen BIGINT NOT NULL +); + +CREATE INDEX user_ips_user ON user_ips(user_id); +CREATE INDEX user_ips_user_ip ON user_ips(user_id, access_token, ip); diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql index 0431e2d051..d682608aa0 100644 --- a/synapse/storage/schema/schema_version.sql +++ b/synapse/storage/schema/schema_version.sql @@ -14,17 +14,14 @@ */ CREATE TABLE IF NOT EXISTS schema_version( - Lock char(1) NOT NULL DEFAULT 'X', -- Makes sure this table only has one row. + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. version INTEGER NOT NULL, upgraded BOOL NOT NULL, -- Whether we reached this version from an upgrade or an initial schema. - CONSTRAINT schema_version_lock_x CHECK (Lock='X') - CONSTRAINT schema_version_lock_uniq UNIQUE (Lock) + CHECK (Lock='X') ); CREATE TABLE IF NOT EXISTS applied_schema_deltas( version INTEGER NOT NULL, file TEXT NOT NULL, - CONSTRAINT schema_deltas_ver_file UNIQUE (version, file) ON CONFLICT IGNORE + UNIQUE(version, file) ); - -CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index d0d53770f2..f051828630 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -56,7 +56,6 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }, - or_ignore=True, ) def get_event_reference_hashes(self, event_ids): @@ -100,7 +99,7 @@ class SignatureStore(SQLBaseStore): " WHERE event_id = ?" ) txn.execute(query, (event_id, )) - return dict(txn.fetchall()) + return {k: v for k, v in txn.fetchall()} def _store_event_reference_hash_txn(self, txn, event_id, algorithm, hash_bytes): @@ -119,7 +118,6 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }, - or_ignore=True, ) def _get_event_signatures_txn(self, txn, event_id): @@ -164,7 +162,6 @@ class SignatureStore(SQLBaseStore): "key_id": key_id, "signature": buffer(signature_bytes), }, - or_ignore=True, ) def _get_prev_event_hashes_txn(self, txn, event_id): @@ -198,5 +195,4 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }, - or_ignore=True, ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 456e4bd45d..dbc0e49c1f 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -15,6 +15,10 @@ from ._base import SQLBaseStore +from twisted.internet import defer + +from synapse.util.stringutils import random_string + import logging logger = logging.getLogger(__name__) @@ -89,29 +93,31 @@ class StateStore(SQLBaseStore): state_group = context.state_group if not state_group: - state_group = self._simple_insert_txn( + state_group = self._state_groups_id_gen.get_next_txn(txn) + self._simple_insert_txn( txn, table="state_groups", values={ + "id": state_group, "room_id": event.room_id, "event_id": event.event_id, }, - or_ignore=True, ) - for state in state_events.values(): - self._simple_insert_txn( - txn, - table="state_groups_state", - values={ + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { "state_group": state_group, "room_id": state.room_id, "type": state.type, "state_key": state.state_key, "event_id": state.event_id, - }, - or_ignore=True, - ) + } + for state in state_events.values() + ], + ) self._simple_insert_txn( txn, @@ -120,5 +126,33 @@ class StateStore(SQLBaseStore): "state_group": state_group, "event_id": event.event_id, }, - or_replace=True, ) + + @defer.inlineCallbacks + def get_current_state(self, room_id, event_type=None, state_key=""): + def f(txn): + sql = ( + "SELECT event_id FROM current_state_events" + " WHERE room_id = ? " + ) + + if event_type and state_key is not None: + sql += " AND type = ? AND state_key = ? " + args = (room_id, event_type, state_key) + elif event_type: + sql += " AND type = ?" + args = (room_id, event_type) + else: + args = (room_id, ) + + txn.execute(sql, args) + results = self.cursor_to_dict(txn) + + return self._parse_events_txn(txn, results) + + events = yield self.runInteraction("get_current_state", f) + defer.returnValue(events) + + +def _make_group_id(clock): + return str(int(clock.time_msec())) + random_string(5) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 09bc522210..280d4ad605 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -110,7 +110,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): if self.topological is None: return "(%d < %s)" % (self.stream, "stream_ordering") else: - return "(%d < %s OR (%d == %s AND %d < %s))" % ( + return "(%d < %s OR (%d = %s AND %d < %s))" % ( self.topological, "topological_ordering", self.topological, "topological_ordering", self.stream, "stream_ordering", @@ -120,7 +120,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): if self.topological is None: return "(%d >= %s)" % (self.stream, "stream_ordering") else: - return "(%d > %s OR (%d == %s AND %d >= %s))" % ( + return "(%d > %s OR (%d = %s AND %d >= %s))" % ( self.topological, "topological_ordering", self.topological, "topological_ordering", self.stream, "stream_ordering", @@ -149,7 +149,8 @@ class StreamStore(SQLBaseStore): # select all the events between from/to with a sensible limit sql = ( "SELECT e.event_id, e.room_id, e.type, s.state_key, " - "e.stream_ordering FROM events AS e LEFT JOIN state_events as s ON " + "e.stream_ordering FROM events AS e " + "LEFT JOIN state_events as s ON " "e.event_id = s.event_id " "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " "ORDER BY stream_ordering ASC LIMIT %(limit)d " @@ -214,8 +215,9 @@ class StreamStore(SQLBaseStore): current_room_membership_sql = ( "SELECT m.room_id FROM room_memberships as m " - "INNER JOIN current_state_events as c ON m.event_id = c.event_id " - "WHERE m.user_id = ? AND m.membership = 'join'" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id AND c.state_key = m.user_id" + " WHERE m.user_id = ? AND m.membership = 'join'" ) # We also want to get any membership events about that user, e.g. @@ -240,7 +242,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE " - "(e.outlier = 0 AND (room_id IN (%(current)s)) OR " + "(e.outlier = ? AND (room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " "AND e.stream_ordering > ? AND e.stream_ordering <= ? " "ORDER BY stream_ordering ASC LIMIT %(limit)d " @@ -251,7 +253,7 @@ class StreamStore(SQLBaseStore): } def f(txn): - txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,)) + txn.execute(sql, (False, user_id, user_id, from_id.stream, to_id.stream,)) rows = self.cursor_to_dict(txn) @@ -283,7 +285,7 @@ class StreamStore(SQLBaseStore): # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. - args = [room_id] + args = [False, room_id] if direction == 'b': order = "DESC" bounds = _StreamToken.parse(from_key).upper_bound() @@ -307,7 +309,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events" - " WHERE outlier = 0 AND room_id = ? AND %(bounds)s" + " WHERE outlier = ? AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," " stream_ordering %(order)s %(limit)s" ) % { @@ -358,7 +360,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT stream_ordering, topological_ordering, event_id" " FROM events" - " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0" + " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?" " ORDER BY topological_ordering DESC, stream_ordering DESC" " LIMIT ?" ) @@ -368,17 +370,17 @@ class StreamStore(SQLBaseStore): "SELECT stream_ordering, topological_ordering, event_id" " FROM events" " WHERE room_id = ? AND stream_ordering > ?" - " AND stream_ordering <= ? AND outlier = 0" + " AND stream_ordering <= ? AND outlier = ?" " ORDER BY topological_ordering DESC, stream_ordering DESC" " LIMIT ?" ) def get_recent_events_for_room_txn(txn): if from_token is None: - txn.execute(sql, (room_id, end_token.stream, limit,)) + txn.execute(sql, (room_id, end_token.stream, False, limit,)) else: txn.execute(sql, ( - room_id, from_token.stream, end_token.stream, limit + room_id, from_token.stream, end_token.stream, False, limit )) rows = self.cursor_to_dict(txn) @@ -413,26 +415,23 @@ class StreamStore(SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) + @defer.inlineCallbacks def get_room_events_max_id(self): - return self.runInteraction( - "get_room_events_max_id", - self._get_room_events_max_id_txn - ) + token = yield self._stream_id_gen.get_max_token(self) + defer.returnValue("s%d" % (token,)) - def _get_room_events_max_id_txn(self, txn): - txn.execute( - "SELECT MAX(stream_ordering) as m FROM events" + @defer.inlineCallbacks + def _get_min_token(self): + row = yield self._execute( + "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" ) - res = self.cursor_to_dict(txn) - - logger.debug("get_room_events_max_id: %s", res) + self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 + self.min_token = min(self.min_token, -1) - if not res or not res[0] or not res[0]["m"]: - return "s0" + logger.debug("min_token is: %s", self.min_token) - key = res[0]["m"] - return "s%d" % (key,) + defer.returnValue(self.min_token) @staticmethod def _set_before_and_after(events, rows): diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 0b8a3b7a07..624da4a9dc 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -13,10 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, Table, cached +from ._base import SQLBaseStore, cached from collections import namedtuple +from syutil.jsonutil import encode_canonical_json import logging logger = logging.getLogger(__name__) @@ -46,15 +47,19 @@ class TransactionStore(SQLBaseStore): ) def _get_received_txn_response(self, txn, transaction_id, origin): - where_clause = "transaction_id = ? AND origin = ?" - query = ReceivedTransactionsTable.select_statement(where_clause) - - txn.execute(query, (transaction_id, origin)) - - results = ReceivedTransactionsTable.decode_results(txn.fetchall()) + result = self._simple_select_one_txn( + txn, + table=ReceivedTransactionsTable.table_name, + keyvalues={ + "transaction_id": transaction_id, + "origin": origin, + }, + retcols=ReceivedTransactionsTable.fields, + allow_none=True, + ) - if results and results[0].response_code: - return (results[0].response_code, results[0].response_json) + if result and result.response_code: + return result["response_code"], result["response_json"] else: return None @@ -72,22 +77,18 @@ class TransactionStore(SQLBaseStore): response_json (str) """ - return self.runInteraction( - "set_received_txn_response", - self._set_received_txn_response, - transaction_id, origin, code, response_dict + return self._simple_insert( + table=ReceivedTransactionsTable.table_name, + values={ + "transaction_id": transaction_id, + "origin": origin, + "response_code": code, + "response_json": buffer(encode_canonical_json(response_dict)), + }, + or_ignore=True, + desc="set_received_txn_response", ) - def _set_received_txn_response(self, txn, transaction_id, origin, code, - response_json): - query = ( - "UPDATE %s " - "SET response_code = ?, response_json = ? " - "WHERE transaction_id = ? AND origin = ?" - ) % ReceivedTransactionsTable.table_name - - txn.execute(query, (code, response_json, transaction_id, origin)) - def prep_send_transaction(self, transaction_id, destination, origin_server_ts): """Persists an outgoing transaction and calculates the values for the @@ -114,41 +115,38 @@ class TransactionStore(SQLBaseStore): def _prep_send_transaction(self, txn, transaction_id, destination, origin_server_ts): + next_id = self._transaction_id_gen.get_next_txn(txn) + # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. - query = "%s ORDER BY id DESC LIMIT 1" % ( - SentTransactions.select_statement("destination = ?"), - ) + query = ( + "SELECT * FROM sent_transactions" + " WHERE destination = ?" + " ORDER BY id DESC LIMIT 1" + ) - results = txn.execute(query, (destination,)) - results = SentTransactions.decode_results(results) + txn.execute(query, (destination,)) + results = self.cursor_to_dict(txn) - prev_txns = [r.transaction_id for r in results] + prev_txns = [r["transaction_id"] for r in results] # Actually add the new transaction to the sent_transactions table. - query = SentTransactions.insert_statement() - txn.execute(query, SentTransactions.EntryType( - None, - transaction_id=transaction_id, - destination=destination, - ts=origin_server_ts, - response_code=0, - response_json=None - )) - - # Update the tx id -> pdu id mapping - - # values = [ - # (transaction_id, destination, pdu[0], pdu[1]) - # for pdu in pdu_list - # ] - # - # logger.debug("Inserting: %s", repr(values)) - # - # query = TransactionsToPduTable.insert_statement() - # txn.executemany(query, values) + self._simple_insert_txn( + txn, + table=SentTransactions.table_name, + values={ + "id": next_id, + "transaction_id": transaction_id, + "destination": destination, + "ts": origin_server_ts, + "response_code": 0, + "response_json": None, + } + ) + + # TODO Update the tx id -> pdu id mapping return prev_txns @@ -164,18 +162,24 @@ class TransactionStore(SQLBaseStore): return self.runInteraction( "delivered_txn", self._delivered_txn, - transaction_id, destination, code, response_dict + transaction_id, destination, code, + buffer(encode_canonical_json(response_dict)), ) - def _delivered_txn(cls, txn, transaction_id, destination, + def _delivered_txn(self, txn, transaction_id, destination, code, response_json): - query = ( - "UPDATE %s " - "SET response_code = ?, response_json = ? " - "WHERE transaction_id = ? AND destination = ?" - ) % SentTransactions.table_name - - txn.execute(query, (code, response_json, transaction_id, destination)) + self._simple_update_one_txn( + txn, + table=SentTransactions.table_name, + keyvalues={ + "transaction_id": transaction_id, + "destination": destination, + }, + updatevalues={ + "response_code": code, + "response_json": None, # For now, don't persist response_json + } + ) def get_transactions_after(self, transaction_id, destination): """Get all transactions after a given local transaction_id. @@ -185,25 +189,26 @@ class TransactionStore(SQLBaseStore): destination (str) Returns: - list: A list of `ReceivedTransactionsTable.EntryType` + list: A list of dicts """ return self.runInteraction( "get_transactions_after", self._get_transactions_after, transaction_id, destination ) - def _get_transactions_after(cls, txn, transaction_id, destination): - where = ( - "destination = ? AND id > (select id FROM %s WHERE " - "transaction_id = ? AND destination = ?)" - ) % ( - SentTransactions.table_name + def _get_transactions_after(self, txn, transaction_id, destination): + query = ( + "SELECT * FROM sent_transactions" + " WHERE destination = ? AND id >" + " (" + " SELECT id FROM sent_transactions" + " WHERE transaction_id = ? AND destination = ?" + " )" ) - query = SentTransactions.select_statement(where) txn.execute(query, (destination, transaction_id, destination)) - return ReceivedTransactionsTable.decode_results(txn.fetchall()) + return self.cursor_to_dict(txn) @cached() def get_destination_retry_timings(self, destination): @@ -214,22 +219,27 @@ class TransactionStore(SQLBaseStore): Returns: None if not retrying - Otherwise a DestinationsTable.EntryType for the retry scheme + Otherwise a dict for the retry scheme """ return self.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination) - def _get_destination_retry_timings(cls, txn, destination): - query = DestinationsTable.select_statement("destination = ?") - txn.execute(query, (destination,)) - result = txn.fetchall() - if result: - result = DestinationsTable.decode_single_result(result) - if result.retry_last_ts > 0: - return result - else: - return None + def _get_destination_retry_timings(self, txn, destination): + result = self._simple_select_one_txn( + txn, + table=DestinationsTable.table_name, + keyvalues={ + "destination": destination, + }, + retcols=DestinationsTable.fields, + allow_none=True, + ) + + if result and result["retry_last_ts"] > 0: + return result + else: + return None def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): @@ -245,11 +255,11 @@ class TransactionStore(SQLBaseStore): # As this is the new value, we might as well prefill the cache self.get_destination_retry_timings.prefill( destination, - DestinationsTable.EntryType( - destination, - retry_last_ts, - retry_interval - ) + { + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval + }, ) # XXX: we could chose to not bother persisting this if our cache thinks @@ -262,22 +272,38 @@ class TransactionStore(SQLBaseStore): retry_interval, ) - def _set_destination_retry_timings(cls, txn, destination, + def _set_destination_retry_timings(self, txn, destination, retry_last_ts, retry_interval): - query = ( - "INSERT OR REPLACE INTO %s " - "(destination, retry_last_ts, retry_interval) " - "VALUES (?, ?, ?) " - ) % DestinationsTable.table_name + "UPDATE destinations" + " SET retry_last_ts = ?, retry_interval = ?" + " WHERE destination = ?" + ) + + txn.execute( + query, + ( + retry_last_ts, retry_interval, destination, + ) + ) - txn.execute(query, (destination, retry_last_ts, retry_interval)) + if txn.rowcount == 0: + # destination wasn't already in table. Insert it. + self._simple_insert_txn( + txn, + table="destinations", + values={ + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + } + ) def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. Returns: - list: A list of `DestinationsTable.EntryType` + list: A list of dicts """ return self.runInteraction( @@ -285,14 +311,17 @@ class TransactionStore(SQLBaseStore): self._get_destinations_needing_retry ) - def _get_destinations_needing_retry(cls, txn): - where = "retry_last_ts > 0 and retry_next_ts < now()" - query = DestinationsTable.select_statement(where) - txn.execute(query) - return DestinationsTable.decode_results(txn.fetchall()) + def _get_destinations_needing_retry(self, txn): + query = ( + "SELECT * FROM destinations" + " WHERE retry_last_ts > 0 and retry_next_ts < ?" + ) + txn.execute(query, (self._clock.time_msec(),)) + return self.cursor_to_dict(txn) -class ReceivedTransactionsTable(Table): + +class ReceivedTransactionsTable(object): table_name = "received_transactions" fields = [ @@ -304,10 +333,8 @@ class ReceivedTransactionsTable(Table): "has_been_referenced", ] - EntryType = namedtuple("ReceivedTransactionsEntry", fields) - -class SentTransactions(Table): +class SentTransactions(object): table_name = "sent_transactions" fields = [ @@ -322,7 +349,7 @@ class SentTransactions(Table): EntryType = namedtuple("SentTransactionsEntry", fields) -class TransactionsToPduTable(Table): +class TransactionsToPduTable(object): table_name = "transaction_id_to_pdu" fields = [ @@ -332,10 +359,8 @@ class TransactionsToPduTable(Table): "pdu_origin", ] - EntryType = namedtuple("TransactionsToPduEntry", fields) - -class DestinationsTable(Table): +class DestinationsTable(object): table_name = "destinations" fields = [ @@ -343,5 +368,3 @@ class DestinationsTable(Table): "retry_last_ts", "retry_interval", ] - - EntryType = namedtuple("DestinationsEntry", fields) diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py new file mode 100644 index 0000000000..c488b10d3c --- /dev/null +++ b/synapse/storage/util/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py new file mode 100644 index 0000000000..e40eb8a8c4 --- /dev/null +++ b/synapse/storage/util/id_generators.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from collections import deque +import contextlib +import threading + + +class IdGenerator(object): + def __init__(self, table, column, store): + self.table = table + self.column = column + self.store = store + self._lock = threading.Lock() + self._next_id = None + + @defer.inlineCallbacks + def get_next(self): + if self._next_id is None: + yield self.store.runInteraction( + "IdGenerator_%s" % (self.table,), + self.get_next_txn, + ) + + with self._lock: + i = self._next_id + self._next_id += 1 + defer.returnValue(i) + + def get_next_txn(self, txn): + with self._lock: + if self._next_id: + i = self._next_id + self._next_id += 1 + return i + else: + txn.execute( + "SELECT MAX(%s) FROM %s" % (self.column, self.table,) + ) + + val, = txn.fetchone() + cur = val or 0 + cur += 1 + self._next_id = cur + 1 + + return cur + + +class StreamIdGenerator(object): + """Used to generate new stream ids when persisting events while keeping + track of which transactions have been completed. + + This allows us to get the "current" stream id, i.e. the stream id such that + all ids less than or equal to it have completed. This handles the fact that + persistence of events can complete out of order. + + Usage: + with stream_id_gen.get_next_txn(txn) as stream_id: + # ... persist event ... + """ + def __init__(self): + self._lock = threading.Lock() + + self._current_max = None + self._unfinished_ids = deque() + + def get_next_txn(self, txn): + """ + Usage: + with stream_id_gen.get_next_txn(txn) as stream_id: + # ... persist event ... + """ + if not self._current_max: + self._get_or_compute_current_max(txn) + + with self._lock: + self._current_max += 1 + next_id = self._current_max + + self._unfinished_ids.append(next_id) + + @contextlib.contextmanager + def manager(): + try: + yield next_id + finally: + with self._lock: + self._unfinished_ids.remove(next_id) + + return manager() + + @defer.inlineCallbacks + def get_max_token(self, store): + """Returns the maximum stream id such that all stream ids less than or + equal to it have been successfully persisted. + """ + if not self._current_max: + yield store.runInteraction( + "_compute_current_max", + self._get_or_compute_current_max, + ) + + with self._lock: + if self._unfinished_ids: + defer.returnValue(self._unfinished_ids[0] - 1) + + defer.returnValue(self._current_max) + + def _get_or_compute_current_max(self, txn): + with self._lock: + txn.execute("SELECT MAX(stream_ordering) FROM events") + rows = txn.fetchall() + val, = rows[0] + + self._current_max = int(val) if val else 1 + + return self._current_max |