diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py
index a9991e9c94..0cee196851 100644
--- a/synapse/api/events/__init__.py
+++ b/synapse/api/events/__init__.py
@@ -156,7 +156,8 @@ class SynapseEvent(JsonEncodedObject):
return "Missing %s key" % key
if type(content[key]) != type(template[key]):
- return "Key %s is of the wrong type." % key
+ return "Key %s is of the wrong type (got %s, want %s)" % (
+ key, type(content[key]), type(template[key]))
if type(content[key]) == dict:
# we must go deeper
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index d675d8c8f9..2f1b954902 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage import read_schema
+from synapse.storage import prepare_database
from synapse.server import HomeServer
@@ -36,30 +36,14 @@ from daemonize import Daemonize
import twisted.manhole.telnet
import logging
-import sqlite3
import os
import re
import sys
+import sqlite3
logger = logging.getLogger(__name__)
-SCHEMAS = [
- "transactions",
- "pdu",
- "users",
- "profiles",
- "presence",
- "im",
- "room_aliases",
-]
-
-
-# Remember to update this number every time an incompatible change is made to
-# database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 3
-
-
class SynapseHomeServer(HomeServer):
def build_http_client(self):
@@ -80,52 +64,12 @@ class SynapseHomeServer(HomeServer):
)
def build_db_pool(self):
- """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we
- don't have to worry about overwriting existing content.
- """
- logging.info("Preparing database: %s...", self.db_name)
-
- with sqlite3.connect(self.db_name) as db_conn:
- c = db_conn.cursor()
- c.execute("PRAGMA user_version")
- row = c.fetchone()
-
- if row and row[0]:
- user_version = row[0]
-
- if user_version > SCHEMA_VERSION:
- raise ValueError("Cannot use this database as it is too " +
- "new for the server to understand"
- )
- elif user_version < SCHEMA_VERSION:
- logging.info("Upgrading database from version %d",
- user_version
- )
-
- # Run every version since after the current version.
- for v in range(user_version + 1, SCHEMA_VERSION + 1):
- sql_script = read_schema("delta/v%d" % (v))
- c.executescript(sql_script)
-
- db_conn.commit()
-
- else:
- for sql_loc in SCHEMAS:
- sql_script = read_schema(sql_loc)
-
- c.executescript(sql_script)
- db_conn.commit()
- c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION)
-
- c.close()
-
- logging.info("Database prepared in %s.", self.db_name)
-
- pool = adbapi.ConnectionPool(
- 'sqlite3', self.db_name, check_same_thread=False,
- cp_min=1, cp_max=1)
-
- return pool
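+        # A pool of exactly one connection (cp_min=cp_max=1) serialises all
+        # access to the SQLite database file; check_same_thread=False lets
+        # adbapi use that connection from its worker threads.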
+ return adbapi.ConnectionPool(
+ "sqlite3", self.get_db_name(),
+ check_same_thread=False,
+ cp_min=1,
+ cp_max=1
+ )
def create_resource_tree(self, web_client, redirect_root_to_web_client):
"""Create the resource tree for this Home Server.
@@ -230,10 +174,6 @@ class SynapseHomeServer(HomeServer):
logger.info("Synapse now listening on port %d", unsecure_port)
-def run():
- reactor.run()
-
-
def setup():
config = HomeServerConfig.load_config(
"Synapse Homeserver",
@@ -268,7 +208,15 @@ def setup():
web_client=config.webclient,
redirect_root_to_web_client=True,
)
- hs.start_listening(config.bind_port, config.unsecure_port)
+
+ db_name = hs.get_db_name()
+
+ logging.info("Preparing database: %s...", db_name)
+
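+    # Create or upgrade the database schema synchronously, before the
+    # connection pool is built and the listeners are started.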
+ with sqlite3.connect(db_name) as db_conn:
+ prepare_database(db_conn)
+
+ logging.info("Database prepared in %s.", db_name)
hs.get_db_pool()
@@ -279,12 +227,14 @@ def setup():
f.namespace['hs'] = hs
reactor.listenTCP(config.manhole, f, interface='127.0.0.1')
+ hs.start_listening(config.bind_port, config.unsecure_port)
+
if config.daemonize:
print config.pid_file
daemon = Daemonize(
app="synapse-homeserver",
pid=config.pid_file,
- action=run,
+ action=reactor.run,
auto_close_fds=False,
verbose=True,
logger=logger,
@@ -292,7 +242,7 @@ def setup():
daemon.start()
else:
- run()
+ reactor.run()
if __name__ == '__main__':
diff --git a/synapse/server.py b/synapse/server.py
index 7c185537aa..cdea49e6ab 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -58,6 +58,7 @@ class BaseHomeServer(object):
DEPENDENCIES = [
'clock',
'http_client',
+ 'db_name',
'db_pool',
'persistence_service',
'replication_layer',
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 1cede2809d..66658f6721 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -47,6 +47,23 @@ import os
logger = logging.getLogger(__name__)
+
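+# The base schema files, applied in this order via read_schema() when
+# preparing a brand-new database.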
+SCHEMAS = [
+ "transactions",
+ "pdu",
+ "users",
+ "profiles",
+ "presence",
+ "im",
+ "room_aliases",
+]
+
+
+# Remember to update this number every time an incompatible change is made to
+# database schema files, so the users will be informed on server restarts.
+SCHEMA_VERSION = 3
+
+
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying
something went wrong.
@@ -78,7 +95,7 @@ class DataStore(RoomMemberStore, RoomStore,
stream_ordering = self.min_token
try:
- yield self._db_pool.runInteraction(
+ yield self.runInteraction(
self._persist_pdu_event_txn,
pdu=pdu,
event=event,
@@ -291,7 +308,7 @@ class DataStore(RoomMemberStore, RoomStore,
prev_state_pdu=prev_state_pdu,
)
- return self._db_pool.runInteraction(_snapshot)
+ return self.runInteraction(_snapshot)
class Snapshot(object):
@@ -361,3 +378,42 @@ def read_schema(schema):
"""
with open(schema_path(schema)) as schema_file:
return schema_file.read()
+
+
+def prepare_database(db_conn):
+ """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we
+ don't have to worry about overwriting existing content.
+ """
+ c = db_conn.cursor()
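+    # The schema version of an existing database is tracked in SQLite's
+    # user_version pragma.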
+ c.execute("PRAGMA user_version")
+ row = c.fetchone()
+
+ if row and row[0]:
+ user_version = row[0]
+
+ if user_version > SCHEMA_VERSION:
+ raise ValueError("Cannot use this database as it is too " +
+ "new for the server to understand"
+ )
+ elif user_version < SCHEMA_VERSION:
+ logging.info("Upgrading database from version %d",
+ user_version
+ )
+
+            # Apply each delta script after the current version, in order.
+ for v in range(user_version + 1, SCHEMA_VERSION + 1):
+ sql_script = read_schema("delta/v%d" % (v))
+ c.executescript(sql_script)
+
+ db_conn.commit()
+
+ else:
+ for sql_loc in SCHEMAS:
+ sql_script = read_schema(sql_loc)
+
+ c.executescript(sql_script)
+ db_conn.commit()
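+        # PRAGMA statements can't take bound parameters, so the version
+        # constant is interpolated directly.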
+ c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION)
+
+ c.close()
+
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index cf88bfc22b..76ed7d06fb 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -26,6 +26,44 @@ import json
logger = logging.getLogger(__name__)
+sql_logger = logging.getLogger("synapse.storage.SQL")
+
+
+class LoggingTransaction(object):
+ """An object that almost-transparently proxies for the 'txn' object
+ passed to the constructor. Adds logging to the .execute() method."""
+ __slots__ = ["txn"]
+
+ def __init__(self, txn):
+ object.__setattr__(self, "txn", txn)
+
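+    # Everything other than .execute() is forwarded to the wrapped txn;
+    # object.__getattribute__ is used to reach the proxy's own state
+    # without going back through the override below.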
+ def __getattribute__(self, name):
+ if name == "execute":
+ return object.__getattribute__(self, "execute")
+
+ return getattr(object.__getattribute__(self, "txn"), name)
+
+ def __setattr__(self, name, value):
+ setattr(object.__getattribute__(self, "txn"), name, value)
+
+ def execute(self, sql, *args, **kwargs):
+ # TODO(paul): Maybe use 'info' and 'debug' for values?
+ sql_logger.debug("[SQL] %s", sql)
+ try:
+ if args and args[0]:
+ values = args[0]
+ sql_logger.debug("[SQL values] " +
+ ", ".join(("<%s>",) * len(values)), *values)
+        except Exception:
+ # Don't let logging failures stop SQL from working
+ pass
+
+ # TODO(paul): Here would be an excellent place to put some timing
+ # measurements, and log (warning?) slow queries.
+ return object.__getattribute__(self, "txn").execute(
+ sql, *args, **kwargs
+ )
+
class SQLBaseStore(object):
@@ -35,6 +73,13 @@ class SQLBaseStore(object):
self.event_factory = hs.get_event_factory()
self._clock = hs.get_clock()
+ def runInteraction(self, func, *args, **kwargs):
+ """Wraps the .runInteraction() method on the underlying db_pool."""
+ def inner_func(txn, *args, **kwargs):
+ return func(LoggingTransaction(txn), *args, **kwargs)
+
+ return self._db_pool.runInteraction(inner_func, *args, **kwargs)
+
def cursor_to_dict(self, cursor):
"""Converts a SQL cursor into an list of dicts.
@@ -60,11 +105,6 @@ class SQLBaseStore(object):
Returns:
The result of decoder(results)
"""
- logger.debug(
- "[SQL] %s Args=%s Func=%s",
- query, args, decoder.__name__ if decoder else None
- )
-
def interaction(txn):
cursor = txn.execute(query, args)
if decoder:
@@ -72,7 +112,7 @@ class SQLBaseStore(object):
else:
return cursor.fetchall()
- return self._db_pool.runInteraction(interaction)
+ return self.runInteraction(interaction)
def _execute_and_decode(self, query, *args):
return self._execute(self.cursor_to_dict, query, *args)
@@ -88,7 +128,7 @@ class SQLBaseStore(object):
values : dict of new column names and values for them
or_replace : bool; if True performs an INSERT OR REPLACE
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._simple_insert_txn, table, values, or_replace=or_replace
)
@@ -172,7 +212,7 @@ class SQLBaseStore(object):
txn.execute(sql, keyvalues.values())
return txn.fetchall()
- res = yield self._db_pool.runInteraction(func)
+ res = yield self.runInteraction(func)
defer.returnValue([r[0] for r in res])
@@ -195,7 +235,7 @@ class SQLBaseStore(object):
txn.execute(sql, keyvalues.values())
return self.cursor_to_dict(txn)
- return self._db_pool.runInteraction(func)
+ return self.runInteraction(func)
def _simple_update_one(self, table, keyvalues, updatevalues,
retcols=None):
@@ -263,7 +303,7 @@ class SQLBaseStore(object):
raise StoreError(500, "More than one row matched")
return ret
- return self._db_pool.runInteraction(func)
+ return self.runInteraction(func)
def _simple_delete_one(self, table, keyvalues):
"""Executes a DELETE query on the named table, expecting to delete a
@@ -284,7 +324,7 @@ class SQLBaseStore(object):
raise StoreError(404, "No row found")
if txn.rowcount > 1:
raise StoreError(500, "more than one row matched")
- return self._db_pool.runInteraction(func)
+ return self.runInteraction(func)
def _simple_max_id(self, table):
"""Executes a SELECT query on the named table, expecting to return the
@@ -302,7 +342,7 @@ class SQLBaseStore(object):
return 0
return max_id
- return self._db_pool.runInteraction(func)
+ return self.runInteraction(func)
def _parse_event_from_row(self, row_dict):
d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
@@ -325,7 +365,7 @@ class SQLBaseStore(object):
)
def _parse_events(self, rows):
- return self._db_pool.runInteraction(self._parse_events_txn, rows)
+ return self.runInteraction(self._parse_events_txn, rows)
def _parse_events_txn(self, txn, rows):
events = [self._parse_event_from_row(r) for r in rows]
diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py
index 3c859fdeac..d70467dcd6 100644
--- a/synapse/storage/pdu.py
+++ b/synapse/storage/pdu.py
@@ -43,7 +43,7 @@ class PduStore(SQLBaseStore):
PduTuple: If the pdu does not exist in the database, returns None
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_pdu_tuple, pdu_id, origin
)
@@ -95,7 +95,7 @@ class PduStore(SQLBaseStore):
list: A list of PduTuples
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_current_state_for_context,
context
)
@@ -143,7 +143,7 @@ class PduStore(SQLBaseStore):
pdu_origin (str)
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._mark_as_processed, pdu_id, pdu_origin
)
@@ -152,7 +152,7 @@ class PduStore(SQLBaseStore):
def get_all_pdus_from_context(self, context):
"""Get a list of all PDUs for a given context."""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_all_pdus_from_context, context,
)
@@ -179,7 +179,7 @@ class PduStore(SQLBaseStore):
Return:
list: A list of PduTuples
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_backfill, context, pdu_list, limit
)
@@ -240,7 +240,7 @@ class PduStore(SQLBaseStore):
txn
context (str)
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_min_depth_for_context, context
)
@@ -346,7 +346,7 @@ class PduStore(SQLBaseStore):
bool
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._is_pdu_new,
pdu_id=pdu_id,
origin=origin,
@@ -499,7 +499,7 @@ class StatePduStore(SQLBaseStore):
)
def get_unresolved_state_tree(self, new_state_pdu):
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_unresolved_state_tree, new_state_pdu
)
@@ -538,7 +538,7 @@ class StatePduStore(SQLBaseStore):
def update_current_state(self, pdu_id, origin, context, pdu_type,
state_key):
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._update_current_state,
pdu_id, origin, context, pdu_type, state_key
)
@@ -577,7 +577,7 @@ class StatePduStore(SQLBaseStore):
PduEntry
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_current_state_pdu, context, pdu_type, state_key
)
@@ -636,7 +636,7 @@ class StatePduStore(SQLBaseStore):
Returns:
bool: True if the new_pdu clobbered the current state, False if not
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._handle_new_state, new_pdu
)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index fd762bc643..db20b1daa0 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -62,7 +62,7 @@ class RegistrationStore(SQLBaseStore):
Raises:
StoreError if the user_id could not be registered.
"""
- yield self._db_pool.runInteraction(self._register, user_id, token,
+ yield self.runInteraction(self._register, user_id, token,
password_hash)
def _register(self, txn, user_id, token, password_hash):
@@ -99,7 +99,7 @@ class RegistrationStore(SQLBaseStore):
Raises:
StoreError if no user was found.
"""
- user_id = yield self._db_pool.runInteraction(self._query_for_auth,
+ user_id = yield self.runInteraction(self._query_for_auth,
token)
defer.returnValue(user_id)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 017169ce00..5adf8cdf1b 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -149,7 +149,7 @@ class RoomStore(SQLBaseStore):
defer.returnValue(None)
def get_power_level(self, room_id, user_id):
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_power_level,
room_id, user_id,
)
@@ -182,7 +182,7 @@ class RoomStore(SQLBaseStore):
return None
def get_ops_levels(self, room_id):
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_ops_levels,
room_id,
)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 676b2f2653..04b4067d03 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -149,7 +149,7 @@ class RoomMemberStore(SQLBaseStore):
membership_list (list): A list of synapse.api.constants.Membership
values which the user must be in.
Returns:
- A list of dicts with "room_id" and "membership" keys.
+            A list of RoomMemberEvent objects.
"""
if not membership_list:
return defer.succeed(None)
@@ -198,10 +198,11 @@ class RoomMemberStore(SQLBaseStore):
return results
@defer.inlineCallbacks
- def user_rooms_intersect(self, user_list):
- """ Checks whether a list of users share a room.
+ def user_rooms_intersect(self, user_id_list):
+ """ Checks whether all the users whose IDs are given in a list share a
+ room.
"""
- user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_list))
+ user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_id_list))
sql = (
"SELECT m.room_id FROM room_memberships as m "
"INNER JOIN current_state_events as c "
@@ -211,8 +212,8 @@ class RoomMemberStore(SQLBaseStore):
"GROUP BY m.room_id HAVING COUNT(m.room_id) = ?"
) % {"clause": user_list_clause}
- args = user_list
- args.append(len(user_list))
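+        # Take a copy so appending the expected count below doesn't mutate
+        # the caller's list.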
+ args = list(user_id_list)
+ args.append(len(user_id_list))
rows = yield self._execute(None, sql, *args)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index aff6dc9855..8c766b8a00 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -286,7 +286,7 @@ class StreamStore(SQLBaseStore):
defer.returnValue(ret)
def get_room_events_max_id(self):
- return self._db_pool.runInteraction(self._get_room_events_max_id_txn)
+ return self.runInteraction(self._get_room_events_max_id_txn)
def _get_room_events_max_id_txn(self, txn):
txn.execute(
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 7467e1035b..ab4599b468 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -41,7 +41,7 @@ class TransactionStore(SQLBaseStore):
this transaction or a 2-tuple of (int, dict)
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_received_txn_response, transaction_id, origin
)
@@ -72,7 +72,7 @@ class TransactionStore(SQLBaseStore):
response_json (str)
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._set_received_txn_response,
transaction_id, origin, code, response_dict
)
@@ -104,7 +104,7 @@ class TransactionStore(SQLBaseStore):
list: A list of previous transaction ids.
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._prep_send_transaction,
transaction_id, destination, ts, pdu_list
)
@@ -159,7 +159,7 @@ class TransactionStore(SQLBaseStore):
code (int)
response_json (str)
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._delivered_txn,
transaction_id, destination, code, response_dict
)
@@ -184,7 +184,7 @@ class TransactionStore(SQLBaseStore):
Returns:
list: A list of `ReceivedTransactionsTable.EntryType`
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_transactions_after, transaction_id, destination
)
@@ -214,7 +214,7 @@ class TransactionStore(SQLBaseStore):
Returns
list: A list of PduTuple
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_pdus_after_transaction,
transaction_id, destination
)
|