summary refs log tree commit diff
path: root/synapse/storage/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r--synapse/storage/__init__.py43
1 files changed, 21 insertions, 22 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index abde7d0df5..f8053484cf 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -77,9 +77,6 @@ class DataStore(RoomMemberStore, RoomStore,
         self.min_token_deferred = self._get_min_token()
         self.min_token = None
 
-        self._next_stream_id_lock = threading.Lock()
-        self._next_stream_id = int(hs.get_clock().time_msec()) * 1000
-
     def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
         return self._simple_upsert(
             "user_ips",
@@ -127,19 +124,21 @@ class UpgradeDatabaseException(PrepareDatabaseException):
     pass
 
 
-def prepare_database(db_conn):
+def prepare_database(db_conn, database_engine):
     """Prepares a database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
     """
     try:
         cur = db_conn.cursor()
-        version_info = _get_or_create_schema_state(cur)
+        version_info = _get_or_create_schema_state(cur, database_engine)
 
         if version_info:
             user_version, delta_files, upgraded = version_info
-            _upgrade_existing_database(cur, user_version, delta_files, upgraded)
+            _upgrade_existing_database(
+                cur, user_version, delta_files, upgraded, database_engine
+            )
         else:
-            _setup_new_database(cur)
+            _setup_new_database(cur, database_engine)
 
         # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
 
@@ -150,7 +149,7 @@ def prepare_database(db_conn):
         raise
 
 
-def _setup_new_database(cur):
+def _setup_new_database(cur, database_engine):
     """Sets up the database by finding a base set of "full schemas" and then
     applying any necessary deltas.
 
@@ -210,7 +209,7 @@ def _setup_new_database(cur):
         executescript(cur, sql_loc)
 
     cur.execute(
-        _convert_param_style(
+        database_engine.convert_param_style(
             "REPLACE INTO schema_version (version, upgraded)"
             " VALUES (?,?)"
         ),
@@ -221,12 +220,13 @@ def _setup_new_database(cur):
         cur,
         current_version=max_current_ver,
         applied_delta_files=[],
-        upgraded=False
+        upgraded=False,
+        database_engine=database_engine,
     )
 
 
 def _upgrade_existing_database(cur, current_version, applied_delta_files,
-                               upgraded):
+                               upgraded, database_engine):
     """Upgrades an existing database.
 
     Delta files can either be SQL stored in *.sql files, or python modules
@@ -335,26 +335,22 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
 
             # Mark as done.
             cur.execute(
-                _convert_param_style(
+                database_engine.convert_param_style(
                     "INSERT INTO applied_schema_deltas (version, file)"
-                    " VALUES (?,?)"
+                    " VALUES (?,?)",
                 ),
                 (v, relative_path)
             )
 
             cur.execute(
-                _convert_param_style(
+                database_engine.convert_param_style(
                     "REPLACE INTO schema_version (version, upgraded)"
-                    " VALUES (?,?)"
+                    " VALUES (?,?)",
                 ),
                 (v, True)
             )
 
 
-def _convert_param_style(sql):
-    return sql.replace("?", "%s")
-
-
 def get_statements(f):
     statement_buffer = ""
     in_comment = False  # If we're in a /* ... */ style comment
@@ -409,7 +405,7 @@ def executescript(txn, schema_path):
             txn.execute(statement)
 
 
-def _get_or_create_schema_state(txn):
+def _get_or_create_schema_state(txn, database_engine):
     try:
         # Bluntly try creating the schema_version tables.
         schema_path = os.path.join(
@@ -426,7 +422,7 @@ def _get_or_create_schema_state(txn):
 
     if current_version:
         txn.execute(
-            _convert_param_style(
+            database_engine.convert_param_style(
                 "SELECT file FROM applied_schema_deltas WHERE version >= ?"
             ),
             (current_version,)
@@ -446,6 +442,8 @@ def prepare_sqlite3_database(db_conn):
     new. This only affects sqlite databases since they were the only ones
     supported at the time.
     """
+    import sqlite3
+
     with db_conn:
         schema_path = os.path.join(
             dir_path, "schema", "schema_version.sql",
@@ -466,7 +464,8 @@ def prepare_sqlite3_database(db_conn):
                 db_conn.execute(
                     _convert_param_style(
                         "REPLACE INTO schema_version (version, upgraded)"
-                        " VALUES (?,?)"
+                        " VALUES (?,?)",
+                        sqlite3
                     ),
                     (row[0], False)
                 )