diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4beb951b9f..a3ff995695 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
+from .appservice import ApplicationServiceStore
from .directory import DirectoryStore
from .feedback import FeedbackStore
from .presence import PresenceStore
@@ -29,10 +30,14 @@ from .stream import StreamStore
from .transactions import TransactionStore
from .keys import KeyStore
from .event_federation import EventFederationStore
+from .pusher import PusherStore
+from .push_rule import PushRuleStore
from .media_repository import MediaRepositoryStore
+from .rejections import RejectionsStore
from .state import StateStore
from .signatures import SignatureStore
+from .filtering import FilteringStore
from syutil.base64util import decode_base64
from syutil.jsonutil import encode_canonical_json
@@ -40,33 +45,21 @@ from syutil.jsonutil import encode_canonical_json
from synapse.crypto.event_signing import compute_event_reference_hash
-import json
+import fnmatch
+import imp
import logging
import os
+import re
logger = logging.getLogger(__name__)
-SCHEMAS = [
- "transactions",
- "users",
- "profiles",
- "presence",
- "im",
- "room_aliases",
- "keys",
- "redactions",
- "state",
- "event_edges",
- "event_signatures",
- "media_repository",
-]
+# Remember to update this number every time a change is made to the database
+# schema files, so that users will be informed on server restarts.
+SCHEMA_VERSION = 14
-
-# Remember to update this number every time an incompatible change is made to
-# database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 11
+dir_path = os.path.abspath(os.path.dirname(__file__))
class _RollbackButIsFineException(Exception):
@@ -80,8 +73,13 @@ class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
PresenceStore, TransactionStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
+ ApplicationServiceStore,
EventFederationStore,
MediaRepositoryStore,
+ RejectionsStore,
+ FilteringStore,
+ PusherStore,
+ PushRuleStore
):
def __init__(self, hs):
@@ -117,21 +115,147 @@ class DataStore(RoomMemberStore, RoomStore,
pass
@defer.inlineCallbacks
- def get_event(self, event_id, allow_none=False):
- events = yield self._get_events([event_id])
+ def get_event(self, event_id, check_redacted=True,
+ get_prev_content=False, allow_rejected=False,
+ allow_none=False):
+ """Get an event from the database by event_id.
+
+ Args:
+ event_id (str): The event_id of the event to fetch
+            check_redacted (bool): If True, check if the event has been
+                redacted and, if so, redact it.
+            get_prev_content (bool): If True and the event is a state event,
+                include the previous state's content in the unsigned field.
+            allow_rejected (bool): If True, return rejected events.
+            allow_none (bool): If True, return None if no event is found; if
+                False, raise an exception.
+
+ Returns:
+            Deferred: A FrozenEvent.
+ """
+ event = yield self.runInteraction(
+ "get_event", self._get_event_txn,
+ event_id,
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ allow_rejected=allow_rejected,
+ )
- if not events:
- if allow_none:
- defer.returnValue(None)
- else:
- raise RuntimeError("Could not find event %s" % (event_id,))
+ if not event and not allow_none:
+ raise RuntimeError("Could not find event %s" % (event_id,))
- defer.returnValue(events[0])
+ defer.returnValue(event)
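+
+    # A minimal usage sketch (illustrative; `store` and the event id are
+    # hypothetical):
+    #
+    #     event = yield store.get_event("$123:example.com", allow_none=True)
+    #     if event is None:
+    #         pass  # unknown (or rejected, unless allow_rejected was set)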
@log_function
def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True,
current_state=None):
+
+        # Remove any existing cache entries for the event_id
+ self._get_event_cache.pop(event.event_id)
+
+ # We purposefully do this first since if we include a `current_state`
+ # key, we *want* to update the `current_state_events` table
+ if current_state:
+ txn.execute(
+ "DELETE FROM current_state_events WHERE room_id = ?",
+ (event.room_id,)
+ )
+
+ for s in current_state:
+ self._simple_insert_txn(
+ txn,
+ "current_state_events",
+ {
+ "event_id": s.event_id,
+ "room_id": s.room_id,
+ "type": s.type,
+ "state_key": s.state_key,
+ },
+ or_replace=True,
+ )
+
+ if event.is_state() and is_new_state:
+ if not backfilled and not context.rejected:
+ self._simple_insert_txn(
+ txn,
+ table="state_forward_extremities",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ or_replace=True,
+ )
+
+ for prev_state_id, _ in event.prev_state:
+ self._simple_delete_txn(
+ txn,
+ table="state_forward_extremities",
+ keyvalues={
+ "event_id": prev_state_id,
+ }
+ )
+
+ outlier = event.internal_metadata.is_outlier()
+
+ if not outlier:
+ self._store_state_groups_txn(txn, event, context)
+
+ self._update_min_depth_for_room_txn(
+ txn,
+ event.room_id,
+ event.depth
+ )
+
+ self._handle_prev_events(
+ txn,
+ outlier=outlier,
+ event_id=event.event_id,
+ prev_events=event.prev_events,
+ room_id=event.room_id,
+ )
+
+ have_persisted = self._simple_select_one_onecol_txn(
+ txn,
+ table="event_json",
+ keyvalues={"event_id": event.event_id},
+ retcol="event_id",
+ allow_none=True,
+ )
+
+ metadata_json = encode_canonical_json(
+ event.internal_metadata.get_dict()
+ )
+
+ # If we have already persisted this event, we don't need to do any
+ # more processing.
+        # The processing above must still be done on every call to persist
+        # this event, since it might not have happened on previous calls. For
+        # example, we may be persisting an event that we had previously
+        # persisted as an outlier, but that is no longer one.
+ if have_persisted:
+ if not outlier:
+ sql = (
+ "UPDATE event_json SET internal_metadata = ?"
+ " WHERE event_id = ?"
+ )
+ txn.execute(
+ sql,
+ (metadata_json.decode("UTF-8"), event.event_id,)
+ )
+
+ sql = (
+ "UPDATE events SET outlier = 0"
+ " WHERE event_id = ?"
+ )
+ txn.execute(
+ sql,
+ (event.event_id,)
+ )
+ return
+
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
elif event.type == EventTypes.Feedback:
@@ -143,8 +267,6 @@ class DataStore(RoomMemberStore, RoomStore,
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
- outlier = event.internal_metadata.is_outlier()
-
event_dict = {
k: v
for k, v in event.get_dict().items()
@@ -154,10 +276,6 @@ class DataStore(RoomMemberStore, RoomStore,
]
}
- metadata_json = encode_canonical_json(
- event.internal_metadata.get_dict()
- )
-
self._simple_insert_txn(
txn,
table="event_json",
@@ -170,12 +288,16 @@ class DataStore(RoomMemberStore, RoomStore,
or_replace=True,
)
+ content = encode_canonical_json(
+ event.content
+ ).decode("UTF-8")
+
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
- "content": json.dumps(event.get_dict()["content"]),
+ "content": content,
"processed": True,
"outlier": outlier,
"depth": event.depth,
@@ -195,7 +317,10 @@ class DataStore(RoomMemberStore, RoomStore,
"prev_events",
]
}
- vals["unrecognized_keys"] = json.dumps(unrec)
+
+ vals["unrecognized_keys"] = encode_canonical_json(
+ unrec
+ ).decode("UTF-8")
try:
self._simple_insert_txn(
@@ -213,38 +338,10 @@ class DataStore(RoomMemberStore, RoomStore,
)
raise _RollbackButIsFineException("_persist_event")
- self._handle_prev_events(
- txn,
- outlier=outlier,
- event_id=event.event_id,
- prev_events=event.prev_events,
- room_id=event.room_id,
- )
-
- if not outlier:
- self._store_state_groups_txn(txn, event, context)
-
- if current_state:
- txn.execute(
- "DELETE FROM current_state_events WHERE room_id = ?",
- (event.room_id,)
- )
-
- for s in current_state:
- self._simple_insert_txn(
- txn,
- "current_state_events",
- {
- "event_id": s.event_id,
- "room_id": s.room_id,
- "type": s.type,
- "state_key": s.state_key,
- },
- or_replace=True,
- )
+ if context.rejected:
+ self._store_rejections_txn(txn, event.event_id, context.rejected)
- is_state = hasattr(event, "state_key") and event.state_key is not None
- if is_state:
+ if event.is_state():
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@@ -252,6 +349,7 @@ class DataStore(RoomMemberStore, RoomStore,
"state_key": event.state_key,
}
+ # TODO: How does this work with backfilling?
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
@@ -262,7 +360,7 @@ class DataStore(RoomMemberStore, RoomStore,
or_replace=True,
)
- if is_new_state:
+ if is_new_state and not context.rejected:
self._simple_insert_txn(
txn,
"current_state_events",
@@ -288,28 +386,6 @@ class DataStore(RoomMemberStore, RoomStore,
or_ignore=True,
)
- if not backfilled:
- self._simple_insert_txn(
- txn,
- table="state_forward_extremities",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- or_replace=True,
- )
-
- for prev_state_id, _ in event.prev_state:
- self._simple_delete_txn(
- txn,
- table="state_forward_extremities",
- keyvalues={
- "event_id": prev_state_id,
- }
- )
-
for hash_alg, hash_base64 in event.hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_event_content_hash_txn(
@@ -340,14 +416,9 @@ class DataStore(RoomMemberStore, RoomStore,
txn, event.event_id, ref_alg, ref_hash_bytes
)
- if not outlier:
- self._update_min_depth_for_room_txn(
- txn,
- event.room_id,
- event.depth
- )
-
def _store_redaction(self, txn, event):
+ # invalidate the cache for the redacted event
+ self._get_event_cache.pop(event.redacts)
txn.execute(
"INSERT OR IGNORE INTO redactions "
"(event_id, redacts) VALUES (?,?)",
@@ -370,9 +441,12 @@ class DataStore(RoomMemberStore, RoomStore,
"redacted": del_sql,
}
- if event_type:
+ if event_type and state_key is not None:
sql += " AND s.type = ? AND s.state_key = ? "
args = (room_id, event_type, state_key)
+ elif event_type:
+ sql += " AND s.type = ?"
+ args = (room_id, event_type)
else:
args = (room_id, )
@@ -382,6 +456,41 @@ class DataStore(RoomMemberStore, RoomStore,
defer.returnValue(events)
@defer.inlineCallbacks
+ def get_room_name_and_aliases(self, room_id):
+ del_sql = (
+ "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+ "LIMIT 1"
+ )
+
+ sql = (
+ "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+ "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+ "INNER JOIN state_events as s ON e.event_id = s.event_id "
+ "WHERE c.room_id = ? "
+ ) % {
+ "redacted": del_sql,
+ }
+
+ sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
+ sql += " OR s.type = 'm.room.aliases')"
+ args = (room_id,)
+
+ results = yield self._execute_and_decode(sql, *args)
+
+ events = yield self._parse_events(results)
+
+ name = None
+ aliases = []
+
+ for e in events:
+ if e.type == 'm.room.name':
+ name = e.content['name']
+ elif e.type == 'm.room.aliases':
+ aliases.extend(e.content['aliases'])
+
+ defer.returnValue((name, aliases))
+
+ @defer.inlineCallbacks
def _get_min_token(self):
row = yield self._execute(
None,
@@ -417,30 +526,48 @@ class DataStore(RoomMemberStore, RoomStore,
],
)
+ def have_events(self, event_ids):
+ """Given a list of event ids, check if we have already processed them.
+
+ Returns:
+ dict: Has an entry for each event id we already have seen. Maps to
+ the rejected reason string if we rejected the event, else maps to
+ None.
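+
+            For example (illustrative ids), given ["$a:hs", "$b:hs", "$c:hs"]
+            where "$a:hs" was stored, "$b:hs" was rejected and "$c:hs" has
+            never been seen, the result would be
+            {"$a:hs": None, "$b:hs": "<rejection reason>"}.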
+ """
+ if not event_ids:
+ return defer.succeed({})
+
+ def f(txn):
+ sql = (
+ "SELECT e.event_id, reason FROM events as e "
+ "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+ "WHERE e.event_id = ?"
+ )
-def schema_path(schema):
- """ Get a filesystem path for the named database schema
+ res = {}
+ for event_id in event_ids:
+ txn.execute(sql, (event_id,))
+ row = txn.fetchone()
+ if row:
+ _, rejected = row
+ res[event_id] = rejected
- Args:
- schema: Name of the database schema.
- Returns:
- A filesystem path pointing at a ".sql" file.
+ return res
- """
- dir_path = os.path.dirname(__file__)
- schemaPath = os.path.join(dir_path, "schema", schema + ".sql")
- return schemaPath
+ return self.runInteraction(
+ "have_events", f,
+ )
-def read_schema(schema):
+def read_schema(path):
""" Read the named database schema.
Args:
- schema: Name of the datbase schema.
+ path: Path of the database schema.
Returns:
A string containing the database schema.
"""
- with open(schema_path(schema)) as schema_file:
+ with open(path) as schema_file:
return schema_file.read()
@@ -453,46 +580,275 @@ class UpgradeDatabaseException(PrepareDatabaseException):
def prepare_database(db_conn):
- """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we
- don't have to worry about overwriting existing content.
+ """Prepares a database for usage. Will either create all necessary tables
+ or upgrade from an older schema version.
"""
- c = db_conn.cursor()
- c.execute("PRAGMA user_version")
- row = c.fetchone()
+ try:
+ cur = db_conn.cursor()
+ version_info = _get_or_create_schema_state(cur)
+
+ if version_info:
+ user_version, delta_files, upgraded = version_info
+ _upgrade_existing_database(cur, user_version, delta_files, upgraded)
+ else:
+ _setup_new_database(cur)
- if row and row[0]:
- user_version = row[0]
+ cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
- if user_version > SCHEMA_VERSION:
- raise ValueError(
- "Cannot use this database as it is too " +
- "new for the server to understand"
- )
- elif user_version < SCHEMA_VERSION:
- logger.info(
- "Upgrading database from version %d",
- user_version
+ cur.close()
+ db_conn.commit()
+ except:
+ db_conn.rollback()
+ raise
+
+
+def _setup_new_database(cur):
+ """Sets up the database by finding a base set of "full schemas" and then
+ applying any necessary deltas.
+
+ The "full_schemas" directory has subdirectories named after versions. This
+ function searches for the highest version less than or equal to
+ `SCHEMA_VERSION` and executes all .sql files in that directory.
+
+ The function will then apply all deltas for all versions after the base
+ version.
+
+ Example directory structure:
+
+ schema/
+ delta/
+ ...
+ full_schemas/
+ 3/
+ test.sql
+ ...
+ 11/
+ foo.sql
+ bar.sql
+ ...
+
+ In the example foo.sql and bar.sql would be run, and then any delta files
+ for versions strictly greater than 11.
+ """
+ current_dir = os.path.join(dir_path, "schema", "full_schemas")
+ directory_entries = os.listdir(current_dir)
+
+ valid_dirs = []
+ pattern = re.compile(r"^\d+(\.sql)?$")
+ for filename in directory_entries:
+ match = pattern.match(filename)
+ abs_path = os.path.join(current_dir, filename)
+ if match and os.path.isdir(abs_path):
+ ver = int(match.group(0))
+ if ver <= SCHEMA_VERSION:
+ valid_dirs.append((ver, abs_path))
+ else:
+ logger.warn("Unexpected entry in 'full_schemas': %s", filename)
+
+ if not valid_dirs:
+ raise PrepareDatabaseException(
+ "Could not find a suitable base set of full schemas"
+ )
+
+ max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0])
+
+ logger.debug("Initialising schema v%d", max_current_ver)
+
+ directory_entries = os.listdir(sql_dir)
+
+ sql_script = "BEGIN TRANSACTION;\n"
+ for filename in fnmatch.filter(directory_entries, "*.sql"):
+ sql_loc = os.path.join(sql_dir, filename)
+ logger.debug("Applying schema %s", sql_loc)
+ sql_script += read_schema(sql_loc)
+ sql_script += "\n"
+ sql_script += "COMMIT TRANSACTION;"
+ cur.executescript(sql_script)
+
+ cur.execute(
+ "INSERT OR REPLACE INTO schema_version (version, upgraded)"
+ " VALUES (?,?)",
+ (max_current_ver, False)
+ )
+
+ _upgrade_existing_database(
+ cur,
+ current_version=max_current_ver,
+ applied_delta_files=[],
+ upgraded=False
+ )
+
+
+def _upgrade_existing_database(cur, current_version, applied_delta_files,
+ upgraded):
+ """Upgrades an existing database.
+
+ Delta files can either be SQL stored in *.sql files, or python modules
+ in *.py.
+
+ There can be multiple delta files per version. Synapse will keep track of
+    which delta files have been applied, and will apply any that haven't been,
+    even if there has been no version bump. This is useful for development,
+    where orthogonal schema changes may happen on separate branches.
+
+ Different delta files for the same version *must* be orthogonal and give
+ the same result when applied in any order. No guarantees are made on the
+ order of execution of these scripts.
+
+    This is a no-op if current_version == SCHEMA_VERSION.
+
+ Example directory structure:
+
+ schema/
+ delta/
+ 11/
+ foo.sql
+ ...
+ 12/
+ foo.sql
+ bar.py
+ ...
+ full_schemas/
+ ...
+
+    In the example, if current_version is 11, then the deltas in `11/` (here
+    `foo.sql`) will be run if and only if `upgraded` is True. The deltas in
+    `12/` (`foo.sql` and `bar.py`) are then run in some arbitrary order.
+
+ Args:
+ cur (Cursor)
+ current_version (int): The current version of the schema.
+ applied_delta_files (list): A list of deltas that have already been
+ applied.
+        upgraded (bool): Whether the current version was generated by having
+            applied deltas, or from a full schema file. If `False`, the
+            function will never apply delta files for the given
+            `current_version`, since the full schema for that version already
+            incorporates them.
+ """
+
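+    # A python delta only needs to expose a run_upgrade(cur) function.
+    # Illustrative sketch (the file name and statement are hypothetical):
+    #
+    #     # schema/delta/14/example_upgrade.py
+    #     def run_upgrade(cur):
+    #         cur.execute("ALTER TABLE events ADD COLUMN example TEXT")
+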
+ if current_version > SCHEMA_VERSION:
+ raise ValueError(
+ "Cannot use this database as it is too " +
+ "new for the server to understand"
+ )
+
+ start_ver = current_version
+ if not upgraded:
+ start_ver += 1
+
+ for v in range(start_ver, SCHEMA_VERSION + 1):
+ logger.debug("Upgrading schema to v%d", v)
+
+ delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
+
+ try:
+ directory_entries = os.listdir(delta_dir)
+ except OSError:
+ logger.exception("Could not open delta dir for version %d", v)
+ raise UpgradeDatabaseException(
+ "Could not open delta dir for version %d" % (v,)
)
- # Run every version since after the current version.
- for v in range(user_version + 1, SCHEMA_VERSION + 1):
- if v == 10:
- raise UpgradeDatabaseException(
- "No delta for version 10"
+ directory_entries.sort()
+ for file_name in directory_entries:
+ relative_path = os.path.join(str(v), file_name)
+ if relative_path in applied_delta_files:
+ continue
+
+ absolute_path = os.path.join(
+ dir_path, "schema", "delta", relative_path,
+ )
+ root_name, ext = os.path.splitext(file_name)
+ if ext == ".py":
+                # This is a python upgrade module. We need to import it under
+                # some module name and then execute its `run_upgrade` function.
+ module_name = "synapse.storage.v%d_%s" % (
+ v, root_name
+ )
+ with open(absolute_path) as python_file:
+ module = imp.load_source(
+ module_name, absolute_path, python_file
)
- sql_script = read_schema("delta/v%d" % (v))
- c.executescript(sql_script)
-
- db_conn.commit()
-
- else:
- sql_script = "BEGIN TRANSACTION;\n"
- for sql_loc in SCHEMAS:
- sql_script += read_schema(sql_loc)
- sql_script += "\n"
- sql_script += "COMMIT TRANSACTION;"
- c.executescript(sql_script)
- db_conn.commit()
- c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION)
+ logger.debug("Running script %s", relative_path)
+ module.run_upgrade(cur)
+ elif ext == ".sql":
+ # A plain old .sql file, just read and execute it
+ delta_schema = read_schema(absolute_path)
+ logger.debug("Applying schema %s", relative_path)
+ cur.executescript(delta_schema)
+ else:
+ # Not a valid delta file.
+ logger.warn(
+ "Found directory entry that did not end in .py or"
+ " .sql: %s",
+ relative_path,
+ )
+ continue
+
+ # Mark as done.
+ cur.execute(
+ "INSERT INTO applied_schema_deltas (version, file)"
+ " VALUES (?,?)",
+ (v, relative_path)
+ )
- c.close()
+ cur.execute(
+ "INSERT OR REPLACE INTO schema_version (version, upgraded)"
+ " VALUES (?,?)",
+ (v, True)
+ )
+
+
+def _get_or_create_schema_state(txn):
+ schema_path = os.path.join(
+ dir_path, "schema", "schema_version.sql",
+ )
+ create_schema = read_schema(schema_path)
+ txn.executescript(create_schema)
+
+ txn.execute("SELECT version, upgraded FROM schema_version")
+ row = txn.fetchone()
+ current_version = int(row[0]) if row else None
+ upgraded = bool(row[1]) if row else None
+
+ if current_version:
+ txn.execute(
+ "SELECT file FROM applied_schema_deltas WHERE version >= ?",
+ (current_version,)
+ )
+ return current_version, txn.fetchall(), upgraded
+
+ return None
+
+
+def prepare_sqlite3_database(db_conn):
+ """This function should be called before `prepare_database` on sqlite3
+ databases.
+
+ Since we changed the way we store the current schema version and handle
+ updates to schemas, we need a way to upgrade from the old method to the
+ new. This only affects sqlite databases since they were the only ones
+ supported at the time.
+ """
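+    # Old scheme: the schema version lived only in "PRAGMA user_version".
+    # New scheme: a schema_version table (version, upgraded) plus an
+    # applied_schema_deltas table naming each delta file already run.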
+ with db_conn:
+ schema_path = os.path.join(
+ dir_path, "schema", "schema_version.sql",
+ )
+ create_schema = read_schema(schema_path)
+ db_conn.executescript(create_schema)
+
+ c = db_conn.execute("SELECT * FROM schema_version")
+ rows = c.fetchall()
+ c.close()
+
+ if not rows:
+ c = db_conn.execute("PRAGMA user_version")
+ row = c.fetchone()
+ c.close()
+
+ if row and row[0]:
+ db_conn.execute(
+ "INSERT OR REPLACE INTO schema_version (version, upgraded)"
+ " VALUES (?,?)",
+ (row[0], False)
+ )
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ce63f12008..3725c9795d 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -19,11 +19,12 @@ from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
+from synapse.util.lrucache import LruCache
from twisted.internet import defer
-import collections
-import json
+from collections import namedtuple, OrderedDict
+import simplejson as json
import sys
import time
@@ -34,6 +35,52 @@ sql_logger = logging.getLogger("synapse.storage.SQL")
transaction_logger = logging.getLogger("synapse.storage.txn")
+# TODO(paul):
+# * more generic key management
+# * export monitoring stats
+# * consider other eviction strategies - LRU?
+def cached(max_entries=1000):
+ """ A method decorator that applies a memoizing cache around the function.
+
+ The function is presumed to take one additional argument, which is used as
+ the key for the cache. Cache hits are served directly from the cache;
+ misses use the function body to generate the value.
+
+ The wrapped function has an additional member, a callable called
+ "invalidate". This can be used to remove individual entries from the cache.
+
+ The wrapped function has another additional callable, called "prefill",
+    which can be used to insert values into the cache directly, without
+ calling the calculation function.
+ """
+ def wrap(orig):
+ cache = OrderedDict()
+
+ def prefill(key, value):
+ while len(cache) > max_entries:
+ cache.popitem(last=False)
+
+ cache[key] = value
+
+ @defer.inlineCallbacks
+ def wrapped(self, key):
+ if key in cache:
+ defer.returnValue(cache[key])
+
+ ret = yield orig(self, key)
+ prefill(key, ret)
+ defer.returnValue(ret)
+
+ def invalidate(key):
+ cache.pop(key, None)
+
+ wrapped.invalidate = invalidate
+ wrapped.prefill = prefill
+ return wrapped
+
+ return wrap
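+
+
+# A minimal usage sketch for @cached (illustrative; the method and table
+# names are hypothetical):
+#
+#     class RoomNameStore(SQLBaseStore):
+#         @cached(max_entries=5000)
+#         def get_room_name(self, room_id):
+#             return self._simple_select_one_onecol(
+#                 table="room_names", keyvalues={"room_id": room_id},
+#                 retcol="name",
+#             )
+#
+#     store.get_room_name.invalidate(room_id)     # drop a stale entry
+#     store.get_room_name.prefill(room_id, name)  # seed without a lookup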
+
+
class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object
passed to the constructor. Adds logging to the .execute() method."""
@@ -77,6 +124,43 @@ class LoggingTransaction(object):
sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
+class PerformanceCounters(object):
+ def __init__(self):
+ self.current_counters = {}
+ self.previous_counters = {}
+
+ def update(self, key, start_time, end_time=None):
+ if end_time is None:
+ end_time = time.time() * 1000
+ duration = end_time - start_time
+ count, cum_time = self.current_counters.get(key, (0, 0))
+ count += 1
+ cum_time += duration
+ self.current_counters[key] = (count, cum_time)
+ return end_time
+
+ def interval(self, interval_duration, limit=3):
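+        # Report the `limit` busiest counters since the previous call, each
+        # as "name(count): percentage-of-interval", e.g. (illustrative)
+        # "persist_event(12): 8.021%".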
+ counters = []
+ for name, (count, cum_time) in self.current_counters.items():
+ prev_count, prev_time = self.previous_counters.get(name, (0, 0))
+ counters.append((
+ (cum_time - prev_time) / interval_duration,
+ count - prev_count,
+ name
+ ))
+
+ self.previous_counters = dict(self.current_counters)
+
+ counters.sort(reverse=True)
+
+ top_n_counters = ", ".join(
+ "%s(%d): %.3f%%" % (name, count, 100 * ratio)
+ for ratio, count, name in counters[:limit]
+ )
+
+ return top_n_counters
+
+
class SQLBaseStore(object):
_TXN_ID = 0
@@ -85,6 +169,43 @@ class SQLBaseStore(object):
self._db_pool = hs.get_db_pool()
self._clock = hs.get_clock()
+ self._previous_txn_total_time = 0
+ self._current_txn_total_time = 0
+ self._previous_loop_ts = 0
+ self._txn_perf_counters = PerformanceCounters()
+ self._get_event_counters = PerformanceCounters()
+
+ self._get_event_cache = LruCache(hs.config.event_cache_size)
+
+ def start_profiling(self):
+ self._previous_loop_ts = self._clock.time_msec()
+
+ def loop():
+ curr = self._current_txn_total_time
+ prev = self._previous_txn_total_time
+ self._previous_txn_total_time = curr
+
+ time_now = self._clock.time_msec()
+ time_then = self._previous_loop_ts
+ self._previous_loop_ts = time_now
+
+ ratio = (curr - prev)/(time_now - time_then)
+
+ top_three_counters = self._txn_perf_counters.interval(
+ time_now - time_then, limit=3
+ )
+
+ top_3_event_counters = self._get_event_counters.interval(
+ time_now - time_then, limit=3
+ )
+
+ logger.info(
+ "Total database time: %.3f%% {%s} {%s}",
+ ratio * 100, top_three_counters, top_3_event_counters
+ )
+
+ self._clock.looping_call(loop, 10000)
+
@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
"""Wraps the .runInteraction() method on the underlying db_pool."""
@@ -94,8 +215,7 @@ class SQLBaseStore(object):
with LoggingContext("runInteraction") as context:
current_context.copy_to(context)
start = time.time() * 1000
- txn_id = SQLBaseStore._TXN_ID
- SQLBaseStore._TXN_ID += 1
+ txn_id = self._TXN_ID
# We don't really need these to be unique, so lets stop it from
# growing really large.
@@ -115,6 +235,10 @@ class SQLBaseStore(object):
"[TXN END] {%s} %f",
name, end - start
)
+
+ self._current_txn_total_time += end - start
+ self._txn_perf_counters.update(desc, start, end)
+
with PreserveLoggingContext():
result = yield self._db_pool.runInteraction(
inner_func, *args, **kwargs
@@ -194,6 +318,50 @@ class SQLBaseStore(object):
txn.execute(sql, values.values())
return txn.lastrowid
+ def _simple_upsert(self, table, keyvalues, values):
+ """
+ Args:
+ table (str): The table to upsert into
+            keyvalues (dict): The unique key columns and their new values
+ values (dict): The nonunique columns and their new values
+ Returns: A deferred
+ """
+ return self.runInteraction(
+ "_simple_upsert",
+ self._simple_upsert_txn, table, keyvalues, values
+ )
+
+ def _simple_upsert_txn(self, txn, table, keyvalues, values):
+ # Try to update
+ sql = "UPDATE %s SET %s WHERE %s" % (
+ table,
+ ", ".join("%s = ?" % (k,) for k in values),
+ " AND ".join("%s = ?" % (k,) for k in keyvalues)
+ )
+ sqlargs = values.values() + keyvalues.values()
+ logger.debug(
+ "[SQL] %s Args=%s",
+ sql, sqlargs,
+ )
+
+ txn.execute(sql, sqlargs)
+ if txn.rowcount == 0:
+            # We didn't update any rows, so insert a new one
+ allvalues = {}
+ allvalues.update(keyvalues)
+ allvalues.update(values)
+
+ sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+ table,
+ ", ".join(k for k in allvalues),
+ ", ".join("?" for _ in allvalues)
+ )
+ logger.debug(
+ "[SQL] %s Args=%s",
+                sql, allvalues.values(),
+ )
+ txn.execute(sql, allvalues.values())
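+
+    # Worked example (illustrative, using the pushers columns added in this
+    # patch): _simple_upsert("pushers", {"app_id": a, "pushkey": k},
+    # {"last_token": t}) first tries
+    #     UPDATE pushers SET last_token = ? WHERE app_id = ? AND pushkey = ?
+    # and, only if no row matched, falls back to
+    #     INSERT INTO pushers (app_id, pushkey, last_token) VALUES (?,?,?)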
+
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False):
"""Executes a SELECT query on the named table, which is expected to
@@ -282,7 +450,8 @@ class SQLBaseStore(object):
Args:
table : string giving the table name
- keyvalues : dict of column names and values to select the rows with
+ keyvalues : dict of column names and values to select the rows with,
+ or None to not apply a WHERE clause.
retcols : list of strings giving the names of the columns to return
"""
return self.runInteraction(
@@ -301,13 +470,20 @@ class SQLBaseStore(object):
keyvalues : dict of column names and values to select the rows with
retcols : list of strings giving the names of the columns to return
"""
- sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
- ", ".join(retcols),
- table,
- " AND ".join("%s = ?" % (k, ) for k in keyvalues)
- )
+ if keyvalues:
+ sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+ ", ".join(retcols),
+ table,
+ " AND ".join("%s = ?" % (k, ) for k in keyvalues)
+ )
+ txn.execute(sql, keyvalues.values())
+ else:
+ sql = "SELECT %s FROM %s ORDER BY rowid asc" % (
+ ", ".join(retcols),
+ table
+ )
+ txn.execute(sql)
- txn.execute(sql, keyvalues.values())
return self.cursor_to_dict(txn)
def _simple_update_one(self, table, keyvalues, updatevalues,
@@ -345,8 +521,8 @@ class SQLBaseStore(object):
if updatevalues:
update_sql = "UPDATE %s SET %s WHERE %s" % (
table,
- ", ".join("%s = ?" % (k) for k in updatevalues),
- " AND ".join("%s = ?" % (k) for k in keyvalues)
+ ", ".join("%s = ?" % (k,) for k in updatevalues),
+ " AND ".join("%s = ?" % (k,) for k in keyvalues)
)
def func(txn):
@@ -459,10 +635,26 @@ class SQLBaseStore(object):
return [e for e in events if e]
def _get_event_txn(self, txn, event_id, check_redacted=True,
- get_prev_content=False):
+ get_prev_content=False, allow_rejected=False):
+
+ start_time = time.time() * 1000
+ update_counter = self._get_event_counters.update
+
+ cache = self._get_event_cache.setdefault(event_id, {})
+
+ try:
+ # Separate cache entries for each way to invoke _get_event_txn
+ return cache[(check_redacted, get_prev_content, allow_rejected)]
+ except KeyError:
+ pass
+ finally:
+ start_time = update_counter("event_cache", start_time)
+
sql = (
- "SELECT internal_metadata, json, r.event_id FROM event_json as e "
+ "SELECT e.internal_metadata, e.json, r.event_id, rej.reason "
+ "FROM event_json as e "
"LEFT JOIN redactions as r ON e.event_id = r.redacts "
+ "LEFT JOIN rejections as rej on rej.event_id = e.event_id "
"WHERE e.event_id = ? "
"LIMIT 1 "
)
@@ -474,20 +666,35 @@ class SQLBaseStore(object):
if not res:
return None
- internal_metadata, js, redacted = res
+ internal_metadata, js, redacted, rejected_reason = res
- return self._get_event_from_row_txn(
- txn, internal_metadata, js, redacted,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- )
+ start_time = update_counter("select_event", start_time)
+
+ if allow_rejected or not rejected_reason:
+ result = self._get_event_from_row_txn(
+ txn, internal_metadata, js, redacted,
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ )
+ cache[(check_redacted, get_prev_content, allow_rejected)] = result
+ return result
+ else:
+ return None
def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
check_redacted=True, get_prev_content=False):
+
+ start_time = time.time() * 1000
+ update_counter = self._get_event_counters.update
+
d = json.loads(js)
+ start_time = update_counter("decode_json", start_time)
+
internal_metadata = json.loads(internal_metadata)
+ start_time = update_counter("decode_internal", start_time)
ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
+ start_time = update_counter("build_frozen_event", start_time)
if check_redacted and redacted:
ev = prune_event(ev)
@@ -503,6 +710,7 @@ class SQLBaseStore(object):
if because:
ev.unsigned["redacted_because"] = because
+ start_time = update_counter("redact_event", start_time)
if get_prev_content and "replaces_state" in ev.unsigned:
prev = self._get_event_txn(
@@ -512,6 +720,7 @@ class SQLBaseStore(object):
)
if prev:
ev.unsigned["prev_content"] = prev.get_dict()["content"]
+ start_time = update_counter("get_prev_content", start_time)
return ev
@@ -632,7 +841,7 @@ class JoinHelper(object):
for table in self.tables:
res += [f for f in table.fields if f not in res]
- self.EntryType = collections.namedtuple("JoinHelperEntry", res)
+ self.EntryType = namedtuple("JoinHelperEntry", res)
def get_fields(self, **prefixes):
"""Get a string representing a list of fields for use in SELECT
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
new file mode 100644
index 0000000000..e30265750a
--- /dev/null
+++ b/synapse/storage/appservice.py
@@ -0,0 +1,338 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import simplejson
+from simplejson import JSONDecodeError
+from twisted.internet import defer
+
+from synapse.api.constants import Membership
+from synapse.api.errors import StoreError
+from synapse.appservice import ApplicationService
+from synapse.storage.roommember import RoomsForUser
+from ._base import SQLBaseStore
+
+
+logger = logging.getLogger(__name__)
+
+
+def log_failure(failure):
+ logger.error("Failed to detect application services: %s", failure.value)
+ logger.error(failure.getTraceback())
+
+
+class ApplicationServiceStore(SQLBaseStore):
+
+ def __init__(self, hs):
+ super(ApplicationServiceStore, self).__init__(hs)
+ self.services_cache = []
+ self.cache_defer = self._populate_cache()
+ self.cache_defer.addErrback(log_failure)
+
+ @defer.inlineCallbacks
+ def unregister_app_service(self, token):
+ """Unregisters this service.
+
+        This removes all AS-specific regexes and the base URL. The token is
+        the only thing preserved for future registration attempts.
+ """
+ yield self.cache_defer # make sure the cache is ready
+ yield self.runInteraction(
+ "unregister_app_service",
+ self._unregister_app_service_txn,
+ token,
+ )
+ # update cache TODO: Should this be in the txn?
+ for service in self.services_cache:
+ if service.token == token:
+ service.url = None
+ service.namespaces = None
+ service.hs_token = None
+
+ def _unregister_app_service_txn(self, txn, token):
+ # kill the url to prevent pushes
+ txn.execute(
+ "UPDATE application_services SET url=NULL WHERE token=?",
+ (token,)
+ )
+
+ # cleanup regex
+ as_id = self._get_as_id_txn(txn, token)
+ if not as_id:
+ logger.warning(
+                "unregister_app_service_txn: Failed to find as_id for token=%s",
+ token
+ )
+ return False
+
+ txn.execute(
+ "DELETE FROM application_services_regex WHERE as_id=?",
+ (as_id,)
+ )
+ return True
+
+ @defer.inlineCallbacks
+ def update_app_service(self, service):
+ """Update an application service, clobbering what was previously there.
+
+ Args:
+ service(ApplicationService): The updated service.
+ """
+ yield self.cache_defer # make sure the cache is ready
+
+ # NB: There is no "insert" since we provide no public-facing API to
+ # allocate new ASes. It relies on the server admin inserting the AS
+ # token into the database manually.
+
+ if not service.token or not service.url:
+ raise StoreError(400, "Token and url must be specified.")
+
+ if not service.hs_token:
+ raise StoreError(500, "No HS token")
+
+ yield self.runInteraction(
+ "update_app_service",
+ self._update_app_service_txn,
+ service
+ )
+
+ # update cache TODO: Should this be in the txn?
+ for (index, cache_service) in enumerate(self.services_cache):
+ if service.token == cache_service.token:
+ self.services_cache[index] = service
+ logger.info("Updated: %s", service)
+ return
+ # new entry
+ self.services_cache.append(service)
+ logger.info("Updated(new): %s", service)
+
+ def _update_app_service_txn(self, txn, service):
+ as_id = self._get_as_id_txn(txn, service.token)
+ if not as_id:
+ logger.warning(
+                "update_app_service_txn: Failed to find as_id for token=%s",
+ service.token
+ )
+ return False
+
+ txn.execute(
+ "UPDATE application_services SET url=?, hs_token=?, sender=? "
+ "WHERE id=?",
+ (service.url, service.hs_token, service.sender, as_id,)
+ )
+ # cleanup regex
+ txn.execute(
+ "DELETE FROM application_services_regex WHERE as_id=?",
+ (as_id,)
+ )
+ for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST):
+ if ns_str in service.namespaces:
+ for regex_obj in service.namespaces[ns_str]:
+ txn.execute(
+ "INSERT INTO application_services_regex("
+ "as_id, namespace, regex) values(?,?,?)",
+ (as_id, ns_int, simplejson.dumps(regex_obj))
+ )
+ return True
+
+ def _get_as_id_txn(self, txn, token):
+ cursor = txn.execute(
+ "SELECT id FROM application_services WHERE token=?",
+ (token,)
+ )
+ res = cursor.fetchone()
+ if res:
+ return res[0]
+
+ @defer.inlineCallbacks
+ def get_app_services(self):
+ yield self.cache_defer # make sure the cache is ready
+ defer.returnValue(self.services_cache)
+
+ @defer.inlineCallbacks
+ def get_app_service_by_user_id(self, user_id):
+ """Retrieve an application service from their user ID.
+
+        Every application service has a particular user ID associated with it.
+        Nothing about the user ID itself indicates that it represents an
+        application service; this function provides the mapping from a user ID
+        to an application service.
+
+ Args:
+ user_id(str): The user ID to see if it is an application service.
+ Returns:
+ synapse.appservice.ApplicationService or None.
+ """
+
+ yield self.cache_defer # make sure the cache is ready
+
+ for service in self.services_cache:
+ if service.sender == user_id:
+ defer.returnValue(service)
+ return
+ defer.returnValue(None)
+
+ @defer.inlineCallbacks
+ def get_app_service_by_token(self, token, from_cache=True):
+ """Get the application service with the given appservice token.
+
+ Args:
+ token (str): The application service token.
+ from_cache (bool): True to get this service from the cache, False to
+ check the database.
+ Raises:
+ StoreError if there was a problem retrieving this service.
+ """
+ yield self.cache_defer # make sure the cache is ready
+
+ if from_cache:
+ for service in self.services_cache:
+ if service.token == token:
+ defer.returnValue(service)
+ return
+ defer.returnValue(None)
+
+ # TODO: The from_cache=False impl
+ # TODO: This should be JOINed with the application_services_regex table.
+
+ def get_app_service_rooms(self, service):
+ """Get a list of RoomsForUser for this application service.
+
+ Application services may be "interested" in lots of rooms depending on
+ the room ID, the room aliases, or the members in the room. This function
+        takes all of these into account and returns a list of RoomsForUser, which
+        represents the entire set of room IDs that this application service
+ wants to know about.
+
+ Args:
+ service: The application service to get a room list for.
+ Returns:
+ A list of RoomsForUser.
+ """
+ return self.runInteraction(
+ "get_app_service_rooms",
+ self._get_app_service_rooms_txn,
+ service,
+ )
+
+ def _get_app_service_rooms_txn(self, txn, service):
+ # get all rooms matching the room ID regex.
+ room_entries = self._simple_select_list_txn(
+ txn=txn, table="rooms", keyvalues=None, retcols=["room_id"]
+ )
+ matching_room_list = set([
+ r["room_id"] for r in room_entries if
+ service.is_interested_in_room(r["room_id"])
+ ])
+
+ # resolve room IDs for matching room alias regex.
+ room_alias_mappings = self._simple_select_list_txn(
+ txn=txn, table="room_aliases", keyvalues=None,
+ retcols=["room_id", "room_alias"]
+ )
+ matching_room_list |= set([
+ r["room_id"] for r in room_alias_mappings if
+ service.is_interested_in_alias(r["room_alias"])
+ ])
+
+ # get all rooms for every user for this AS. This is scoped to users on
+ # this HS only.
+ user_list = self._simple_select_list_txn(
+ txn=txn, table="users", keyvalues=None, retcols=["name"]
+ )
+ user_list = [
+ u["name"] for u in user_list if
+ service.is_interested_in_user(u["name"])
+ ]
+ rooms_for_user_matching_user_id = set() # RoomsForUser list
+ for user_id in user_list:
+ # FIXME: This assumes this store is linked with RoomMemberStore :(
+ rooms_for_user = self._get_rooms_for_user_where_membership_is_txn(
+ txn=txn,
+ user_id=user_id,
+ membership_list=[Membership.JOIN]
+ )
+ rooms_for_user_matching_user_id |= set(rooms_for_user)
+
+        # make RoomsForUser tuples for room ids and aliases which are not in
+        # the main rooms_for_user list, i.e. rooms which do not have any
+        # AS-registered users in them.
+ known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id]
+ missing_rooms_for_user = [
+ RoomsForUser(r, service.sender, "join") for r in
+ matching_room_list if r not in known_room_ids
+ ]
+ rooms_for_user_matching_user_id |= set(missing_rooms_for_user)
+
+ return rooms_for_user_matching_user_id
+
+ @defer.inlineCallbacks
+ def _populate_cache(self):
+ """Populates the ApplicationServiceCache from the database."""
+ sql = ("SELECT * FROM application_services LEFT JOIN "
+ "application_services_regex ON application_services.id = "
+ "application_services_regex.as_id")
+ # SQL results in the form:
+ # [
+ # {
+ # 'regex': "something",
+ # 'url': "something",
+ # 'namespace': enum,
+ # 'as_id': 0,
+ # 'token': "something",
+ # 'hs_token': "otherthing",
+ # 'id': 0
+ # }
+ # ]
+ services = {}
+ results = yield self._execute_and_decode(sql)
+ for res in results:
+ as_token = res["token"]
+ if as_token not in services:
+ # add the service
+ services[as_token] = {
+ "url": res["url"],
+ "token": as_token,
+ "hs_token": res["hs_token"],
+ "sender": res["sender"],
+ "namespaces": {
+ ApplicationService.NS_USERS: [],
+ ApplicationService.NS_ALIASES: [],
+ ApplicationService.NS_ROOMS: []
+ }
+ }
+ # add the namespace regex if one exists
+ ns_int = res["namespace"]
+ if ns_int is None:
+ continue
+ try:
+ services[as_token]["namespaces"][
+ ApplicationService.NS_LIST[ns_int]].append(
+ simplejson.loads(res["regex"])
+ )
+ except IndexError:
+ logger.error("Bad namespace enum '%s'. %s", ns_int, res)
+ except JSONDecodeError:
+ logger.error("Bad regex object '%s'", res["regex"])
+
+ # TODO get last successful txn id f.e. service
+ for service in services.values():
+ logger.info("Found application service: %s", service)
+ self.services_cache.append(ApplicationService(
+ token=service["token"],
+ url=service["url"],
+ namespaces=service["namespaces"],
+ hs_token=service["hs_token"],
+ sender=service["sender"]
+ ))
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 0cbcdd1b55..2deda8ac50 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -55,17 +55,19 @@ class EventFederationStore(SQLBaseStore):
results = set()
base_sql = (
- "SELECT auth_id FROM event_auth WHERE %s"
+ "SELECT auth_id FROM event_auth WHERE event_id = ?"
)
front = set(event_ids)
while front:
- sql = base_sql % (
- " OR ".join(["event_id=?"] * len(front)),
- )
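+            # Fetch each frontier event's auth events with its own bound
+            # query, de-duping against `results` so every auth event is
+            # visited at most once.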
+ new_front = set()
+ for f in front:
+ txn.execute(base_sql, (f,))
+ new_front.update([r[0] for r in txn.fetchall()])
+
+ new_front -= results
- txn.execute(sql, list(front))
- front = [r[0] for r in txn.fetchall()]
+ front = new_front
results.update(front)
return list(results)
@@ -379,3 +381,51 @@ class EventFederationStore(SQLBaseStore):
event_results += new_front
return self._get_events_txn(txn, event_results)
+
+ def get_missing_events(self, room_id, earliest_events, latest_events,
+ limit, min_depth):
+ return self.runInteraction(
+ "get_missing_events",
+ self._get_missing_events,
+ room_id, earliest_events, latest_events, limit, min_depth
+ )
+
+ def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
+ limit, min_depth):
+
+ earliest_events = set(earliest_events)
+ front = set(latest_events) - earliest_events
+
+ event_results = set()
+
+ query = (
+ "SELECT prev_event_id FROM event_edges "
+ "WHERE room_id = ? AND event_id = ? AND is_state = 0 "
+ "LIMIT ?"
+ )
+
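+        # Breadth-first walk backwards along prev_event edges from the
+        # latest events, stopping at events the caller already has, until
+        # `limit` missing events have been collected.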
+ while front and len(event_results) < limit:
+ new_front = set()
+ for event_id in front:
+ txn.execute(
+ query,
+ (room_id, event_id, limit - len(event_results))
+ )
+
+ for e_id, in txn.fetchall():
+ new_front.add(e_id)
+
+ new_front -= earliest_events
+ new_front -= event_results
+
+ front = new_front
+ event_results |= new_front
+
+ events = self._get_events_txn(txn, event_results)
+
+ events = sorted(
+ [ev for ev in events if ev.depth >= min_depth],
+ key=lambda e: e.depth,
+ )
+
+ return events[:limit]
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
new file mode 100644
index 0000000000..457a11fd02
--- /dev/null
+++ b/synapse/storage/filtering.py
@@ -0,0 +1,63 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from ._base import SQLBaseStore
+
+import simplejson as json
+
+
+class FilteringStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def get_user_filter(self, user_localpart, filter_id):
+ def_json = yield self._simple_select_one_onecol(
+ table="user_filters",
+ keyvalues={
+ "user_id": user_localpart,
+ "filter_id": filter_id,
+ },
+ retcol="filter_json",
+ allow_none=False,
+ )
+
+ defer.returnValue(json.loads(def_json))
+
+ def add_user_filter(self, user_localpart, user_filter):
+ def_json = json.dumps(user_filter)
+
+ # Need an atomic transaction to SELECT the maximal ID so far then
+ # INSERT a new one
+ def _do_txn(txn):
+ sql = (
+ "SELECT MAX(filter_id) FROM user_filters "
+ "WHERE user_id = ?"
+ )
+ txn.execute(sql, (user_localpart,))
+ max_id = txn.fetchone()[0]
+ if max_id is None:
+ filter_id = 0
+ else:
+ filter_id = max_id + 1
+
+ sql = (
+ "INSERT INTO user_filters (user_id, filter_id, filter_json)"
+                " VALUES(?, ?, ?)"
+ )
+ txn.execute(sql, (user_localpart, filter_id, def_json))
+
+ return filter_id
+
+ return self.runInteraction("add_user_filter", _do_txn)
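+
+    # Illustrative round trip (assumes `store` is a FilteringStore and
+    # `definition` is the client-supplied filter dict):
+    #
+    #     filter_id = yield store.add_user_filter("alice", definition)
+    #     definition_back = yield store.get_user_filter("alice", filter_id)
+    #     # definition_back == definition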
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
new file mode 100644
index 0000000000..bbf322cc84
--- /dev/null
+++ b/synapse/storage/push_rule.py
@@ -0,0 +1,264 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+from ._base import SQLBaseStore, Table
+from twisted.internet import defer
+
+import logging
+import copy
+import simplejson as json
+
+logger = logging.getLogger(__name__)
+
+
+class PushRuleStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def get_push_rules_for_user(self, user_name):
+ sql = (
+ "SELECT "+",".join(PushRuleTable.fields)+" "
+ "FROM "+PushRuleTable.table_name+" "
+ "WHERE user_name = ? "
+ "ORDER BY priority_class DESC, priority DESC"
+ )
+ rows = yield self._execute(None, sql, user_name)
+
+ dicts = []
+ for r in rows:
+ d = {}
+ for i, f in enumerate(PushRuleTable.fields):
+ d[f] = r[i]
+ dicts.append(d)
+
+ defer.returnValue(dicts)
+
+ @defer.inlineCallbacks
+ def get_push_rules_enabled_for_user(self, user_name):
+ results = yield self._simple_select_list(
+ PushRuleEnableTable.table_name,
+ {'user_name': user_name},
+ PushRuleEnableTable.fields
+ )
+        defer.returnValue(
+            {r['rule_id']: bool(r['enabled']) for r in results}
+        )
+
+ @defer.inlineCallbacks
+ def get_push_rule_enabled_by_user_rule_id(self, user_name, rule_id):
+ results = yield self._simple_select_list(
+ PushRuleEnableTable.table_name,
+ {'user_name': user_name, 'rule_id': rule_id},
+ ['enabled']
+ )
+ if not results:
+ defer.returnValue(True)
+        defer.returnValue(bool(results[0]['enabled']))
+
+ @defer.inlineCallbacks
+ def add_push_rule(self, before, after, **kwargs):
+ vals = copy.copy(kwargs)
+ if 'conditions' in vals:
+ vals['conditions'] = json.dumps(vals['conditions'])
+ if 'actions' in vals:
+ vals['actions'] = json.dumps(vals['actions'])
+ # we could check the rest of the keys are valid column names
+ # but sqlite will do that anyway so I think it's just pointless.
+ if 'id' in vals:
+ del vals['id']
+
+ if before or after:
+ ret = yield self.runInteraction(
+ "_add_push_rule_relative_txn",
+ self._add_push_rule_relative_txn,
+ before=before,
+ after=after,
+ **vals
+ )
+ defer.returnValue(ret)
+ else:
+ ret = yield self.runInteraction(
+ "_add_push_rule_highest_priority_txn",
+ self._add_push_rule_highest_priority_txn,
+ **vals
+ )
+ defer.returnValue(ret)
+
+ def _add_push_rule_relative_txn(self, txn, user_name, **kwargs):
+ after = None
+ relative_to_rule = None
+ if 'after' in kwargs and kwargs['after']:
+ after = kwargs['after']
+ relative_to_rule = after
+ if 'before' in kwargs and kwargs['before']:
+ relative_to_rule = kwargs['before']
+
+ # get the priority of the rule we're inserting after/before
+ sql = (
+            "SELECT priority_class, priority FROM " + PushRuleTable.table_name +
+            " WHERE user_name = ? AND rule_id = ?"
+ )
+ txn.execute(sql, (user_name, relative_to_rule))
+ res = txn.fetchall()
+ if not res:
+ raise RuleNotFoundException(
+ "before/after rule not found: %s" % (relative_to_rule,)
+ )
+ priority_class, base_rule_priority = res[0]
+
+ if 'priority_class' in kwargs and kwargs['priority_class'] != priority_class:
+ raise InconsistentRuleException(
+ "Given priority class does not match class of relative rule"
+ )
+
+ new_rule = copy.copy(kwargs)
+ if 'before' in new_rule:
+ del new_rule['before']
+ if 'after' in new_rule:
+ del new_rule['after']
+ new_rule['priority_class'] = priority_class
+ new_rule['user_name'] = user_name
+
+ # check if the priority before/after is free
+ new_rule_priority = base_rule_priority
+ if after:
+ new_rule_priority -= 1
+ else:
+ new_rule_priority += 1
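+        # (Rules are returned ordered by priority DESC, so "after" means a
+        # lower priority than the reference rule, and "before" a higher one.)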
+
+ new_rule['priority'] = new_rule_priority
+
+ sql = (
+ "SELECT COUNT(*) FROM " + PushRuleTable.table_name +
+ " WHERE user_name = ? AND priority_class = ? AND priority = ?"
+ )
+ txn.execute(sql, (user_name, priority_class, new_rule_priority))
+ res = txn.fetchall()
+ num_conflicting = res[0][0]
+
+ # if there are conflicting rules, bump everything
+ if num_conflicting:
+ sql = "UPDATE "+PushRuleTable.table_name+" SET priority = priority "
+ if after:
+ sql += "-1"
+ else:
+ sql += "+1"
+ sql += " WHERE user_name = ? AND priority_class = ? AND priority "
+ if after:
+ sql += "<= ?"
+ else:
+ sql += ">= ?"
+
+ txn.execute(sql, (user_name, priority_class, new_rule_priority))
+
+ # now insert the new rule
+ sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+ sql += ",".join(new_rule.keys())+") VALUES ("
+ sql += ", ".join(["?" for _ in new_rule.keys()])+")"
+
+ txn.execute(sql, new_rule.values())
+
+ def _add_push_rule_highest_priority_txn(self, txn, user_name,
+ priority_class, **kwargs):
+ # find the highest priority rule in that class
+ sql = (
+ "SELECT COUNT(*), MAX(priority) FROM " + PushRuleTable.table_name +
+ " WHERE user_name = ? and priority_class = ?"
+ )
+ txn.execute(sql, (user_name, priority_class))
+ res = txn.fetchall()
+ (how_many, highest_prio) = res[0]
+
+ new_prio = 0
+ if how_many > 0:
+ new_prio = highest_prio + 1
+
+ # and insert the new rule
+ new_rule = copy.copy(kwargs)
+ if 'id' in new_rule:
+ del new_rule['id']
+ new_rule['user_name'] = user_name
+ new_rule['priority_class'] = priority_class
+ new_rule['priority'] = new_prio
+
+ sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+ sql += ",".join(new_rule.keys())+") VALUES ("
+ sql += ", ".join(["?" for _ in new_rule.keys()])+")"
+
+ txn.execute(sql, new_rule.values())
+
+ @defer.inlineCallbacks
+ def delete_push_rule(self, user_name, rule_id):
+ """
+        Delete a push rule. The args specify the row to be deleted and can be
+        any of the columns in the push_rule table, but the standard ones are
+        given below.
+
+ Args:
+ user_name (str): The matrix ID of the push rule owner
+ rule_id (str): The rule_id of the rule to be deleted
+ """
+ yield self._simple_delete_one(
+ PushRuleTable.table_name,
+ {'user_name': user_name, 'rule_id': rule_id}
+ )
+
+ @defer.inlineCallbacks
+ def set_push_rule_enabled(self, user_name, rule_id, enabled):
+ if enabled:
+ yield self._simple_delete_one(
+ PushRuleEnableTable.table_name,
+ {'user_name': user_name, 'rule_id': rule_id}
+ )
+ else:
+ yield self._simple_upsert(
+ PushRuleEnableTable.table_name,
+ {'user_name': user_name, 'rule_id': rule_id},
+ {'enabled': False}
+ )
+
+
+class RuleNotFoundException(Exception):
+ pass
+
+
+class InconsistentRuleException(Exception):
+ pass
+
+
+class PushRuleTable(Table):
+ table_name = "push_rules"
+
+ fields = [
+ "id",
+ "user_name",
+ "rule_id",
+ "priority_class",
+ "priority",
+ "conditions",
+ "actions",
+ ]
+
+ EntryType = collections.namedtuple("PushRuleEntry", fields)
+
+
+class PushRuleEnableTable(Table):
+ table_name = "push_rules_enable"
+
+ fields = [
+ "user_name",
+ "rule_id",
+ "enabled"
+ ]
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
new file mode 100644
index 0000000000..6622b4d18a
--- /dev/null
+++ b/synapse/storage/pusher.py
@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+from ._base import SQLBaseStore, Table
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class PusherStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
+ sql = (
+ "SELECT id, user_name, kind, profile_tag, app_id,"
+ "app_display_name, device_display_name, pushkey, ts, data, "
+ "last_token, last_success, failing_since "
+ "FROM pushers "
+ "WHERE app_id = ? AND pushkey = ?"
+ )
+
+ rows = yield self._execute(
+ None, sql, app_id_and_pushkey[0], app_id_and_pushkey[1]
+ )
+
+ ret = [
+ {
+ "id": r[0],
+ "user_name": r[1],
+ "kind": r[2],
+ "profile_tag": r[3],
+ "app_id": r[4],
+ "app_display_name": r[5],
+ "device_display_name": r[6],
+ "pushkey": r[7],
+ "pushkey_ts": r[8],
+ "data": r[9],
+ "last_token": r[10],
+ "last_success": r[11],
+ "failing_since": r[12]
+ }
+ for r in rows
+ ]
+
+ defer.returnValue(ret[0])
+
+ @defer.inlineCallbacks
+ def get_all_pushers(self):
+ sql = (
+ "SELECT id, user_name, kind, profile_tag, app_id,"
+ "app_display_name, device_display_name, pushkey, ts, data, "
+ "last_token, last_success, failing_since "
+ "FROM pushers"
+ )
+
+ rows = yield self._execute(None, sql)
+
+ ret = [
+ {
+ "id": r[0],
+ "user_name": r[1],
+ "kind": r[2],
+ "profile_tag": r[3],
+ "app_id": r[4],
+ "app_display_name": r[5],
+ "device_display_name": r[6],
+ "pushkey": r[7],
+ "pushkey_ts": r[8],
+ "data": r[9],
+ "last_token": r[10],
+ "last_success": r[11],
+ "failing_since": r[12]
+ }
+ for r in rows
+ ]
+
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def add_pusher(self, user_name, profile_tag, kind, app_id,
+ app_display_name, device_display_name,
+ pushkey, pushkey_ts, lang, data):
+ try:
+ yield self._simple_upsert(
+ PushersTable.table_name,
+ dict(
+ app_id=app_id,
+ pushkey=pushkey,
+ ),
+ dict(
+ user_name=user_name,
+ kind=kind,
+ profile_tag=profile_tag,
+ app_display_name=app_display_name,
+ device_display_name=device_display_name,
+ ts=pushkey_ts,
+ lang=lang,
+ data=data
+ ))
+ except Exception as e:
+            logger.error("create_pusher failed: %s", e)
+ raise StoreError(500, "Problem creating pusher.")
+
+ @defer.inlineCallbacks
+ def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
+ yield self._simple_delete_one(
+ PushersTable.table_name,
+ dict(app_id=app_id, pushkey=pushkey)
+ )
+
+ @defer.inlineCallbacks
+ def update_pusher_last_token(self, app_id, pushkey, last_token):
+ yield self._simple_update_one(
+ PushersTable.table_name,
+ {'app_id': app_id, 'pushkey': pushkey},
+ {'last_token': last_token}
+ )
+
+ @defer.inlineCallbacks
+ def update_pusher_last_token_and_success(self, app_id, pushkey,
+ last_token, last_success):
+ yield self._simple_update_one(
+ PushersTable.table_name,
+ {'app_id': app_id, 'pushkey': pushkey},
+ {'last_token': last_token, 'last_success': last_success}
+ )
+
+ @defer.inlineCallbacks
+ def update_pusher_failing_since(self, app_id, pushkey, failing_since):
+ yield self._simple_update_one(
+ PushersTable.table_name,
+ {'app_id': app_id, 'pushkey': pushkey},
+ {'failing_since': failing_since}
+ )
+
+
+class PushersTable(Table):
+ table_name = "pushers"
+
+ fields = [
+ "id",
+ "user_name",
+ "kind",
+ "profile_tag",
+ "app_id",
+ "app_display_name",
+ "device_display_name",
+ "pushkey",
+ "pushkey_ts",
+ "data",
+ "last_token",
+ "last_success",
+ "failing_since"
+ ]
+
+ EntryType = collections.namedtuple("PusherEntry", fields)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 75dffa4db2..029b07cc66 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -122,7 +122,8 @@ class RegistrationStore(SQLBaseStore):
def _query_for_auth(self, txn, token):
sql = (
- "SELECT users.name, users.admin, access_tokens.device_id"
+ "SELECT users.name, users.admin,"
+ " access_tokens.device_id, access_tokens.id as token_id"
" FROM users"
" INNER JOIN access_tokens on users.id = access_tokens.user_id"
" WHERE token = ?"
diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py
new file mode 100644
index 0000000000..4e1a9a2783
--- /dev/null
+++ b/synapse/storage/rejections.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class RejectionsStore(SQLBaseStore):
+ def _store_rejections_txn(self, txn, event_id, reason):
+ self._simple_insert_txn(
+ txn,
+ table="rejections",
+ values={
+ "event_id": event_id,
+ "reason": reason,
+ "last_check": self._clock.time_msec(),
+ }
+ )
+
+ def get_rejection_reason(self, event_id):
+ return self._simple_select_one_onecol(
+ table="rejections",
+ retcol="reason",
+ keyvalues={
+ "event_id": event_id,
+ },
+ allow_none=True,
+ )
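
A sketch of the intended call pattern, assuming `store` is a DataStore: the write happens inside an event-persistence transaction, while the read is a plain Deferred-returning query.

# Illustrative only: `txn` is a transaction passed in by runInteraction.
store._store_rejections_txn(txn, "$failed_event:example.com", "auth_error")

# Later, outside the transaction:
d = store.get_rejection_reason("$failed_event:example.com")
# d fires with "auth_error", or with None if the event was never rejected.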
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 6542f8e4f8..750b17a45f 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -82,38 +82,45 @@ class RoomStore(SQLBaseStore):
"topic" key if one is set, and a "name" key if one is set
"""
- topic_subquery = (
- "SELECT topics.event_id as event_id, "
- "topics.room_id as room_id, topic "
- "FROM topics "
- "INNER JOIN current_state_events as c "
- "ON c.event_id = topics.event_id "
- )
+ def f(txn):
+ topic_subquery = (
+ "SELECT topics.event_id as event_id, "
+ "topics.room_id as room_id, topic "
+ "FROM topics "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = topics.event_id "
+ )
- name_subquery = (
- "SELECT room_names.event_id as event_id, "
- "room_names.room_id as room_id, name "
- "FROM room_names "
- "INNER JOIN current_state_events as c "
- "ON c.event_id = room_names.event_id "
- )
+ name_subquery = (
+ "SELECT room_names.event_id as event_id, "
+ "room_names.room_id as room_id, name "
+ "FROM room_names "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = room_names.event_id "
+ )
- # We use non printing ascii character US () as a seperator
- sql = (
- "SELECT r.room_id, n.name, t.topic, "
- "group_concat(a.room_alias, '') "
- "FROM rooms AS r "
- "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
- "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
- "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
- "WHERE r.is_public = ? "
- "GROUP BY r.room_id "
- ) % {
- "topic": topic_subquery,
- "name": name_subquery,
- }
-
- rows = yield self._execute(None, sql, is_public)
+ # We use the non-printing ASCII unit separator (US, 0x1f) as a separator
+ sql = (
+ "SELECT r.room_id, n.name, t.topic, "
+ "group_concat(a.room_alias, '') "
+ "FROM rooms AS r "
+ "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
+ "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
+ "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
+ "WHERE r.is_public = ? "
+ "GROUP BY r.room_id "
+ ) % {
+ "topic": topic_subquery,
+ "name": name_subquery,
+ }
+
+ c = txn.execute(sql, (is_public,))
+
+ return c.fetchall()
+
+ rows = yield self.runInteraction(
+ "get_rooms", f
+ )
ret = [
{
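
The unit separator joining the aliases is 0x1f, chosen because it cannot occur inside a room alias, so consumers of these rows can split the concatenated column back into a list. In isolation:

def split_aliases(concatenated):
    # group_concat joined the aliases with the ASCII unit separator
    # (0x1f); a plain split recovers the original list.
    return concatenated.split("\x1f") if concatenated else []

# split_aliases("#a:example.com\x1f#b:example.com")
#   -> ["#a:example.com", "#b:example.com"]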
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index e59e65529b..65ffb4627f 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -17,9 +17,10 @@ from twisted.internet import defer
from collections import namedtuple
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, cached
from synapse.api.constants import Membership
+from synapse.types import UserID
import logging
@@ -39,7 +40,7 @@ class RoomMemberStore(SQLBaseStore):
"""
try:
target_user_id = event.state_key
- domain = self.hs.parse_userid(target_user_id).domain
+ domain = UserID.from_string(target_user_id).domain
except:
logger.exception(
"Failed to parse target_user_id=%s", target_user_id
@@ -84,7 +85,7 @@ class RoomMemberStore(SQLBaseStore):
for e in member_events:
try:
joined_domains.add(
- self.hs.parse_userid(e.state_key).domain
+ UserID.from_string(e.state_key).domain
)
except:
# FIXME: How do we deal with invalid user ids in the db?
@@ -97,6 +98,8 @@ class RoomMemberStore(SQLBaseStore):
txn.execute(sql, (event.room_id, domain))
+ self.get_rooms_for_user.invalidate(target_user_id)
+
@defer.inlineCallbacks
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@@ -177,6 +180,14 @@ class RoomMemberStore(SQLBaseStore):
if not membership_list:
return defer.succeed(None)
+ return self.runInteraction(
+ "get_rooms_for_user_where_membership_is",
+ self._get_rooms_for_user_where_membership_is_txn,
+ user_id, membership_list
+ )
+
+ def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
+ membership_list):
where_clause = "user_id = ? AND (%s)" % (
" OR ".join(["membership = ?" for _ in membership_list]),
)
@@ -184,24 +195,18 @@ class RoomMemberStore(SQLBaseStore):
args = [user_id]
args.extend(membership_list)
- def f(txn):
- sql = (
- "SELECT m.room_id, m.sender, m.membership"
- " FROM room_memberships as m"
- " INNER JOIN current_state_events as c"
- " ON m.event_id = c.event_id"
- " WHERE %s"
- ) % (where_clause,)
-
- txn.execute(sql, args)
- return [
- RoomsForUser(**r) for r in self.cursor_to_dict(txn)
- ]
+ sql = (
+ "SELECT m.room_id, m.sender, m.membership"
+ " FROM room_memberships as m"
+ " INNER JOIN current_state_events as c"
+ " ON m.event_id = c.event_id"
+ " WHERE %s"
+ ) % (where_clause,)
- return self.runInteraction(
- "get_rooms_for_user_where_membership_is",
- f
- )
+ txn.execute(sql, args)
+ return [
+ RoomsForUser(**r) for r in self.cursor_to_dict(txn)
+ ]
def get_joined_hosts_for_room(self, room_id):
return self._simple_select_onecol(
@@ -239,28 +244,32 @@ class RoomMemberStore(SQLBaseStore):
results = self._parse_events_txn(txn, rows)
return results
+ @cached()
+ def get_rooms_for_user(self, user_id):
+ return self.get_rooms_for_user_where_membership_is(
+ user_id, membership_list=[Membership.JOIN],
+ )
+
+ @defer.inlineCallbacks
def user_rooms_intersect(self, user_id_list):
""" Checks whether all the users whose IDs are given in a list share a
room.
+
+ This is a "hot path" function that's called a lot, e.g. by presence for
+ generating the event stream. As such, it is implemented locally by
+ wrapping logic around heavily-cached database queries.
"""
- def interaction(txn):
- user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_id_list))
- sql = (
- "SELECT m.room_id FROM room_memberships as m "
- "INNER JOIN current_state_events as c "
- "ON m.event_id = c.event_id "
- "WHERE m.membership = 'join' "
- "AND (%(clause)s) "
- # TODO(paul): We've got duplicate rows in the database somewhere
- # so we have to DISTINCT m.user_id here
- "GROUP BY m.room_id HAVING COUNT(DISTINCT m.user_id) = ?"
- ) % {"clause": user_list_clause}
+ if len(user_id_list) < 2:
+ defer.returnValue(True)
+
+ deferreds = [self.get_rooms_for_user(u) for u in user_id_list]
- args = list(user_id_list)
- args.append(len(user_id_list))
+ results = yield defer.DeferredList(deferreds, consumeErrors=True)
- txn.execute(sql, args)
+ # A list of sets of strings giving room IDs for each user
+ room_id_lists = [set([r.room_id for r in result[1]]) for result in results]
- return len(txn.fetchall()) > 0
+ # Intersect the first user's room set with everyone else's to find
+ # the rooms common to all users
+ ret = len(room_id_lists.pop(0).intersection(*room_id_lists)) > 0
- return self.runInteraction("user_rooms_intersect", interaction)
+ defer.returnValue(ret)
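
The set arithmetic replacing the SQL is easy to check standalone: each user contributes the set of rooms they have joined, and the users intersect iff the intersection of all the sets is non-empty.

def rooms_intersect(room_id_sets):
    # One set of room IDs per user; fewer than two users trivially
    # "share" a room, mirroring the early return above.
    if len(room_id_sets) < 2:
        return True
    first = room_id_sets[0]
    return len(first.intersection(*room_id_sets[1:])) > 0

assert rooms_intersect([{"!a:hs", "!b:hs"}, {"!b:hs", "!c:hs"}])
assert not rooms_intersect([{"!a:hs"}, {"!c:hs"}])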
diff --git a/synapse/storage/schema/delta/v11.sql b/synapse/storage/schema/delta/11/v11.sql
index 313592221b..313592221b 100644
--- a/synapse/storage/schema/delta/v11.sql
+++ b/synapse/storage/schema/delta/11/v11.sql
diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql
new file mode 100644
index 0000000000..b87ef1fe79
--- /dev/null
+++ b/synapse/storage/schema/delta/12/v12.sql
@@ -0,0 +1,67 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS rejections(
+ event_id TEXT NOT NULL,
+ reason TEXT NOT NULL,
+ last_check TEXT NOT NULL,
+ CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
+);
+
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_name TEXT NOT NULL,
+ profile_tag varchar(32) NOT NULL,
+ kind varchar(8) NOT NULL,
+ app_id varchar(64) NOT NULL,
+ app_display_name varchar(64) NOT NULL,
+ device_display_name varchar(128) NOT NULL,
+ pushkey blob NOT NULL,
+ ts BIGINT NOT NULL,
+ lang varchar(8),
+ data blob,
+ last_token TEXT,
+ last_success BIGINT,
+ failing_since BIGINT,
+ FOREIGN KEY(user_name) REFERENCES users(name),
+ UNIQUE (app_id, pushkey)
+);
+
+CREATE TABLE IF NOT EXISTS push_rules (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_name TEXT NOT NULL,
+ rule_id TEXT NOT NULL,
+ priority_class TINYINT NOT NULL,
+ priority INTEGER NOT NULL DEFAULT 0,
+ conditions TEXT NOT NULL,
+ actions TEXT NOT NULL,
+ UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
+
+CREATE TABLE IF NOT EXISTS user_filters(
+ user_id TEXT,
+ filter_id INTEGER,
+ filter_json TEXT,
+ FOREIGN KEY(user_id) REFERENCES users(id)
+);
+
+CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
+ user_id, filter_id
+);
+
+PRAGMA user_version = 12;
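
The ON CONFLICT REPLACE clause on rejections (like the UNIQUE (app_id, pushkey) constraint the pusher upserts lean on) can be exercised directly against an in-memory SQLite database. A standalone check, not part of the patch:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE rejections("
    " event_id TEXT NOT NULL,"
    " reason TEXT NOT NULL,"
    " last_check TEXT NOT NULL,"
    " CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE)"
)
conn.execute("INSERT INTO rejections VALUES ('$ev:hs', 'auth_error', '1')")
conn.execute("INSERT INTO rejections VALUES ('$ev:hs', 'not_in_room', '2')")
# The second INSERT replaced the first row rather than failing.
assert conn.execute("SELECT reason FROM rejections").fetchall() == \
    [("not_in_room",)]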
diff --git a/synapse/storage/schema/delta/v3.sql b/synapse/storage/schema/delta/13/v13.sql
index c67e38ff52..e491ad5aec 100644
--- a/synapse/storage/schema/delta/v3.sql
+++ b/synapse/storage/schema/delta/13/v13.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014, 2015 OpenMarket Ltd
+/* Copyright 2015 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,15 +13,22 @@
* limitations under the License.
*/
+CREATE TABLE IF NOT EXISTS application_services(
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ url TEXT,
+ token TEXT,
+ hs_token TEXT,
+ sender TEXT,
+ UNIQUE(token) ON CONFLICT ROLLBACK
+);
-CREATE INDEX IF NOT EXISTS room_aliases_alias ON room_aliases(room_alias);
-CREATE INDEX IF NOT EXISTS room_aliases_id ON room_aliases(room_id);
+CREATE TABLE IF NOT EXISTS application_services_regex(
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ as_id INTEGER NOT NULL,
+ namespace INTEGER, /* enum[room_id|room_alias|user_id] */
+ regex TEXT,
+ FOREIGN KEY(as_id) REFERENCES application_services(id)
+);
-CREATE INDEX IF NOT EXISTS room_alias_servers_alias ON room_alias_servers(room_alias);
-DELETE FROM room_aliases WHERE rowid NOT IN (SELECT max(rowid) FROM room_aliases GROUP BY room_alias, room_id);
-
-CREATE UNIQUE INDEX IF NOT EXISTS room_aliases_uniq ON room_aliases(room_alias, room_id);
-
-PRAGMA user_version = 3;
diff --git a/synapse/storage/schema/delta/14/upgrade_appservice_db.py b/synapse/storage/schema/delta/14/upgrade_appservice_db.py
new file mode 100644
index 0000000000..847b1c5b89
--- /dev/null
+++ b/synapse/storage/schema/delta/14/upgrade_appservice_db.py
@@ -0,0 +1,23 @@
+import json
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def run_upgrade(cur):
+ cur.execute("SELECT id, regex FROM application_services_regex")
+ for row in cur.fetchall():
+ try:
+ logger.debug("Checking %s..." % row[0])
+ json.loads(row[1])
+ except ValueError:
+ # row isn't JSON yet; convert it.
+ string_regex = row[1]
+ new_regex = json.dumps({
+ "regex": string_regex,
+ "exclusive": True
+ })
+ cur.execute(
+ "UPDATE application_services_regex SET regex=? WHERE id=?",
+ (new_regex, row[0])
+ )
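
The upgrade is idempotent: values that already parse as JSON are left alone, and bare regex strings get wrapped in the new structure with exclusive defaulting to True. The per-value transformation in isolation (helper name illustrative):

import json

def upgrade_regex(value):
    try:
        json.loads(value)
        return value  # already migrated
    except ValueError:
        return json.dumps({"regex": value, "exclusive": True})

assert json.loads(upgrade_regex("@irc_.*")) == {
    "regex": "@irc_.*", "exclusive": True,
}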
diff --git a/synapse/storage/schema/delta/14/v14.sql b/synapse/storage/schema/delta/14/v14.sql
new file mode 100644
index 0000000000..0212726448
--- /dev/null
+++ b/synapse/storage/schema/delta/14/v14.sql
@@ -0,0 +1,9 @@
+CREATE TABLE IF NOT EXISTS push_rules_enable (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_name TEXT NOT NULL,
+ rule_id TEXT NOT NULL,
+ enabled TINYINT,
+ UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX IF NOT EXISTS push_rules_enable_user_name on push_rules_enable (user_name);
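
Keeping the enabled flag in its own table, rather than as a column on push_rules, presumably lets enable/disable state survive redefinition of the rule itself; toggling is then an upsert keyed on (user_name, rule_id). A sketch against raw sqlite3, with illustrative names:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE push_rules_enable ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " user_name TEXT NOT NULL, rule_id TEXT NOT NULL,"
    " enabled TINYINT, UNIQUE(user_name, rule_id))"
)

def set_rule_enabled(user_name, rule_id, enabled):
    # INSERT OR REPLACE leans on UNIQUE(user_name, rule_id) to
    # overwrite any previous setting for this rule.
    conn.execute(
        "INSERT OR REPLACE INTO push_rules_enable"
        " (user_name, rule_id, enabled) VALUES (?, ?, ?)",
        (user_name, rule_id, 1 if enabled else 0),
    )

set_rule_enabled("@alice:example.com", ".m.rule.contains_user_name", False)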
diff --git a/synapse/storage/schema/delta/v2.sql b/synapse/storage/schema/delta/v2.sql
deleted file mode 100644
index f740f6dd5d..0000000000
--- a/synapse/storage/schema/delta/v2.sql
+++ /dev/null
@@ -1,168 +0,0 @@
-/* Copyright 2014, 2015 OpenMarket Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-CREATE TABLE IF NOT EXISTS events(
- stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT,
- topological_ordering INTEGER NOT NULL,
- event_id TEXT NOT NULL,
- type TEXT NOT NULL,
- room_id TEXT NOT NULL,
- content TEXT NOT NULL,
- unrecognized_keys TEXT,
- processed BOOL NOT NULL,
- outlier BOOL NOT NULL,
- CONSTRAINT ev_uniq UNIQUE (event_id)
-);
-
-CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id);
-CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering);
-CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering);
-CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id);
-
-CREATE TABLE IF NOT EXISTS state_events(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- type TEXT NOT NULL,
- state_key TEXT NOT NULL,
- prev_state TEXT
-);
-
-CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id);
-CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id);
-CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type);
-CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key);
-
-
-CREATE TABLE IF NOT EXISTS current_state_events(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- type TEXT NOT NULL,
- state_key TEXT NOT NULL,
- CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
-);
-
-CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id);
-CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id);
-CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type);
-CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key);
-
-CREATE TABLE IF NOT EXISTS room_memberships(
- event_id TEXT NOT NULL,
- user_id TEXT NOT NULL,
- sender TEXT NOT NULL,
- room_id TEXT NOT NULL,
- membership TEXT NOT NULL
-);
-
-CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id);
-CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id);
-CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id);
-
-CREATE TABLE IF NOT EXISTS feedback(
- event_id TEXT NOT NULL,
- feedback_type TEXT,
- target_event_id TEXT,
- sender TEXT,
- room_id TEXT
-);
-
-CREATE TABLE IF NOT EXISTS topics(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- topic TEXT NOT NULL
-);
-
-CREATE TABLE IF NOT EXISTS room_names(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- name TEXT NOT NULL
-);
-
-CREATE TABLE IF NOT EXISTS rooms(
- room_id TEXT PRIMARY KEY NOT NULL,
- is_public INTEGER,
- creator TEXT
-);
-
-CREATE TABLE IF NOT EXISTS room_join_rules(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- join_rule TEXT NOT NULL
-);
-CREATE INDEX IF NOT EXISTS room_join_rules_event_id ON room_join_rules(event_id);
-CREATE INDEX IF NOT EXISTS room_join_rules_room_id ON room_join_rules(room_id);
-
-
-CREATE TABLE IF NOT EXISTS room_power_levels(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- user_id TEXT NOT NULL,
- level INTEGER NOT NULL
-);
-CREATE INDEX IF NOT EXISTS room_power_levels_event_id ON room_power_levels(event_id);
-CREATE INDEX IF NOT EXISTS room_power_levels_room_id ON room_power_levels(room_id);
-CREATE INDEX IF NOT EXISTS room_power_levels_room_user ON room_power_levels(room_id, user_id);
-
-
-CREATE TABLE IF NOT EXISTS room_default_levels(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- level INTEGER NOT NULL
-);
-
-CREATE INDEX IF NOT EXISTS room_default_levels_event_id ON room_default_levels(event_id);
-CREATE INDEX IF NOT EXISTS room_default_levels_room_id ON room_default_levels(room_id);
-
-
-CREATE TABLE IF NOT EXISTS room_add_state_levels(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- level INTEGER NOT NULL
-);
-
-CREATE INDEX IF NOT EXISTS room_add_state_levels_event_id ON room_add_state_levels(event_id);
-CREATE INDEX IF NOT EXISTS room_add_state_levels_room_id ON room_add_state_levels(room_id);
-
-
-CREATE TABLE IF NOT EXISTS room_send_event_levels(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- level INTEGER NOT NULL
-);
-
-CREATE INDEX IF NOT EXISTS room_send_event_levels_event_id ON room_send_event_levels(event_id);
-CREATE INDEX IF NOT EXISTS room_send_event_levels_room_id ON room_send_event_levels(room_id);
-
-
-CREATE TABLE IF NOT EXISTS room_ops_levels(
- event_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- ban_level INTEGER,
- kick_level INTEGER
-);
-
-CREATE INDEX IF NOT EXISTS room_ops_levels_event_id ON room_ops_levels(event_id);
-CREATE INDEX IF NOT EXISTS room_ops_levels_room_id ON room_ops_levels(room_id);
-
-
-CREATE TABLE IF NOT EXISTS room_hosts(
- room_id TEXT NOT NULL,
- host TEXT NOT NULL,
- CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE
-);
-
-CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id);
-
-PRAGMA user_version = 2;
diff --git a/synapse/storage/schema/delta/v4.sql b/synapse/storage/schema/delta/v4.sql
deleted file mode 100644
index d3807b7686..0000000000
--- a/synapse/storage/schema/delta/v4.sql
+++ /dev/null
@@ -1,26 +0,0 @@
-/* Copyright 2014, 2015 OpenMarket Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-CREATE TABLE IF NOT EXISTS redactions (
- event_id TEXT NOT NULL,
- redacts TEXT NOT NULL,
- CONSTRAINT ev_uniq UNIQUE (event_id)
-);
-
-CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id);
-CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts);
-
-ALTER TABLE room_ops_levels ADD COLUMN redact_level INTEGER;
-
-PRAGMA user_version = 4;
diff --git a/synapse/storage/schema/delta/v5.sql b/synapse/storage/schema/delta/v5.sql
deleted file mode 100644
index 0874a15431..0000000000
--- a/synapse/storage/schema/delta/v5.sql
+++ /dev/null
@@ -1,30 +0,0 @@
-/* Copyright 2014, 2015 OpenMarket Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-CREATE TABLE IF NOT EXISTS user_ips (
- user TEXT NOT NULL,
- access_token TEXT NOT NULL,
- device_id TEXT,
- ip TEXT NOT NULL,
- user_agent TEXT NOT NULL,
- last_seen INTEGER NOT NULL,
- CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE
-);
-
-CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user);
-
-ALTER TABLE users ADD COLUMN admin BOOL DEFAULT 0 NOT NULL;
-
-PRAGMA user_version = 5;
diff --git a/synapse/storage/schema/delta/v6.sql b/synapse/storage/schema/delta/v6.sql
deleted file mode 100644
index a9e0a4fe0d..0000000000
--- a/synapse/storage/schema/delta/v6.sql
+++ /dev/null
@@ -1,31 +0,0 @@
-/* Copyright 2014, 2015 OpenMarket Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-CREATE TABLE IF NOT EXISTS server_tls_certificates(
- server_name TEXT, -- Server name.
- fingerprint TEXT, -- Certificate fingerprint.
- from_server TEXT, -- Which key server the certificate was fetched from.
- ts_added_ms INTEGER, -- When the certifcate was added.
- tls_certificate BLOB, -- DER encoded x509 certificate.
- CONSTRAINT uniqueness UNIQUE (server_name, fingerprint)
-);
-
-CREATE TABLE IF NOT EXISTS server_signature_keys(
- server_name TEXT, -- Server name.
- key_id TEXT, -- Key version.
- from_server TEXT, -- Which key server the key was fetched form.
- ts_added_ms INTEGER, -- When the key was added.
- verify_key BLOB, -- NACL verification key.
- CONSTRAINT uniqueness UNIQUE (server_name, key_id)
-);
diff --git a/synapse/storage/schema/delta/v8.sql b/synapse/storage/schema/delta/v8.sql
deleted file mode 100644
index 1e9f8b18cb..0000000000
--- a/synapse/storage/schema/delta/v8.sql
+++ /dev/null
@@ -1,34 +0,0 @@
-/* Copyright 2014, 2015 OpenMarket Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- CREATE TABLE IF NOT EXISTS event_signatures_2 (
- event_id TEXT,
- signature_name TEXT,
- key_id TEXT,
- signature BLOB,
- CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
-);
-
-INSERT INTO event_signatures_2 (event_id, signature_name, key_id, signature)
-SELECT event_id, signature_name, key_id, signature FROM event_signatures;
-
-DROP TABLE event_signatures;
-ALTER TABLE event_signatures_2 RENAME TO event_signatures;
-
-CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
- event_id
-);
-
-PRAGMA user_version = 8;
\ No newline at end of file
diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql
deleted file mode 100644
index 455d51a70c..0000000000
--- a/synapse/storage/schema/delta/v9.sql
+++ /dev/null
@@ -1,79 +0,0 @@
-/* Copyright 2014, 2015 OpenMarket Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- To track destination health
-CREATE TABLE IF NOT EXISTS destinations(
- destination TEXT PRIMARY KEY,
- retry_last_ts INTEGER,
- retry_interval INTEGER
-);
-
-
-CREATE TABLE IF NOT EXISTS local_media_repository (
- media_id TEXT, -- The id used to refer to the media.
- media_type TEXT, -- The MIME-type of the media.
- media_length INTEGER, -- Length of the media in bytes.
- created_ts INTEGER, -- When the content was uploaded in ms.
- upload_name TEXT, -- The name the media was uploaded with.
- user_id TEXT, -- The user who uploaded the file.
- CONSTRAINT uniqueness UNIQUE (media_id)
-);
-
-CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
- media_id TEXT, -- The id used to refer to the media.
- thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
- thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
- thumbnail_type TEXT, -- The MIME-type of the thumbnail.
- thumbnail_method TEXT, -- The method used to make the thumbnail.
- thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
- CONSTRAINT uniqueness UNIQUE (
- media_id, thumbnail_width, thumbnail_height, thumbnail_type
- )
-);
-
-CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
- ON local_media_repository_thumbnails (media_id);
-
-CREATE TABLE IF NOT EXISTS remote_media_cache (
- media_origin TEXT, -- The remote HS the media came from.
- media_id TEXT, -- The id used to refer to the media on that server.
- media_type TEXT, -- The MIME-type of the media.
- created_ts INTEGER, -- When the content was uploaded in ms.
- upload_name TEXT, -- The name the media was uploaded with.
- media_length INTEGER, -- Length of the media in bytes.
- filesystem_id TEXT, -- The name used to store the media on disk.
- CONSTRAINT uniqueness UNIQUE (media_origin, media_id)
-);
-
-CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
- media_origin TEXT, -- The remote HS the media came from.
- media_id TEXT, -- The id used to refer to the media.
- thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
- thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
- thumbnail_method TEXT, -- The method used to make the thumbnail
- thumbnail_type TEXT, -- The MIME-type of the thumbnail.
- thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
- filesystem_id TEXT, -- The name used to store the media on disk.
- CONSTRAINT uniqueness UNIQUE (
- media_origin, media_id, thumbnail_width, thumbnail_height,
- thumbnail_type, thumbnail_type
- )
-);
-
-CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
- ON local_media_repository_thumbnails (media_id);
-
-
-PRAGMA user_version = 9;
diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql
index 1e766d6db2..1e766d6db2 100644
--- a/synapse/storage/schema/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/11/event_edges.sql
diff --git a/synapse/storage/schema/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql
index c28c39c48a..c28c39c48a 100644
--- a/synapse/storage/schema/event_signatures.sql
+++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/full_schemas/11/im.sql
index dd00c1cd2f..dd00c1cd2f 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/full_schemas/11/im.sql
diff --git a/synapse/storage/schema/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql
index a9e0a4fe0d..a9e0a4fe0d 100644
--- a/synapse/storage/schema/keys.sql
+++ b/synapse/storage/schema/full_schemas/11/keys.sql
diff --git a/synapse/storage/schema/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql
index afdf48cbfb..afdf48cbfb 100644
--- a/synapse/storage/schema/media_repository.sql
+++ b/synapse/storage/schema/full_schemas/11/media_repository.sql
diff --git a/synapse/storage/schema/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql
index f9f8db9697..f9f8db9697 100644
--- a/synapse/storage/schema/presence.sql
+++ b/synapse/storage/schema/full_schemas/11/presence.sql
diff --git a/synapse/storage/schema/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql
index f06a528b4d..f06a528b4d 100644
--- a/synapse/storage/schema/profiles.sql
+++ b/synapse/storage/schema/full_schemas/11/profiles.sql
diff --git a/synapse/storage/schema/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql
index 5011d95db8..5011d95db8 100644
--- a/synapse/storage/schema/redactions.sql
+++ b/synapse/storage/schema/full_schemas/11/redactions.sql
diff --git a/synapse/storage/schema/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql
index 0d2df01603..0d2df01603 100644
--- a/synapse/storage/schema/room_aliases.sql
+++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql
diff --git a/synapse/storage/schema/state.sql b/synapse/storage/schema/full_schemas/11/state.sql
index 1fe8f1e430..1fe8f1e430 100644
--- a/synapse/storage/schema/state.sql
+++ b/synapse/storage/schema/full_schemas/11/state.sql
diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql
index 2d30f99b06..2d30f99b06 100644
--- a/synapse/storage/schema/transactions.sql
+++ b/synapse/storage/schema/full_schemas/11/transactions.sql
diff --git a/synapse/storage/schema/users.sql b/synapse/storage/schema/full_schemas/11/users.sql
index 08ccfdac0a..08ccfdac0a 100644
--- a/synapse/storage/schema/users.sql
+++ b/synapse/storage/schema/full_schemas/11/users.sql
diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql
new file mode 100644
index 0000000000..0431e2d051
--- /dev/null
+++ b/synapse/storage/schema/schema_version.sql
@@ -0,0 +1,30 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS schema_version(
+ Lock char(1) NOT NULL DEFAULT 'X', -- Makes sure this table only has one row.
+ version INTEGER NOT NULL,
+ upgraded BOOL NOT NULL, -- Whether we reached this version from an upgrade or an initial schema.
+ CONSTRAINT schema_version_lock_x CHECK (Lock='X'),
+ CONSTRAINT schema_version_lock_uniq UNIQUE (Lock)
+);
+
+CREATE TABLE IF NOT EXISTS applied_schema_deltas(
+ version INTEGER NOT NULL,
+ file TEXT NOT NULL,
+ CONSTRAINT schema_deltas_ver_file UNIQUE (version, file) ON CONFLICT IGNORE
+);
+
+CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version);
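
The Lock column pins schema_version to a single row, so the new upgrade machinery can read the current version with one SELECT and seed it on a fresh database. A sketch of that read-or-initialise step, assuming a plain DB-API cursor (not the actual implementation in synapse/storage/__init__.py):

def get_or_set_schema_version(cur, current_version):
    cur.execute("SELECT version FROM schema_version")
    row = cur.fetchone()
    if row is not None:
        return row[0]
    # Fresh database: record the version the full schema was built at.
    cur.execute(
        "INSERT INTO schema_version (version, upgraded) VALUES (?, ?)",
        (current_version, False),
    )
    return current_version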
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 8ac2adab05..09bc522210 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -36,6 +36,7 @@ what sort order was used:
from twisted.internet import defer
from ._base import SQLBaseStore
+from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.util.logutils import log_function
@@ -82,10 +83,10 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
def parse(cls, string):
try:
if string[0] == 's':
- return cls(None, int(string[1:]))
+ return cls(topological=None, stream=int(string[1:]))
if string[0] == 't':
parts = string[1:].split('-', 1)
- return cls(int(parts[1]), int(parts[0]))
+ return cls(topological=int(parts[0]), stream=int(parts[1]))
except:
pass
raise SynapseError(400, "Invalid token %r" % (string,))
@@ -94,7 +95,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
def parse_stream_token(cls, string):
try:
if string[0] == 's':
- return cls(None, int(string[1:]))
+ return cls(topological=None, stream=int(string[1:]))
except:
pass
raise SynapseError(400, "Invalid token %r" % (string,))
@@ -127,6 +128,85 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
class StreamStore(SQLBaseStore):
+
+ @defer.inlineCallbacks
+ def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
+ # NB this lives here instead of appservice.py so we can reuse the
+ # 'private' StreamToken class in this file.
+ if limit:
+ # Cap the requested limit at MAX_STREAM_SIZE.
+ limit = min(limit, MAX_STREAM_SIZE)
+ else:
+ limit = MAX_STREAM_SIZE
+
+ # The from and to keys are stream tokens; their integer part is the
+ # stream ordering.
+ from_id = _StreamToken.parse_stream_token(from_key)
+ to_id = _StreamToken.parse_stream_token(to_key)
+
+ if from_key == to_key:
+ defer.returnValue(([], to_key))
+ return
+
+ # select all the events between from/to with a sensible limit
+ sql = (
+ "SELECT e.event_id, e.room_id, e.type, s.state_key, "
+ "e.stream_ordering FROM events AS e LEFT JOIN state_events as s ON "
+ "e.event_id = s.event_id "
+ "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
+ "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+ ) % {
+ "limit": limit
+ }
+
+ def f(txn):
+ # pull out all the events between the tokens
+ txn.execute(sql, (from_id.stream, to_id.stream,))
+ rows = self.cursor_to_dict(txn)
+
+ # Logic:
+ # - We want ALL events which match the AS room_id regex
+ # - We want ALL events which match the rooms represented by the AS
+ # room_alias regex
+ # - We want ALL events for rooms that AS users have joined.
+ # This is currently supported via get_app_service_rooms (which is
+ # used for the Notifier listener rooms). We can't reasonably make a
+ # SQL query for these room IDs, so we'll pull all the events between
+ # from/to and filter in python.
+ rooms_for_as = self._get_app_service_rooms_txn(txn, service)
+ room_ids_for_as = [r.room_id for r in rooms_for_as]
+
+ def app_service_interested(row):
+ if row["room_id"] in room_ids_for_as:
+ return True
+
+ if row["type"] == EventTypes.Member:
+ if service.is_interested_in_user(row.get("state_key")):
+ return True
+ return False
+
+ ret = self._get_events_txn(
+ txn,
+ # apply the filter on the room id list
+ [
+ r["event_id"] for r in rows
+ if app_service_interested(r)
+ ],
+ get_prev_content=True
+ )
+
+ self._set_before_and_after(ret, rows)
+
+ if rows:
+ key = "s%d" % max(r["stream_ordering"] for r in rows)
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = to_key
+
+ return ret, key
+
+ results = yield self.runInteraction("get_appservice_room_stream", f)
+ defer.returnValue(results)
+
@log_function
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
limit=0, with_feedback=False):
@@ -181,8 +261,10 @@ class StreamStore(SQLBaseStore):
get_prev_content=True
)
+ self._set_before_and_after(ret, rows)
+
if rows:
- key = "s%d" % max([r["stream_ordering"] for r in rows])
+ key = "s%d" % max(r["stream_ordering"] for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
@@ -260,22 +342,44 @@ class StreamStore(SQLBaseStore):
get_prev_content=True
)
+ self._set_before_and_after(events, rows)
+
return events, next_token,
return self.runInteraction("paginate_room_events", f)
def get_recent_events_for_room(self, room_id, limit, end_token,
- with_feedback=False):
+ with_feedback=False, from_token=None):
# TODO (erikj): Handle compressed feedback
- sql = (
- "SELECT stream_ordering, topological_ordering, event_id FROM events "
- "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
- "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
- )
+ end_token = _StreamToken.parse_stream_token(end_token)
- def f(txn):
- txn.execute(sql, (room_id, end_token, limit,))
+ if from_token is None:
+ sql = (
+ "SELECT stream_ordering, topological_ordering, event_id"
+ " FROM events"
+ " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+ " ORDER BY topological_ordering DESC, stream_ordering DESC"
+ " LIMIT ?"
+ )
+ else:
+ from_token = _StreamToken.parse_stream_token(from_token)
+ sql = (
+ "SELECT stream_ordering, topological_ordering, event_id"
+ " FROM events"
+ " WHERE room_id = ? AND stream_ordering > ?"
+ " AND stream_ordering <= ? AND outlier = 0"
+ " ORDER BY topological_ordering DESC, stream_ordering DESC"
+ " LIMIT ?"
+ )
+
+ def get_recent_events_for_room_txn(txn):
+ if from_token is None:
+ txn.execute(sql, (room_id, end_token.stream, limit,))
+ else:
+ txn.execute(sql, (
+ room_id, from_token.stream, end_token.stream, limit
+ ))
rows = self.cursor_to_dict(txn)
@@ -291,9 +395,9 @@ class StreamStore(SQLBaseStore):
toke = rows[0]["stream_ordering"] - 1
start_token = str(_StreamToken(topo, toke))
- token = (start_token, end_token)
+ token = (start_token, str(end_token))
else:
- token = (end_token, end_token)
+ token = (str(end_token), str(end_token))
events = self._get_events_txn(
txn,
@@ -301,9 +405,13 @@ class StreamStore(SQLBaseStore):
get_prev_content=True
)
+ self._set_before_and_after(events, rows)
+
return events, token
- return self.runInteraction("get_recent_events_for_room", f)
+ return self.runInteraction(
+ "get_recent_events_for_room", get_recent_events_for_room_txn
+ )
def get_room_events_max_id(self):
return self.runInteraction(
@@ -325,3 +433,12 @@ class StreamStore(SQLBaseStore):
key = res[0]["m"]
return "s%d" % (key,)
+
+ @staticmethod
+ def _set_before_and_after(events, rows):
+ for event, row in zip(events, rows):
+ stream = row["stream_ordering"]
+ topo = event.depth
+ internal = event.internal_metadata
+ internal.before = str(_StreamToken(topo, stream - 1))
+ internal.after = str(_StreamToken(topo, stream))
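
The strings _set_before_and_after stores follow the token grammar parsed at the top of this file: 's<stream>' for stream-only tokens and 't<topological>-<stream>' otherwise. A standalone illustration, with __str__ reconstructed from that parse logic rather than copied from the source:

from collections import namedtuple

class _StreamToken(namedtuple("_StreamToken", "topological stream")):
    def __str__(self):
        if self.topological is not None:
            return "t%d-%d" % (self.topological, self.stream)
        return "s%d" % (self.stream,)

# For an event at depth 3 whose row has stream_ordering 12:
assert str(_StreamToken(3, 11)) == "t3-11"  # internal.before
assert str(_StreamToken(3, 12)) == "t3-12"  # internal.after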
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index e06ef35690..0b8a3b7a07 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -13,12 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore, Table
+from ._base import SQLBaseStore, Table, cached
from collections import namedtuple
-from twisted.internet import defer
-
import logging
logger = logging.getLogger(__name__)
@@ -28,10 +26,6 @@ class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
"""
- # a write-through cache of DestinationsTable.EntryType indexed by
- # destination string
- destination_retry_cache = {}
-
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
@@ -211,6 +205,7 @@ class TransactionStore(SQLBaseStore):
return ReceivedTransactionsTable.decode_results(txn.fetchall())
+ @cached()
def get_destination_retry_timings(self, destination):
"""Gets the current retry timings (if any) for a given destination.
@@ -221,9 +216,6 @@ class TransactionStore(SQLBaseStore):
None if not retrying
Otherwise a DestinationsTable.EntryType for the retry scheme
"""
- if destination in self.destination_retry_cache:
- return defer.succeed(self.destination_retry_cache[destination])
-
return self.runInteraction(
"get_destination_retry_timings",
self._get_destination_retry_timings, destination)
@@ -250,7 +242,9 @@ class TransactionStore(SQLBaseStore):
retry_interval (int) - how long until next retry in ms
"""
- self.destination_retry_cache[destination] = (
+ # As this is the new value, we might as well prefill the cache
+ self.get_destination_retry_timings.prefill(
+ destination,
DestinationsTable.EntryType(
destination,
retry_last_ts,
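
For context, the @cached decorator these hunks lean on lives in synapse/storage/_base.py and is not shown in this diff; the contract it has to satisfy here is per-argument memoisation plus the prefill and invalidate hooks used above. A stripped-down, synchronous sketch of that contract (the real version also has to cope with Deferreds and cache-size limits):

def cached():
    def decorator(fn):
        cache = {}

        def wrapped(self, key):
            if key not in cache:
                cache[key] = fn(self, key)
            return cache[key]

        # Hooks used in this diff: seed a freshly written value, or
        # drop a stale one after an update.
        wrapped.prefill = lambda key, value: cache.__setitem__(key, value)
        wrapped.invalidate = lambda key: cache.pop(key, None)
        return wrapped
    return decorator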