diff options
Diffstat (limited to 'synapse/storage')
60 files changed, 2046 insertions, 734 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f4dec70393..61215bbc7b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -51,7 +51,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 15 +SCHEMA_VERSION = 16 dir_path = os.path.abspath(os.path.dirname(__file__)) @@ -104,14 +104,16 @@ class DataStore(RoomMemberStore, RoomStore, self.client_ip_last_seen.prefill(*key + (now,)) - yield self._simple_insert( + yield self._simple_upsert( "user_ips", - { - "user": user.to_string(), + keyvalues={ + "user_id": user.to_string(), "access_token": access_token, - "device_id": device_id, "ip": ip, "user_agent": user_agent, + }, + values={ + "device_id": device_id, "last_seen": now, }, desc="insert_client_ip", @@ -120,7 +122,7 @@ class DataStore(RoomMemberStore, RoomStore, def get_user_ip_and_agents(self, user): return self._simple_select_list( table="user_ips", - keyvalues={"user": user.to_string()}, + keyvalues={"user_id": user.to_string()}, retcols=[ "device_id", "access_token", "ip", "user_agent", "last_seen" ], @@ -148,21 +150,23 @@ class UpgradeDatabaseException(PrepareDatabaseException): pass -def prepare_database(db_conn): +def prepare_database(db_conn, database_engine): """Prepares a database for usage. Will either create all necessary tables or upgrade from an older schema version. """ try: cur = db_conn.cursor() - version_info = _get_or_create_schema_state(cur) + version_info = _get_or_create_schema_state(cur, database_engine) if version_info: user_version, delta_files, upgraded = version_info - _upgrade_existing_database(cur, user_version, delta_files, upgraded) + _upgrade_existing_database( + cur, user_version, delta_files, upgraded, database_engine + ) else: - _setup_new_database(cur) + _setup_new_database(cur, database_engine) - cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) + # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) cur.close() db_conn.commit() @@ -171,7 +175,7 @@ def prepare_database(db_conn): raise -def _setup_new_database(cur): +def _setup_new_database(cur, database_engine): """Sets up the database by finding a base set of "full schemas" and then applying any necessary deltas. @@ -225,31 +229,30 @@ def _setup_new_database(cur): directory_entries = os.listdir(sql_dir) - sql_script = "BEGIN TRANSACTION;\n" for filename in fnmatch.filter(directory_entries, "*.sql"): sql_loc = os.path.join(sql_dir, filename) logger.debug("Applying schema %s", sql_loc) - sql_script += read_schema(sql_loc) - sql_script += "\n" - sql_script += "COMMIT TRANSACTION;" - cur.executescript(sql_script) + executescript(cur, sql_loc) cur.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", - (max_current_ver, False) + database_engine.convert_param_style( + "INSERT INTO schema_version (version, upgraded)" + " VALUES (?,?)" + ), + (max_current_ver, False,) ) _upgrade_existing_database( cur, current_version=max_current_ver, applied_delta_files=[], - upgraded=False + upgraded=False, + database_engine=database_engine, ) def _upgrade_existing_database(cur, current_version, applied_delta_files, - upgraded): + upgraded, database_engine): """Upgrades an existing database. Delta files can either be SQL stored in *.sql files, or python modules @@ -305,6 +308,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, if not upgraded: start_ver += 1 + logger.debug("applied_delta_files: %s", applied_delta_files) + for v in range(start_ver, SCHEMA_VERSION + 1): logger.debug("Upgrading schema to v%d", v) @@ -321,6 +326,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, directory_entries.sort() for file_name in directory_entries: relative_path = os.path.join(str(v), file_name) + logger.debug("Found file: %s", relative_path) if relative_path in applied_delta_files: continue @@ -342,9 +348,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, module.run_upgrade(cur) elif ext == ".sql": # A plain old .sql file, just read and execute it - delta_schema = read_schema(absolute_path) logger.debug("Applying schema %s", relative_path) - cur.executescript(delta_schema) + executescript(cur, absolute_path) else: # Not a valid delta file. logger.warn( @@ -356,24 +361,82 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, # Mark as done. cur.execute( - "INSERT INTO applied_schema_deltas (version, file)" - " VALUES (?,?)", + database_engine.convert_param_style( + "INSERT INTO applied_schema_deltas (version, file)" + " VALUES (?,?)", + ), (v, relative_path) ) cur.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", + database_engine.convert_param_style( + "REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)", + ), (v, True) ) -def _get_or_create_schema_state(txn): +def get_statements(f): + statement_buffer = "" + in_comment = False # If we're in a /* ... */ style comment + + for line in f: + line = line.strip() + + if in_comment: + # Check if this line contains an end to the comment + comments = line.split("*/", 1) + if len(comments) == 1: + continue + line = comments[1] + in_comment = False + + # Remove inline block comments + line = re.sub(r"/\*.*\*/", " ", line) + + # Does this line start a comment? + comments = line.split("/*", 1) + if len(comments) > 1: + line = comments[0] + in_comment = True + + # Deal with line comments + line = line.split("--", 1)[0] + line = line.split("//", 1)[0] + + # Find *all* semicolons. We need to treat first and last entry + # specially. + statements = line.split(";") + + # We must prepend statement_buffer to the first statement + first_statement = "%s %s" % ( + statement_buffer.strip(), + statements[0].strip() + ) + statements[0] = first_statement + + # Every entry, except the last, is a full statement + for statement in statements[:-1]: + yield statement.strip() + + # The last entry did *not* end in a semicolon, so we store it for the + # next semicolon we find + statement_buffer = statements[-1].strip() + + +def executescript(txn, schema_path): + with open(schema_path, 'r') as f: + for statement in get_statements(f): + txn.execute(statement) + + +def _get_or_create_schema_state(txn, database_engine): + # Bluntly try creating the schema_version tables. schema_path = os.path.join( dir_path, "schema", "schema_version.sql", ) - create_schema = read_schema(schema_path) - txn.executescript(create_schema) + executescript(txn, schema_path) txn.execute("SELECT version, upgraded FROM schema_version") row = txn.fetchone() @@ -382,10 +445,13 @@ def _get_or_create_schema_state(txn): if current_version: txn.execute( - "SELECT file FROM applied_schema_deltas WHERE version >= ?", + database_engine.convert_param_style( + "SELECT file FROM applied_schema_deltas WHERE version >= ?" + ), (current_version,) ) - return current_version, txn.fetchall(), upgraded + applied_deltas = [d for d, in txn.fetchall()] + return current_version, applied_deltas, upgraded return None @@ -417,7 +483,19 @@ def prepare_sqlite3_database(db_conn): if row and row[0]: db_conn.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" + "REPLACE INTO schema_version (version, upgraded)" " VALUES (?,?)", (row[0], False) ) + + +def are_all_users_on_domain(txn, database_engine, domain): + sql = database_engine.convert_param_style( + "SELECT COUNT(*) FROM users WHERE name NOT LIKE ?" + ) + pat = "%:" + domain + txn.execute(sql, (pat,)) + num_not_matching = txn.fetchall()[0][0] + if num_not_matching == 0: + return True + return False diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e3e67d8e0d..6017c2a6e8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -22,6 +22,8 @@ from synapse.util.logcontext import PreserveLoggingContext, LoggingContext from synapse.util.lrucache import LruCache import synapse.metrics +from util.id_generators import IdGenerator, StreamIdGenerator + from twisted.internet import defer from collections import namedtuple, OrderedDict @@ -145,11 +147,12 @@ class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() method.""" - __slots__ = ["txn", "name"] + __slots__ = ["txn", "name", "database_engine"] - def __init__(self, txn, name): + def __init__(self, txn, name, database_engine): object.__setattr__(self, "txn", txn) object.__setattr__(self, "name", name) + object.__setattr__(self, "database_engine", database_engine) def __getattr__(self, name): return getattr(self.txn, name) @@ -161,26 +164,32 @@ class LoggingTransaction(object): # TODO(paul): Maybe use 'info' and 'debug' for values? sql_logger.debug("[SQL] {%s} %s", self.name, sql) - try: - if args and args[0]: - values = args[0] + sql = self.database_engine.convert_param_style(sql) + + if args and args[0]: + args = list(args) + args[0] = [ + self.database_engine.encode_parameter(a) for a in args[0] + ] + try: sql_logger.debug( - "[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)), + "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])), self.name, - *values + *args[0] ) - except: - # Don't let logging failures stop SQL from working - pass + except: + # Don't let logging failures stop SQL from working + pass start = time.time() * 1000 + try: return self.txn.execute( sql, *args, **kwargs ) - except: - logger.exception("[SQL FAIL] {%s}", self.name) - raise + except Exception as e: + logger.debug("[SQL FAIL] {%s} %s", self.name, e) + raise finally: msecs = (time.time() * 1000) - start sql_logger.debug("[SQL time] {%s} %f", self.name, msecs) @@ -245,6 +254,14 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) + self.database_engine = hs.database_engine + + self._stream_id_gen = StreamIdGenerator() + self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) + self._state_groups_id_gen = IdGenerator("state_groups", "id", self) + self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) + self._pushers_id_gen = IdGenerator("pushers", "id", self) + def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -281,8 +298,11 @@ class SQLBaseStore(object): start_time = time.time() * 1000 - def inner_func(txn, *args, **kwargs): + def inner_func(conn, *args, **kwargs): with LoggingContext("runInteraction") as context: + if self.database_engine.is_connection_closed(conn): + conn.reconnect() + current_context.copy_to(context) start = time.time() * 1000 txn_id = self._TXN_ID @@ -296,9 +316,48 @@ class SQLBaseStore(object): sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) transaction_logger.debug("[TXN START] {%s}", name) try: - return func(LoggingTransaction(txn, name), *args, **kwargs) - except: - logger.exception("[TXN FAIL] {%s}", name) + i = 0 + N = 5 + while True: + try: + txn = conn.cursor() + return func( + LoggingTransaction(txn, name, self.database_engine), + *args, **kwargs + ) + except self.database_engine.module.OperationalError as e: + # This can happen if the database disappears mid + # transaction. + logger.warn( + "[TXN OPERROR] {%s} %s %d/%d", + name, e, i, N + ) + if i < N: + i += 1 + try: + conn.rollback() + except self.database_engine.module.Error as e1: + logger.warn( + "[TXN EROLL] {%s} %s", + name, e1, + ) + continue + except self.database_engine.module.DatabaseError as e: + if self.database_engine.is_deadlock(e): + logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N) + if i < N: + i += 1 + try: + conn.rollback() + except self.database_engine.module.Error as e1: + logger.warn( + "[TXN EROLL] {%s} %s", + name, e1, + ) + continue + raise + except Exception as e: + logger.debug("[TXN FAIL] {%s} %s", name, e) raise finally: end = time.time() * 1000 @@ -311,7 +370,7 @@ class SQLBaseStore(object): sql_txn_timer.inc_by(duration, desc) with PreserveLoggingContext(): - result = yield self._db_pool.runInteraction( + result = yield self._db_pool.runWithConnection( inner_func, *args, **kwargs ) defer.returnValue(result) @@ -342,11 +401,11 @@ class SQLBaseStore(object): The result of decoder(results) """ def interaction(txn): - cursor = txn.execute(query, args) + txn.execute(query, args) if decoder: - return decoder(cursor) + return decoder(txn) else: - return cursor.fetchall() + return txn.fetchall() return self.runInteraction(desc, interaction) @@ -356,27 +415,29 @@ class SQLBaseStore(object): # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. - def _simple_insert(self, table, values, or_replace=False, or_ignore=False, + @defer.inlineCallbacks + def _simple_insert(self, table, values, or_ignore=False, desc="_simple_insert"): """Executes an INSERT query on the named table. Args: table : string giving the table name values : dict of new column names and values for them - or_replace : bool; if True performs an INSERT OR REPLACE """ - return self.runInteraction( - desc, - self._simple_insert_txn, table, values, or_replace=or_replace, - or_ignore=or_ignore, - ) + try: + yield self.runInteraction( + desc, + self._simple_insert_txn, table, values, + ) + except self.database_engine.module.IntegrityError: + # We have to do or_ignore flag at this layer, since we can't reuse + # a cursor after we receive an error from the db. + if not or_ignore: + raise @log_function - def _simple_insert_txn(self, txn, table, values, or_replace=False, - or_ignore=False): - sql = "%s INTO %s (%s) VALUES(%s)" % ( - ("INSERT OR REPLACE" if or_replace else - "INSERT OR IGNORE" if or_ignore else "INSERT"), + def _simple_insert_txn(self, txn, table, values): + sql = "INSERT INTO %s (%s) VALUES(%s)" % ( table, ", ".join(k for k in values), ", ".join("?" for k in values) @@ -388,22 +449,26 @@ class SQLBaseStore(object): ) txn.execute(sql, values.values()) - return txn.lastrowid - def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"): + def _simple_upsert(self, table, keyvalues, values, + insertion_values={}, desc="_simple_upsert"): """ Args: table (str): The table to upsert into keyvalues (dict): The unique key tables and their new values values (dict): The nonunique columns and their new values + insertion_values (dict): key/values to use when inserting Returns: A deferred """ return self.runInteraction( desc, - self._simple_upsert_txn, table, keyvalues, values + self._simple_upsert_txn, table, keyvalues, values, insertion_values, ) - def _simple_upsert_txn(self, txn, table, keyvalues, values): + def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}): + # We need to lock the table :( + self.database_engine.lock_table(txn, table) + # Try to update sql = "UPDATE %s SET %s WHERE %s" % ( table, @@ -422,6 +487,7 @@ class SQLBaseStore(object): allvalues = {} allvalues.update(keyvalues) allvalues.update(values) + allvalues.update(insertion_values) sql = "INSERT INTO %s (%s) VALUES (%s)" % ( table, @@ -489,8 +555,7 @@ class SQLBaseStore(object): def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol): sql = ( - "SELECT %(retcol)s FROM %(table)s WHERE %(where)s " - "ORDER BY rowid asc" + "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" ) % { "retcol": retcol, "table": table, @@ -548,14 +613,14 @@ class SQLBaseStore(object): retcols : list of strings giving the names of the columns to return """ if keyvalues: - sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) txn.execute(sql, keyvalues.values()) else: - sql = "SELECT %s FROM %s ORDER BY rowid asc" % ( + sql = "SELECT %s FROM %s" % ( ", ".join(retcols), table ) @@ -607,10 +672,10 @@ class SQLBaseStore(object): def _simple_select_one_txn(self, txn, table, keyvalues, retcols, allow_none=False): - select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + select_sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, - " AND ".join("%s = ?" % (k) for k in keyvalues) + " AND ".join("%s = ?" % (k,) for k in keyvalues) ) txn.execute(select_sql, keyvalues.values()) @@ -648,6 +713,11 @@ class SQLBaseStore(object): updatevalues=updatevalues, ) + # if txn.rowcount == 0: + # raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + return ret return self.runInteraction(desc, func) @@ -860,6 +930,12 @@ class SQLBaseStore(object): result = txn.fetchone() return result[0] if result else None + def get_next_stream_id(self): + with self._next_stream_id_lock: + i = self._next_stream_id + self._next_stream_id += 1 + return i + class _RollbackButIsFineException(Exception): """ This exception is used to rollback a transaction without implying @@ -883,7 +959,7 @@ class Table(object): _select_where_clause = "SELECT %s FROM %s WHERE %s" _select_clause = "SELECT %s FROM %s" - _insert_clause = "INSERT OR REPLACE INTO %s (%s) VALUES (%s)" + _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)" @classmethod def select_statement(cls, where_clause=None): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index f8cbb3f323..63d1af4e86 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -366,11 +366,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): new_txn_id = max(highest_txn_id, last_txn_id) + 1 # Insert new txn into txn table - event_ids = [e.event_id for e in events] + event_ids = json.dumps([e.event_id for e in events]) txn.execute( "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " "VALUES(?,?,?)", - (service.id, new_txn_id, json.dumps(event_ids)) + (service.id, new_txn_id, event_ids) ) return AppServiceTransaction( service=service, id=new_txn_id, events=events diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 0199539fea..2b2bdf8615 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -21,8 +21,6 @@ from twisted.internet import defer from collections import namedtuple -import sqlite3 - RoomAliasMapping = namedtuple( "RoomAliasMapping", @@ -91,7 +89,7 @@ class DirectoryStore(SQLBaseStore): }, desc="create_room_alias_association", ) - except sqlite3.IntegrityError: + except self.database_engine.module.IntegrityError: raise SynapseError( 409, "Room alias %s already exists" % room_alias.to_string() ) @@ -120,12 +118,12 @@ class DirectoryStore(SQLBaseStore): defer.returnValue(room_id) def _delete_room_alias_txn(self, txn, room_alias): - cursor = txn.execute( + txn.execute( "SELECT room_id FROM room_aliases WHERE room_alias = ?", (room_alias.to_string(),) ) - res = cursor.fetchone() + res = txn.fetchone() if res: room_id = res[0] else: diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py new file mode 100644 index 0000000000..eb76df7f01 --- /dev/null +++ b/synapse/storage/engines/__init__.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .postgres import PostgresEngine +from .sqlite3 import Sqlite3Engine + +import importlib + + +SUPPORTED_MODULE = { + "sqlite3": Sqlite3Engine, + "psycopg2": PostgresEngine, +} + + +def create_engine(name): + engine_class = SUPPORTED_MODULE.get(name, None) + + if engine_class: + module = importlib.import_module(name) + return engine_class(module) + + raise RuntimeError( + "Unsupported database engine '%s'" % (name,) + ) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py new file mode 100644 index 0000000000..b8cca9b187 --- /dev/null +++ b/synapse/storage/engines/postgres.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage import prepare_database + + +class PostgresEngine(object): + def __init__(self, database_module): + self.module = database_module + self.module.extensions.register_type(self.module.extensions.UNICODE) + + def convert_param_style(self, sql): + return sql.replace("?", "%s") + + def encode_parameter(self, param): + return param + + def on_new_connection(self, db_conn): + db_conn.set_isolation_level( + self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ + ) + + def prepare_database(self, db_conn): + prepare_database(db_conn, self) + + def is_deadlock(self, error): + if isinstance(error, self.module.DatabaseError): + return error.pgcode in ["40001", "40P01"] + return False + + def is_connection_closed(self, conn): + return bool(conn) + + def lock_table(self, txn, table): + txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,)) diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py new file mode 100644 index 0000000000..f62d5d1205 --- /dev/null +++ b/synapse/storage/engines/sqlite3.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage import prepare_database, prepare_sqlite3_database + + +class Sqlite3Engine(object): + def __init__(self, database_module): + self.module = database_module + + def convert_param_style(self, sql): + return sql + + def encode_parameter(self, param): + return param + + def on_new_connection(self, db_conn): + self.prepare_database(db_conn) + + def prepare_database(self, db_conn): + prepare_sqlite3_database(db_conn) + prepare_database(db_conn, self) + + def is_deadlock(self, error): + return False + + def is_connection_closed(self, conn): + return False + + def lock_table(self, txn, table): + return diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 032334bfd6..54a3c9d805 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -153,7 +153,7 @@ class EventFederationStore(SQLBaseStore): results = self._get_prev_events_and_state( txn, event_id, - is_state=1, + is_state=True, ) return [(e_id, h, ) for e_id, h, _ in results] @@ -164,7 +164,7 @@ class EventFederationStore(SQLBaseStore): } if is_state is not None: - keyvalues["is_state"] = is_state + keyvalues["is_state"] = bool(is_state) res = self._simple_select_list_txn( txn, @@ -242,7 +242,6 @@ class EventFederationStore(SQLBaseStore): "room_id": room_id, "min_depth": depth, }, - or_replace=True, ) def _handle_prev_events(self, txn, outlier, event_id, prev_events, @@ -260,9 +259,8 @@ class EventFederationStore(SQLBaseStore): "event_id": event_id, "prev_event_id": e_id, "room_id": room_id, - "is_state": 0, + "is_state": False, }, - or_ignore=True, ) # Update the extremities table if this is not an outlier. @@ -281,19 +279,19 @@ class EventFederationStore(SQLBaseStore): # We only insert as a forward extremity the new event if there are # no other events that reference it as a prev event query = ( - "INSERT OR IGNORE INTO %(table)s (event_id, room_id) " - "SELECT ?, ? WHERE NOT EXISTS (" - "SELECT 1 FROM %(event_edges)s WHERE " - "prev_event_id = ? " - ")" - ) % { - "table": "event_forward_extremities", - "event_edges": "event_edges", - } + "SELECT 1 FROM event_edges WHERE prev_event_id = ?" + ) - logger.debug("query: %s", query) + txn.execute(query, (event_id,)) - txn.execute(query, (event_id, room_id, event_id)) + if not txn.fetchone(): + query = ( + "INSERT INTO event_forward_extremities" + " (event_id, room_id)" + " VALUES (?, ?)" + ) + + txn.execute(query, (event_id, room_id)) # Insert all the prev_events as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. @@ -306,7 +304,6 @@ class EventFederationStore(SQLBaseStore): "event_id": e_id, "room_id": room_id, }, - or_ignore=True, ) # Also delete from the backwards extremities table all ones that @@ -400,7 +397,7 @@ class EventFederationStore(SQLBaseStore): query = ( "SELECT prev_event_id FROM event_edges " - "WHERE room_id = ? AND event_id = ? AND is_state = 0 " + "WHERE room_id = ? AND event_id = ? AND is_state = ? " "LIMIT ?" ) @@ -409,7 +406,7 @@ class EventFederationStore(SQLBaseStore): for event_id in front: txn.execute( query, - (room_id, event_id, limit - len(event_results)) + (room_id, event_id, False, limit - len(event_results)) ) for e_id, in txn.fetchall(): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2425f57f5f..a3c260ddc4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - self.get_room_events_max_id.invalidate() except _RollbackButIsFineException: pass @@ -96,12 +95,22 @@ class EventsStore(SQLBaseStore): # Remove the any existing cache entries for the event_id self._invalidate_get_event_cache(event.event_id) + if stream_ordering is None: + with self._stream_id_gen.get_next_txn(txn) as stream_ordering: + return self._persist_event_txn( + txn, event, context, backfilled, + stream_ordering=stream_ordering, + is_new_state=is_new_state, + current_state=current_state, + ) + # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) + self._simple_delete_txn( + txn, + table="current_state_events", + keyvalues={"room_id": event.room_id}, ) for s in current_state: @@ -114,7 +123,6 @@ class EventsStore(SQLBaseStore): "type": s.type, "state_key": s.state_key, }, - or_replace=True, ) if event.is_state() and is_new_state: @@ -128,7 +136,6 @@ class EventsStore(SQLBaseStore): "type": event.type, "state_key": event.state_key, }, - or_replace=True, ) for prev_state_id, _ in event.prev_state: @@ -151,14 +158,6 @@ class EventsStore(SQLBaseStore): event.depth ) - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - have_persisted = self._simple_select_one_onecol_txn( txn, table="event_json", @@ -169,7 +168,7 @@ class EventsStore(SQLBaseStore): metadata_json = encode_canonical_json( event.internal_metadata.get_dict() - ) + ).decode("UTF-8") # If we have already persisted this event, we don't need to do any # more processing. @@ -185,23 +184,29 @@ class EventsStore(SQLBaseStore): ) txn.execute( sql, - (metadata_json.decode("UTF-8"), event.event_id,) + (metadata_json, event.event_id,) ) sql = ( - "UPDATE events SET outlier = 0" + "UPDATE events SET outlier = ?" " WHERE event_id = ?" ) txn.execute( sql, - (event.event_id,) + (False, event.event_id,) ) return + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + if event.type == EventTypes.Member: self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) elif event.type == EventTypes.Name: self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: @@ -224,10 +229,9 @@ class EventsStore(SQLBaseStore): values={ "event_id": event.event_id, "room_id": event.room_id, - "internal_metadata": metadata_json.decode("UTF-8"), + "internal_metadata": metadata_json, "json": encode_canonical_json(event_dict).decode("UTF-8"), }, - or_replace=True, ) content = encode_canonical_json( @@ -245,9 +249,6 @@ class EventsStore(SQLBaseStore): "depth": event.depth, } - if stream_ordering is not None: - vals["stream_ordering"] = stream_ordering - unrec = { k: v for k, v in event.get_dict().items() @@ -264,25 +265,53 @@ class EventsStore(SQLBaseStore): unrec ).decode("UTF-8") - try: - self._simple_insert_txn( - txn, - "events", - vals, - or_replace=(not outlier), - or_ignore=bool(outlier), - ) - except: - logger.warn( - "Failed to persist, probably duplicate: %s", - event.event_id, - exc_info=True, + sql = ( + "INSERT INTO events" + " (stream_ordering, topological_ordering, event_id, type," + " room_id, content, processed, outlier, depth)" + " VALUES (?,?,?,?,?,?,?,?,?)" + ) + + txn.execute( + sql, + ( + stream_ordering, event.depth, event.event_id, event.type, + event.room_id, content, True, outlier, event.depth ) - raise _RollbackButIsFineException("_persist_event") + ) if context.rejected: self._store_rejections_txn(txn, event.event_id, context.rejected) + for hash_alg, hash_base64 in event.hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_event_content_hash_txn( + txn, event.event_id, hash_alg, hash_bytes, + ) + + for prev_event_id, prev_hashes in event.prev_events: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_event_hash_txn( + txn, event.event_id, prev_event_id, alg, hash_bytes + ) + + for auth_id, _ in event.auth_events: + self._simple_insert_txn( + txn, + table="event_auth", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, + }, + ) + + (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) + self._store_event_reference_hash_txn( + txn, event.event_id, ref_alg, ref_hash_bytes + ) + if event.is_state(): vals = { "event_id": event.event_id, @@ -301,18 +330,6 @@ class EventsStore(SQLBaseStore): vals, ) - if is_new_state and not context.rejected: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - ) - for e_id, h in event.prev_state: self._simple_insert_txn( txn, @@ -321,39 +338,24 @@ class EventsStore(SQLBaseStore): "event_id": event.event_id, "prev_event_id": e_id, "room_id": event.room_id, - "is_state": 1, + "is_state": True, }, ) - for hash_alg, hash_base64 in event.hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_event_content_hash_txn( - txn, event.event_id, hash_alg, hash_bytes, - ) - - for prev_event_id, prev_hashes in event.prev_events: - for alg, hash_base64 in prev_hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_prev_event_hash_txn( - txn, event.event_id, prev_event_id, alg, hash_bytes + if is_new_state and not context.rejected: + self._simple_upsert_txn( + txn, + "current_state_events", + keyvalues={ + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + values={ + "event_id": event.event_id, + } ) - for auth_id, _ in event.auth_events: - self._simple_insert_txn( - txn, - table="event_auth", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "auth_id": auth_id, - }, - ) - - (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) - self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes - ) - def _store_redaction(self, txn, event): # invalidate the cache for the redacted event self._invalidate_get_event_cache(event.redacts) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 22b158d71e..cbe9339ccf 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -57,16 +57,18 @@ class KeyStore(SQLBaseStore): OpenSSL.crypto.FILETYPE_ASN1, tls_certificate ) fingerprint = hashlib.sha256(tls_certificate_bytes).hexdigest() - return self._simple_insert( + return self._simple_upsert( table="server_tls_certificates", - values={ + keyvalues={ "server_name": server_name, "fingerprint": fingerprint, + }, + values={ "from_server": from_server, "ts_added_ms": time_now_ms, "tls_certificate": buffer(tls_certificate_bytes), }, - or_ignore=True, + desc="store_server_certificate", ) @defer.inlineCallbacks @@ -107,16 +109,18 @@ class KeyStore(SQLBaseStore): ts_now_ms (int): The time now in milliseconds verification_key (VerifyKey): The NACL verify key. """ - return self._simple_insert( + return self._simple_upsert( table="server_signature_keys", - values={ + keyvalues={ "server_name": server_name, "key_id": "%s:%s" % (verify_key.alg, verify_key.version), + }, + values={ "from_server": from_server, "ts_added_ms": time_now_ms, "verify_key": buffer(verify_key.encode()), }, - or_ignore=True, + desc="store_server_verify_key", ) def store_server_keys_json(self, server_name, key_id, from_server, diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 87fba55439..22ec94bc16 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore): values={"observed_user_id": observed_localpart, "observer_user_id": observer_userid}, desc="allow_presence_visible", + or_ignore=True, ) def disallow_presence_visible(self, observed_localpart, observer_userid): diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index c47bdc2861..ee7718d5ed 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -154,7 +154,7 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_name, priority_class, new_rule_priority)) # now insert the new rule - sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" (" + sql = "INSERT INTO "+PushRuleTable.table_name+" (" sql += ",".join(new_rule.keys())+") VALUES (" sql += ", ".join(["?" for _ in new_rule.keys()])+")" @@ -183,7 +183,7 @@ class PushRuleStore(SQLBaseStore): new_rule['priority_class'] = priority_class new_rule['priority'] = new_prio - sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" (" + sql = "INSERT INTO "+PushRuleTable.table_name+" (" sql += ",".join(new_rule.keys())+") VALUES (" sql += ", ".join(["?" for _ in new_rule.keys()])+")" diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 000502b4ff..2582a1da66 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -13,13 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections - from ._base import SQLBaseStore, Table from twisted.internet import defer from synapse.api.errors import StoreError +from syutil.jsonutil import encode_canonical_json + import logging logger = logging.getLogger(__name__) @@ -27,93 +27,55 @@ logger = logging.getLogger(__name__) class PusherStore(SQLBaseStore): @defer.inlineCallbacks - def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey): + def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey): sql = ( - "SELECT id, user_name, kind, profile_tag, app_id," - "app_display_name, device_display_name, pushkey, ts, data, " - "last_token, last_success, failing_since " - "FROM pushers " + "SELECT * FROM pushers " "WHERE app_id = ? AND pushkey = ?" ) - rows = yield self._execute( - "get_pushers_by_app_id_and_pushkey", None, sql, - app_id_and_pushkey[0], app_id_and_pushkey[1] + rows = yield self._execute_and_decode( + "get_pushers_by_app_id_and_pushkey", + sql, + app_id, pushkey ) - ret = [ - { - "id": r[0], - "user_name": r[1], - "kind": r[2], - "profile_tag": r[3], - "app_id": r[4], - "app_display_name": r[5], - "device_display_name": r[6], - "pushkey": r[7], - "pushkey_ts": r[8], - "data": r[9], - "last_token": r[10], - "last_success": r[11], - "failing_since": r[12] - } - for r in rows - ] - - defer.returnValue(ret[0]) + defer.returnValue(rows) @defer.inlineCallbacks def get_all_pushers(self): sql = ( - "SELECT id, user_name, kind, profile_tag, app_id," - "app_display_name, device_display_name, pushkey, ts, data, " - "last_token, last_success, failing_since " - "FROM pushers" + "SELECT * FROM pushers" ) - rows = yield self._execute("get_all_pushers", None, sql) - - ret = [ - { - "id": r[0], - "user_name": r[1], - "kind": r[2], - "profile_tag": r[3], - "app_id": r[4], - "app_display_name": r[5], - "device_display_name": r[6], - "pushkey": r[7], - "pushkey_ts": r[8], - "data": r[9], - "last_token": r[10], - "last_success": r[11], - "failing_since": r[12] - } - for r in rows - ] - - defer.returnValue(ret) + rows = yield self._execute_and_decode("get_all_pushers", sql) + + defer.returnValue(rows) @defer.inlineCallbacks - def add_pusher(self, user_name, profile_tag, kind, app_id, + def add_pusher(self, user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, pushkey_ts, lang, data): try: + next_id = yield self._pushers_id_gen.get_next() yield self._simple_upsert( PushersTable.table_name, dict( app_id=app_id, pushkey=pushkey, + user_name=user_name, ), dict( - user_name=user_name, + access_token=access_token, kind=kind, profile_tag=profile_tag, app_display_name=app_display_name, device_display_name=device_display_name, ts=pushkey_ts, lang=lang, - data=data + data=encode_canonical_json(data).decode("UTF-8"), + ), + insertion_values=dict( + id=next_id, ), desc="add_pusher", ) @@ -122,37 +84,38 @@ class PusherStore(SQLBaseStore): raise StoreError(500, "Problem creating pusher.") @defer.inlineCallbacks - def delete_pusher_by_app_id_pushkey(self, app_id, pushkey): + def delete_pusher_by_app_id_pushkey_user_name(self, app_id, pushkey, user_name): yield self._simple_delete_one( PushersTable.table_name, - {"app_id": app_id, "pushkey": pushkey}, - desc="delete_pusher_by_app_id_pushkey", + {"app_id": app_id, "pushkey": pushkey, 'user_name': user_name}, + desc="delete_pusher_by_app_id_pushkey_user_name", ) @defer.inlineCallbacks - def update_pusher_last_token(self, app_id, pushkey, last_token): + def update_pusher_last_token(self, app_id, pushkey, user_name, last_token): yield self._simple_update_one( PushersTable.table_name, - {'app_id': app_id, 'pushkey': pushkey}, + {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name}, {'last_token': last_token}, desc="update_pusher_last_token", ) @defer.inlineCallbacks - def update_pusher_last_token_and_success(self, app_id, pushkey, + def update_pusher_last_token_and_success(self, app_id, pushkey, user_name, last_token, last_success): yield self._simple_update_one( PushersTable.table_name, - {'app_id': app_id, 'pushkey': pushkey}, + {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name}, {'last_token': last_token, 'last_success': last_success}, desc="update_pusher_last_token_and_success", ) @defer.inlineCallbacks - def update_pusher_failing_since(self, app_id, pushkey, failing_since): + def update_pusher_failing_since(self, app_id, pushkey, user_name, + failing_since): yield self._simple_update_one( PushersTable.table_name, - {'app_id': app_id, 'pushkey': pushkey}, + {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name}, {'failing_since': failing_since}, desc="update_pusher_failing_since", ) @@ -160,21 +123,3 @@ class PusherStore(SQLBaseStore): class PushersTable(Table): table_name = "pushers" - - fields = [ - "id", - "user_name", - "kind", - "profile_tag", - "app_id", - "app_display_name", - "device_display_name", - "pushkey", - "pushkey_ts", - "data", - "last_token", - "last_success", - "failing_since" - ] - - EntryType = collections.namedtuple("PusherEntry", fields) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index f24154f146..a986c4816e 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -15,8 +15,6 @@ from twisted.internet import defer -from sqlite3 import IntegrityError - from synapse.api.errors import StoreError, Codes from ._base import SQLBaseStore, cached @@ -39,17 +37,13 @@ class RegistrationStore(SQLBaseStore): Raises: StoreError if there was a problem adding this. """ - row = yield self._simple_select_one( - "users", {"name": user_id}, ["id"], - desc="add_access_token_to_user", - ) - if not row: - raise StoreError(400, "Bad user ID supplied.") - row_id = row["id"] + next_id = yield self._access_tokens_id_gen.get_next() + yield self._simple_insert( "access_tokens", { - "user_id": row_id, + "id": next_id, + "user_id": user_id, "token": token }, desc="add_access_token_to_user", @@ -74,32 +68,71 @@ class RegistrationStore(SQLBaseStore): def _register(self, txn, user_id, token, password_hash): now = int(self.clock.time()) + next_id = self._access_tokens_id_gen.get_next_txn(txn) + try: txn.execute("INSERT INTO users(name, password_hash, creation_ts) " "VALUES (?,?,?)", [user_id, password_hash, now]) - except IntegrityError: + except self.database_engine.module.IntegrityError: raise StoreError( 400, "User ID already taken.", errcode=Codes.USER_IN_USE ) # it's possible for this to get a conflict, but only for a single user # since tokens are namespaced based on their user ID - txn.execute("INSERT INTO access_tokens(user_id, token) " + - "VALUES (?,?)", [txn.lastrowid, token]) + txn.execute( + "INSERT INTO access_tokens(id, user_id, token)" + " VALUES (?,?,?)", + (next_id, user_id, token,) + ) def get_user_by_id(self, user_id): - query = ("SELECT users.name, users.password_hash FROM users" - " WHERE users.name = ?") - return self._execute( - "get_user_by_id", self.cursor_to_dict, query, user_id + return self._simple_select_one( + table="users", + keyvalues={ + "name": user_id, + }, + retcols=["name", "password_hash"], + allow_none=True, + ) + + @defer.inlineCallbacks + def user_set_password_hash(self, user_id, password_hash): + """ + NB. This does *not* evict any cache because the one use for this + removes most of the entries subsequently anyway so it would be + pointless. Use flush_user separately. + """ + yield self._simple_update_one('users', { + 'name': user_id + }, { + 'password_hash': password_hash + }) + + @defer.inlineCallbacks + def user_delete_access_tokens_apart_from(self, user_id, token_id): + rows = yield self.get_user_by_id(user_id) + if len(rows) == 0: + raise Exception("No such user!") + + yield self._execute( + "delete_access_tokens_apart_from", None, + "DELETE FROM access_tokens WHERE user_id = ? AND id != ?", + rows[0]['id'], token_id + ) + + @defer.inlineCallbacks + def flush_user(self, user_id): + rows = yield self._execute( + 'flush_user', None, + "SELECT token FROM access_tokens WHERE user_id = ?", + user_id ) + for r in rows: + self.get_user_by_token.invalidate(r) @cached() - # TODO(paul): Currently there's no code to invalidate this cache. That - # means if/when we ever add internal ways to invalidate access tokens or - # change whether a user is a server admin, those will need to invoke - # store.get_user_by_token.invalidate(token) def get_user_by_token(self, token): """Get a user from the given access token. @@ -134,13 +167,49 @@ class RegistrationStore(SQLBaseStore): "SELECT users.name, users.admin," " access_tokens.device_id, access_tokens.id as token_id" " FROM users" - " INNER JOIN access_tokens on users.id = access_tokens.user_id" + " INNER JOIN access_tokens on users.name = access_tokens.user_id" " WHERE token = ?" ) - cursor = txn.execute(sql, (token,)) - rows = self.cursor_to_dict(cursor) + txn.execute(sql, (token,)) + rows = self.cursor_to_dict(txn) if rows: return rows[0] - raise StoreError(404, "Token not found.") + return None + + @defer.inlineCallbacks + def user_add_threepid(self, user_id, medium, address, validated_at, added_at): + yield self._simple_upsert("user_threepids", { + "user": user_id, + "medium": medium, + "address": address, + }, { + "validated_at": validated_at, + "added_at": added_at, + }) + + @defer.inlineCallbacks + def user_get_threepids(self, user_id): + ret = yield self._simple_select_list( + "user_threepids", { + "user": user_id + }, + ['medium', 'address', 'validated_at', 'added_at'], + 'user_get_threepids' + ) + defer.returnValue(ret) + + @defer.inlineCallbacks + def get_user_by_threepid(self, medium, address): + ret = yield self._simple_select_one( + "user_threepids", + { + "medium": medium, + "address": address + }, + ['user'], True, 'get_user_by_threepid' + ) + if ret: + defer.returnValue(ret['user']) + defer.returnValue(None) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index be3e28c2ea..48ebb33057 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -72,6 +72,7 @@ class RoomStore(SQLBaseStore): keyvalues={"room_id": room_id}, retcols=RoomsTable.fields, desc="get_room", + allow_none=True, ) @defer.inlineCallbacks @@ -102,24 +103,37 @@ class RoomStore(SQLBaseStore): "ON c.event_id = room_names.event_id " ) - # We use non printing ascii character US () as a seperator + # We use non printing ascii character US (\x1F) as a separator sql = ( - "SELECT r.room_id, n.name, t.topic, " - "group_concat(a.room_alias, '') " - "FROM rooms AS r " - "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " - "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " - "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " - "WHERE r.is_public = ? " - "GROUP BY r.room_id " + "SELECT r.room_id, max(n.name), max(t.topic)" + " FROM rooms AS r" + " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id" + " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id" + " WHERE r.is_public = ?" + " GROUP BY r.room_id" ) % { "topic": topic_subquery, "name": name_subquery, } - c = txn.execute(sql, (is_public,)) + txn.execute(sql, (is_public,)) - return c.fetchall() + rows = txn.fetchall() + + for i, row in enumerate(rows): + room_id = row[0] + aliases = self._simple_select_onecol_txn( + txn, + table="room_aliases", + keyvalues={ + "room_id": room_id + }, + retcol="room_alias", + ) + + rows[i] = list(row) + [aliases] + + return rows rows = yield self.runInteraction( "get_rooms", f @@ -130,9 +144,10 @@ class RoomStore(SQLBaseStore): "room_id": r[0], "name": r[1], "topic": r[2], - "aliases": r[3].split(""), + "aliases": r[3], } for r in rows + if r[3] # We only return rooms that have at least one alias. ] defer.returnValue(ret) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 52c37c76f5..8ea5756d61 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -40,7 +40,6 @@ class RoomMemberStore(SQLBaseStore): """ try: target_user_id = event.state_key - domain = UserID.from_string(target_user_id).domain except: logger.exception( "Failed to parse target_user_id=%s", target_user_id @@ -65,42 +64,8 @@ class RoomMemberStore(SQLBaseStore): } ) - # Update room hosts table - if event.membership == Membership.JOIN: - sql = ( - "INSERT OR IGNORE INTO room_hosts (room_id, host) " - "VALUES (?, ?)" - ) - txn.execute(sql, (event.room_id, domain)) - elif event.membership != Membership.INVITE: - # Check if this was the last person to have left. - member_events = self._get_members_query_txn( - txn, - where_clause=("c.room_id = ? AND m.membership = ?" - " AND m.user_id != ?"), - where_values=(event.room_id, Membership.JOIN, target_user_id,) - ) - - joined_domains = set() - for e in member_events: - try: - joined_domains.add( - UserID.from_string(e.state_key).domain - ) - except: - # FIXME: How do we deal with invalid user ids in the db? - logger.exception("Invalid user_id: %s", event.state_key) - - if domain not in joined_domains: - sql = ( - "DELETE FROM room_hosts WHERE room_id = ? AND host = ?" - ) - - txn.execute(sql, (event.room_id, domain)) - self.get_rooms_for_user.invalidate(target_user_id) - @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -110,41 +75,27 @@ class RoomMemberStore(SQLBaseStore): Returns: Deferred: Results in a MembershipEvent or None. """ - rows = yield self._get_members_by_dict({ - "e.room_id": room_id, - "m.user_id": user_id, - }) + def f(txn): + events = self._get_members_events_txn( + txn, + room_id, + user_id=user_id, + ) - defer.returnValue(rows[0] if rows else None) + return events[0] if events else None - def _get_room_member(self, txn, user_id, room_id): - sql = ( - "SELECT e.* FROM events as e" - " INNER JOIN room_memberships as m" - " ON e.event_id = m.event_id" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" - " WHERE m.user_id = ? and e.room_id = ?" - " LIMIT 1" - ) - txn.execute(sql, (user_id, room_id)) - rows = self.cursor_to_dict(txn) - if rows: - return self._parse_events_txn(txn, rows)[0] - else: - return None + return self.runInteraction("get_room_member", f) def get_users_in_room(self, room_id): def f(txn): - sql = ( - "SELECT m.user_id FROM room_memberships as m" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" - " WHERE m.membership = ? AND m.room_id = ?" + + rows = self._get_members_rows_txn( + txn, + room_id=room_id, + membership=Membership.JOIN, ) - txn.execute(sql, (Membership.JOIN, room_id)) - return [r[0] for r in txn.fetchall()] + return [r["user_id"] for r in rows] return self.runInteraction("get_users_in_room", f) def get_room_members(self, room_id, membership=None): @@ -159,11 +110,14 @@ class RoomMemberStore(SQLBaseStore): list of namedtuples representing the members in this room. """ - where = {"m.room_id": room_id} - if membership: - where["m.membership"] = membership + def f(txn): + return self._get_members_events_txn( + txn, + room_id, + membership=membership, + ) - return self._get_members_by_dict(where) + return self.runInteraction("get_room_members", f) def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user @@ -209,32 +163,55 @@ class RoomMemberStore(SQLBaseStore): ] def get_joined_hosts_for_room(self, room_id): - return self._simple_select_onecol( - "room_hosts", - {"room_id": room_id}, - "host", - desc="get_joined_hosts_for_room", + return self.runInteraction( + "get_joined_hosts_for_room", + self._get_joined_hosts_for_room_txn, + room_id, + ) + + def _get_joined_hosts_for_room_txn(self, txn, room_id): + rows = self._get_members_rows_txn( + txn, + room_id, membership=Membership.JOIN + ) + + joined_domains = set( + UserID.from_string(r["user_id"]).domain + for r in rows ) - def _get_members_by_dict(self, where_dict): - clause = " AND ".join("%s = ?" % k for k in where_dict.keys()) - vals = where_dict.values() - return self._get_members_query(clause, vals) + return joined_domains def _get_members_query(self, where_clause, where_values): return self.runInteraction( - "get_members_query", self._get_members_query_txn, + "get_members_query", self._get_members_events_txn, where_clause, where_values ) - def _get_members_query_txn(self, txn, where_clause, where_values): + def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None): + rows = self._get_members_rows_txn( + txn, + room_id, membership, user_id, + ) + return self._get_events_txn(txn, [r["event_id"] for r in rows]) + + def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None): + where_clause = "c.room_id = ?" + where_values = [room_id] + + if membership: + where_clause += " AND m.membership = ?" + where_values.append(membership) + + if user_id: + where_clause += " AND m.user_id = ?" + where_values.append(user_id) + sql = ( - "SELECT e.* FROM events as e " - "INNER JOIN room_memberships as m " - "ON e.event_id = m.event_id " - "INNER JOIN current_state_events as c " - "ON m.event_id = c.event_id " - "WHERE %(where)s " + "SELECT m.* FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id" + " WHERE %(where)s" ) % { "where": where_clause, } @@ -242,8 +219,7 @@ class RoomMemberStore(SQLBaseStore): txn.execute(sql, where_values) rows = self.cursor_to_dict(txn) - results = self._parse_events_txn(txn, rows) - return results + return rows @cached() def get_rooms_for_user(self, user_id): diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql index b87ef1fe79..878c36260a 100644 --- a/synapse/storage/schema/delta/12/v12.sql +++ b/synapse/storage/schema/delta/12/v12.sql @@ -17,26 +17,25 @@ CREATE TABLE IF NOT EXISTS rejections( event_id TEXT NOT NULL, reason TEXT NOT NULL, last_check TEXT NOT NULL, - CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE + UNIQUE (event_id) ); -- Push notification endpoints that users have configured CREATE TABLE IF NOT EXISTS pushers ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_name TEXT NOT NULL, - profile_tag varchar(32) NOT NULL, - kind varchar(8) NOT NULL, - app_id varchar(64) NOT NULL, - app_display_name varchar(64) NOT NULL, - device_display_name varchar(128) NOT NULL, - pushkey blob NOT NULL, - ts BIGINT NOT NULL, - lang varchar(8), - data blob, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey VARBINARY(512) NOT NULL, + ts BIGINT UNSIGNED NOT NULL, + lang VARCHAR(8), + data LONGBLOB, last_token TEXT, - last_success BIGINT, - failing_since BIGINT, - FOREIGN KEY(user_name) REFERENCES users(name), + last_success BIGINT UNSIGNED, + failing_since BIGINT UNSIGNED, UNIQUE (app_id, pushkey) ); @@ -55,13 +54,10 @@ CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name); CREATE TABLE IF NOT EXISTS user_filters( user_id TEXT, - filter_id INTEGER, - filter_json TEXT, - FOREIGN KEY(user_id) REFERENCES users(id) + filter_id BIGINT UNSIGNED, + filter_json LONGBLOB ); CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters( - user_id, filter_id + user_id, filter_id ); - -PRAGMA user_version = 12; diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql index e491ad5aec..3265924013 100644 --- a/synapse/storage/schema/delta/13/v13.sql +++ b/synapse/storage/schema/delta/13/v13.sql @@ -19,16 +19,13 @@ CREATE TABLE IF NOT EXISTS application_services( token TEXT, hs_token TEXT, sender TEXT, - UNIQUE(token) ON CONFLICT ROLLBACK + UNIQUE(token) ); CREATE TABLE IF NOT EXISTS application_services_regex( id INTEGER PRIMARY KEY AUTOINCREMENT, - as_id INTEGER NOT NULL, + as_id BIGINT UNSIGNED NOT NULL, namespace INTEGER, /* enum[room_id|room_alias|user_id] */ regex TEXT, FOREIGN KEY(as_id) REFERENCES application_services(id) ); - - - diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index 2b27e2a429..db2e720393 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -15,16 +15,17 @@ CREATE TABLE IF NOT EXISTS application_services_state( as_id TEXT PRIMARY KEY, - state TEXT, - last_txn TEXT + state VARCHAR(5), + last_txn INTEGER ); CREATE TABLE IF NOT EXISTS application_services_txns( as_id TEXT NOT NULL, txn_id INTEGER NOT NULL, event_ids TEXT NOT NULL, - UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK + UNIQUE(as_id, txn_id) ); - - +CREATE INDEX IF NOT EXISTS application_services_txns_id ON application_services_txns ( + as_id +); diff --git a/synapse/storage/schema/delta/15/presence_indices.sql b/synapse/storage/schema/delta/15/presence_indices.sql new file mode 100644 index 0000000000..6b8d0f1ca7 --- /dev/null +++ b/synapse/storage/schema/delta/15/presence_indices.sql @@ -0,0 +1,2 @@ + +CREATE INDEX IF NOT EXISTS presence_list_user_id ON presence_list (user_id); diff --git a/synapse/storage/schema/delta/15/v15.sql b/synapse/storage/schema/delta/15/v15.sql new file mode 100644 index 0000000000..f5b2a08ca4 --- /dev/null +++ b/synapse/storage/schema/delta/15/v15.sql @@ -0,0 +1,25 @@ +-- Drop, copy & recreate pushers table to change unique key +-- Also add access_token column at the same time +CREATE TABLE IF NOT EXISTS pushers2 ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_name TEXT NOT NULL, + access_token INTEGER DEFAULT NULL, + profile_tag varchar(32) NOT NULL, + kind varchar(8) NOT NULL, + app_id varchar(64) NOT NULL, + app_display_name varchar(64) NOT NULL, + device_display_name varchar(128) NOT NULL, + pushkey blob NOT NULL, + ts BIGINT NOT NULL, + lang varchar(8), + data blob, + last_token TEXT, + last_success BIGINT, + failing_since BIGINT, + FOREIGN KEY(user_name) REFERENCES users(name), + UNIQUE (app_id, pushkey, user_name) +); +INSERT INTO pushers2 (id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since) + SELECT id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers; +DROP TABLE pushers; +ALTER TABLE pushers2 RENAME TO pushers; diff --git a/synapse/storage/schema/delta/16/events_order_index.sql b/synapse/storage/schema/delta/16/events_order_index.sql new file mode 100644 index 0000000000..a48f215170 --- /dev/null +++ b/synapse/storage/schema/delta/16/events_order_index.sql @@ -0,0 +1,4 @@ +CREATE INDEX events_order ON events (topological_ordering, stream_ordering); +CREATE INDEX events_order_room ON events ( + room_id, topological_ordering, stream_ordering +); diff --git a/synapse/storage/schema/delta/16/remote_media_cache_index.sql b/synapse/storage/schema/delta/16/remote_media_cache_index.sql new file mode 100644 index 0000000000..7a15265cb1 --- /dev/null +++ b/synapse/storage/schema/delta/16/remote_media_cache_index.sql @@ -0,0 +1,2 @@ +CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id + ON remote_media_cache_thumbnails (media_id); \ No newline at end of file diff --git a/synapse/storage/schema/delta/16/remove_duplicates.sql b/synapse/storage/schema/delta/16/remove_duplicates.sql new file mode 100644 index 0000000000..65c97b5e2f --- /dev/null +++ b/synapse/storage/schema/delta/16/remove_duplicates.sql @@ -0,0 +1,9 @@ + + +DELETE FROM event_to_state_groups WHERE state_group not in ( + SELECT MAX(state_group) FROM event_to_state_groups GROUP BY event_id +); + +DELETE FROM event_to_state_groups WHERE rowid not in ( + SELECT MIN(rowid) FROM event_to_state_groups GROUP BY event_id +); diff --git a/synapse/storage/schema/delta/16/room_alias_index.sql b/synapse/storage/schema/delta/16/room_alias_index.sql new file mode 100644 index 0000000000..f82486132b --- /dev/null +++ b/synapse/storage/schema/delta/16/room_alias_index.sql @@ -0,0 +1,3 @@ + +CREATE INDEX IF NOT EXISTS room_aliases_id ON room_aliases(room_id); +CREATE INDEX IF NOT EXISTS room_alias_servers_alias ON room_alias_servers(room_alias); diff --git a/synapse/storage/schema/delta/16/unique_constraints.sql b/synapse/storage/schema/delta/16/unique_constraints.sql new file mode 100644 index 0000000000..fecf11118c --- /dev/null +++ b/synapse/storage/schema/delta/16/unique_constraints.sql @@ -0,0 +1,80 @@ + +-- We can use SQLite features here, since other db support was only added in v16 + +-- +DELETE FROM current_state_events WHERE rowid not in ( + SELECT MIN(rowid) FROM current_state_events GROUP BY event_id +); + +DROP INDEX IF EXISTS current_state_events_event_id; +CREATE UNIQUE INDEX current_state_events_event_id ON current_state_events(event_id); + +-- +DELETE FROM room_memberships WHERE rowid not in ( + SELECT MIN(rowid) FROM room_memberships GROUP BY event_id +); + +DROP INDEX IF EXISTS room_memberships_event_id; +CREATE UNIQUE INDEX room_memberships_event_id ON room_memberships(event_id); + +-- +DELETE FROM feedback WHERE rowid not in ( + SELECT MIN(rowid) FROM feedback GROUP BY event_id +); + +DROP INDEX IF EXISTS feedback_event_id; +CREATE UNIQUE INDEX feedback_event_id ON feedback(event_id); + +-- +DELETE FROM topics WHERE rowid not in ( + SELECT MIN(rowid) FROM topics GROUP BY event_id +); + +DROP INDEX IF EXISTS topics_event_id; +CREATE UNIQUE INDEX topics_event_id ON topics(event_id); + +-- +DELETE FROM room_names WHERE rowid not in ( + SELECT MIN(rowid) FROM room_names GROUP BY event_id +); + +DROP INDEX IF EXISTS room_names_id; +CREATE UNIQUE INDEX room_names_id ON room_names(event_id); + +-- +DELETE FROM presence WHERE rowid not in ( + SELECT MIN(rowid) FROM presence GROUP BY user_id +); + +DROP INDEX IF EXISTS presence_id; +CREATE UNIQUE INDEX presence_id ON presence(user_id); + +-- +DELETE FROM presence_allow_inbound WHERE rowid not in ( + SELECT MIN(rowid) FROM presence_allow_inbound + GROUP BY observed_user_id, observer_user_id +); + +DROP INDEX IF EXISTS presence_allow_inbound_observers; +CREATE UNIQUE INDEX presence_allow_inbound_observers ON presence_allow_inbound( + observed_user_id, observer_user_id +); + +-- +DELETE FROM presence_list WHERE rowid not in ( + SELECT MIN(rowid) FROM presence_list + GROUP BY user_id, observed_user_id +); + +DROP INDEX IF EXISTS presence_list_observers; +CREATE UNIQUE INDEX presence_list_observers ON presence_list( + user_id, observed_user_id +); + +-- +DELETE FROM room_aliases WHERE rowid not in ( + SELECT MIN(rowid) FROM room_aliases GROUP BY room_alias +); + +DROP INDEX IF EXISTS room_aliases_id; +CREATE INDEX room_aliases_id ON room_aliases(room_id); diff --git a/synapse/storage/schema/delta/16/users.sql b/synapse/storage/schema/delta/16/users.sql new file mode 100644 index 0000000000..cd0709250d --- /dev/null +++ b/synapse/storage/schema/delta/16/users.sql @@ -0,0 +1,56 @@ +-- Convert `access_tokens`.user from rowids to user strings. +-- MUST BE DONE BEFORE REMOVING ID COLUMN FROM USERS TABLE BELOW +CREATE TABLE IF NOT EXISTS new_access_tokens( + id BIGINT UNSIGNED PRIMARY KEY, + user_id TEXT NOT NULL, + device_id TEXT, + token TEXT NOT NULL, + last_used BIGINT UNSIGNED, + UNIQUE(token) +); + +INSERT INTO new_access_tokens + SELECT a.id, u.name, a.device_id, a.token, a.last_used + FROM access_tokens as a + INNER JOIN users as u ON u.id = a.user_id; + +DROP TABLE access_tokens; + +ALTER TABLE new_access_tokens RENAME TO access_tokens; + +-- Remove ID column from `users` table +CREATE TABLE IF NOT EXISTS new_users( + name TEXT, + password_hash TEXT, + creation_ts BIGINT UNSIGNED, + admin BOOL DEFAULT 0 NOT NULL, + UNIQUE(name) +); + +INSERT INTO new_users SELECT name, password_hash, creation_ts, admin FROM users; + +DROP TABLE users; + +ALTER TABLE new_users RENAME TO users; + + +-- Remove UNIQUE constraint from `user_ips` table +CREATE TABLE IF NOT EXISTS new_user_ips ( + user_id TEXT NOT NULL, + access_token TEXT NOT NULL, + device_id TEXT, + ip TEXT NOT NULL, + user_agent TEXT NOT NULL, + last_seen BIGINT UNSIGNED NOT NULL +); + +INSERT INTO new_user_ips + SELECT user, access_token, device_id, ip, user_agent, last_seen FROM user_ips; + +DROP TABLE user_ips; + +ALTER TABLE new_user_ips RENAME TO user_ips; + +CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user_id); +CREATE INDEX IF NOT EXISTS user_ips_user_ip ON user_ips(user_id, access_token, ip); + diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql index 1e766d6db2..f7020f7793 100644 --- a/synapse/storage/schema/full_schemas/11/event_edges.sql +++ b/synapse/storage/schema/full_schemas/11/event_edges.sql @@ -16,52 +16,52 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities( event_id TEXT NOT NULL, room_id TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE + UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id); -CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); +CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id); +CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_backward_extremities( event_id TEXT NOT NULL, room_id TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE + UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id); -CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id); +CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id); +CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_edges( event_id TEXT NOT NULL, prev_event_id TEXT NOT NULL, room_id TEXT NOT NULL, - is_state INTEGER NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id, is_state) + is_state BOOL NOT NULL, + UNIQUE (event_id, prev_event_id, room_id, is_state) ); -CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); -CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); +CREATE INDEX ev_edges_id ON event_edges(event_id); +CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id); CREATE TABLE IF NOT EXISTS room_depth( room_id TEXT NOT NULL, min_depth INTEGER NOT NULL, - CONSTRAINT uniqueness UNIQUE (room_id) + UNIQUE (room_id) ); -CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); +CREATE INDEX room_depth_room ON room_depth(room_id); create TABLE IF NOT EXISTS event_destinations( event_id TEXT NOT NULL, destination TEXT NOT NULL, - delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered - CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE + delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered + UNIQUE (event_id, destination) ); -CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); +CREATE INDEX event_destinations_id ON event_destinations(event_id); CREATE TABLE IF NOT EXISTS state_forward_extremities( @@ -69,21 +69,21 @@ CREATE TABLE IF NOT EXISTS state_forward_extremities( room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE + UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities( +CREATE INDEX st_extrem_keys ON state_forward_extremities( room_id, type, state_key ); -CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id); +CREATE INDEX st_extrem_id ON state_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_auth( event_id TEXT NOT NULL, auth_id TEXT NOT NULL, room_id TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, auth_id, room_id) + UNIQUE (event_id, auth_id, room_id) ); -CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id); -CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id); \ No newline at end of file +CREATE INDEX evauth_edges_id ON event_auth(event_id); +CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id); diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql index c28c39c48a..636b2d3353 100644 --- a/synapse/storage/schema/full_schemas/11/event_signatures.sql +++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql @@ -16,50 +16,40 @@ CREATE TABLE IF NOT EXISTS event_content_hashes ( event_id TEXT, algorithm TEXT, - hash BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, algorithm) + hash bytea, + UNIQUE (event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes( - event_id -); +CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id); CREATE TABLE IF NOT EXISTS event_reference_hashes ( event_id TEXT, algorithm TEXT, - hash BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, algorithm) + hash bytea, + UNIQUE (event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes ( - event_id -); +CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id); CREATE TABLE IF NOT EXISTS event_signatures ( event_id TEXT, signature_name TEXT, key_id TEXT, - signature BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id) + signature bytea, + UNIQUE (event_id, signature_name, key_id) ); -CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures ( - event_id -); +CREATE INDEX event_signatures_id ON event_signatures(event_id); CREATE TABLE IF NOT EXISTS event_edge_hashes( event_id TEXT, prev_event_id TEXT, algorithm TEXT, - hash BLOB, - CONSTRAINT uniqueness UNIQUE ( - event_id, prev_event_id, algorithm - ) + hash bytea, + UNIQUE (event_id, prev_event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes( - event_id -); +CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id); diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql index dd00c1cd2f..1901654ac2 100644 --- a/synapse/storage/schema/full_schemas/11/im.sql +++ b/synapse/storage/schema/full_schemas/11/im.sql @@ -15,7 +15,7 @@ CREATE TABLE IF NOT EXISTS events( stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT, - topological_ordering INTEGER NOT NULL, + topological_ordering BIGINT NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, room_id TEXT NOT NULL, @@ -23,26 +23,24 @@ CREATE TABLE IF NOT EXISTS events( unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, - depth INTEGER DEFAULT 0 NOT NULL, - CONSTRAINT ev_uniq UNIQUE (event_id) + depth BIGINT DEFAULT 0 NOT NULL, + UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id); -CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering); -CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering); -CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id); +CREATE INDEX events_stream_ordering ON events (stream_ordering); +CREATE INDEX events_topological_ordering ON events (topological_ordering); +CREATE INDEX events_room_id ON events (room_id); CREATE TABLE IF NOT EXISTS event_json( event_id TEXT NOT NULL, room_id TEXT NOT NULL, - internal_metadata NOT NULL, - json BLOB NOT NULL, - CONSTRAINT ev_j_uniq UNIQUE (event_id) + internal_metadata TEXT NOT NULL, + json TEXT NOT NULL, + UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id); -CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id); +CREATE INDEX event_json_room_id ON event_json(room_id); CREATE TABLE IF NOT EXISTS state_events( @@ -50,13 +48,13 @@ CREATE TABLE IF NOT EXISTS state_events( room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, - prev_state TEXT + prev_state TEXT, + UNIQUE (event_id) ); -CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id); -CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id); -CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type); -CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key); +CREATE INDEX state_events_room_id ON state_events (room_id); +CREATE INDEX state_events_type ON state_events (type); +CREATE INDEX state_events_state_key ON state_events (state_key); CREATE TABLE IF NOT EXISTS current_state_events( @@ -64,13 +62,13 @@ CREATE TABLE IF NOT EXISTS current_state_events( room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, - CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE + UNIQUE (room_id, type, state_key) ); -CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id); -CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id); -CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type); -CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key); +CREATE INDEX curr_events_event_id ON current_state_events (event_id); +CREATE INDEX current_state_events_room_id ON current_state_events (room_id); +CREATE INDEX current_state_events_type ON current_state_events (type); +CREATE INDEX current_state_events_state_key ON current_state_events (state_key); CREATE TABLE IF NOT EXISTS room_memberships( event_id TEXT NOT NULL, @@ -80,9 +78,9 @@ CREATE TABLE IF NOT EXISTS room_memberships( membership TEXT NOT NULL ); -CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id); -CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id); -CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id); +CREATE INDEX room_memberships_event_id ON room_memberships (event_id); +CREATE INDEX room_memberships_room_id ON room_memberships (room_id); +CREATE INDEX room_memberships_user_id ON room_memberships (user_id); CREATE TABLE IF NOT EXISTS feedback( event_id TEXT NOT NULL, @@ -98,8 +96,8 @@ CREATE TABLE IF NOT EXISTS topics( topic TEXT NOT NULL ); -CREATE INDEX IF NOT EXISTS topics_event_id ON topics(event_id); -CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id); +CREATE INDEX topics_event_id ON topics(event_id); +CREATE INDEX topics_room_id ON topics(room_id); CREATE TABLE IF NOT EXISTS room_names( event_id TEXT NOT NULL, @@ -107,19 +105,19 @@ CREATE TABLE IF NOT EXISTS room_names( name TEXT NOT NULL ); -CREATE INDEX IF NOT EXISTS room_names_event_id ON room_names(event_id); -CREATE INDEX IF NOT EXISTS room_names_room_id ON room_names(room_id); +CREATE INDEX room_names_event_id ON room_names(event_id); +CREATE INDEX room_names_room_id ON room_names(room_id); CREATE TABLE IF NOT EXISTS rooms( room_id TEXT PRIMARY KEY NOT NULL, - is_public INTEGER, + is_public BOOL, creator TEXT ); CREATE TABLE IF NOT EXISTS room_hosts( room_id TEXT NOT NULL, host TEXT NOT NULL, - CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE + UNIQUE (room_id, host) ); -CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id); +CREATE INDEX room_hosts_room_id ON room_hosts (room_id); diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql index a9e0a4fe0d..afc142045e 100644 --- a/synapse/storage/schema/full_schemas/11/keys.sql +++ b/synapse/storage/schema/full_schemas/11/keys.sql @@ -16,16 +16,16 @@ CREATE TABLE IF NOT EXISTS server_tls_certificates( server_name TEXT, -- Server name. fingerprint TEXT, -- Certificate fingerprint. from_server TEXT, -- Which key server the certificate was fetched from. - ts_added_ms INTEGER, -- When the certifcate was added. - tls_certificate BLOB, -- DER encoded x509 certificate. - CONSTRAINT uniqueness UNIQUE (server_name, fingerprint) + ts_added_ms BIGINT, -- When the certifcate was added. + tls_certificate bytea, -- DER encoded x509 certificate. + UNIQUE (server_name, fingerprint) ); CREATE TABLE IF NOT EXISTS server_signature_keys( server_name TEXT, -- Server name. key_id TEXT, -- Key version. from_server TEXT, -- Which key server the key was fetched form. - ts_added_ms INTEGER, -- When the key was added. - verify_key BLOB, -- NACL verification key. - CONSTRAINT uniqueness UNIQUE (server_name, key_id) + ts_added_ms BIGINT, -- When the key was added. + verify_key bytea, -- NACL verification key. + UNIQUE (server_name, key_id) ); diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql index afdf48cbfb..e927e581d1 100644 --- a/synapse/storage/schema/full_schemas/11/media_repository.sql +++ b/synapse/storage/schema/full_schemas/11/media_repository.sql @@ -17,10 +17,10 @@ CREATE TABLE IF NOT EXISTS local_media_repository ( media_id TEXT, -- The id used to refer to the media. media_type TEXT, -- The MIME-type of the media. media_length INTEGER, -- Length of the media in bytes. - created_ts INTEGER, -- When the content was uploaded in ms. + created_ts BIGINT, -- When the content was uploaded in ms. upload_name TEXT, -- The name the media was uploaded with. user_id TEXT, -- The user who uploaded the file. - CONSTRAINT uniqueness UNIQUE (media_id) + UNIQUE (media_id) ); CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( @@ -30,23 +30,23 @@ CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( thumbnail_type TEXT, -- The MIME-type of the thumbnail. thumbnail_method TEXT, -- The method used to make the thumbnail. thumbnail_length INTEGER, -- The length of the thumbnail in bytes. - CONSTRAINT uniqueness UNIQUE ( + UNIQUE ( media_id, thumbnail_width, thumbnail_height, thumbnail_type ) ); -CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id +CREATE INDEX local_media_repository_thumbnails_media_id ON local_media_repository_thumbnails (media_id); CREATE TABLE IF NOT EXISTS remote_media_cache ( media_origin TEXT, -- The remote HS the media came from. media_id TEXT, -- The id used to refer to the media on that server. media_type TEXT, -- The MIME-type of the media. - created_ts INTEGER, -- When the content was uploaded in ms. + created_ts BIGINT, -- When the content was uploaded in ms. upload_name TEXT, -- The name the media was uploaded with. media_length INTEGER, -- Length of the media in bytes. filesystem_id TEXT, -- The name used to store the media on disk. - CONSTRAINT uniqueness UNIQUE (media_origin, media_id) + UNIQUE (media_origin, media_id) ); CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( @@ -58,11 +58,8 @@ CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( thumbnail_type TEXT, -- The MIME-type of the thumbnail. thumbnail_length INTEGER, -- The length of the thumbnail in bytes. filesystem_id TEXT, -- The name used to store the media on disk. - CONSTRAINT uniqueness UNIQUE ( + UNIQUE ( media_origin, media_id, thumbnail_width, thumbnail_height, - thumbnail_type, thumbnail_type - ) + thumbnail_type + ) ); - -CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id - ON local_media_repository_thumbnails (media_id); diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql index f9f8db9697..d8d82e9fe3 100644 --- a/synapse/storage/schema/full_schemas/11/presence.sql +++ b/synapse/storage/schema/full_schemas/11/presence.sql @@ -13,26 +13,23 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS presence( - user_id INTEGER NOT NULL, - state INTEGER, + user_id TEXT NOT NULL, + state VARCHAR(20), status_msg TEXT, - mtime INTEGER, -- miliseconds since last state change - FOREIGN KEY(user_id) REFERENCES users(id) + mtime BIGINT -- miliseconds since last state change ); -- For each of /my/ users which possibly-remote users are allowed to see their -- presence state CREATE TABLE IF NOT EXISTS presence_allow_inbound( - observed_user_id INTEGER NOT NULL, - observer_user_id TEXT, -- a UserID, - FOREIGN KEY(observed_user_id) REFERENCES users(id) + observed_user_id TEXT NOT NULL, + observer_user_id TEXT NOT NULL -- a UserID, ); -- For each of /my/ users (watcher), which possibly-remote users are they -- watching? CREATE TABLE IF NOT EXISTS presence_list( - user_id INTEGER NOT NULL, - observed_user_id TEXT, -- a UserID, - accepted BOOLEAN, - FOREIGN KEY(user_id) REFERENCES users(id) + user_id TEXT NOT NULL, + observed_user_id TEXT NOT NULL, -- a UserID, + accepted BOOLEAN NOT NULL ); diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql index f06a528b4d..26e4204437 100644 --- a/synapse/storage/schema/full_schemas/11/profiles.sql +++ b/synapse/storage/schema/full_schemas/11/profiles.sql @@ -13,8 +13,7 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS profiles( - user_id INTEGER NOT NULL, + user_id TEXT NOT NULL, displayname TEXT, - avatar_url TEXT, - FOREIGN KEY(user_id) REFERENCES users(id) + avatar_url TEXT ); diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql index 5011d95db8..69621955d4 100644 --- a/synapse/storage/schema/full_schemas/11/redactions.sql +++ b/synapse/storage/schema/full_schemas/11/redactions.sql @@ -15,8 +15,8 @@ CREATE TABLE IF NOT EXISTS redactions ( event_id TEXT NOT NULL, redacts TEXT NOT NULL, - CONSTRAINT ev_uniq UNIQUE (event_id) + UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id); -CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts); +CREATE INDEX redactions_event_id ON redactions (event_id); +CREATE INDEX redactions_redacts ON redactions (redacts); diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql index 0d2df01603..5027b1e3f6 100644 --- a/synapse/storage/schema/full_schemas/11/room_aliases.sql +++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql @@ -22,6 +22,3 @@ CREATE TABLE IF NOT EXISTS room_alias_servers( room_alias TEXT NOT NULL, server TEXT NOT NULL ); - - - diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql index 1fe8f1e430..ffd164ab71 100644 --- a/synapse/storage/schema/full_schemas/11/state.sql +++ b/synapse/storage/schema/full_schemas/11/state.sql @@ -30,18 +30,11 @@ CREATE TABLE IF NOT EXISTS state_groups_state( CREATE TABLE IF NOT EXISTS event_to_state_groups( event_id TEXT NOT NULL, state_group INTEGER NOT NULL, - CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id) + UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id); +CREATE INDEX state_groups_id ON state_groups(id); -CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state( - state_group -); -CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state( - room_id, type, state_key -); - -CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups( - event_id -); \ No newline at end of file +CREATE INDEX state_groups_state_id ON state_groups_state(state_group); +CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key); +CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id); diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql index 2d30f99b06..cc5b54f5aa 100644 --- a/synapse/storage/schema/full_schemas/11/transactions.sql +++ b/synapse/storage/schema/full_schemas/11/transactions.sql @@ -14,17 +14,16 @@ */ -- Stores what transaction ids we have received and what our response was CREATE TABLE IF NOT EXISTS received_transactions( - transaction_id TEXT, - origin TEXT, - ts INTEGER, + transaction_id TEXT, + origin TEXT, + ts BIGINT, response_code INTEGER, - response_json TEXT, - has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx - CONSTRAINT uniquesss UNIQUE (transaction_id, origin) ON CONFLICT REPLACE + response_json bytea, + has_been_referenced SMALLINT DEFAULT 0, -- Whether thishas been referenced by a prev_tx + UNIQUE (transaction_id, origin) ); -CREATE UNIQUE INDEX IF NOT EXISTS transactions_txid ON received_transactions(transaction_id, origin); -CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; +CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; -- Stores what transactions we've sent, what their response was (if we got one) and whether we have @@ -35,17 +34,14 @@ CREATE TABLE IF NOT EXISTS sent_transactions( destination TEXT, response_code INTEGER DEFAULT 0, response_json TEXT, - ts INTEGER + ts BIGINT ); -CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination); -CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions( - destination -); -CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id); +CREATE INDEX sent_transaction_dest ON sent_transactions(destination); +CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id); -- So that we can do an efficient look up of all transactions that have yet to be successfully -- sent. -CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_code); +CREATE INDEX sent_transaction_sent ON sent_transactions(response_code); -- For sent transactions only. @@ -56,13 +52,12 @@ CREATE TABLE IF NOT EXISTS transaction_id_to_pdu( pdu_origin TEXT ); -CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination); -CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); -CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination); +CREATE INDEX transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination); +CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); -- To track destination health CREATE TABLE IF NOT EXISTS destinations( destination TEXT PRIMARY KEY, - retry_last_ts INTEGER, + retry_last_ts BIGINT, retry_interval INTEGER ); diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql index 08ccfdac0a..eec3da3c35 100644 --- a/synapse/storage/schema/full_schemas/11/users.sql +++ b/synapse/storage/schema/full_schemas/11/users.sql @@ -16,19 +16,18 @@ CREATE TABLE IF NOT EXISTS users( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, password_hash TEXT, - creation_ts INTEGER, - admin BOOL DEFAULT 0 NOT NULL, - UNIQUE(name) ON CONFLICT ROLLBACK + creation_ts BIGINT, + admin SMALLINT DEFAULT 0 NOT NULL, + UNIQUE(name) ); CREATE TABLE IF NOT EXISTS access_tokens( id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL, + user_id TEXT NOT NULL, device_id TEXT, token TEXT NOT NULL, - last_used INTEGER, - FOREIGN KEY(user_id) REFERENCES users(id), - UNIQUE(token) ON CONFLICT ROLLBACK + last_used BIGINT, + UNIQUE(token) ); CREATE TABLE IF NOT EXISTS user_ips ( @@ -37,9 +36,8 @@ CREATE TABLE IF NOT EXISTS user_ips ( device_id TEXT, ip TEXT NOT NULL, user_agent TEXT NOT NULL, - last_seen INTEGER NOT NULL, - CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE + last_seen BIGINT NOT NULL, + UNIQUE (user, access_token, ip, user_agent) ); -CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user); - +CREATE INDEX user_ips_user ON user_ips(user); diff --git a/synapse/storage/schema/full_schemas/16/application_services.sql b/synapse/storage/schema/full_schemas/16/application_services.sql new file mode 100644 index 0000000000..d382d63fbd --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/application_services.sql @@ -0,0 +1,48 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS application_services( + id BIGINT PRIMARY KEY, + url TEXT, + token TEXT, + hs_token TEXT, + sender TEXT, + UNIQUE(token) +); + +CREATE TABLE IF NOT EXISTS application_services_regex( + id BIGINT PRIMARY KEY, + as_id BIGINT NOT NULL, + namespace INTEGER, /* enum[room_id|room_alias|user_id] */ + regex TEXT, + FOREIGN KEY(as_id) REFERENCES application_services(id) +); + +CREATE TABLE IF NOT EXISTS application_services_state( + as_id TEXT PRIMARY KEY, + state VARCHAR(5), + last_txn INTEGER +); + +CREATE TABLE IF NOT EXISTS application_services_txns( + as_id TEXT NOT NULL, + txn_id INTEGER NOT NULL, + event_ids TEXT NOT NULL, + UNIQUE(as_id, txn_id) +); + +CREATE INDEX application_services_txns_id ON application_services_txns ( + as_id +); diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql new file mode 100644 index 0000000000..f7020f7793 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/event_edges.sql @@ -0,0 +1,89 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_forward_extremities( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + UNIQUE (event_id, room_id) +); + +CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id); +CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id); + + +CREATE TABLE IF NOT EXISTS event_backward_extremities( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + UNIQUE (event_id, room_id) +); + +CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id); +CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id); + + +CREATE TABLE IF NOT EXISTS event_edges( + event_id TEXT NOT NULL, + prev_event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + is_state BOOL NOT NULL, + UNIQUE (event_id, prev_event_id, room_id, is_state) +); + +CREATE INDEX ev_edges_id ON event_edges(event_id); +CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id); + + +CREATE TABLE IF NOT EXISTS room_depth( + room_id TEXT NOT NULL, + min_depth INTEGER NOT NULL, + UNIQUE (room_id) +); + +CREATE INDEX room_depth_room ON room_depth(room_id); + + +create TABLE IF NOT EXISTS event_destinations( + event_id TEXT NOT NULL, + destination TEXT NOT NULL, + delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered + UNIQUE (event_id, destination) +); + +CREATE INDEX event_destinations_id ON event_destinations(event_id); + + +CREATE TABLE IF NOT EXISTS state_forward_extremities( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + UNIQUE (event_id, room_id) +); + +CREATE INDEX st_extrem_keys ON state_forward_extremities( + room_id, type, state_key +); +CREATE INDEX st_extrem_id ON state_forward_extremities(event_id); + + +CREATE TABLE IF NOT EXISTS event_auth( + event_id TEXT NOT NULL, + auth_id TEXT NOT NULL, + room_id TEXT NOT NULL, + UNIQUE (event_id, auth_id, room_id) +); + +CREATE INDEX evauth_edges_id ON event_auth(event_id); +CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id); diff --git a/synapse/storage/schema/full_schemas/16/event_signatures.sql b/synapse/storage/schema/full_schemas/16/event_signatures.sql new file mode 100644 index 0000000000..636b2d3353 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/event_signatures.sql @@ -0,0 +1,55 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_content_hashes ( + event_id TEXT, + algorithm TEXT, + hash bytea, + UNIQUE (event_id, algorithm) +); + +CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id); + + +CREATE TABLE IF NOT EXISTS event_reference_hashes ( + event_id TEXT, + algorithm TEXT, + hash bytea, + UNIQUE (event_id, algorithm) +); + +CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id); + + +CREATE TABLE IF NOT EXISTS event_signatures ( + event_id TEXT, + signature_name TEXT, + key_id TEXT, + signature bytea, + UNIQUE (event_id, signature_name, key_id) +); + +CREATE INDEX event_signatures_id ON event_signatures(event_id); + + +CREATE TABLE IF NOT EXISTS event_edge_hashes( + event_id TEXT, + prev_event_id TEXT, + algorithm TEXT, + hash bytea, + UNIQUE (event_id, prev_event_id, algorithm) +); + +CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id); diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql new file mode 100644 index 0000000000..576653a3c9 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/im.sql @@ -0,0 +1,128 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS events( + stream_ordering INTEGER PRIMARY KEY, + topological_ordering BIGINT NOT NULL, + event_id TEXT NOT NULL, + type TEXT NOT NULL, + room_id TEXT NOT NULL, + content TEXT NOT NULL, + unrecognized_keys TEXT, + processed BOOL NOT NULL, + outlier BOOL NOT NULL, + depth BIGINT DEFAULT 0 NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX events_stream_ordering ON events (stream_ordering); +CREATE INDEX events_topological_ordering ON events (topological_ordering); +CREATE INDEX events_order ON events (topological_ordering, stream_ordering); +CREATE INDEX events_room_id ON events (room_id); +CREATE INDEX events_order_room ON events ( + room_id, topological_ordering, stream_ordering +); + + +CREATE TABLE IF NOT EXISTS event_json( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + internal_metadata TEXT NOT NULL, + json TEXT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX event_json_room_id ON event_json(room_id); + + +CREATE TABLE IF NOT EXISTS state_events( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + prev_state TEXT, + UNIQUE (event_id) +); + +CREATE INDEX state_events_room_id ON state_events (room_id); +CREATE INDEX state_events_type ON state_events (type); +CREATE INDEX state_events_state_key ON state_events (state_key); + + +CREATE TABLE IF NOT EXISTS current_state_events( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + UNIQUE (event_id), + UNIQUE (room_id, type, state_key) +); + +CREATE INDEX current_state_events_room_id ON current_state_events (room_id); +CREATE INDEX current_state_events_type ON current_state_events (type); +CREATE INDEX current_state_events_state_key ON current_state_events (state_key); + +CREATE TABLE IF NOT EXISTS room_memberships( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + sender TEXT NOT NULL, + room_id TEXT NOT NULL, + membership TEXT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX room_memberships_room_id ON room_memberships (room_id); +CREATE INDEX room_memberships_user_id ON room_memberships (user_id); + +CREATE TABLE IF NOT EXISTS feedback( + event_id TEXT NOT NULL, + feedback_type TEXT, + target_event_id TEXT, + sender TEXT, + room_id TEXT, + UNIQUE (event_id) +); + +CREATE TABLE IF NOT EXISTS topics( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + topic TEXT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX topics_room_id ON topics(room_id); + +CREATE TABLE IF NOT EXISTS room_names( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + name TEXT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX room_names_room_id ON room_names(room_id); + +CREATE TABLE IF NOT EXISTS rooms( + room_id TEXT PRIMARY KEY NOT NULL, + is_public BOOL, + creator TEXT +); + +CREATE TABLE IF NOT EXISTS room_hosts( + room_id TEXT NOT NULL, + host TEXT NOT NULL, + UNIQUE (room_id, host) +); + +CREATE INDEX room_hosts_room_id ON room_hosts (room_id); diff --git a/synapse/storage/schema/full_schemas/16/keys.sql b/synapse/storage/schema/full_schemas/16/keys.sql new file mode 100644 index 0000000000..afc142045e --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/keys.sql @@ -0,0 +1,31 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS server_tls_certificates( + server_name TEXT, -- Server name. + fingerprint TEXT, -- Certificate fingerprint. + from_server TEXT, -- Which key server the certificate was fetched from. + ts_added_ms BIGINT, -- When the certifcate was added. + tls_certificate bytea, -- DER encoded x509 certificate. + UNIQUE (server_name, fingerprint) +); + +CREATE TABLE IF NOT EXISTS server_signature_keys( + server_name TEXT, -- Server name. + key_id TEXT, -- Key version. + from_server TEXT, -- Which key server the key was fetched form. + ts_added_ms BIGINT, -- When the key was added. + verify_key bytea, -- NACL verification key. + UNIQUE (server_name, key_id) +); diff --git a/synapse/storage/schema/full_schemas/16/media_repository.sql b/synapse/storage/schema/full_schemas/16/media_repository.sql new file mode 100644 index 0000000000..dacbda40ca --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/media_repository.sql @@ -0,0 +1,68 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS local_media_repository ( + media_id TEXT, -- The id used to refer to the media. + media_type TEXT, -- The MIME-type of the media. + media_length INTEGER, -- Length of the media in bytes. + created_ts BIGINT, -- When the content was uploaded in ms. + upload_name TEXT, -- The name the media was uploaded with. + user_id TEXT, -- The user who uploaded the file. + UNIQUE (media_id) +); + +CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( + media_id TEXT, -- The id used to refer to the media. + thumbnail_width INTEGER, -- The width of the thumbnail in pixels. + thumbnail_height INTEGER, -- The height of the thumbnail in pixels. + thumbnail_type TEXT, -- The MIME-type of the thumbnail. + thumbnail_method TEXT, -- The method used to make the thumbnail. + thumbnail_length INTEGER, -- The length of the thumbnail in bytes. + UNIQUE ( + media_id, thumbnail_width, thumbnail_height, thumbnail_type + ) +); + +CREATE INDEX local_media_repository_thumbnails_media_id + ON local_media_repository_thumbnails (media_id); + +CREATE TABLE IF NOT EXISTS remote_media_cache ( + media_origin TEXT, -- The remote HS the media came from. + media_id TEXT, -- The id used to refer to the media on that server. + media_type TEXT, -- The MIME-type of the media. + created_ts BIGINT, -- When the content was uploaded in ms. + upload_name TEXT, -- The name the media was uploaded with. + media_length INTEGER, -- Length of the media in bytes. + filesystem_id TEXT, -- The name used to store the media on disk. + UNIQUE (media_origin, media_id) +); + +CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( + media_origin TEXT, -- The remote HS the media came from. + media_id TEXT, -- The id used to refer to the media. + thumbnail_width INTEGER, -- The width of the thumbnail in pixels. + thumbnail_height INTEGER, -- The height of the thumbnail in pixels. + thumbnail_method TEXT, -- The method used to make the thumbnail + thumbnail_type TEXT, -- The MIME-type of the thumbnail. + thumbnail_length INTEGER, -- The length of the thumbnail in bytes. + filesystem_id TEXT, -- The name used to store the media on disk. + UNIQUE ( + media_origin, media_id, thumbnail_width, thumbnail_height, + thumbnail_type + ) +); + +CREATE INDEX remote_media_cache_thumbnails_media_id + ON remote_media_cache_thumbnails (media_id); diff --git a/synapse/storage/schema/full_schemas/16/presence.sql b/synapse/storage/schema/full_schemas/16/presence.sql new file mode 100644 index 0000000000..80088413ba --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/presence.sql @@ -0,0 +1,40 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS presence( + user_id TEXT NOT NULL, + state VARCHAR(20), + status_msg TEXT, + mtime BIGINT, -- miliseconds since last state change + UNIQUE (user_id) +); + +-- For each of /my/ users which possibly-remote users are allowed to see their +-- presence state +CREATE TABLE IF NOT EXISTS presence_allow_inbound( + observed_user_id TEXT NOT NULL, + observer_user_id TEXT NOT NULL, -- a UserID, + UNIQUE (observed_user_id, observer_user_id) +); + +-- For each of /my/ users (watcher), which possibly-remote users are they +-- watching? +CREATE TABLE IF NOT EXISTS presence_list( + user_id TEXT NOT NULL, + observed_user_id TEXT NOT NULL, -- a UserID, + accepted BOOLEAN NOT NULL, + UNIQUE (user_id, observed_user_id) +); + +CREATE INDEX presence_list_user_id ON presence_list (user_id); diff --git a/synapse/storage/schema/full_schemas/16/profiles.sql b/synapse/storage/schema/full_schemas/16/profiles.sql new file mode 100644 index 0000000000..934be86520 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/profiles.sql @@ -0,0 +1,20 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS profiles( + user_id TEXT NOT NULL, + displayname TEXT, + avatar_url TEXT, + UNIQUE(user_id) +); diff --git a/synapse/storage/schema/full_schemas/16/push.sql b/synapse/storage/schema/full_schemas/16/push.sql new file mode 100644 index 0000000000..db6e05cbdf --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/push.sql @@ -0,0 +1,73 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS rejections( + event_id TEXT NOT NULL, + reason TEXT NOT NULL, + last_check TEXT NOT NULL, + UNIQUE (event_id) +); + +-- Push notification endpoints that users have configured +CREATE TABLE IF NOT EXISTS pushers ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey bytea NOT NULL, + ts BIGINT NOT NULL, + lang VARCHAR(8), + data bytea, + last_token TEXT, + last_success BIGINT, + failing_since BIGINT, + UNIQUE (app_id, pushkey) +); + +CREATE TABLE IF NOT EXISTS push_rules ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + rule_id TEXT NOT NULL, + priority_class SMALLINT NOT NULL, + priority INTEGER NOT NULL DEFAULT 0, + conditions TEXT NOT NULL, + actions TEXT NOT NULL, + UNIQUE(user_name, rule_id) +); + +CREATE INDEX push_rules_user_name on push_rules (user_name); + +CREATE TABLE IF NOT EXISTS user_filters( + user_id TEXT, + filter_id BIGINT, + filter_json bytea +); + +CREATE INDEX user_filters_by_user_id_filter_id ON user_filters( + user_id, filter_id +); + +CREATE TABLE IF NOT EXISTS push_rules_enable ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + rule_id TEXT NOT NULL, + enabled SMALLINT, + UNIQUE(user_name, rule_id) +); + +CREATE INDEX push_rules_enable_user_name on push_rules_enable (user_name); diff --git a/synapse/storage/schema/full_schemas/16/redactions.sql b/synapse/storage/schema/full_schemas/16/redactions.sql new file mode 100644 index 0000000000..69621955d4 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/redactions.sql @@ -0,0 +1,22 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS redactions ( + event_id TEXT NOT NULL, + redacts TEXT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX redactions_event_id ON redactions (event_id); +CREATE INDEX redactions_redacts ON redactions (redacts); diff --git a/synapse/storage/schema/full_schemas/16/room_aliases.sql b/synapse/storage/schema/full_schemas/16/room_aliases.sql new file mode 100644 index 0000000000..412bb97fad --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/room_aliases.sql @@ -0,0 +1,29 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS room_aliases( + room_alias TEXT NOT NULL, + room_id TEXT NOT NULL, + UNIQUE (room_alias) +); + +CREATE INDEX room_aliases_id ON room_aliases(room_id); + +CREATE TABLE IF NOT EXISTS room_alias_servers( + room_alias TEXT NOT NULL, + server TEXT NOT NULL +); + +CREATE INDEX room_alias_servers_alias ON room_alias_servers(room_alias); diff --git a/synapse/storage/schema/full_schemas/16/state.sql b/synapse/storage/schema/full_schemas/16/state.sql new file mode 100644 index 0000000000..705cac6ce9 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/state.sql @@ -0,0 +1,40 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS state_groups( + id BIGINT PRIMARY KEY, + room_id TEXT NOT NULL, + event_id TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS state_groups_state( + state_group BIGINT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + event_id TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS event_to_state_groups( + event_id TEXT NOT NULL, + state_group BIGINT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX state_groups_id ON state_groups(id); + +CREATE INDEX state_groups_state_id ON state_groups_state(state_group); +CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key); +CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id); diff --git a/synapse/storage/schema/full_schemas/16/transactions.sql b/synapse/storage/schema/full_schemas/16/transactions.sql new file mode 100644 index 0000000000..1ab77cdb63 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/transactions.sql @@ -0,0 +1,63 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +-- Stores what transaction ids we have received and what our response was +CREATE TABLE IF NOT EXISTS received_transactions( + transaction_id TEXT, + origin TEXT, + ts BIGINT, + response_code INTEGER, + response_json bytea, + has_been_referenced smallint default 0, -- Whether thishas been referenced by a prev_tx + UNIQUE (transaction_id, origin) +); + +CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; + + +-- Stores what transactions we've sent, what their response was (if we got one) and whether we have +-- since referenced the transaction in another outgoing transaction +CREATE TABLE IF NOT EXISTS sent_transactions( + id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering + transaction_id TEXT, + destination TEXT, + response_code INTEGER DEFAULT 0, + response_json TEXT, + ts BIGINT +); + +CREATE INDEX sent_transaction_dest ON sent_transactions(destination); +CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id); +-- So that we can do an efficient look up of all transactions that have yet to be successfully +-- sent. +CREATE INDEX sent_transaction_sent ON sent_transactions(response_code); + + +-- For sent transactions only. +CREATE TABLE IF NOT EXISTS transaction_id_to_pdu( + transaction_id INTEGER, + destination TEXT, + pdu_id TEXT, + pdu_origin TEXT, + UNIQUE (transaction_id, destination) +); + +CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); + +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination TEXT PRIMARY KEY, + retry_last_ts BIGINT, + retry_interval INTEGER +); diff --git a/synapse/storage/schema/full_schemas/16/users.sql b/synapse/storage/schema/full_schemas/16/users.sql new file mode 100644 index 0000000000..d2fa3122da --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/users.sql @@ -0,0 +1,42 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS users( + name TEXT, + password_hash TEXT, + creation_ts BIGINT, + admin SMALLINT DEFAULT 0 NOT NULL, + UNIQUE(name) +); + +CREATE TABLE IF NOT EXISTS access_tokens( + id BIGINT PRIMARY KEY, + user_id TEXT NOT NULL, + device_id TEXT, + token TEXT NOT NULL, + last_used BIGINT, + UNIQUE(token) +); + +CREATE TABLE IF NOT EXISTS user_ips ( + user_id TEXT NOT NULL, + access_token TEXT NOT NULL, + device_id TEXT, + ip TEXT NOT NULL, + user_agent TEXT NOT NULL, + last_seen BIGINT NOT NULL +); + +CREATE INDEX user_ips_user ON user_ips(user_id); +CREATE INDEX user_ips_user_ip ON user_ips(user_id, access_token, ip); diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql index 0431e2d051..d682608aa0 100644 --- a/synapse/storage/schema/schema_version.sql +++ b/synapse/storage/schema/schema_version.sql @@ -14,17 +14,14 @@ */ CREATE TABLE IF NOT EXISTS schema_version( - Lock char(1) NOT NULL DEFAULT 'X', -- Makes sure this table only has one row. + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. version INTEGER NOT NULL, upgraded BOOL NOT NULL, -- Whether we reached this version from an upgrade or an initial schema. - CONSTRAINT schema_version_lock_x CHECK (Lock='X') - CONSTRAINT schema_version_lock_uniq UNIQUE (Lock) + CHECK (Lock='X') ); CREATE TABLE IF NOT EXISTS applied_schema_deltas( version INTEGER NOT NULL, file TEXT NOT NULL, - CONSTRAINT schema_deltas_ver_file UNIQUE (version, file) ON CONFLICT IGNORE + UNIQUE(version, file) ); - -CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index d0d53770f2..f051828630 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -56,7 +56,6 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }, - or_ignore=True, ) def get_event_reference_hashes(self, event_ids): @@ -100,7 +99,7 @@ class SignatureStore(SQLBaseStore): " WHERE event_id = ?" ) txn.execute(query, (event_id, )) - return dict(txn.fetchall()) + return {k: v for k, v in txn.fetchall()} def _store_event_reference_hash_txn(self, txn, event_id, algorithm, hash_bytes): @@ -119,7 +118,6 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }, - or_ignore=True, ) def _get_event_signatures_txn(self, txn, event_id): @@ -164,7 +162,6 @@ class SignatureStore(SQLBaseStore): "key_id": key_id, "signature": buffer(signature_bytes), }, - or_ignore=True, ) def _get_prev_event_hashes_txn(self, txn, event_id): @@ -198,5 +195,4 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }, - or_ignore=True, ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 58dbf2802b..553ba9dd1f 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -17,6 +17,8 @@ from ._base import SQLBaseStore from twisted.internet import defer +from synapse.util.stringutils import random_string + import logging logger = logging.getLogger(__name__) @@ -91,14 +93,15 @@ class StateStore(SQLBaseStore): state_group = context.state_group if not state_group: - state_group = self._simple_insert_txn( + state_group = self._state_groups_id_gen.get_next_txn(txn) + self._simple_insert_txn( txn, table="state_groups", values={ + "id": state_group, "room_id": event.room_id, "event_id": event.event_id, }, - or_ignore=True, ) for state in state_events.values(): @@ -112,7 +115,6 @@ class StateStore(SQLBaseStore): "state_key": state.state_key, "event_id": state.event_id, }, - or_ignore=True, ) self._simple_insert_txn( @@ -122,7 +124,6 @@ class StateStore(SQLBaseStore): "state_group": state_group, "event_id": event.event_id, }, - or_replace=True, ) @defer.inlineCallbacks @@ -154,3 +155,7 @@ class StateStore(SQLBaseStore): events = yield self._parse_events(results) defer.returnValue(events) + + +def _make_group_id(clock): + return str(int(clock.time_msec())) + random_string(5) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 66f307e640..df6de7cbcd 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -35,7 +35,7 @@ what sort order was used: from twisted.internet import defer -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.util.logutils import log_function @@ -110,7 +110,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): if self.topological is None: return "(%d < %s)" % (self.stream, "stream_ordering") else: - return "(%d < %s OR (%d == %s AND %d < %s))" % ( + return "(%d < %s OR (%d = %s AND %d < %s))" % ( self.topological, "topological_ordering", self.topological, "topological_ordering", self.stream, "stream_ordering", @@ -120,7 +120,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): if self.topological is None: return "(%d >= %s)" % (self.stream, "stream_ordering") else: - return "(%d > %s OR (%d == %s AND %d >= %s))" % ( + return "(%d > %s OR (%d = %s AND %d >= %s))" % ( self.topological, "topological_ordering", self.topological, "topological_ordering", self.stream, "stream_ordering", @@ -240,7 +240,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE " - "(e.outlier = 0 AND (room_id IN (%(current)s)) OR " + "(e.outlier = ? AND (room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " "AND e.stream_ordering > ? AND e.stream_ordering <= ? " "ORDER BY stream_ordering ASC LIMIT %(limit)d " @@ -251,7 +251,7 @@ class StreamStore(SQLBaseStore): } def f(txn): - txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,)) + txn.execute(sql, (False, user_id, user_id, from_id.stream, to_id.stream,)) rows = self.cursor_to_dict(txn) @@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore): # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. - args = [room_id] + args = [False, room_id] if direction == 'b': order = "DESC" bounds = _StreamToken.parse(from_key).upper_bound() @@ -307,7 +307,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events" - " WHERE outlier = 0 AND room_id = ? AND %(bounds)s" + " WHERE outlier = ? AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," " stream_ordering %(order)s %(limit)s" ) % { @@ -358,7 +358,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT stream_ordering, topological_ordering, event_id" " FROM events" - " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0" + " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?" " ORDER BY topological_ordering DESC, stream_ordering DESC" " LIMIT ?" ) @@ -368,17 +368,17 @@ class StreamStore(SQLBaseStore): "SELECT stream_ordering, topological_ordering, event_id" " FROM events" " WHERE room_id = ? AND stream_ordering > ?" - " AND stream_ordering <= ? AND outlier = 0" + " AND stream_ordering <= ? AND outlier = ?" " ORDER BY topological_ordering DESC, stream_ordering DESC" " LIMIT ?" ) def get_recent_events_for_room_txn(txn): if from_token is None: - txn.execute(sql, (room_id, end_token.stream, limit,)) + txn.execute(sql, (room_id, end_token.stream, False, limit,)) else: txn.execute(sql, ( - room_id, from_token.stream, end_token.stream, limit + room_id, from_token.stream, end_token.stream, False, limit )) rows = self.cursor_to_dict(txn) @@ -413,12 +413,10 @@ class StreamStore(SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) - @cached(num_args=0) + @defer.inlineCallbacks def get_room_events_max_id(self): - return self.runInteraction( - "get_room_events_max_id", - self._get_room_events_max_id_txn - ) + token = yield self._stream_id_gen.get_max_token(self) + defer.returnValue("s%d" % (token,)) @defer.inlineCallbacks def _get_min_token(self): @@ -433,27 +431,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(self.min_token) - def get_next_stream_id(self): - with self._next_stream_id_lock: - i = self._next_stream_id - self._next_stream_id += 1 - return i - - def _get_room_events_max_id_txn(self, txn): - txn.execute( - "SELECT MAX(stream_ordering) as m FROM events" - ) - - res = self.cursor_to_dict(txn) - - logger.debug("get_room_events_max_id: %s", res) - - if not res or not res[0] or not res[0]["m"]: - return "s0" - - key = res[0]["m"] - return "s%d" % (key,) - @staticmethod def _set_before_and_after(events, rows): for event, row in zip(events, rows): diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index b777395e06..89dd7d8947 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, Table, cached +from ._base import SQLBaseStore, cached from collections import namedtuple @@ -76,22 +76,18 @@ class TransactionStore(SQLBaseStore): response_json (str) """ - return self.runInteraction( - "set_received_txn_response", - self._set_received_txn_response, - transaction_id, origin, code, response_dict + return self._simple_insert( + table=ReceivedTransactionsTable.table_name, + values={ + "transaction_id": transaction_id, + "origin": origin, + "response_code": code, + "response_json": response_dict, + }, + or_ignore=True, + desc="set_received_txn_response", ) - def _set_received_txn_response(self, txn, transaction_id, origin, code, - response_json): - query = ( - "UPDATE %s " - "SET response_code = ?, response_json = ? " - "WHERE transaction_id = ? AND origin = ?" - ) % ReceivedTransactionsTable.table_name - - txn.execute(query, (code, response_json, transaction_id, origin)) - def prep_send_transaction(self, transaction_id, destination, origin_server_ts): """Persists an outgoing transaction and calculates the values for the @@ -118,41 +114,38 @@ class TransactionStore(SQLBaseStore): def _prep_send_transaction(self, txn, transaction_id, destination, origin_server_ts): + next_id = self._transaction_id_gen.get_next_txn(txn) + # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. - query = "%s ORDER BY id DESC LIMIT 1" % ( - SentTransactions.select_statement("destination = ?"), - ) + query = ( + "SELECT * FROM sent_transactions" + " WHERE destination = ?" + " ORDER BY id DESC LIMIT 1" + ) - results = txn.execute(query, (destination,)) - results = SentTransactions.decode_results(results) + txn.execute(query, (destination,)) + results = self.cursor_to_dict(txn) - prev_txns = [r.transaction_id for r in results] + prev_txns = [r["transaction_id"] for r in results] # Actually add the new transaction to the sent_transactions table. - query = SentTransactions.insert_statement() - txn.execute(query, SentTransactions.EntryType( - None, - transaction_id=transaction_id, - destination=destination, - ts=origin_server_ts, - response_code=0, - response_json=None - )) - - # Update the tx id -> pdu id mapping - - # values = [ - # (transaction_id, destination, pdu[0], pdu[1]) - # for pdu in pdu_list - # ] - # - # logger.debug("Inserting: %s", repr(values)) - # - # query = TransactionsToPduTable.insert_statement() - # txn.executemany(query, values) + self._simple_insert_txn( + txn, + table=SentTransactions.table_name, + values={ + "id": next_id, + "transaction_id": transaction_id, + "destination": destination, + "ts": origin_server_ts, + "response_code": 0, + "response_json": None, + } + ) + + # TODO Update the tx id -> pdu id mapping return prev_txns @@ -171,15 +164,20 @@ class TransactionStore(SQLBaseStore): transaction_id, destination, code, response_dict ) - def _delivered_txn(cls, txn, transaction_id, destination, + def _delivered_txn(self, txn, transaction_id, destination, code, response_json): - query = ( - "UPDATE %s " - "SET response_code = ?, response_json = ? " - "WHERE transaction_id = ? AND destination = ?" - ) % SentTransactions.table_name - - txn.execute(query, (code, response_json, transaction_id, destination)) + self._simple_update_one_txn( + txn, + table=SentTransactions.table_name, + keyvalues={ + "transaction_id": transaction_id, + "destination": destination, + }, + updatevalues={ + "response_code": code, + "response_json": None, # For now, don't persist response_json + } + ) def get_transactions_after(self, transaction_id, destination): """Get all transactions after a given local transaction_id. @@ -189,25 +187,26 @@ class TransactionStore(SQLBaseStore): destination (str) Returns: - list: A list of `ReceivedTransactionsTable.EntryType` + list: A list of dicts """ return self.runInteraction( "get_transactions_after", self._get_transactions_after, transaction_id, destination ) - def _get_transactions_after(cls, txn, transaction_id, destination): - where = ( - "destination = ? AND id > (select id FROM %s WHERE " - "transaction_id = ? AND destination = ?)" - ) % ( - SentTransactions.table_name + def _get_transactions_after(self, txn, transaction_id, destination): + query = ( + "SELECT * FROM sent_transactions" + " WHERE destination = ? AND id >" + " (" + " SELECT id FROM sent_transactions" + " WHERE transaction_id = ? AND destination = ?" + " )" ) - query = SentTransactions.select_statement(where) txn.execute(query, (destination, transaction_id, destination)) - return ReceivedTransactionsTable.decode_results(txn.fetchall()) + return self.cursor_to_dict(txn) @cached() def get_destination_retry_timings(self, destination): @@ -218,22 +217,27 @@ class TransactionStore(SQLBaseStore): Returns: None if not retrying - Otherwise a DestinationsTable.EntryType for the retry scheme + Otherwise a dict for the retry scheme """ return self.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination) - def _get_destination_retry_timings(cls, txn, destination): - query = DestinationsTable.select_statement("destination = ?") - txn.execute(query, (destination,)) - result = txn.fetchall() - if result: - result = DestinationsTable.decode_single_result(result) - if result.retry_last_ts > 0: - return result - else: - return None + def _get_destination_retry_timings(self, txn, destination): + result = self._simple_select_one_txn( + txn, + table=DestinationsTable.table_name, + keyvalues={ + "destination": destination, + }, + retcols=DestinationsTable.fields, + allow_none=True, + ) + + if result and result["retry_last_ts"] > 0: + return result + else: + return None def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): @@ -249,11 +253,11 @@ class TransactionStore(SQLBaseStore): # As this is the new value, we might as well prefill the cache self.get_destination_retry_timings.prefill( destination, - DestinationsTable.EntryType( - destination, - retry_last_ts, - retry_interval - ) + { + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval + }, ) # XXX: we could chose to not bother persisting this if our cache thinks @@ -266,22 +270,38 @@ class TransactionStore(SQLBaseStore): retry_interval, ) - def _set_destination_retry_timings(cls, txn, destination, + def _set_destination_retry_timings(self, txn, destination, retry_last_ts, retry_interval): - query = ( - "INSERT OR REPLACE INTO %s " - "(destination, retry_last_ts, retry_interval) " - "VALUES (?, ?, ?) " - ) % DestinationsTable.table_name + "UPDATE destinations" + " SET retry_last_ts = ?, retry_interval = ?" + " WHERE destination = ?" + ) + + txn.execute( + query, + ( + retry_last_ts, retry_interval, destination, + ) + ) - txn.execute(query, (destination, retry_last_ts, retry_interval)) + if txn.rowcount == 0: + # destination wasn't already in table. Insert it. + self._simple_insert_txn( + txn, + table="destinations", + values={ + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + } + ) def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. Returns: - list: A list of `DestinationsTable.EntryType` + list: A list of dicts """ return self.runInteraction( @@ -289,14 +309,17 @@ class TransactionStore(SQLBaseStore): self._get_destinations_needing_retry ) - def _get_destinations_needing_retry(cls, txn): - where = "retry_last_ts > 0 and retry_next_ts < now()" - query = DestinationsTable.select_statement(where) - txn.execute(query) - return DestinationsTable.decode_results(txn.fetchall()) + def _get_destinations_needing_retry(self, txn): + query = ( + "SELECT * FROM destinations" + " WHERE retry_last_ts > 0 and retry_next_ts < ?" + ) + + txn.execute(query, (self._clock.time_msec(),)) + return self.cursor_to_dict(txn) -class ReceivedTransactionsTable(Table): +class ReceivedTransactionsTable(object): table_name = "received_transactions" fields = [ @@ -308,10 +331,8 @@ class ReceivedTransactionsTable(Table): "has_been_referenced", ] - EntryType = namedtuple("ReceivedTransactionsEntry", fields) - -class SentTransactions(Table): +class SentTransactions(object): table_name = "sent_transactions" fields = [ @@ -326,7 +347,7 @@ class SentTransactions(Table): EntryType = namedtuple("SentTransactionsEntry", fields) -class TransactionsToPduTable(Table): +class TransactionsToPduTable(object): table_name = "transaction_id_to_pdu" fields = [ @@ -336,10 +357,8 @@ class TransactionsToPduTable(Table): "pdu_origin", ] - EntryType = namedtuple("TransactionsToPduEntry", fields) - -class DestinationsTable(Table): +class DestinationsTable(object): table_name = "destinations" fields = [ @@ -347,5 +366,3 @@ class DestinationsTable(Table): "retry_last_ts", "retry_interval", ] - - EntryType = namedtuple("DestinationsEntry", fields) diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py new file mode 100644 index 0000000000..c488b10d3c --- /dev/null +++ b/synapse/storage/util/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py new file mode 100644 index 0000000000..9d461d5e96 --- /dev/null +++ b/synapse/storage/util/id_generators.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from collections import deque +import contextlib +import threading + + +class IdGenerator(object): + def __init__(self, table, column, store): + self.table = table + self.column = column + self.store = store + self._lock = threading.Lock() + self._next_id = None + + @defer.inlineCallbacks + def get_next(self): + with self._lock: + if not self._next_id: + res = yield self.store._execute_and_decode( + "IdGenerator_%s" % (self.table,), + "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,) + ) + + self._next_id = (res and res[0] and res[0]["mx"]) or 1 + + i = self._next_id + self._next_id += 1 + defer.returnValue(i) + + def get_next_txn(self, txn): + with self._lock: + if self._next_id: + i = self._next_id + self._next_id += 1 + return i + else: + txn.execute( + "SELECT MAX(%s) FROM %s" % (self.column, self.table,) + ) + + val, = txn.fetchone() + cur = val or 0 + cur += 1 + self._next_id = cur + 1 + + return cur + + +class StreamIdGenerator(object): + """Used to generate new stream ids when persisting events while keeping + track of which transactions have been completed. + + This allows us to get the "current" stream id, i.e. the stream id such that + all ids less than or equal to it have completed. This handles the fact that + persistence of events can complete out of order. + + Usage: + with stream_id_gen.get_next_txn(txn) as stream_id: + # ... persist event ... + """ + def __init__(self): + self._lock = threading.Lock() + + self._current_max = None + self._unfinished_ids = deque() + + def get_next_txn(self, txn): + """ + Usage: + with stream_id_gen.get_next_txn(txn) as stream_id: + # ... persist event ... + """ + with self._lock: + if not self._current_max: + self._compute_current_max(txn) + + self._current_max += 1 + next_id = self._current_max + + self._unfinished_ids.append(next_id) + + @contextlib.contextmanager + def manager(): + try: + yield next_id + finally: + with self._lock: + self._unfinished_ids.remove(next_id) + + return manager() + + @defer.inlineCallbacks + def get_max_token(self, store): + """Returns the maximum stream id such that all stream ids less than or + equal to it have been successfully persisted. + """ + with self._lock: + if self._unfinished_ids: + defer.returnValue(self._unfinished_ids[0] - 1) + + if not self._current_max: + yield store.runInteraction( + "_compute_current_max", + self._compute_current_max, + ) + + defer.returnValue(self._current_max) + + def _compute_current_max(self, txn): + txn.execute("SELECT MAX(stream_ordering) FROM events") + val, = txn.fetchone() + + self._current_max = int(val) if val else 1 + + return self._current_max |