diff --git a/scripts/port_to_maria.py b/scripts/port_to_maria.py
new file mode 100644
index 0000000000..0d7ba92357
--- /dev/null
+++ b/scripts/port_to_maria.py
@@ -0,0 +1,383 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer, reactor
+from twisted.enterprise import adbapi
+
+from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.storage.engines import create_engine
+
+import argparse
+import itertools
+import logging
+import types
+import yaml
+
+
+logger = logging.getLogger("port_to_maria")
+
+
+BINARY_COLUMNS = {
+ "event_content_hashes": ["hash"],
+ "event_reference_hashes": ["hash"],
+ "event_signatures": ["signature"],
+ "event_edge_hashes": ["hash"],
+ "events": ["content", "unrecognized_keys"],
+ "event_json": ["internal_metadata", "json"],
+ "application_services_txns": ["event_ids"],
+ "received_transactions": ["response_json"],
+ "sent_transactions": ["response_json"],
+ "server_tls_certificates": ["tls_certificate"],
+ "server_signature_keys": ["verify_key"],
+ "pushers": ["pushkey", "data"],
+ "user_filters": ["filter_json"],
+}
+
+UNICODE_COLUMNS = {
+ "events": ["content", "unrecognized_keys"],
+ "event_json": ["internal_metadata", "json"],
+ "users": ["password_hash"],
+}
+
+
+BOOLEAN_COLUMNS = {
+ "events": ["processed", "outlier"],
+ "rooms": ["is_public"],
+ "event_edges": ["is_state"],
+ "presence_list": ["accepted"],
+}
+
+
+APPEND_ONLY_TABLES = [
+ "event_content_hashes",
+ "event_reference_hashes",
+ "event_signatures",
+ "event_edge_hashes",
+ "events",
+ "event_json",
+ "state_events",
+ "room_memberships",
+ "feedback",
+ "topics",
+ "room_names",
+ "rooms",
+ "local_media_repository",
+ "local_media_repository_thumbnails",
+ "remote_media_cache",
+ "remote_media_cache_thumbnails",
+ "redactions",
+ "event_edges",
+ "event_auth",
+ "received_transactions",
+ "sent_transactions",
+ "transaction_id_to_pdu",
+ "users",
+ "state_groups",
+ "state_groups_state",
+ "event_to_state_groups",
+ "rejections",
+]
+
+
+class Store(object):
+ def __init__(self, db_pool, engine):
+ self.db_pool = db_pool
+ self.database_engine = engine
+
+ _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
+ _simple_insert = SQLBaseStore.__dict__["_simple_insert"]
+
+ _simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
+ _simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
+ _simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
+ _simple_select_one_onecol_txn = SQLBaseStore.__dict__["_simple_select_one_onecol_txn"]
+
+ _simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
+ _simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
+
+ _execute_and_decode = SQLBaseStore.__dict__["_execute_and_decode"]
+
+ def runInteraction(self, desc, func, *args, **kwargs):
+ def r(conn):
+ try:
+ i = 0
+ N = 5
+ while True:
+ try:
+ txn = conn.cursor()
+ return func(
+ LoggingTransaction(txn, desc, self.database_engine),
+ *args, **kwargs
+ )
+ except self.database_engine.module.DatabaseError as e:
+ if self.database_engine.is_deadlock(e):
+ logger.warn("[TXN DEADLOCK] {%s} %d/%d", desc, i, N)
+ if i < N:
+ i += 1
+ conn.rollback()
+ continue
+ raise
+ except Exception as e:
+ logger.debug("[TXN FAIL] {%s}", desc, e)
+ raise
+
+ return self.db_pool.runWithConnection(r)
+
+ def insert_many_txn(self, txn, table, headers, rows):
+ sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+ table,
+ ", ".join(k for k in headers),
+ ", ".join("%s" for _ in headers)
+ )
+
+ try:
+ txn.executemany(sql, rows)
+ except:
+ logger.exception(
+ "Failed to insert: %s",
+ table,
+ )
+ raise
+
+
+
+def chunks(n):
+ for i in itertools.count(0, n):
+ yield range(i, i+n)
+
+
+@defer.inlineCallbacks
+def handle_table(table, sqlite_store, mysql_store):
+ if table in APPEND_ONLY_TABLES:
+ # It's safe to just carry on inserting.
+ next_chunk = yield mysql_store._simple_select_one_onecol(
+ table="port_from_sqlite3",
+ keyvalues={"table_name": table},
+ retcol="rowid",
+ allow_none=True,
+ )
+
+ if next_chunk is None:
+ yield mysql_store._simple_insert(
+ table="port_from_sqlite3",
+ values={"table_name": table, "rowid": 0}
+ )
+
+ next_chunk = 0
+ else:
+ def delete_all(txn):
+ txn.execute(
+ "DELETE FROM port_from_sqlite3 WHERE table_name = %s",
+ (table,)
+ )
+ txn.execute("TRUNCATE %s CASCADE" % (table,))
+ mysql_store._simple_insert_txn(
+ txn,
+ table="port_from_sqlite3",
+ values={"table_name": table, "rowid": 0}
+ )
+
+ yield mysql_store.runInteraction(
+ "delete_non_append_only", delete_all
+ )
+
+ next_chunk = 0
+
+ logger.info("next_chunk for %s: %d", table, next_chunk)
+
+ N = 5000
+
+ select = "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,)
+
+ uni_col_names = UNICODE_COLUMNS.get(table, [])
+ bool_col_names = BOOLEAN_COLUMNS.get(table, [])
+ bin_col_names = BINARY_COLUMNS.get(table, [])
+
+ while True:
+ def r(txn):
+ txn.execute(select, (next_chunk, N,))
+ rows = txn.fetchall()
+ headers = [column[0] for column in txn.description]
+
+ return headers, rows
+
+ headers, rows = yield sqlite_store.runInteraction("select", r)
+
+ logger.info("Got %d rows for %s", len(rows), table)
+
+ if rows:
+ uni_cols = [i for i, h in enumerate(headers) if h in uni_col_names]
+ bool_cols = [i for i, h in enumerate(headers) if h in bool_col_names]
+ bin_cols = [i for i, h in enumerate(headers) if h in bin_col_names]
+ next_chunk = rows[-1][0] + 1
+
+ def conv(j, col):
+ if j in uni_cols:
+ col = sqlite_store.database_engine.load_unicode(col)
+ if j in bool_cols:
+ return bool(col)
+
+ if j in bin_cols:
+ if isinstance(col, types.UnicodeType):
+ col = buffer(col.encode("utf8"))
+
+ return col
+
+ for i, row in enumerate(rows):
+ rows[i] = tuple(
+ mysql_store.database_engine.encode_parameter(
+ conv(j, col)
+ )
+ for j, col in enumerate(row)
+ if j > 0
+ )
+
+ def ins(txn):
+ mysql_store.insert_many_txn(txn, table, headers[1:], rows)
+
+ mysql_store._simple_update_one_txn(
+ txn,
+ table="port_from_sqlite3",
+ keyvalues={"table_name": table},
+ updatevalues={"rowid": next_chunk},
+ )
+
+ yield mysql_store.runInteraction("insert_many", ins)
+ else:
+ return
+
+
+def setup_db(db_config, database_engine):
+ db_conn = database_engine.module.connect(
+ **{
+ k: v for k, v in db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ )
+
+ database_engine.prepare_database(db_conn)
+
+ db_conn.commit()
+
+
+@defer.inlineCallbacks
+def main(sqlite_config, mysql_config):
+ try:
+ sqlite_db_pool = adbapi.ConnectionPool(
+ sqlite_config["name"],
+ **sqlite_config["args"]
+ )
+
+ mysql_db_pool = adbapi.ConnectionPool(
+ mysql_config["name"],
+ **mysql_config["args"]
+ )
+
+ sqlite_engine = create_engine("sqlite3")
+ mysql_engine = create_engine("psycopg2")
+
+ sqlite_store = Store(sqlite_db_pool, sqlite_engine)
+ mysql_store = Store(mysql_db_pool, mysql_engine)
+
+ # Step 1. Set up mysql database.
+ logger.info("Preparing sqlite database...")
+ setup_db(sqlite_config, sqlite_engine)
+
+ logger.info("Preparing mysql database...")
+ setup_db(mysql_config, mysql_engine)
+
+ # Step 2. Get tables.
+ logger.info("Fetching tables...")
+ tables = yield sqlite_store._simple_select_onecol(
+ table="sqlite_master",
+ keyvalues={
+ "type": "table",
+ },
+ retcol="name",
+ )
+
+ logger.info("Found %d tables", len(tables))
+
+ def create_port_table(txn):
+ txn.execute(
+ "CREATE TABLE port_from_sqlite3 ("
+ " table_name varchar(100) NOT NULL UNIQUE,"
+ " rowid bigint NOT NULL"
+ ")"
+ )
+
+ try:
+ yield mysql_store.runInteraction(
+ "create_port_table", create_port_table
+ )
+ except Exception as e:
+ logger.info("Failed to create port table: %s", e)
+
+ # Process tables.
+ yield defer.gatherResults(
+ [
+ handle_table(table, sqlite_store, mysql_store)
+ for table in tables
+ if table not in ["schema_version", "applied_schema_deltas"]
+ and not table.startswith("sqlite_")
+ ],
+ consumeErrors=True,
+ )
+
+ # for table in ["current_state_events"]: # tables:
+ # if table not in ["schema_version", "applied_schema_deltas"]:
+ # if not table.startswith("sqlite_"):
+ # yield handle_table(table, sqlite_store, mysql_store)
+ except:
+ logger.exception("")
+ finally:
+ reactor.stop()
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--sqlite-database")
+ parser.add_argument(
+ "--mysql-config", type=argparse.FileType('r'),
+ )
+
+ args = parser.parse_args()
+ logging.basicConfig(level=logging.INFO)
+
+ sqlite_config = {
+ "name": "sqlite3",
+ "args": {
+ "database": args.sqlite_database,
+ "cp_min": 1,
+ "cp_max": 1,
+ "check_same_thread": False,
+ },
+ }
+
+ mysql_config = yaml.safe_load(args.mysql_config)
+ # mysql_config["args"].update({
+ # "sql_mode": "TRADITIONAL",
+ # "charset": "utf8mb4",
+ # "use_unicode": True,
+ # "collation": "utf8mb4_bin",
+ # })
+
+ reactor.callWhenRunning(
+ main,
+ sqlite_config=sqlite_config,
+ mysql_config=mysql_config,
+ )
+
+ reactor.run()
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 27e53a9e56..f8a33120b5 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -17,9 +17,8 @@
import sys
sys.dont_write_bytecode = True
-from synapse.storage import (
- prepare_database, prepare_sqlite3_database, UpgradeDatabaseException,
-)
+from synapse.storage import UpgradeDatabaseException
+from synapse.storage.engines import create_engine
from synapse.server import HomeServer
@@ -57,9 +56,10 @@ import os
import re
import resource
import subprocess
-import sqlite3
+import yaml
+
-logger = logging.getLogger(__name__)
+logger = logging.getLogger("synapse.app.homeserver")
class SynapseHomeServer(HomeServer):
@@ -103,13 +103,11 @@ class SynapseHomeServer(HomeServer):
return None
def build_db_pool(self):
+ name = self.db_config["name"]
+
return adbapi.ConnectionPool(
- "sqlite3", self.get_db_name(),
- check_same_thread=False,
- cp_min=1,
- cp_max=1,
- cp_openfun=prepare_database, # Prepare the database for each conn
- # so that :memory: sqlite works
+ name,
+ **self.db_config.get("args", {})
)
def create_resource_tree(self, redirect_root_to_web_client):
@@ -352,15 +350,53 @@ def setup(config_options):
tls_context_factory = context_factory.ServerContextFactory(config)
+ if config.database_config:
+ with open(config.database_config, 'r') as f:
+ db_config = yaml.safe_load(f)
+ else:
+ db_config = {
+ "name": "sqlite3",
+ "args": {
+ "database": config.database_path,
+ },
+ }
+
+ db_config = {
+ k: v for k, v in db_config.items()
+ }
+
+ name = db_config.get("name", None)
+ if name in ["MySQLdb", "mysql.connector"]:
+ db_config.setdefault("args", {}).update({
+ "sql_mode": "TRADITIONAL",
+ "charset": "utf8mb4",
+ "use_unicode": True,
+ "collation": "utf8mb4_bin",
+ })
+ elif name == "psycopg2":
+ pass
+ elif name == "sqlite3":
+ db_config.setdefault("args", {}).update({
+ "cp_min": 1,
+ "cp_max": 1,
+ })
+ else:
+ raise RuntimeError("Unsupported database type '%s'" % (name,))
+
+ database_engine = create_engine(name)
+ db_config["args"]["cp_openfun"] = database_engine.on_new_connection
+
hs = SynapseHomeServer(
config.server_name,
domain_with_port=domain_with_port,
upload_dir=os.path.abspath("uploads"),
db_name=config.database_path,
+ db_config=db_config,
tls_context_factory=tls_context_factory,
config=config,
content_addr=config.content_addr,
version_string=version_string,
+ database_engine=database_engine,
)
hs.create_resource_tree(
@@ -372,9 +408,16 @@ def setup(config_options):
logger.info("Preparing database: %s...", db_name)
try:
- with sqlite3.connect(db_name) as db_conn:
- prepare_sqlite3_database(db_conn)
- prepare_database(db_conn)
+ db_conn = database_engine.module.connect(
+ **{
+ k: v for k, v in db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ )
+
+ database_engine.prepare_database(db_conn)
+
+ db_conn.commit()
except UpgradeDatabaseException:
sys.stderr.write(
"\nFailed to upgrade database.\n"
diff --git a/synapse/config/database.py b/synapse/config/database.py
index 87efe54645..8dc9873f8c 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -26,6 +26,11 @@ class DatabaseConfig(Config):
self.database_path = self.abspath(args.database_path)
self.event_cache_size = self.parse_size(args.event_cache_size)
+ if args.database_config:
+ self.database_config = self.abspath(args.database_config)
+ else:
+ self.database_config = None
+
@classmethod
def add_arguments(cls, parser):
super(DatabaseConfig, cls).add_arguments(parser)
@@ -38,6 +43,10 @@ class DatabaseConfig(Config):
"--event-cache-size", default="100K",
help="Number of events to cache in memory."
)
+ db_group.add_argument(
+ "--database-config", default=None,
+ help="Location of the database configuration file."
+ )
@classmethod
def generate_config(cls, args, config_dir_path):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8aceac28cf..98148c13d7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -179,7 +179,7 @@ class FederationHandler(BaseHandler):
# it's probably a good idea to mark it as not in retry-state
# for sending (although this is a bit of a leap)
retry_timings = yield self.store.get_destination_retry_timings(origin)
- if (retry_timings and retry_timings.retry_last_ts):
+ if retry_timings and retry_timings["retry_last_ts"]:
self.store.set_destination_retry_timings(origin, 0, 0)
room = yield self.store.get_room(event.room_id)
diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py
index 7447800460..76647c7941 100644
--- a/synapse/handlers/login.py
+++ b/synapse/handlers/login.py
@@ -57,7 +57,7 @@ class LoginHandler(BaseHandler):
logger.warn("Attempted to login as %s but they do not exist", user)
raise LoginError(403, "", errcode=Codes.FORBIDDEN)
- stored_hash = user_info[0]["password_hash"]
+ stored_hash = user_info["password_hash"]
if bcrypt.checkpw(password, stored_hash):
# generate an access token and store it.
token = self.reg_handler._generate_token(user)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 823affc380..bc7f1c2402 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -311,25 +311,6 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue(chunk_data)
@defer.inlineCallbacks
- def get_room_member(self, room_id, member_user_id, auth_user_id):
- """Retrieve a room member from a room.
-
- Args:
- room_id : The room the member is in.
- member_user_id : The member's user ID
- auth_user_id : The user ID of the user making this request.
- Returns:
- The room member, or None if this member does not exist.
- Raises:
- SynapseError if something goes wrong.
- """
- yield self.auth.check_joined_room(room_id, auth_user_id)
-
- member = yield self.store.get_room_member(user_id=member_user_id,
- room_id=room_id)
- defer.returnValue(member)
-
- @defer.inlineCallbacks
def change_membership(self, event, context, do_auth=True):
""" Change the membership status of a user in a room.
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index 1e77eb49cf..7387b4adb9 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -19,9 +19,13 @@ from twisted.internet import defer
from .base import ClientV1RestServlet, client_path_pattern
from synapse.types import UserID
+import logging
import simplejson as json
+logger = logging.getLogger(__name__)
+
+
class ProfileDisplaynameRestServlet(ClientV1RestServlet):
PATTERN = client_path_pattern("/profile/(?P<user_id>[^/]*)/displayname")
@@ -47,7 +51,8 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet):
defer.returnValue((400, "Unable to parse name"))
yield self.handlers.profile_handler.set_displayname(
- user, auth_user, new_name)
+ user, auth_user, new_name
+ )
defer.returnValue((200, {}))
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f4dec70393..995114e405 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -51,7 +51,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 15
+SCHEMA_VERSION = 16
dir_path = os.path.abspath(os.path.dirname(__file__))
@@ -104,14 +104,16 @@ class DataStore(RoomMemberStore, RoomStore,
self.client_ip_last_seen.prefill(*key + (now,))
- yield self._simple_insert(
+ yield self._simple_upsert(
"user_ips",
- {
- "user": user.to_string(),
+ keyvalues={
+ "user_id": user.to_string(),
"access_token": access_token,
- "device_id": device_id,
"ip": ip,
"user_agent": user_agent,
+ },
+ values={
+ "device_id": device_id,
"last_seen": now,
},
desc="insert_client_ip",
@@ -120,7 +122,7 @@ class DataStore(RoomMemberStore, RoomStore,
def get_user_ip_and_agents(self, user):
return self._simple_select_list(
table="user_ips",
- keyvalues={"user": user.to_string()},
+ keyvalues={"user_id": user.to_string()},
retcols=[
"device_id", "access_token", "ip", "user_agent", "last_seen"
],
@@ -148,21 +150,23 @@ class UpgradeDatabaseException(PrepareDatabaseException):
pass
-def prepare_database(db_conn):
+def prepare_database(db_conn, database_engine):
"""Prepares a database for usage. Will either create all necessary tables
or upgrade from an older schema version.
"""
try:
cur = db_conn.cursor()
- version_info = _get_or_create_schema_state(cur)
+ version_info = _get_or_create_schema_state(cur, database_engine)
if version_info:
user_version, delta_files, upgraded = version_info
- _upgrade_existing_database(cur, user_version, delta_files, upgraded)
+ _upgrade_existing_database(
+ cur, user_version, delta_files, upgraded, database_engine
+ )
else:
- _setup_new_database(cur)
+ _setup_new_database(cur, database_engine)
- cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
+ # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
cur.close()
db_conn.commit()
@@ -171,7 +175,7 @@ def prepare_database(db_conn):
raise
-def _setup_new_database(cur):
+def _setup_new_database(cur, database_engine):
"""Sets up the database by finding a base set of "full schemas" and then
applying any necessary deltas.
@@ -225,31 +229,30 @@ def _setup_new_database(cur):
directory_entries = os.listdir(sql_dir)
- sql_script = "BEGIN TRANSACTION;\n"
for filename in fnmatch.filter(directory_entries, "*.sql"):
sql_loc = os.path.join(sql_dir, filename)
logger.debug("Applying schema %s", sql_loc)
- sql_script += read_schema(sql_loc)
- sql_script += "\n"
- sql_script += "COMMIT TRANSACTION;"
- cur.executescript(sql_script)
+ executescript(cur, sql_loc)
cur.execute(
- "INSERT OR REPLACE INTO schema_version (version, upgraded)"
- " VALUES (?,?)",
- (max_current_ver, False)
+ database_engine.convert_param_style(
+ "INSERT INTO schema_version (version, upgraded)"
+ " VALUES (?,?)"
+ ),
+ (max_current_ver, False,)
)
_upgrade_existing_database(
cur,
current_version=max_current_ver,
applied_delta_files=[],
- upgraded=False
+ upgraded=False,
+ database_engine=database_engine,
)
def _upgrade_existing_database(cur, current_version, applied_delta_files,
- upgraded):
+ upgraded, database_engine):
"""Upgrades an existing database.
Delta files can either be SQL stored in *.sql files, or python modules
@@ -305,6 +308,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
if not upgraded:
start_ver += 1
+ logger.debug("applied_delta_files: %s", applied_delta_files)
+
for v in range(start_ver, SCHEMA_VERSION + 1):
logger.debug("Upgrading schema to v%d", v)
@@ -321,6 +326,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
directory_entries.sort()
for file_name in directory_entries:
relative_path = os.path.join(str(v), file_name)
+ logger.debug("Found file: %s", relative_path)
if relative_path in applied_delta_files:
continue
@@ -342,9 +348,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
module.run_upgrade(cur)
elif ext == ".sql":
# A plain old .sql file, just read and execute it
- delta_schema = read_schema(absolute_path)
logger.debug("Applying schema %s", relative_path)
- cur.executescript(delta_schema)
+ executescript(cur, absolute_path)
else:
# Not a valid delta file.
logger.warn(
@@ -356,24 +361,82 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
# Mark as done.
cur.execute(
- "INSERT INTO applied_schema_deltas (version, file)"
- " VALUES (?,?)",
+ database_engine.convert_param_style(
+ "INSERT INTO applied_schema_deltas (version, file)"
+ " VALUES (?,?)",
+ ),
(v, relative_path)
)
cur.execute(
- "INSERT OR REPLACE INTO schema_version (version, upgraded)"
- " VALUES (?,?)",
+ database_engine.convert_param_style(
+ "REPLACE INTO schema_version (version, upgraded)"
+ " VALUES (?,?)",
+ ),
(v, True)
)
-def _get_or_create_schema_state(txn):
+def get_statements(f):
+ statement_buffer = ""
+ in_comment = False # If we're in a /* ... */ style comment
+
+ for line in f:
+ line = line.strip()
+
+ if in_comment:
+ # Check if this line contains an end to the comment
+ comments = line.split("*/", 1)
+ if len(comments) == 1:
+ continue
+ line = comments[1]
+ in_comment = False
+
+ # Remove inline block comments
+ line = re.sub(r"/\*.*\*/", " ", line)
+
+ # Does this line start a comment?
+ comments = line.split("/*", 1)
+ if len(comments) > 1:
+ line = comments[0]
+ in_comment = True
+
+ # Deal with line comments
+ line = line.split("--", 1)[0]
+ line = line.split("//", 1)[0]
+
+ # Find *all* semicolons. We need to treat first and last entry
+ # specially.
+ statements = line.split(";")
+
+ # We must prepend statement_buffer to the first statement
+ first_statement = "%s %s" % (
+ statement_buffer.strip(),
+ statements[0].strip()
+ )
+ statements[0] = first_statement
+
+ # Every entry, except the last, is a full statement
+ for statement in statements[:-1]:
+ yield statement.strip()
+
+ # The last entry did *not* end in a semicolon, so we store it for the
+ # next semicolon we find
+ statement_buffer = statements[-1].strip()
+
+
+def executescript(txn, schema_path):
+ with open(schema_path, 'r') as f:
+ for statement in get_statements(f):
+ txn.execute(statement)
+
+
+def _get_or_create_schema_state(txn, database_engine):
+ # Bluntly try creating the schema_version tables.
schema_path = os.path.join(
dir_path, "schema", "schema_version.sql",
)
- create_schema = read_schema(schema_path)
- txn.executescript(create_schema)
+ executescript(txn, schema_path)
txn.execute("SELECT version, upgraded FROM schema_version")
row = txn.fetchone()
@@ -382,10 +445,13 @@ def _get_or_create_schema_state(txn):
if current_version:
txn.execute(
- "SELECT file FROM applied_schema_deltas WHERE version >= ?",
+ database_engine.convert_param_style(
+ "SELECT file FROM applied_schema_deltas WHERE version >= ?"
+ ),
(current_version,)
)
- return current_version, txn.fetchall(), upgraded
+ applied_deltas = [d for d, in txn.fetchall()]
+ return current_version, applied_deltas, upgraded
return None
@@ -417,7 +483,7 @@ def prepare_sqlite3_database(db_conn):
if row and row[0]:
db_conn.execute(
- "INSERT OR REPLACE INTO schema_version (version, upgraded)"
+ "REPLACE INTO schema_version (version, upgraded)"
" VALUES (?,?)",
(row[0], False)
)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e3e67d8e0d..fa5199104a 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -22,6 +22,8 @@ from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
from synapse.util.lrucache import LruCache
import synapse.metrics
+from util.id_generators import IdGenerator, StreamIdGenerator
+
from twisted.internet import defer
from collections import namedtuple, OrderedDict
@@ -145,11 +147,12 @@ class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object
passed to the constructor. Adds logging and metrics to the .execute()
method."""
- __slots__ = ["txn", "name"]
+ __slots__ = ["txn", "name", "database_engine"]
- def __init__(self, txn, name):
+ def __init__(self, txn, name, database_engine):
object.__setattr__(self, "txn", txn)
object.__setattr__(self, "name", name)
+ object.__setattr__(self, "database_engine", database_engine)
def __getattr__(self, name):
return getattr(self.txn, name)
@@ -161,26 +164,32 @@ class LoggingTransaction(object):
# TODO(paul): Maybe use 'info' and 'debug' for values?
sql_logger.debug("[SQL] {%s} %s", self.name, sql)
- try:
- if args and args[0]:
- values = args[0]
+ sql = self.database_engine.convert_param_style(sql)
+
+ if args and args[0]:
+ args = list(args)
+ args[0] = [
+ self.database_engine.encode_parameter(a) for a in args[0]
+ ]
+ try:
sql_logger.debug(
- "[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)),
+ "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])),
self.name,
- *values
+ *args[0]
)
- except:
- # Don't let logging failures stop SQL from working
- pass
+ except:
+ # Don't let logging failures stop SQL from working
+ pass
start = time.time() * 1000
+
try:
return self.txn.execute(
sql, *args, **kwargs
)
- except:
- logger.exception("[SQL FAIL] {%s}", self.name)
- raise
+ except Exception as e:
+ logger.debug("[SQL FAIL] {%s} %s", self.name, e)
+ raise
finally:
msecs = (time.time() * 1000) - start
sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
@@ -245,6 +254,13 @@ class SQLBaseStore(object):
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
max_entries=hs.config.event_cache_size)
+ self.database_engine = hs.database_engine
+
+ self._stream_id_gen = StreamIdGenerator()
+ self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
+ self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
+ self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
+
def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()
@@ -281,7 +297,7 @@ class SQLBaseStore(object):
start_time = time.time() * 1000
- def inner_func(txn, *args, **kwargs):
+ def inner_func(conn, *args, **kwargs):
with LoggingContext("runInteraction") as context:
current_context.copy_to(context)
start = time.time() * 1000
@@ -296,9 +312,25 @@ class SQLBaseStore(object):
sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
transaction_logger.debug("[TXN START] {%s}", name)
try:
- return func(LoggingTransaction(txn, name), *args, **kwargs)
- except:
- logger.exception("[TXN FAIL] {%s}", name)
+ i = 0
+ N = 5
+ while True:
+ try:
+ txn = conn.cursor()
+ return func(
+ LoggingTransaction(txn, name, self.database_engine),
+ *args, **kwargs
+ )
+ except self.database_engine.module.DatabaseError as e:
+ if self.database_engine.is_deadlock(e):
+ logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N)
+ if i < N:
+ i += 1
+ conn.rollback()
+ continue
+ raise
+ except Exception as e:
+ logger.debug("[TXN FAIL] {%s} %s", name, e)
raise
finally:
end = time.time() * 1000
@@ -311,7 +343,7 @@ class SQLBaseStore(object):
sql_txn_timer.inc_by(duration, desc)
with PreserveLoggingContext():
- result = yield self._db_pool.runInteraction(
+ result = yield self._db_pool.runWithConnection(
inner_func, *args, **kwargs
)
defer.returnValue(result)
@@ -342,11 +374,11 @@ class SQLBaseStore(object):
The result of decoder(results)
"""
def interaction(txn):
- cursor = txn.execute(query, args)
+ txn.execute(query, args)
if decoder:
- return decoder(cursor)
+ return decoder(txn)
else:
- return cursor.fetchall()
+ return txn.fetchall()
return self.runInteraction(desc, interaction)
@@ -356,27 +388,23 @@ class SQLBaseStore(object):
# "Simple" SQL API methods that operate on a single table with no JOINs,
# no complex WHERE clauses, just a dict of values for columns.
- def _simple_insert(self, table, values, or_replace=False, or_ignore=False,
+ def _simple_insert(self, table, values, or_ignore=False,
desc="_simple_insert"):
"""Executes an INSERT query on the named table.
Args:
table : string giving the table name
values : dict of new column names and values for them
- or_replace : bool; if True performs an INSERT OR REPLACE
"""
return self.runInteraction(
desc,
- self._simple_insert_txn, table, values, or_replace=or_replace,
- or_ignore=or_ignore,
+ self._simple_insert_txn, table, values,
+ or_ignore=or_ignore
)
@log_function
- def _simple_insert_txn(self, txn, table, values, or_replace=False,
- or_ignore=False):
- sql = "%s INTO %s (%s) VALUES(%s)" % (
- ("INSERT OR REPLACE" if or_replace else
- "INSERT OR IGNORE" if or_ignore else "INSERT"),
+ def _simple_insert_txn(self, txn, table, values, or_ignore=False):
+ sql = "INSERT INTO %s (%s) VALUES(%s)" % (
table,
", ".join(k for k in values),
", ".join("?" for k in values)
@@ -387,8 +415,11 @@ class SQLBaseStore(object):
sql, values.values(),
)
- txn.execute(sql, values.values())
- return txn.lastrowid
+ try:
+ txn.execute(sql, values.values())
+ except self.database_engine.module.IntegrityError:
+ if not or_ignore:
+ raise
def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"):
"""
@@ -489,8 +520,7 @@ class SQLBaseStore(object):
def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol):
sql = (
- "SELECT %(retcol)s FROM %(table)s WHERE %(where)s "
- "ORDER BY rowid asc"
+ "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
) % {
"retcol": retcol,
"table": table,
@@ -548,14 +578,14 @@ class SQLBaseStore(object):
retcols : list of strings giving the names of the columns to return
"""
if keyvalues:
- sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+ sql = "SELECT %s FROM %s WHERE %s" % (
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
)
txn.execute(sql, keyvalues.values())
else:
- sql = "SELECT %s FROM %s ORDER BY rowid asc" % (
+ sql = "SELECT %s FROM %s" % (
", ".join(retcols),
table
)
@@ -607,10 +637,10 @@ class SQLBaseStore(object):
def _simple_select_one_txn(self, txn, table, keyvalues, retcols,
allow_none=False):
- select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+ select_sql = "SELECT %s FROM %s WHERE %s" % (
", ".join(retcols),
table,
- " AND ".join("%s = ?" % (k) for k in keyvalues)
+ " AND ".join("%s = ?" % (k,) for k in keyvalues)
)
txn.execute(select_sql, keyvalues.values())
@@ -648,6 +678,11 @@ class SQLBaseStore(object):
updatevalues=updatevalues,
)
+ # if txn.rowcount == 0:
+ # raise StoreError(404, "No row found")
+ if txn.rowcount > 1:
+ raise StoreError(500, "More than one row matched")
+
return ret
return self.runInteraction(desc, func)
@@ -777,6 +812,9 @@ class SQLBaseStore(object):
internal_metadata, js, redacted, rejected_reason = res
+ internal_metadata = self.database_engine.load_unicode(internal_metadata)
+ js = self.database_engine.load_unicode(js)
+
start_time = update_counter("select_event", start_time)
result = self._get_event_from_row_txn(
@@ -803,9 +841,11 @@ class SQLBaseStore(object):
sql_getevents_timer.inc_by(curr_time - last_time, desc)
return curr_time
+ logger.debug("Got js: %r", js)
d = json.loads(js)
start_time = update_counter("decode_json", start_time)
+ logger.debug("Got internal_metadata: %r", internal_metadata)
internal_metadata = json.loads(internal_metadata)
start_time = update_counter("decode_internal", start_time)
@@ -860,6 +900,12 @@ class SQLBaseStore(object):
result = txn.fetchone()
return result[0] if result else None
+ def get_next_stream_id(self):
+ with self._next_stream_id_lock:
+ i = self._next_stream_id
+ self._next_stream_id += 1
+ return i
+
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying
@@ -883,7 +929,7 @@ class Table(object):
_select_where_clause = "SELECT %s FROM %s WHERE %s"
_select_clause = "SELECT %s FROM %s"
- _insert_clause = "INSERT OR REPLACE INTO %s (%s) VALUES (%s)"
+ _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)"
@classmethod
def select_statement(cls, where_clause=None):
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index f8cbb3f323..40e05b3635 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -366,11 +366,13 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
new_txn_id = max(highest_txn_id, last_txn_id) + 1
# Insert new txn into txn table
- event_ids = [e.event_id for e in events]
+ event_ids = buffer(
+ json.dumps([e.event_id for e in events]).encode("utf8")
+ )
txn.execute(
"INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
"VALUES(?,?,?)",
- (service.id, new_txn_id, json.dumps(event_ids))
+ (service.id, new_txn_id, event_ids)
)
return AppServiceTransaction(
service=service, id=new_txn_id, events=events
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 0199539fea..2b2bdf8615 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -21,8 +21,6 @@ from twisted.internet import defer
from collections import namedtuple
-import sqlite3
-
RoomAliasMapping = namedtuple(
"RoomAliasMapping",
@@ -91,7 +89,7 @@ class DirectoryStore(SQLBaseStore):
},
desc="create_room_alias_association",
)
- except sqlite3.IntegrityError:
+ except self.database_engine.module.IntegrityError:
raise SynapseError(
409, "Room alias %s already exists" % room_alias.to_string()
)
@@ -120,12 +118,12 @@ class DirectoryStore(SQLBaseStore):
defer.returnValue(room_id)
def _delete_room_alias_txn(self, txn, room_alias):
- cursor = txn.execute(
+ txn.execute(
"SELECT room_id FROM room_aliases WHERE room_alias = ?",
(room_alias.to_string(),)
)
- res = cursor.fetchone()
+ res = txn.fetchone()
if res:
room_id = res[0]
else:
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
new file mode 100644
index 0000000000..548d4e1b42
--- /dev/null
+++ b/synapse/storage/engines/__init__.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .maria import MariaEngine
+from .postgres import PostgresEngine
+from .sqlite3 import Sqlite3Engine
+
+import importlib
+
+
+SUPPORTED_MODULE = {
+ "sqlite3": Sqlite3Engine,
+ "mysql.connector": MariaEngine,
+ "psycopg2": PostgresEngine,
+}
+
+
+def create_engine(name):
+ engine_class = SUPPORTED_MODULE.get(name, None)
+
+ if engine_class:
+ module = importlib.import_module(name)
+ return engine_class(module)
+
+ raise RuntimeError(
+ "Unsupported database engine '%s'" % (name,)
+ )
diff --git a/synapse/storage/engines/maria.py b/synapse/storage/engines/maria.py
new file mode 100644
index 0000000000..90165f6849
--- /dev/null
+++ b/synapse/storage/engines/maria.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage import prepare_database
+
+import types
+
+
+class MariaEngine(object):
+ def __init__(self, database_module):
+ self.module = database_module
+
+ def convert_param_style(self, sql):
+ return sql.replace("?", "%s")
+
+ def encode_parameter(self, param):
+ if isinstance(param, types.BufferType):
+ return bytes(param)
+ return param
+
+ def on_new_connection(self, db_conn):
+ pass
+
+ def prepare_database(self, db_conn):
+ cur = db_conn.cursor()
+ cur.execute(
+ "ALTER DATABASE CHARACTER SET utf8mb4 COLLATE utf8mb4_bin"
+ )
+ db_conn.commit()
+ prepare_database(db_conn, self)
+
+ def is_deadlock(self, error):
+ if isinstance(error, self.module.DatabaseError):
+ return error.sqlstate == "40001" and error.errno == 1213
+ return False
+
+ def load_unicode(self, v):
+ return bytes(v).decode("UTF8")
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
new file mode 100644
index 0000000000..457c1f70a5
--- /dev/null
+++ b/synapse/storage/engines/postgres.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage import prepare_database
+
+
+class PostgresEngine(object):
+ def __init__(self, database_module):
+ self.module = database_module
+ self.module.extensions.register_type(self.module.extensions.UNICODE)
+
+ def convert_param_style(self, sql):
+ return sql.replace("?", "%s")
+
+ def encode_parameter(self, param):
+ return param
+
+ def on_new_connection(self, db_conn):
+ db_conn.set_isolation_level(
+ self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
+ )
+
+ def prepare_database(self, db_conn):
+ prepare_database(db_conn, self)
+
+ def is_deadlock(self, error):
+ if isinstance(error, self.module.DatabaseError):
+ return error.pgcode in ["40001", "40P01"]
+ return False
+
+ def load_unicode(self, v):
+ return bytes(v).decode("UTF8")
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
new file mode 100644
index 0000000000..389df35eb5
--- /dev/null
+++ b/synapse/storage/engines/sqlite3.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage import prepare_database, prepare_sqlite3_database
+
+import types
+
+
+class Sqlite3Engine(object):
+ def __init__(self, database_module):
+ self.module = database_module
+
+ def convert_param_style(self, sql):
+ return sql
+
+ def encode_parameter(self, param):
+ return param
+
+ def on_new_connection(self, db_conn):
+ self.prepare_database(db_conn)
+
+ def prepare_database(self, db_conn):
+ prepare_sqlite3_database(db_conn)
+ prepare_database(db_conn, self)
+
+ def is_deadlock(self, error):
+ return False
+
+ def load_unicode(self, v):
+ if isinstance(v, types.UnicodeType):
+ return v
+ return bytes(v).decode("UTF8")
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 032334bfd6..54a3c9d805 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -153,7 +153,7 @@ class EventFederationStore(SQLBaseStore):
results = self._get_prev_events_and_state(
txn,
event_id,
- is_state=1,
+ is_state=True,
)
return [(e_id, h, ) for e_id, h, _ in results]
@@ -164,7 +164,7 @@ class EventFederationStore(SQLBaseStore):
}
if is_state is not None:
- keyvalues["is_state"] = is_state
+ keyvalues["is_state"] = bool(is_state)
res = self._simple_select_list_txn(
txn,
@@ -242,7 +242,6 @@ class EventFederationStore(SQLBaseStore):
"room_id": room_id,
"min_depth": depth,
},
- or_replace=True,
)
def _handle_prev_events(self, txn, outlier, event_id, prev_events,
@@ -260,9 +259,8 @@ class EventFederationStore(SQLBaseStore):
"event_id": event_id,
"prev_event_id": e_id,
"room_id": room_id,
- "is_state": 0,
+ "is_state": False,
},
- or_ignore=True,
)
# Update the extremities table if this is not an outlier.
@@ -281,19 +279,19 @@ class EventFederationStore(SQLBaseStore):
# We only insert as a forward extremity the new event if there are
# no other events that reference it as a prev event
query = (
- "INSERT OR IGNORE INTO %(table)s (event_id, room_id) "
- "SELECT ?, ? WHERE NOT EXISTS ("
- "SELECT 1 FROM %(event_edges)s WHERE "
- "prev_event_id = ? "
- ")"
- ) % {
- "table": "event_forward_extremities",
- "event_edges": "event_edges",
- }
+ "SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+ )
- logger.debug("query: %s", query)
+ txn.execute(query, (event_id,))
- txn.execute(query, (event_id, room_id, event_id))
+ if not txn.fetchone():
+ query = (
+ "INSERT INTO event_forward_extremities"
+ " (event_id, room_id)"
+ " VALUES (?, ?)"
+ )
+
+ txn.execute(query, (event_id, room_id))
# Insert all the prev_events as a backwards thing, they'll get
# deleted in a second if they're incorrect anyway.
@@ -306,7 +304,6 @@ class EventFederationStore(SQLBaseStore):
"event_id": e_id,
"room_id": room_id,
},
- or_ignore=True,
)
# Also delete from the backwards extremities table all ones that
@@ -400,7 +397,7 @@ class EventFederationStore(SQLBaseStore):
query = (
"SELECT prev_event_id FROM event_edges "
- "WHERE room_id = ? AND event_id = ? AND is_state = 0 "
+ "WHERE room_id = ? AND event_id = ? AND is_state = ? "
"LIMIT ?"
)
@@ -409,7 +406,7 @@ class EventFederationStore(SQLBaseStore):
for event_id in front:
txn.execute(
query,
- (room_id, event_id, limit - len(event_results))
+ (room_id, event_id, False, limit - len(event_results))
)
for e_id, in txn.fetchall():
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2425f57f5f..0373d152b2 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore):
is_new_state=is_new_state,
current_state=current_state,
)
- self.get_room_events_max_id.invalidate()
except _RollbackButIsFineException:
pass
@@ -96,12 +95,22 @@ class EventsStore(SQLBaseStore):
# Remove the any existing cache entries for the event_id
self._invalidate_get_event_cache(event.event_id)
+ if stream_ordering is None:
+ with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
+ return self._persist_event_txn(
+ txn, event, context, backfilled,
+ stream_ordering=stream_ordering,
+ is_new_state=is_new_state,
+ current_state=current_state,
+ )
+
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
- txn.execute(
- "DELETE FROM current_state_events WHERE room_id = ?",
- (event.room_id,)
+ self._simple_delete_txn(
+ txn,
+ table="current_state_events",
+ keyvalues={"room_id": event.room_id},
)
for s in current_state:
@@ -114,7 +123,6 @@ class EventsStore(SQLBaseStore):
"type": s.type,
"state_key": s.state_key,
},
- or_replace=True,
)
if event.is_state() and is_new_state:
@@ -128,7 +136,6 @@ class EventsStore(SQLBaseStore):
"type": event.type,
"state_key": event.state_key,
},
- or_replace=True,
)
for prev_state_id, _ in event.prev_state:
@@ -151,14 +158,6 @@ class EventsStore(SQLBaseStore):
event.depth
)
- self._handle_prev_events(
- txn,
- outlier=outlier,
- event_id=event.event_id,
- prev_events=event.prev_events,
- room_id=event.room_id,
- )
-
have_persisted = self._simple_select_one_onecol_txn(
txn,
table="event_json",
@@ -185,23 +184,29 @@ class EventsStore(SQLBaseStore):
)
txn.execute(
sql,
- (metadata_json.decode("UTF-8"), event.event_id,)
+ (buffer(metadata_json), event.event_id,)
)
sql = (
- "UPDATE events SET outlier = 0"
+ "UPDATE events SET outlier = ?"
" WHERE event_id = ?"
)
txn.execute(
sql,
- (event.event_id,)
+ (False, event.event_id,)
)
return
+ self._handle_prev_events(
+ txn,
+ outlier=outlier,
+ event_id=event.event_id,
+ prev_events=event.prev_events,
+ room_id=event.room_id,
+ )
+
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
- elif event.type == EventTypes.Feedback:
- self._store_feedback_txn(txn, event)
elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
@@ -224,15 +229,14 @@ class EventsStore(SQLBaseStore):
values={
"event_id": event.event_id,
"room_id": event.room_id,
- "internal_metadata": metadata_json.decode("UTF-8"),
- "json": encode_canonical_json(event_dict).decode("UTF-8"),
+ "internal_metadata": buffer(metadata_json),
+ "json": buffer(encode_canonical_json(event_dict)),
},
- or_replace=True,
)
- content = encode_canonical_json(
+ content = buffer(encode_canonical_json(
event.content
- ).decode("UTF-8")
+ ))
vals = {
"topological_ordering": event.depth,
@@ -245,9 +249,6 @@ class EventsStore(SQLBaseStore):
"depth": event.depth,
}
- if stream_ordering is not None:
- vals["stream_ordering"] = stream_ordering
-
unrec = {
k: v
for k, v in event.get_dict().items()
@@ -260,25 +261,24 @@ class EventsStore(SQLBaseStore):
]
}
- vals["unrecognized_keys"] = encode_canonical_json(
+ vals["unrecognized_keys"] = buffer(encode_canonical_json(
unrec
- ).decode("UTF-8")
+ ))
- try:
- self._simple_insert_txn(
- txn,
- "events",
- vals,
- or_replace=(not outlier),
- or_ignore=bool(outlier),
- )
- except:
- logger.warn(
- "Failed to persist, probably duplicate: %s",
- event.event_id,
- exc_info=True,
+ sql = (
+ "INSERT INTO events"
+ " (stream_ordering, topological_ordering, event_id, type,"
+ " room_id, content, processed, outlier, depth)"
+ " VALUES (?,?,?,?,?,?,?,?,?)"
+ )
+
+ txn.execute(
+ sql,
+ (
+ stream_ordering, event.depth, event.event_id, event.type,
+ event.room_id, content, True, outlier, event.depth
)
- raise _RollbackButIsFineException("_persist_event")
+ )
if context.rejected:
self._store_rejections_txn(txn, event.event_id, context.rejected)
@@ -302,15 +302,17 @@ class EventsStore(SQLBaseStore):
)
if is_new_state and not context.rejected:
- self._simple_insert_txn(
+ self._simple_upsert_txn(
txn,
"current_state_events",
- {
- "event_id": event.event_id,
+ keyvalues={
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
+ values={
+ "event_id": event.event_id,
+ }
)
for e_id, h in event.prev_state:
@@ -321,7 +323,7 @@ class EventsStore(SQLBaseStore):
"event_id": event.event_id,
"prev_event_id": e_id,
"room_id": event.room_id,
- "is_state": 1,
+ "is_state": True,
},
)
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 09d1e63657..d3b9b38664 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -57,16 +57,18 @@ class KeyStore(SQLBaseStore):
OpenSSL.crypto.FILETYPE_ASN1, tls_certificate
)
fingerprint = hashlib.sha256(tls_certificate_bytes).hexdigest()
- return self._simple_insert(
+ return self._simple_upsert(
table="server_tls_certificates",
- values={
+ keyvalues={
"server_name": server_name,
"fingerprint": fingerprint,
+ },
+ values={
"from_server": from_server,
"ts_added_ms": time_now_ms,
"tls_certificate": buffer(tls_certificate_bytes),
},
- or_ignore=True,
+ desc="store_server_certificate",
)
@defer.inlineCallbacks
@@ -107,14 +109,16 @@ class KeyStore(SQLBaseStore):
ts_now_ms (int): The time now in milliseconds
verification_key (VerifyKey): The NACL verify key.
"""
- return self._simple_insert(
+ return self._simple_upsert(
table="server_signature_keys",
- values={
+ keyvalues={
"server_name": server_name,
"key_id": "%s:%s" % (verify_key.alg, verify_key.version),
+ },
+ values={
"from_server": from_server,
"ts_added_ms": time_now_ms,
"verify_key": buffer(verify_key.encode()),
},
- or_ignore=True,
+ desc="store_server_verify_key",
)
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 87fba55439..22ec94bc16 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore):
values={"observed_user_id": observed_localpart,
"observer_user_id": observer_userid},
desc="allow_presence_visible",
+ or_ignore=True,
)
def disallow_presence_visible(self, observed_localpart, observer_userid):
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index a6e52cb248..e33963d0b4 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
+
from ._base import SQLBaseStore
@@ -24,19 +26,25 @@ class ProfileStore(SQLBaseStore):
desc="create_profile",
)
+ @defer.inlineCallbacks
def get_profile_displayname(self, user_localpart):
- return self._simple_select_one_onecol(
+ name = yield self._simple_select_one_onecol(
table="profiles",
keyvalues={"user_id": user_localpart},
retcol="displayname",
desc="get_profile_displayname",
)
+ if name:
+ name = self.database_engine.load_unicode(name)
+
+ defer.returnValue(name)
+
def set_profile_displayname(self, user_localpart, new_displayname):
return self._simple_update_one(
table="profiles",
keyvalues={"user_id": user_localpart},
- updatevalues={"displayname": new_displayname},
+ updatevalues={"displayname": new_displayname.encode("utf8")},
desc="set_profile_displayname",
)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index c47bdc2861..ee7718d5ed 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -154,7 +154,7 @@ class PushRuleStore(SQLBaseStore):
txn.execute(sql, (user_name, priority_class, new_rule_priority))
# now insert the new rule
- sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+ sql = "INSERT INTO "+PushRuleTable.table_name+" ("
sql += ",".join(new_rule.keys())+") VALUES ("
sql += ", ".join(["?" for _ in new_rule.keys()])+")"
@@ -183,7 +183,7 @@ class PushRuleStore(SQLBaseStore):
new_rule['priority_class'] = priority_class
new_rule['priority'] = new_prio
- sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+ sql = "INSERT INTO "+PushRuleTable.table_name+" ("
sql += ",".join(new_rule.keys())+") VALUES ("
sql += ", ".join(["?" for _ in new_rule.keys()])+")"
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index f24154f146..8a63fe4691 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,8 +15,6 @@
from twisted.internet import defer
-from sqlite3 import IntegrityError
-
from synapse.api.errors import StoreError, Codes
from ._base import SQLBaseStore, cached
@@ -39,17 +37,13 @@ class RegistrationStore(SQLBaseStore):
Raises:
StoreError if there was a problem adding this.
"""
- row = yield self._simple_select_one(
- "users", {"name": user_id}, ["id"],
- desc="add_access_token_to_user",
- )
- if not row:
- raise StoreError(400, "Bad user ID supplied.")
- row_id = row["id"]
+ next_id = yield self._access_tokens_id_gen.get_next()
+
yield self._simple_insert(
"access_tokens",
{
- "user_id": row_id,
+ "id": next_id,
+ "user_id": user_id,
"token": token
},
desc="add_access_token_to_user",
@@ -74,27 +68,43 @@ class RegistrationStore(SQLBaseStore):
def _register(self, txn, user_id, token, password_hash):
now = int(self.clock.time())
+ next_id = self._access_tokens_id_gen.get_next_txn(txn)
+
try:
txn.execute("INSERT INTO users(name, password_hash, creation_ts) "
"VALUES (?,?,?)",
[user_id, password_hash, now])
- except IntegrityError:
+ except self.database_engine.module.IntegrityError:
raise StoreError(
400, "User ID already taken.", errcode=Codes.USER_IN_USE
)
# it's possible for this to get a conflict, but only for a single user
# since tokens are namespaced based on their user ID
- txn.execute("INSERT INTO access_tokens(user_id, token) " +
- "VALUES (?,?)", [txn.lastrowid, token])
+ txn.execute(
+ "INSERT INTO access_tokens(id, user_id, token)"
+ " VALUES (?,?,?)",
+ (next_id, user_id, token,)
+ )
+ @defer.inlineCallbacks
def get_user_by_id(self, user_id):
- query = ("SELECT users.name, users.password_hash FROM users"
- " WHERE users.name = ?")
- return self._execute(
- "get_user_by_id", self.cursor_to_dict, query, user_id
+ user_info = yield self._simple_select_one(
+ table="users",
+ keyvalues={
+ "name": user_id,
+ },
+ retcols=["name", "password_hash"],
+ allow_none=True,
)
+ if user_info:
+ user_info["password_hash"] = self.database_engine.load_unicode(
+ user_info["password_hash"]
+ )
+
+ defer.returnValue(user_info)
+
@cached()
# TODO(paul): Currently there's no code to invalidate this cache. That
# means if/when we ever add internal ways to invalidate access tokens or
@@ -134,12 +144,12 @@ class RegistrationStore(SQLBaseStore):
"SELECT users.name, users.admin,"
" access_tokens.device_id, access_tokens.id as token_id"
" FROM users"
- " INNER JOIN access_tokens on users.id = access_tokens.user_id"
+ " INNER JOIN access_tokens on users.name = access_tokens.user_id"
" WHERE token = ?"
)
- cursor = txn.execute(sql, (token,))
- rows = self.cursor_to_dict(cursor)
+ txn.execute(sql, (token,))
+ rows = self.cursor_to_dict(txn)
if rows:
return rows[0]
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index be3e28c2ea..48ebb33057 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -72,6 +72,7 @@ class RoomStore(SQLBaseStore):
keyvalues={"room_id": room_id},
retcols=RoomsTable.fields,
desc="get_room",
+ allow_none=True,
)
@defer.inlineCallbacks
@@ -102,24 +103,37 @@ class RoomStore(SQLBaseStore):
"ON c.event_id = room_names.event_id "
)
- # We use non printing ascii character US () as a seperator
+ # We use non printing ascii character US (\x1F) as a separator
sql = (
- "SELECT r.room_id, n.name, t.topic, "
- "group_concat(a.room_alias, '') "
- "FROM rooms AS r "
- "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
- "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
- "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
- "WHERE r.is_public = ? "
- "GROUP BY r.room_id "
+ "SELECT r.room_id, max(n.name), max(t.topic)"
+ " FROM rooms AS r"
+ " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id"
+ " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id"
+ " WHERE r.is_public = ?"
+ " GROUP BY r.room_id"
) % {
"topic": topic_subquery,
"name": name_subquery,
}
- c = txn.execute(sql, (is_public,))
+ txn.execute(sql, (is_public,))
- return c.fetchall()
+ rows = txn.fetchall()
+
+ for i, row in enumerate(rows):
+ room_id = row[0]
+ aliases = self._simple_select_onecol_txn(
+ txn,
+ table="room_aliases",
+ keyvalues={
+ "room_id": room_id
+ },
+ retcol="room_alias",
+ )
+
+ rows[i] = list(row) + [aliases]
+
+ return rows
rows = yield self.runInteraction(
"get_rooms", f
@@ -130,9 +144,10 @@ class RoomStore(SQLBaseStore):
"room_id": r[0],
"name": r[1],
"topic": r[2],
- "aliases": r[3].split(""),
+ "aliases": r[3],
}
for r in rows
+ if r[3] # We only return rooms that have at least one alias.
]
defer.returnValue(ret)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 52c37c76f5..8ea5756d61 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -40,7 +40,6 @@ class RoomMemberStore(SQLBaseStore):
"""
try:
target_user_id = event.state_key
- domain = UserID.from_string(target_user_id).domain
except:
logger.exception(
"Failed to parse target_user_id=%s", target_user_id
@@ -65,42 +64,8 @@ class RoomMemberStore(SQLBaseStore):
}
)
- # Update room hosts table
- if event.membership == Membership.JOIN:
- sql = (
- "INSERT OR IGNORE INTO room_hosts (room_id, host) "
- "VALUES (?, ?)"
- )
- txn.execute(sql, (event.room_id, domain))
- elif event.membership != Membership.INVITE:
- # Check if this was the last person to have left.
- member_events = self._get_members_query_txn(
- txn,
- where_clause=("c.room_id = ? AND m.membership = ?"
- " AND m.user_id != ?"),
- where_values=(event.room_id, Membership.JOIN, target_user_id,)
- )
-
- joined_domains = set()
- for e in member_events:
- try:
- joined_domains.add(
- UserID.from_string(e.state_key).domain
- )
- except:
- # FIXME: How do we deal with invalid user ids in the db?
- logger.exception("Invalid user_id: %s", event.state_key)
-
- if domain not in joined_domains:
- sql = (
- "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
- )
-
- txn.execute(sql, (event.room_id, domain))
-
self.get_rooms_for_user.invalidate(target_user_id)
- @defer.inlineCallbacks
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@@ -110,41 +75,27 @@ class RoomMemberStore(SQLBaseStore):
Returns:
Deferred: Results in a MembershipEvent or None.
"""
- rows = yield self._get_members_by_dict({
- "e.room_id": room_id,
- "m.user_id": user_id,
- })
+ def f(txn):
+ events = self._get_members_events_txn(
+ txn,
+ room_id,
+ user_id=user_id,
+ )
- defer.returnValue(rows[0] if rows else None)
+ return events[0] if events else None
- def _get_room_member(self, txn, user_id, room_id):
- sql = (
- "SELECT e.* FROM events as e"
- " INNER JOIN room_memberships as m"
- " ON e.event_id = m.event_id"
- " INNER JOIN current_state_events as c"
- " ON m.event_id = c.event_id"
- " WHERE m.user_id = ? and e.room_id = ?"
- " LIMIT 1"
- )
- txn.execute(sql, (user_id, room_id))
- rows = self.cursor_to_dict(txn)
- if rows:
- return self._parse_events_txn(txn, rows)[0]
- else:
- return None
+ return self.runInteraction("get_room_member", f)
def get_users_in_room(self, room_id):
def f(txn):
- sql = (
- "SELECT m.user_id FROM room_memberships as m"
- " INNER JOIN current_state_events as c"
- " ON m.event_id = c.event_id"
- " WHERE m.membership = ? AND m.room_id = ?"
+
+ rows = self._get_members_rows_txn(
+ txn,
+ room_id=room_id,
+ membership=Membership.JOIN,
)
- txn.execute(sql, (Membership.JOIN, room_id))
- return [r[0] for r in txn.fetchall()]
+ return [r["user_id"] for r in rows]
return self.runInteraction("get_users_in_room", f)
def get_room_members(self, room_id, membership=None):
@@ -159,11 +110,14 @@ class RoomMemberStore(SQLBaseStore):
list of namedtuples representing the members in this room.
"""
- where = {"m.room_id": room_id}
- if membership:
- where["m.membership"] = membership
+ def f(txn):
+ return self._get_members_events_txn(
+ txn,
+ room_id,
+ membership=membership,
+ )
- return self._get_members_by_dict(where)
+ return self.runInteraction("get_room_members", f)
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
@@ -209,32 +163,55 @@ class RoomMemberStore(SQLBaseStore):
]
def get_joined_hosts_for_room(self, room_id):
- return self._simple_select_onecol(
- "room_hosts",
- {"room_id": room_id},
- "host",
- desc="get_joined_hosts_for_room",
+ return self.runInteraction(
+ "get_joined_hosts_for_room",
+ self._get_joined_hosts_for_room_txn,
+ room_id,
+ )
+
+ def _get_joined_hosts_for_room_txn(self, txn, room_id):
+ rows = self._get_members_rows_txn(
+ txn,
+ room_id, membership=Membership.JOIN
+ )
+
+ joined_domains = set(
+ UserID.from_string(r["user_id"]).domain
+ for r in rows
)
- def _get_members_by_dict(self, where_dict):
- clause = " AND ".join("%s = ?" % k for k in where_dict.keys())
- vals = where_dict.values()
- return self._get_members_query(clause, vals)
+ return joined_domains
def _get_members_query(self, where_clause, where_values):
return self.runInteraction(
- "get_members_query", self._get_members_query_txn,
+ "get_members_query", self._get_members_events_txn,
where_clause, where_values
)
- def _get_members_query_txn(self, txn, where_clause, where_values):
+ def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
+ rows = self._get_members_rows_txn(
+ txn,
+ room_id, membership, user_id,
+ )
+ return self._get_events_txn(txn, [r["event_id"] for r in rows])
+
+ def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
+ where_clause = "c.room_id = ?"
+ where_values = [room_id]
+
+ if membership:
+ where_clause += " AND m.membership = ?"
+ where_values.append(membership)
+
+ if user_id:
+ where_clause += " AND m.user_id = ?"
+ where_values.append(user_id)
+
sql = (
- "SELECT e.* FROM events as e "
- "INNER JOIN room_memberships as m "
- "ON e.event_id = m.event_id "
- "INNER JOIN current_state_events as c "
- "ON m.event_id = c.event_id "
- "WHERE %(where)s "
+ "SELECT m.* FROM room_memberships as m"
+ " INNER JOIN current_state_events as c"
+ " ON m.event_id = c.event_id"
+ " WHERE %(where)s"
) % {
"where": where_clause,
}
@@ -242,8 +219,7 @@ class RoomMemberStore(SQLBaseStore):
txn.execute(sql, where_values)
rows = self.cursor_to_dict(txn)
- results = self._parse_events_txn(txn, rows)
- return results
+ return rows
@cached()
def get_rooms_for_user(self, user_id):
diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql
index b87ef1fe79..a246943f5a 100644
--- a/synapse/storage/schema/delta/12/v12.sql
+++ b/synapse/storage/schema/delta/12/v12.sql
@@ -14,54 +14,50 @@
*/
CREATE TABLE IF NOT EXISTS rejections(
- event_id TEXT NOT NULL,
- reason TEXT NOT NULL,
- last_check TEXT NOT NULL,
- CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
+ event_id VARCHAR(150) NOT NULL,
+ reason VARCHAR(150) NOT NULL,
+ last_check VARCHAR(150) NOT NULL,
+ UNIQUE (event_id)
);
-- Push notification endpoints that users have configured
CREATE TABLE IF NOT EXISTS pushers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
- user_name TEXT NOT NULL,
- profile_tag varchar(32) NOT NULL,
- kind varchar(8) NOT NULL,
- app_id varchar(64) NOT NULL,
- app_display_name varchar(64) NOT NULL,
- device_display_name varchar(128) NOT NULL,
- pushkey blob NOT NULL,
- ts BIGINT NOT NULL,
- lang varchar(8),
- data blob,
+ user_name VARCHAR(150) NOT NULL,
+ profile_tag VARCHAR(32) NOT NULL,
+ kind VARCHAR(8) NOT NULL,
+ app_id VARCHAR(64) NOT NULL,
+ app_display_name VARCHAR(64) NOT NULL,
+ device_display_name VARCHAR(128) NOT NULL,
+ pushkey VARBINARY(512) NOT NULL,
+ ts BIGINT UNSIGNED NOT NULL,
+ lang VARCHAR(8),
+ data LONGBLOB,
last_token TEXT,
- last_success BIGINT,
- failing_since BIGINT,
- FOREIGN KEY(user_name) REFERENCES users(name),
+ last_success BIGINT UNSIGNED,
+ failing_since BIGINT UNSIGNED,
UNIQUE (app_id, pushkey)
);
CREATE TABLE IF NOT EXISTS push_rules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
- user_name TEXT NOT NULL,
- rule_id TEXT NOT NULL,
+ user_name VARCHAR(150) NOT NULL,
+ rule_id VARCHAR(150) NOT NULL,
priority_class TINYINT NOT NULL,
priority INTEGER NOT NULL DEFAULT 0,
- conditions TEXT NOT NULL,
- actions TEXT NOT NULL,
+ conditions VARCHAR(150) NOT NULL,
+ actions VARCHAR(150) NOT NULL,
UNIQUE(user_name, rule_id)
);
CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
CREATE TABLE IF NOT EXISTS user_filters(
- user_id TEXT,
- filter_id INTEGER,
- filter_json TEXT,
- FOREIGN KEY(user_id) REFERENCES users(id)
+ user_id VARCHAR(150),
+ filter_id BIGINT UNSIGNED,
+ filter_json LONGBLOB
);
CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
- user_id, filter_id
+ user_id, filter_id
);
-
-PRAGMA user_version = 12;
diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql
index e491ad5aec..d1da2b48e2 100644
--- a/synapse/storage/schema/delta/13/v13.sql
+++ b/synapse/storage/schema/delta/13/v13.sql
@@ -15,20 +15,17 @@
CREATE TABLE IF NOT EXISTS application_services(
id INTEGER PRIMARY KEY AUTOINCREMENT,
- url TEXT,
- token TEXT,
- hs_token TEXT,
- sender TEXT,
- UNIQUE(token) ON CONFLICT ROLLBACK
+ url VARCHAR(150),
+ token VARCHAR(150),
+ hs_token VARCHAR(150),
+ sender VARCHAR(150),
+ UNIQUE(token)
);
CREATE TABLE IF NOT EXISTS application_services_regex(
id INTEGER PRIMARY KEY AUTOINCREMENT,
- as_id INTEGER NOT NULL,
+ as_id BIGINT UNSIGNED NOT NULL,
namespace INTEGER, /* enum[room_id|room_alias|user_id] */
- regex TEXT,
+ regex VARCHAR(150),
FOREIGN KEY(as_id) REFERENCES application_services(id)
);
-
-
-
diff --git a/synapse/storage/schema/delta/14/v14.sql b/synapse/storage/schema/delta/14/v14.sql
index 0212726448..8c47d4b0f4 100644
--- a/synapse/storage/schema/delta/14/v14.sql
+++ b/synapse/storage/schema/delta/14/v14.sql
@@ -1,7 +1,7 @@
CREATE TABLE IF NOT EXISTS push_rules_enable (
id INTEGER PRIMARY KEY AUTOINCREMENT,
- user_name TEXT NOT NULL,
- rule_id TEXT NOT NULL,
+ user_name VARCHAR(150) NOT NULL,
+ rule_id VARCHAR(150) NOT NULL,
enabled TINYINT,
UNIQUE(user_name, rule_id)
);
diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql
index 2b27e2a429..ddea8fc693 100644
--- a/synapse/storage/schema/delta/15/appservice_txns.sql
+++ b/synapse/storage/schema/delta/15/appservice_txns.sql
@@ -14,17 +14,18 @@
*/
CREATE TABLE IF NOT EXISTS application_services_state(
- as_id TEXT PRIMARY KEY,
- state TEXT,
- last_txn TEXT
+ as_id VARCHAR(150) PRIMARY KEY,
+ state VARCHAR(5),
+ last_txn INTEGER
);
CREATE TABLE IF NOT EXISTS application_services_txns(
- as_id TEXT NOT NULL,
+ as_id VARCHAR(150) NOT NULL,
txn_id INTEGER NOT NULL,
- event_ids TEXT NOT NULL,
- UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK
+ event_ids LONGBLOB NOT NULL,
+ UNIQUE(as_id, txn_id)
);
-
-
+CREATE INDEX IF NOT EXISTS application_services_txns_id ON application_services_txns (
+ as_id
+);
diff --git a/synapse/storage/schema/delta/15/presence_indices.sql b/synapse/storage/schema/delta/15/presence_indices.sql
new file mode 100644
index 0000000000..6b8d0f1ca7
--- /dev/null
+++ b/synapse/storage/schema/delta/15/presence_indices.sql
@@ -0,0 +1,2 @@
+
+CREATE INDEX IF NOT EXISTS presence_list_user_id ON presence_list (user_id);
diff --git a/synapse/storage/schema/delta/16/remote_media_cache_index.sql b/synapse/storage/schema/delta/16/remote_media_cache_index.sql
new file mode 100644
index 0000000000..7a15265cb1
--- /dev/null
+++ b/synapse/storage/schema/delta/16/remote_media_cache_index.sql
@@ -0,0 +1,2 @@
+CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
+ ON remote_media_cache_thumbnails (media_id);
\ No newline at end of file
diff --git a/synapse/storage/schema/delta/16/remove_duplicates.sql b/synapse/storage/schema/delta/16/remove_duplicates.sql
new file mode 100644
index 0000000000..65c97b5e2f
--- /dev/null
+++ b/synapse/storage/schema/delta/16/remove_duplicates.sql
@@ -0,0 +1,9 @@
+
+
+DELETE FROM event_to_state_groups WHERE state_group not in (
+ SELECT MAX(state_group) FROM event_to_state_groups GROUP BY event_id
+);
+
+DELETE FROM event_to_state_groups WHERE rowid not in (
+ SELECT MIN(rowid) FROM event_to_state_groups GROUP BY event_id
+);
diff --git a/synapse/storage/schema/delta/16/unique_constraints.sql b/synapse/storage/schema/delta/16/unique_constraints.sql
new file mode 100644
index 0000000000..f9fbb6b448
--- /dev/null
+++ b/synapse/storage/schema/delta/16/unique_constraints.sql
@@ -0,0 +1,72 @@
+
+-- We can use SQLite features here, since mysql support was only added in v16
+
+--
+DELETE FROM current_state_events WHERE rowid not in (
+ SELECT MIN(rowid) FROM current_state_events GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS current_state_events_event_id;
+CREATE UNIQUE INDEX current_state_events_event_id ON current_state_events(event_id);
+
+--
+DELETE FROM room_memberships WHERE rowid not in (
+ SELECT MIN(rowid) FROM room_memberships GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS room_memberships_event_id;
+CREATE UNIQUE INDEX room_memberships_event_id ON room_memberships(event_id);
+
+--
+DELETE FROM feedback WHERE rowid not in (
+ SELECT MIN(rowid) FROM feedback GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS feedback_event_id;
+CREATE UNIQUE INDEX feedback_event_id ON feedback(event_id);
+
+--
+DELETE FROM topics WHERE rowid not in (
+ SELECT MIN(rowid) FROM topics GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS topics_event_id;
+CREATE UNIQUE INDEX topics_event_id ON topics(event_id);
+
+--
+DELETE FROM room_names WHERE rowid not in (
+ SELECT MIN(rowid) FROM room_names GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS room_names_id;
+CREATE UNIQUE INDEX room_names_id ON room_names(event_id);
+
+--
+DELETE FROM presence WHERE rowid not in (
+ SELECT MIN(rowid) FROM presence GROUP BY user_id
+);
+
+DROP INDEX IF EXISTS presence_id;
+CREATE UNIQUE INDEX presence_id ON presence(user_id);
+
+--
+DELETE FROM presence_allow_inbound WHERE rowid not in (
+ SELECT MIN(rowid) FROM presence_allow_inbound
+ GROUP BY observed_user_id, observer_user_id
+);
+
+DROP INDEX IF EXISTS presence_allow_inbound_observers;
+CREATE UNIQUE INDEX presence_allow_inbound_observers ON presence_allow_inbound(
+ observed_user_id, observer_user_id
+);
+
+--
+DELETE FROM presence_list WHERE rowid not in (
+ SELECT MIN(rowid) FROM presence_list
+ GROUP BY user_id, observed_user_id
+);
+
+DROP INDEX IF EXISTS presence_list_observers;
+CREATE UNIQUE INDEX presence_list_observers ON presence_list(
+ user_id, observed_user_id
+);
diff --git a/synapse/storage/schema/delta/16/users.sql b/synapse/storage/schema/delta/16/users.sql
new file mode 100644
index 0000000000..db27bdca02
--- /dev/null
+++ b/synapse/storage/schema/delta/16/users.sql
@@ -0,0 +1,56 @@
+-- Convert `access_tokens`.user from rowids to user strings.
+-- MUST BE DONE BEFORE REMOVING ID COLUMN FROM USERS TABLE BELOW
+CREATE TABLE IF NOT EXISTS new_access_tokens(
+ id BIGINT UNSIGNED PRIMARY KEY,
+ user_id VARCHAR(150) NOT NULL,
+ device_id VARCHAR(150),
+ token VARCHAR(150) NOT NULL,
+ last_used BIGINT UNSIGNED,
+ UNIQUE(token)
+);
+
+INSERT INTO new_access_tokens
+ SELECT a.id, u.name, a.device_id, a.token, a.last_used
+ FROM access_tokens as a
+ INNER JOIN users as u ON u.id = a.user_id;
+
+DROP TABLE access_tokens;
+
+ALTER TABLE new_access_tokens RENAME TO access_tokens;
+
+-- Remove ID column from `users` table
+CREATE TABLE IF NOT EXISTS new_users(
+ name VARCHAR(150),
+ password_hash VARCHAR(150),
+ creation_ts BIGINT UNSIGNED,
+ admin BOOL DEFAULT 0 NOT NULL,
+ UNIQUE(name)
+);
+
+INSERT INTO new_users SELECT name, password_hash, creation_ts, admin FROM users;
+
+DROP TABLE users;
+
+ALTER TABLE new_users RENAME TO users;
+
+
+-- Remove UNIQUE constraint from `user_ips` table
+CREATE TABLE IF NOT EXISTS new_user_ips (
+ user_id VARCHAR(150) NOT NULL,
+ access_token VARCHAR(150) NOT NULL,
+ device_id VARCHAR(150),
+ ip VARCHAR(150) NOT NULL,
+ user_agent VARCHAR(150) NOT NULL,
+ last_seen BIGINT UNSIGNED NOT NULL
+);
+
+INSERT INTO new_user_ips
+ SELECT user, access_token, device_id, ip, user_agent, last_seen FROM user_ips;
+
+DROP TABLE user_ips;
+
+ALTER TABLE new_user_ips RENAME TO user_ips;
+
+CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user_id);
+CREATE INDEX IF NOT EXISTS user_ips_user_ip ON user_ips(user_id, access_token, ip);
+
diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql
index 1e766d6db2..bdb1109094 100644
--- a/synapse/storage/schema/full_schemas/11/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/11/event_edges.sql
@@ -14,9 +14,9 @@
*/
CREATE TABLE IF NOT EXISTS event_forward_extremities(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ UNIQUE (event_id, room_id)
);
CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id);
@@ -24,9 +24,9 @@ CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id);
CREATE TABLE IF NOT EXISTS event_backward_extremities(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ UNIQUE (event_id, room_id)
);
CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id);
@@ -34,11 +34,11 @@ CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id
CREATE TABLE IF NOT EXISTS event_edges(
- event_id TEXT NOT NULL,
- prev_event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- is_state INTEGER NOT NULL,
- CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id, is_state)
+ event_id VARCHAR(150) NOT NULL,
+ prev_event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ is_state BOOL NOT NULL,
+ UNIQUE (event_id, prev_event_id, room_id, is_state)
);
CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id);
@@ -46,30 +46,30 @@ CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id);
CREATE TABLE IF NOT EXISTS room_depth(
- room_id TEXT NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
min_depth INTEGER NOT NULL,
- CONSTRAINT uniqueness UNIQUE (room_id)
+ UNIQUE (room_id)
);
CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id);
create TABLE IF NOT EXISTS event_destinations(
- event_id TEXT NOT NULL,
- destination TEXT NOT NULL,
- delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered
- CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE
+ event_id VARCHAR(150) NOT NULL,
+ destination VARCHAR(150) NOT NULL,
+ delivered_ts BIGINT UNSIGNED DEFAULT 0, -- or 0 if not delivered
+ UNIQUE (event_id, destination)
);
CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id);
CREATE TABLE IF NOT EXISTS state_forward_extremities(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- type TEXT NOT NULL,
- state_key TEXT NOT NULL,
- CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ type VARCHAR(150) NOT NULL,
+ state_key VARCHAR(150) NOT NULL,
+ UNIQUE (event_id, room_id)
);
CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities(
@@ -79,11 +79,11 @@ CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id);
CREATE TABLE IF NOT EXISTS event_auth(
- event_id TEXT NOT NULL,
- auth_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- CONSTRAINT uniqueness UNIQUE (event_id, auth_id, room_id)
+ event_id VARCHAR(150) NOT NULL,
+ auth_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ UNIQUE (event_id, auth_id, room_id)
);
CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id);
-CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id);
\ No newline at end of file
+CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id);
diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql
index c28c39c48a..09886f607c 100644
--- a/synapse/storage/schema/full_schemas/11/event_signatures.sql
+++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql
@@ -14,52 +14,42 @@
*/
CREATE TABLE IF NOT EXISTS event_content_hashes (
- event_id TEXT,
- algorithm TEXT,
- hash BLOB,
- CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
+ event_id VARCHAR(150),
+ algorithm VARCHAR(150),
+ hash LONGBLOB,
+ UNIQUE (event_id, algorithm)
);
-CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(
- event_id
-);
+CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(event_id);
CREATE TABLE IF NOT EXISTS event_reference_hashes (
- event_id TEXT,
- algorithm TEXT,
- hash BLOB,
- CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
+ event_id VARCHAR(150),
+ algorithm VARCHAR(150),
+ hash LONGBLOB,
+ UNIQUE (event_id, algorithm)
);
-CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes (
- event_id
-);
+CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes(event_id);
CREATE TABLE IF NOT EXISTS event_signatures (
- event_id TEXT,
- signature_name TEXT,
- key_id TEXT,
- signature BLOB,
- CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
+ event_id VARCHAR(150),
+ signature_name VARCHAR(150),
+ key_id VARCHAR(150),
+ signature LONGBLOB,
+ UNIQUE (event_id, signature_name, key_id)
);
-CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
- event_id
-);
+CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures(event_id);
CREATE TABLE IF NOT EXISTS event_edge_hashes(
- event_id TEXT,
- prev_event_id TEXT,
- algorithm TEXT,
- hash BLOB,
- CONSTRAINT uniqueness UNIQUE (
- event_id, prev_event_id, algorithm
- )
+ event_id VARCHAR(150),
+ prev_event_id VARCHAR(150),
+ algorithm VARCHAR(150),
+ hash LONGBLOB,
+ UNIQUE (event_id, prev_event_id, algorithm)
);
-CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(
- event_id
-);
+CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(event_id);
diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql
index dd00c1cd2f..9c47f51742 100644
--- a/synapse/storage/schema/full_schemas/11/im.sql
+++ b/synapse/storage/schema/full_schemas/11/im.sql
@@ -15,56 +15,54 @@
CREATE TABLE IF NOT EXISTS events(
stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT,
- topological_ordering INTEGER NOT NULL,
- event_id TEXT NOT NULL,
- type TEXT NOT NULL,
- room_id TEXT NOT NULL,
- content TEXT NOT NULL,
- unrecognized_keys TEXT,
+ topological_ordering BIGINT UNSIGNED NOT NULL,
+ event_id VARCHAR(150) NOT NULL,
+ type VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ content LONGBLOB NOT NULL,
+ unrecognized_keys LONGBLOB,
processed BOOL NOT NULL,
outlier BOOL NOT NULL,
- depth INTEGER DEFAULT 0 NOT NULL,
- CONSTRAINT ev_uniq UNIQUE (event_id)
+ depth BIGINT UNSIGNED DEFAULT 0 NOT NULL,
+ UNIQUE (event_id)
);
-CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id);
CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering);
CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering);
CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id);
CREATE TABLE IF NOT EXISTS event_json(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- internal_metadata NOT NULL,
- json BLOB NOT NULL,
- CONSTRAINT ev_j_uniq UNIQUE (event_id)
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ internal_metadata LONGBLOB NOT NULL,
+ json LONGBLOB NOT NULL,
+ UNIQUE (event_id)
);
-CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id);
CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id);
CREATE TABLE IF NOT EXISTS state_events(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- type TEXT NOT NULL,
- state_key TEXT NOT NULL,
- prev_state TEXT
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ type VARCHAR(150) NOT NULL,
+ state_key VARCHAR(150) NOT NULL,
+ prev_state VARCHAR(150),
+ UNIQUE (event_id)
);
-CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id);
CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id);
CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type);
CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key);
CREATE TABLE IF NOT EXISTS current_state_events(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- type TEXT NOT NULL,
- state_key TEXT NOT NULL,
- CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ type VARCHAR(150) NOT NULL,
+ state_key VARCHAR(150) NOT NULL,
+ UNIQUE (room_id, type, state_key)
);
CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id);
@@ -73,11 +71,11 @@ CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (ty
CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key);
CREATE TABLE IF NOT EXISTS room_memberships(
- event_id TEXT NOT NULL,
- user_id TEXT NOT NULL,
- sender TEXT NOT NULL,
- room_id TEXT NOT NULL,
- membership TEXT NOT NULL
+ event_id VARCHAR(150) NOT NULL,
+ user_id VARCHAR(150) NOT NULL,
+ sender VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ membership VARCHAR(150) NOT NULL
);
CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id);
@@ -85,16 +83,16 @@ CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id
CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id);
CREATE TABLE IF NOT EXISTS feedback(
- event_id TEXT NOT NULL,
- feedback_type TEXT,
- target_event_id TEXT,
- sender TEXT,
- room_id TEXT
+ event_id VARCHAR(150) NOT NULL,
+ feedback_type VARCHAR(150),
+ target_event_id VARCHAR(150),
+ sender VARCHAR(150),
+ room_id VARCHAR(150)
);
CREATE TABLE IF NOT EXISTS topics(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
topic TEXT NOT NULL
);
@@ -102,8 +100,8 @@ CREATE INDEX IF NOT EXISTS topics_event_id ON topics(event_id);
CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id);
CREATE TABLE IF NOT EXISTS room_names(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
name TEXT NOT NULL
);
@@ -111,15 +109,15 @@ CREATE INDEX IF NOT EXISTS room_names_event_id ON room_names(event_id);
CREATE INDEX IF NOT EXISTS room_names_room_id ON room_names(room_id);
CREATE TABLE IF NOT EXISTS rooms(
- room_id TEXT PRIMARY KEY NOT NULL,
- is_public INTEGER,
- creator TEXT
+ room_id VARCHAR(150) PRIMARY KEY NOT NULL,
+ is_public BOOL,
+ creator VARCHAR(150)
);
CREATE TABLE IF NOT EXISTS room_hosts(
- room_id TEXT NOT NULL,
- host TEXT NOT NULL,
- CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE
+ room_id VARCHAR(150) NOT NULL,
+ host VARCHAR(150) NOT NULL,
+ UNIQUE (room_id, host)
);
CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id);
diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql
index a9e0a4fe0d..35f141c288 100644
--- a/synapse/storage/schema/full_schemas/11/keys.sql
+++ b/synapse/storage/schema/full_schemas/11/keys.sql
@@ -13,19 +13,19 @@
* limitations under the License.
*/
CREATE TABLE IF NOT EXISTS server_tls_certificates(
- server_name TEXT, -- Server name.
- fingerprint TEXT, -- Certificate fingerprint.
- from_server TEXT, -- Which key server the certificate was fetched from.
- ts_added_ms INTEGER, -- When the certifcate was added.
- tls_certificate BLOB, -- DER encoded x509 certificate.
- CONSTRAINT uniqueness UNIQUE (server_name, fingerprint)
+ server_name VARCHAR(150), -- Server name.
+ fingerprint VARCHAR(150), -- Certificate fingerprint.
+ from_server VARCHAR(150), -- Which key server the certificate was fetched from.
+ ts_added_ms BIGINT UNSIGNED, -- When the certifcate was added.
+ tls_certificate LONGBLOB, -- DER encoded x509 certificate.
+ UNIQUE (server_name, fingerprint)
);
CREATE TABLE IF NOT EXISTS server_signature_keys(
- server_name TEXT, -- Server name.
- key_id TEXT, -- Key version.
- from_server TEXT, -- Which key server the key was fetched form.
- ts_added_ms INTEGER, -- When the key was added.
- verify_key BLOB, -- NACL verification key.
- CONSTRAINT uniqueness UNIQUE (server_name, key_id)
+ server_name VARCHAR(150), -- Server name.
+ key_id VARCHAR(150), -- Key version.
+ from_server VARCHAR(150), -- Which key server the key was fetched form.
+ ts_added_ms BIGINT UNSIGNED, -- When the key was added.
+ verify_key LONGBLOB, -- NACL verification key.
+ UNIQUE (server_name, key_id)
);
diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql
index afdf48cbfb..134e7fbcec 100644
--- a/synapse/storage/schema/full_schemas/11/media_repository.sql
+++ b/synapse/storage/schema/full_schemas/11/media_repository.sql
@@ -14,23 +14,23 @@
*/
CREATE TABLE IF NOT EXISTS local_media_repository (
- media_id TEXT, -- The id used to refer to the media.
- media_type TEXT, -- The MIME-type of the media.
+ media_id VARCHAR(150), -- The id used to refer to the media.
+ media_type VARCHAR(150), -- The MIME-type of the media.
media_length INTEGER, -- Length of the media in bytes.
- created_ts INTEGER, -- When the content was uploaded in ms.
- upload_name TEXT, -- The name the media was uploaded with.
- user_id TEXT, -- The user who uploaded the file.
- CONSTRAINT uniqueness UNIQUE (media_id)
+ created_ts BIGINT UNSIGNED, -- When the content was uploaded in ms.
+ upload_name VARCHAR(150), -- The name the media was uploaded with.
+ user_id VARCHAR(150), -- The user who uploaded the file.
+ UNIQUE (media_id)
);
CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
- media_id TEXT, -- The id used to refer to the media.
+ media_id VARCHAR(150), -- The id used to refer to the media.
thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
- thumbnail_type TEXT, -- The MIME-type of the thumbnail.
- thumbnail_method TEXT, -- The method used to make the thumbnail.
+ thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail.
+ thumbnail_method VARCHAR(150), -- The method used to make the thumbnail.
thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
- CONSTRAINT uniqueness UNIQUE (
+ UNIQUE (
media_id, thumbnail_width, thumbnail_height, thumbnail_type
)
);
@@ -39,30 +39,27 @@ CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
ON local_media_repository_thumbnails (media_id);
CREATE TABLE IF NOT EXISTS remote_media_cache (
- media_origin TEXT, -- The remote HS the media came from.
- media_id TEXT, -- The id used to refer to the media on that server.
- media_type TEXT, -- The MIME-type of the media.
- created_ts INTEGER, -- When the content was uploaded in ms.
- upload_name TEXT, -- The name the media was uploaded with.
+ media_origin VARCHAR(150), -- The remote HS the media came from.
+ media_id VARCHAR(150), -- The id used to refer to the media on that server.
+ media_type VARCHAR(150), -- The MIME-type of the media.
+ created_ts BIGINT UNSIGNED, -- When the content was uploaded in ms.
+ upload_name VARCHAR(150), -- The name the media was uploaded with.
media_length INTEGER, -- Length of the media in bytes.
- filesystem_id TEXT, -- The name used to store the media on disk.
- CONSTRAINT uniqueness UNIQUE (media_origin, media_id)
+ filesystem_id VARCHAR(150), -- The name used to store the media on disk.
+ UNIQUE (media_origin, media_id)
);
CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
- media_origin TEXT, -- The remote HS the media came from.
- media_id TEXT, -- The id used to refer to the media.
+ media_origin VARCHAR(150), -- The remote HS the media came from.
+ media_id VARCHAR(150), -- The id used to refer to the media.
thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
- thumbnail_method TEXT, -- The method used to make the thumbnail
- thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+ thumbnail_method VARCHAR(150), -- The method used to make the thumbnail
+ thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail.
thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
- filesystem_id TEXT, -- The name used to store the media on disk.
- CONSTRAINT uniqueness UNIQUE (
+ filesystem_id VARCHAR(150), -- The name used to store the media on disk.
+ UNIQUE (
media_origin, media_id, thumbnail_width, thumbnail_height,
- thumbnail_type, thumbnail_type
- )
+ thumbnail_type
+ )
);
-
-CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
- ON local_media_repository_thumbnails (media_id);
diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql
index f9f8db9697..c617ebea73 100644
--- a/synapse/storage/schema/full_schemas/11/presence.sql
+++ b/synapse/storage/schema/full_schemas/11/presence.sql
@@ -13,26 +13,23 @@
* limitations under the License.
*/
CREATE TABLE IF NOT EXISTS presence(
- user_id INTEGER NOT NULL,
- state INTEGER,
- status_msg TEXT,
- mtime INTEGER, -- miliseconds since last state change
- FOREIGN KEY(user_id) REFERENCES users(id)
+ user_id VARCHAR(150) NOT NULL,
+ state VARCHAR(20),
+ status_msg VARCHAR(150),
+ mtime BIGINT UNSIGNED -- miliseconds since last state change
);
-- For each of /my/ users which possibly-remote users are allowed to see their
-- presence state
CREATE TABLE IF NOT EXISTS presence_allow_inbound(
- observed_user_id INTEGER NOT NULL,
- observer_user_id TEXT, -- a UserID,
- FOREIGN KEY(observed_user_id) REFERENCES users(id)
+ observed_user_id VARCHAR(150) NOT NULL,
+ observer_user_id VARCHAR(150) NOT NULL -- a UserID,
);
-- For each of /my/ users (watcher), which possibly-remote users are they
-- watching?
CREATE TABLE IF NOT EXISTS presence_list(
- user_id INTEGER NOT NULL,
- observed_user_id TEXT, -- a UserID,
- accepted BOOLEAN,
- FOREIGN KEY(user_id) REFERENCES users(id)
+ user_id VARCHAR(150) NOT NULL,
+ observed_user_id VARCHAR(150) NOT NULL, -- a UserID,
+ accepted BOOLEAN NOT NULL
);
diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql
index f06a528b4d..ffe75edf9f 100644
--- a/synapse/storage/schema/full_schemas/11/profiles.sql
+++ b/synapse/storage/schema/full_schemas/11/profiles.sql
@@ -13,8 +13,7 @@
* limitations under the License.
*/
CREATE TABLE IF NOT EXISTS profiles(
- user_id INTEGER NOT NULL,
- displayname TEXT,
- avatar_url TEXT,
- FOREIGN KEY(user_id) REFERENCES users(id)
+ user_id VARCHAR(150) NOT NULL,
+ displayname VARCHAR(150),
+ avatar_url VARCHAR(150)
);
diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql
index 5011d95db8..b81451eab4 100644
--- a/synapse/storage/schema/full_schemas/11/redactions.sql
+++ b/synapse/storage/schema/full_schemas/11/redactions.sql
@@ -13,9 +13,9 @@
* limitations under the License.
*/
CREATE TABLE IF NOT EXISTS redactions (
- event_id TEXT NOT NULL,
- redacts TEXT NOT NULL,
- CONSTRAINT ev_uniq UNIQUE (event_id)
+ event_id VARCHAR(150) NOT NULL,
+ redacts VARCHAR(150) NOT NULL,
+ UNIQUE (event_id)
);
CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id);
diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql
index 0d2df01603..6226913227 100644
--- a/synapse/storage/schema/full_schemas/11/room_aliases.sql
+++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql
@@ -14,14 +14,11 @@
*/
CREATE TABLE IF NOT EXISTS room_aliases(
- room_alias TEXT NOT NULL,
- room_id TEXT NOT NULL
+ room_alias VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL
);
CREATE TABLE IF NOT EXISTS room_alias_servers(
- room_alias TEXT NOT NULL,
- server TEXT NOT NULL
+ room_alias VARCHAR(150) NOT NULL,
+ server VARCHAR(150) NOT NULL
);
-
-
-
diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql
index 1fe8f1e430..737c3e35c7 100644
--- a/synapse/storage/schema/full_schemas/11/state.sql
+++ b/synapse/storage/schema/full_schemas/11/state.sql
@@ -14,34 +14,27 @@
*/
CREATE TABLE IF NOT EXISTS state_groups(
- id INTEGER PRIMARY KEY,
- room_id TEXT NOT NULL,
- event_id TEXT NOT NULL
+ id VARCHAR(20) PRIMARY KEY,
+ room_id VARCHAR(150) NOT NULL,
+ event_id VARCHAR(150) NOT NULL
);
CREATE TABLE IF NOT EXISTS state_groups_state(
- state_group INTEGER NOT NULL,
- room_id TEXT NOT NULL,
- type TEXT NOT NULL,
- state_key TEXT NOT NULL,
- event_id TEXT NOT NULL
+ state_group VARCHAR(20) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ type VARCHAR(150) NOT NULL,
+ state_key VARCHAR(150) NOT NULL,
+ event_id VARCHAR(150) NOT NULL
);
CREATE TABLE IF NOT EXISTS event_to_state_groups(
- event_id TEXT NOT NULL,
- state_group INTEGER NOT NULL,
- CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id)
+ event_id VARCHAR(150) NOT NULL,
+ state_group VARCHAR(150) NOT NULL,
+ UNIQUE (event_id)
);
CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id);
-CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(
- state_group
-);
-CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(
- room_id, type, state_key
-);
-
-CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(
- event_id
-);
\ No newline at end of file
+CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(state_group);
+CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(room_id, type, state_key);
+CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(event_id);
\ No newline at end of file
diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql
index 2d30f99b06..c2fab10aa0 100644
--- a/synapse/storage/schema/full_schemas/11/transactions.sql
+++ b/synapse/storage/schema/full_schemas/11/transactions.sql
@@ -14,16 +14,15 @@
*/
-- Stores what transaction ids we have received and what our response was
CREATE TABLE IF NOT EXISTS received_transactions(
- transaction_id TEXT,
- origin TEXT,
- ts INTEGER,
+ transaction_id VARCHAR(150),
+ origin VARCHAR(150),
+ ts BIGINT UNSIGNED,
response_code INTEGER,
- response_json TEXT,
+ response_json LONGBLOB,
has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx
- CONSTRAINT uniquesss UNIQUE (transaction_id, origin) ON CONFLICT REPLACE
+ UNIQUE (transaction_id, origin)
);
-CREATE UNIQUE INDEX IF NOT EXISTS transactions_txid ON received_transactions(transaction_id, origin);
CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
@@ -31,17 +30,14 @@ CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin
-- since referenced the transaction in another outgoing transaction
CREATE TABLE IF NOT EXISTS sent_transactions(
id INTEGER PRIMARY KEY AUTOINCREMENT, -- This is used to apply insertion ordering
- transaction_id TEXT,
- destination TEXT,
+ transaction_id VARCHAR(150),
+ destination VARCHAR(150),
response_code INTEGER DEFAULT 0,
- response_json TEXT,
- ts INTEGER
+ response_json LONGBLOB,
+ ts BIGINT UNSIGNED
);
CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination);
-CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions(
- destination
-);
CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id);
-- So that we can do an efficient look up of all transactions that have yet to be successfully
-- sent.
@@ -51,18 +47,17 @@ CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_c
-- For sent transactions only.
CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
transaction_id INTEGER,
- destination TEXT,
- pdu_id TEXT,
- pdu_origin TEXT
+ destination VARCHAR(150),
+ pdu_id VARCHAR(150),
+ pdu_origin VARCHAR(150)
);
CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination);
CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
-- To track destination health
CREATE TABLE IF NOT EXISTS destinations(
- destination TEXT PRIMARY KEY,
- retry_last_ts INTEGER,
+ destination VARCHAR(150) PRIMARY KEY,
+ retry_last_ts BIGINT UNSIGNED,
retry_interval INTEGER
);
diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql
index 08ccfdac0a..0ddfccd410 100644
--- a/synapse/storage/schema/full_schemas/11/users.sql
+++ b/synapse/storage/schema/full_schemas/11/users.sql
@@ -14,32 +14,30 @@
*/
CREATE TABLE IF NOT EXISTS users(
id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT,
- password_hash TEXT,
- creation_ts INTEGER,
+ name VARCHAR(150),
+ password_hash VARCHAR(150),
+ creation_ts BIGINT UNSIGNED,
admin BOOL DEFAULT 0 NOT NULL,
- UNIQUE(name) ON CONFLICT ROLLBACK
+ UNIQUE(name)
);
CREATE TABLE IF NOT EXISTS access_tokens(
id INTEGER PRIMARY KEY AUTOINCREMENT,
- user_id INTEGER NOT NULL,
- device_id TEXT,
- token TEXT NOT NULL,
- last_used INTEGER,
- FOREIGN KEY(user_id) REFERENCES users(id),
- UNIQUE(token) ON CONFLICT ROLLBACK
+ user_id VARCHAR(150) NOT NULL,
+ device_id VARCHAR(150),
+ token VARCHAR(150) NOT NULL,
+ last_used BIGINT UNSIGNED,
+ UNIQUE(token)
);
CREATE TABLE IF NOT EXISTS user_ips (
- user TEXT NOT NULL,
- access_token TEXT NOT NULL,
- device_id TEXT,
- ip TEXT NOT NULL,
- user_agent TEXT NOT NULL,
- last_seen INTEGER NOT NULL,
- CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE
+ user VARCHAR(150) NOT NULL,
+ access_token VARCHAR(150) NOT NULL,
+ device_id VARCHAR(150),
+ ip VARCHAR(150) NOT NULL,
+ user_agent VARCHAR(150) NOT NULL,
+ last_seen BIGINT UNSIGNED NOT NULL,
+ UNIQUE (user, access_token, ip, user_agent)
);
CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user);
-
diff --git a/synapse/storage/schema/full_schemas/16/application_services.sql b/synapse/storage/schema/full_schemas/16/application_services.sql
new file mode 100644
index 0000000000..f08c5bcf76
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/application_services.sql
@@ -0,0 +1,48 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS application_services(
+ id BIGINT PRIMARY KEY,
+ url VARCHAR(150),
+ token VARCHAR(150),
+ hs_token VARCHAR(150),
+ sender VARCHAR(150),
+ UNIQUE(token)
+);
+
+CREATE TABLE IF NOT EXISTS application_services_regex(
+ id BIGINT PRIMARY KEY,
+ as_id BIGINT NOT NULL,
+ namespace INTEGER, /* enum[room_id|room_alias|user_id] */
+ regex VARCHAR(150),
+ FOREIGN KEY(as_id) REFERENCES application_services(id)
+);
+
+CREATE TABLE IF NOT EXISTS application_services_state(
+ as_id VARCHAR(150) PRIMARY KEY,
+ state VARCHAR(5),
+ last_txn INTEGER
+);
+
+CREATE TABLE IF NOT EXISTS application_services_txns(
+ as_id VARCHAR(150) NOT NULL,
+ txn_id INTEGER NOT NULL,
+ event_ids bytea NOT NULL,
+ UNIQUE(as_id, txn_id)
+);
+
+CREATE INDEX application_services_txns_id ON application_services_txns (
+ as_id
+);
diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql
new file mode 100644
index 0000000000..05d0874f0d
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/event_edges.sql
@@ -0,0 +1,89 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS event_forward_extremities(
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ UNIQUE (event_id, room_id)
+);
+
+CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id);
+CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_backward_extremities(
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ UNIQUE (event_id, room_id)
+);
+
+CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id);
+CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_edges(
+ event_id VARCHAR(150) NOT NULL,
+ prev_event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ is_state BOOL NOT NULL,
+ UNIQUE (event_id, prev_event_id, room_id, is_state)
+);
+
+CREATE INDEX ev_edges_id ON event_edges(event_id);
+CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id);
+
+
+CREATE TABLE IF NOT EXISTS room_depth(
+ room_id VARCHAR(150) NOT NULL,
+ min_depth INTEGER NOT NULL,
+ UNIQUE (room_id)
+);
+
+CREATE INDEX room_depth_room ON room_depth(room_id);
+
+
+create TABLE IF NOT EXISTS event_destinations(
+ event_id VARCHAR(150) NOT NULL,
+ destination VARCHAR(150) NOT NULL,
+ delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered
+ UNIQUE (event_id, destination)
+);
+
+CREATE INDEX event_destinations_id ON event_destinations(event_id);
+
+
+CREATE TABLE IF NOT EXISTS state_forward_extremities(
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ type VARCHAR(150) NOT NULL,
+ state_key VARCHAR(150) NOT NULL,
+ UNIQUE (event_id, room_id)
+);
+
+CREATE INDEX st_extrem_keys ON state_forward_extremities(
+ room_id, type, state_key
+);
+CREATE INDEX st_extrem_id ON state_forward_extremities(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_auth(
+ event_id VARCHAR(150) NOT NULL,
+ auth_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ UNIQUE (event_id, auth_id, room_id)
+);
+
+CREATE INDEX evauth_edges_id ON event_auth(event_id);
+CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id);
diff --git a/synapse/storage/schema/full_schemas/16/event_signatures.sql b/synapse/storage/schema/full_schemas/16/event_signatures.sql
new file mode 100644
index 0000000000..4291827368
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/event_signatures.sql
@@ -0,0 +1,55 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS event_content_hashes (
+ event_id VARCHAR(150),
+ algorithm VARCHAR(150),
+ hash bytea,
+ UNIQUE (event_id, algorithm)
+);
+
+CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_reference_hashes (
+ event_id VARCHAR(150),
+ algorithm VARCHAR(150),
+ hash bytea,
+ UNIQUE (event_id, algorithm)
+);
+
+CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_signatures (
+ event_id VARCHAR(150),
+ signature_name VARCHAR(150),
+ key_id VARCHAR(150),
+ signature bytea,
+ UNIQUE (event_id, signature_name, key_id)
+);
+
+CREATE INDEX event_signatures_id ON event_signatures(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_edge_hashes(
+ event_id VARCHAR(150),
+ prev_event_id VARCHAR(150),
+ algorithm VARCHAR(150),
+ hash bytea,
+ UNIQUE (event_id, prev_event_id, algorithm)
+);
+
+CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id);
diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql
new file mode 100644
index 0000000000..a661fc160c
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/im.sql
@@ -0,0 +1,124 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS events(
+ stream_ordering BIGINT PRIMARY KEY,
+ topological_ordering BIGINT NOT NULL,
+ event_id VARCHAR(150) NOT NULL,
+ type VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ content bytea NOT NULL,
+ unrecognized_keys bytea,
+ processed BOOL NOT NULL,
+ outlier BOOL NOT NULL,
+ depth BIGINT DEFAULT 0 NOT NULL,
+ UNIQUE (event_id)
+);
+
+CREATE INDEX events_stream_ordering ON events (stream_ordering);
+CREATE INDEX events_topological_ordering ON events (topological_ordering);
+CREATE INDEX events_room_id ON events (room_id);
+
+
+CREATE TABLE IF NOT EXISTS event_json(
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ internal_metadata bytea NOT NULL,
+ json bytea NOT NULL,
+ UNIQUE (event_id)
+);
+
+CREATE INDEX event_json_room_id ON event_json(room_id);
+
+
+CREATE TABLE IF NOT EXISTS state_events(
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ type VARCHAR(150) NOT NULL,
+ state_key VARCHAR(150) NOT NULL,
+ prev_state VARCHAR(150),
+ UNIQUE (event_id)
+);
+
+CREATE INDEX state_events_room_id ON state_events (room_id);
+CREATE INDEX state_events_type ON state_events (type);
+CREATE INDEX state_events_state_key ON state_events (state_key);
+
+
+CREATE TABLE IF NOT EXISTS current_state_events(
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ type VARCHAR(150) NOT NULL,
+ state_key VARCHAR(150) NOT NULL,
+ UNIQUE (event_id),
+ UNIQUE (room_id, type, state_key)
+);
+
+CREATE INDEX current_state_events_room_id ON current_state_events (room_id);
+CREATE INDEX current_state_events_type ON current_state_events (type);
+CREATE INDEX current_state_events_state_key ON current_state_events (state_key);
+
+CREATE TABLE IF NOT EXISTS room_memberships(
+ event_id VARCHAR(150) NOT NULL,
+ user_id VARCHAR(150) NOT NULL,
+ sender VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ membership VARCHAR(150) NOT NULL,
+ UNIQUE (event_id)
+);
+
+CREATE INDEX room_memberships_room_id ON room_memberships (room_id);
+CREATE INDEX room_memberships_user_id ON room_memberships (user_id);
+
+CREATE TABLE IF NOT EXISTS feedback(
+ event_id VARCHAR(150) NOT NULL,
+ feedback_type VARCHAR(150),
+ target_event_id VARCHAR(150),
+ sender VARCHAR(150),
+ room_id VARCHAR(150),
+ UNIQUE (event_id)
+);
+
+CREATE TABLE IF NOT EXISTS topics(
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ topic TEXT NOT NULL,
+ UNIQUE (event_id)
+);
+
+CREATE INDEX topics_room_id ON topics(room_id);
+
+CREATE TABLE IF NOT EXISTS room_names(
+ event_id VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ name TEXT NOT NULL,
+ UNIQUE (event_id)
+);
+
+CREATE INDEX room_names_room_id ON room_names(room_id);
+
+CREATE TABLE IF NOT EXISTS rooms(
+ room_id VARCHAR(150) PRIMARY KEY NOT NULL,
+ is_public BOOL,
+ creator VARCHAR(150)
+);
+
+CREATE TABLE IF NOT EXISTS room_hosts(
+ room_id VARCHAR(150) NOT NULL,
+ host VARCHAR(150) NOT NULL,
+ UNIQUE (room_id, host)
+);
+
+CREATE INDEX room_hosts_room_id ON room_hosts (room_id);
diff --git a/synapse/storage/schema/full_schemas/16/keys.sql b/synapse/storage/schema/full_schemas/16/keys.sql
new file mode 100644
index 0000000000..459b510427
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/keys.sql
@@ -0,0 +1,31 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS server_tls_certificates(
+ server_name VARCHAR(150), -- Server name.
+ fingerprint VARCHAR(150), -- Certificate fingerprint.
+ from_server VARCHAR(150), -- Which key server the certificate was fetched from.
+ ts_added_ms BIGINT, -- When the certifcate was added.
+ tls_certificate bytea, -- DER encoded x509 certificate.
+ UNIQUE (server_name, fingerprint)
+);
+
+CREATE TABLE IF NOT EXISTS server_signature_keys(
+ server_name VARCHAR(150), -- Server name.
+ key_id VARCHAR(150), -- Key version.
+ from_server VARCHAR(150), -- Which key server the key was fetched form.
+ ts_added_ms BIGINT, -- When the key was added.
+ verify_key bytea, -- NACL verification key.
+ UNIQUE (server_name, key_id)
+);
diff --git a/synapse/storage/schema/full_schemas/16/media_repository.sql b/synapse/storage/schema/full_schemas/16/media_repository.sql
new file mode 100644
index 0000000000..0e819fca38
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/media_repository.sql
@@ -0,0 +1,68 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS local_media_repository (
+ media_id VARCHAR(150), -- The id used to refer to the media.
+ media_type VARCHAR(150), -- The MIME-type of the media.
+ media_length INTEGER, -- Length of the media in bytes.
+ created_ts BIGINT, -- When the content was uploaded in ms.
+ upload_name VARCHAR(150), -- The name the media was uploaded with.
+ user_id VARCHAR(150), -- The user who uploaded the file.
+ UNIQUE (media_id)
+);
+
+CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
+ media_id VARCHAR(150), -- The id used to refer to the media.
+ thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+ thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+ thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail.
+ thumbnail_method VARCHAR(150), -- The method used to make the thumbnail.
+ thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+ UNIQUE (
+ media_id, thumbnail_width, thumbnail_height, thumbnail_type
+ )
+);
+
+CREATE INDEX local_media_repository_thumbnails_media_id
+ ON local_media_repository_thumbnails (media_id);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache (
+ media_origin VARCHAR(150), -- The remote HS the media came from.
+ media_id VARCHAR(150), -- The id used to refer to the media on that server.
+ media_type VARCHAR(150), -- The MIME-type of the media.
+ created_ts BIGINT, -- When the content was uploaded in ms.
+ upload_name VARCHAR(150), -- The name the media was uploaded with.
+ media_length INTEGER, -- Length of the media in bytes.
+ filesystem_id VARCHAR(150), -- The name used to store the media on disk.
+ UNIQUE (media_origin, media_id)
+);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
+ media_origin VARCHAR(150), -- The remote HS the media came from.
+ media_id VARCHAR(150), -- The id used to refer to the media.
+ thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+ thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+ thumbnail_method VARCHAR(150), -- The method used to make the thumbnail
+ thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail.
+ thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+ filesystem_id VARCHAR(150), -- The name used to store the media on disk.
+ UNIQUE (
+ media_origin, media_id, thumbnail_width, thumbnail_height,
+ thumbnail_type
+ )
+);
+
+CREATE INDEX remote_media_cache_thumbnails_media_id
+ ON remote_media_cache_thumbnails (media_id);
diff --git a/synapse/storage/schema/full_schemas/16/presence.sql b/synapse/storage/schema/full_schemas/16/presence.sql
new file mode 100644
index 0000000000..9c41be296e
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/presence.sql
@@ -0,0 +1,40 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS presence(
+ user_id VARCHAR(150) NOT NULL,
+ state VARCHAR(20),
+ status_msg VARCHAR(150),
+ mtime BIGINT, -- miliseconds since last state change
+ UNIQUE (user_id)
+);
+
+-- For each of /my/ users which possibly-remote users are allowed to see their
+-- presence state
+CREATE TABLE IF NOT EXISTS presence_allow_inbound(
+ observed_user_id VARCHAR(150) NOT NULL,
+ observer_user_id VARCHAR(150) NOT NULL, -- a UserID,
+ UNIQUE (observed_user_id, observer_user_id)
+);
+
+-- For each of /my/ users (watcher), which possibly-remote users are they
+-- watching?
+CREATE TABLE IF NOT EXISTS presence_list(
+ user_id VARCHAR(150) NOT NULL,
+ observed_user_id VARCHAR(150) NOT NULL, -- a UserID,
+ accepted BOOLEAN NOT NULL,
+ UNIQUE (user_id, observed_user_id)
+);
+
+CREATE INDEX presence_list_user_id ON presence_list (user_id);
diff --git a/synapse/storage/schema/full_schemas/16/profiles.sql b/synapse/storage/schema/full_schemas/16/profiles.sql
new file mode 100644
index 0000000000..21c58a99bc
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/profiles.sql
@@ -0,0 +1,20 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS profiles(
+ user_id VARCHAR(150) NOT NULL,
+ displayname VARCHAR(150),
+ avatar_url VARCHAR(150),
+ UNIQUE(user_id)
+);
diff --git a/synapse/storage/schema/full_schemas/16/push.sql b/synapse/storage/schema/full_schemas/16/push.sql
new file mode 100644
index 0000000000..5c0c7bc201
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/push.sql
@@ -0,0 +1,73 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS rejections(
+ event_id VARCHAR(150) NOT NULL,
+ reason VARCHAR(150) NOT NULL,
+ last_check VARCHAR(150) NOT NULL,
+ UNIQUE (event_id)
+);
+
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+ id BIGINT PRIMARY KEY,
+ user_name VARCHAR(150) NOT NULL,
+ profile_tag VARCHAR(32) NOT NULL,
+ kind VARCHAR(8) NOT NULL,
+ app_id VARCHAR(64) NOT NULL,
+ app_display_name VARCHAR(64) NOT NULL,
+ device_display_name VARCHAR(128) NOT NULL,
+ pushkey bytea NOT NULL,
+ ts BIGINT NOT NULL,
+ lang VARCHAR(8),
+ data bytea,
+ last_token TEXT,
+ last_success BIGINT,
+ failing_since BIGINT,
+ UNIQUE (app_id, pushkey)
+);
+
+CREATE TABLE IF NOT EXISTS push_rules (
+ id BIGINT PRIMARY KEY,
+ user_name VARCHAR(150) NOT NULL,
+ rule_id VARCHAR(150) NOT NULL,
+ priority_class SMALLINT NOT NULL,
+ priority INTEGER NOT NULL DEFAULT 0,
+ conditions VARCHAR(150) NOT NULL,
+ actions VARCHAR(150) NOT NULL,
+ UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX push_rules_user_name on push_rules (user_name);
+
+CREATE TABLE IF NOT EXISTS user_filters(
+ user_id VARCHAR(150),
+ filter_id BIGINT,
+ filter_json bytea
+);
+
+CREATE INDEX user_filters_by_user_id_filter_id ON user_filters(
+ user_id, filter_id
+);
+
+CREATE TABLE IF NOT EXISTS push_rules_enable (
+ id BIGINT PRIMARY KEY,
+ user_name VARCHAR(150) NOT NULL,
+ rule_id VARCHAR(150) NOT NULL,
+ enabled SMALLINT,
+ UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX push_rules_enable_user_name on push_rules_enable (user_name);
diff --git a/synapse/storage/schema/full_schemas/16/redactions.sql b/synapse/storage/schema/full_schemas/16/redactions.sql
new file mode 100644
index 0000000000..492fd22033
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/redactions.sql
@@ -0,0 +1,22 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS redactions (
+ event_id VARCHAR(150) NOT NULL,
+ redacts VARCHAR(150) NOT NULL,
+ UNIQUE (event_id)
+);
+
+CREATE INDEX redactions_event_id ON redactions (event_id);
+CREATE INDEX redactions_redacts ON redactions (redacts);
diff --git a/synapse/storage/schema/full_schemas/16/room_aliases.sql b/synapse/storage/schema/full_schemas/16/room_aliases.sql
new file mode 100644
index 0000000000..952cae35b7
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/room_aliases.sql
@@ -0,0 +1,25 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS room_aliases(
+ room_alias VARCHAR(150) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ UNIQUE (room_alias)
+);
+
+CREATE TABLE IF NOT EXISTS room_alias_servers(
+ room_alias VARCHAR(150) NOT NULL,
+ server VARCHAR(150) NOT NULL
+);
diff --git a/synapse/storage/schema/full_schemas/16/state.sql b/synapse/storage/schema/full_schemas/16/state.sql
new file mode 100644
index 0000000000..3c54595e64
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/state.sql
@@ -0,0 +1,40 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS state_groups(
+ id BIGINT PRIMARY KEY,
+ room_id VARCHAR(150) NOT NULL,
+ event_id VARCHAR(150) NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS state_groups_state(
+ state_group VARCHAR(20) NOT NULL,
+ room_id VARCHAR(150) NOT NULL,
+ type VARCHAR(150) NOT NULL,
+ state_key VARCHAR(150) NOT NULL,
+ event_id VARCHAR(150) NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS event_to_state_groups(
+ event_id VARCHAR(150) NOT NULL,
+ state_group VARCHAR(150) NOT NULL,
+ UNIQUE (event_id)
+);
+
+CREATE INDEX state_groups_id ON state_groups(id);
+
+CREATE INDEX state_groups_state_id ON state_groups_state(state_group);
+CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key);
+CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id);
diff --git a/synapse/storage/schema/full_schemas/16/transactions.sql b/synapse/storage/schema/full_schemas/16/transactions.sql
new file mode 100644
index 0000000000..bc64064936
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/transactions.sql
@@ -0,0 +1,63 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Stores what transaction ids we have received and what our response was
+CREATE TABLE IF NOT EXISTS received_transactions(
+ transaction_id VARCHAR(150),
+ origin VARCHAR(150),
+ ts BIGINT,
+ response_code INTEGER,
+ response_json bytea,
+ has_been_referenced smallint default 0, -- Whether thishas been referenced by a prev_tx
+ UNIQUE (transaction_id, origin)
+);
+
+CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
+
+
+-- Stores what transactions we've sent, what their response was (if we got one) and whether we have
+-- since referenced the transaction in another outgoing transaction
+CREATE TABLE IF NOT EXISTS sent_transactions(
+ id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering
+ transaction_id VARCHAR(150),
+ destination VARCHAR(150),
+ response_code INTEGER DEFAULT 0,
+ response_json bytea,
+ ts BIGINT
+);
+
+CREATE INDEX sent_transaction_dest ON sent_transactions(destination);
+CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id);
+-- So that we can do an efficient look up of all transactions that have yet to be successfully
+-- sent.
+CREATE INDEX sent_transaction_sent ON sent_transactions(response_code);
+
+
+-- For sent transactions only.
+CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
+ transaction_id INTEGER,
+ destination VARCHAR(150),
+ pdu_id VARCHAR(150),
+ pdu_origin VARCHAR(150),
+ UNIQUE (transaction_id, destination)
+);
+
+CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
+
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+ destination VARCHAR(150) PRIMARY KEY,
+ retry_last_ts BIGINT,
+ retry_interval INTEGER
+);
diff --git a/synapse/storage/schema/full_schemas/16/users.sql b/synapse/storage/schema/full_schemas/16/users.sql
new file mode 100644
index 0000000000..006b249fc0
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/users.sql
@@ -0,0 +1,42 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS users(
+ name VARCHAR(150),
+ password_hash VARCHAR(150),
+ creation_ts BIGINT,
+ admin SMALLINT DEFAULT 0 NOT NULL,
+ UNIQUE(name)
+);
+
+CREATE TABLE IF NOT EXISTS access_tokens(
+ id BIGINT PRIMARY KEY,
+ user_id VARCHAR(150) NOT NULL,
+ device_id VARCHAR(150),
+ token VARCHAR(150) NOT NULL,
+ last_used BIGINT,
+ UNIQUE(token)
+);
+
+CREATE TABLE IF NOT EXISTS user_ips (
+ user_id VARCHAR(150) NOT NULL,
+ access_token VARCHAR(150) NOT NULL,
+ device_id VARCHAR(150),
+ ip VARCHAR(150) NOT NULL,
+ user_agent VARCHAR(150) NOT NULL,
+ last_seen BIGINT NOT NULL
+);
+
+CREATE INDEX user_ips_user ON user_ips(user_id);
+CREATE INDEX user_ips_user_ip ON user_ips(user_id, access_token, ip);
diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql
index 0431e2d051..d9494611e0 100644
--- a/synapse/storage/schema/schema_version.sql
+++ b/synapse/storage/schema/schema_version.sql
@@ -14,17 +14,14 @@
*/
CREATE TABLE IF NOT EXISTS schema_version(
- Lock char(1) NOT NULL DEFAULT 'X', -- Makes sure this table only has one row.
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
version INTEGER NOT NULL,
upgraded BOOL NOT NULL, -- Whether we reached this version from an upgrade or an initial schema.
- CONSTRAINT schema_version_lock_x CHECK (Lock='X')
- CONSTRAINT schema_version_lock_uniq UNIQUE (Lock)
+ CHECK (Lock='X')
);
CREATE TABLE IF NOT EXISTS applied_schema_deltas(
version INTEGER NOT NULL,
- file TEXT NOT NULL,
- CONSTRAINT schema_deltas_ver_file UNIQUE (version, file) ON CONFLICT IGNORE
+ file VARCHAR(150) NOT NULL,
+ UNIQUE(version, file)
);
-
-CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version);
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index d0d53770f2..f051828630 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -56,7 +56,6 @@ class SignatureStore(SQLBaseStore):
"algorithm": algorithm,
"hash": buffer(hash_bytes),
},
- or_ignore=True,
)
def get_event_reference_hashes(self, event_ids):
@@ -100,7 +99,7 @@ class SignatureStore(SQLBaseStore):
" WHERE event_id = ?"
)
txn.execute(query, (event_id, ))
- return dict(txn.fetchall())
+ return {k: v for k, v in txn.fetchall()}
def _store_event_reference_hash_txn(self, txn, event_id, algorithm,
hash_bytes):
@@ -119,7 +118,6 @@ class SignatureStore(SQLBaseStore):
"algorithm": algorithm,
"hash": buffer(hash_bytes),
},
- or_ignore=True,
)
def _get_event_signatures_txn(self, txn, event_id):
@@ -164,7 +162,6 @@ class SignatureStore(SQLBaseStore):
"key_id": key_id,
"signature": buffer(signature_bytes),
},
- or_ignore=True,
)
def _get_prev_event_hashes_txn(self, txn, event_id):
@@ -198,5 +195,4 @@ class SignatureStore(SQLBaseStore):
"algorithm": algorithm,
"hash": buffer(hash_bytes),
},
- or_ignore=True,
)
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 58dbf2802b..553ba9dd1f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore
from twisted.internet import defer
+from synapse.util.stringutils import random_string
+
import logging
logger = logging.getLogger(__name__)
@@ -91,14 +93,15 @@ class StateStore(SQLBaseStore):
state_group = context.state_group
if not state_group:
- state_group = self._simple_insert_txn(
+ state_group = self._state_groups_id_gen.get_next_txn(txn)
+ self._simple_insert_txn(
txn,
table="state_groups",
values={
+ "id": state_group,
"room_id": event.room_id,
"event_id": event.event_id,
},
- or_ignore=True,
)
for state in state_events.values():
@@ -112,7 +115,6 @@ class StateStore(SQLBaseStore):
"state_key": state.state_key,
"event_id": state.event_id,
},
- or_ignore=True,
)
self._simple_insert_txn(
@@ -122,7 +124,6 @@ class StateStore(SQLBaseStore):
"state_group": state_group,
"event_id": event.event_id,
},
- or_replace=True,
)
@defer.inlineCallbacks
@@ -154,3 +155,7 @@ class StateStore(SQLBaseStore):
events = yield self._parse_events(results)
defer.returnValue(events)
+
+
+def _make_group_id(clock):
+ return str(int(clock.time_msec())) + random_string(5)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 66f307e640..df6de7cbcd 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -35,7 +35,7 @@ what sort order was used:
from twisted.internet import defer
-from ._base import SQLBaseStore, cached
+from ._base import SQLBaseStore
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.util.logutils import log_function
@@ -110,7 +110,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
if self.topological is None:
return "(%d < %s)" % (self.stream, "stream_ordering")
else:
- return "(%d < %s OR (%d == %s AND %d < %s))" % (
+ return "(%d < %s OR (%d = %s AND %d < %s))" % (
self.topological, "topological_ordering",
self.topological, "topological_ordering",
self.stream, "stream_ordering",
@@ -120,7 +120,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
if self.topological is None:
return "(%d >= %s)" % (self.stream, "stream_ordering")
else:
- return "(%d > %s OR (%d == %s AND %d >= %s))" % (
+ return "(%d > %s OR (%d = %s AND %d >= %s))" % (
self.topological, "topological_ordering",
self.topological, "topological_ordering",
self.stream, "stream_ordering",
@@ -240,7 +240,7 @@ class StreamStore(SQLBaseStore):
sql = (
"SELECT e.event_id, e.stream_ordering FROM events AS e WHERE "
- "(e.outlier = 0 AND (room_id IN (%(current)s)) OR "
+ "(e.outlier = ? AND (room_id IN (%(current)s)) OR "
"(event_id IN (%(invites)s))) "
"AND e.stream_ordering > ? AND e.stream_ordering <= ? "
"ORDER BY stream_ordering ASC LIMIT %(limit)d "
@@ -251,7 +251,7 @@ class StreamStore(SQLBaseStore):
}
def f(txn):
- txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,))
+ txn.execute(sql, (False, user_id, user_id, from_id.stream, to_id.stream,))
rows = self.cursor_to_dict(txn)
@@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore):
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
- args = [room_id]
+ args = [False, room_id]
if direction == 'b':
order = "DESC"
bounds = _StreamToken.parse(from_key).upper_bound()
@@ -307,7 +307,7 @@ class StreamStore(SQLBaseStore):
sql = (
"SELECT * FROM events"
- " WHERE outlier = 0 AND room_id = ? AND %(bounds)s"
+ " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s,"
" stream_ordering %(order)s %(limit)s"
) % {
@@ -358,7 +358,7 @@ class StreamStore(SQLBaseStore):
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
- " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+ " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC"
" LIMIT ?"
)
@@ -368,17 +368,17 @@ class StreamStore(SQLBaseStore):
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
" WHERE room_id = ? AND stream_ordering > ?"
- " AND stream_ordering <= ? AND outlier = 0"
+ " AND stream_ordering <= ? AND outlier = ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC"
" LIMIT ?"
)
def get_recent_events_for_room_txn(txn):
if from_token is None:
- txn.execute(sql, (room_id, end_token.stream, limit,))
+ txn.execute(sql, (room_id, end_token.stream, False, limit,))
else:
txn.execute(sql, (
- room_id, from_token.stream, end_token.stream, limit
+ room_id, from_token.stream, end_token.stream, False, limit
))
rows = self.cursor_to_dict(txn)
@@ -413,12 +413,10 @@ class StreamStore(SQLBaseStore):
"get_recent_events_for_room", get_recent_events_for_room_txn
)
- @cached(num_args=0)
+ @defer.inlineCallbacks
def get_room_events_max_id(self):
- return self.runInteraction(
- "get_room_events_max_id",
- self._get_room_events_max_id_txn
- )
+ token = yield self._stream_id_gen.get_max_token(self)
+ defer.returnValue("s%d" % (token,))
@defer.inlineCallbacks
def _get_min_token(self):
@@ -433,27 +431,6 @@ class StreamStore(SQLBaseStore):
defer.returnValue(self.min_token)
- def get_next_stream_id(self):
- with self._next_stream_id_lock:
- i = self._next_stream_id
- self._next_stream_id += 1
- return i
-
- def _get_room_events_max_id_txn(self, txn):
- txn.execute(
- "SELECT MAX(stream_ordering) as m FROM events"
- )
-
- res = self.cursor_to_dict(txn)
-
- logger.debug("get_room_events_max_id: %s", res)
-
- if not res or not res[0] or not res[0]["m"]:
- return "s0"
-
- key = res[0]["m"]
- return "s%d" % (key,)
-
@staticmethod
def _set_before_and_after(events, rows):
for event, row in zip(events, rows):
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b777395e06..4c3dc58662 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore, Table, cached
+from ._base import SQLBaseStore, cached
from collections import namedtuple
@@ -84,13 +84,18 @@ class TransactionStore(SQLBaseStore):
def _set_received_txn_response(self, txn, transaction_id, origin, code,
response_json):
- query = (
- "UPDATE %s "
- "SET response_code = ?, response_json = ? "
- "WHERE transaction_id = ? AND origin = ?"
- ) % ReceivedTransactionsTable.table_name
-
- txn.execute(query, (code, response_json, transaction_id, origin))
+ self._simple_upsert_txn(
+ txn,
+ table=ReceivedTransactionsTable.table_name,
+ keyvalues={
+ "transaction_id": transaction_id,
+ "origin": origin,
+ },
+ values={
+ "response_code": code,
+ "response_json": response_json,
+ }
+ )
def prep_send_transaction(self, transaction_id, destination,
origin_server_ts):
@@ -118,41 +123,38 @@ class TransactionStore(SQLBaseStore):
def _prep_send_transaction(self, txn, transaction_id, destination,
origin_server_ts):
+ next_id = self._transaction_id_gen.get_next_txn(txn)
+
# First we find out what the prev_txns should be.
# Since we know that we are only sending one transaction at a time,
# we can simply take the last one.
- query = "%s ORDER BY id DESC LIMIT 1" % (
- SentTransactions.select_statement("destination = ?"),
- )
+ query = (
+ "SELECT * FROM sent_transactions"
+ " WHERE destination = ?"
+ " ORDER BY id DESC LIMIT 1"
+ )
- results = txn.execute(query, (destination,))
- results = SentTransactions.decode_results(results)
+ txn.execute(query, (destination,))
+ results = self.cursor_to_dict(txn)
- prev_txns = [r.transaction_id for r in results]
+ prev_txns = [r["transaction_id"] for r in results]
# Actually add the new transaction to the sent_transactions table.
- query = SentTransactions.insert_statement()
- txn.execute(query, SentTransactions.EntryType(
- None,
- transaction_id=transaction_id,
- destination=destination,
- ts=origin_server_ts,
- response_code=0,
- response_json=None
- ))
-
- # Update the tx id -> pdu id mapping
-
- # values = [
- # (transaction_id, destination, pdu[0], pdu[1])
- # for pdu in pdu_list
- # ]
- #
- # logger.debug("Inserting: %s", repr(values))
- #
- # query = TransactionsToPduTable.insert_statement()
- # txn.executemany(query, values)
+ self._simple_insert_txn(
+ txn,
+ table=SentTransactions.table_name,
+ values={
+ "id": next_id,
+ "transaction_id": transaction_id,
+ "destination": destination,
+ "ts": origin_server_ts,
+ "response_code": 0,
+ "response_json": None,
+ }
+ )
+
+ # TODO Update the tx id -> pdu id mapping
return prev_txns
@@ -171,15 +173,20 @@ class TransactionStore(SQLBaseStore):
transaction_id, destination, code, response_dict
)
- def _delivered_txn(cls, txn, transaction_id, destination,
+ def _delivered_txn(self, txn, transaction_id, destination,
code, response_json):
- query = (
- "UPDATE %s "
- "SET response_code = ?, response_json = ? "
- "WHERE transaction_id = ? AND destination = ?"
- ) % SentTransactions.table_name
-
- txn.execute(query, (code, response_json, transaction_id, destination))
+ self._simple_update_one_txn(
+ txn,
+ table=SentTransactions.table_name,
+ keyvalues={
+ "transaction_id": transaction_id,
+ "destination": destination,
+ },
+ updatevalues={
+ "response_code": code,
+ "response_json": response_json,
+ }
+ )
def get_transactions_after(self, transaction_id, destination):
"""Get all transactions after a given local transaction_id.
@@ -189,25 +196,26 @@ class TransactionStore(SQLBaseStore):
destination (str)
Returns:
- list: A list of `ReceivedTransactionsTable.EntryType`
+ list: A list of dicts
"""
return self.runInteraction(
"get_transactions_after",
self._get_transactions_after, transaction_id, destination
)
- def _get_transactions_after(cls, txn, transaction_id, destination):
- where = (
- "destination = ? AND id > (select id FROM %s WHERE "
- "transaction_id = ? AND destination = ?)"
- ) % (
- SentTransactions.table_name
+ def _get_transactions_after(self, txn, transaction_id, destination):
+ query = (
+ "SELECT * FROM sent_transactions"
+ " WHERE destination = ? AND id >"
+ " ("
+ " SELECT id FROM sent_transactions"
+ " WHERE transaction_id = ? AND destination = ?"
+ " )"
)
- query = SentTransactions.select_statement(where)
txn.execute(query, (destination, transaction_id, destination))
- return ReceivedTransactionsTable.decode_results(txn.fetchall())
+ return self.cursor_to_dict(txn)
@cached()
def get_destination_retry_timings(self, destination):
@@ -218,22 +226,27 @@ class TransactionStore(SQLBaseStore):
Returns:
None if not retrying
- Otherwise a DestinationsTable.EntryType for the retry scheme
+ Otherwise a dict for the retry scheme
"""
return self.runInteraction(
"get_destination_retry_timings",
self._get_destination_retry_timings, destination)
- def _get_destination_retry_timings(cls, txn, destination):
- query = DestinationsTable.select_statement("destination = ?")
- txn.execute(query, (destination,))
- result = txn.fetchall()
- if result:
- result = DestinationsTable.decode_single_result(result)
- if result.retry_last_ts > 0:
- return result
- else:
- return None
+ def _get_destination_retry_timings(self, txn, destination):
+ result = self._simple_select_one_txn(
+ txn,
+ table=DestinationsTable.table_name,
+ keyvalues={
+ "destination": destination,
+ },
+ retcols=DestinationsTable.fields,
+ allow_none=True,
+ )
+
+ if result and result["retry_last_ts"] > 0:
+ return result
+ else:
+ return None
def set_destination_retry_timings(self, destination,
retry_last_ts, retry_interval):
@@ -249,11 +262,11 @@ class TransactionStore(SQLBaseStore):
# As this is the new value, we might as well prefill the cache
self.get_destination_retry_timings.prefill(
destination,
- DestinationsTable.EntryType(
- destination,
- retry_last_ts,
- retry_interval
- )
+ {
+ "destination": destination,
+ "retry_last_ts": retry_last_ts,
+ "retry_interval": retry_interval
+ },
)
# XXX: we could chose to not bother persisting this if our cache thinks
@@ -266,22 +279,38 @@ class TransactionStore(SQLBaseStore):
retry_interval,
)
- def _set_destination_retry_timings(cls, txn, destination,
+ def _set_destination_retry_timings(self, txn, destination,
retry_last_ts, retry_interval):
-
query = (
- "INSERT OR REPLACE INTO %s "
- "(destination, retry_last_ts, retry_interval) "
- "VALUES (?, ?, ?) "
- ) % DestinationsTable.table_name
+ "UPDATE destinations"
+ " SET retry_last_ts = ?, retry_interval = ?"
+ " WHERE destination = ?"
+ )
+
+ txn.execute(
+ query,
+ (
+ retry_last_ts, retry_interval, destination,
+ )
+ )
- txn.execute(query, (destination, retry_last_ts, retry_interval))
+ if txn.rowcount == 0:
+ # destination wasn't already in table. Insert it.
+ self._simple_insert_txn(
+ txn,
+ table="destinations",
+ values={
+ "destination": destination,
+ "retry_last_ts": retry_last_ts,
+ "retry_interval": retry_interval,
+ }
+ )
def get_destinations_needing_retry(self):
"""Get all destinations which are due a retry for sending a transaction.
Returns:
- list: A list of `DestinationsTable.EntryType`
+ list: A list of dicts
"""
return self.runInteraction(
@@ -289,14 +318,17 @@ class TransactionStore(SQLBaseStore):
self._get_destinations_needing_retry
)
- def _get_destinations_needing_retry(cls, txn):
- where = "retry_last_ts > 0 and retry_next_ts < now()"
- query = DestinationsTable.select_statement(where)
- txn.execute(query)
- return DestinationsTable.decode_results(txn.fetchall())
+ def _get_destinations_needing_retry(self, txn):
+ query = (
+ "SELECT * FROM destinations"
+ " WHERE retry_last_ts > 0 and retry_next_ts < ?"
+ )
+
+ txn.execute(query, (self._clock.time_msec(),))
+ return self.cursor_to_dict(txn)
-class ReceivedTransactionsTable(Table):
+class ReceivedTransactionsTable(object):
table_name = "received_transactions"
fields = [
@@ -308,10 +340,8 @@ class ReceivedTransactionsTable(Table):
"has_been_referenced",
]
- EntryType = namedtuple("ReceivedTransactionsEntry", fields)
-
-class SentTransactions(Table):
+class SentTransactions(object):
table_name = "sent_transactions"
fields = [
@@ -326,7 +356,7 @@ class SentTransactions(Table):
EntryType = namedtuple("SentTransactionsEntry", fields)
-class TransactionsToPduTable(Table):
+class TransactionsToPduTable(object):
table_name = "transaction_id_to_pdu"
fields = [
@@ -336,10 +366,8 @@ class TransactionsToPduTable(Table):
"pdu_origin",
]
- EntryType = namedtuple("TransactionsToPduEntry", fields)
-
-class DestinationsTable(Table):
+class DestinationsTable(object):
table_name = "destinations"
fields = [
@@ -347,5 +375,3 @@ class DestinationsTable(Table):
"retry_last_ts",
"retry_interval",
]
-
- EntryType = namedtuple("DestinationsEntry", fields)
diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py
new file mode 100644
index 0000000000..c488b10d3c
--- /dev/null
+++ b/synapse/storage/util/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
new file mode 100644
index 0000000000..e5dec1c948
--- /dev/null
+++ b/synapse/storage/util/id_generators.py
@@ -0,0 +1,130 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from collections import deque
+import contextlib
+import threading
+
+
+class IdGenerator(object):
+ def __init__(self, table, column, store):
+ self.table = table
+ self.column = column
+ self.store = store
+ self._lock = threading.Lock()
+ self._next_id = None
+
+ @defer.inlineCallbacks
+ def get_next(self):
+ with self._lock:
+ if not self._next_id:
+ res = yield self.store._execute_and_decode(
+ "IdGenerator_%s" % (self.table,),
+ "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,)
+ )
+
+ self._next_id = (res and res[0] and res[0]["mx"]) or 1
+
+ i = self._next_id
+ self._next_id += 1
+ defer.returnValue(i)
+
+ def get_next_txn(self, txn):
+ with self._lock:
+ if self._next_id:
+ i = self._next_id
+ self._next_id += 1
+ return i
+ else:
+ txn.execute(
+ "SELECT MAX(%s) FROM %s" % (self.column, self.table,)
+ )
+
+ val, = txn.fetchone()
+ cur = val or 0
+ cur += 1
+ self._next_id = cur + 1
+
+ return cur
+
+
+class StreamIdGenerator(object):
+ """Used to generate new stream ids when persisting events while keeping
+ track of which transactions have been completed.
+
+ This allows us to get the "current" stream id, i.e. the stream id such that
+ all ids less than or equal to it have completed. This handles the fact that
+ persistence of events can complete out of order.
+
+ Usage:
+ with stream_id_gen.get_next_txn(txn) as stream_id:
+ # ... persist event ...
+ """
+ def __init__(self):
+ self._lock = threading.Lock()
+
+ self._current_max = None
+ self._unfinished_ids = deque()
+
+ def get_next_txn(self, txn):
+ """
+ Usage:
+ with stream_id_gen.get_next_txn(txn) as stream_id:
+ # ... persist event ...
+ """
+ with self._lock:
+ if not self._current_max:
+ self._compute_current_max(txn)
+
+ self._current_max += 1
+ next_id = self._current_max
+
+ self._unfinished_ids.append(next_id)
+
+ @contextlib.contextmanager
+ def manager():
+ try:
+ yield next_id
+ finally:
+ with self._lock:
+ self._unfinished_ids.remove(next_id)
+
+ return manager()
+
+ def get_max_token(self, store):
+ """Returns the maximum stream id such that all stream ids less than or
+ equal to it have been successfully persisted.
+ """
+ with self._lock:
+ if self._unfinished_ids:
+ return self._unfinished_ids[0] - 1
+
+ if not self._current_max:
+ return store.runInteraction(
+ "_compute_current_max",
+ self._compute_current_max,
+ )
+
+ return self._current_max
+
+ def _compute_current_max(self, txn):
+ txn.execute("SELECT MAX(stream_ordering) FROM events")
+ val, = txn.fetchone()
+
+ self._current_max = int(val) if val else 1
+
+ return self._current_max
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 4e82232796..a42138f556 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -60,7 +60,7 @@ def get_retry_limiter(destination, clock, store, **kwargs):
if retry_timings:
retry_last_ts, retry_interval = (
- retry_timings.retry_last_ts, retry_timings.retry_interval
+ retry_timings["retry_last_ts"], retry_timings["retry_interval"]
)
now = int(clock.time_msec())
diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py
index 2ecd00d2ad..a4ef60b911 100644
--- a/tests/federation/test_federation.py
+++ b/tests/federation/test_federation.py
@@ -24,8 +24,6 @@ from ..utils import MockHttpResource, MockClock, setup_test_homeserver
from synapse.federation import initialize_http_replication
from synapse.events import FrozenEvent
-from synapse.storage.transactions import DestinationsTable
-
def make_pdu(prev_pdus=[], **kwargs):
"""Provide some default fields for making a PduTuple."""
@@ -57,8 +55,14 @@ class FederationTestCase(unittest.TestCase):
self.mock_persistence.get_received_txn_response.return_value = (
defer.succeed(None)
)
+
+ retry_timings_res = {
+ "destination": "",
+ "retry_last_ts": 0,
+ "retry_interval": 0,
+ }
self.mock_persistence.get_destination_retry_timings.return_value = (
- defer.succeed(DestinationsTable.EntryType("", 0, 0))
+ defer.succeed(retry_timings_res)
)
self.mock_persistence.get_auth_chain.return_value = []
self.clock = MockClock()
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index c13ade3286..08d2404b6c 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -87,6 +87,15 @@ class FederationTestCase(unittest.TestCase):
self.datastore.get_room.return_value = defer.succeed(True)
self.auth.check_host_in_room.return_value = defer.succeed(True)
+ retry_timings_res = {
+ "destination": "",
+ "retry_last_ts": 0,
+ "retry_interval": 0,
+ }
+ self.datastore.get_destination_retry_timings.return_value = (
+ defer.succeed(retry_timings_res)
+ )
+
def have_events(event_ids):
return defer.succeed({})
self.datastore.have_events.side_effect = have_events
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 04eba4289e..9b0e606918 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -194,8 +194,13 @@ class MockedDatastorePresenceTestCase(PresenceTestCase):
return datastore
def setUp_datastore_federation_mocks(self, datastore):
+ retry_timings_res = {
+ "destination": "",
+ "retry_last_ts": 0,
+ "retry_interval": 0,
+ }
datastore.get_destination_retry_timings.return_value = (
- defer.succeed(DestinationsTable.EntryType("", 0, 0))
+ defer.succeed(retry_timings_res)
)
def get_received_txn_response(*args):
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index bf34b7ccbd..2d76b23564 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -96,8 +96,13 @@ class TypingNotificationsTestCase(unittest.TestCase):
self.event_source = hs.get_event_sources().sources["typing"]
self.datastore = hs.get_datastore()
+ retry_timings_res = {
+ "destination": "",
+ "retry_last_ts": 0,
+ "retry_interval": 0,
+ }
self.datastore.get_destination_retry_timings.return_value = (
- defer.succeed(DestinationsTable.EntryType("", 0, 0))
+ defer.succeed(retry_timings_res)
)
def get_received_txn_response(*args):
diff --git a/tests/rest/client/v1/test_events.py b/tests/rest/client/v1/test_events.py
index 36b0f2ff6d..445272e323 100644
--- a/tests/rest/client/v1/test_events.py
+++ b/tests/rest/client/v1/test_events.py
@@ -115,12 +115,6 @@ class EventStreamPermissionsTestCase(RestTestCase):
hs = yield setup_test_homeserver(
http_client=None,
replication_layer=Mock(),
- clock=Mock(spec=[
- "call_later",
- "cancel_call_later",
- "time_msec",
- "time"
- ]),
ratelimiter=NonCallableMock(spec_set=[
"send_message",
]),
@@ -132,9 +126,6 @@ class EventStreamPermissionsTestCase(RestTestCase):
hs.get_handlers().federation_handler = Mock()
- hs.get_clock().time_msec.return_value = 1000000
- hs.get_clock().time.return_value = 1000
-
synapse.rest.client.v1.register.register_servlets(hs, self.mock_resource)
synapse.rest.client.v1.events.register_servlets(hs, self.mock_resource)
synapse.rest.client.v1.room.register_servlets(hs, self.mock_resource)
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index 675959c56c..77376b348e 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -15,6 +15,7 @@
from tests import unittest
from twisted.internet import defer
+from tests.utils import setup_test_homeserver
from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.server import HomeServer
from synapse.storage.appservice import (
@@ -33,14 +34,10 @@ class ApplicationServiceStoreTestCase(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
self.as_yaml_files = []
- db_pool = SQLiteMemoryDbPool()
- yield db_pool.prepare()
- hs = HomeServer(
- "test", db_pool=db_pool, clock=MockClock(),
- config=Mock(
- app_service_config_files=self.as_yaml_files
- )
+ config = Mock(
+ app_service_config_files=self.as_yaml_files
)
+ hs = yield setup_test_homeserver(config=config)
self.as_token = "token1"
self.as_url = "some_url"
@@ -102,8 +99,13 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
self.as_yaml_files = []
- self.db_pool = SQLiteMemoryDbPool()
- yield self.db_pool.prepare()
+
+ config = Mock(
+ app_service_config_files=self.as_yaml_files
+ )
+ hs = yield setup_test_homeserver(config=config)
+ self.db_pool = hs.get_db_pool()
+
self.as_list = [
{
"token": "token1",
@@ -129,11 +131,8 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
for s in self.as_list:
yield self._add_service(s["url"], s["token"])
- hs = HomeServer(
- "test", db_pool=self.db_pool, clock=MockClock(), config=Mock(
- app_service_config_files=self.as_yaml_files
- )
- )
+ self.as_yaml_files = []
+
self.store = TestTransactionStore(hs)
def _add_service(self, url, as_token):
@@ -302,7 +301,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
(service.id,)
)
self.assertEquals(1, len(res))
- self.assertEquals(str(txn_id), res[0][0])
+ self.assertEquals(txn_id, res[0][0])
res = yield self.db_pool.runQuery(
"SELECT * FROM application_services_txns WHERE txn_id=?",
@@ -325,7 +324,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
(service.id,)
)
self.assertEquals(1, len(res))
- self.assertEquals(str(txn_id), res[0][0])
+ self.assertEquals(txn_id, res[0][0])
self.assertEquals(ApplicationServiceState.UP, res[0][1])
res = yield self.db_pool.runQuery(
diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py
index 7f5845cf0c..a64d2b821e 100644
--- a/tests/storage/test_base.py
+++ b/tests/storage/test_base.py
@@ -24,6 +24,7 @@ from collections import OrderedDict
from synapse.server import HomeServer
from synapse.storage._base import SQLBaseStore
+from synapse.storage.engines import create_engine
class SQLBaseStoreTestCase(unittest.TestCase):
@@ -32,15 +33,26 @@ class SQLBaseStoreTestCase(unittest.TestCase):
def setUp(self):
self.db_pool = Mock(spec=["runInteraction"])
self.mock_txn = Mock()
+ self.mock_conn = Mock(spec_set=["cursor"])
+ self.mock_conn.cursor.return_value = self.mock_txn
# Our fake runInteraction just runs synchronously inline
def runInteraction(func, *args, **kwargs):
return defer.succeed(func(self.mock_txn, *args, **kwargs))
self.db_pool.runInteraction = runInteraction
+ def runWithConnection(func, *args, **kwargs):
+ return defer.succeed(func(self.mock_conn, *args, **kwargs))
+ self.db_pool.runWithConnection = runWithConnection
+
config = Mock()
config.event_cache_size = 1
- hs = HomeServer("test", db_pool=self.db_pool, config=config)
+ hs = HomeServer(
+ "test",
+ db_pool=self.db_pool,
+ config=config,
+ database_engine=create_engine("sqlite3"),
+ )
self.datastore = SQLBaseStore(hs)
@@ -86,8 +98,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
self.assertEquals("Value", value)
self.mock_txn.execute.assert_called_with(
- "SELECT retcol FROM tablename WHERE keycol = ? "
- "ORDER BY rowid asc",
+ "SELECT retcol FROM tablename WHERE keycol = ?",
["TheKey"]
)
@@ -104,8 +115,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
self.assertEquals({"colA": 1, "colB": 2, "colC": 3}, ret)
self.mock_txn.execute.assert_called_with(
- "SELECT colA, colB, colC FROM tablename WHERE keycol = ? "
- "ORDER BY rowid asc",
+ "SELECT colA, colB, colC FROM tablename WHERE keycol = ?",
["TheKey"]
)
@@ -139,8 +149,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
self.assertEquals([{"colA": 1}, {"colA": 2}, {"colA": 3}], ret)
self.mock_txn.execute.assert_called_with(
- "SELECT colA FROM tablename WHERE keycol = ? "
- "ORDER BY rowid asc",
+ "SELECT colA FROM tablename WHERE keycol = ?",
["A set"]
)
@@ -189,8 +198,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
self.assertEquals({"columname": "Old Value"}, ret)
self.mock_txn.execute.assert_has_calls([
- call('SELECT columname FROM tablename WHERE keycol = ? '
- 'ORDER BY rowid asc',
+ call('SELECT columname FROM tablename WHERE keycol = ?',
['TheKey']),
call("UPDATE tablename SET columname = ? WHERE keycol = ?",
["New Value", "TheKey"])
diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py
index e0b81f2b57..78f6004204 100644
--- a/tests/storage/test_registration.py
+++ b/tests/storage/test_registration.py
@@ -42,28 +42,38 @@ class RegistrationStoreTestCase(unittest.TestCase):
self.assertEquals(
# TODO(paul): Surely this field should be 'user_id', not 'name'
# Additionally surely it shouldn't come in a 1-element list
- [{"name": self.user_id, "password_hash": self.pwhash}],
+ {"name": self.user_id, "password_hash": self.pwhash},
(yield self.store.get_user_by_id(self.user_id))
)
- self.assertEquals(
- {"admin": 0,
- "device_id": None,
- "name": self.user_id,
- "token_id": 1},
- (yield self.store.get_user_by_token(self.tokens[0]))
+ result = yield self.store.get_user_by_token(self.tokens[1])
+
+ self.assertDictContainsSubset(
+ {
+ "admin": 0,
+ "device_id": None,
+ "name": self.user_id,
+ },
+ result
)
+ self.assertTrue("token_id" in result)
+
@defer.inlineCallbacks
def test_add_tokens(self):
yield self.store.register(self.user_id, self.tokens[0], self.pwhash)
yield self.store.add_access_token_to_user(self.user_id, self.tokens[1])
- self.assertEquals(
- {"admin": 0,
- "device_id": None,
- "name": self.user_id,
- "token_id": 2},
- (yield self.store.get_user_by_token(self.tokens[1]))
+ result = yield self.store.get_user_by_token(self.tokens[1])
+
+ self.assertDictContainsSubset(
+ {
+ "admin": 0,
+ "device_id": None,
+ "name": self.user_id,
+ },
+ result
)
+ self.assertTrue("token_id" in result)
+
diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py
index 811fea544b..785953cc89 100644
--- a/tests/storage/test_roommember.py
+++ b/tests/storage/test_roommember.py
@@ -119,7 +119,7 @@ class RoomMemberStoreTestCase(unittest.TestCase):
yield self.inject_room_member(self.room, self.u_alice, Membership.JOIN)
self.assertEquals(
- ["test"],
+ {"test"},
(yield self.store.get_joined_hosts_for_room(self.room.to_string()))
)
@@ -127,7 +127,7 @@ class RoomMemberStoreTestCase(unittest.TestCase):
yield self.inject_room_member(self.room, self.u_bob, Membership.JOIN)
self.assertEquals(
- ["test"],
+ {"test"},
(yield self.store.get_joined_hosts_for_room(self.room.to_string()))
)
@@ -136,9 +136,9 @@ class RoomMemberStoreTestCase(unittest.TestCase):
self.assertEquals(
{"test", "elsewhere"},
- set((yield
+ (yield
self.store.get_joined_hosts_for_room(self.room.to_string())
- ))
+ )
)
# Should still have both hosts
@@ -146,15 +146,15 @@ class RoomMemberStoreTestCase(unittest.TestCase):
self.assertEquals(
{"test", "elsewhere"},
- set((yield
+ (yield
self.store.get_joined_hosts_for_room(self.room.to_string())
- ))
+ )
)
# Should have only one host after other leaves
yield self.inject_room_member(self.room, self.u_charlie, Membership.LEAVE)
self.assertEquals(
- ["test"],
+ {"test"},
(yield self.store.get_joined_hosts_for_room(self.room.to_string()))
)
diff --git a/tests/utils.py b/tests/utils.py
index 81e82a80df..cc038fecf1 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -17,6 +17,7 @@ from synapse.http.server import HttpServer
from synapse.api.errors import cs_error, CodeMessageException, StoreError
from synapse.api.constants import EventTypes
from synapse.storage import prepare_database
+from synapse.storage.engines import create_engine
from synapse.server import HomeServer
from synapse.util.logcontext import LoggingContext
@@ -44,18 +45,23 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
config.event_cache_size = 1
config.disable_registration = False
+ if "clock" not in kargs:
+ kargs["clock"] = MockClock()
+
if datastore is None:
db_pool = SQLiteMemoryDbPool()
yield db_pool.prepare()
hs = HomeServer(
name, db_pool=db_pool, config=config,
version_string="Synapse/tests",
+ database_engine=create_engine("sqlite3"),
**kargs
)
else:
hs = HomeServer(
name, db_pool=None, datastore=datastore, config=config,
version_string="Synapse/tests",
+ database_engine=create_engine("sqlite3"),
**kargs
)
@@ -227,7 +233,10 @@ class SQLiteMemoryDbPool(ConnectionPool, object):
)
def prepare(self):
- return self.runWithConnection(prepare_database)
+ engine = create_engine("sqlite3")
+ return self.runWithConnection(
+ lambda conn: prepare_database(conn, engine)
+ )
class MemoryDataStore(object):
|