diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index abde7d0df5..f8053484cf 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -77,9 +77,6 @@ class DataStore(RoomMemberStore, RoomStore,
self.min_token_deferred = self._get_min_token()
self.min_token = None
- self._next_stream_id_lock = threading.Lock()
- self._next_stream_id = int(hs.get_clock().time_msec()) * 1000
-
def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
return self._simple_upsert(
"user_ips",
@@ -127,19 +124,21 @@ class UpgradeDatabaseException(PrepareDatabaseException):
pass
-def prepare_database(db_conn):
+def prepare_database(db_conn, database_engine):
"""Prepares a database for usage. Will either create all necessary tables
or upgrade from an older schema version.
"""
try:
cur = db_conn.cursor()
- version_info = _get_or_create_schema_state(cur)
+ version_info = _get_or_create_schema_state(cur, database_engine)
if version_info:
user_version, delta_files, upgraded = version_info
- _upgrade_existing_database(cur, user_version, delta_files, upgraded)
+ _upgrade_existing_database(
+ cur, user_version, delta_files, upgraded, database_engine
+ )
else:
- _setup_new_database(cur)
+ _setup_new_database(cur, database_engine)
# cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
@@ -150,7 +149,7 @@ def prepare_database(db_conn):
raise
-def _setup_new_database(cur):
+def _setup_new_database(cur, database_engine):
"""Sets up the database by finding a base set of "full schemas" and then
applying any necessary deltas.
@@ -210,7 +209,7 @@ def _setup_new_database(cur):
executescript(cur, sql_loc)
cur.execute(
- _convert_param_style(
+ database_engine.convert_param_style(
"REPLACE INTO schema_version (version, upgraded)"
" VALUES (?,?)"
),
@@ -221,12 +220,13 @@ def _setup_new_database(cur):
cur,
current_version=max_current_ver,
applied_delta_files=[],
- upgraded=False
+ upgraded=False,
+ database_engine=database_engine,
)
def _upgrade_existing_database(cur, current_version, applied_delta_files,
- upgraded):
+ upgraded, database_engine):
"""Upgrades an existing database.
Delta files can either be SQL stored in *.sql files, or python modules
@@ -335,26 +335,22 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
# Mark as done.
cur.execute(
- _convert_param_style(
+ database_engine.convert_param_style(
"INSERT INTO applied_schema_deltas (version, file)"
- " VALUES (?,?)"
+ " VALUES (?,?)",
),
(v, relative_path)
)
cur.execute(
- _convert_param_style(
+ database_engine.convert_param_style(
"REPLACE INTO schema_version (version, upgraded)"
- " VALUES (?,?)"
+ " VALUES (?,?)",
),
(v, True)
)
-def _convert_param_style(sql):
- return sql.replace("?", "%s")
-
-
def get_statements(f):
statement_buffer = ""
in_comment = False # If we're in a /* ... */ style comment
@@ -409,7 +405,7 @@ def executescript(txn, schema_path):
txn.execute(statement)
-def _get_or_create_schema_state(txn):
+def _get_or_create_schema_state(txn, database_engine):
try:
# Bluntly try creating the schema_version tables.
schema_path = os.path.join(
@@ -426,7 +422,7 @@ def _get_or_create_schema_state(txn):
if current_version:
txn.execute(
- _convert_param_style(
+ database_engine.convert_param_style(
"SELECT file FROM applied_schema_deltas WHERE version >= ?"
),
(current_version,)
@@ -446,6 +442,8 @@ def prepare_sqlite3_database(db_conn):
new. This only affects sqlite databases since they were the only ones
supported at the time.
"""
+ import sqlite3
+
with db_conn:
schema_path = os.path.join(
dir_path, "schema", "schema_version.sql",
@@ -466,7 +464,8 @@ def prepare_sqlite3_database(db_conn):
db_conn.execute(
_convert_param_style(
"REPLACE INTO schema_version (version, upgraded)"
- " VALUES (?,?)"
+ " VALUES (?,?)",
+ sqlite3
),
(row[0], False)
)
|