From ce55a8cc4bb26c4518875743a04a06e792ad7ebf Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 10 Sep 2014 15:42:15 +0100 Subject: Move database preparing code out of homserver.py into storage where it belongs --- synapse/storage/__init__.py | 61 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index ad2a484c16..2543fb12b7 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -43,10 +43,28 @@ from .keys import KeyStore import json import logging import os +import sqlite3 logger = logging.getLogger(__name__) + +SCHEMAS = [ + "transactions", + "pdu", + "users", + "profiles", + "presence", + "im", + "room_aliases", +] + + +# Remember to update this number every time an incompatible change is made to +# database schema files, so the users will be informed on server restarts. +SCHEMA_VERSION = 3 + + class _RollbackButIsFineException(Exception): """ This exception is used to rollback a transaction without implying something went wrong. @@ -350,3 +368,46 @@ def read_schema(schema): """ with open(schema_path(schema)) as schema_file: return schema_file.read() + + +def prepare_database(db_name): + """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we + don't have to worry about overwriting existing content. + """ + logging.info("Preparing database: %s...", db_name) + + with sqlite3.connect(db_name) as db_conn: + c = db_conn.cursor() + c.execute("PRAGMA user_version") + row = c.fetchone() + + if row and row[0]: + user_version = row[0] + + if user_version > SCHEMA_VERSION: + raise ValueError("Cannot use this database as it is too " + + "new for the server to understand" + ) + elif user_version < SCHEMA_VERSION: + logging.info("Upgrading database from version %d", + user_version + ) + + # Run every version since after the current version. + for v in range(user_version + 1, SCHEMA_VERSION + 1): + sql_script = read_schema("delta/v%d" % (v)) + c.executescript(sql_script) + + db_conn.commit() + + else: + for sql_loc in SCHEMAS: + sql_script = read_schema(sql_loc) + + c.executescript(sql_script) + db_conn.commit() + c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION) + + c.close() + + logging.info("Database prepared in %s.", db_name) -- cgit 1.5.1 From 55397f634770f2b91cd4567e6b40507944144b67 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 10 Sep 2014 16:23:58 +0100 Subject: prepare_database() on db_conn, not plain name, so we can pass in the connection from outside --- synapse/app/homeserver.py | 10 +++++++- synapse/storage/__init__.py | 57 +++++++++++++++++++++------------------------ 2 files changed, 35 insertions(+), 32 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e6377e3060..2f1b954902 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -39,6 +39,7 @@ import logging import os import re import sys +import sqlite3 logger = logging.getLogger(__name__) @@ -208,7 +209,14 @@ def setup(): redirect_root_to_web_client=True, ) - prepare_database(hs.get_db_name()) + db_name = hs.get_db_name() + + logging.info("Preparing database: %s...", db_name) + + with sqlite3.connect(db_name) as db_conn: + prepare_database(db_conn) + + logging.info("Database prepared in %s.", db_name) hs.get_db_pool() diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 2543fb12b7..6b273a0306 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -43,7 +43,6 @@ from .keys import KeyStore import json import logging import os -import sqlite3 logger = logging.getLogger(__name__) @@ -370,44 +369,40 @@ def read_schema(schema): return schema_file.read() -def prepare_database(db_name): +def prepare_database(db_conn): """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we don't have to worry about overwriting existing content. """ - logging.info("Preparing database: %s...", db_name) + c = db_conn.cursor() + c.execute("PRAGMA user_version") + row = c.fetchone() - with sqlite3.connect(db_name) as db_conn: - c = db_conn.cursor() - c.execute("PRAGMA user_version") - row = c.fetchone() + if row and row[0]: + user_version = row[0] - if row and row[0]: - user_version = row[0] - - if user_version > SCHEMA_VERSION: - raise ValueError("Cannot use this database as it is too " + - "new for the server to understand" - ) - elif user_version < SCHEMA_VERSION: - logging.info("Upgrading database from version %d", - user_version - ) + if user_version > SCHEMA_VERSION: + raise ValueError("Cannot use this database as it is too " + + "new for the server to understand" + ) + elif user_version < SCHEMA_VERSION: + logging.info("Upgrading database from version %d", + user_version + ) - # Run every version since after the current version. - for v in range(user_version + 1, SCHEMA_VERSION + 1): - sql_script = read_schema("delta/v%d" % (v)) - c.executescript(sql_script) + # Run every version since after the current version. + for v in range(user_version + 1, SCHEMA_VERSION + 1): + sql_script = read_schema("delta/v%d" % (v)) + c.executescript(sql_script) - db_conn.commit() + db_conn.commit() - else: - for sql_loc in SCHEMAS: - sql_script = read_schema(sql_loc) + else: + for sql_loc in SCHEMAS: + sql_script = read_schema(sql_loc) - c.executescript(sql_script) - db_conn.commit() - c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION) + c.executescript(sql_script) + db_conn.commit() + c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION) - c.close() + c.close() - logging.info("Database prepared in %s.", db_name) -- cgit 1.5.1 From aaf9ab68c6969d69150b7aa1f6ebddcf1c496050 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 11 Sep 2014 18:44:04 +0100 Subject: Rename _store_room_member_txn to _store_room_member_from_event_txn so we can create another, more sensible function of that name --- synapse/storage/__init__.py | 2 +- synapse/storage/roommember.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6b273a0306..8228069271 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -154,7 +154,7 @@ class DataStore(RoomMemberStore, RoomStore, @log_function def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None): if event.type == RoomMemberEvent.TYPE: - self._store_room_member_txn(txn, event) + self._store_room_member_from_event_txn(txn, event) elif event.type == FeedbackEvent.TYPE: self._store_feedback_txn(txn, event) elif event.type == RoomNameEvent.TYPE: diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 9a393e2568..437ff03a73 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) class RoomMemberStore(SQLBaseStore): - def _store_room_member_txn(self, txn, event): + def _store_room_member_from_event_txn(self, txn, event): """Store a room member in the database. """ target_user_id = event.state_key -- cgit 1.5.1 From e53d77b5017e823506484bbb95964b4d97f3e2a1 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 12 Sep 2014 13:57:24 +0100 Subject: Add a .runInteraction() method on SQLBaseStore itself to wrap the .db_pool --- synapse/storage/__init__.py | 4 ++-- synapse/storage/_base.py | 20 ++++++++++++-------- synapse/storage/pdu.py | 24 ++++++++++++------------ synapse/storage/registration.py | 4 ++-- synapse/storage/room.py | 4 ++-- synapse/storage/roommember.py | 5 +++++ synapse/storage/stream.py | 2 +- synapse/storage/transactions.py | 12 ++++++------ 8 files changed, 42 insertions(+), 33 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 8228069271..629c110bed 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -94,7 +94,7 @@ class DataStore(RoomMemberStore, RoomStore, stream_ordering = self.min_token try: - yield self._db_pool.runInteraction( + yield self.runInteraction( self._persist_pdu_event_txn, pdu=pdu, event=event, @@ -297,7 +297,7 @@ class DataStore(RoomMemberStore, RoomStore, prev_state_pdu=prev_state_pdu, ) - return self._db_pool.runInteraction(_snapshot) + return self.runInteraction(_snapshot) class Snapshot(object): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 8037225079..8a36f0bc6a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -34,6 +34,10 @@ class SQLBaseStore(object): self.event_factory = hs.get_event_factory() self._clock = hs.get_clock() + def runInteraction(self, txn, *args, **kwargs): + """Wraps the .runInteraction() method on the underlying db_pool.""" + return self._db_pool.runInteraction(txn, *args, **kwargs) + def cursor_to_dict(self, cursor): """Converts a SQL cursor into an list of dicts. @@ -71,7 +75,7 @@ class SQLBaseStore(object): else: return cursor.fetchall() - return self._db_pool.runInteraction(interaction) + return self.runInteraction(interaction) def _execute_and_decode(self, query, *args): return self._execute(self.cursor_to_dict, query, *args) @@ -87,7 +91,7 @@ class SQLBaseStore(object): values : dict of new column names and values for them or_replace : bool; if True performs an INSERT OR REPLACE """ - return self._db_pool.runInteraction( + return self.runInteraction( self._simple_insert_txn, table, values, or_replace=or_replace ) @@ -164,7 +168,7 @@ class SQLBaseStore(object): txn.execute(sql, keyvalues.values()) return txn.fetchall() - res = yield self._db_pool.runInteraction(func) + res = yield self.runInteraction(func) defer.returnValue([r[0] for r in res]) @@ -187,7 +191,7 @@ class SQLBaseStore(object): txn.execute(sql, keyvalues.values()) return self.cursor_to_dict(txn) - return self._db_pool.runInteraction(func) + return self.runInteraction(func) def _simple_update_one(self, table, keyvalues, updatevalues, retcols=None): @@ -255,7 +259,7 @@ class SQLBaseStore(object): raise StoreError(500, "More than one row matched") return ret - return self._db_pool.runInteraction(func) + return self.runInteraction(func) def _simple_delete_one(self, table, keyvalues): """Executes a DELETE query on the named table, expecting to delete a @@ -276,7 +280,7 @@ class SQLBaseStore(object): raise StoreError(404, "No row found") if txn.rowcount > 1: raise StoreError(500, "more than one row matched") - return self._db_pool.runInteraction(func) + return self.runInteraction(func) def _simple_max_id(self, table): """Executes a SELECT query on the named table, expecting to return the @@ -294,7 +298,7 @@ class SQLBaseStore(object): return 0 return max_id - return self._db_pool.runInteraction(func) + return self.runInteraction(func) def _parse_event_from_row(self, row_dict): d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) @@ -313,7 +317,7 @@ class SQLBaseStore(object): ) def _parse_events(self, rows): - return self._db_pool.runInteraction(self._parse_events_txn, rows) + return self.runInteraction(self._parse_events_txn, rows) def _parse_events_txn(self, txn, rows): events = [self._parse_event_from_row(r) for r in rows] diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index 3cbce2d0a1..f770a82bcd 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -42,7 +42,7 @@ class PduStore(SQLBaseStore): PduTuple: If the pdu does not exist in the database, returns None """ - return self._db_pool.runInteraction( + return self.runInteraction( self._get_pdu_tuple, pdu_id, origin ) @@ -94,7 +94,7 @@ class PduStore(SQLBaseStore): list: A list of PduTuples """ - return self._db_pool.runInteraction( + return self.runInteraction( self._get_current_state_for_context, context ) @@ -142,7 +142,7 @@ class PduStore(SQLBaseStore): pdu_origin (str) """ - return self._db_pool.runInteraction( + return self.runInteraction( self._mark_as_processed, pdu_id, pdu_origin ) @@ -151,7 +151,7 @@ class PduStore(SQLBaseStore): def get_all_pdus_from_context(self, context): """Get a list of all PDUs for a given context.""" - return self._db_pool.runInteraction( + return self.runInteraction( self._get_all_pdus_from_context, context, ) @@ -178,7 +178,7 @@ class PduStore(SQLBaseStore): Return: list: A list of PduTuples """ - return self._db_pool.runInteraction( + return self.runInteraction( self._get_backfill, context, pdu_list, limit ) @@ -239,7 +239,7 @@ class PduStore(SQLBaseStore): txn context (str) """ - return self._db_pool.runInteraction( + return self.runInteraction( self._get_min_depth_for_context, context ) @@ -345,7 +345,7 @@ class PduStore(SQLBaseStore): bool """ - return self._db_pool.runInteraction( + return self.runInteraction( self._is_pdu_new, pdu_id=pdu_id, origin=origin, @@ -498,7 +498,7 @@ class StatePduStore(SQLBaseStore): ) def get_unresolved_state_tree(self, new_state_pdu): - return self._db_pool.runInteraction( + return self.runInteraction( self._get_unresolved_state_tree, new_state_pdu ) @@ -537,7 +537,7 @@ class StatePduStore(SQLBaseStore): def update_current_state(self, pdu_id, origin, context, pdu_type, state_key): - return self._db_pool.runInteraction( + return self.runInteraction( self._update_current_state, pdu_id, origin, context, pdu_type, state_key ) @@ -576,7 +576,7 @@ class StatePduStore(SQLBaseStore): PduEntry """ - return self._db_pool.runInteraction( + return self.runInteraction( self._get_current_state_pdu, context, pdu_type, state_key ) @@ -638,7 +638,7 @@ class StatePduStore(SQLBaseStore): PduIdTuple: A pdu that we are missing, or None if we have all the pdus required to do the conflict resolution. """ - return self._db_pool.runInteraction( + return self.runInteraction( self._get_next_missing_pdu, new_pdu ) @@ -682,7 +682,7 @@ class StatePduStore(SQLBaseStore): Returns: bool: True if the new_pdu clobbered the current state, False if not """ - return self._db_pool.runInteraction( + return self.runInteraction( self._handle_new_state, new_pdu ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index fd762bc643..db20b1daa0 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -62,7 +62,7 @@ class RegistrationStore(SQLBaseStore): Raises: StoreError if the user_id could not be registered. """ - yield self._db_pool.runInteraction(self._register, user_id, token, + yield self.runInteraction(self._register, user_id, token, password_hash) def _register(self, txn, user_id, token, password_hash): @@ -99,7 +99,7 @@ class RegistrationStore(SQLBaseStore): Raises: StoreError if no user was found. """ - user_id = yield self._db_pool.runInteraction(self._query_for_auth, + user_id = yield self.runInteraction(self._query_for_auth, token) defer.returnValue(user_id) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 017169ce00..5adf8cdf1b 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -149,7 +149,7 @@ class RoomStore(SQLBaseStore): defer.returnValue(None) def get_power_level(self, room_id, user_id): - return self._db_pool.runInteraction( + return self.runInteraction( self._get_power_level, room_id, user_id, ) @@ -182,7 +182,7 @@ class RoomStore(SQLBaseStore): return None def get_ops_levels(self, room_id): - return self._db_pool.runInteraction( + return self.runInteraction( self._get_ops_levels, room_id, ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index b357dc3058..8cbc15356d 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -71,6 +71,11 @@ class RoomMemberStore(SQLBaseStore): txn.execute(sql, (room_id, domain)) + def store_room_member(self, user_id, room_id, event_id, membership): + return self.runInteraction(self._store_room_member_txn, + user_id, user_id, room_id, event_id, membership + ) + @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index aff6dc9855..8c766b8a00 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -286,7 +286,7 @@ class StreamStore(SQLBaseStore): defer.returnValue(ret) def get_room_events_max_id(self): - return self._db_pool.runInteraction(self._get_room_events_max_id_txn) + return self.runInteraction(self._get_room_events_max_id_txn) def _get_room_events_max_id_txn(self, txn): txn.execute( diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 7467e1035b..ab4599b468 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -41,7 +41,7 @@ class TransactionStore(SQLBaseStore): this transaction or a 2-tuple of (int, dict) """ - return self._db_pool.runInteraction( + return self.runInteraction( self._get_received_txn_response, transaction_id, origin ) @@ -72,7 +72,7 @@ class TransactionStore(SQLBaseStore): response_json (str) """ - return self._db_pool.runInteraction( + return self.runInteraction( self._set_received_txn_response, transaction_id, origin, code, response_dict ) @@ -104,7 +104,7 @@ class TransactionStore(SQLBaseStore): list: A list of previous transaction ids. """ - return self._db_pool.runInteraction( + return self.runInteraction( self._prep_send_transaction, transaction_id, destination, ts, pdu_list ) @@ -159,7 +159,7 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - return self._db_pool.runInteraction( + return self.runInteraction( self._delivered_txn, transaction_id, destination, code, response_dict ) @@ -184,7 +184,7 @@ class TransactionStore(SQLBaseStore): Returns: list: A list of `ReceivedTransactionsTable.EntryType` """ - return self._db_pool.runInteraction( + return self.runInteraction( self._get_transactions_after, transaction_id, destination ) @@ -214,7 +214,7 @@ class TransactionStore(SQLBaseStore): Returns list: A list of PduTuple """ - return self._db_pool.runInteraction( + return self.runInteraction( self._get_pdus_after_transaction, transaction_id, destination ) -- cgit 1.5.1 From a87eac4308e2230c2f79f41e2e1817636da4b208 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 12 Sep 2014 15:51:51 +0100 Subject: Revert recent changes to RoomMemberStore --- synapse/storage/__init__.py | 2 +- synapse/storage/roommember.py | 36 +++++++++--------------------------- 2 files changed, 10 insertions(+), 28 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 629c110bed..0dbae504b2 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -154,7 +154,7 @@ class DataStore(RoomMemberStore, RoomStore, @log_function def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None): if event.type == RoomMemberEvent.TYPE: - self._store_room_member_from_event_txn(txn, event) + self._store_room_member_txn(txn, event) elif event.type == FeedbackEvent.TYPE: self._store_feedback_txn(txn, event) elif event.type == RoomNameEvent.TYPE: diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8cbc15356d..9a393e2568 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -26,55 +26,37 @@ logger = logging.getLogger(__name__) class RoomMemberStore(SQLBaseStore): - def _store_room_member_from_event_txn(self, txn, event): - self._store_room_member_txn(txn, - target_user_id=event.state_key, - sender_user_id=event.user_id, - room_id=event.room_id, - event_id=event.event_id, - membership=event.membership, - ) - - def _store_room_member_txn(self, txn, target_user_id, sender_user_id, - room_id, event_id, membership): + def _store_room_member_txn(self, txn, event): """Store a room member in the database. """ + target_user_id = event.state_key domain = self.hs.parse_userid(target_user_id).domain self._simple_insert_txn( txn, "room_memberships", { - "event_id": event_id, + "event_id": event.event_id, "user_id": target_user_id, - "sender": sender_user_id, - "room_id": room_id, - "membership": membership, + "sender": event.user_id, + "room_id": event.room_id, + "membership": event.membership, } ) # Update room hosts table - # TODO(paul): This code is massively broken currently as it doesn't - # count users per room - meaning it'll delete on the FIRST user to - # have a membership other than JOIN - say, LEAVE, or even INVITE. - # FIXME - if membership == Membership.JOIN: + if event.membership == Membership.JOIN: sql = ( "INSERT OR IGNORE INTO room_hosts (room_id, host) " "VALUES (?, ?)" ) - txn.execute(sql, (room_id, domain)) + txn.execute(sql, (event.room_id, domain)) else: sql = ( "DELETE FROM room_hosts WHERE room_id = ? AND host = ?" ) - txn.execute(sql, (room_id, domain)) - - def store_room_member(self, user_id, room_id, event_id, membership): - return self.runInteraction(self._store_room_member_txn, - user_id, user_id, room_id, event_id, membership - ) + txn.execute(sql, (event.room_id, domain)) @defer.inlineCallbacks def get_room_member(self, user_id, room_id): -- cgit 1.5.1