diff options
Diffstat (limited to 'synapse')
45 files changed, 1032 insertions, 740 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index afb46d2e23..fbc9a43d66 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -17,9 +17,8 @@ import sys sys.dont_write_bytecode = True -from synapse.storage import ( - prepare_database, prepare_sqlite3_database, UpgradeDatabaseException, -) +from synapse.storage import UpgradeDatabaseException +from synapse.storage.engines import create_engine from synapse.server import HomeServer @@ -57,9 +56,10 @@ import os import re import resource import subprocess -import sqlite3 +import yaml + -logger = logging.getLogger(__name__) +logger = logging.getLogger("synapse.app.homeserver") class SynapseHomeServer(HomeServer): @@ -103,13 +103,11 @@ class SynapseHomeServer(HomeServer): return None def build_db_pool(self): + name = self.db_config["name"] + return adbapi.ConnectionPool( - "sqlite3", self.get_db_name(), - check_same_thread=False, - cp_min=1, - cp_max=1, - cp_openfun=prepare_database, # Prepare the database for each conn - # so that :memory: sqlite works + name, + **self.db_config.get("args", {}) ) def create_resource_tree(self, redirect_root_to_web_client): @@ -352,15 +350,52 @@ def setup(config_options): tls_context_factory = context_factory.ServerContextFactory(config) + if config.database_config: + with open(config.database_config, 'r') as f: + db_config = yaml.safe_load(f) + else: + db_config = { + "name": "sqlite3", + "args": { + "database": config.database_path, + }, + } + + db_config = { + k: v for k, v in db_config.items() + if not k.startswith("cp_") + } + + name = db_config.get("name", None) + if name in ["MySQLdb", "mysql.connector"]: + db_config.setdefault("args", {}).update({ + "sql_mode": "TRADITIONAL", + "charset": "utf8mb4", + "use_unicode": True, + "collation": "utf8mb4_general_ci", + }) + elif name == "sqlite3": + db_config.setdefault("args", {}).update({ + "cp_min": 1, + "cp_max": 1, + }) + else: + raise RuntimeError("Unsupported database type '%s'" % (name,)) + + database_engine = create_engine(name) + db_config["args"]["cp_openfun"] = database_engine.on_new_connection + hs = SynapseHomeServer( config.server_name, domain_with_port=domain_with_port, upload_dir=os.path.abspath("uploads"), db_name=config.database_path, + db_config=db_config, tls_context_factory=tls_context_factory, config=config, content_addr=config.content_addr, version_string=version_string, + database_engine=database_engine, ) hs.create_resource_tree( @@ -372,9 +407,16 @@ def setup(config_options): logger.info("Preparing database: %s...", db_name) try: - with sqlite3.connect(db_name) as db_conn: - prepare_sqlite3_database(db_conn) - prepare_database(db_conn) + db_conn = database_engine.module.connect( + **{ + k: v for k, v in db_config.get("args", {}).items() + if not k.startswith("cp_") + } + ) + + database_engine.prepare_database(db_conn) + + db_conn.commit() except UpgradeDatabaseException: sys.stderr.write( "\nFailed to upgrade database.\n" diff --git a/synapse/config/database.py b/synapse/config/database.py index 87efe54645..8dc9873f8c 100644 --- a/synapse/config/database.py +++ b/synapse/config/database.py @@ -26,6 +26,11 @@ class DatabaseConfig(Config): self.database_path = self.abspath(args.database_path) self.event_cache_size = self.parse_size(args.event_cache_size) + if args.database_config: + self.database_config = self.abspath(args.database_config) + else: + self.database_config = None + @classmethod def add_arguments(cls, parser): super(DatabaseConfig, cls).add_arguments(parser) @@ -38,6 +43,10 @@ class DatabaseConfig(Config): "--event-cache-size", default="100K", help="Number of events to cache in memory." ) + db_group.add_argument( + "--database-config", default=None, + help="Location of the database configuration file." + ) @classmethod def generate_config(cls, args, config_dir_path): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 15ba417e06..9a4773ac02 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -179,7 +179,7 @@ class FederationHandler(BaseHandler): # it's probably a good idea to mark it as not in retry-state # for sending (although this is a bit of a leap) retry_timings = yield self.store.get_destination_retry_timings(origin) - if (retry_timings and retry_timings.retry_last_ts): + if retry_timings and retry_timings["retry_last_ts"]: self.store.set_destination_retry_timings(origin, 0, 0) room = yield self.store.get_room(event.room_id) diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py index 7447800460..76647c7941 100644 --- a/synapse/handlers/login.py +++ b/synapse/handlers/login.py @@ -57,7 +57,7 @@ class LoginHandler(BaseHandler): logger.warn("Attempted to login as %s but they do not exist", user) raise LoginError(403, "", errcode=Codes.FORBIDDEN) - stored_hash = user_info[0]["password_hash"] + stored_hash = user_info["password_hash"] if bcrypt.checkpw(password, stored_hash): # generate an access token and store it. token = self.reg_handler._generate_token(user) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 823affc380..bc7f1c2402 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -311,25 +311,6 @@ class RoomMemberHandler(BaseHandler): defer.returnValue(chunk_data) @defer.inlineCallbacks - def get_room_member(self, room_id, member_user_id, auth_user_id): - """Retrieve a room member from a room. - - Args: - room_id : The room the member is in. - member_user_id : The member's user ID - auth_user_id : The user ID of the user making this request. - Returns: - The room member, or None if this member does not exist. - Raises: - SynapseError if something goes wrong. - """ - yield self.auth.check_joined_room(room_id, auth_user_id) - - member = yield self.store.get_room_member(user_id=member_user_id, - room_id=room_id) - defer.returnValue(member) - - @defer.inlineCallbacks def change_membership(self, event, context, do_auth=True): """ Change the membership status of a user in a room. diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index 1e77eb49cf..7387b4adb9 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -19,9 +19,13 @@ from twisted.internet import defer from .base import ClientV1RestServlet, client_path_pattern from synapse.types import UserID +import logging import simplejson as json +logger = logging.getLogger(__name__) + + class ProfileDisplaynameRestServlet(ClientV1RestServlet): PATTERN = client_path_pattern("/profile/(?P<user_id>[^/]*)/displayname") @@ -47,7 +51,8 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet): defer.returnValue((400, "Unable to parse name")) yield self.handlers.profile_handler.set_displayname( - user, auth_user, new_name) + user, auth_user, new_name + ) defer.returnValue((200, {})) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f4dec70393..87db382fbb 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -104,14 +104,16 @@ class DataStore(RoomMemberStore, RoomStore, self.client_ip_last_seen.prefill(*key + (now,)) - yield self._simple_insert( + yield self._simple_upsert( "user_ips", - { + keyvalues={ "user": user.to_string(), "access_token": access_token, - "device_id": device_id, "ip": ip, "user_agent": user_agent, + }, + values={ + "device_id": device_id, "last_seen": now, }, desc="insert_client_ip", @@ -148,21 +150,23 @@ class UpgradeDatabaseException(PrepareDatabaseException): pass -def prepare_database(db_conn): +def prepare_database(db_conn, database_engine): """Prepares a database for usage. Will either create all necessary tables or upgrade from an older schema version. """ try: cur = db_conn.cursor() - version_info = _get_or_create_schema_state(cur) + version_info = _get_or_create_schema_state(cur, database_engine) if version_info: user_version, delta_files, upgraded = version_info - _upgrade_existing_database(cur, user_version, delta_files, upgraded) + _upgrade_existing_database( + cur, user_version, delta_files, upgraded, database_engine + ) else: - _setup_new_database(cur) + _setup_new_database(cur, database_engine) - cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) + # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) cur.close() db_conn.commit() @@ -171,7 +175,7 @@ def prepare_database(db_conn): raise -def _setup_new_database(cur): +def _setup_new_database(cur, database_engine): """Sets up the database by finding a base set of "full schemas" and then applying any necessary deltas. @@ -225,31 +229,30 @@ def _setup_new_database(cur): directory_entries = os.listdir(sql_dir) - sql_script = "BEGIN TRANSACTION;\n" for filename in fnmatch.filter(directory_entries, "*.sql"): sql_loc = os.path.join(sql_dir, filename) logger.debug("Applying schema %s", sql_loc) - sql_script += read_schema(sql_loc) - sql_script += "\n" - sql_script += "COMMIT TRANSACTION;" - cur.executescript(sql_script) + executescript(cur, sql_loc) cur.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", - (max_current_ver, False) + database_engine.convert_param_style( + "REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)" + ), + (max_current_ver, False,) ) _upgrade_existing_database( cur, current_version=max_current_ver, applied_delta_files=[], - upgraded=False + upgraded=False, + database_engine=database_engine, ) def _upgrade_existing_database(cur, current_version, applied_delta_files, - upgraded): + upgraded, database_engine): """Upgrades an existing database. Delta files can either be SQL stored in *.sql files, or python modules @@ -305,6 +308,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, if not upgraded: start_ver += 1 + logger.debug("applied_delta_files: %s", applied_delta_files) + for v in range(start_ver, SCHEMA_VERSION + 1): logger.debug("Upgrading schema to v%d", v) @@ -321,6 +326,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, directory_entries.sort() for file_name in directory_entries: relative_path = os.path.join(str(v), file_name) + logger.debug("Found file: %s", relative_path) if relative_path in applied_delta_files: continue @@ -342,9 +348,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, module.run_upgrade(cur) elif ext == ".sql": # A plain old .sql file, just read and execute it - delta_schema = read_schema(absolute_path) logger.debug("Applying schema %s", relative_path) - cur.executescript(delta_schema) + executescript(cur, absolute_path) else: # Not a valid delta file. logger.warn( @@ -356,24 +361,85 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, # Mark as done. cur.execute( - "INSERT INTO applied_schema_deltas (version, file)" - " VALUES (?,?)", + database_engine.convert_param_style( + "INSERT INTO applied_schema_deltas (version, file)" + " VALUES (?,?)", + ), (v, relative_path) ) cur.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", + database_engine.convert_param_style( + "REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)", + ), (v, True) ) -def _get_or_create_schema_state(txn): - schema_path = os.path.join( - dir_path, "schema", "schema_version.sql", - ) - create_schema = read_schema(schema_path) - txn.executescript(create_schema) +def get_statements(f): + statement_buffer = "" + in_comment = False # If we're in a /* ... */ style comment + + for line in f: + line = line.strip() + + if in_comment: + # Check if this line contains an end to the comment + comments = line.split("*/", 1) + if len(comments) == 1: + continue + line = comments[1] + in_comment = False + + # Remove inline block comments + line = re.sub(r"/\*.*\*/", " ", line) + + # Does this line start a comment? + comments = line.split("/*", 1) + if len(comments) > 1: + line = comments[0] + in_comment = True + + # Deal with line comments + line = line.split("--", 1)[0] + line = line.split("//", 1)[0] + + # Find *all* semicolons. We need to treat first and last entry + # specially. + statements = line.split(";") + + # We must prepend statement_buffer to the first statement + first_statement = "%s %s" % ( + statement_buffer.strip(), + statements[0].strip() + ) + statements[0] = first_statement + + # Every entry, except the last, is a full statement + for statement in statements[:-1]: + yield statement.strip() + + # The last entry did *not* end in a semicolon, so we store it for the + # next semicolon we find + statement_buffer = statements[-1].strip() + + +def executescript(txn, schema_path): + with open(schema_path, 'r') as f: + for statement in get_statements(f): + txn.execute(statement) + + +def _get_or_create_schema_state(txn, database_engine): + try: + # Bluntly try creating the schema_version tables. + schema_path = os.path.join( + dir_path, "schema", "schema_version.sql", + ) + executescript(txn, schema_path) + except: + pass txn.execute("SELECT version, upgraded FROM schema_version") row = txn.fetchone() @@ -382,10 +448,13 @@ def _get_or_create_schema_state(txn): if current_version: txn.execute( - "SELECT file FROM applied_schema_deltas WHERE version >= ?", + database_engine.convert_param_style( + "SELECT file FROM applied_schema_deltas WHERE version >= ?" + ), (current_version,) ) - return current_version, txn.fetchall(), upgraded + applied_deltas = [d for d, in txn.fetchall()] + return current_version, applied_deltas, upgraded return None @@ -417,7 +486,7 @@ def prepare_sqlite3_database(db_conn): if row and row[0]: db_conn.execute( - "INSERT OR REPLACE INTO schema_version (version, upgraded)" + "REPLACE INTO schema_version (version, upgraded)" " VALUES (?,?)", (row[0], False) ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 53eee10d51..23289bbdd4 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -29,6 +29,7 @@ import functools import simplejson as json import sys import time +import threading logger = logging.getLogger(__name__) @@ -145,11 +146,12 @@ class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() method.""" - __slots__ = ["txn", "name"] + __slots__ = ["txn", "name", "database_engine"] - def __init__(self, txn, name): + def __init__(self, txn, name, database_engine): object.__setattr__(self, "txn", txn) object.__setattr__(self, "name", name) + object.__setattr__(self, "database_engine", database_engine) def __getattr__(self, name): return getattr(self.txn, name) @@ -161,26 +163,32 @@ class LoggingTransaction(object): # TODO(paul): Maybe use 'info' and 'debug' for values? sql_logger.debug("[SQL] {%s} %s", self.name, sql) - try: - if args and args[0]: - values = args[0] + sql = self.database_engine.convert_param_style(sql) + + if args and args[0]: + args = list(args) + args[0] = [ + self.database_engine.encode_parameter(a) for a in args[0] + ] + try: sql_logger.debug( - "[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)), + "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])), self.name, - *values + *args[0] ) - except: - # Don't let logging failures stop SQL from working - pass + except: + # Don't let logging failures stop SQL from working + pass start = time.time() * 1000 + try: return self.txn.execute( sql, *args, **kwargs ) - except: - logger.exception("[SQL FAIL] {%s}", self.name) - raise + except Exception as e: + logger.debug("[SQL FAIL] {%s} %s", self.name, e) + raise finally: msecs = (time.time() * 1000) - start sql_logger.debug("[SQL time] {%s} %f", self.name, msecs) @@ -224,6 +232,46 @@ class PerformanceCounters(object): return top_n_counters +class IdGenerator(object): + def __init__(self, table, column, store): + self.table = table + self.column = column + self.store = store + self._lock = threading.Lock() + self._next_id = None + + @defer.inlineCallbacks + def get_next(self): + with self._lock: + if not self._next_id: + res = yield self.store._execute_and_decode( + "IdGenerator_%s" % (self.table,), + "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,) + ) + + self._next_id = (res and res[0] and res[0]["mx"]) or 1 + + i = self._next_id + self._next_id += 1 + defer.returnValue(i) + + def get_next_txn(self, txn): + with self._lock: + if self._next_id: + i = self._next_id + self._next_id += 1 + return i + else: + txn.execute( + "SELECT MAX(%s) FROM %s" % (self.column, self.table,) + ) + + val, = txn.fetchone() + self._next_id = val or 2 + + return 1 + + class SQLBaseStore(object): _TXN_ID = 0 @@ -244,9 +292,16 @@ class SQLBaseStore(object): self._get_event_cache = LruCache(hs.config.event_cache_size) + self.database_engine = hs.database_engine + # Pretend the getEventCache is just another named cache caches_by_name["*getEvent*"] = self._get_event_cache + self._stream_id_gen = IdGenerator("events", "stream_ordering", self) + self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) + self._state_groups_id_gen = IdGenerator("state_groups", "id", self) + self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) + def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -283,7 +338,7 @@ class SQLBaseStore(object): start_time = time.time() * 1000 - def inner_func(txn, *args, **kwargs): + def inner_func(conn, *args, **kwargs): with LoggingContext("runInteraction") as context: current_context.copy_to(context) start = time.time() * 1000 @@ -298,9 +353,26 @@ class SQLBaseStore(object): sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) transaction_logger.debug("[TXN START] {%s}", name) try: - return func(LoggingTransaction(txn, name), *args, **kwargs) - except: - logger.exception("[TXN FAIL] {%s}", name) + i = 0 + N = 5 + while True: + try: + txn = conn.cursor() + return func( + LoggingTransaction(txn, name, self.database_engine), + *args, **kwargs + ) + except self.database_engine.module.DatabaseError as e: + logger.warn("[TXN DEADLOCK] {%s} %r, %r", name, e.errno, e.sqlstate) + if self.database_engine.is_deadlock(e): + logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N) + if i < N: + i += 1 + conn.rollback() + continue + raise + except Exception as e: + logger.debug("[TXN FAIL] {%s}", name, e) raise finally: end = time.time() * 1000 @@ -313,7 +385,7 @@ class SQLBaseStore(object): sql_txn_timer.inc_by(duration, desc) with PreserveLoggingContext(): - result = yield self._db_pool.runInteraction( + result = yield self._db_pool.runWithConnection( inner_func, *args, **kwargs ) defer.returnValue(result) @@ -344,11 +416,11 @@ class SQLBaseStore(object): The result of decoder(results) """ def interaction(txn): - cursor = txn.execute(query, args) + txn.execute(query, args) if decoder: - return decoder(cursor) + return decoder(txn) else: - return cursor.fetchall() + return txn.fetchall() return self.runInteraction(desc, interaction) @@ -358,27 +430,23 @@ class SQLBaseStore(object): # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. - def _simple_insert(self, table, values, or_replace=False, or_ignore=False, + def _simple_insert(self, table, values, or_ignore=False, desc="_simple_insert"): """Executes an INSERT query on the named table. Args: table : string giving the table name values : dict of new column names and values for them - or_replace : bool; if True performs an INSERT OR REPLACE """ return self.runInteraction( desc, - self._simple_insert_txn, table, values, or_replace=or_replace, - or_ignore=or_ignore, + self._simple_insert_txn, table, values, + or_ignore=or_ignore ) @log_function - def _simple_insert_txn(self, txn, table, values, or_replace=False, - or_ignore=False): - sql = "%s INTO %s (%s) VALUES(%s)" % ( - ("INSERT OR REPLACE" if or_replace else - "INSERT OR IGNORE" if or_ignore else "INSERT"), + def _simple_insert_txn(self, txn, table, values, or_ignore=False): + sql = "INSERT INTO %s (%s) VALUES(%s)" % ( table, ", ".join(k for k in values), ", ".join("?" for k in values) @@ -389,8 +457,11 @@ class SQLBaseStore(object): sql, values.values(), ) - txn.execute(sql, values.values()) - return txn.lastrowid + try: + txn.execute(sql, values.values()) + except self.database_engine.module.IntegrityError: + if not or_ignore: + raise def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"): """ @@ -491,8 +562,7 @@ class SQLBaseStore(object): def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol): sql = ( - "SELECT %(retcol)s FROM %(table)s WHERE %(where)s " - "ORDER BY rowid asc" + "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" ) % { "retcol": retcol, "table": table, @@ -550,14 +620,14 @@ class SQLBaseStore(object): retcols : list of strings giving the names of the columns to return """ if keyvalues: - sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) txn.execute(sql, keyvalues.values()) else: - sql = "SELECT %s FROM %s ORDER BY rowid asc" % ( + sql = "SELECT %s FROM %s" % ( ", ".join(retcols), table ) @@ -609,10 +679,10 @@ class SQLBaseStore(object): def _simple_select_one_txn(self, txn, table, keyvalues, retcols, allow_none=False): - select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + select_sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, - " AND ".join("%s = ?" % (k) for k in keyvalues) + " AND ".join("%s = ?" % (k,) for k in keyvalues) ) txn.execute(select_sql, keyvalues.values()) @@ -650,6 +720,11 @@ class SQLBaseStore(object): updatevalues=updatevalues, ) + # if txn.rowcount == 0: + # raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + return ret return self.runInteraction(desc, func) @@ -798,10 +873,12 @@ class SQLBaseStore(object): sql_getevents_timer.inc_by(curr_time - last_time, desc) return curr_time - d = json.loads(js) + logger.debug("Got js: %r", js) + d = json.loads(str(js).decode("utf8")) start_time = update_counter("decode_json", start_time) - internal_metadata = json.loads(internal_metadata) + logger.debug("Got internal_metadata: %r", internal_metadata) + internal_metadata = json.loads(str(internal_metadata).decode("utf8")) start_time = update_counter("decode_internal", start_time) ev = FrozenEvent(d, internal_metadata_dict=internal_metadata) @@ -851,6 +928,12 @@ class SQLBaseStore(object): result = txn.fetchone() return result[0] if result else None + def get_next_stream_id(self): + with self._next_stream_id_lock: + i = self._next_stream_id + self._next_stream_id += 1 + return i + class _RollbackButIsFineException(Exception): """ This exception is used to rollback a transaction without implying @@ -874,7 +957,7 @@ class Table(object): _select_where_clause = "SELECT %s FROM %s WHERE %s" _select_clause = "SELECT %s FROM %s" - _insert_clause = "INSERT OR REPLACE INTO %s (%s) VALUES (%s)" + _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)" @classmethod def select_statement(cls, where_clause=None): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index f8cbb3f323..40e05b3635 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -366,11 +366,13 @@ class ApplicationServiceTransactionStore(SQLBaseStore): new_txn_id = max(highest_txn_id, last_txn_id) + 1 # Insert new txn into txn table - event_ids = [e.event_id for e in events] + event_ids = buffer( + json.dumps([e.event_id for e in events]).encode("utf8") + ) txn.execute( "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " "VALUES(?,?,?)", - (service.id, new_txn_id, json.dumps(event_ids)) + (service.id, new_txn_id, event_ids) ) return AppServiceTransaction( service=service, id=new_txn_id, events=events diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 0199539fea..cfb2005706 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -120,12 +120,12 @@ class DirectoryStore(SQLBaseStore): defer.returnValue(room_id) def _delete_room_alias_txn(self, txn, room_alias): - cursor = txn.execute( + txn.execute( "SELECT room_id FROM room_aliases WHERE room_alias = ?", (room_alias.to_string(),) ) - res = cursor.fetchone() + res = txn.fetchone() if res: room_id = res[0] else: diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py new file mode 100644 index 0000000000..29702be923 --- /dev/null +++ b/synapse/storage/engines/__init__.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .maria import MariaEngine +from .sqlite3 import Sqlite3Engine + +import importlib + + +SUPPORTED_MODULE = { + "sqlite3": Sqlite3Engine, + "mysql.connector": MariaEngine, +} + + +def create_engine(name): + engine_class = SUPPORTED_MODULE.get(name, None) + + if engine_class: + module = importlib.import_module(name) + return engine_class(module) + + raise RuntimeError( + "Unsupported database engine '%s'" % (name,) + ) diff --git a/synapse/storage/engines/maria.py b/synapse/storage/engines/maria.py new file mode 100644 index 0000000000..230b32858c --- /dev/null +++ b/synapse/storage/engines/maria.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage import prepare_database + +import types + + +class MariaEngine(object): + def __init__(self, database_module): + self.module = database_module + + def convert_param_style(self, sql): + return sql.replace("?", "%s") + + def encode_parameter(self, param): + if isinstance(param, types.BufferType): + return str(param) + return param + + def on_new_connection(self, db_conn): + pass + + def prepare_database(self, db_conn): + cur = db_conn.cursor() + cur.execute( + "ALTER DATABASE CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci" + ) + db_conn.commit() + prepare_database(db_conn, self) + + def is_deadlock(self, error): + if isinstance(error, self.module.DatabaseError): + return error.sqlstate == "40001" and error.errno == 1213 + return False diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py new file mode 100644 index 0000000000..72c11df461 --- /dev/null +++ b/synapse/storage/engines/sqlite3.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage import prepare_database, prepare_sqlite3_database + + +class Sqlite3Engine(object): + def __init__(self, database_module): + self.module = database_module + + def convert_param_style(self, sql): + return sql + + def encode_parameter(self, param): + return param + + def on_new_connection(self, db_conn): + self.prepare_database(db_conn) + + def prepare_database(self, db_conn): + prepare_sqlite3_database(db_conn) + prepare_database(db_conn, self) + + def is_deadlock(self, error): + return False diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 032334bfd6..79ad5ddc9c 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -242,7 +242,6 @@ class EventFederationStore(SQLBaseStore): "room_id": room_id, "min_depth": depth, }, - or_replace=True, ) def _handle_prev_events(self, txn, outlier, event_id, prev_events, @@ -262,7 +261,6 @@ class EventFederationStore(SQLBaseStore): "room_id": room_id, "is_state": 0, }, - or_ignore=True, ) # Update the extremities table if this is not an outlier. @@ -281,19 +279,19 @@ class EventFederationStore(SQLBaseStore): # We only insert as a forward extremity the new event if there are # no other events that reference it as a prev event query = ( - "INSERT OR IGNORE INTO %(table)s (event_id, room_id) " - "SELECT ?, ? WHERE NOT EXISTS (" - "SELECT 1 FROM %(event_edges)s WHERE " - "prev_event_id = ? " - ")" - ) % { - "table": "event_forward_extremities", - "event_edges": "event_edges", - } + "SELECT 1 FROM event_edges WHERE prev_event_id = ?" + ) - logger.debug("query: %s", query) + txn.execute(query, (event_id,)) + + if not txn.fetchone(): + query = ( + "INSERT INTO event_forward_extremities" + " (event_id, room_id)" + " VALUES (?, ?)" + ) - txn.execute(query, (event_id, room_id, event_id)) + txn.execute(query, (event_id, room_id)) # Insert all the prev_events as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. @@ -306,7 +304,6 @@ class EventFederationStore(SQLBaseStore): "event_id": e_id, "room_id": room_id, }, - or_ignore=True, ) # Also delete from the backwards extremities table all ones that diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a86230d92c..3b3416716e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -96,12 +96,16 @@ class EventsStore(SQLBaseStore): # Remove the any existing cache entries for the event_id self._get_event_cache.pop(event.event_id) + if stream_ordering is None: + stream_ordering = self._stream_id_gen.get_next_txn(txn) + # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) + self._simple_delete_txn( + txn, + table="current_state_events", + keyvalues={"room_id": event.room_id}, ) for s in current_state: @@ -114,7 +118,6 @@ class EventsStore(SQLBaseStore): "type": s.type, "state_key": s.state_key, }, - or_replace=True, ) if event.is_state() and is_new_state: @@ -128,7 +131,6 @@ class EventsStore(SQLBaseStore): "type": event.type, "state_key": event.state_key, }, - or_replace=True, ) for prev_state_id, _ in event.prev_state: @@ -151,14 +153,6 @@ class EventsStore(SQLBaseStore): event.depth ) - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - have_persisted = self._simple_select_one_onecol_txn( txn, table="event_json", @@ -185,7 +179,7 @@ class EventsStore(SQLBaseStore): ) txn.execute( sql, - (metadata_json.decode("UTF-8"), event.event_id,) + (buffer(metadata_json), event.event_id,) ) sql = ( @@ -198,10 +192,16 @@ class EventsStore(SQLBaseStore): ) return + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + if event.type == EventTypes.Member: self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) elif event.type == EventTypes.Name: self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: @@ -224,15 +224,14 @@ class EventsStore(SQLBaseStore): values={ "event_id": event.event_id, "room_id": event.room_id, - "internal_metadata": metadata_json.decode("UTF-8"), - "json": encode_canonical_json(event_dict).decode("UTF-8"), + "internal_metadata": buffer(metadata_json), + "json": buffer(encode_canonical_json(event_dict)), }, - or_replace=True, ) - content = encode_canonical_json( + content = buffer(encode_canonical_json( event.content - ).decode("UTF-8") + )) vals = { "topological_ordering": event.depth, @@ -245,9 +244,6 @@ class EventsStore(SQLBaseStore): "depth": event.depth, } - if stream_ordering is not None: - vals["stream_ordering"] = stream_ordering - unrec = { k: v for k, v in event.get_dict().items() @@ -260,25 +256,22 @@ class EventsStore(SQLBaseStore): ] } - vals["unrecognized_keys"] = encode_canonical_json( + vals["unrecognized_keys"] = buffer(encode_canonical_json( unrec - ).decode("UTF-8") + )) - try: - self._simple_insert_txn( - txn, - "events", - vals, - or_replace=(not outlier), - or_ignore=bool(outlier), - ) - except: - logger.warn( - "Failed to persist, probably duplicate: %s", - event.event_id, - exc_info=True, - ) - raise _RollbackButIsFineException("_persist_event") + sql = ( + "INSERT INTO events" + " (stream_ordering, topological_ordering, event_id, type," + " room_id, content, processed, outlier, depth)" + " VALUES (%s,?,?,?,?,?,?,?,?)" + ) % (stream_ordering,) + + txn.execute( + sql, + (event.depth, event.event_id, event.type, event.room_id, + content, True, outlier, event.depth) + ) if context.rejected: self._store_rejections_txn(txn, event.event_id, context.rejected) @@ -302,15 +295,17 @@ class EventsStore(SQLBaseStore): ) if is_new_state and not context.rejected: - self._simple_insert_txn( + self._simple_upsert_txn( txn, "current_state_events", - { - "event_id": event.event_id, + keyvalues={ "room_id": event.room_id, "type": event.type, "state_key": event.state_key, }, + values={ + "event_id": event.event_id, + } ) for e_id, h in event.prev_state: diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 09d1e63657..d3b9b38664 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -57,16 +57,18 @@ class KeyStore(SQLBaseStore): OpenSSL.crypto.FILETYPE_ASN1, tls_certificate ) fingerprint = hashlib.sha256(tls_certificate_bytes).hexdigest() - return self._simple_insert( + return self._simple_upsert( table="server_tls_certificates", - values={ + keyvalues={ "server_name": server_name, "fingerprint": fingerprint, + }, + values={ "from_server": from_server, "ts_added_ms": time_now_ms, "tls_certificate": buffer(tls_certificate_bytes), }, - or_ignore=True, + desc="store_server_certificate", ) @defer.inlineCallbacks @@ -107,14 +109,16 @@ class KeyStore(SQLBaseStore): ts_now_ms (int): The time now in milliseconds verification_key (VerifyKey): The NACL verify key. """ - return self._simple_insert( + return self._simple_upsert( table="server_signature_keys", - values={ + keyvalues={ "server_name": server_name, "key_id": "%s:%s" % (verify_key.alg, verify_key.version), + }, + values={ "from_server": from_server, "ts_added_ms": time_now_ms, "verify_key": buffer(verify_key.encode()), }, - or_ignore=True, + desc="store_server_verify_key", ) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 87fba55439..22ec94bc16 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore): values={"observed_user_id": observed_localpart, "observer_user_id": observer_userid}, desc="allow_presence_visible", + or_ignore=True, ) def disallow_presence_visible(self, observed_localpart, observer_userid): diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index a6e52cb248..09778045bf 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore @@ -24,19 +26,25 @@ class ProfileStore(SQLBaseStore): desc="create_profile", ) + @defer.inlineCallbacks def get_profile_displayname(self, user_localpart): - return self._simple_select_one_onecol( + name = yield self._simple_select_one_onecol( table="profiles", keyvalues={"user_id": user_localpart}, retcol="displayname", desc="get_profile_displayname", ) + if name: + name = name.decode("utf8") + + defer.returnValue(name) + def set_profile_displayname(self, user_localpart, new_displayname): return self._simple_update_one( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"displayname": new_displayname}, + updatevalues={"displayname": new_displayname.encode("utf8")}, desc="set_profile_displayname", ) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index c47bdc2861..ee7718d5ed 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -154,7 +154,7 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_name, priority_class, new_rule_priority)) # now insert the new rule - sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" (" + sql = "INSERT INTO "+PushRuleTable.table_name+" (" sql += ",".join(new_rule.keys())+") VALUES (" sql += ", ".join(["?" for _ in new_rule.keys()])+")" @@ -183,7 +183,7 @@ class PushRuleStore(SQLBaseStore): new_rule['priority_class'] = priority_class new_rule['priority'] = new_prio - sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" (" + sql = "INSERT INTO "+PushRuleTable.table_name+" (" sql += ",".join(new_rule.keys())+") VALUES (" sql += ", ".join(["?" for _ in new_rule.keys()])+")" diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index f24154f146..0f9d898e5d 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -39,17 +39,13 @@ class RegistrationStore(SQLBaseStore): Raises: StoreError if there was a problem adding this. """ - row = yield self._simple_select_one( - "users", {"name": user_id}, ["id"], - desc="add_access_token_to_user", - ) - if not row: - raise StoreError(400, "Bad user ID supplied.") - row_id = row["id"] - yield self._simple_insert( + next_id = yield self._access_tokens_id_gen.get_next() + + self._simple_insert( "access_tokens", { - "user_id": row_id, + "id": next_id, + "user_id": user_id, "token": token }, desc="add_access_token_to_user", @@ -74,6 +70,8 @@ class RegistrationStore(SQLBaseStore): def _register(self, txn, user_id, token, password_hash): now = int(self.clock.time()) + next_id = self._access_tokens_id_gen.get_next_txn(txn) + try: txn.execute("INSERT INTO users(name, password_hash, creation_ts) " "VALUES (?,?,?)", @@ -85,16 +83,28 @@ class RegistrationStore(SQLBaseStore): # it's possible for this to get a conflict, but only for a single user # since tokens are namespaced based on their user ID - txn.execute("INSERT INTO access_tokens(user_id, token) " + - "VALUES (?,?)", [txn.lastrowid, token]) + txn.execute( + "INSERT INTO access_tokens(id, user_id, token)" + " VALUES (?,?,?)", + (next_id, user_id, token,) + ) + @defer.inlineCallbacks def get_user_by_id(self, user_id): - query = ("SELECT users.name, users.password_hash FROM users" - " WHERE users.name = ?") - return self._execute( - "get_user_by_id", self.cursor_to_dict, query, user_id + user_info = yield self._simple_select_one( + table="users", + keyvalues={ + "name": user_id, + }, + retcols=["name", "password_hash"], + allow_none=True, ) + if user_info: + user_info["password_hash"] = user_info["password_hash"].decode("utf8") + + defer.returnValue(user_info) + @cached() # TODO(paul): Currently there's no code to invalidate this cache. That # means if/when we ever add internal ways to invalidate access tokens or @@ -134,12 +144,12 @@ class RegistrationStore(SQLBaseStore): "SELECT users.name, users.admin," " access_tokens.device_id, access_tokens.id as token_id" " FROM users" - " INNER JOIN access_tokens on users.id = access_tokens.user_id" + " INNER JOIN access_tokens on users.name = access_tokens.user_id" " WHERE token = ?" ) - cursor = txn.execute(sql, (token,)) - rows = self.cursor_to_dict(cursor) + txn.execute(sql, (token,)) + rows = self.cursor_to_dict(txn) if rows: return rows[0] diff --git a/synapse/storage/room.py b/synapse/storage/room.py index be3e28c2ea..a1a76280fe 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -72,6 +72,7 @@ class RoomStore(SQLBaseStore): keyvalues={"room_id": room_id}, retcols=RoomsTable.fields, desc="get_room", + allow_none=True, ) @defer.inlineCallbacks @@ -102,10 +103,10 @@ class RoomStore(SQLBaseStore): "ON c.event_id = room_names.event_id " ) - # We use non printing ascii character US () as a seperator + # We use non printing ascii character US (\x1F) as a separator sql = ( "SELECT r.room_id, n.name, t.topic, " - "group_concat(a.room_alias, '') " + "group_concat(a.room_alias, '\x1F') " "FROM rooms AS r " "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " @@ -117,9 +118,9 @@ class RoomStore(SQLBaseStore): "name": name_subquery, } - c = txn.execute(sql, (is_public,)) + txn.execute(sql, (is_public,)) - return c.fetchall() + return txn.fetchall() rows = yield self.runInteraction( "get_rooms", f @@ -130,7 +131,7 @@ class RoomStore(SQLBaseStore): "room_id": r[0], "name": r[1], "topic": r[2], - "aliases": r[3].split(""), + "aliases": r[3].split("\x1F"), } for r in rows ] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 52c37c76f5..8ea5756d61 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -40,7 +40,6 @@ class RoomMemberStore(SQLBaseStore): """ try: target_user_id = event.state_key - domain = UserID.from_string(target_user_id).domain except: logger.exception( "Failed to parse target_user_id=%s", target_user_id @@ -65,42 +64,8 @@ class RoomMemberStore(SQLBaseStore): } ) - # Update room hosts table - if event.membership == Membership.JOIN: - sql = ( - "INSERT OR IGNORE INTO room_hosts (room_id, host) " - "VALUES (?, ?)" - ) - txn.execute(sql, (event.room_id, domain)) - elif event.membership != Membership.INVITE: - # Check if this was the last person to have left. - member_events = self._get_members_query_txn( - txn, - where_clause=("c.room_id = ? AND m.membership = ?" - " AND m.user_id != ?"), - where_values=(event.room_id, Membership.JOIN, target_user_id,) - ) - - joined_domains = set() - for e in member_events: - try: - joined_domains.add( - UserID.from_string(e.state_key).domain - ) - except: - # FIXME: How do we deal with invalid user ids in the db? - logger.exception("Invalid user_id: %s", event.state_key) - - if domain not in joined_domains: - sql = ( - "DELETE FROM room_hosts WHERE room_id = ? AND host = ?" - ) - - txn.execute(sql, (event.room_id, domain)) - self.get_rooms_for_user.invalidate(target_user_id) - @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -110,41 +75,27 @@ class RoomMemberStore(SQLBaseStore): Returns: Deferred: Results in a MembershipEvent or None. """ - rows = yield self._get_members_by_dict({ - "e.room_id": room_id, - "m.user_id": user_id, - }) + def f(txn): + events = self._get_members_events_txn( + txn, + room_id, + user_id=user_id, + ) - defer.returnValue(rows[0] if rows else None) + return events[0] if events else None - def _get_room_member(self, txn, user_id, room_id): - sql = ( - "SELECT e.* FROM events as e" - " INNER JOIN room_memberships as m" - " ON e.event_id = m.event_id" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" - " WHERE m.user_id = ? and e.room_id = ?" - " LIMIT 1" - ) - txn.execute(sql, (user_id, room_id)) - rows = self.cursor_to_dict(txn) - if rows: - return self._parse_events_txn(txn, rows)[0] - else: - return None + return self.runInteraction("get_room_member", f) def get_users_in_room(self, room_id): def f(txn): - sql = ( - "SELECT m.user_id FROM room_memberships as m" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" - " WHERE m.membership = ? AND m.room_id = ?" + + rows = self._get_members_rows_txn( + txn, + room_id=room_id, + membership=Membership.JOIN, ) - txn.execute(sql, (Membership.JOIN, room_id)) - return [r[0] for r in txn.fetchall()] + return [r["user_id"] for r in rows] return self.runInteraction("get_users_in_room", f) def get_room_members(self, room_id, membership=None): @@ -159,11 +110,14 @@ class RoomMemberStore(SQLBaseStore): list of namedtuples representing the members in this room. """ - where = {"m.room_id": room_id} - if membership: - where["m.membership"] = membership + def f(txn): + return self._get_members_events_txn( + txn, + room_id, + membership=membership, + ) - return self._get_members_by_dict(where) + return self.runInteraction("get_room_members", f) def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user @@ -209,32 +163,55 @@ class RoomMemberStore(SQLBaseStore): ] def get_joined_hosts_for_room(self, room_id): - return self._simple_select_onecol( - "room_hosts", - {"room_id": room_id}, - "host", - desc="get_joined_hosts_for_room", + return self.runInteraction( + "get_joined_hosts_for_room", + self._get_joined_hosts_for_room_txn, + room_id, + ) + + def _get_joined_hosts_for_room_txn(self, txn, room_id): + rows = self._get_members_rows_txn( + txn, + room_id, membership=Membership.JOIN + ) + + joined_domains = set( + UserID.from_string(r["user_id"]).domain + for r in rows ) - def _get_members_by_dict(self, where_dict): - clause = " AND ".join("%s = ?" % k for k in where_dict.keys()) - vals = where_dict.values() - return self._get_members_query(clause, vals) + return joined_domains def _get_members_query(self, where_clause, where_values): return self.runInteraction( - "get_members_query", self._get_members_query_txn, + "get_members_query", self._get_members_events_txn, where_clause, where_values ) - def _get_members_query_txn(self, txn, where_clause, where_values): + def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None): + rows = self._get_members_rows_txn( + txn, + room_id, membership, user_id, + ) + return self._get_events_txn(txn, [r["event_id"] for r in rows]) + + def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None): + where_clause = "c.room_id = ?" + where_values = [room_id] + + if membership: + where_clause += " AND m.membership = ?" + where_values.append(membership) + + if user_id: + where_clause += " AND m.user_id = ?" + where_values.append(user_id) + sql = ( - "SELECT e.* FROM events as e " - "INNER JOIN room_memberships as m " - "ON e.event_id = m.event_id " - "INNER JOIN current_state_events as c " - "ON m.event_id = c.event_id " - "WHERE %(where)s " + "SELECT m.* FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id" + " WHERE %(where)s" ) % { "where": where_clause, } @@ -242,8 +219,7 @@ class RoomMemberStore(SQLBaseStore): txn.execute(sql, where_values) rows = self.cursor_to_dict(txn) - results = self._parse_events_txn(txn, rows) - return results + return rows @cached() def get_rooms_for_user(self, user_id): diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql index b87ef1fe79..717d289f78 100644 --- a/synapse/storage/schema/delta/12/v12.sql +++ b/synapse/storage/schema/delta/12/v12.sql @@ -14,54 +14,50 @@ */ CREATE TABLE IF NOT EXISTS rejections( - event_id TEXT NOT NULL, - reason TEXT NOT NULL, - last_check TEXT NOT NULL, - CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE -); + event_id VARCHAR(150) NOT NULL, + reason VARCHAR(150) NOT NULL, + last_check VARCHAR(150) NOT NULL, + UNIQUE (event_id) +) ; -- Push notification endpoints that users have configured CREATE TABLE IF NOT EXISTS pushers ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_name TEXT NOT NULL, - profile_tag varchar(32) NOT NULL, - kind varchar(8) NOT NULL, - app_id varchar(64) NOT NULL, - app_display_name varchar(64) NOT NULL, - device_display_name varchar(128) NOT NULL, - pushkey blob NOT NULL, + id BIGINT PRIMARY KEY, + user_name VARCHAR(150) NOT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey VARBINARY(512) NOT NULL, ts BIGINT NOT NULL, - lang varchar(8), - data blob, + lang VARCHAR(8), + data BLOB, last_token TEXT, last_success BIGINT, failing_since BIGINT, - FOREIGN KEY(user_name) REFERENCES users(name), UNIQUE (app_id, pushkey) -); +) ; CREATE TABLE IF NOT EXISTS push_rules ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_name TEXT NOT NULL, - rule_id TEXT NOT NULL, + id BIGINT PRIMARY KEY, + user_name VARCHAR(150) NOT NULL, + rule_id VARCHAR(150) NOT NULL, priority_class TINYINT NOT NULL, priority INTEGER NOT NULL DEFAULT 0, - conditions TEXT NOT NULL, - actions TEXT NOT NULL, + conditions VARCHAR(150) NOT NULL, + actions VARCHAR(150) NOT NULL, UNIQUE(user_name, rule_id) -); +) ; CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name); CREATE TABLE IF NOT EXISTS user_filters( - user_id TEXT, - filter_id INTEGER, - filter_json TEXT, - FOREIGN KEY(user_id) REFERENCES users(id) -); + user_id VARCHAR(150), + filter_id BIGINT, + filter_json BLOB +) ; CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters( - user_id, filter_id + user_id, filter_id ); - -PRAGMA user_version = 12; diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql index e491ad5aec..f5275a59b6 100644 --- a/synapse/storage/schema/delta/13/v13.sql +++ b/synapse/storage/schema/delta/13/v13.sql @@ -14,21 +14,18 @@ */ CREATE TABLE IF NOT EXISTS application_services( - id INTEGER PRIMARY KEY AUTOINCREMENT, - url TEXT, - token TEXT, - hs_token TEXT, - sender TEXT, - UNIQUE(token) ON CONFLICT ROLLBACK -); + id BIGINT PRIMARY KEY, + url VARCHAR(150), + token VARCHAR(150), + hs_token VARCHAR(150), + sender VARCHAR(150), + UNIQUE(token) +) ; CREATE TABLE IF NOT EXISTS application_services_regex( - id INTEGER PRIMARY KEY AUTOINCREMENT, - as_id INTEGER NOT NULL, + id BIGINT PRIMARY KEY, + as_id BIGINT NOT NULL, namespace INTEGER, /* enum[room_id|room_alias|user_id] */ - regex TEXT, + regex VARCHAR(150), FOREIGN KEY(as_id) REFERENCES application_services(id) -); - - - +) ; diff --git a/synapse/storage/schema/delta/14/v14.sql b/synapse/storage/schema/delta/14/v14.sql index 0212726448..1d582cc626 100644 --- a/synapse/storage/schema/delta/14/v14.sql +++ b/synapse/storage/schema/delta/14/v14.sql @@ -1,9 +1,9 @@ CREATE TABLE IF NOT EXISTS push_rules_enable ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_name TEXT NOT NULL, - rule_id TEXT NOT NULL, + id BIGINT PRIMARY KEY, + user_name VARCHAR(150) NOT NULL, + rule_id VARCHAR(150) NOT NULL, enabled TINYINT, UNIQUE(user_name, rule_id) -); +) ; CREATE INDEX IF NOT EXISTS push_rules_enable_user_name on push_rules_enable (user_name); diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index 2b27e2a429..2f4c3eae5f 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -14,17 +14,18 @@ */ CREATE TABLE IF NOT EXISTS application_services_state( - as_id TEXT PRIMARY KEY, - state TEXT, - last_txn TEXT + as_id VARCHAR(150) PRIMARY KEY, + state VARCHAR(5), + last_txn INTEGER ); CREATE TABLE IF NOT EXISTS application_services_txns( - as_id TEXT NOT NULL, + as_id VARCHAR(150) NOT NULL, txn_id INTEGER NOT NULL, - event_ids TEXT NOT NULL, - UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK + event_ids BLOB NOT NULL, + UNIQUE(as_id, txn_id) ); - - +CREATE INDEX IF NOT EXISTS application_services_txns_id ON application_services_txns ( + as_id +); diff --git a/synapse/storage/schema/delta/15/presence_indices.sql b/synapse/storage/schema/delta/15/presence_indices.sql new file mode 100644 index 0000000000..6b8d0f1ca7 --- /dev/null +++ b/synapse/storage/schema/delta/15/presence_indices.sql @@ -0,0 +1,2 @@ + +CREATE INDEX IF NOT EXISTS presence_list_user_id ON presence_list (user_id); diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql index 1e766d6db2..124c9a9bdf 100644 --- a/synapse/storage/schema/full_schemas/11/event_edges.sql +++ b/synapse/storage/schema/full_schemas/11/event_edges.sql @@ -14,63 +14,63 @@ */ CREATE TABLE IF NOT EXISTS event_forward_extremities( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE -); + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + UNIQUE (event_id, room_id) +) ; CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id); CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_backward_extremities( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE -); + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + UNIQUE (event_id, room_id) +) ; CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id); CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_edges( - event_id TEXT NOT NULL, - prev_event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - is_state INTEGER NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id, is_state) -); + event_id VARCHAR(150) NOT NULL, + prev_event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + is_state BOOL NOT NULL, + UNIQUE (event_id, prev_event_id, room_id, is_state) +) ; CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); CREATE TABLE IF NOT EXISTS room_depth( - room_id TEXT NOT NULL, + room_id VARCHAR(150) NOT NULL, min_depth INTEGER NOT NULL, - CONSTRAINT uniqueness UNIQUE (room_id) -); + UNIQUE (room_id) +) ; CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); create TABLE IF NOT EXISTS event_destinations( - event_id TEXT NOT NULL, - destination TEXT NOT NULL, - delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered - CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE -); + event_id VARCHAR(150) NOT NULL, + destination VARCHAR(150) NOT NULL, + delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered + UNIQUE (event_id, destination) +) ; CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); CREATE TABLE IF NOT EXISTS state_forward_extremities( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE -); + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + state_key VARCHAR(150) NOT NULL, + UNIQUE (event_id, room_id) +) ; CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities( room_id, type, state_key @@ -79,11 +79,11 @@ CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_auth( - event_id TEXT NOT NULL, - auth_id TEXT NOT NULL, - room_id TEXT NOT NULL, - CONSTRAINT uniqueness UNIQUE (event_id, auth_id, room_id) -); + event_id VARCHAR(150) NOT NULL, + auth_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + UNIQUE (event_id, auth_id, room_id) +) ; CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id); -CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id); \ No newline at end of file +CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id); diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql index c28c39c48a..30e3f71c5f 100644 --- a/synapse/storage/schema/full_schemas/11/event_signatures.sql +++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql @@ -14,52 +14,42 @@ */ CREATE TABLE IF NOT EXISTS event_content_hashes ( - event_id TEXT, - algorithm TEXT, + event_id VARCHAR(150), + algorithm VARCHAR(150), hash BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, algorithm) -); + UNIQUE (event_id, algorithm) +) ; -CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes( - event_id -); +CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(event_id); CREATE TABLE IF NOT EXISTS event_reference_hashes ( - event_id TEXT, - algorithm TEXT, + event_id VARCHAR(150), + algorithm VARCHAR(150), hash BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, algorithm) -); + UNIQUE (event_id, algorithm) +) ; -CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes ( - event_id -); +CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes(event_id); CREATE TABLE IF NOT EXISTS event_signatures ( - event_id TEXT, - signature_name TEXT, - key_id TEXT, + event_id VARCHAR(150), + signature_name VARCHAR(150), + key_id VARCHAR(150), signature BLOB, - CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id) -); + UNIQUE (event_id, signature_name, key_id) +) ; -CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures ( - event_id -); +CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures(event_id); CREATE TABLE IF NOT EXISTS event_edge_hashes( - event_id TEXT, - prev_event_id TEXT, - algorithm TEXT, + event_id VARCHAR(150), + prev_event_id VARCHAR(150), + algorithm VARCHAR(150), hash BLOB, - CONSTRAINT uniqueness UNIQUE ( - event_id, prev_event_id, algorithm - ) -); + UNIQUE (event_id, prev_event_id, algorithm) +) ; -CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes( - event_id -); +CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(event_id); diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql index dd00c1cd2f..7cb8f802e1 100644 --- a/synapse/storage/schema/full_schemas/11/im.sql +++ b/synapse/storage/schema/full_schemas/11/im.sql @@ -14,112 +14,111 @@ */ CREATE TABLE IF NOT EXISTS events( - stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT, - topological_ordering INTEGER NOT NULL, - event_id TEXT NOT NULL, - type TEXT NOT NULL, - room_id TEXT NOT NULL, - content TEXT NOT NULL, - unrecognized_keys TEXT, + stream_ordering BIGINT PRIMARY KEY, + topological_ordering BIGINT NOT NULL, + event_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + content BLOB NOT NULL, + unrecognized_keys BLOB, processed BOOL NOT NULL, outlier BOOL NOT NULL, - depth INTEGER DEFAULT 0 NOT NULL, - CONSTRAINT ev_uniq UNIQUE (event_id) -); + depth BIGINT DEFAULT 0 NOT NULL, + UNIQUE (event_id) +) ; -CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id); CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering); CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering); CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id); CREATE TABLE IF NOT EXISTS event_json( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - internal_metadata NOT NULL, + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + internal_metadata BLOB NOT NULL, json BLOB NOT NULL, - CONSTRAINT ev_j_uniq UNIQUE (event_id) -); + UNIQUE (event_id) +) ; -CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id); CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id); CREATE TABLE IF NOT EXISTS state_events( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - prev_state TEXT -); + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + state_key VARCHAR(150) NOT NULL, + prev_state VARCHAR(150), + UNIQUE (event_id) +) ; -CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id); CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id); CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type); CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key); CREATE TABLE IF NOT EXISTS current_state_events( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE -); + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + state_key VARCHAR(150) NOT NULL, + UNIQUE (event_id), + UNIQUE (room_id, type, state_key) +) ; -CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id); CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id); CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type); CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key); CREATE TABLE IF NOT EXISTS room_memberships( - event_id TEXT NOT NULL, - user_id TEXT NOT NULL, - sender TEXT NOT NULL, - room_id TEXT NOT NULL, - membership TEXT NOT NULL -); + event_id VARCHAR(150) NOT NULL, + user_id VARCHAR(150) NOT NULL, + sender VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + membership VARCHAR(150) NOT NULL, + UNIQUE (event_id) +) ; -CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id); CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id); CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id); CREATE TABLE IF NOT EXISTS feedback( - event_id TEXT NOT NULL, - feedback_type TEXT, - target_event_id TEXT, - sender TEXT, - room_id TEXT -); + event_id VARCHAR(150) NOT NULL, + feedback_type VARCHAR(150), + target_event_id VARCHAR(150), + sender VARCHAR(150), + room_id VARCHAR(150), + UNIQUE (event_id) +) ; CREATE TABLE IF NOT EXISTS topics( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - topic TEXT NOT NULL -); + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + topic VARCHAR(150) NOT NULL, + UNIQUE (event_id) +) ; -CREATE INDEX IF NOT EXISTS topics_event_id ON topics(event_id); CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id); CREATE TABLE IF NOT EXISTS room_names( - event_id TEXT NOT NULL, - room_id TEXT NOT NULL, - name TEXT NOT NULL + event_id VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + name VARCHAR(150) NOT NULL, + UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS room_names_event_id ON room_names(event_id); CREATE INDEX IF NOT EXISTS room_names_room_id ON room_names(room_id); CREATE TABLE IF NOT EXISTS rooms( - room_id TEXT PRIMARY KEY NOT NULL, - is_public INTEGER, - creator TEXT -); + room_id VARCHAR(150) PRIMARY KEY NOT NULL, + is_public BOOL, + creator VARCHAR(150) +) ; CREATE TABLE IF NOT EXISTS room_hosts( - room_id TEXT NOT NULL, - host TEXT NOT NULL, - CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE -); + room_id VARCHAR(150) NOT NULL, + host VARCHAR(150) NOT NULL, + UNIQUE (room_id, host) +) ; CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id); diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql index a9e0a4fe0d..062ca53fef 100644 --- a/synapse/storage/schema/full_schemas/11/keys.sql +++ b/synapse/storage/schema/full_schemas/11/keys.sql @@ -13,19 +13,19 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS server_tls_certificates( - server_name TEXT, -- Server name. - fingerprint TEXT, -- Certificate fingerprint. - from_server TEXT, -- Which key server the certificate was fetched from. - ts_added_ms INTEGER, -- When the certifcate was added. + server_name VARCHAR(150), -- Server name. + fingerprint VARCHAR(150), -- Certificate fingerprint. + from_server VARCHAR(150), -- Which key server the certificate was fetched from. + ts_added_ms BIGINT, -- When the certifcate was added. tls_certificate BLOB, -- DER encoded x509 certificate. - CONSTRAINT uniqueness UNIQUE (server_name, fingerprint) -); + UNIQUE (server_name, fingerprint) +) ; CREATE TABLE IF NOT EXISTS server_signature_keys( - server_name TEXT, -- Server name. - key_id TEXT, -- Key version. - from_server TEXT, -- Which key server the key was fetched form. - ts_added_ms INTEGER, -- When the key was added. + server_name VARCHAR(150), -- Server name. + key_id VARCHAR(150), -- Key version. + from_server VARCHAR(150), -- Which key server the key was fetched form. + ts_added_ms BIGINT, -- When the key was added. verify_key BLOB, -- NACL verification key. - CONSTRAINT uniqueness UNIQUE (server_name, key_id) -); + UNIQUE (server_name, key_id) +) ; diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql index afdf48cbfb..c8c5f1d2f0 100644 --- a/synapse/storage/schema/full_schemas/11/media_repository.sql +++ b/synapse/storage/schema/full_schemas/11/media_repository.sql @@ -14,55 +14,55 @@ */ CREATE TABLE IF NOT EXISTS local_media_repository ( - media_id TEXT, -- The id used to refer to the media. - media_type TEXT, -- The MIME-type of the media. + media_id VARCHAR(150), -- The id used to refer to the media. + media_type VARCHAR(150), -- The MIME-type of the media. media_length INTEGER, -- Length of the media in bytes. - created_ts INTEGER, -- When the content was uploaded in ms. - upload_name TEXT, -- The name the media was uploaded with. - user_id TEXT, -- The user who uploaded the file. - CONSTRAINT uniqueness UNIQUE (media_id) -); + created_ts BIGINT, -- When the content was uploaded in ms. + upload_name VARCHAR(150), -- The name the media was uploaded with. + user_id VARCHAR(150), -- The user who uploaded the file. + UNIQUE (media_id) +) ; CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( - media_id TEXT, -- The id used to refer to the media. + media_id VARCHAR(150), -- The id used to refer to the media. thumbnail_width INTEGER, -- The width of the thumbnail in pixels. thumbnail_height INTEGER, -- The height of the thumbnail in pixels. - thumbnail_type TEXT, -- The MIME-type of the thumbnail. - thumbnail_method TEXT, -- The method used to make the thumbnail. + thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail. + thumbnail_method VARCHAR(150), -- The method used to make the thumbnail. thumbnail_length INTEGER, -- The length of the thumbnail in bytes. - CONSTRAINT uniqueness UNIQUE ( + UNIQUE ( media_id, thumbnail_width, thumbnail_height, thumbnail_type ) -); +) ; CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id ON local_media_repository_thumbnails (media_id); CREATE TABLE IF NOT EXISTS remote_media_cache ( - media_origin TEXT, -- The remote HS the media came from. - media_id TEXT, -- The id used to refer to the media on that server. - media_type TEXT, -- The MIME-type of the media. - created_ts INTEGER, -- When the content was uploaded in ms. - upload_name TEXT, -- The name the media was uploaded with. + media_origin VARCHAR(150), -- The remote HS the media came from. + media_id VARCHAR(150), -- The id used to refer to the media on that server. + media_type VARCHAR(150), -- The MIME-type of the media. + created_ts BIGINT, -- When the content was uploaded in ms. + upload_name VARCHAR(150), -- The name the media was uploaded with. media_length INTEGER, -- Length of the media in bytes. - filesystem_id TEXT, -- The name used to store the media on disk. - CONSTRAINT uniqueness UNIQUE (media_origin, media_id) -); + filesystem_id VARCHAR(150), -- The name used to store the media on disk. + UNIQUE (media_origin, media_id) +) ; CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( - media_origin TEXT, -- The remote HS the media came from. - media_id TEXT, -- The id used to refer to the media. + media_origin VARCHAR(150), -- The remote HS the media came from. + media_id VARCHAR(150), -- The id used to refer to the media. thumbnail_width INTEGER, -- The width of the thumbnail in pixels. thumbnail_height INTEGER, -- The height of the thumbnail in pixels. - thumbnail_method TEXT, -- The method used to make the thumbnail - thumbnail_type TEXT, -- The MIME-type of the thumbnail. + thumbnail_method VARCHAR(150), -- The method used to make the thumbnail + thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail. thumbnail_length INTEGER, -- The length of the thumbnail in bytes. - filesystem_id TEXT, -- The name used to store the media on disk. - CONSTRAINT uniqueness UNIQUE ( + filesystem_id VARCHAR(150), -- The name used to store the media on disk. + UNIQUE ( media_origin, media_id, thumbnail_width, thumbnail_height, - thumbnail_type, thumbnail_type - ) -); + thumbnail_type + ) +) ; CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id - ON local_media_repository_thumbnails (media_id); + ON remote_media_cache_thumbnails (media_id); diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql index f9f8db9697..00d803a5cd 100644 --- a/synapse/storage/schema/full_schemas/11/presence.sql +++ b/synapse/storage/schema/full_schemas/11/presence.sql @@ -13,26 +13,26 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS presence( - user_id INTEGER NOT NULL, - state INTEGER, - status_msg TEXT, - mtime INTEGER, -- miliseconds since last state change - FOREIGN KEY(user_id) REFERENCES users(id) -); + user_id VARCHAR(150) NOT NULL, + state VARCHAR(20), + status_msg VARCHAR(150), + mtime BIGINT, -- miliseconds since last state change + UNIQUE (user_id) +) ; -- For each of /my/ users which possibly-remote users are allowed to see their -- presence state CREATE TABLE IF NOT EXISTS presence_allow_inbound( - observed_user_id INTEGER NOT NULL, - observer_user_id TEXT, -- a UserID, - FOREIGN KEY(observed_user_id) REFERENCES users(id) -); + observed_user_id VARCHAR(150) NOT NULL, + observer_user_id VARCHAR(150) NOT NULL, -- a UserID, + UNIQUE (observed_user_id, observer_user_id) +) ; -- For each of /my/ users (watcher), which possibly-remote users are they -- watching? CREATE TABLE IF NOT EXISTS presence_list( - user_id INTEGER NOT NULL, - observed_user_id TEXT, -- a UserID, - accepted BOOLEAN, - FOREIGN KEY(user_id) REFERENCES users(id) -); + user_id VARCHAR(150) NOT NULL, + observed_user_id VARCHAR(150) NOT NULL, -- a UserID, + accepted BOOLEAN NOT NULL, + UNIQUE (user_id, observed_user_id) +) ; diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql index f06a528b4d..023060a54c 100644 --- a/synapse/storage/schema/full_schemas/11/profiles.sql +++ b/synapse/storage/schema/full_schemas/11/profiles.sql @@ -13,8 +13,8 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS profiles( - user_id INTEGER NOT NULL, - displayname TEXT, - avatar_url TEXT, - FOREIGN KEY(user_id) REFERENCES users(id) -); + user_id VARCHAR(150) NOT NULL, + displayname VARCHAR(150), + avatar_url VARCHAR(150), + UNIQUE(user_id) +) ; diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql index 5011d95db8..5c23188d62 100644 --- a/synapse/storage/schema/full_schemas/11/redactions.sql +++ b/synapse/storage/schema/full_schemas/11/redactions.sql @@ -13,10 +13,10 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS redactions ( - event_id TEXT NOT NULL, - redacts TEXT NOT NULL, - CONSTRAINT ev_uniq UNIQUE (event_id) -); + event_id VARCHAR(150) NOT NULL, + redacts VARCHAR(150) NOT NULL, + UNIQUE (event_id) +) ; CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id); CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts); diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql index 0d2df01603..63fe0f5c64 100644 --- a/synapse/storage/schema/full_schemas/11/room_aliases.sql +++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql @@ -14,14 +14,12 @@ */ CREATE TABLE IF NOT EXISTS room_aliases( - room_alias TEXT NOT NULL, - room_id TEXT NOT NULL -); + room_alias VARCHAR(150) NOT NULL, + room_id VARCHAR(150) NOT NULL, + UNIQUE (room_alias) +) ; CREATE TABLE IF NOT EXISTS room_alias_servers( - room_alias TEXT NOT NULL, - server TEXT NOT NULL -); - - - + room_alias VARCHAR(150) NOT NULL, + server VARCHAR(150) NOT NULL +) ; diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql index 1fe8f1e430..acfb76439b 100644 --- a/synapse/storage/schema/full_schemas/11/state.sql +++ b/synapse/storage/schema/full_schemas/11/state.sql @@ -14,34 +14,27 @@ */ CREATE TABLE IF NOT EXISTS state_groups( - id INTEGER PRIMARY KEY, - room_id TEXT NOT NULL, - event_id TEXT NOT NULL -); + id VARCHAR(20) PRIMARY KEY, + room_id VARCHAR(150) NOT NULL, + event_id VARCHAR(150) NOT NULL +) ; CREATE TABLE IF NOT EXISTS state_groups_state( - state_group INTEGER NOT NULL, - room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - event_id TEXT NOT NULL -); + state_group VARCHAR(20) NOT NULL, + room_id VARCHAR(150) NOT NULL, + type VARCHAR(150) NOT NULL, + state_key VARCHAR(150) NOT NULL, + event_id VARCHAR(150) NOT NULL +) ; CREATE TABLE IF NOT EXISTS event_to_state_groups( - event_id TEXT NOT NULL, - state_group INTEGER NOT NULL, - CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id) -); + event_id VARCHAR(150) NOT NULL, + state_group VARCHAR(150) NOT NULL, + UNIQUE (event_id) +) ; CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id); -CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state( - state_group -); -CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state( - room_id, type, state_key -); - -CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups( - event_id -); \ No newline at end of file +CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(state_group); +CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(room_id, type, state_key); +CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(event_id); \ No newline at end of file diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql index 2d30f99b06..43541661ce 100644 --- a/synapse/storage/schema/full_schemas/11/transactions.sql +++ b/synapse/storage/schema/full_schemas/11/transactions.sql @@ -14,34 +14,30 @@ */ -- Stores what transaction ids we have received and what our response was CREATE TABLE IF NOT EXISTS received_transactions( - transaction_id TEXT, - origin TEXT, - ts INTEGER, + transaction_id VARCHAR(150), + origin VARCHAR(150), + ts BIGINT, response_code INTEGER, - response_json TEXT, + response_json BLOB, has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx - CONSTRAINT uniquesss UNIQUE (transaction_id, origin) ON CONFLICT REPLACE -); + UNIQUE (transaction_id, origin) +) ; -CREATE UNIQUE INDEX IF NOT EXISTS transactions_txid ON received_transactions(transaction_id, origin); CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; -- Stores what transactions we've sent, what their response was (if we got one) and whether we have -- since referenced the transaction in another outgoing transaction CREATE TABLE IF NOT EXISTS sent_transactions( - id INTEGER PRIMARY KEY AUTOINCREMENT, -- This is used to apply insertion ordering - transaction_id TEXT, - destination TEXT, + id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering + transaction_id VARCHAR(150), + destination VARCHAR(150), response_code INTEGER DEFAULT 0, - response_json TEXT, - ts INTEGER -); + response_json BLOB, + ts BIGINT +) ; CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination); -CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions( - destination -); CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id); -- So that we can do an efficient look up of all transactions that have yet to be successfully -- sent. @@ -51,18 +47,17 @@ CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_c -- For sent transactions only. CREATE TABLE IF NOT EXISTS transaction_id_to_pdu( transaction_id INTEGER, - destination TEXT, - pdu_id TEXT, - pdu_origin TEXT -); + destination VARCHAR(150), + pdu_id VARCHAR(150), + pdu_origin VARCHAR(150), + UNIQUE (transaction_id, destination) +) ; -CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination); CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); -CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination); -- To track destination health CREATE TABLE IF NOT EXISTS destinations( - destination TEXT PRIMARY KEY, - retry_last_ts INTEGER, + destination VARCHAR(150) PRIMARY KEY, + retry_last_ts BIGINT, retry_interval INTEGER -); +) ; diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql index 08ccfdac0a..ba0f42d455 100644 --- a/synapse/storage/schema/full_schemas/11/users.sql +++ b/synapse/storage/schema/full_schemas/11/users.sql @@ -13,33 +13,30 @@ * limitations under the License. */ CREATE TABLE IF NOT EXISTS users( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT, - password_hash TEXT, - creation_ts INTEGER, + name VARCHAR(150), + password_hash VARCHAR(150), + creation_ts BIGINT, admin BOOL DEFAULT 0 NOT NULL, - UNIQUE(name) ON CONFLICT ROLLBACK -); + UNIQUE(name) +) ; CREATE TABLE IF NOT EXISTS access_tokens( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL, - device_id TEXT, - token TEXT NOT NULL, - last_used INTEGER, - FOREIGN KEY(user_id) REFERENCES users(id), - UNIQUE(token) ON CONFLICT ROLLBACK -); + id BIGINT PRIMARY KEY, + user_id VARCHAR(150) NOT NULL, + device_id VARCHAR(150), + token VARCHAR(150) NOT NULL, + last_used BIGINT, + UNIQUE(token) +) ; CREATE TABLE IF NOT EXISTS user_ips ( - user TEXT NOT NULL, - access_token TEXT NOT NULL, - device_id TEXT, - ip TEXT NOT NULL, - user_agent TEXT NOT NULL, - last_seen INTEGER NOT NULL, - CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE -); + user VARCHAR(150) NOT NULL, + access_token VARCHAR(150) NOT NULL, + device_id VARCHAR(150), + ip VARCHAR(150) NOT NULL, + user_agent VARCHAR(150) NOT NULL, + last_seen BIGINT NOT NULL +) ; CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user); - +CREATE INDEX IF NOT EXISTS user_ips_user_ip ON user_ips(user, access_token, ip); diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql index 0431e2d051..e7fa6fe569 100644 --- a/synapse/storage/schema/schema_version.sql +++ b/synapse/storage/schema/schema_version.sql @@ -14,17 +14,16 @@ */ CREATE TABLE IF NOT EXISTS schema_version( - Lock char(1) NOT NULL DEFAULT 'X', -- Makes sure this table only has one row. - version INTEGER NOT NULL, - upgraded BOOL NOT NULL, -- Whether we reached this version from an upgrade or an initial schema. - CONSTRAINT schema_version_lock_x CHECK (Lock='X') - CONSTRAINT schema_version_lock_uniq UNIQUE (Lock) + `Lock` CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + `version` INTEGER NOT NULL, + `upgraded` BOOL NOT NULL, -- Whether we reached this version from an upgrade or an initial schema. + CHECK (`Lock`='X') ); CREATE TABLE IF NOT EXISTS applied_schema_deltas( - version INTEGER NOT NULL, - file TEXT NOT NULL, - CONSTRAINT schema_deltas_ver_file UNIQUE (version, file) ON CONFLICT IGNORE + `version` INTEGER NOT NULL, + `file` VARCHAR(150) NOT NULL, + UNIQUE(version, file) ); CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index d0d53770f2..f051828630 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -56,7 +56,6 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }, - or_ignore=True, ) def get_event_reference_hashes(self, event_ids): @@ -100,7 +99,7 @@ class SignatureStore(SQLBaseStore): " WHERE event_id = ?" ) txn.execute(query, (event_id, )) - return dict(txn.fetchall()) + return {k: v for k, v in txn.fetchall()} def _store_event_reference_hash_txn(self, txn, event_id, algorithm, hash_bytes): @@ -119,7 +118,6 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }, - or_ignore=True, ) def _get_event_signatures_txn(self, txn, event_id): @@ -164,7 +162,6 @@ class SignatureStore(SQLBaseStore): "key_id": key_id, "signature": buffer(signature_bytes), }, - or_ignore=True, ) def _get_prev_event_hashes_txn(self, txn, event_id): @@ -198,5 +195,4 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }, - or_ignore=True, ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 58dbf2802b..4994bacd6c 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -17,6 +17,8 @@ from ._base import SQLBaseStore from twisted.internet import defer +from synapse.util.stringutils import random_string + import logging logger = logging.getLogger(__name__) @@ -91,14 +93,15 @@ class StateStore(SQLBaseStore): state_group = context.state_group if not state_group: - state_group = self._simple_insert_txn( + state_group = _make_group_id(self._clock) + self._simple_insert_txn( txn, table="state_groups", values={ + "id": state_group, "room_id": event.room_id, "event_id": event.event_id, }, - or_ignore=True, ) for state in state_events.values(): @@ -112,7 +115,6 @@ class StateStore(SQLBaseStore): "state_key": state.state_key, "event_id": state.event_id, }, - or_ignore=True, ) self._simple_insert_txn( @@ -122,7 +124,6 @@ class StateStore(SQLBaseStore): "state_group": state_group, "event_id": event.event_id, }, - or_replace=True, ) @defer.inlineCallbacks @@ -154,3 +155,7 @@ class StateStore(SQLBaseStore): events = yield self._parse_events(results) defer.returnValue(events) + + +def _make_group_id(clock): + return str(int(clock.time_msec())) + random_string(5) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 66f307e640..e6bb5a8077 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -110,7 +110,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): if self.topological is None: return "(%d < %s)" % (self.stream, "stream_ordering") else: - return "(%d < %s OR (%d == %s AND %d < %s))" % ( + return "(%d < %s OR (%d = %s AND %d < %s))" % ( self.topological, "topological_ordering", self.topological, "topological_ordering", self.stream, "stream_ordering", @@ -120,7 +120,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): if self.topological is None: return "(%d >= %s)" % (self.stream, "stream_ordering") else: - return "(%d > %s OR (%d == %s AND %d >= %s))" % ( + return "(%d > %s OR (%d = %s AND %d >= %s))" % ( self.topological, "topological_ordering", self.topological, "topological_ordering", self.stream, "stream_ordering", @@ -433,12 +433,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(self.min_token) - def get_next_stream_id(self): - with self._next_stream_id_lock: - i = self._next_stream_id - self._next_stream_id += 1 - return i - def _get_room_events_max_id_txn(self, txn): txn.execute( "SELECT MAX(stream_ordering) as m FROM events" diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index b777395e06..4c3dc58662 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, Table, cached +from ._base import SQLBaseStore, cached from collections import namedtuple @@ -84,13 +84,18 @@ class TransactionStore(SQLBaseStore): def _set_received_txn_response(self, txn, transaction_id, origin, code, response_json): - query = ( - "UPDATE %s " - "SET response_code = ?, response_json = ? " - "WHERE transaction_id = ? AND origin = ?" - ) % ReceivedTransactionsTable.table_name - - txn.execute(query, (code, response_json, transaction_id, origin)) + self._simple_upsert_txn( + txn, + table=ReceivedTransactionsTable.table_name, + keyvalues={ + "transaction_id": transaction_id, + "origin": origin, + }, + values={ + "response_code": code, + "response_json": response_json, + } + ) def prep_send_transaction(self, transaction_id, destination, origin_server_ts): @@ -118,41 +123,38 @@ class TransactionStore(SQLBaseStore): def _prep_send_transaction(self, txn, transaction_id, destination, origin_server_ts): + next_id = self._transaction_id_gen.get_next_txn(txn) + # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. - query = "%s ORDER BY id DESC LIMIT 1" % ( - SentTransactions.select_statement("destination = ?"), - ) + query = ( + "SELECT * FROM sent_transactions" + " WHERE destination = ?" + " ORDER BY id DESC LIMIT 1" + ) - results = txn.execute(query, (destination,)) - results = SentTransactions.decode_results(results) + txn.execute(query, (destination,)) + results = self.cursor_to_dict(txn) - prev_txns = [r.transaction_id for r in results] + prev_txns = [r["transaction_id"] for r in results] # Actually add the new transaction to the sent_transactions table. - query = SentTransactions.insert_statement() - txn.execute(query, SentTransactions.EntryType( - None, - transaction_id=transaction_id, - destination=destination, - ts=origin_server_ts, - response_code=0, - response_json=None - )) - - # Update the tx id -> pdu id mapping - - # values = [ - # (transaction_id, destination, pdu[0], pdu[1]) - # for pdu in pdu_list - # ] - # - # logger.debug("Inserting: %s", repr(values)) - # - # query = TransactionsToPduTable.insert_statement() - # txn.executemany(query, values) + self._simple_insert_txn( + txn, + table=SentTransactions.table_name, + values={ + "id": next_id, + "transaction_id": transaction_id, + "destination": destination, + "ts": origin_server_ts, + "response_code": 0, + "response_json": None, + } + ) + + # TODO Update the tx id -> pdu id mapping return prev_txns @@ -171,15 +173,20 @@ class TransactionStore(SQLBaseStore): transaction_id, destination, code, response_dict ) - def _delivered_txn(cls, txn, transaction_id, destination, + def _delivered_txn(self, txn, transaction_id, destination, code, response_json): - query = ( - "UPDATE %s " - "SET response_code = ?, response_json = ? " - "WHERE transaction_id = ? AND destination = ?" - ) % SentTransactions.table_name - - txn.execute(query, (code, response_json, transaction_id, destination)) + self._simple_update_one_txn( + txn, + table=SentTransactions.table_name, + keyvalues={ + "transaction_id": transaction_id, + "destination": destination, + }, + updatevalues={ + "response_code": code, + "response_json": response_json, + } + ) def get_transactions_after(self, transaction_id, destination): """Get all transactions after a given local transaction_id. @@ -189,25 +196,26 @@ class TransactionStore(SQLBaseStore): destination (str) Returns: - list: A list of `ReceivedTransactionsTable.EntryType` + list: A list of dicts """ return self.runInteraction( "get_transactions_after", self._get_transactions_after, transaction_id, destination ) - def _get_transactions_after(cls, txn, transaction_id, destination): - where = ( - "destination = ? AND id > (select id FROM %s WHERE " - "transaction_id = ? AND destination = ?)" - ) % ( - SentTransactions.table_name + def _get_transactions_after(self, txn, transaction_id, destination): + query = ( + "SELECT * FROM sent_transactions" + " WHERE destination = ? AND id >" + " (" + " SELECT id FROM sent_transactions" + " WHERE transaction_id = ? AND destination = ?" + " )" ) - query = SentTransactions.select_statement(where) txn.execute(query, (destination, transaction_id, destination)) - return ReceivedTransactionsTable.decode_results(txn.fetchall()) + return self.cursor_to_dict(txn) @cached() def get_destination_retry_timings(self, destination): @@ -218,22 +226,27 @@ class TransactionStore(SQLBaseStore): Returns: None if not retrying - Otherwise a DestinationsTable.EntryType for the retry scheme + Otherwise a dict for the retry scheme """ return self.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination) - def _get_destination_retry_timings(cls, txn, destination): - query = DestinationsTable.select_statement("destination = ?") - txn.execute(query, (destination,)) - result = txn.fetchall() - if result: - result = DestinationsTable.decode_single_result(result) - if result.retry_last_ts > 0: - return result - else: - return None + def _get_destination_retry_timings(self, txn, destination): + result = self._simple_select_one_txn( + txn, + table=DestinationsTable.table_name, + keyvalues={ + "destination": destination, + }, + retcols=DestinationsTable.fields, + allow_none=True, + ) + + if result and result["retry_last_ts"] > 0: + return result + else: + return None def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): @@ -249,11 +262,11 @@ class TransactionStore(SQLBaseStore): # As this is the new value, we might as well prefill the cache self.get_destination_retry_timings.prefill( destination, - DestinationsTable.EntryType( - destination, - retry_last_ts, - retry_interval - ) + { + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval + }, ) # XXX: we could chose to not bother persisting this if our cache thinks @@ -266,22 +279,38 @@ class TransactionStore(SQLBaseStore): retry_interval, ) - def _set_destination_retry_timings(cls, txn, destination, + def _set_destination_retry_timings(self, txn, destination, retry_last_ts, retry_interval): - query = ( - "INSERT OR REPLACE INTO %s " - "(destination, retry_last_ts, retry_interval) " - "VALUES (?, ?, ?) " - ) % DestinationsTable.table_name + "UPDATE destinations" + " SET retry_last_ts = ?, retry_interval = ?" + " WHERE destination = ?" + ) + + txn.execute( + query, + ( + retry_last_ts, retry_interval, destination, + ) + ) - txn.execute(query, (destination, retry_last_ts, retry_interval)) + if txn.rowcount == 0: + # destination wasn't already in table. Insert it. + self._simple_insert_txn( + txn, + table="destinations", + values={ + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + } + ) def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. Returns: - list: A list of `DestinationsTable.EntryType` + list: A list of dicts """ return self.runInteraction( @@ -289,14 +318,17 @@ class TransactionStore(SQLBaseStore): self._get_destinations_needing_retry ) - def _get_destinations_needing_retry(cls, txn): - where = "retry_last_ts > 0 and retry_next_ts < now()" - query = DestinationsTable.select_statement(where) - txn.execute(query) - return DestinationsTable.decode_results(txn.fetchall()) + def _get_destinations_needing_retry(self, txn): + query = ( + "SELECT * FROM destinations" + " WHERE retry_last_ts > 0 and retry_next_ts < ?" + ) + + txn.execute(query, (self._clock.time_msec(),)) + return self.cursor_to_dict(txn) -class ReceivedTransactionsTable(Table): +class ReceivedTransactionsTable(object): table_name = "received_transactions" fields = [ @@ -308,10 +340,8 @@ class ReceivedTransactionsTable(Table): "has_been_referenced", ] - EntryType = namedtuple("ReceivedTransactionsEntry", fields) - -class SentTransactions(Table): +class SentTransactions(object): table_name = "sent_transactions" fields = [ @@ -326,7 +356,7 @@ class SentTransactions(Table): EntryType = namedtuple("SentTransactionsEntry", fields) -class TransactionsToPduTable(Table): +class TransactionsToPduTable(object): table_name = "transaction_id_to_pdu" fields = [ @@ -336,10 +366,8 @@ class TransactionsToPduTable(Table): "pdu_origin", ] - EntryType = namedtuple("TransactionsToPduEntry", fields) - -class DestinationsTable(Table): +class DestinationsTable(object): table_name = "destinations" fields = [ @@ -347,5 +375,3 @@ class DestinationsTable(Table): "retry_last_ts", "retry_interval", ] - - EntryType = namedtuple("DestinationsEntry", fields) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 4e82232796..a42138f556 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -60,7 +60,7 @@ def get_retry_limiter(destination, clock, store, **kwargs): if retry_timings: retry_last_ts, retry_interval = ( - retry_timings.retry_last_ts, retry_timings.retry_interval + retry_timings["retry_last_ts"], retry_timings["retry_interval"] ) now = int(clock.time_msec()) |