diff options
Diffstat (limited to 'synapse')
66 files changed, 2056 insertions, 732 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 27e53a9e56..f8a33120b5 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -17,9 +17,8 @@ import sys sys.dont_write_bytecode = True -from synapse.storage import ( - prepare_database, prepare_sqlite3_database, UpgradeDatabaseException, -) +from synapse.storage import UpgradeDatabaseException +from synapse.storage.engines import create_engine from synapse.server import HomeServer @@ -57,9 +56,10 @@ import os import re import resource import subprocess -import sqlite3 +import yaml + -logger = logging.getLogger(__name__) +logger = logging.getLogger("synapse.app.homeserver") class SynapseHomeServer(HomeServer): @@ -103,13 +103,11 @@ class SynapseHomeServer(HomeServer): return None def build_db_pool(self): + name = self.db_config["name"] + return adbapi.ConnectionPool( - "sqlite3", self.get_db_name(), - check_same_thread=False, - cp_min=1, - cp_max=1, - cp_openfun=prepare_database, # Prepare the database for each conn - # so that :memory: sqlite works + name, + **self.db_config.get("args", {}) ) def create_resource_tree(self, redirect_root_to_web_client): @@ -352,15 +350,53 @@ def setup(config_options): tls_context_factory = context_factory.ServerContextFactory(config) + if config.database_config: + with open(config.database_config, 'r') as f: + db_config = yaml.safe_load(f) + else: + db_config = { + "name": "sqlite3", + "args": { + "database": config.database_path, + }, + } + + db_config = { + k: v for k, v in db_config.items() + } + + name = db_config.get("name", None) + if name in ["MySQLdb", "mysql.connector"]: + db_config.setdefault("args", {}).update({ + "sql_mode": "TRADITIONAL", + "charset": "utf8mb4", + "use_unicode": True, + "collation": "utf8mb4_bin", + }) + elif name == "psycopg2": + pass + elif name == "sqlite3": + db_config.setdefault("args", {}).update({ + "cp_min": 1, + "cp_max": 1, + }) + else: + raise RuntimeError("Unsupported database type '%s'" % (name,)) + + database_engine = create_engine(name) + db_config["args"]["cp_openfun"] = database_engine.on_new_connection + hs = SynapseHomeServer( config.server_name, domain_with_port=domain_with_port, upload_dir=os.path.abspath("uploads"), db_name=config.database_path, + db_config=db_config, tls_context_factory=tls_context_factory, config=config, content_addr=config.content_addr, version_string=version_string, + database_engine=database_engine, ) hs.create_resource_tree( @@ -372,9 +408,16 @@ def setup(config_options): logger.info("Preparing database: %s...", db_name) try: - with sqlite3.connect(db_name) as db_conn: - prepare_sqlite3_database(db_conn) - prepare_database(db_conn) + db_conn = database_engine.module.connect( + **{ + k: v for k, v in db_config.get("args", {}).items() + if not k.startswith("cp_") + } + ) + + database_engine.prepare_database(db_conn) + + db_conn.commit() except UpgradeDatabaseException: sys.stderr.write( "\nFailed to upgrade database.\n" diff --git a/synapse/config/database.py b/synapse/config/database.py index 87efe54645..8dc9873f8c 100644 --- a/synapse/config/database.py +++ b/synapse/config/database.py @@ -26,6 +26,11 @@ class DatabaseConfig(Config): self.database_path = self.abspath(args.database_path) self.event_cache_size = self.parse_size(args.event_cache_size) + if args.database_config: + self.database_config = self.abspath(args.database_config) + else: + self.database_config = None + @classmethod def add_arguments(cls, parser): super(DatabaseConfig, cls).add_arguments(parser) @@ -38,6 +43,10 @@ class DatabaseConfig(Config): "--event-cache-size", default="100K", help="Number of events to cache in memory." ) + db_group.add_argument( + "--database-config", default=None, + help="Location of the database configuration file." + ) @classmethod def generate_config(cls, args, config_dir_path): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8aceac28cf..98148c13d7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -179,7 +179,7 @@ class FederationHandler(BaseHandler): # it's probably a good idea to mark it as not in retry-state # for sending (although this is a bit of a leap) retry_timings = yield self.store.get_destination_retry_timings(origin) - if (retry_timings and retry_timings.retry_last_ts): + if retry_timings and retry_timings["retry_last_ts"]: self.store.set_destination_retry_timings(origin, 0, 0) room = yield self.store.get_room(event.room_id) diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py index 7447800460..76647c7941 100644 --- a/synapse/handlers/login.py +++ b/synapse/handlers/login.py @@ -57,7 +57,7 @@ class LoginHandler(BaseHandler): logger.warn("Attempted to login as %s but they do not exist", user) raise LoginError(403, "", errcode=Codes.FORBIDDEN) - stored_hash = user_info[0]["password_hash"] + stored_hash = user_info["password_hash"] if bcrypt.checkpw(password, stored_hash): # generate an access token and store it. token = self.reg_handler._generate_token(user) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 823affc380..bc7f1c2402 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -311,25 +311,6 @@ class RoomMemberHandler(BaseHandler): defer.returnValue(chunk_data) @defer.inlineCallbacks - def get_room_member(self, room_id, member_user_id, auth_user_id): - """Retrieve a room member from a room. - - Args: - room_id : The room the member is in. - member_user_id : The member's user ID - auth_user_id : The user ID of the user making this request. - Returns: - The room member, or None if this member does not exist. - Raises: - SynapseError if something goes wrong. - """ - yield self.auth.check_joined_room(room_id, auth_user_id) - - member = yield self.store.get_room_member(user_id=member_user_id, - room_id=room_id) - defer.returnValue(member) - - @defer.inlineCallbacks def change_membership(self, event, context, do_auth=True): """ Change the membership status of a user in a room. diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index 1e77eb49cf..7387b4adb9 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -19,9 +19,13 @@ from twisted.internet import defer from .base import ClientV1RestServlet, client_path_pattern from synapse.types import UserID +import logging import simplejson as json +logger = logging.getLogger(__name__) + + class ProfileDisplaynameRestServlet(ClientV1RestServlet): PATTERN = client_path_pattern("/profile/(?P<user_id>[^/]*)/displayname") @@ -47,7 +51,8 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet): defer.returnValue((400, "Unable to parse name")) yield self.handlers.profile_handler.set_displayname( - user, auth_user, new_name) + user, auth_user, new_name + ) defer.returnValue((200, {})) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f4dec70393..995114e405 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -51,7 +51,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 15 +SCHEMA_VERSION = 16 dir_path = os.path.abspath(os.path.dirname(__file__)) @@ -104,14 +104,16 @@ class DataStore(RoomMemberStore, RoomStore, self.client_ip_last_seen.prefill(*key + (now,)) - yield self._simple_insert( + yield self._simple_upsert( "user_ips", - { - "user": user.to_string(), + keyvalues={ + "user_id": user.to_string(), "access_token": access_token, - "device_id": device_id, "ip": ip, "user_agent": user_agent, + }, + values={ + "device_id": device_id, "last_seen": now, }, desc="insert_client_ip", @@ -120,7 +122,7 @@ class DataStore(RoomMemberStore, RoomStore, def get_user_ip_and_agents(self, user): return self._simple_select_list( table="user_ips", - keyvalues={"user": user.to_string()}, + keyvalues={"user_id": user.to_string()}, retcols=[ "device_id", "access_token", "ip", "user_agent", "last_seen" ], @@ -148,21 +150,23 @@ class UpgradeDatabaseException(PrepareDatabaseException): pass -def prepare_database(db_conn): +def prepare_database(db_conn, database_engine): """Prepares a database for usage. Will either create all necessary tables or upgrade from an older schema version. """ try: cur = db_conn.cursor() - version_info = _get_or_create_schema_state(cur) + version_info = _get_or_create_schema_state(cur, database_engine) if version_info: user_version, delta_files, upgraded = version_info - _upgrade_existing_database(cur, user_version, delta_files, upgraded) + _upgrade_existing_database( + cur, user_version, delta_files, upgraded, database_engine + ) else: - _setup_new_database(cur) + _setup_new_database(cur, database_engine) - cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) + # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) cur.close() db_conn.commit() @@ -171,7 +175,7 @@ def prepare_database(db_conn): raise -def _setup_new_database(cur): +def _setup_new_database(cur, database_engine): """Sets up the database by finding a base set of "full schemas" and then applying any necessary deltas. @@ -225,31 +229,30 @@ def _setup_new_database(cur): directory_entries = os.listdir(sql_dir) - sql_script = "BEGIN TRANSACTION;\n" for filename in fnmatch.filter(directory_entries, "*.sql"): sql_loc = os.path.join(sql_dir, filename) logger.debug("Applying schema %s", sql_loc) - sql_script += read_schema(sql_loc) - sql_script += "\n" - sql_script += "COMMIT TRANSACTION;" - cur.executescript(sql_script) + executescript(cur, sql_loc) cur.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", - (max_current_ver, False) + database_engine.convert_param_style( + "INSERT INTO schema_version (version, upgraded)" + " VALUES (?,?)" + ), + (max_current_ver, False,) ) _upgrade_existing_database( cur, current_version=max_current_ver, applied_delta_files=[], - upgraded=False + upgraded=False, + database_engine=database_engine, ) def _upgrade_existing_database(cur, current_version, applied_delta_files, - upgraded): + upgraded, database_engine): """Upgrades an existing database. Delta files can either be SQL stored in *.sql files, or python modules @@ -305,6 +308,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, if not upgraded: start_ver += 1 + logger.debug("applied_delta_files: %s", applied_delta_files) + for v in range(start_ver, SCHEMA_VERSION + 1): logger.debug("Upgrading schema to v%d", v) @@ -321,6 +326,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, directory_entries.sort() for file_name in directory_entries: relative_path = os.path.join(str(v), file_name) + logger.debug("Found file: %s", relative_path) if relative_path in applied_delta_files: continue @@ -342,9 +348,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, module.run_upgrade(cur) elif ext == ".sql": # A plain old .sql file, just read and execute it - delta_schema = read_schema(absolute_path) logger.debug("Applying schema %s", relative_path) - cur.executescript(delta_schema) + executescript(cur, absolute_path) else: # Not a valid delta file. logger.warn( @@ -356,24 +361,82 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, # Mark as done. cur.execute( - "INSERT INTO applied_schema_deltas (version, file)" - " VALUES (?,?)", + database_engine.convert_param_style( + "INSERT INTO applied_schema_deltas (version, file)" + " VALUES (?,?)", + ), (v, relative_path) ) cur.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", + database_engine.convert_param_style( + "REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)", + ), (v, True) ) -def _get_or_create_schema_state(txn): +def get_statements(f): + statement_buffer = "" + in_comment = False # If we're in a /* ... */ style comment + + for line in f: + line = line.strip() + + if in_comment: + # Check if this line contains an end to the comment + comments = line.split("*/", 1) + if len(comments) == 1: + continue + line = comments[1] + in_comment = False + + # Remove inline block comments + line = re.sub(r"/\*.*\*/", " ", line) + + # Does this line start a comment? + comments = line.split("/*", 1) + if len(comments) > 1: + line = comments[0] + in_comment = True + + # Deal with line comments + line = line.split("--", 1)[0] + line = line.split("//", 1)[0] + + # Find *all* semicolons. We need to treat first and last entry + # specially. + statements = line.split(";") + + # We must prepend statement_buffer to the first statement + first_statement = "%s %s" % ( + statement_buffer.strip(), + statements[0].strip() + ) + statements[0] = first_statement + + # Every entry, except the last, is a full statement + for statement in statements[:-1]: + yield statement.strip() + + # The last entry did *not* end in a semicolon, so we store it for the + # next semicolon we find + statement_buffer = statements[-1].strip() + + +def executescript(txn, schema_path): + with open(schema_path, 'r') as f: + for statement in get_statements(f): + txn.execute(statement) + + +def _get_or_create_schema_state(txn, database_engine): + # Bluntly try creating the schema_version tables. schema_path = os.path.join( dir_path, "schema", "schema_version.sql", ) - create_schema = read_schema(schema_path) - txn.executescript(create_schema) + executescript(txn, schema_path) txn.execute("SELECT version, upgraded FROM schema_version") row = txn.fetchone() @@ -382,10 +445,13 @@ def _get_or_create_schema_state(txn): if current_version: txn.execute( - "SELECT file FROM applied_schema_deltas WHERE version >= ?", + database_engine.convert_param_style( + "SELECT file FROM applied_schema_deltas WHERE version >= ?" + ), (current_version,) ) - return current_version, txn.fetchall(), upgraded + applied_deltas = [d for d, in txn.fetchall()] + return current_version, applied_deltas, upgraded return None @@ -417,7 +483,7 @@ def prepare_sqlite3_database(db_conn): if row and row[0]: db_conn.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" + "REPLACE INTO schema_version (version, upgraded)" " VALUES (?,?)", (row[0], False) ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e3e67d8e0d..fa5199104a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -22,6 +22,8 @@ from synapse.util.logcontext import PreserveLoggingContext, LoggingContext from synapse.util.lrucache import LruCache import synapse.metrics +from util.id_generators import IdGenerator, StreamIdGenerator + from twisted.internet import defer from collections import namedtuple, OrderedDict @@ -145,11 +147,12 @@ class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() method.""" - __slots__ = ["txn", "name"] + __slots__ = ["txn", "name", "database_engine"] - def __init__(self, txn, name): + def __init__(self, txn, name, database_engine): object.__setattr__(self, "txn", txn) object.__setattr__(self, "name", name) + object.__setattr__(self, "database_engine", database_engine) def __getattr__(self, name): return getattr(self.txn, name) @@ -161,26 +164,32 @@ class LoggingTransaction(object): # TODO(paul): Maybe use 'info' and 'debug' for values? sql_logger.debug("[SQL] {%s} %s", self.name, sql) - try: - if args and args[0]: - values = args[0] + sql = self.database_engine.convert_param_style(sql) + + if args and args[0]: + args = list(args) + args[0] = [ + self.database_engine.encode_parameter(a) for a in args[0] + ] + try: sql_logger.debug( - "[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)), + "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])), self.name, - *values + *args[0] ) - except: - # Don't let logging failures stop SQL from working - pass + except: + # Don't let logging failures stop SQL from working + pass start = time.time() * 1000 + try: return self.txn.execute( sql, *args, **kwargs ) - except: - logger.exception("[SQL FAIL] {%s}", self.name) - raise + except Exception as e: + logger.debug("[SQL FAIL] {%s} %s", self.name, e) + raise finally: msecs = (time.time() * 1000) - start sql_logger.debug("[SQL time] {%s} %f", self.name, msecs) @@ -245,6 +254,13 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) + self.database_engine = hs.database_engine + + self._stream_id_gen = StreamIdGenerator() + self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) + self._state_groups_id_gen = IdGenerator("state_groups", "id", self) + self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) + def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -281,7 +297,7 @@ class SQLBaseStore(object): start_time = time.time() * 1000 - def inner_func(txn, *args, **kwargs): + def inner_func(conn, *args, **kwargs): with LoggingContext("runInteraction") as context: current_context.copy_to(context) start = time.time() * 1000 @@ -296,9 +312,25 @@ class SQLBaseStore(object): sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) transaction_logger.debug("[TXN START] {%s}", name) try: - return func(LoggingTransaction(txn, name), *args, **kwargs) - except: - logger.exception("[TXN FAIL] {%s}", name) + i = 0 + N = 5 + while True: + try: + txn = conn.cursor() + return func( + LoggingTransaction(txn, name, self.database_engine), + *args, **kwargs + ) + except self.database_engine.module.DatabaseError as e: + if self.database_engine.is_deadlock(e): + logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N) + if i < N: + i += 1 + conn.rollback() + continue + raise + except Exception as e: + logger.debug("[TXN FAIL] {%s} %s", name, e) raise finally: end = time.time() * 1000 @@ -311,7 +343,7 @@ class SQLBaseStore(object): sql_txn_timer.inc_by(duration, desc) with PreserveLoggingContext(): - result = yield self._db_pool.runInteraction( + result = yield self._db_pool.runWithConnection( inner_func, *args, **kwargs ) defer.returnValue(result) @@ -342,11 +374,11 @@ class SQLBaseStore(object): The result of decoder(results) """ def interaction(txn): - cursor = txn.execute(query, args) + txn.execute(query, args) if decoder: - return decoder(cursor) + return decoder(txn) else: - return cursor.fetchall() + return txn.fetchall() return self.runInteraction(desc, interaction) @@ -356,27 +388,23 @@ class SQLBaseStore(object): # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. - def _simple_insert(self, table, values, or_replace=False, or_ignore=False, + def _simple_insert(self, table, values, or_ignore=False, desc="_simple_insert"): """Executes an INSERT query on the named table. Args: table : string giving the table name values : dict of new column names and values for them - or_replace : bool; if True performs an INSERT OR REPLACE """ return self.runInteraction( desc, - self._simple_insert_txn, table, values, or_replace=or_replace, - or_ignore=or_ignore, + self._simple_insert_txn, table, values, + or_ignore=or_ignore ) @log_function - def _simple_insert_txn(self, txn, table, values, or_replace=False, - or_ignore=False): - sql = "%s INTO %s (%s) VALUES(%s)" % ( - ("INSERT OR REPLACE" if or_replace else - "INSERT OR IGNORE" if or_ignore else "INSERT"), + def _simple_insert_txn(self, txn, table, values, or_ignore=False): + sql = "INSERT INTO %s (%s) VALUES(%s)" % ( table, ", ".join(k for k in values), ", ".join("?" for k in values) @@ -387,8 +415,11 @@ class SQLBaseStore(object): sql, values.values(), ) - txn.execute(sql, values.values()) - return txn.lastrowid + try: + txn.execute(sql, values.values()) + except self.database_engine.module.IntegrityError: + if not or_ignore: + raise def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"): """ @@ -489,8 +520,7 @@ class SQLBaseStore(object): def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol): sql = ( - "SELECT %(retcol)s FROM %(table)s WHERE %(where)s " - "ORDER BY rowid asc" + "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" ) % { "retcol": retcol, "table": table, @@ -548,14 +578,14 @@ class SQLBaseStore(object): retcols : list of strings giving the names of the columns to return """ if keyvalues: - sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) txn.execute(sql, keyvalues.values()) else: - sql = "SELECT %s FROM %s ORDER BY rowid asc" % ( + sql = "SELECT %s FROM %s" % ( ", ".join(retcols), table ) @@ -607,10 +637,10 @@ class SQLBaseStore(object): def _simple_select_one_txn(self, txn, table, keyvalues, retcols, allow_none=False): - select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + select_sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, - " AND ".join("%s = ?" % (k) for k in keyvalues) + " AND ".join("%s = ?" % (k,) for k in keyvalues) ) txn.execute(select_sql, keyvalues.values()) @@ -648,6 +678,11 @@ class SQLBaseStore(object): updatevalues=updatevalues, ) + # if txn.rowcount == 0: + # raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + return ret return self.runInteraction(desc, func) @@ -777,6 +812,9 @@ class SQLBaseStore(object): internal_metadata, js, redacted, rejected_reason = res + internal_metadata = self.database_engine.load_unicode(internal_metadata) + js = self.database_engine.load_unicode(js) + start_time = update_counter("select_event", start_time) result = self._get_event_from_row_txn( @@ -803,9 +841,11 @@ class SQLBaseStore(object): sql_getevents_timer.inc_by(curr_time - last_time, desc) return curr_time + logger.debug("Got js: %r", js) d = json.loads(js) start_time = update_counter("decode_json", start_time) + logger.debug("Got internal_metadata: %r", internal_metadata) internal_metadata = json.loads(internal_metadata) start_time = update_counter("decode_internal", start_time) @@ -860,6 +900,12 @@ class SQLBaseStore(object): result = txn.fetchone() return result[0] if result else None + def get_next_stream_id(self): + with self._next_stream_id_lock: + i = self._next_stream_id + self._next_stream_id += 1 + return i + class _RollbackButIsFineException(Exception): """ This exception is used to rollback a transaction without implying @@ -883,7 +929,7 @@ class Table(object): _select_where_clause = "SELECT %s FROM %s WHERE %s" _select_clause = "SELECT %s FROM %s" - _insert_clause = "INSERT OR REPLACE INTO %s (%s) VALUES (%s)" + _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)" @classmethod def select_statement(cls, where_clause=None): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index f8cbb3f323..40e05b3635 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -366,11 +366,13 @@ class ApplicationServiceTransactionStore(SQLBaseStore): new_txn_id = max(highest_txn_id, last_txn_id) + 1 # Insert new txn into txn table - event_ids = [e.event_id for e in events] + event_ids = buffer( + json.dumps([e.event_id for e in events]).encode("utf8") + ) txn.execute( "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " "VALUES(?,?,?)", - (service.id, new_txn_id, json.dumps(event_ids)) + (service.id, new_txn_id, event_ids) ) return AppServiceTransaction( service=service, id=new_txn_id, events=events diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 0199539fea..2b2bdf8615 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -21,8 +21,6 @@ from twisted.internet import defer from collections import namedtuple -import sqlite3 - RoomAliasMapping = namedtuple( "RoomAliasMapping", @@ -91,7 +89,7 @@ class DirectoryStore(SQLBaseStore): }, desc="create_room_alias_association", ) - except sqlite3.IntegrityError: + except self.database_engine.module.IntegrityError: raise SynapseError( 409, "Room alias %s already exists" % room_alias.to_string() ) @@ -120,12 +118,12 @@ class DirectoryStore(SQLBaseStore): defer.returnValue(room_id) def _delete_room_alias_txn(self, txn, room_alias): - cursor = txn.execute( + txn.execute( "SELECT room_id FROM room_aliases WHERE room_alias = ?", (room_alias.to_string(),) ) - res = cursor.fetchone() + res = txn.fetchone() if res: room_id = res[0] else: diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py new file mode 100644 index 0000000000..548d4e1b42 --- /dev/null +++ b/synapse/storage/engines/__init__.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .maria import MariaEngine +from .postgres import PostgresEngine +from .sqlite3 import Sqlite3Engine + +import importlib + + +SUPPORTED_MODULE = { + "sqlite3": Sqlite3Engine, + "mysql.connector": MariaEngine, + "psycopg2": PostgresEngine, +} + + +def create_engine(name): + engine_class = SUPPORTED_MODULE.get(name, None) + + if engine_class: + module = importlib.import_module(name) + return engine_class(module) + + raise RuntimeError( + "Unsupported database engine '%s'" % (name,) + ) diff --git a/synapse/storage/engines/maria.py b/synapse/storage/engines/maria.py new file mode 100644 index 0000000000..90165f6849 --- /dev/null +++ b/synapse/storage/engines/maria.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage import prepare_database + +import types + + +class MariaEngine(object): + def __init__(self, database_module): + self.module = database_module + + def convert_param_style(self, sql): + return sql.replace("?", "%s") + + def encode_parameter(self, param): + if isinstance(param, types.BufferType): + return bytes(param) + return param + + def on_new_connection(self, db_conn): + pass + + def prepare_database(self, db_conn): + cur = db_conn.cursor() + cur.execute( + "ALTER DATABASE CHARACTER SET utf8mb4 COLLATE utf8mb4_bin" + ) + db_conn.commit() + prepare_database(db_conn, self) + + def is_deadlock(self, error): + if isinstance(error, self.module.DatabaseError): + return error.sqlstate == "40001" and error.errno == 1213 + return False + + def load_unicode(self, v): + return bytes(v).decode("UTF8") diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py new file mode 100644 index 0000000000..457c1f70a5 --- /dev/null +++ b/synapse/storage/engines/postgres.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage import prepare_database + + +class PostgresEngine(object): + def __init__(self, database_module): + self.module = database_module + self.module.extensions.register_type(self.module.extensions.UNICODE) + + def convert_param_style(self, sql): + return sql.replace("?", "%s") + + def encode_parameter(self, param): + return param + + def on_new_connection(self, db_conn): + db_conn.set_isolation_level( + self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ + ) + + def prepare_database(self, db_conn): + prepare_database(db_conn, self) + + def is_deadlock(self, error): + if isinstance(error, self.module.DatabaseError): + return error.pgcode in ["40001", "40P01"] + return False + + def load_unicode(self, v): + return bytes(v).decode("UTF8") diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py new file mode 100644 index 0000000000..389df35eb5 --- /dev/null +++ b/synapse/storage/engines/sqlite3.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage import prepare_database, prepare_sqlite3_database + +import types + + +class Sqlite3Engine(object): + def __init__(self, database_module): + self.module = database_module + + def convert_param_style(self, sql): + return sql + + def encode_parameter(self, param): + return param + + def on_new_connection(self, db_conn): + self.prepare_database(db_conn) + + def prepare_database(self, db_conn): + prepare_sqlite3_database(db_conn) + prepare_database(db_conn, self) + + def is_deadlock(self, error): + return False + + def load_unicode(self, v): + if isinstance(v, types.UnicodeType): + return v + return bytes(v).decode("UTF8") diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 032334bfd6..54a3c9d805 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -153,7 +153,7 @@ class EventFederationStore(SQLBaseStore): results = self._get_prev_events_and_state( txn, event_id, - is_state=1, + is_state=True, ) return [(e_id, h, ) for e_id, h, _ in results] @@ -164,7 +164,7 @@ class EventFederationStore(SQLBaseStore): } if is_state is not None: - keyvalues["is_state"] = is_state + keyvalues["is_state"] = bool(is_state) res = self._simple_select_list_txn( txn, @@ -242,7 +242,6 @@ class EventFederationStore(SQLBaseStore): "room_id": room_id, "min_depth": depth, }, - or_replace=True, ) def _handle_prev_events(self, txn, outlier, event_id, prev_events, @@ -260,9 +259,8 @@ class EventFederationStore(SQLBaseStore): "event_id": event_id, "prev_event_id": e_id, "room_id": room_id, - "is_state": 0, + "is_state": False, }, - or_ignore=True, ) # Update the extremities table if this is not an outlier. @@ -281,19 +279,19 @@ class EventFederationStore(SQLBaseStore): # We only insert as a forward extremity the new event if there are # no other events that reference it as a prev event query = ( - "INSERT OR IGNORE INTO %(table)s (event_id, room_id) " - "SELECT ?, ? WHERE NOT EXISTS (" - "SELECT 1 FROM %(event_edges)s WHERE " - "prev_event_id = ? " - ")" - ) % { - "table": "event_forward_extremities", - "event_edges": "event_edges", - } + "SELECT 1 FROM event_edges WHERE prev_event_id = ?" + ) - logger.debug("query: %s", query) + txn.execute(query, (event_id,)) - txn.execute(query, (event_id, room_id, event_id)) + if not txn.fetchone(): + query = ( + "INSERT INTO event_forward_extremities" + " (event_id, room_id)" + " VALUES (?, ?)" + ) + + txn.execute(query, (event_id, room_id)) # Insert all the prev_events as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. @@ -306,7 +304,6 @@ class EventFederationStore(SQLBaseStore): "event_id": e_id, "room_id": room_id, }, - or_ignore=True, ) # Also delete from the backwards extremities table all ones that @@ -400,7 +397,7 @@ class EventFederationStore(SQLBaseStore): query = ( "SELECT prev_event_id FROM event_edges " - "WHERE room_id = ? AND event_id = ? AND is_state = 0 " + "WHERE room_id = ? AND event_id = ? AND is_state = ? " "LIMIT ?" ) @@ -409,7 +406,7 @@ class EventFederationStore(SQLBaseStore): for event_id in front: txn.execute( query, - (room_id, event_id, limit - len(event_results)) + (room_id, event_id, False, limit - len(event_results)) ) for e_id, in txn.fetchall(): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2425f57f5f..0373d152b2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - self.get_room_events_max_id.invalidate() except _RollbackButIsFineException: pass @@ -96,12 +95,22 @@ class EventsStore(SQLBaseStore): # Remove the any existing cache entries for the event_id self._invalidate_get_event_cache(event.event_id) + if stream_ordering is None: + with self._stream_id_gen.get_next_txn(txn) as stream_ordering: + return self._persist_event_txn( + txn, event, context, backfilled, + stream_ordering=stream_ordering, + is_new_state=is_new_state, + current_state=current_state, + ) + # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) + self._simple_delete_txn( + txn, + table="current_state_events", + keyvalues={"room_id": event.room_id}, ) for s in current_state: @@ -114,7 +123,6 @@ class EventsStore(SQLBaseStore): "type": s.type, "state_key": s.state_key, }, - or_replace=True, ) if event.is_state() and is_new_state: @@ -128,7 +136,6 @@ class EventsStore(SQLBaseStore): "type": event.type, "state_key": event.state_key, }, - or_replace=True, ) for prev_state_id, _ in event.prev_state: @@ -151,14 +158,6 @@ class EventsStore(SQLBaseStore): event.depth ) - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - have_persisted = self._simple_select_one_onecol_txn( txn, table="event_json", @@ -185,23 +184,29 @@ class EventsStore(SQLBaseStore): ) txn.execute( sql, - (metadata_json.decode("UTF-8"), event.event_id,) + (buffer(metadata_json), event.event_id,) ) sql = ( - "UPDATE events SET outlier = 0" + "UPDATE events SET outlier = ?" " WHERE event_id = ?" ) txn.execute( sql, - (event.event_id,) + (False, event.event_id,) ) return + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + if event.type == EventTypes.Member: self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) elif event.type == EventTypes.Name: self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: @@ -224,15 +229,14 @@ class EventsStore(SQLBaseStore): values={ "event_id": event.event_id, "room_id": event.room_id, - "internal_metadata": metadata_json.decode("UTF-8"), - "json": encode_canonical_json(event_dict).decode("UTF-8"), + "internal_metadata": buffer(metadata_json), + "json": buffer(encode_canonical_json(event_dict)), }, - or_replace=True, ) - content = encode_canonical_json( + content = buffer(encode_canonical_json( event.content - ).decode("UTF-8") + )) vals = { "topological_ordering": event.depth, @@ -245,9 +249,6 @@ class EventsStore(SQLBaseStore): "depth": event.depth, } - if stream_ordering is not None: - vals["stream_ordering"] = stream_ordering - unrec = { k: v for k, v in event.get_dict().items() @@ -260,25 +261,24 @@ class EventsStore(SQLBaseStore): ] } - vals["unrecognized_keys"] = encode_canonical_json( + vals["unrecognized_keys"] = buffer(encode_canonical_json( unrec - ).decode("UTF-8") + )) - try: - self._simple_insert_txn( - txn, - "events", - vals, - or_replace=(not outlier), - or_ignore=bool(outlier), - ) - except: - logger.warn( - "Failed to persist, probably duplicate: %s", - event.event_id, - exc_info=True, + sql = ( + "INSERT INTO events" + " (stream_ordering, topological_ordering, event_id, type," + " room_id, content, processed, outlier, depth)" + " VALUES (?,?,?,?,?,?,?,?,?)" + ) + + txn.execute( + sql, + ( + stream_ordering, event.depth, event.event_id, event.type, + event.room_id, content, True, outlier, event.depth ) - raise _RollbackButIsFineException("_persist_event") + ) if context.rejected: self._store_rejections_txn(txn, event.event_id, context.rejected) @@ -302,15 +302,17 @@ class EventsStore(SQLBaseStore): ) if is_new_state and not context.rejected: - self._simple_insert_txn( + self._simple_upsert_txn( txn, "current_state_events", - { - "event_id": event.event_id, + keyvalues={ "room_id": event.room_id, "type": event.type, "state_key": event.state_key, }, + values={ + "event_id": event.event_id, + } ) for e_id, h in event.prev_state: @@ -321,7 +323,7 @@ class EventsStore(SQLBaseStore): "event_id": event.event_id, "prev_event_id": e_id, "room_id": event.room_id, - "is_state": 1, + "is_state": True, }, ) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 09d1e63657..d3b9b38664 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -57,16 +57,18 @@ class KeyStore(SQLBaseStore): OpenSSL.crypto.FILETYPE_ASN1, tls_certificate ) fingerprint = hashlib.sha256(tls_certificate_bytes).hexdigest() - return self._simple_insert( + return self._simple_upsert( table="server_tls_certificates", - values={ + keyvalues={ "server_name": server_name, "fingerprint": fingerprint, + }, + values={ "from_server": from_server, "ts_added_ms": time_now_ms, "tls_certificate": buffer(tls_certificate_bytes), }, - or_ignore=True, + desc="store_server_certificate", ) @defer.inlineCallbacks @@ -107,14 +109,16 @@ class KeyStore(SQLBaseStore): ts_now_ms (int): The time now in milliseconds verification_key (VerifyKey): The NACL verify key. """ - return self._simple_insert( + return self._simple_upsert( table="server_signature_keys", - values={ + keyvalues={ "server_name": server_name, "key_id": "%s:%s" % (verify_key.alg, verify_key.version), + }, + values={ "from_server": from_server, "ts_added_ms": time_now_ms, "verify_key": buffer(verify_key.encode()), }, - or_ignore=True, + desc="store_server_verify_key", ) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 87fba55439..22ec94bc16 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore): values={"observed_user_id": observed_localpart, "observer_user_id": observer_userid}, desc="allow_presence_visible", + or_ignore=True, ) def disallow_presence_visible(self, observed_localpart, observer_userid): diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index a6e52cb248..e33963d0b4 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore @@ -24,19 +26,25 @@ class ProfileStore(SQLBaseStore): desc="create_profile", ) + @defer.inlineCallbacks def get_profile_displayname(self, user_localpart): - return self._simple_select_one_onecol( + name = yield self._simple_select_one_onecol( table="profiles", keyvalues={"user_id": user_localpart}, retcol="displayname", desc="get_profile_displayname", ) + if name: + name = self.database_engine.load_unicode(name) + + defer.returnValue(name) + def set_profile_displayname(self, user_localpart, new_displayname): return self._simple_update_one( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"displayname": new_displayname}, + updatevalues={"displayname": new_displayname.encode("utf8")}, desc="set_profile_displayname", ) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index c47bdc2861..ee7718d5ed 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -154,7 +154,7 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_name, priority_class, new_rule_priority)) # now insert the new rule - sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" (" + sql = "INSERT INTO "+PushRuleTable.table_name+" (" sql += ",".join(new_rule.keys())+") VALUES (" sql += ", ".join(["?" for _ in new_rule.keys()])+")" @@ -183,7 +183,7 @@ class PushRuleStore(SQLBaseStore): new_rule['priority_class'] = priority_class new_rule['priority'] = new_prio - sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" (" + sql = "INSERT INTO "+PushRuleTable.table_name+" (" sql += ",".join(new_rule.keys())+") VALUES (" sql += ", ".join(["?" for _ in new_rule.keys()])+")" diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index f24154f146..8a63fe4691 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -15,8 +15,6 @@ from twisted.internet import defer -from sqlite3 import IntegrityError - from synapse.api.errors import StoreError, Codes from ._base import SQLBaseStore, cached @@ -39,17 +37,13 @@ class RegistrationStore(SQLBaseStore): Raises: StoreError if there was a problem adding this. """ - row = yield self._simple_select_one( - "users", {"name": user_id}, ["id"], - desc="add_access_token_to_user", - ) - if not row: - raise StoreError(400, "Bad user ID supplied.") - row_id = row["id"] + next_id = yield self._access_tokens_id_gen.get_next() + yield self._simple_insert( "access_tokens", { - "user_id": row_id, + "id": next_id, + "user_id": user_id, "token": token }, desc="add_access_token_to_user", @@ -74,27 +68,43 @@ class RegistrationStore(SQLBaseStore): def _register(self, txn, user_id, token, password_hash): now = int(self.clock.time()) + next_id = self._access_tokens_id_gen.get_next_txn(txn) + try: txn.execute("INSERT INTO users(name, password_hash, creation_ts) " "VALUES (?,?,?)", [user_id, password_hash, now]) - except IntegrityError: + except self.database_engine.module.IntegrityError: raise StoreError( 400, "User ID already taken.", errcode=Codes.USER_IN_USE ) # it's possible for this to get a conflict, but only for a single user # since tokens are namespaced based on their user ID - txn.execute("INSERT INTO access_tokens(user_id, token) " + - "VALUES (?,?)", [txn.lastrowid, token]) + txn.execute( + "INSERT INTO access_tokens(id, user_id, token)" + " VALUES (?,?,?)", + (next_id, user_id, token,) + ) + @defer.inlineCallbacks def get_user_by_id(self, user_id): - query = ("SELECT users.name, users.password_hash FROM users" - " WHERE users.name = ?") - return self._execute( - "get_user_by_id", self.cursor_to_dict, query, user_id + user_info = yield self._simple_select_one( + table="users", + keyvalues={ + "name": user_id, + }, + retcols=["name", "password_hash"], + allow_none=True, ) + if user_info: + user_info["password_hash"] = self.database_engine.load_unicode( + user_info["password_hash"] + ) + + defer.returnValue(user_info) + @cached() # TODO(paul): Currently there's no code to invalidate this cache. That # means if/when we ever add internal ways to invalidate access tokens or @@ -134,12 +144,12 @@ class RegistrationStore(SQLBaseStore): "SELECT users.name, users.admin," " access_tokens.device_id, access_tokens.id as token_id" " FROM users" - " INNER JOIN access_tokens on users.id = access_tokens.user_id" + " INNER JOIN access_tokens on users.name = access_tokens.user_id" " WHERE token = ?" ) - cursor = txn.execute(sql, (token,)) - rows = self.cursor_to_dict(cursor) + txn.execute(sql, (token,)) + rows = self.cursor_to_dict(txn) if rows: return rows[0] diff --git a/synapse/storage/room.py b/synapse/storage/room.py index be3e28c2ea..48ebb33057 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -72,6 +72,7 @@ class RoomStore(SQLBaseStore): keyvalues={"room_id": room_id}, retcols=RoomsTable.fields, desc="get_room", + allow_none=True, ) @defer.inlineCallbacks @@ -102,24 +103,37 @@ class RoomStore(SQLBaseStore): "ON c.event_id = room_names.event_id " ) - # We use non printing ascii character US () as a seperator + # We use non printing ascii character US (\x1F) as a separator sql = ( - "SELECT r.room_id, n.name, t.topic, " - "group_concat(a.room_alias, '') " - "FROM rooms AS r " - "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " - "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " - "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " - "WHERE r.is_public = ? " - "GROUP BY r.room_id " + "SELECT r.room_id, max(n.name), max(t.topic)" + " FROM rooms AS r" + " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id" + " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id" + " WHERE r.is_public = ?" + " GROUP BY r.room_id" ) % { "topic": topic_subquery, "name": name_subquery, } - c = txn.execute(sql, (is_public,)) + txn.execute(sql, (is_public,)) - return c.fetchall() + rows = txn.fetchall() + + for i, row in enumerate(rows): + room_id = row[0] + aliases = self._simple_select_onecol_txn( + txn, + table="room_aliases", + keyvalues={ + "room_id": room_id + }, + retcol="room_alias", + ) + + rows[i] = list(row) + [aliases] + + return rows rows = yield self.runInteraction( "get_rooms", f @@ -130,9 +144,10 @@ class RoomStore(SQLBaseStore): "room_id": r[0], "name": r[1], "topic": r[2], - "aliases": r[3].split(""), + "aliases": r[3], } for r in rows + if r[3] # We only return rooms that have at least one alias. ] defer.returnValue(ret) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 52c37c76f5..8ea5756d61 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -40,7 +40,6 @@ class RoomMemberStore(SQLBaseStore): """ try: target_user_id = event.state_key - domain = UserID.from_string(target_user_id).domain except: logger.exception( "Failed to parse target_user_id=%s", target_user_id @@ -65,42 +64,8 @@ class RoomMemberStore(SQLBaseStore): } ) - # Update room hosts table - if event.membership == Membership.JOIN: - sql = ( - "INSERT OR IGNORE INTO room_hosts (room_id, host) " - "VALUES (?, ?)" - ) - txn.execute(sql, (event.room_id, domain)) - elif event.membership != Membership.INVITE: - # Check if this was the last person to have left. - member_events = self._get_members_query_txn( - txn, - where_clause=("c.room_id = ? AND m.membership = ?" - " AND m.user_id != ?"), - where_values=(event.room_id, Membership.JOIN, target_user_id,) - ) - - joined_domains = set() - for e in member_events: - try: - joined_domains.add( - UserID.from_string(e.state_key).domain - ) - except: - # FIXME: How do we deal with invalid user ids in the db? - logger.exception("Invalid user_id: %s", event.state_key) - - if domain not in joined_domains: - sql = ( - "DELETE FROM room_hosts WHERE room_id = ? AND host = ?" - ) - - txn.execute(sql, (event.room_id, domain)) - self.get_rooms_for_user.invalidate(target_user_id) - @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -110,41 +75,27 @@ class RoomMemberStore(SQLBaseStore): Returns: Deferred: Results in a MembershipEvent or None. """ - rows = yield self._get_members_by_dict({ - "e.room_id": room_id, - "m.user_id": user_id, - }) + def f(txn): + events = self._get_members_events_txn( + txn, + room_id, + user_id=user_id, + ) - defer.returnValue(rows[0] if rows else None) + return events[0] if events else None - def _get_room_member(self, txn, user_id, room_id): - sql = ( - "SELECT e.* FROM events as e" - " INNER JOIN room_memberships as m" - " ON e.event_id = m.event_id" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" - " WHERE m.user_id = ? and e.room_id = ?" - " LIMIT 1" - ) - txn.execute(sql, (user_id, room_id)) - rows = self.cursor_to_dict(txn) - if rows: - return self._parse_events_txn(txn, rows)[0] - else: - return None + return self.runInteraction("get_room_member", f) def get_users_in_room(self, room_id): def f(txn): - sql = ( - "SELECT m.user_id FROM room_memberships as m" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" - " WHERE m.membership = ? AND m.room_id = ?" + + rows = self._get_members_rows_txn( + txn, + room_id=room_id, + membership=Membership.JOIN, ) - txn.execute(sql, (Membership.JOIN, room_id)) - return [r[0] for r in txn.fetchall()] + return [r["user_id"] for r in rows] return self.runInteraction("get_users_in_room", f) def get_room_members(self, room_id, membership=None): @@ -159,11 +110,14 @@ class RoomMemberStore(SQLBaseStore): list of namedtuples representing the members in this room. """ - where = {"m.room_id": room_id} - if membership: - where["m.membership"] = membership + def f(txn): + return self._get_members_events_txn( + txn, + room_id, + membership=membership, + ) - return self._get_members_by_dict(where) + return self.runInteraction("get_room_members", f) def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user @@ -209,32 +163,55 @@ class RoomMemberStore(SQLBaseStore): ] def get_joined_hosts_for_room(self, room_id): - return self._simple_select_onecol( - "room_hosts", - {"room_id": room_id}, - "host", - desc="get_joined_hosts_for_room", + return self.runInteraction( + "get_joined_hosts_for_room", + self._get_joined_hosts_for_room_txn, + room_id, + ) + + def _get_joined_hosts_for_room_txn(self, txn, room_id): + rows = self._get_members_rows_txn( + txn, + room_id, membership=Membership.JOIN + ) + + joined_domains = set( + UserID.from_string(r["user_id"]).domain + for r in rows ) - def _get_members_by_dict(self, where_dict): - clause = " AND ".join("%s = ?" % k for k in where_dict.keys()) - vals = where_dict.values() - return self._get_members_query(clause, vals) + return joined_domains def _get_members_query(self, where_clause, where_values): return self.runInteraction( - "get_members_query", self._get_members_query_txn, + "get_members_query", self._get_members_events_txn, where_clause, where_values ) - def _get_members_query_txn(self, txn, where_clause, where_values): + def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None): + rows = self._get_members_rows_txn( + txn, + room_id, membership, user_id, + ) + return self._get_events_txn(txn, [r["event_id"] for r in rows]) + + def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None): + where_clause = "c.room_id = ?" + where_values = [room_id] + + if membership: + where_clause += " AND m.membership = ?" + where_values.append(membership) + + if user_id: + where_clause += " AND m.user_id = ?" + where_values.append(user_id) + sql = ( - "SELECT e.* FROM events as e " - "INNER JOIN room_memberships as m " - "ON e.event_id = m.event_id " - "INNER JOIN current_state_events as c " - "ON m.event_id = c.event_id " - "WHERE %(where)s " + "SELECT m.* FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id" + " WHERE %(where)s" ) % { "where": where_clause, } @@ -242,8 +219,7 @@ class RoomMemberStore(SQLBaseStore): txn.execute(sql, where_values) rows = self.cursor_to_dict(txn) - results = self._parse_events_txn(txn, rows) - return results + return rows @cached() def get_rooms_for_user(self, user_id): diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql index b87ef1fe79..a246943f5a 100644 --- a/synapse/storage/schema/delta/12/v12.sql +++ b/synapse/storage/schema/delta/12/v12.sql @@ -14,54 +14,50 @@ */ CREATE TABLE IF NOT EXISTS rejections( - event_id TEXT NOT NULL, - reason TEXT NOT NULL, - last_check TEXT NOT NULL, - CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE + event_id VARCHAR(150) NOT NULL, + reason VARCHAR(150) NOT NULL, + last_check VARCHAR(150) NOT NULL, + UNIQUE (event_id) ); -- Push notification endpoints that users have configured CREATE TABLE IF NOT EXISTS pushers ( id INTEGER PRIMARY KEY AUTOINCREMENT, - user_name TEXT NOT NULL, - profile_tag varchar(32) NOT NULL, - kind varchar(8) NOT NULL, - app_id varchar(64) NOT NULL, - app_display_name varchar(64) NOT NULL, - device_display_name varchar(128) NOT NULL, - pushkey blob NOT NULL, - ts BIGINT NOT NULL, - lang varchar(8), - data blob, + user_name VARCHAR(150) NOT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey VARBINARY(512) NOT NULL, + ts BIGINT UNSIGNED NOT NULL, + lang VARCHAR(8), + data LONGBLOB, last_token TEXT, - last_success BIGINT, - failing_since BIGINT, - FOREIGN KEY(user_name) REFERENCES users(name), + last_success BIGINT UNSIGNED, + failing_since BIGINT UNSIGNED, UNIQUE (app_id, pushkey) ); CREATE TABLE IF NOT EXISTS push_rules ( id INTEGER PRIMARY KEY AUTOINCREMENT, - user_name TEXT NOT NULL, - rule_id TEXT NOT NULL, + user_name VARCHAR(150) NOT NULL, + rule_id VARCHAR(150) NOT NULL, priority_class TINYINT NOT NULL, priority INTEGER NOT NULL DEFAULT 0, - conditions TEXT NOT NULL, - actions TEXT NOT NULL, + conditions VARCHAR(150) NOT NULL, + actions VARCHAR(150) NOT NULL, UNIQUE(user_name, rule_id) ); CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name); CREATE TABLE IF NOT EXISTS user_filters( - user_id TEXT, - filter_id INTEGER, - filter_json TEXT, - FOREIGN KEY(user_id) REFERENCES users(id) + user_id VARCHAR(150), + filter_id BIGINT UNSIGNED, + filter_json LONGBLOB ); CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters( - user_id, filter_id + user_id, filter_id ); - -PRAGMA user_version = 12; diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql index e491ad5aec..d1da2b48e2 100644 --- a/synapse/storage/schema/delta/13/v13.sql +++ b/synapse/storage/schema/delta/13/v13.sql @@ -15,20 +15,17 @@ CREATE TABLE IF NOT EXISTS application_services( id INTEGER PRIMARY KEY AUTOINCREMENT, - url TEXT, - token TEXT, - hs_token TEXT, - sender TEXT, - UNIQUE(token) ON CONFLICT ROLLBACK + url VARCHAR(150), + token VARCHAR(150), + hs_token VARCHAR(150), + sender VARCHAR(150), + UNIQUE(token) ); CREATE TABLE IF NOT EXISTS application_services_regex( id INTEGER PRIMARY KEY AUTOINCREMENT, - as_id INTEGER NOT NULL, + as_id BIGINT UNSIGNED NOT NULL, namespace INTEGER, /* enum[room_id|room_alias|user_id] */ - regex TEXT, + regex VARCHAR(150), FOREIGN KEY(as_id) REFERENCES application_services(id) ); - - - diff --git a/synapse/storage/schema/delta/14/v14.sql b/synapse/storage/schema/delta/14/v14.sql index 0212726448..8c47d4b0f4 100644 --- a/synapse/storage/schema/delta/14/v14.sql +++ b/synapse/storage/schema/delta/14/v14.sql @@ -1,7 +1,7 @@ CREATE TABLE IF NOT EXISTS push_rules_enable ( id INTEGER PRIMARY KEY AUTOINCREMENT, - user_name TEXT NOT NULL, - rule_id TEXT NOT NULL, + user_name VARCHAR(150) NOT NULL, + rule_id VARCHAR(150) NOT NULL, enabled TINYINT, UNIQUE(user_name, rule_id) ); diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index 2b27e2a429..ddea8fc693 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -14,17 +14,18 @@ */ CREATE TABLE IF NOT EXISTS application_services_state( - as_id TEXT PRIMARY KEY, - state TEXT, - last_txn TEXT + as_id VARCHAR(150) PRIMARY KEY, + state VARCHAR(5), + last_txn INTEGER ); CREATE TABLE IF NOT EXISTS application_services_txns( - as_id TEXT NOT NULL, + as_id VARCHAR(150) NOT NULL, txn_id INTEGER NOT NULL, - event_ids TEXT NOT NULL, - UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK + event_ids LONGBLOB NOT NULL, + UNIQUE(as_id, txn_id) ); - - +CREATE INDEX IF NOT EXISTS application_services_txns_id ON application_services_txns ( + as_id +); diff --git a/synapse/storage/schema/delta/15/presence_indices.sql b/synapse/storage/schema/delta/15/presence_indices.sql new file mode 100644 index 0000000000..6b8d0f1ca7 --- /dev/null +++ b/synapse/storage/schema/delta/15/presence_indices.sql @@ -0,0 +1,2 @@ + +CREATE INDEX IF NOT EXISTS presence_list_user_id ON presence_list (user_id); diff --git a/synapse/storage/schema/delta/16/remote_media_cache_index.sql b/synapse/storage/schema/delta/16/remote_media_cache_index.sql new file mode 100644 index 0000000000..7a15265cb1 --- /dev/null +++ b/synapse/storage/schema/delta/16/remote_media_cache_index.sql @@ -0,0 +1,2 @@ +CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id + ON remote_media_cache_thumbnails (media_id); \ No newline at end of file diff --git a/synapse/storage/schema/delta/16/remove_duplicates.sql b/synapse/storage/schema/delta/16/remove_duplicates.sql new file mode 100644 index 0000000000..65c97b5e2f --- /dev/null +++ b/synapse/storage/schema/delta/16/remove_duplicates.sql @@ -0,0 +1,9 @@ + + +DELETE FROM event_to_state_groups WHERE state_group not in ( + SELECT MAX(state_group) FROM event_to_state_groups GROUP BY event_id +); + +DELETE FROM event_to_state_groups WHERE rowid not in ( + SELECT MIN(rowid) FROM event_to_state_groups GROUP BY event_id +); diff --git a/synapse/storage/schema/delta/16/unique_constraints.sql b/synapse/storage/schema/delta/16/unique_constraints.sql new file mode 100644 index 0000000000..f9fbb6b448 --- /dev/null +++ b/synapse/storage/schema/delta/16/unique_constraints.sql @@ -0,0 +1,72 @@ + +-- We can use SQLite features here, since mysql support was only added in v16 + +-- +DELETE FROM current_state_events WHERE rowid not in ( + SELECT MIN(rowid) FROM current_state_events GROUP BY event_id +); + +DROP INDEX IF EXISTS current_state_events_event_id; +CREATE UNIQUE INDEX current_state_events_event_id ON current_state_events(event_id); + +-- +DELETE FROM room_memberships WHERE rowid not in ( + SELECT MIN(rowid) FROM room_memberships GROUP BY event_id +); + +DROP INDEX IF EXISTS room_memberships_event_id; +CREATE UNIQUE INDEX room_memberships_event_id ON room_memberships(event_id); + +-- +DELETE FROM feedback WHERE rowid not in ( + SELECT MIN(rowid) FROM feedback GROUP BY event_id +); + +DROP INDEX IF EXISTS feedback_event_id; +CREATE UNIQUE INDEX feedback_event_id ON feedback(event_id); + +-- +DELETE FROM topics WHERE rowid not in ( + SELECT MIN(rowid) FROM topics GROUP BY event_id +); + +DROP INDEX IF EXISTS topics_event_id; +CREATE UNIQUE INDEX topics_event_id ON topics(event_id); + +-- +DELETE FROM room_names WHERE rowid not in ( + SELECT MIN(rowid) FROM room_names GROUP BY event_id +); + +DROP INDEX IF EXISTS room_names_id; +CREATE UNIQUE INDEX room_names_id ON room_names(event_id); + +-- +DELETE FROM presence WHERE rowid not in ( + SELECT MIN(rowid) FROM presence GROUP BY user_id +); + +DROP INDEX IF EXISTS presence_id; +CREATE UNIQUE INDEX presence_id ON presence(user_id); + +-- +DELETE FROM presence_allow_inbound WHERE rowid not in ( + SELECT MIN(rowid) FROM presence_allow_inbound + GROUP BY observed_user_id, observer_user_id +); + +DROP INDEX IF EXISTS presence_allow_inbound_observers; +CREATE UNIQUE INDEX presence_allow_inbound_observers ON presence_allow_inbound( + observed_user_id, observer_user_id +); + +-- +DELETE FROM presence_list WHERE rowid not in ( + SELECT MIN(rowid) FROM presence_list + GROUP BY user_id, observed_user_id +); + +DROP INDEX IF EXISTS presence_list_observers; +CREATE UNIQUE INDEX presence_list_observers ON presence_list( + user_id, observed_user_id +); diff --git a/synapse/storage/schema/delta/16/users.sql b/synapse/storage/schema/delta/16/users.sql new file mode 100644 index 0000000000..db27bdca02 --- /dev/null +++ b/synapse/storage/schema/delta/16/users.sql @@ -0,0 +1,56 @@ +-- Convert `access_tokens`.user from rowids to user strings. +-- MUST BE DONE BEFORE REMOVING ID COLUMN FROM USERS TABLE BELOW +CREATE TABLE IF NOT EXISTS new_access_tokens( + id BIGINT UNSIGNED PRIMARY KEY, + user_id VARCHAR(150) NOT NULL, + device_id VARCHAR(150), + token VARCHAR(150) NOT NULL, + last_used BIGINT UNSIGNED, + UNIQUE(token) +); + +INSERT INTO new_access_tokens + SELECT a.id, u.name, a.device_id, a.token, a.last_used + FROM access_tokens as a + INNER JOIN users as u ON u.id = a.user_id; + +DROP TABLE access_tokens; + +ALTER TABLE new_access_tokens RENAME TO access_tokens; + +-- Remove ID column from `users` table +CREATE TABLE IF NOT EXISTS new_users( + name VARCHAR(150), + password_hash VARCHAR(150), + creation_ts BIGINT UNSIGNED, + admin BOOL DEFAULT 0 NOT NULL, + UNIQUE(name) +); + +INSERT INTO new_users SELECT name, password_hash, creation_ts, admin FROM users; + +DROP TABLE users; + +ALTER TABLE new_users RENAME TO users; + + +-- Remove UNIQUE constraint from `user_ips` table +CREATE TABLE IF NOT EXISTS new_user_ips ( + user_id VARCHAR(150) NOT NULL, + access_token VARCHAR(150) NOT NULL, + device_id VARCHAR(150), + ip VARCHAR(150) NOT NULL, + user_agent VARCHAR(150) NOT NULL, + last_seen BIGINT UNSIGNED NOT NULL +); + +INSERT INTO new_user_ips + SELECT user, access_token, device_id, ip, user_agent, last_seen FROM user_ips; + +DROP TABLE user_ips; + +ALTER TABLE new_user_ips RENAME TO user_ips; + +CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user_id); +CREATE INDEX IF NOT EXISTS user_ips_user_ip ON user_ips(user_id, access_token, ip); + diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql index 1e766d6db2..bdb1109094 100644 --- a/synapse/storage/schema/full_schemas/11/event_edges.sql +++ b/synapse/storage/schema/full_schemas/11/event_edges.sql @@ -14,9 +14,9 @@ */ CREATE TABLE IF NOT EXISTS event_forward_extremities( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + UNIQUE (event_id, room_id) ); CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id); @@ -24,9 +24,9 @@ CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_backward_extremities( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + UNIQUE (event_id, room_id) ); CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id); @@ -34,11 +34,11 @@ CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id CREATE TABLE IF NOT EXISTS event_edges( - event_id TEXT NOT NULL, - prev_event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - is_state INTEGER NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id, is_state) + event_id VARCHAR(150) NOT NULL, + prev_event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + is_state BOOL NOT NULL, + UNIQUE (event_id, prev_event_id, room_id, is_state) ); CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); @@ -46,30 +46,30 @@ CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); CREATE TABLE IF NOT EXISTS room_depth( - room_id TEXT NOT NULL, + room_id VARCHAR(150) NOT NULL, min_depth INTEGER NOT NULL, - CONSTRAINT uniqueness UNIQUE (room_id) + UNIQUE (room_id) ); CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); create TABLE IF NOT EXISTS event_destinations( - event_id TEXT NOT NULL, - destination TEXT NOT NULL, - delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered - CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE + event_id VARCHAR(150) NOT NULL, + destination VARCHAR(150) NOT NULL, + delivered_ts BIGINT UNSIGNED DEFAULT 0, -- or 0 if not delivered + UNIQUE (event_id, destination) ); CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); CREATE TABLE IF NOT EXISTS state_forward_extremities( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + state_key VARCHAR(150) NOT NULL, + UNIQUE (event_id, room_id) ); CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities( @@ -79,11 +79,11 @@ CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_auth( - event_id TEXT NOT NULL, - auth_id TEXT NOT NULL, - room_id TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, auth_id, room_id) + event_id VARCHAR(150) NOT NULL, + auth_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + UNIQUE (event_id, auth_id, room_id) ); CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id); -CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id); \ No newline at end of file +CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id); diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql index c28c39c48a..09886f607c 100644 --- a/synapse/storage/schema/full_schemas/11/event_signatures.sql +++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql @@ -14,52 +14,42 @@ */ CREATE TABLE IF NOT EXISTS event_content_hashes ( - event_id TEXT, - algorithm TEXT, - hash BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, algorithm) + event_id VARCHAR(150), + algorithm VARCHAR(150), + hash LONGBLOB, + UNIQUE (event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes( - event_id -); +CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(event_id); CREATE TABLE IF NOT EXISTS event_reference_hashes ( - event_id TEXT, - algorithm TEXT, - hash BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, algorithm) + event_id VARCHAR(150), + algorithm VARCHAR(150), + hash LONGBLOB, + UNIQUE (event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes ( - event_id -); +CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes(event_id); CREATE TABLE IF NOT EXISTS event_signatures ( - event_id TEXT, - signature_name TEXT, - key_id TEXT, - signature BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id) + event_id VARCHAR(150), + signature_name VARCHAR(150), + key_id VARCHAR(150), + signature LONGBLOB, + UNIQUE (event_id, signature_name, key_id) ); -CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures ( - event_id -); +CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures(event_id); CREATE TABLE IF NOT EXISTS event_edge_hashes( - event_id TEXT, - prev_event_id TEXT, - algorithm TEXT, - hash BLOB, - CONSTRAINT uniqueness UNIQUE ( - event_id, prev_event_id, algorithm - ) + event_id VARCHAR(150), + prev_event_id VARCHAR(150), + algorithm VARCHAR(150), + hash LONGBLOB, + UNIQUE (event_id, prev_event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes( - event_id -); +CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(event_id); diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql index dd00c1cd2f..9c47f51742 100644 --- a/synapse/storage/schema/full_schemas/11/im.sql +++ b/synapse/storage/schema/full_schemas/11/im.sql @@ -15,56 +15,54 @@ CREATE TABLE IF NOT EXISTS events( stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT, - topological_ordering INTEGER NOT NULL, - event_id TEXT NOT NULL, - type TEXT NOT NULL, - room_id TEXT NOT NULL, - content TEXT NOT NULL, - unrecognized_keys TEXT, + topological_ordering BIGINT UNSIGNED NOT NULL, + event_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + content LONGBLOB NOT NULL, + unrecognized_keys LONGBLOB, processed BOOL NOT NULL, outlier BOOL NOT NULL, - depth INTEGER DEFAULT 0 NOT NULL, - CONSTRAINT ev_uniq UNIQUE (event_id) + depth BIGINT UNSIGNED DEFAULT 0 NOT NULL, + UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id); CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering); CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering); CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id); CREATE TABLE IF NOT EXISTS event_json( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - internal_metadata NOT NULL, - json BLOB NOT NULL, - CONSTRAINT ev_j_uniq UNIQUE (event_id) + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + internal_metadata LONGBLOB NOT NULL, + json LONGBLOB NOT NULL, + UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id); CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id); CREATE TABLE IF NOT EXISTS state_events( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - prev_state TEXT + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + state_key VARCHAR(150) NOT NULL, + prev_state VARCHAR(150), + UNIQUE (event_id) ); -CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id); CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id); CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type); CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key); CREATE TABLE IF NOT EXISTS current_state_events( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + state_key VARCHAR(150) NOT NULL, + UNIQUE (room_id, type, state_key) ); CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id); @@ -73,11 +71,11 @@ CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (ty CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key); CREATE TABLE IF NOT EXISTS room_memberships( - event_id TEXT NOT NULL, - user_id TEXT NOT NULL, - sender TEXT NOT NULL, - room_id TEXT NOT NULL, - membership TEXT NOT NULL + event_id VARCHAR(150) NOT NULL, + user_id VARCHAR(150) NOT NULL, + sender VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + membership VARCHAR(150) NOT NULL ); CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id); @@ -85,16 +83,16 @@ CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id); CREATE TABLE IF NOT EXISTS feedback( - event_id TEXT NOT NULL, - feedback_type TEXT, - target_event_id TEXT, - sender TEXT, - room_id TEXT + event_id VARCHAR(150) NOT NULL, + feedback_type VARCHAR(150), + target_event_id VARCHAR(150), + sender VARCHAR(150), + room_id VARCHAR(150) ); CREATE TABLE IF NOT EXISTS topics( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, topic TEXT NOT NULL ); @@ -102,8 +100,8 @@ CREATE INDEX IF NOT EXISTS topics_event_id ON topics(event_id); CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id); CREATE TABLE IF NOT EXISTS room_names( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, name TEXT NOT NULL ); @@ -111,15 +109,15 @@ CREATE INDEX IF NOT EXISTS room_names_event_id ON room_names(event_id); CREATE INDEX IF NOT EXISTS room_names_room_id ON room_names(room_id); CREATE TABLE IF NOT EXISTS rooms( - room_id TEXT PRIMARY KEY NOT NULL, - is_public INTEGER, - creator TEXT + room_id VARCHAR(150) PRIMARY KEY NOT NULL, + is_public BOOL, + creator VARCHAR(150) ); CREATE TABLE IF NOT EXISTS room_hosts( - room_id TEXT NOT NULL, - host TEXT NOT NULL, - CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE + room_id VARCHAR(150) NOT NULL, + host VARCHAR(150) NOT NULL, + UNIQUE (room_id, host) ); CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id); diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql index a9e0a4fe0d..35f141c288 100644 --- a/synapse/storage/schema/full_schemas/11/keys.sql +++ b/synapse/storage/schema/full_schemas/11/keys.sql @@ -13,19 +13,19 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS server_tls_certificates( - server_name TEXT, -- Server name. - fingerprint TEXT, -- Certificate fingerprint. - from_server TEXT, -- Which key server the certificate was fetched from. - ts_added_ms INTEGER, -- When the certifcate was added. - tls_certificate BLOB, -- DER encoded x509 certificate. - CONSTRAINT uniqueness UNIQUE (server_name, fingerprint) + server_name VARCHAR(150), -- Server name. + fingerprint VARCHAR(150), -- Certificate fingerprint. + from_server VARCHAR(150), -- Which key server the certificate was fetched from. + ts_added_ms BIGINT UNSIGNED, -- When the certifcate was added. + tls_certificate LONGBLOB, -- DER encoded x509 certificate. + UNIQUE (server_name, fingerprint) ); CREATE TABLE IF NOT EXISTS server_signature_keys( - server_name TEXT, -- Server name. - key_id TEXT, -- Key version. - from_server TEXT, -- Which key server the key was fetched form. - ts_added_ms INTEGER, -- When the key was added. - verify_key BLOB, -- NACL verification key. - CONSTRAINT uniqueness UNIQUE (server_name, key_id) + server_name VARCHAR(150), -- Server name. + key_id VARCHAR(150), -- Key version. + from_server VARCHAR(150), -- Which key server the key was fetched form. + ts_added_ms BIGINT UNSIGNED, -- When the key was added. + verify_key LONGBLOB, -- NACL verification key. + UNIQUE (server_name, key_id) ); diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql index afdf48cbfb..134e7fbcec 100644 --- a/synapse/storage/schema/full_schemas/11/media_repository.sql +++ b/synapse/storage/schema/full_schemas/11/media_repository.sql @@ -14,23 +14,23 @@ */ CREATE TABLE IF NOT EXISTS local_media_repository ( - media_id TEXT, -- The id used to refer to the media. - media_type TEXT, -- The MIME-type of the media. + media_id VARCHAR(150), -- The id used to refer to the media. + media_type VARCHAR(150), -- The MIME-type of the media. media_length INTEGER, -- Length of the media in bytes. - created_ts INTEGER, -- When the content was uploaded in ms. - upload_name TEXT, -- The name the media was uploaded with. - user_id TEXT, -- The user who uploaded the file. - CONSTRAINT uniqueness UNIQUE (media_id) + created_ts BIGINT UNSIGNED, -- When the content was uploaded in ms. + upload_name VARCHAR(150), -- The name the media was uploaded with. + user_id VARCHAR(150), -- The user who uploaded the file. + UNIQUE (media_id) ); CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( - media_id TEXT, -- The id used to refer to the media. + media_id VARCHAR(150), -- The id used to refer to the media. thumbnail_width INTEGER, -- The width of the thumbnail in pixels. thumbnail_height INTEGER, -- The height of the thumbnail in pixels. - thumbnail_type TEXT, -- The MIME-type of the thumbnail. - thumbnail_method TEXT, -- The method used to make the thumbnail. + thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail. + thumbnail_method VARCHAR(150), -- The method used to make the thumbnail. thumbnail_length INTEGER, -- The length of the thumbnail in bytes. - CONSTRAINT uniqueness UNIQUE ( + UNIQUE ( media_id, thumbnail_width, thumbnail_height, thumbnail_type ) ); @@ -39,30 +39,27 @@ CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id ON local_media_repository_thumbnails (media_id); CREATE TABLE IF NOT EXISTS remote_media_cache ( - media_origin TEXT, -- The remote HS the media came from. - media_id TEXT, -- The id used to refer to the media on that server. - media_type TEXT, -- The MIME-type of the media. - created_ts INTEGER, -- When the content was uploaded in ms. - upload_name TEXT, -- The name the media was uploaded with. + media_origin VARCHAR(150), -- The remote HS the media came from. + media_id VARCHAR(150), -- The id used to refer to the media on that server. + media_type VARCHAR(150), -- The MIME-type of the media. + created_ts BIGINT UNSIGNED, -- When the content was uploaded in ms. + upload_name VARCHAR(150), -- The name the media was uploaded with. media_length INTEGER, -- Length of the media in bytes. - filesystem_id TEXT, -- The name used to store the media on disk. - CONSTRAINT uniqueness UNIQUE (media_origin, media_id) + filesystem_id VARCHAR(150), -- The name used to store the media on disk. + UNIQUE (media_origin, media_id) ); CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( - media_origin TEXT, -- The remote HS the media came from. - media_id TEXT, -- The id used to refer to the media. + media_origin VARCHAR(150), -- The remote HS the media came from. + media_id VARCHAR(150), -- The id used to refer to the media. thumbnail_width INTEGER, -- The width of the thumbnail in pixels. thumbnail_height INTEGER, -- The height of the thumbnail in pixels. - thumbnail_method TEXT, -- The method used to make the thumbnail - thumbnail_type TEXT, -- The MIME-type of the thumbnail. + thumbnail_method VARCHAR(150), -- The method used to make the thumbnail + thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail. thumbnail_length INTEGER, -- The length of the thumbnail in bytes. - filesystem_id TEXT, -- The name used to store the media on disk. - CONSTRAINT uniqueness UNIQUE ( + filesystem_id VARCHAR(150), -- The name used to store the media on disk. + UNIQUE ( media_origin, media_id, thumbnail_width, thumbnail_height, - thumbnail_type, thumbnail_type - ) + thumbnail_type + ) ); - -CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id - ON local_media_repository_thumbnails (media_id); diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql index f9f8db9697..c617ebea73 100644 --- a/synapse/storage/schema/full_schemas/11/presence.sql +++ b/synapse/storage/schema/full_schemas/11/presence.sql @@ -13,26 +13,23 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS presence( - user_id INTEGER NOT NULL, - state INTEGER, - status_msg TEXT, - mtime INTEGER, -- miliseconds since last state change - FOREIGN KEY(user_id) REFERENCES users(id) + user_id VARCHAR(150) NOT NULL, + state VARCHAR(20), + status_msg VARCHAR(150), + mtime BIGINT UNSIGNED -- miliseconds since last state change ); -- For each of /my/ users which possibly-remote users are allowed to see their -- presence state CREATE TABLE IF NOT EXISTS presence_allow_inbound( - observed_user_id INTEGER NOT NULL, - observer_user_id TEXT, -- a UserID, - FOREIGN KEY(observed_user_id) REFERENCES users(id) + observed_user_id VARCHAR(150) NOT NULL, + observer_user_id VARCHAR(150) NOT NULL -- a UserID, ); -- For each of /my/ users (watcher), which possibly-remote users are they -- watching? CREATE TABLE IF NOT EXISTS presence_list( - user_id INTEGER NOT NULL, - observed_user_id TEXT, -- a UserID, - accepted BOOLEAN, - FOREIGN KEY(user_id) REFERENCES users(id) + user_id VARCHAR(150) NOT NULL, + observed_user_id VARCHAR(150) NOT NULL, -- a UserID, + accepted BOOLEAN NOT NULL ); diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql index f06a528b4d..ffe75edf9f 100644 --- a/synapse/storage/schema/full_schemas/11/profiles.sql +++ b/synapse/storage/schema/full_schemas/11/profiles.sql @@ -13,8 +13,7 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS profiles( - user_id INTEGER NOT NULL, - displayname TEXT, - avatar_url TEXT, - FOREIGN KEY(user_id) REFERENCES users(id) + user_id VARCHAR(150) NOT NULL, + displayname VARCHAR(150), + avatar_url VARCHAR(150) ); diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql index 5011d95db8..b81451eab4 100644 --- a/synapse/storage/schema/full_schemas/11/redactions.sql +++ b/synapse/storage/schema/full_schemas/11/redactions.sql @@ -13,9 +13,9 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS redactions ( - event_id TEXT NOT NULL, - redacts TEXT NOT NULL, - CONSTRAINT ev_uniq UNIQUE (event_id) + event_id VARCHAR(150) NOT NULL, + redacts VARCHAR(150) NOT NULL, + UNIQUE (event_id) ); CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id); diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql index 0d2df01603..6226913227 100644 --- a/synapse/storage/schema/full_schemas/11/room_aliases.sql +++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql @@ -14,14 +14,11 @@ */ CREATE TABLE IF NOT EXISTS room_aliases( - room_alias TEXT NOT NULL, - room_id TEXT NOT NULL + room_alias VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL ); CREATE TABLE IF NOT EXISTS room_alias_servers( - room_alias TEXT NOT NULL, - server TEXT NOT NULL + room_alias VARCHAR(150) NOT NULL, + server VARCHAR(150) NOT NULL ); - - - diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql index 1fe8f1e430..737c3e35c7 100644 --- a/synapse/storage/schema/full_schemas/11/state.sql +++ b/synapse/storage/schema/full_schemas/11/state.sql @@ -14,34 +14,27 @@ */ CREATE TABLE IF NOT EXISTS state_groups( - id INTEGER PRIMARY KEY, - room_id TEXT NOT NULL, - event_id TEXT NOT NULL + id VARCHAR(20) PRIMARY KEY, + room_id VARCHAR(150) NOT NULL, + event_id VARCHAR(150) NOT NULL ); CREATE TABLE IF NOT EXISTS state_groups_state( - state_group INTEGER NOT NULL, - room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - event_id TEXT NOT NULL + state_group VARCHAR(20) NOT NULL, + room_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + state_key VARCHAR(150) NOT NULL, + event_id VARCHAR(150) NOT NULL ); CREATE TABLE IF NOT EXISTS event_to_state_groups( - event_id TEXT NOT NULL, - state_group INTEGER NOT NULL, - CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id) + event_id VARCHAR(150) NOT NULL, + state_group VARCHAR(150) NOT NULL, + UNIQUE (event_id) ); CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id); -CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state( - state_group -); -CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state( - room_id, type, state_key -); - -CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups( - event_id -); \ No newline at end of file +CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(state_group); +CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(room_id, type, state_key); +CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(event_id); \ No newline at end of file diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql index 2d30f99b06..c2fab10aa0 100644 --- a/synapse/storage/schema/full_schemas/11/transactions.sql +++ b/synapse/storage/schema/full_schemas/11/transactions.sql @@ -14,16 +14,15 @@ */ -- Stores what transaction ids we have received and what our response was CREATE TABLE IF NOT EXISTS received_transactions( - transaction_id TEXT, - origin TEXT, - ts INTEGER, + transaction_id VARCHAR(150), + origin VARCHAR(150), + ts BIGINT UNSIGNED, response_code INTEGER, - response_json TEXT, + response_json LONGBLOB, has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx - CONSTRAINT uniquesss UNIQUE (transaction_id, origin) ON CONFLICT REPLACE + UNIQUE (transaction_id, origin) ); -CREATE UNIQUE INDEX IF NOT EXISTS transactions_txid ON received_transactions(transaction_id, origin); CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; @@ -31,17 +30,14 @@ CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin -- since referenced the transaction in another outgoing transaction CREATE TABLE IF NOT EXISTS sent_transactions( id INTEGER PRIMARY KEY AUTOINCREMENT, -- This is used to apply insertion ordering - transaction_id TEXT, - destination TEXT, + transaction_id VARCHAR(150), + destination VARCHAR(150), response_code INTEGER DEFAULT 0, - response_json TEXT, - ts INTEGER + response_json LONGBLOB, + ts BIGINT UNSIGNED ); CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination); -CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions( - destination -); CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id); -- So that we can do an efficient look up of all transactions that have yet to be successfully -- sent. @@ -51,18 +47,17 @@ CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_c -- For sent transactions only. CREATE TABLE IF NOT EXISTS transaction_id_to_pdu( transaction_id INTEGER, - destination TEXT, - pdu_id TEXT, - pdu_origin TEXT + destination VARCHAR(150), + pdu_id VARCHAR(150), + pdu_origin VARCHAR(150) ); CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination); CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); -CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination); -- To track destination health CREATE TABLE IF NOT EXISTS destinations( - destination TEXT PRIMARY KEY, - retry_last_ts INTEGER, + destination VARCHAR(150) PRIMARY KEY, + retry_last_ts BIGINT UNSIGNED, retry_interval INTEGER ); diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql index 08ccfdac0a..0ddfccd410 100644 --- a/synapse/storage/schema/full_schemas/11/users.sql +++ b/synapse/storage/schema/full_schemas/11/users.sql @@ -14,32 +14,30 @@ */ CREATE TABLE IF NOT EXISTS users( id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT, - password_hash TEXT, - creation_ts INTEGER, + name VARCHAR(150), + password_hash VARCHAR(150), + creation_ts BIGINT UNSIGNED, admin BOOL DEFAULT 0 NOT NULL, - UNIQUE(name) ON CONFLICT ROLLBACK + UNIQUE(name) ); CREATE TABLE IF NOT EXISTS access_tokens( id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL, - device_id TEXT, - token TEXT NOT NULL, - last_used INTEGER, - FOREIGN KEY(user_id) REFERENCES users(id), - UNIQUE(token) ON CONFLICT ROLLBACK + user_id VARCHAR(150) NOT NULL, + device_id VARCHAR(150), + token VARCHAR(150) NOT NULL, + last_used BIGINT UNSIGNED, + UNIQUE(token) ); CREATE TABLE IF NOT EXISTS user_ips ( - user TEXT NOT NULL, - access_token TEXT NOT NULL, - device_id TEXT, - ip TEXT NOT NULL, - user_agent TEXT NOT NULL, - last_seen INTEGER NOT NULL, - CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE + user VARCHAR(150) NOT NULL, + access_token VARCHAR(150) NOT NULL, + device_id VARCHAR(150), + ip VARCHAR(150) NOT NULL, + user_agent VARCHAR(150) NOT NULL, + last_seen BIGINT UNSIGNED NOT NULL, + UNIQUE (user, access_token, ip, user_agent) ); CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user); - diff --git a/synapse/storage/schema/full_schemas/16/application_services.sql b/synapse/storage/schema/full_schemas/16/application_services.sql new file mode 100644 index 0000000000..f08c5bcf76 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/application_services.sql @@ -0,0 +1,48 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS application_services( + id BIGINT PRIMARY KEY, + url VARCHAR(150), + token VARCHAR(150), + hs_token VARCHAR(150), + sender VARCHAR(150), + UNIQUE(token) +); + +CREATE TABLE IF NOT EXISTS application_services_regex( + id BIGINT PRIMARY KEY, + as_id BIGINT NOT NULL, + namespace INTEGER, /* enum[room_id|room_alias|user_id] */ + regex VARCHAR(150), + FOREIGN KEY(as_id) REFERENCES application_services(id) +); + +CREATE TABLE IF NOT EXISTS application_services_state( + as_id VARCHAR(150) PRIMARY KEY, + state VARCHAR(5), + last_txn INTEGER +); + +CREATE TABLE IF NOT EXISTS application_services_txns( + as_id VARCHAR(150) NOT NULL, + txn_id INTEGER NOT NULL, + event_ids bytea NOT NULL, + UNIQUE(as_id, txn_id) +); + +CREATE INDEX application_services_txns_id ON application_services_txns ( + as_id +); diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql new file mode 100644 index 0000000000..05d0874f0d --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/event_edges.sql @@ -0,0 +1,89 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_forward_extremities( + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + UNIQUE (event_id, room_id) +); + +CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id); +CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id); + + +CREATE TABLE IF NOT EXISTS event_backward_extremities( + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + UNIQUE (event_id, room_id) +); + +CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id); +CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id); + + +CREATE TABLE IF NOT EXISTS event_edges( + event_id VARCHAR(150) NOT NULL, + prev_event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + is_state BOOL NOT NULL, + UNIQUE (event_id, prev_event_id, room_id, is_state) +); + +CREATE INDEX ev_edges_id ON event_edges(event_id); +CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id); + + +CREATE TABLE IF NOT EXISTS room_depth( + room_id VARCHAR(150) NOT NULL, + min_depth INTEGER NOT NULL, + UNIQUE (room_id) +); + +CREATE INDEX room_depth_room ON room_depth(room_id); + + +create TABLE IF NOT EXISTS event_destinations( + event_id VARCHAR(150) NOT NULL, + destination VARCHAR(150) NOT NULL, + delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered + UNIQUE (event_id, destination) +); + +CREATE INDEX event_destinations_id ON event_destinations(event_id); + + +CREATE TABLE IF NOT EXISTS state_forward_extremities( + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + state_key VARCHAR(150) NOT NULL, + UNIQUE (event_id, room_id) +); + +CREATE INDEX st_extrem_keys ON state_forward_extremities( + room_id, type, state_key +); +CREATE INDEX st_extrem_id ON state_forward_extremities(event_id); + + +CREATE TABLE IF NOT EXISTS event_auth( + event_id VARCHAR(150) NOT NULL, + auth_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + UNIQUE (event_id, auth_id, room_id) +); + +CREATE INDEX evauth_edges_id ON event_auth(event_id); +CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id); diff --git a/synapse/storage/schema/full_schemas/16/event_signatures.sql b/synapse/storage/schema/full_schemas/16/event_signatures.sql new file mode 100644 index 0000000000..4291827368 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/event_signatures.sql @@ -0,0 +1,55 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_content_hashes ( + event_id VARCHAR(150), + algorithm VARCHAR(150), + hash bytea, + UNIQUE (event_id, algorithm) +); + +CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id); + + +CREATE TABLE IF NOT EXISTS event_reference_hashes ( + event_id VARCHAR(150), + algorithm VARCHAR(150), + hash bytea, + UNIQUE (event_id, algorithm) +); + +CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id); + + +CREATE TABLE IF NOT EXISTS event_signatures ( + event_id VARCHAR(150), + signature_name VARCHAR(150), + key_id VARCHAR(150), + signature bytea, + UNIQUE (event_id, signature_name, key_id) +); + +CREATE INDEX event_signatures_id ON event_signatures(event_id); + + +CREATE TABLE IF NOT EXISTS event_edge_hashes( + event_id VARCHAR(150), + prev_event_id VARCHAR(150), + algorithm VARCHAR(150), + hash bytea, + UNIQUE (event_id, prev_event_id, algorithm) +); + +CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id); diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql new file mode 100644 index 0000000000..a661fc160c --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/im.sql @@ -0,0 +1,124 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS events( + stream_ordering BIGINT PRIMARY KEY, + topological_ordering BIGINT NOT NULL, + event_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + content bytea NOT NULL, + unrecognized_keys bytea, + processed BOOL NOT NULL, + outlier BOOL NOT NULL, + depth BIGINT DEFAULT 0 NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX events_stream_ordering ON events (stream_ordering); +CREATE INDEX events_topological_ordering ON events (topological_ordering); +CREATE INDEX events_room_id ON events (room_id); + + +CREATE TABLE IF NOT EXISTS event_json( + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + internal_metadata bytea NOT NULL, + json bytea NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX event_json_room_id ON event_json(room_id); + + +CREATE TABLE IF NOT EXISTS state_events( + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + state_key VARCHAR(150) NOT NULL, + prev_state VARCHAR(150), + UNIQUE (event_id) +); + +CREATE INDEX state_events_room_id ON state_events (room_id); +CREATE INDEX state_events_type ON state_events (type); +CREATE INDEX state_events_state_key ON state_events (state_key); + + +CREATE TABLE IF NOT EXISTS current_state_events( + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + state_key VARCHAR(150) NOT NULL, + UNIQUE (event_id), + UNIQUE (room_id, type, state_key) +); + +CREATE INDEX current_state_events_room_id ON current_state_events (room_id); +CREATE INDEX current_state_events_type ON current_state_events (type); +CREATE INDEX current_state_events_state_key ON current_state_events (state_key); + +CREATE TABLE IF NOT EXISTS room_memberships( + event_id VARCHAR(150) NOT NULL, + user_id VARCHAR(150) NOT NULL, + sender VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + membership VARCHAR(150) NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX room_memberships_room_id ON room_memberships (room_id); +CREATE INDEX room_memberships_user_id ON room_memberships (user_id); + +CREATE TABLE IF NOT EXISTS feedback( + event_id VARCHAR(150) NOT NULL, + feedback_type VARCHAR(150), + target_event_id VARCHAR(150), + sender VARCHAR(150), + room_id VARCHAR(150), + UNIQUE (event_id) +); + +CREATE TABLE IF NOT EXISTS topics( + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + topic TEXT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX topics_room_id ON topics(room_id); + +CREATE TABLE IF NOT EXISTS room_names( + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + name TEXT NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX room_names_room_id ON room_names(room_id); + +CREATE TABLE IF NOT EXISTS rooms( + room_id VARCHAR(150) PRIMARY KEY NOT NULL, + is_public BOOL, + creator VARCHAR(150) +); + +CREATE TABLE IF NOT EXISTS room_hosts( + room_id VARCHAR(150) NOT NULL, + host VARCHAR(150) NOT NULL, + UNIQUE (room_id, host) +); + +CREATE INDEX room_hosts_room_id ON room_hosts (room_id); diff --git a/synapse/storage/schema/full_schemas/16/keys.sql b/synapse/storage/schema/full_schemas/16/keys.sql new file mode 100644 index 0000000000..459b510427 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/keys.sql @@ -0,0 +1,31 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS server_tls_certificates( + server_name VARCHAR(150), -- Server name. + fingerprint VARCHAR(150), -- Certificate fingerprint. + from_server VARCHAR(150), -- Which key server the certificate was fetched from. + ts_added_ms BIGINT, -- When the certifcate was added. + tls_certificate bytea, -- DER encoded x509 certificate. + UNIQUE (server_name, fingerprint) +); + +CREATE TABLE IF NOT EXISTS server_signature_keys( + server_name VARCHAR(150), -- Server name. + key_id VARCHAR(150), -- Key version. + from_server VARCHAR(150), -- Which key server the key was fetched form. + ts_added_ms BIGINT, -- When the key was added. + verify_key bytea, -- NACL verification key. + UNIQUE (server_name, key_id) +); diff --git a/synapse/storage/schema/full_schemas/16/media_repository.sql b/synapse/storage/schema/full_schemas/16/media_repository.sql new file mode 100644 index 0000000000..0e819fca38 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/media_repository.sql @@ -0,0 +1,68 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS local_media_repository ( + media_id VARCHAR(150), -- The id used to refer to the media. + media_type VARCHAR(150), -- The MIME-type of the media. + media_length INTEGER, -- Length of the media in bytes. + created_ts BIGINT, -- When the content was uploaded in ms. + upload_name VARCHAR(150), -- The name the media was uploaded with. + user_id VARCHAR(150), -- The user who uploaded the file. + UNIQUE (media_id) +); + +CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( + media_id VARCHAR(150), -- The id used to refer to the media. + thumbnail_width INTEGER, -- The width of the thumbnail in pixels. + thumbnail_height INTEGER, -- The height of the thumbnail in pixels. + thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail. + thumbnail_method VARCHAR(150), -- The method used to make the thumbnail. + thumbnail_length INTEGER, -- The length of the thumbnail in bytes. + UNIQUE ( + media_id, thumbnail_width, thumbnail_height, thumbnail_type + ) +); + +CREATE INDEX local_media_repository_thumbnails_media_id + ON local_media_repository_thumbnails (media_id); + +CREATE TABLE IF NOT EXISTS remote_media_cache ( + media_origin VARCHAR(150), -- The remote HS the media came from. + media_id VARCHAR(150), -- The id used to refer to the media on that server. + media_type VARCHAR(150), -- The MIME-type of the media. + created_ts BIGINT, -- When the content was uploaded in ms. + upload_name VARCHAR(150), -- The name the media was uploaded with. + media_length INTEGER, -- Length of the media in bytes. + filesystem_id VARCHAR(150), -- The name used to store the media on disk. + UNIQUE (media_origin, media_id) +); + +CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( + media_origin VARCHAR(150), -- The remote HS the media came from. + media_id VARCHAR(150), -- The id used to refer to the media. + thumbnail_width INTEGER, -- The width of the thumbnail in pixels. + thumbnail_height INTEGER, -- The height of the thumbnail in pixels. + thumbnail_method VARCHAR(150), -- The method used to make the thumbnail + thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail. + thumbnail_length INTEGER, -- The length of the thumbnail in bytes. + filesystem_id VARCHAR(150), -- The name used to store the media on disk. + UNIQUE ( + media_origin, media_id, thumbnail_width, thumbnail_height, + thumbnail_type + ) +); + +CREATE INDEX remote_media_cache_thumbnails_media_id + ON remote_media_cache_thumbnails (media_id); diff --git a/synapse/storage/schema/full_schemas/16/presence.sql b/synapse/storage/schema/full_schemas/16/presence.sql new file mode 100644 index 0000000000..9c41be296e --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/presence.sql @@ -0,0 +1,40 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS presence( + user_id VARCHAR(150) NOT NULL, + state VARCHAR(20), + status_msg VARCHAR(150), + mtime BIGINT, -- miliseconds since last state change + UNIQUE (user_id) +); + +-- For each of /my/ users which possibly-remote users are allowed to see their +-- presence state +CREATE TABLE IF NOT EXISTS presence_allow_inbound( + observed_user_id VARCHAR(150) NOT NULL, + observer_user_id VARCHAR(150) NOT NULL, -- a UserID, + UNIQUE (observed_user_id, observer_user_id) +); + +-- For each of /my/ users (watcher), which possibly-remote users are they +-- watching? +CREATE TABLE IF NOT EXISTS presence_list( + user_id VARCHAR(150) NOT NULL, + observed_user_id VARCHAR(150) NOT NULL, -- a UserID, + accepted BOOLEAN NOT NULL, + UNIQUE (user_id, observed_user_id) +); + +CREATE INDEX presence_list_user_id ON presence_list (user_id); diff --git a/synapse/storage/schema/full_schemas/16/profiles.sql b/synapse/storage/schema/full_schemas/16/profiles.sql new file mode 100644 index 0000000000..21c58a99bc --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/profiles.sql @@ -0,0 +1,20 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS profiles( + user_id VARCHAR(150) NOT NULL, + displayname VARCHAR(150), + avatar_url VARCHAR(150), + UNIQUE(user_id) +); diff --git a/synapse/storage/schema/full_schemas/16/push.sql b/synapse/storage/schema/full_schemas/16/push.sql new file mode 100644 index 0000000000..5c0c7bc201 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/push.sql @@ -0,0 +1,73 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS rejections( + event_id VARCHAR(150) NOT NULL, + reason VARCHAR(150) NOT NULL, + last_check VARCHAR(150) NOT NULL, + UNIQUE (event_id) +); + +-- Push notification endpoints that users have configured +CREATE TABLE IF NOT EXISTS pushers ( + id BIGINT PRIMARY KEY, + user_name VARCHAR(150) NOT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey bytea NOT NULL, + ts BIGINT NOT NULL, + lang VARCHAR(8), + data bytea, + last_token TEXT, + last_success BIGINT, + failing_since BIGINT, + UNIQUE (app_id, pushkey) +); + +CREATE TABLE IF NOT EXISTS push_rules ( + id BIGINT PRIMARY KEY, + user_name VARCHAR(150) NOT NULL, + rule_id VARCHAR(150) NOT NULL, + priority_class SMALLINT NOT NULL, + priority INTEGER NOT NULL DEFAULT 0, + conditions VARCHAR(150) NOT NULL, + actions VARCHAR(150) NOT NULL, + UNIQUE(user_name, rule_id) +); + +CREATE INDEX push_rules_user_name on push_rules (user_name); + +CREATE TABLE IF NOT EXISTS user_filters( + user_id VARCHAR(150), + filter_id BIGINT, + filter_json bytea +); + +CREATE INDEX user_filters_by_user_id_filter_id ON user_filters( + user_id, filter_id +); + +CREATE TABLE IF NOT EXISTS push_rules_enable ( + id BIGINT PRIMARY KEY, + user_name VARCHAR(150) NOT NULL, + rule_id VARCHAR(150) NOT NULL, + enabled SMALLINT, + UNIQUE(user_name, rule_id) +); + +CREATE INDEX push_rules_enable_user_name on push_rules_enable (user_name); diff --git a/synapse/storage/schema/full_schemas/16/redactions.sql b/synapse/storage/schema/full_schemas/16/redactions.sql new file mode 100644 index 0000000000..492fd22033 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/redactions.sql @@ -0,0 +1,22 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS redactions ( + event_id VARCHAR(150) NOT NULL, + redacts VARCHAR(150) NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX redactions_event_id ON redactions (event_id); +CREATE INDEX redactions_redacts ON redactions (redacts); diff --git a/synapse/storage/schema/full_schemas/16/room_aliases.sql b/synapse/storage/schema/full_schemas/16/room_aliases.sql new file mode 100644 index 0000000000..952cae35b7 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/room_aliases.sql @@ -0,0 +1,25 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS room_aliases( + room_alias VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + UNIQUE (room_alias) +); + +CREATE TABLE IF NOT EXISTS room_alias_servers( + room_alias VARCHAR(150) NOT NULL, + server VARCHAR(150) NOT NULL +); diff --git a/synapse/storage/schema/full_schemas/16/state.sql b/synapse/storage/schema/full_schemas/16/state.sql new file mode 100644 index 0000000000..3c54595e64 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/state.sql @@ -0,0 +1,40 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS state_groups( + id BIGINT PRIMARY KEY, + room_id VARCHAR(150) NOT NULL, + event_id VARCHAR(150) NOT NULL +); + +CREATE TABLE IF NOT EXISTS state_groups_state( + state_group VARCHAR(20) NOT NULL, + room_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + state_key VARCHAR(150) NOT NULL, + event_id VARCHAR(150) NOT NULL +); + +CREATE TABLE IF NOT EXISTS event_to_state_groups( + event_id VARCHAR(150) NOT NULL, + state_group VARCHAR(150) NOT NULL, + UNIQUE (event_id) +); + +CREATE INDEX state_groups_id ON state_groups(id); + +CREATE INDEX state_groups_state_id ON state_groups_state(state_group); +CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key); +CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id); diff --git a/synapse/storage/schema/full_schemas/16/transactions.sql b/synapse/storage/schema/full_schemas/16/transactions.sql new file mode 100644 index 0000000000..bc64064936 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/transactions.sql @@ -0,0 +1,63 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +-- Stores what transaction ids we have received and what our response was +CREATE TABLE IF NOT EXISTS received_transactions( + transaction_id VARCHAR(150), + origin VARCHAR(150), + ts BIGINT, + response_code INTEGER, + response_json bytea, + has_been_referenced smallint default 0, -- Whether thishas been referenced by a prev_tx + UNIQUE (transaction_id, origin) +); + +CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; + + +-- Stores what transactions we've sent, what their response was (if we got one) and whether we have +-- since referenced the transaction in another outgoing transaction +CREATE TABLE IF NOT EXISTS sent_transactions( + id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering + transaction_id VARCHAR(150), + destination VARCHAR(150), + response_code INTEGER DEFAULT 0, + response_json bytea, + ts BIGINT +); + +CREATE INDEX sent_transaction_dest ON sent_transactions(destination); +CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id); +-- So that we can do an efficient look up of all transactions that have yet to be successfully +-- sent. +CREATE INDEX sent_transaction_sent ON sent_transactions(response_code); + + +-- For sent transactions only. +CREATE TABLE IF NOT EXISTS transaction_id_to_pdu( + transaction_id INTEGER, + destination VARCHAR(150), + pdu_id VARCHAR(150), + pdu_origin VARCHAR(150), + UNIQUE (transaction_id, destination) +); + +CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); + +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination VARCHAR(150) PRIMARY KEY, + retry_last_ts BIGINT, + retry_interval INTEGER +); diff --git a/synapse/storage/schema/full_schemas/16/users.sql b/synapse/storage/schema/full_schemas/16/users.sql new file mode 100644 index 0000000000..006b249fc0 --- /dev/null +++ b/synapse/storage/schema/full_schemas/16/users.sql @@ -0,0 +1,42 @@ +/* Copyright 2014, 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS users( + name VARCHAR(150), + password_hash VARCHAR(150), + creation_ts BIGINT, + admin SMALLINT DEFAULT 0 NOT NULL, + UNIQUE(name) +); + +CREATE TABLE IF NOT EXISTS access_tokens( + id BIGINT PRIMARY KEY, + user_id VARCHAR(150) NOT NULL, + device_id VARCHAR(150), + token VARCHAR(150) NOT NULL, + last_used BIGINT, + UNIQUE(token) +); + +CREATE TABLE IF NOT EXISTS user_ips ( + user_id VARCHAR(150) NOT NULL, + access_token VARCHAR(150) NOT NULL, + device_id VARCHAR(150), + ip VARCHAR(150) NOT NULL, + user_agent VARCHAR(150) NOT NULL, + last_seen BIGINT NOT NULL +); + +CREATE INDEX user_ips_user ON user_ips(user_id); +CREATE INDEX user_ips_user_ip ON user_ips(user_id, access_token, ip); diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql index 0431e2d051..d9494611e0 100644 --- a/synapse/storage/schema/schema_version.sql +++ b/synapse/storage/schema/schema_version.sql @@ -14,17 +14,14 @@ */ CREATE TABLE IF NOT EXISTS schema_version( - Lock char(1) NOT NULL DEFAULT 'X', -- Makes sure this table only has one row. + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. version INTEGER NOT NULL, upgraded BOOL NOT NULL, -- Whether we reached this version from an upgrade or an initial schema. - CONSTRAINT schema_version_lock_x CHECK (Lock='X') - CONSTRAINT schema_version_lock_uniq UNIQUE (Lock) + CHECK (Lock='X') ); CREATE TABLE IF NOT EXISTS applied_schema_deltas( version INTEGER NOT NULL, - file TEXT NOT NULL, - CONSTRAINT schema_deltas_ver_file UNIQUE (version, file) ON CONFLICT IGNORE + file VARCHAR(150) NOT NULL, + UNIQUE(version, file) ); - -CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index d0d53770f2..f051828630 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -56,7 +56,6 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }, - or_ignore=True, ) def get_event_reference_hashes(self, event_ids): @@ -100,7 +99,7 @@ class SignatureStore(SQLBaseStore): " WHERE event_id = ?" ) txn.execute(query, (event_id, )) - return dict(txn.fetchall()) + return {k: v for k, v in txn.fetchall()} def _store_event_reference_hash_txn(self, txn, event_id, algorithm, hash_bytes): @@ -119,7 +118,6 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }, - or_ignore=True, ) def _get_event_signatures_txn(self, txn, event_id): @@ -164,7 +162,6 @@ class SignatureStore(SQLBaseStore): "key_id": key_id, "signature": buffer(signature_bytes), }, - or_ignore=True, ) def _get_prev_event_hashes_txn(self, txn, event_id): @@ -198,5 +195,4 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }, - or_ignore=True, ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 58dbf2802b..553ba9dd1f 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -17,6 +17,8 @@ from ._base import SQLBaseStore from twisted.internet import defer +from synapse.util.stringutils import random_string + import logging logger = logging.getLogger(__name__) @@ -91,14 +93,15 @@ class StateStore(SQLBaseStore): state_group = context.state_group if not state_group: - state_group = self._simple_insert_txn( + state_group = self._state_groups_id_gen.get_next_txn(txn) + self._simple_insert_txn( txn, table="state_groups", values={ + "id": state_group, "room_id": event.room_id, "event_id": event.event_id, }, - or_ignore=True, ) for state in state_events.values(): @@ -112,7 +115,6 @@ class StateStore(SQLBaseStore): "state_key": state.state_key, "event_id": state.event_id, }, - or_ignore=True, ) self._simple_insert_txn( @@ -122,7 +124,6 @@ class StateStore(SQLBaseStore): "state_group": state_group, "event_id": event.event_id, }, - or_replace=True, ) @defer.inlineCallbacks @@ -154,3 +155,7 @@ class StateStore(SQLBaseStore): events = yield self._parse_events(results) defer.returnValue(events) + + +def _make_group_id(clock): + return str(int(clock.time_msec())) + random_string(5) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 66f307e640..df6de7cbcd 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -35,7 +35,7 @@ what sort order was used: from twisted.internet import defer -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.util.logutils import log_function @@ -110,7 +110,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): if self.topological is None: return "(%d < %s)" % (self.stream, "stream_ordering") else: - return "(%d < %s OR (%d == %s AND %d < %s))" % ( + return "(%d < %s OR (%d = %s AND %d < %s))" % ( self.topological, "topological_ordering", self.topological, "topological_ordering", self.stream, "stream_ordering", @@ -120,7 +120,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): if self.topological is None: return "(%d >= %s)" % (self.stream, "stream_ordering") else: - return "(%d > %s OR (%d == %s AND %d >= %s))" % ( + return "(%d > %s OR (%d = %s AND %d >= %s))" % ( self.topological, "topological_ordering", self.topological, "topological_ordering", self.stream, "stream_ordering", @@ -240,7 +240,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE " - "(e.outlier = 0 AND (room_id IN (%(current)s)) OR " + "(e.outlier = ? AND (room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " "AND e.stream_ordering > ? AND e.stream_ordering <= ? " "ORDER BY stream_ordering ASC LIMIT %(limit)d " @@ -251,7 +251,7 @@ class StreamStore(SQLBaseStore): } def f(txn): - txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,)) + txn.execute(sql, (False, user_id, user_id, from_id.stream, to_id.stream,)) rows = self.cursor_to_dict(txn) @@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore): # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. - args = [room_id] + args = [False, room_id] if direction == 'b': order = "DESC" bounds = _StreamToken.parse(from_key).upper_bound() @@ -307,7 +307,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events" - " WHERE outlier = 0 AND room_id = ? AND %(bounds)s" + " WHERE outlier = ? AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," " stream_ordering %(order)s %(limit)s" ) % { @@ -358,7 +358,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT stream_ordering, topological_ordering, event_id" " FROM events" - " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0" + " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?" " ORDER BY topological_ordering DESC, stream_ordering DESC" " LIMIT ?" ) @@ -368,17 +368,17 @@ class StreamStore(SQLBaseStore): "SELECT stream_ordering, topological_ordering, event_id" " FROM events" " WHERE room_id = ? AND stream_ordering > ?" - " AND stream_ordering <= ? AND outlier = 0" + " AND stream_ordering <= ? AND outlier = ?" " ORDER BY topological_ordering DESC, stream_ordering DESC" " LIMIT ?" ) def get_recent_events_for_room_txn(txn): if from_token is None: - txn.execute(sql, (room_id, end_token.stream, limit,)) + txn.execute(sql, (room_id, end_token.stream, False, limit,)) else: txn.execute(sql, ( - room_id, from_token.stream, end_token.stream, limit + room_id, from_token.stream, end_token.stream, False, limit )) rows = self.cursor_to_dict(txn) @@ -413,12 +413,10 @@ class StreamStore(SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) - @cached(num_args=0) + @defer.inlineCallbacks def get_room_events_max_id(self): - return self.runInteraction( - "get_room_events_max_id", - self._get_room_events_max_id_txn - ) + token = yield self._stream_id_gen.get_max_token(self) + defer.returnValue("s%d" % (token,)) @defer.inlineCallbacks def _get_min_token(self): @@ -433,27 +431,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(self.min_token) - def get_next_stream_id(self): - with self._next_stream_id_lock: - i = self._next_stream_id - self._next_stream_id += 1 - return i - - def _get_room_events_max_id_txn(self, txn): - txn.execute( - "SELECT MAX(stream_ordering) as m FROM events" - ) - - res = self.cursor_to_dict(txn) - - logger.debug("get_room_events_max_id: %s", res) - - if not res or not res[0] or not res[0]["m"]: - return "s0" - - key = res[0]["m"] - return "s%d" % (key,) - @staticmethod def _set_before_and_after(events, rows): for event, row in zip(events, rows): diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index b777395e06..4c3dc58662 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, Table, cached +from ._base import SQLBaseStore, cached from collections import namedtuple @@ -84,13 +84,18 @@ class TransactionStore(SQLBaseStore): def _set_received_txn_response(self, txn, transaction_id, origin, code, response_json): - query = ( - "UPDATE %s " - "SET response_code = ?, response_json = ? " - "WHERE transaction_id = ? AND origin = ?" - ) % ReceivedTransactionsTable.table_name - - txn.execute(query, (code, response_json, transaction_id, origin)) + self._simple_upsert_txn( + txn, + table=ReceivedTransactionsTable.table_name, + keyvalues={ + "transaction_id": transaction_id, + "origin": origin, + }, + values={ + "response_code": code, + "response_json": response_json, + } + ) def prep_send_transaction(self, transaction_id, destination, origin_server_ts): @@ -118,41 +123,38 @@ class TransactionStore(SQLBaseStore): def _prep_send_transaction(self, txn, transaction_id, destination, origin_server_ts): + next_id = self._transaction_id_gen.get_next_txn(txn) + # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. - query = "%s ORDER BY id DESC LIMIT 1" % ( - SentTransactions.select_statement("destination = ?"), - ) + query = ( + "SELECT * FROM sent_transactions" + " WHERE destination = ?" + " ORDER BY id DESC LIMIT 1" + ) - results = txn.execute(query, (destination,)) - results = SentTransactions.decode_results(results) + txn.execute(query, (destination,)) + results = self.cursor_to_dict(txn) - prev_txns = [r.transaction_id for r in results] + prev_txns = [r["transaction_id"] for r in results] # Actually add the new transaction to the sent_transactions table. - query = SentTransactions.insert_statement() - txn.execute(query, SentTransactions.EntryType( - None, - transaction_id=transaction_id, - destination=destination, - ts=origin_server_ts, - response_code=0, - response_json=None - )) - - # Update the tx id -> pdu id mapping - - # values = [ - # (transaction_id, destination, pdu[0], pdu[1]) - # for pdu in pdu_list - # ] - # - # logger.debug("Inserting: %s", repr(values)) - # - # query = TransactionsToPduTable.insert_statement() - # txn.executemany(query, values) + self._simple_insert_txn( + txn, + table=SentTransactions.table_name, + values={ + "id": next_id, + "transaction_id": transaction_id, + "destination": destination, + "ts": origin_server_ts, + "response_code": 0, + "response_json": None, + } + ) + + # TODO Update the tx id -> pdu id mapping return prev_txns @@ -171,15 +173,20 @@ class TransactionStore(SQLBaseStore): transaction_id, destination, code, response_dict ) - def _delivered_txn(cls, txn, transaction_id, destination, + def _delivered_txn(self, txn, transaction_id, destination, code, response_json): - query = ( - "UPDATE %s " - "SET response_code = ?, response_json = ? " - "WHERE transaction_id = ? AND destination = ?" - ) % SentTransactions.table_name - - txn.execute(query, (code, response_json, transaction_id, destination)) + self._simple_update_one_txn( + txn, + table=SentTransactions.table_name, + keyvalues={ + "transaction_id": transaction_id, + "destination": destination, + }, + updatevalues={ + "response_code": code, + "response_json": response_json, + } + ) def get_transactions_after(self, transaction_id, destination): """Get all transactions after a given local transaction_id. @@ -189,25 +196,26 @@ class TransactionStore(SQLBaseStore): destination (str) Returns: - list: A list of `ReceivedTransactionsTable.EntryType` + list: A list of dicts """ return self.runInteraction( "get_transactions_after", self._get_transactions_after, transaction_id, destination ) - def _get_transactions_after(cls, txn, transaction_id, destination): - where = ( - "destination = ? AND id > (select id FROM %s WHERE " - "transaction_id = ? AND destination = ?)" - ) % ( - SentTransactions.table_name + def _get_transactions_after(self, txn, transaction_id, destination): + query = ( + "SELECT * FROM sent_transactions" + " WHERE destination = ? AND id >" + " (" + " SELECT id FROM sent_transactions" + " WHERE transaction_id = ? AND destination = ?" + " )" ) - query = SentTransactions.select_statement(where) txn.execute(query, (destination, transaction_id, destination)) - return ReceivedTransactionsTable.decode_results(txn.fetchall()) + return self.cursor_to_dict(txn) @cached() def get_destination_retry_timings(self, destination): @@ -218,22 +226,27 @@ class TransactionStore(SQLBaseStore): Returns: None if not retrying - Otherwise a DestinationsTable.EntryType for the retry scheme + Otherwise a dict for the retry scheme """ return self.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination) - def _get_destination_retry_timings(cls, txn, destination): - query = DestinationsTable.select_statement("destination = ?") - txn.execute(query, (destination,)) - result = txn.fetchall() - if result: - result = DestinationsTable.decode_single_result(result) - if result.retry_last_ts > 0: - return result - else: - return None + def _get_destination_retry_timings(self, txn, destination): + result = self._simple_select_one_txn( + txn, + table=DestinationsTable.table_name, + keyvalues={ + "destination": destination, + }, + retcols=DestinationsTable.fields, + allow_none=True, + ) + + if result and result["retry_last_ts"] > 0: + return result + else: + return None def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): @@ -249,11 +262,11 @@ class TransactionStore(SQLBaseStore): # As this is the new value, we might as well prefill the cache self.get_destination_retry_timings.prefill( destination, - DestinationsTable.EntryType( - destination, - retry_last_ts, - retry_interval - ) + { + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval + }, ) # XXX: we could chose to not bother persisting this if our cache thinks @@ -266,22 +279,38 @@ class TransactionStore(SQLBaseStore): retry_interval, ) - def _set_destination_retry_timings(cls, txn, destination, + def _set_destination_retry_timings(self, txn, destination, retry_last_ts, retry_interval): - query = ( - "INSERT OR REPLACE INTO %s " - "(destination, retry_last_ts, retry_interval) " - "VALUES (?, ?, ?) " - ) % DestinationsTable.table_name + "UPDATE destinations" + " SET retry_last_ts = ?, retry_interval = ?" + " WHERE destination = ?" + ) + + txn.execute( + query, + ( + retry_last_ts, retry_interval, destination, + ) + ) - txn.execute(query, (destination, retry_last_ts, retry_interval)) + if txn.rowcount == 0: + # destination wasn't already in table. Insert it. + self._simple_insert_txn( + txn, + table="destinations", + values={ + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + } + ) def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. Returns: - list: A list of `DestinationsTable.EntryType` + list: A list of dicts """ return self.runInteraction( @@ -289,14 +318,17 @@ class TransactionStore(SQLBaseStore): self._get_destinations_needing_retry ) - def _get_destinations_needing_retry(cls, txn): - where = "retry_last_ts > 0 and retry_next_ts < now()" - query = DestinationsTable.select_statement(where) - txn.execute(query) - return DestinationsTable.decode_results(txn.fetchall()) + def _get_destinations_needing_retry(self, txn): + query = ( + "SELECT * FROM destinations" + " WHERE retry_last_ts > 0 and retry_next_ts < ?" + ) + + txn.execute(query, (self._clock.time_msec(),)) + return self.cursor_to_dict(txn) -class ReceivedTransactionsTable(Table): +class ReceivedTransactionsTable(object): table_name = "received_transactions" fields = [ @@ -308,10 +340,8 @@ class ReceivedTransactionsTable(Table): "has_been_referenced", ] - EntryType = namedtuple("ReceivedTransactionsEntry", fields) - -class SentTransactions(Table): +class SentTransactions(object): table_name = "sent_transactions" fields = [ @@ -326,7 +356,7 @@ class SentTransactions(Table): EntryType = namedtuple("SentTransactionsEntry", fields) -class TransactionsToPduTable(Table): +class TransactionsToPduTable(object): table_name = "transaction_id_to_pdu" fields = [ @@ -336,10 +366,8 @@ class TransactionsToPduTable(Table): "pdu_origin", ] - EntryType = namedtuple("TransactionsToPduEntry", fields) - -class DestinationsTable(Table): +class DestinationsTable(object): table_name = "destinations" fields = [ @@ -347,5 +375,3 @@ class DestinationsTable(Table): "retry_last_ts", "retry_interval", ] - - EntryType = namedtuple("DestinationsEntry", fields) diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py new file mode 100644 index 0000000000..c488b10d3c --- /dev/null +++ b/synapse/storage/util/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py new file mode 100644 index 0000000000..e5dec1c948 --- /dev/null +++ b/synapse/storage/util/id_generators.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from collections import deque +import contextlib +import threading + + +class IdGenerator(object): + def __init__(self, table, column, store): + self.table = table + self.column = column + self.store = store + self._lock = threading.Lock() + self._next_id = None + + @defer.inlineCallbacks + def get_next(self): + with self._lock: + if not self._next_id: + res = yield self.store._execute_and_decode( + "IdGenerator_%s" % (self.table,), + "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,) + ) + + self._next_id = (res and res[0] and res[0]["mx"]) or 1 + + i = self._next_id + self._next_id += 1 + defer.returnValue(i) + + def get_next_txn(self, txn): + with self._lock: + if self._next_id: + i = self._next_id + self._next_id += 1 + return i + else: + txn.execute( + "SELECT MAX(%s) FROM %s" % (self.column, self.table,) + ) + + val, = txn.fetchone() + cur = val or 0 + cur += 1 + self._next_id = cur + 1 + + return cur + + +class StreamIdGenerator(object): + """Used to generate new stream ids when persisting events while keeping + track of which transactions have been completed. + + This allows us to get the "current" stream id, i.e. the stream id such that + all ids less than or equal to it have completed. This handles the fact that + persistence of events can complete out of order. + + Usage: + with stream_id_gen.get_next_txn(txn) as stream_id: + # ... persist event ... + """ + def __init__(self): + self._lock = threading.Lock() + + self._current_max = None + self._unfinished_ids = deque() + + def get_next_txn(self, txn): + """ + Usage: + with stream_id_gen.get_next_txn(txn) as stream_id: + # ... persist event ... + """ + with self._lock: + if not self._current_max: + self._compute_current_max(txn) + + self._current_max += 1 + next_id = self._current_max + + self._unfinished_ids.append(next_id) + + @contextlib.contextmanager + def manager(): + try: + yield next_id + finally: + with self._lock: + self._unfinished_ids.remove(next_id) + + return manager() + + def get_max_token(self, store): + """Returns the maximum stream id such that all stream ids less than or + equal to it have been successfully persisted. + """ + with self._lock: + if self._unfinished_ids: + return self._unfinished_ids[0] - 1 + + if not self._current_max: + return store.runInteraction( + "_compute_current_max", + self._compute_current_max, + ) + + return self._current_max + + def _compute_current_max(self, txn): + txn.execute("SELECT MAX(stream_ordering) FROM events") + val, = txn.fetchone() + + self._current_max = int(val) if val else 1 + + return self._current_max diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 4e82232796..a42138f556 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -60,7 +60,7 @@ def get_retry_limiter(destination, clock, store, **kwargs): if retry_timings: retry_last_ts, retry_interval = ( - retry_timings.retry_last_ts, retry_timings.retry_interval + retry_timings["retry_last_ts"], retry_timings["retry_interval"] ) now = int(clock.time_msec()) |