diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f154b1c8ae..e7443f2838 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -40,24 +40,16 @@ from .filtering import FilteringStore
from .end_to_end_keys import EndToEndKeyStore
from .receipts import ReceiptsStore
+from .search import SearchStore
+from .tags import TagsStore
-import fnmatch
-import imp
import logging
-import os
-import re
logger = logging.getLogger(__name__)
-# Remember to update this number every time a change is made to database
-# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 22
-
-dir_path = os.path.abspath(os.path.dirname(__file__))
-
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
@@ -79,6 +71,8 @@ class DataStore(RoomMemberStore, RoomStore,
EventsStore,
ReceiptsStore,
EndToEndKeyStore,
+ SearchStore,
+ TagsStore,
):
def __init__(self, hs):
@@ -94,9 +88,9 @@ class DataStore(RoomMemberStore, RoomStore,
)
@defer.inlineCallbacks
- def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
+ def insert_client_ip(self, user, access_token, ip, user_agent):
now = int(self._clock.time_msec())
- key = (user.to_string(), access_token, device_id, ip)
+ key = (user.to_string(), access_token, ip)
try:
last_seen = self.client_ip_last_seen.get(key)
@@ -120,389 +114,44 @@ class DataStore(RoomMemberStore, RoomStore,
"user_agent": user_agent,
},
values={
- "device_id": device_id,
"last_seen": now,
},
desc="insert_client_ip",
lock=False,
)
+ @defer.inlineCallbacks
+ def count_daily_users(self):
+ """
+ Counts the number of users who used this homeserver in the last 24 hours.
+ """
+ def _count_users(txn):
+ txn.execute(
+ "SELECT COUNT(DISTINCT user_id) AS users"
+ " FROM user_ips"
+ " WHERE last_seen > ?",
+ # This is close enough to a day for our purposes.
+ (int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),)
+ )
+ rows = self.cursor_to_dict(txn)
+ if rows:
+ return rows[0]["users"]
+ return 0
+
+ ret = yield self.runInteraction("count_users", _count_users)
+ defer.returnValue(ret)
+
def get_user_ip_and_agents(self, user):
return self._simple_select_list(
table="user_ips",
keyvalues={"user_id": user.to_string()},
retcols=[
- "device_id", "access_token", "ip", "user_agent", "last_seen"
+ "access_token", "ip", "user_agent", "last_seen"
],
desc="get_user_ip_and_agents",
)
-def read_schema(path):
- """ Read the named database schema.
-
- Args:
- path: Path of the database schema.
- Returns:
- A string containing the database schema.
- """
- with open(path) as schema_file:
- return schema_file.read()
-
-
-class PrepareDatabaseException(Exception):
- pass
-
-
-class UpgradeDatabaseException(PrepareDatabaseException):
- pass
-
-
-def prepare_database(db_conn, database_engine):
- """Prepares a database for usage. Will either create all necessary tables
- or upgrade from an older schema version.
- """
- try:
- cur = db_conn.cursor()
- version_info = _get_or_create_schema_state(cur, database_engine)
-
- if version_info:
- user_version, delta_files, upgraded = version_info
- _upgrade_existing_database(
- cur, user_version, delta_files, upgraded, database_engine
- )
- else:
- _setup_new_database(cur, database_engine)
-
- # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
-
- cur.close()
- db_conn.commit()
- except:
- db_conn.rollback()
- raise
-
-
-def _setup_new_database(cur, database_engine):
- """Sets up the database by finding a base set of "full schemas" and then
- applying any necessary deltas.
-
- The "full_schemas" directory has subdirectories named after versions. This
- function searches for the highest version less than or equal to
- `SCHEMA_VERSION` and executes all .sql files in that directory.
-
- The function will then apply all deltas for all versions after the base
- version.
-
- Example directory structure:
-
- schema/
- delta/
- ...
- full_schemas/
- 3/
- test.sql
- ...
- 11/
- foo.sql
- bar.sql
- ...
-
- In the example foo.sql and bar.sql would be run, and then any delta files
- for versions strictly greater than 11.
- """
- current_dir = os.path.join(dir_path, "schema", "full_schemas")
- directory_entries = os.listdir(current_dir)
-
- valid_dirs = []
- pattern = re.compile(r"^\d+(\.sql)?$")
- for filename in directory_entries:
- match = pattern.match(filename)
- abs_path = os.path.join(current_dir, filename)
- if match and os.path.isdir(abs_path):
- ver = int(match.group(0))
- if ver <= SCHEMA_VERSION:
- valid_dirs.append((ver, abs_path))
- else:
- logger.warn("Unexpected entry in 'full_schemas': %s", filename)
-
- if not valid_dirs:
- raise PrepareDatabaseException(
- "Could not find a suitable base set of full schemas"
- )
-
- max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0])
-
- logger.debug("Initialising schema v%d", max_current_ver)
-
- directory_entries = os.listdir(sql_dir)
-
- for filename in fnmatch.filter(directory_entries, "*.sql"):
- sql_loc = os.path.join(sql_dir, filename)
- logger.debug("Applying schema %s", sql_loc)
- executescript(cur, sql_loc)
-
- cur.execute(
- database_engine.convert_param_style(
- "INSERT INTO schema_version (version, upgraded)"
- " VALUES (?,?)"
- ),
- (max_current_ver, False,)
- )
-
- _upgrade_existing_database(
- cur,
- current_version=max_current_ver,
- applied_delta_files=[],
- upgraded=False,
- database_engine=database_engine,
- )
-
-
-def _upgrade_existing_database(cur, current_version, applied_delta_files,
- upgraded, database_engine):
- """Upgrades an existing database.
-
- Delta files can either be SQL stored in *.sql files, or python modules
- in *.py.
-
- There can be multiple delta files per version. Synapse will keep track of
- which delta files have been applied, and will apply any that haven't been
- even if there has been no version bump. This is useful for development
- where orthogonal schema changes may happen on separate branches.
-
- Different delta files for the same version *must* be orthogonal and give
- the same result when applied in any order. No guarantees are made on the
- order of execution of these scripts.
-
- This is a no-op of current_version == SCHEMA_VERSION.
-
- Example directory structure:
-
- schema/
- delta/
- 11/
- foo.sql
- ...
- 12/
- foo.sql
- bar.py
- ...
- full_schemas/
- ...
-
- In the example, if current_version is 11, then foo.sql will be run if and
- only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in
- some arbitrary order.
-
- Args:
- cur (Cursor)
- current_version (int): The current version of the schema.
- applied_delta_files (list): A list of deltas that have already been
- applied.
- upgraded (bool): Whether the current version was generated by having
- applied deltas or from full schema file. If `True` the function
- will never apply delta files for the given `current_version`, since
- the current_version wasn't generated by applying those delta files.
- """
-
- if current_version > SCHEMA_VERSION:
- raise ValueError(
- "Cannot use this database as it is too " +
- "new for the server to understand"
- )
-
- start_ver = current_version
- if not upgraded:
- start_ver += 1
-
- logger.debug("applied_delta_files: %s", applied_delta_files)
-
- for v in range(start_ver, SCHEMA_VERSION + 1):
- logger.debug("Upgrading schema to v%d", v)
-
- delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
-
- try:
- directory_entries = os.listdir(delta_dir)
- except OSError:
- logger.exception("Could not open delta dir for version %d", v)
- raise UpgradeDatabaseException(
- "Could not open delta dir for version %d" % (v,)
- )
-
- directory_entries.sort()
- for file_name in directory_entries:
- relative_path = os.path.join(str(v), file_name)
- logger.debug("Found file: %s", relative_path)
- if relative_path in applied_delta_files:
- continue
-
- absolute_path = os.path.join(
- dir_path, "schema", "delta", relative_path,
- )
- root_name, ext = os.path.splitext(file_name)
- if ext == ".py":
- # This is a python upgrade module. We need to import into some
- # package and then execute its `run_upgrade` function.
- module_name = "synapse.storage.v%d_%s" % (
- v, root_name
- )
- with open(absolute_path) as python_file:
- module = imp.load_source(
- module_name, absolute_path, python_file
- )
- logger.debug("Running script %s", relative_path)
- module.run_upgrade(cur, database_engine)
- elif ext == ".pyc":
- # Sometimes .pyc files turn up anyway even though we've
- # disabled their generation; e.g. from distribution package
- # installers. Silently skip it
- pass
- elif ext == ".sql":
- # A plain old .sql file, just read and execute it
- logger.debug("Applying schema %s", relative_path)
- executescript(cur, absolute_path)
- else:
- # Not a valid delta file.
- logger.warn(
- "Found directory entry that did not end in .py or"
- " .sql: %s",
- relative_path,
- )
- continue
-
- # Mark as done.
- cur.execute(
- database_engine.convert_param_style(
- "INSERT INTO applied_schema_deltas (version, file)"
- " VALUES (?,?)",
- ),
- (v, relative_path)
- )
-
- cur.execute("DELETE FROM schema_version")
- cur.execute(
- database_engine.convert_param_style(
- "INSERT INTO schema_version (version, upgraded)"
- " VALUES (?,?)",
- ),
- (v, True)
- )
-
-
-def get_statements(f):
- statement_buffer = ""
- in_comment = False # If we're in a /* ... */ style comment
-
- for line in f:
- line = line.strip()
-
- if in_comment:
- # Check if this line contains an end to the comment
- comments = line.split("*/", 1)
- if len(comments) == 1:
- continue
- line = comments[1]
- in_comment = False
-
- # Remove inline block comments
- line = re.sub(r"/\*.*\*/", " ", line)
-
- # Does this line start a comment?
- comments = line.split("/*", 1)
- if len(comments) > 1:
- line = comments[0]
- in_comment = True
-
- # Deal with line comments
- line = line.split("--", 1)[0]
- line = line.split("//", 1)[0]
-
- # Find *all* semicolons. We need to treat first and last entry
- # specially.
- statements = line.split(";")
-
- # We must prepend statement_buffer to the first statement
- first_statement = "%s %s" % (
- statement_buffer.strip(),
- statements[0].strip()
- )
- statements[0] = first_statement
-
- # Every entry, except the last, is a full statement
- for statement in statements[:-1]:
- yield statement.strip()
-
- # The last entry did *not* end in a semicolon, so we store it for the
- # next semicolon we find
- statement_buffer = statements[-1].strip()
-
-
-def executescript(txn, schema_path):
- with open(schema_path, 'r') as f:
- for statement in get_statements(f):
- txn.execute(statement)
-
-
-def _get_or_create_schema_state(txn, database_engine):
- # Bluntly try creating the schema_version tables.
- schema_path = os.path.join(
- dir_path, "schema", "schema_version.sql",
- )
- executescript(txn, schema_path)
-
- txn.execute("SELECT version, upgraded FROM schema_version")
- row = txn.fetchone()
- current_version = int(row[0]) if row else None
- upgraded = bool(row[1]) if row else None
-
- if current_version:
- txn.execute(
- database_engine.convert_param_style(
- "SELECT file FROM applied_schema_deltas WHERE version >= ?"
- ),
- (current_version,)
- )
- applied_deltas = [d for d, in txn.fetchall()]
- return current_version, applied_deltas, upgraded
-
- return None
-
-
-def prepare_sqlite3_database(db_conn):
- """This function should be called before `prepare_database` on sqlite3
- databases.
-
- Since we changed the way we store the current schema version and handle
- updates to schemas, we need a way to upgrade from the old method to the
- new. This only affects sqlite databases since they were the only ones
- supported at the time.
- """
- with db_conn:
- schema_path = os.path.join(
- dir_path, "schema", "schema_version.sql",
- )
- create_schema = read_schema(schema_path)
- db_conn.executescript(create_schema)
-
- c = db_conn.execute("SELECT * FROM schema_version")
- rows = c.fetchall()
- c.close()
-
- if not rows:
- c = db_conn.execute("PRAGMA user_version")
- row = c.fetchone()
- c.close()
-
- if row and row[0]:
- db_conn.execute(
- "REPLACE INTO schema_version (version, upgraded)"
- " VALUES (?,?)",
- (row[0], False)
- )
-
-
def are_all_users_on_domain(txn, database_engine, domain):
sql = database_engine.convert_param_style(
"SELECT COUNT(*) FROM users WHERE name NOT LIKE ?"
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d976e17786..218e708054 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -25,8 +25,6 @@ from util.id_generators import IdGenerator, StreamIdGenerator
from twisted.internet import defer
-from collections import namedtuple
-
import sys
import time
import threading
@@ -181,6 +179,7 @@ class SQLBaseStore(object):
self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
+ self._refresh_tokens_id_gen = IdGenerator("refresh_tokens", "id", self)
self._pushers_id_gen = IdGenerator("pushers", "id", self)
self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
@@ -375,9 +374,6 @@ class SQLBaseStore(object):
return self.runInteraction(desc, interaction)
- def _execute_and_decode(self, desc, query, *args):
- return self._execute(desc, self.cursor_to_dict, query, *args)
-
# "Simple" SQL API methods that operate on a single table with no JOINs,
# no complex WHERE clauses, just a dict of values for columns.
@@ -523,7 +519,7 @@ class SQLBaseStore(object):
allow_none=False,
desc="_simple_select_one_onecol"):
"""Executes a SELECT query on the named table, which is expected to
- return a single row, returning a single column from it."
+ return a single row, returning a single column from it.
Args:
table : string giving the table name
@@ -690,37 +686,6 @@ class SQLBaseStore(object):
return dict(zip(retcols, row))
- def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None,
- retcols=None, allow_none=False,
- desc="_simple_selectupdate_one"):
- """ Combined SELECT then UPDATE."""
- def func(txn):
- ret = None
- if retcols:
- ret = self._simple_select_one_txn(
- txn,
- table=table,
- keyvalues=keyvalues,
- retcols=retcols,
- allow_none=allow_none,
- )
-
- if updatevalues:
- self._simple_update_one_txn(
- txn,
- table=table,
- keyvalues=keyvalues,
- updatevalues=updatevalues,
- )
-
- # if txn.rowcount == 0:
- # raise StoreError(404, "No row found")
- if txn.rowcount > 1:
- raise StoreError(500, "More than one row matched")
-
- return ret
- return self.runInteraction(desc, func)
-
def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"):
"""Executes a DELETE query on the named table, expecting to delete a
single row.
@@ -742,16 +707,6 @@ class SQLBaseStore(object):
raise StoreError(500, "more than one row matched")
return self.runInteraction(desc, func)
- def _simple_delete(self, table, keyvalues, desc="_simple_delete"):
- """Executes a DELETE query on the named table.
-
- Args:
- table : string giving the table name
- keyvalues : dict of column names and values to select the row with
- """
-
- return self.runInteraction(desc, self._simple_delete_txn)
-
def _simple_delete_txn(self, txn, table, keyvalues):
sql = "DELETE FROM %s WHERE %s" % (
table,
@@ -760,24 +715,6 @@ class SQLBaseStore(object):
return txn.execute(sql, keyvalues.values())
- def _simple_max_id(self, table):
- """Executes a SELECT query on the named table, expecting to return the
- max value for the column "id".
-
- Args:
- table : string giving the table name
- """
- sql = "SELECT MAX(id) AS id FROM %s" % table
-
- def func(txn):
- txn.execute(sql)
- max_id = self.cursor_to_dict(txn)[0]["id"]
- if max_id is None:
- return 0
- return max_id
-
- return self.runInteraction("_simple_max_id", func)
-
def get_next_stream_id(self):
with self._next_stream_id_lock:
i = self._next_stream_id
@@ -790,129 +727,3 @@ class _RollbackButIsFineException(Exception):
something went wrong.
"""
pass
-
-
-class Table(object):
- """ A base class used to store information about a particular table.
- """
-
- table_name = None
- """ str: The name of the table """
-
- fields = None
- """ list: The field names """
-
- EntryType = None
- """ Type: A tuple type used to decode the results """
-
- _select_where_clause = "SELECT %s FROM %s WHERE %s"
- _select_clause = "SELECT %s FROM %s"
- _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)"
-
- @classmethod
- def select_statement(cls, where_clause=None):
- """
- Args:
- where_clause (str): The WHERE clause to use.
-
- Returns:
- str: An SQL statement to select rows from the table with the given
- WHERE clause.
- """
- if where_clause:
- return cls._select_where_clause % (
- ", ".join(cls.fields),
- cls.table_name,
- where_clause
- )
- else:
- return cls._select_clause % (
- ", ".join(cls.fields),
- cls.table_name,
- )
-
- @classmethod
- def insert_statement(cls):
- return cls._insert_clause % (
- cls.table_name,
- ", ".join(cls.fields),
- ", ".join(["?"] * len(cls.fields)),
- )
-
- @classmethod
- def decode_single_result(cls, results):
- """ Given an iterable of tuples, return a single instance of
- `EntryType` or None if the iterable is empty
- Args:
- results (list): The results list to convert to `EntryType`
- Returns:
- EntryType: An instance of `EntryType`
- """
- results = list(results)
- if results:
- return cls.EntryType(*results[0])
- else:
- return None
-
- @classmethod
- def decode_results(cls, results):
- """ Given an iterable of tuples, return a list of `EntryType`
- Args:
- results (list): The results list to convert to `EntryType`
-
- Returns:
- list: A list of `EntryType`
- """
- return [cls.EntryType(*row) for row in results]
-
- @classmethod
- def get_fields_string(cls, prefix=None):
- if prefix:
- to_join = ("%s.%s" % (prefix, f) for f in cls.fields)
- else:
- to_join = cls.fields
-
- return ", ".join(to_join)
-
-
-class JoinHelper(object):
- """ Used to help do joins on tables by looking at the tables' fields and
- creating a list of unique fields to use with SELECTs and a namedtuple
- to dump the results into.
-
- Attributes:
- tables (list): List of `Table` classes
- EntryType (type)
- """
-
- def __init__(self, *tables):
- self.tables = tables
-
- res = []
- for table in self.tables:
- res += [f for f in table.fields if f not in res]
-
- self.EntryType = namedtuple("JoinHelperEntry", res)
-
- def get_fields(self, **prefixes):
- """Get a string representing a list of fields for use in SELECT
- statements with the given prefixes applied to each.
-
- For example::
-
- JoinHelper(PdusTable, StateTable).get_fields(
- PdusTable="pdus",
- StateTable="state"
- )
- """
- res = []
- for field in self.EntryType._fields:
- for table in self.tables:
- if field in table.fields:
- res.append("%s.%s" % (prefixes[table.__name__], field))
- break
-
- return ", ".join(res)
-
- def decode_results(self, rows):
- return [self.EntryType(*row) for row in rows]
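
The only functional addition in this file is the extra IdGenerator for refresh_tokens; the rest removes dead helpers (_execute_and_decode, _simple_selectupdate_one, _simple_delete, _simple_max_id, Table, JoinHelper). For readers unfamiliar with the pattern, the sketch below illustrates the general idea of a per-table id allocator seeded from MAX(id) and advanced under a lock; it is not the actual synapse.storage.util.id_generators implementation.

    # Toy illustration of a per-table id generator; NOT the real IdGenerator.
    import threading

    class ToyIdGenerator(object):
        def __init__(self, current_max):
            self._lock = threading.Lock()
            self._current = current_max  # e.g. seeded from SELECT MAX(id) FROM refresh_tokens

        def get_next(self):
            with self._lock:
                self._current += 1
                return self._current

    gen = ToyIdGenerator(current_max=41)
    print(gen.get_next())  # -> 42
    print(gen.get_next())  # -> 43
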
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
new file mode 100644
index 0000000000..45fccc2e5e
--- /dev/null
+++ b/synapse/storage/background_updates.py
@@ -0,0 +1,256 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+
+from twisted.internet import defer
+
+import ujson as json
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class BackgroundUpdatePerformance(object):
+    """Tracks how long a background update takes to process its items"""
+
+ def __init__(self, name):
+ self.name = name
+ self.total_item_count = 0
+ self.total_duration_ms = 0
+ self.avg_item_count = 0
+ self.avg_duration_ms = 0
+
+ def update(self, item_count, duration_ms):
+ """Update the stats after doing an update"""
+ self.total_item_count += item_count
+ self.total_duration_ms += duration_ms
+
+ # Exponential moving averages for the number of items updated and
+ # the duration.
+ self.avg_item_count += 0.1 * (item_count - self.avg_item_count)
+ self.avg_duration_ms += 0.1 * (duration_ms - self.avg_duration_ms)
+
+ def average_items_per_ms(self):
+        """An estimate of how many items can be updated per millisecond,
+        using an exponential moving average.
+        Returns:
+            A rate in items per ms as a float, or None if no batches have
+            completed yet.
+ """
+ if self.total_item_count == 0:
+ return None
+ else:
+ # Use the exponential moving average so that we can adapt to
+ # changes in how long the update process takes.
+ return float(self.avg_item_count) / float(self.avg_duration_ms)
+
+ def total_items_per_ms(self):
+        """An estimate of how many items can be updated per millisecond,
+        averaged over all batches so far.
+        Returns:
+            A rate in items per ms as a float, or None if no batches have
+            completed yet.
+ """
+ if self.total_item_count == 0:
+ return None
+ else:
+ return float(self.total_item_count) / float(self.total_duration_ms)
+
+
+class BackgroundUpdateStore(SQLBaseStore):
+ """ Background updates are updates to the database that run in the
+ background. Each update processes a batch of data at once. We attempt to
+ limit the impact of each update by monitoring how long each batch takes to
+ process and autotuning the batch size.
+ """
+
+ MINIMUM_BACKGROUND_BATCH_SIZE = 100
+ DEFAULT_BACKGROUND_BATCH_SIZE = 100
+ BACKGROUND_UPDATE_INTERVAL_MS = 1000
+ BACKGROUND_UPDATE_DURATION_MS = 100
+
+ def __init__(self, hs):
+ super(BackgroundUpdateStore, self).__init__(hs)
+ self._background_update_performance = {}
+ self._background_update_queue = []
+ self._background_update_handlers = {}
+ self._background_update_timer = None
+
+ @defer.inlineCallbacks
+ def start_doing_background_updates(self):
+ while True:
+ if self._background_update_timer is not None:
+ return
+
+ sleep = defer.Deferred()
+ self._background_update_timer = self._clock.call_later(
+ self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
+ )
+ try:
+ yield sleep
+ finally:
+ self._background_update_timer = None
+
+ try:
+ result = yield self.do_background_update(
+ self.BACKGROUND_UPDATE_DURATION_MS
+ )
+            except:
+                logger.exception("Error doing update")
+            else:
+                if result is None:
+                    logger.info(
+                        "No more background updates to do."
+                        " Unscheduling background update task."
+                    )
+                    return
+
+ @defer.inlineCallbacks
+ def do_background_update(self, desired_duration_ms):
+ """Does some amount of work on a background update
+ Args:
+ desired_duration_ms(float): How long we want to spend
+ updating.
+ Returns:
+ A deferred that completes once some amount of work is done.
+ The deferred will have a value of None if there is currently
+ no more work to do.
+ """
+ if not self._background_update_queue:
+ updates = yield self._simple_select_list(
+ "background_updates",
+ keyvalues=None,
+ retcols=("update_name",),
+ )
+ for update in updates:
+ self._background_update_queue.append(update['update_name'])
+
+ if not self._background_update_queue:
+ defer.returnValue(None)
+
+ update_name = self._background_update_queue.pop(0)
+ self._background_update_queue.append(update_name)
+
+ update_handler = self._background_update_handlers[update_name]
+
+ performance = self._background_update_performance.get(update_name)
+
+ if performance is None:
+ performance = BackgroundUpdatePerformance(update_name)
+ self._background_update_performance[update_name] = performance
+
+ items_per_ms = performance.average_items_per_ms()
+
+ if items_per_ms is not None:
+ batch_size = int(desired_duration_ms * items_per_ms)
+ # Clamp the batch size so that we always make progress
+ batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE)
+ else:
+ batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
+
+ progress_json = yield self._simple_select_one_onecol(
+ "background_updates",
+ keyvalues={"update_name": update_name},
+ retcol="progress_json"
+ )
+
+ progress = json.loads(progress_json)
+
+ time_start = self._clock.time_msec()
+ items_updated = yield update_handler(progress, batch_size)
+ time_stop = self._clock.time_msec()
+
+ duration_ms = time_stop - time_start
+
+ logger.info(
+ "Updating %r. Updated %r items in %rms."
+ " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)",
+ update_name, items_updated, duration_ms,
+ performance.total_items_per_ms(),
+ performance.average_items_per_ms(),
+ performance.total_item_count,
+ )
+
+ performance.update(items_updated, duration_ms)
+
+ defer.returnValue(len(self._background_update_performance))
+
+ def register_background_update_handler(self, update_name, update_handler):
+ """Register a handler for doing a background update.
+
+ The handler should take two arguments:
+
+ * A dict of the current progress
+ * An integer count of the number of items to update in this batch.
+
+ The handler should return a deferred integer count of items updated.
+        The handler is responsible for updating the progress of the update.
+
+ Args:
+ update_name(str): The name of the update that this code handles.
+ update_handler(function): The function that does the update.
+ """
+ self._background_update_handlers[update_name] = update_handler
+
+ def start_background_update(self, update_name, progress):
+ """Starts a background update running.
+
+ Args:
+ update_name: The update to set running.
+ progress: The initial state of the progress of the update.
+
+ Returns:
+ A deferred that completes once the task has been added to the
+ queue.
+ """
+ # Clear the background update queue so that we will pick up the new
+ # task on the next iteration of do_background_update.
+ self._background_update_queue = []
+ progress_json = json.dumps(progress)
+
+ return self._simple_insert(
+ "background_updates",
+ {"update_name": update_name, "progress_json": progress_json}
+ )
+
+ def _end_background_update(self, update_name):
+ """Removes a completed background update task from the queue.
+
+ Args:
+ update_name(str): The name of the completed task to remove
+ Returns:
+ A deferred that completes once the task is removed.
+ """
+ self._background_update_queue = [
+ name for name in self._background_update_queue if name != update_name
+ ]
+ return self._simple_delete_one(
+ "background_updates", keyvalues={"update_name": update_name}
+ )
+
+ def _background_update_progress_txn(self, txn, update_name, progress):
+ """Update the progress of a background update
+
+ Args:
+ txn(cursor): The transaction.
+ update_name(str): The name of the background update task
+ progress(dict): The progress of the update.
+ """
+
+ progress_json = json.dumps(progress)
+
+ self._simple_update_one_txn(
+ txn,
+ "background_updates",
+ keyvalues={"update_name": update_name},
+ updatevalues={"progress_json": progress_json},
+ )
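
register_background_update_handler and the progress helpers above define a small contract for update handlers. The following is a hypothetical handler sketching that contract: process up to batch_size rows, persist progress inside the same transaction, return how many items were handled, and end the update once nothing is left. The update name, SQL, and columns here are invented for illustration.

    # Hypothetical background update handler, written as a store method.
    from twisted.internet import defer

    @defer.inlineCallbacks
    def _example_reindex(self, progress, batch_size):
        last_id = progress.get("last_event_id", "")

        def reindex_txn(txn):
            txn.execute(
                "SELECT event_id FROM events WHERE event_id > ?"
                " ORDER BY event_id LIMIT ?",
                (last_id, batch_size),
            )
            rows = txn.fetchall()
            # ... per-row work would happen here ...
            if rows:
                # Persist progress in the same transaction as the work.
                self._background_update_progress_txn(
                    txn, "example_reindex", {"last_event_id": rows[-1][0]}
                )
            return len(rows)

        num_done = yield self.runInteraction("example_reindex", reindex_txn)
        if num_done == 0:
            # Nothing left to do: remove the update from the queue.
            yield self._end_background_update("example_reindex")
        defer.returnValue(num_done)

    # Registered once at store construction time, e.g.:
    #     self.register_background_update_handler("example_reindex", self._example_reindex)
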
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 4a855ffd56..98d66e0a86 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage import prepare_database
+from synapse.storage.prepare_database import prepare_database
from ._base import IncorrectDatabaseSetup
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index d18e2808d1..a5a54ec011 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -13,7 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage import prepare_database, prepare_sqlite3_database
+from synapse.storage.prepare_database import (
+ prepare_database, prepare_sqlite3_database
+)
+
+import struct
class Sqlite3Engine(object):
@@ -30,6 +34,7 @@ class Sqlite3Engine(object):
def on_new_connection(self, db_conn):
self.prepare_database(db_conn)
+ db_conn.create_function("rank", 1, _rank)
def prepare_database(self, db_conn):
prepare_sqlite3_database(db_conn)
@@ -43,3 +48,27 @@ class Sqlite3Engine(object):
def lock_table(self, txn, table):
return
+
+
+# Following functions taken from: https://github.com/coleifer/peewee
+
+def _parse_match_info(buf):
+ bufsize = len(buf)
+ return [struct.unpack('@I', buf[i:i+4])[0] for i in range(0, bufsize, 4)]
+
+
+def _rank(raw_match_info):
+    """Handle match_info called with default args 'pcx' - based on the example
+    rank function at http://sqlite.org/fts3.html#appendix_a
+ """
+ match_info = _parse_match_info(raw_match_info)
+ score = 0.0
+ p, c = match_info[:2]
+ for phrase_num in range(p):
+ phrase_info_idx = 2 + (phrase_num * c * 3)
+ for col_num in range(c):
+ col_idx = phrase_info_idx + (col_num * 3)
+ x1, x2 = match_info[col_idx:col_idx + 2]
+ if x1 > 0:
+ score += float(x1) / x2
+ return score
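
The 'pcx' matchinfo layout that _rank consumes is easiest to see with a hand-built blob, which also lets the function be checked without an FTS-enabled SQLite build.

    # Worked example of the 'pcx' matchinfo blob: [num_phrases, num_cols, then
    # (hits_this_row, hits_all_rows, docs_with_hits) per (phrase, column)].
    import struct

    blob = struct.pack(
        '@8I',
        1, 2,        # 1 phrase, 2 columns
        2, 4, 1,     # column 0: 2 hits in this row out of 4 in the whole table
        0, 3, 1,     # column 1: no hits in this row
    )
    print(_rank(blob))  # -> 0.5, i.e. 2/4 contributed by column 0 only
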
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 7cb314dee8..6d4421dd8f 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
-from syutil.base64util import encode_base64
+from unpaddedbase64 import encode_base64
import logging
from Queue import PriorityQueue, Empty
@@ -154,98 +154,6 @@ class EventFederationStore(SQLBaseStore):
return results
- def _get_latest_state_in_room(self, txn, room_id, type, state_key):
- event_ids = self._simple_select_onecol_txn(
- txn,
- table="state_forward_extremities",
- keyvalues={
- "room_id": room_id,
- "type": type,
- "state_key": state_key,
- },
- retcol="event_id",
- )
-
- results = []
- for event_id in event_ids:
- hashes = self._get_event_reference_hashes_txn(txn, event_id)
- prev_hashes = {
- k: encode_base64(v) for k, v in hashes.items()
- if k == "sha256"
- }
- results.append((event_id, prev_hashes))
-
- return results
-
- def _get_prev_events(self, txn, event_id):
- results = self._get_prev_events_and_state(
- txn,
- event_id,
- is_state=0,
- )
-
- return [(e_id, h, ) for e_id, h, _ in results]
-
- def _get_prev_state(self, txn, event_id):
- results = self._get_prev_events_and_state(
- txn,
- event_id,
- is_state=True,
- )
-
- return [(e_id, h, ) for e_id, h, _ in results]
-
- def _get_prev_events_and_state(self, txn, event_id, is_state=None):
- keyvalues = {
- "event_id": event_id,
- }
-
- if is_state is not None:
- keyvalues["is_state"] = bool(is_state)
-
- res = self._simple_select_list_txn(
- txn,
- table="event_edges",
- keyvalues=keyvalues,
- retcols=["prev_event_id", "is_state"],
- )
-
- hashes = self._get_prev_event_hashes_txn(txn, event_id)
-
- results = []
- for d in res:
- edge_hash = self._get_event_reference_hashes_txn(txn, d["prev_event_id"])
- edge_hash.update(hashes.get(d["prev_event_id"], {}))
- prev_hashes = {
- k: encode_base64(v)
- for k, v in edge_hash.items()
- if k == "sha256"
- }
- results.append((d["prev_event_id"], prev_hashes, d["is_state"]))
-
- return results
-
- def _get_auth_events(self, txn, event_id):
- auth_ids = self._simple_select_onecol_txn(
- txn,
- table="event_auth",
- keyvalues={
- "event_id": event_id,
- },
- retcol="auth_id",
- )
-
- results = []
- for auth_id in auth_ids:
- hashes = self._get_event_reference_hashes_txn(txn, auth_id)
- prev_hashes = {
- k: encode_base64(v) for k, v in hashes.items()
- if k == "sha256"
- }
- results.append((auth_id, prev_hashes))
-
- return results
-
def get_min_depth(self, room_id):
""" For hte given room, get the minimum depth we have seen for it.
"""
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8774b3b388..5d35ca90b9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
from _base import SQLBaseStore, _RollbackButIsFineException
from twisted.internet import defer, reactor
@@ -24,15 +23,23 @@ from synapse.util.logcontext import preserve_context_over_deferred
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
-from syutil.jsonutil import encode_json
+from canonicaljson import encode_canonical_json
from contextlib import contextmanager
import logging
+import math
import ujson as json
logger = logging.getLogger(__name__)
+def encode_json(json_object):
+ if USE_FROZEN_DICTS:
+ # ujson doesn't like frozen_dicts
+ return encode_canonical_json(json_object)
+ else:
+ return json.dumps(json_object, ensure_ascii=False)
+
# These values are used in the `enqueus_event` and `_do_fetch` methods to
# control how we batch/bulk fetch events from the database.
# The values are plucked out of thing air to make initial sync run faster
@@ -253,8 +260,7 @@ class EventsStore(SQLBaseStore):
)
metadata_json = encode_json(
- event.internal_metadata.get_dict(),
- using_frozen_dicts=USE_FROZEN_DICTS
+ event.internal_metadata.get_dict()
).decode("UTF-8")
sql = (
@@ -301,8 +307,14 @@ class EventsStore(SQLBaseStore):
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
+ elif event.type == EventTypes.Message:
+ self._store_room_message_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
+ elif event.type == EventTypes.RoomHistoryVisibility:
+ self._store_history_visibility_txn(txn, event)
+ elif event.type == EventTypes.GuestAccess:
+ self._store_guest_access_txn(txn, event)
self._store_room_members_txn(
txn,
@@ -331,12 +343,9 @@ class EventsStore(SQLBaseStore):
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": encode_json(
- event.internal_metadata.get_dict(),
- using_frozen_dicts=USE_FROZEN_DICTS
- ).decode("UTF-8"),
- "json": encode_json(
- event_dict(event), using_frozen_dicts=USE_FROZEN_DICTS
+ event.internal_metadata.get_dict()
).decode("UTF-8"),
+ "json": encode_json(event_dict(event)).decode("UTF-8"),
}
for event, _ in events_and_contexts
],
@@ -355,9 +364,7 @@ class EventsStore(SQLBaseStore):
"type": event.type,
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
- "content": encode_json(
- event.content, using_frozen_dicts=USE_FROZEN_DICTS
- ).decode("UTF-8"),
+ "content": encode_json(event.content).decode("UTF-8"),
}
for event, _ in events_and_contexts
],
@@ -824,7 +831,8 @@ class EventsStore(SQLBaseStore):
allow_none=True,
)
if prev:
- ev.unsigned["prev_content"] = prev.get_dict()["content"]
+ ev.unsigned["prev_content"] = prev.content
+ ev.unsigned["prev_sender"] = prev.sender
self._get_event_cache.prefill(
(ev.event_id, check_redacted, get_prev_content), ev
@@ -881,7 +889,8 @@ class EventsStore(SQLBaseStore):
get_prev_content=False,
)
if prev:
- ev.unsigned["prev_content"] = prev.get_dict()["content"]
+ ev.unsigned["prev_content"] = prev.content
+ ev.unsigned["prev_sender"] = prev.sender
self._get_event_cache.prefill(
(ev.event_id, check_redacted, get_prev_content), ev
@@ -889,18 +898,69 @@ class EventsStore(SQLBaseStore):
return ev
- def _parse_events(self, rows):
- return self.runInteraction(
- "_parse_events", self._parse_events_txn, rows
- )
-
def _parse_events_txn(self, txn, rows):
event_ids = [r["event_id"] for r in rows]
return self._get_events_txn(txn, event_ids)
- def _has_been_redacted_txn(self, txn, event):
- sql = "SELECT event_id FROM redactions WHERE redacts = ?"
- txn.execute(sql, (event.event_id,))
- result = txn.fetchone()
- return result[0] if result else None
+ @defer.inlineCallbacks
+ def count_daily_messages(self):
+ """
+ Returns an estimate of the number of messages sent in the last day.
+
+ If it has been significantly less or more than one day since the last
+ call to this function, it will return None.
+ """
+ def _count_messages(txn):
+ now = self.hs.get_clock().time()
+
+ txn.execute(
+ "SELECT reported_stream_token, reported_time FROM stats_reporting"
+ )
+ last_reported = self.cursor_to_dict(txn)
+
+ txn.execute(
+ "SELECT stream_ordering"
+ " FROM events"
+ " ORDER BY stream_ordering DESC"
+ " LIMIT 1"
+ )
+ now_reporting = self.cursor_to_dict(txn)
+ if not now_reporting:
+ return None
+ now_reporting = now_reporting[0]["stream_ordering"]
+
+ txn.execute("DELETE FROM stats_reporting")
+ txn.execute(
+ "INSERT INTO stats_reporting"
+ " (reported_stream_token, reported_time)"
+ " VALUES (?, ?)",
+ (now_reporting, now,)
+ )
+
+ if not last_reported:
+ return None
+
+ # Close enough to correct for our purposes.
+ yesterday = (now - 24 * 60 * 60)
+ if math.fabs(yesterday - last_reported[0]["reported_time"]) > 60 * 60:
+ return None
+
+ txn.execute(
+ "SELECT COUNT(*) as messages"
+ " FROM events NATURAL JOIN event_json"
+ " WHERE json like '%m.room.message%'"
+ " AND stream_ordering > ?"
+ " AND stream_ordering <= ?",
+ (
+ last_reported[0]["reported_stream_token"],
+ now_reporting,
+ )
+ )
+ rows = self.cursor_to_dict(txn)
+ if not rows:
+ return None
+ return rows[0]["messages"]
+
+ ret = yield self.runInteraction("count_messages", _count_messages)
+ defer.returnValue(ret)
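
The new encode_json helper picks between canonicaljson (which copes with frozen dicts and produces a deterministic, sorted-key byte string) and plain ujson as the fast path. A small illustration, assuming both packages are installed:

    # Comparing the two encodings used by the encode_json helper above.
    import ujson as json
    from canonicaljson import encode_canonical_json

    event_content = {"body": "hello", "msgtype": "m.text"}
    print(encode_canonical_json(event_content))   # b'{"body":"hello","msgtype":"m.text"}'
    print(json.dumps(event_content, ensure_ascii=False))
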
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 8800116570..fcd43c7fdd 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -34,10 +34,10 @@ class FilteringStore(SQLBaseStore):
desc="get_user_filter",
)
- defer.returnValue(json.loads(def_json))
+ defer.returnValue(json.loads(str(def_json).decode("utf-8")))
def add_user_filter(self, user_localpart, user_filter):
- def_json = json.dumps(user_filter)
+ def_json = json.dumps(user_filter).encode("utf-8")
# Need an atomic transaction to SELECT the maximal ID so far then
# INSERT a new one
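
The filtering change just makes the stored filter definition round-trip through a UTF-8 byte string explicitly (this code predates Python 3). A tiny sketch of the round-trip:

    # Filter definitions are stored as UTF-8 encoded JSON and decoded on read.
    import json

    user_filter = {u"room": {u"timeline": {u"limit": 10}}}
    def_json = json.dumps(user_filter).encode("utf-8")   # what gets stored
    assert json.loads(def_json.decode("utf-8")) == user_filter
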
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index ffd6daa880..344cacdc75 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -19,7 +19,7 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks
from twisted.internet import defer
import OpenSSL
-from syutil.crypto.signing_key import decode_verify_key_bytes
+from signedjson.key import decode_verify_key_bytes
import hashlib
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
new file mode 100644
index 0000000000..1a74d6e360
--- /dev/null
+++ b/synapse/storage/prepare_database.py
@@ -0,0 +1,395 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import fnmatch
+import imp
+import logging
+import os
+import re
+
+
+logger = logging.getLogger(__name__)
+
+
+# Remember to update this number every time a change is made to database
+# schema files, so the users will be informed on server restarts.
+SCHEMA_VERSION = 25
+
+dir_path = os.path.abspath(os.path.dirname(__file__))
+
+
+def read_schema(path):
+ """ Read the named database schema.
+
+ Args:
+ path: Path of the database schema.
+ Returns:
+ A string containing the database schema.
+ """
+ with open(path) as schema_file:
+ return schema_file.read()
+
+
+class PrepareDatabaseException(Exception):
+ pass
+
+
+class UpgradeDatabaseException(PrepareDatabaseException):
+ pass
+
+
+def prepare_database(db_conn, database_engine):
+ """Prepares a database for usage. Will either create all necessary tables
+ or upgrade from an older schema version.
+ """
+ try:
+ cur = db_conn.cursor()
+ version_info = _get_or_create_schema_state(cur, database_engine)
+
+ if version_info:
+ user_version, delta_files, upgraded = version_info
+ _upgrade_existing_database(
+ cur, user_version, delta_files, upgraded, database_engine
+ )
+ else:
+ _setup_new_database(cur, database_engine)
+
+ # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
+
+ cur.close()
+ db_conn.commit()
+ except:
+ db_conn.rollback()
+ raise
+
+
+def _setup_new_database(cur, database_engine):
+ """Sets up the database by finding a base set of "full schemas" and then
+ applying any necessary deltas.
+
+ The "full_schemas" directory has subdirectories named after versions. This
+ function searches for the highest version less than or equal to
+ `SCHEMA_VERSION` and executes all .sql files in that directory.
+
+ The function will then apply all deltas for all versions after the base
+ version.
+
+ Example directory structure:
+
+ schema/
+ delta/
+ ...
+ full_schemas/
+ 3/
+ test.sql
+ ...
+ 11/
+ foo.sql
+ bar.sql
+ ...
+
+ In the example foo.sql and bar.sql would be run, and then any delta files
+ for versions strictly greater than 11.
+ """
+ current_dir = os.path.join(dir_path, "schema", "full_schemas")
+ directory_entries = os.listdir(current_dir)
+
+ valid_dirs = []
+ pattern = re.compile(r"^\d+(\.sql)?$")
+ for filename in directory_entries:
+ match = pattern.match(filename)
+ abs_path = os.path.join(current_dir, filename)
+ if match and os.path.isdir(abs_path):
+ ver = int(match.group(0))
+ if ver <= SCHEMA_VERSION:
+ valid_dirs.append((ver, abs_path))
+ else:
+ logger.warn("Unexpected entry in 'full_schemas': %s", filename)
+
+ if not valid_dirs:
+ raise PrepareDatabaseException(
+ "Could not find a suitable base set of full schemas"
+ )
+
+ max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0])
+
+ logger.debug("Initialising schema v%d", max_current_ver)
+
+ directory_entries = os.listdir(sql_dir)
+
+ for filename in fnmatch.filter(directory_entries, "*.sql"):
+ sql_loc = os.path.join(sql_dir, filename)
+ logger.debug("Applying schema %s", sql_loc)
+ executescript(cur, sql_loc)
+
+ cur.execute(
+ database_engine.convert_param_style(
+ "INSERT INTO schema_version (version, upgraded)"
+ " VALUES (?,?)"
+ ),
+ (max_current_ver, False,)
+ )
+
+ _upgrade_existing_database(
+ cur,
+ current_version=max_current_ver,
+ applied_delta_files=[],
+ upgraded=False,
+ database_engine=database_engine,
+ )
+
+
+def _upgrade_existing_database(cur, current_version, applied_delta_files,
+ upgraded, database_engine):
+ """Upgrades an existing database.
+
+ Delta files can either be SQL stored in *.sql files, or python modules
+ in *.py.
+
+ There can be multiple delta files per version. Synapse will keep track of
+ which delta files have been applied, and will apply any that haven't been
+ even if there has been no version bump. This is useful for development
+ where orthogonal schema changes may happen on separate branches.
+
+ Different delta files for the same version *must* be orthogonal and give
+ the same result when applied in any order. No guarantees are made on the
+ order of execution of these scripts.
+
+    This is a no-op if current_version == SCHEMA_VERSION.
+
+ Example directory structure:
+
+ schema/
+ delta/
+ 11/
+ foo.sql
+ ...
+ 12/
+ foo.sql
+ bar.py
+ ...
+ full_schemas/
+ ...
+
+ In the example, if current_version is 11, then foo.sql will be run if and
+ only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in
+ some arbitrary order.
+
+ Args:
+ cur (Cursor)
+ current_version (int): The current version of the schema.
+ applied_delta_files (list): A list of deltas that have already been
+ applied.
+ upgraded (bool): Whether the current version was generated by having
+ applied deltas or from full schema file. If `True` the function
+ will never apply delta files for the given `current_version`, since
+ the current_version wasn't generated by applying those delta files.
+ """
+
+ if current_version > SCHEMA_VERSION:
+ raise ValueError(
+ "Cannot use this database as it is too " +
+ "new for the server to understand"
+ )
+
+ start_ver = current_version
+ if not upgraded:
+ start_ver += 1
+
+ logger.debug("applied_delta_files: %s", applied_delta_files)
+
+ for v in range(start_ver, SCHEMA_VERSION + 1):
+ logger.debug("Upgrading schema to v%d", v)
+
+ delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
+
+ try:
+ directory_entries = os.listdir(delta_dir)
+ except OSError:
+ logger.exception("Could not open delta dir for version %d", v)
+ raise UpgradeDatabaseException(
+ "Could not open delta dir for version %d" % (v,)
+ )
+
+ directory_entries.sort()
+ for file_name in directory_entries:
+ relative_path = os.path.join(str(v), file_name)
+ logger.debug("Found file: %s", relative_path)
+ if relative_path in applied_delta_files:
+ continue
+
+ absolute_path = os.path.join(
+ dir_path, "schema", "delta", relative_path,
+ )
+ root_name, ext = os.path.splitext(file_name)
+ if ext == ".py":
+ # This is a python upgrade module. We need to import into some
+ # package and then execute its `run_upgrade` function.
+ module_name = "synapse.storage.v%d_%s" % (
+ v, root_name
+ )
+ with open(absolute_path) as python_file:
+ module = imp.load_source(
+ module_name, absolute_path, python_file
+ )
+ logger.debug("Running script %s", relative_path)
+ module.run_upgrade(cur, database_engine)
+ elif ext == ".pyc":
+ # Sometimes .pyc files turn up anyway even though we've
+ # disabled their generation; e.g. from distribution package
+ # installers. Silently skip it
+ pass
+ elif ext == ".sql":
+ # A plain old .sql file, just read and execute it
+ logger.debug("Applying schema %s", relative_path)
+ executescript(cur, absolute_path)
+ else:
+ # Not a valid delta file.
+ logger.warn(
+ "Found directory entry that did not end in .py or"
+ " .sql: %s",
+ relative_path,
+ )
+ continue
+
+ # Mark as done.
+ cur.execute(
+ database_engine.convert_param_style(
+ "INSERT INTO applied_schema_deltas (version, file)"
+ " VALUES (?,?)",
+ ),
+ (v, relative_path)
+ )
+
+ cur.execute("DELETE FROM schema_version")
+ cur.execute(
+ database_engine.convert_param_style(
+ "INSERT INTO schema_version (version, upgraded)"
+ " VALUES (?,?)",
+ ),
+ (v, True)
+ )
+
+
+def get_statements(f):
+ statement_buffer = ""
+ in_comment = False # If we're in a /* ... */ style comment
+
+ for line in f:
+ line = line.strip()
+
+ if in_comment:
+ # Check if this line contains an end to the comment
+ comments = line.split("*/", 1)
+ if len(comments) == 1:
+ continue
+ line = comments[1]
+ in_comment = False
+
+ # Remove inline block comments
+ line = re.sub(r"/\*.*\*/", " ", line)
+
+ # Does this line start a comment?
+ comments = line.split("/*", 1)
+ if len(comments) > 1:
+ line = comments[0]
+ in_comment = True
+
+ # Deal with line comments
+ line = line.split("--", 1)[0]
+ line = line.split("//", 1)[0]
+
+ # Find *all* semicolons. We need to treat first and last entry
+ # specially.
+ statements = line.split(";")
+
+ # We must prepend statement_buffer to the first statement
+ first_statement = "%s %s" % (
+ statement_buffer.strip(),
+ statements[0].strip()
+ )
+ statements[0] = first_statement
+
+ # Every entry, except the last, is a full statement
+ for statement in statements[:-1]:
+ yield statement.strip()
+
+ # The last entry did *not* end in a semicolon, so we store it for the
+ # next semicolon we find
+ statement_buffer = statements[-1].strip()
+
+
+def executescript(txn, schema_path):
+ with open(schema_path, 'r') as f:
+ for statement in get_statements(f):
+ txn.execute(statement)
+
+
+def _get_or_create_schema_state(txn, database_engine):
+ # Bluntly try creating the schema_version tables.
+ schema_path = os.path.join(
+ dir_path, "schema", "schema_version.sql",
+ )
+ executescript(txn, schema_path)
+
+ txn.execute("SELECT version, upgraded FROM schema_version")
+ row = txn.fetchone()
+ current_version = int(row[0]) if row else None
+ upgraded = bool(row[1]) if row else None
+
+ if current_version:
+ txn.execute(
+ database_engine.convert_param_style(
+ "SELECT file FROM applied_schema_deltas WHERE version >= ?"
+ ),
+ (current_version,)
+ )
+ applied_deltas = [d for d, in txn.fetchall()]
+ return current_version, applied_deltas, upgraded
+
+ return None
+
+
+def prepare_sqlite3_database(db_conn):
+ """This function should be called before `prepare_database` on sqlite3
+ databases.
+
+ Since we changed the way we store the current schema version and handle
+ updates to schemas, we need a way to upgrade from the old method to the
+ new. This only affects sqlite databases since they were the only ones
+ supported at the time.
+ """
+ with db_conn:
+ schema_path = os.path.join(
+ dir_path, "schema", "schema_version.sql",
+ )
+ create_schema = read_schema(schema_path)
+ db_conn.executescript(create_schema)
+
+ c = db_conn.execute("SELECT * FROM schema_version")
+ rows = c.fetchall()
+ c.close()
+
+ if not rows:
+ c = db_conn.execute("PRAGMA user_version")
+ row = c.fetchone()
+ c.close()
+
+ if row and row[0]:
+ db_conn.execute(
+ "REPLACE INTO schema_version (version, upgraded)"
+ " VALUES (?,?)",
+ (row[0], False)
+ )
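
get_statements is easiest to follow with a worked example: feeding it the lines of a small schema yields one complete statement per semicolon, with SQL comments stripped and partial statements buffered across lines.

    # Worked example of get_statements on a two-statement schema.
    schema = """\
    -- create the table
    CREATE TABLE foo (id INTEGER); /* block
    comment */ INSERT INTO foo VALUES (1);
    """
    print(list(get_statements(schema.splitlines(True))))
    # -> ['CREATE TABLE foo (id INTEGER)', 'INSERT INTO foo VALUES (1)']
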
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 08ea62681b..345c4e1104 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -13,12 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore, Table
+from ._base import SQLBaseStore
from twisted.internet import defer
from synapse.api.errors import StoreError
-from syutil.jsonutil import encode_canonical_json
+from canonicaljson import encode_canonical_json
import logging
import simplejson as json
@@ -149,5 +149,5 @@ class PusherStore(SQLBaseStore):
)
-class PushersTable(Table):
+class PushersTable(object):
table_name = "pushers"
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 586628579d..2e5eddd259 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -51,6 +51,28 @@ class RegistrationStore(SQLBaseStore):
)
@defer.inlineCallbacks
+ def add_refresh_token_to_user(self, user_id, token):
+ """Adds a refresh token for the given user.
+
+ Args:
+ user_id (str): The user ID.
+ token (str): The new refresh token to add.
+ Raises:
+ StoreError if there was a problem adding this.
+ """
+ next_id = yield self._refresh_tokens_id_gen.get_next()
+
+ yield self._simple_insert(
+ "refresh_tokens",
+ {
+ "id": next_id,
+ "user_id": user_id,
+ "token": token
+ },
+ desc="add_refresh_token_to_user",
+ )
+
+ @defer.inlineCallbacks
def register(self, user_id, token, password_hash):
"""Attempts to register an account.
@@ -80,13 +102,14 @@ class RegistrationStore(SQLBaseStore):
400, "User ID already taken.", errcode=Codes.USER_IN_USE
)
- # it's possible for this to get a conflict, but only for a single user
- # since tokens are namespaced based on their user ID
- txn.execute(
- "INSERT INTO access_tokens(id, user_id, token)"
- " VALUES (?,?,?)",
- (next_id, user_id, token,)
- )
+ if token:
+ # it's possible for this to get a conflict, but only for a single user
+ # since tokens are namespaced based on their user ID
+ txn.execute(
+ "INSERT INTO access_tokens(id, user_id, token)"
+ " VALUES (?,?,?)",
+ (next_id, user_id, token,)
+ )
def get_user_by_id(self, user_id):
return self._simple_select_one(
@@ -146,26 +169,65 @@ class RegistrationStore(SQLBaseStore):
user_id
)
for r in rows:
- self.get_user_by_token.invalidate((r,))
+ self.get_user_by_access_token.invalidate((r,))
@cached()
- def get_user_by_token(self, token):
+ def get_user_by_access_token(self, token):
"""Get a user from the given access token.
Args:
token (str): The access token of a user.
Returns:
- dict: Including the name (user_id), device_id and whether they are
- an admin.
+ dict: Including the name (user_id) and the ID of their access token.
Raises:
StoreError if no user was found.
"""
return self.runInteraction(
- "get_user_by_token",
+ "get_user_by_access_token",
self._query_for_auth,
token
)
+ def exchange_refresh_token(self, refresh_token, token_generator):
+ """Exchange a refresh token for a new access token and refresh token.
+
+ Doing so invalidates the old refresh token - refresh tokens are single
+ use.
+
+ Args:
+            refresh_token (str): The refresh token of a user.
+ token_generator (fn: str -> str): Function which, when given a
+ user ID, returns a unique refresh token for that user. This
+ function must never return the same value twice.
+ Returns:
+ tuple of (user_id, refresh_token)
+ Raises:
+ StoreError if no user was found with that refresh token.
+ """
+ return self.runInteraction(
+ "exchange_refresh_token",
+ self._exchange_refresh_token,
+ refresh_token,
+ token_generator
+ )
+
+ def _exchange_refresh_token(self, txn, old_token, token_generator):
+ sql = "SELECT user_id FROM refresh_tokens WHERE token = ?"
+ txn.execute(sql, (old_token,))
+ rows = self.cursor_to_dict(txn)
+ if not rows:
+ raise StoreError(403, "Did not recognize refresh token")
+ user_id = rows[0]["user_id"]
+
+ # TODO(danielwh): Maybe perform a validation on the macaroon that
+ # macaroon.user_id == user_id.
+
+ new_token = token_generator(user_id)
+ sql = "UPDATE refresh_tokens SET token = ? WHERE token = ?"
+ txn.execute(sql, (new_token, old_token,))
+
+ return user_id, new_token
+
@defer.inlineCallbacks
def is_server_admin(self, user):
res = yield self._simple_select_one_onecol(
@@ -180,8 +242,7 @@ class RegistrationStore(SQLBaseStore):
def _query_for_auth(self, txn, token):
sql = (
- "SELECT users.name, users.admin,"
- " access_tokens.device_id, access_tokens.id as token_id"
+ "SELECT users.name, access_tokens.id as token_id"
" FROM users"
" INNER JOIN access_tokens on users.name = access_tokens.user_id"
" WHERE token = ?"
@@ -229,3 +290,16 @@ class RegistrationStore(SQLBaseStore):
if ret:
defer.returnValue(ret['user_id'])
defer.returnValue(None)
+
+ @defer.inlineCallbacks
+ def count_all_users(self):
+ """Counts all users registered on the homeserver."""
+ def _count_users(txn):
+ txn.execute("SELECT COUNT(*) AS users FROM users")
+ rows = self.cursor_to_dict(txn)
+ if rows:
+ return rows[0]["users"]
+ return 0
+
+ ret = yield self.runInteraction("count_users", _count_users)
+ defer.returnValue(ret)
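
exchange_refresh_token leaves minting the replacement token to the caller via token_generator. A hypothetical generator satisfying that contract (a unique value per call, keyed on the user ID) might look like the sketch below; it is only an illustration, not how synapse actually builds its tokens.

    # Hypothetical token_generator: user ID plus a random, never-repeating suffix.
    import base64
    import os

    def make_refresh_token(user_id):
        rand = base64.urlsafe_b64encode(os.urandom(18)).rstrip(b"=").decode("ascii")
        return "%s:%s" % (user_id, rand)

    # e.g. store.exchange_refresh_token(old_token, make_refresh_token)
    print(make_refresh_token("@alice:example.com"))
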
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 5e07b7e0e5..4f08df478c 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -19,6 +19,7 @@ from synapse.api.errors import StoreError
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
+from .engines import PostgresEngine, Sqlite3Engine
import collections
import logging
@@ -98,34 +99,39 @@ class RoomStore(SQLBaseStore):
"""
def f(txn):
- topic_subquery = (
- "SELECT topics.event_id as event_id, "
- "topics.room_id as room_id, topic "
- "FROM topics "
- "INNER JOIN current_state_events as c "
- "ON c.event_id = topics.event_id "
- )
-
- name_subquery = (
- "SELECT room_names.event_id as event_id, "
- "room_names.room_id as room_id, name "
- "FROM room_names "
- "INNER JOIN current_state_events as c "
- "ON c.event_id = room_names.event_id "
- )
+ def subquery(table_name, column_name=None):
+ column_name = column_name or table_name
+ return (
+ "SELECT %(table_name)s.event_id as event_id, "
+ "%(table_name)s.room_id as room_id, %(column_name)s "
+ "FROM %(table_name)s "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = %(table_name)s.event_id " % {
+ "column_name": column_name,
+ "table_name": table_name,
+ }
+ )
- # We use non printing ascii character US (\x1F) as a separator
sql = (
- "SELECT r.room_id, max(n.name), max(t.topic)"
+ "SELECT"
+ " r.room_id,"
+ " max(n.name),"
+ " max(t.topic),"
+ " max(v.history_visibility),"
+ " max(g.guest_access)"
" FROM rooms AS r"
" LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id"
" LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id"
+ " LEFT JOIN (%(history_visibility)s) AS v ON v.room_id = r.room_id"
+ " LEFT JOIN (%(guest_access)s) AS g ON g.room_id = r.room_id"
" WHERE r.is_public = ?"
- " GROUP BY r.room_id"
- ) % {
- "topic": topic_subquery,
- "name": name_subquery,
- }
+ " GROUP BY r.room_id" % {
+ "topic": subquery("topics", "topic"),
+ "name": subquery("room_names", "name"),
+ "history_visibility": subquery("history_visibility"),
+ "guest_access": subquery("guest_access"),
+ }
+ )
txn.execute(sql, (is_public,))
@@ -155,10 +161,12 @@ class RoomStore(SQLBaseStore):
"room_id": r[0],
"name": r[1],
"topic": r[2],
- "aliases": r[3],
+ "world_readable": r[3] == "world_readable",
+ "guest_can_join": r[4] == "can_join",
+ "aliases": r[5],
}
for r in rows
- if r[3] # We only return rooms that have at least one alias.
+ if r[5] # We only return rooms that have at least one alias.
]
defer.returnValue(ret)
@@ -175,6 +183,10 @@ class RoomStore(SQLBaseStore):
},
)
+ self._store_event_search_txn(
+ txn, event, "content.topic", event.content["topic"]
+ )
+
def _store_room_name_txn(self, txn, event):
if hasattr(event, "content") and "name" in event.content:
self._simple_insert_txn(
@@ -187,6 +199,52 @@ class RoomStore(SQLBaseStore):
}
)
+ self._store_event_search_txn(
+ txn, event, "content.name", event.content["name"]
+ )
+
+ def _store_room_message_txn(self, txn, event):
+ if hasattr(event, "content") and "body" in event.content:
+ self._store_event_search_txn(
+ txn, event, "content.body", event.content["body"]
+ )
+
+ def _store_history_visibility_txn(self, txn, event):
+ self._store_content_index_txn(txn, event, "history_visibility")
+
+ def _store_guest_access_txn(self, txn, event):
+ self._store_content_index_txn(txn, event, "guest_access")
+
+ def _store_content_index_txn(self, txn, event, key):
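+ # Copies a single content field into the lookup table named after
+ # the key (e.g. history_visibility, guest_access) so it can be
+ # joined against in the public room list query above.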
+ if hasattr(event, "content") and key in event.content:
+ sql = (
+ "INSERT INTO %(key)s"
+ " (event_id, room_id, %(key)s)"
+ " VALUES (?, ?, ?)" % {"key": key}
+ )
+ txn.execute(sql, (
+ event.event_id,
+ event.room_id,
+ event.content[key]
+ ))
+
+ def _store_event_search_txn(self, txn, event, key, value):
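+ # Postgres stores the value as a tsvector (built with
+ # to_tsvector('english', ...)) so it can be matched with @@ queries;
+ # SQLite stores the raw value and lets the fts4 virtual table handle
+ # tokenisation.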
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = (
+ "INSERT INTO event_search (event_id, room_id, key, vector)"
+ " VALUES (?,?,?,to_tsvector('english', ?))"
+ )
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ sql = (
+ "INSERT INTO event_search (event_id, room_id, key, value)"
+ " VALUES (?,?,?,?)"
+ )
+ else:
+ # This should be unreachable.
+ raise Exception("Unrecognized database engine")
+
+ txn.execute(sql, (event.event_id, event.room_id, key, value,))
+
@cachedInlineCallbacks()
def get_room_name_and_aliases(self, room_id):
def f(txn):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8eee2dfbcc..ae1ad56d9a 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
RoomsForUser = namedtuple(
"RoomsForUser",
- ("room_id", "sender", "membership")
+ ("room_id", "sender", "membership", "event_id", "stream_ordering")
)
@@ -110,6 +110,33 @@ class RoomMemberStore(SQLBaseStore):
membership=membership,
).addCallback(self._get_events)
+ def get_invites_for_user(self, user_id):
+ """ Get all the invite events for a user
+ Args:
+ user_id (str): The user ID.
+ Returns:
+ A deferred list of event objects.
+ """
+
+ return self.get_rooms_for_user_where_membership_is(
+ user_id, [Membership.INVITE]
+ ).addCallback(lambda invites: self._get_events([
+ invite.event_id for invite in invites
+ ]))
+
+ def get_leave_and_ban_events_for_user(self, user_id):
+ """ Get all the leave events for a user
+ Args:
+ user_id (str): The user ID.
+ Returns:
+ A deferred list of event objects.
+ """
+ return self.get_rooms_for_user_where_membership_is(
+ user_id, (Membership.LEAVE, Membership.BAN)
+ ).addCallback(lambda leaves: self._get_events([
+ leave.event_id for leave in leaves
+ ]))
+
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
matches one in the membership list.
@@ -141,11 +168,13 @@ class RoomMemberStore(SQLBaseStore):
args.extend(membership_list)
sql = (
- "SELECT m.room_id, m.sender, m.membership"
- " FROM room_memberships as m"
- " INNER JOIN current_state_events as c"
- " ON m.event_id = c.event_id "
- " AND m.room_id = c.room_id "
+ "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
+ " FROM current_state_events as c"
+ " INNER JOIN room_memberships as m"
+ " ON m.event_id = c.event_id"
+ " INNER JOIN events as e"
+ " ON e.event_id = c.event_id"
+ " AND m.room_id = c.room_id"
" AND m.user_id = c.state_key"
" WHERE %s"
) % (where_clause,)
@@ -176,12 +205,6 @@ class RoomMemberStore(SQLBaseStore):
return joined_domains
- def _get_members_query(self, where_clause, where_values):
- return self.runInteraction(
- "get_members_query", self._get_members_events_txn,
- where_clause, where_values
- ).addCallbacks(self._get_events)
-
def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
rows = self._get_members_rows_txn(
txn,
diff --git a/synapse/storage/schema/delta/23/drop_state_index.sql b/synapse/storage/schema/delta/23/drop_state_index.sql
new file mode 100644
index 0000000000..07d0ea5cb2
--- /dev/null
+++ b/synapse/storage/schema/delta/23/drop_state_index.sql
@@ -0,0 +1,16 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+DROP INDEX IF EXISTS state_groups_state_tuple;
diff --git a/synapse/storage/schema/delta/23/refresh_tokens.sql b/synapse/storage/schema/delta/23/refresh_tokens.sql
new file mode 100644
index 0000000000..437b1ac1be
--- /dev/null
+++ b/synapse/storage/schema/delta/23/refresh_tokens.sql
@@ -0,0 +1,21 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
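+-- A refresh token is rotated in place when it is exchanged for a new
+-- access token (see the UPDATE in RegistrationStore above).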
+CREATE TABLE IF NOT EXISTS refresh_tokens(
+ id INTEGER PRIMARY KEY,
+ token TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ UNIQUE (token)
+);
diff --git a/synapse/storage/schema/delta/24/stats_reporting.sql b/synapse/storage/schema/delta/24/stats_reporting.sql
new file mode 100644
index 0000000000..e9165d2917
--- /dev/null
+++ b/synapse/storage/schema/delta/24/stats_reporting.sql
@@ -0,0 +1,22 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Should only ever contain one row
+CREATE TABLE IF NOT EXISTS stats_reporting(
+ -- The stream ordering token which was most recently reported as stats
+ reported_stream_token INTEGER,
+ -- The time (seconds since epoch) stats were most recently reported
+ reported_time BIGINT
+);
diff --git a/synapse/storage/schema/delta/25/00background_updates.sql b/synapse/storage/schema/delta/25/00background_updates.sql
new file mode 100644
index 0000000000..41a9b59b1b
--- /dev/null
+++ b/synapse/storage/schema/delta/25/00background_updates.sql
@@ -0,0 +1,21 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE IF NOT EXISTS background_updates(
+ update_name TEXT NOT NULL, -- The name of the background update.
+ progress_json TEXT NOT NULL, -- The current progress of the update as JSON.
+ CONSTRAINT background_updates_uniqueness UNIQUE (update_name)
+);
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
new file mode 100644
index 0000000000..5239d69073
--- /dev/null
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -0,0 +1,78 @@
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage.prepare_database import get_statements
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+import ujson
+
+logger = logging.getLogger(__name__)
+
+
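+# Postgres gets a regular table with a tsvector column plus a GIN index
+# for full text matching; SQLite instead uses an fts4 virtual table
+# (SQLITE_TABLE below).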
+POSTGRES_TABLE = """
+CREATE TABLE IF NOT EXISTS event_search (
+ event_id TEXT,
+ room_id TEXT,
+ sender TEXT,
+ key TEXT,
+ vector tsvector
+);
+
+CREATE INDEX event_search_fts_idx ON event_search USING gin(vector);
+CREATE INDEX event_search_ev_idx ON event_search(event_id);
+CREATE INDEX event_search_ev_ridx ON event_search(room_id);
+"""
+
+
+SQLITE_TABLE = (
+ "CREATE VIRTUAL TABLE IF NOT EXISTS event_search"
+ " USING fts4 ( event_id, room_id, sender, key, value )"
+)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ if isinstance(database_engine, PostgresEngine):
+ for statement in get_statements(POSTGRES_TABLE.splitlines()):
+ cur.execute(statement)
+ elif isinstance(database_engine, Sqlite3Engine):
+ cur.execute(SQLITE_TABLE)
+ else:
+ raise Exception("Unrecognized database engine")
+
+ cur.execute("SELECT MIN(stream_ordering) FROM events")
+ rows = cur.fetchall()
+ min_stream_id = rows[0][0]
+
+ cur.execute("SELECT MAX(stream_ordering) FROM events")
+ rows = cur.fetchall()
+ max_stream_id = rows[0][0]
+
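+ # Schedule the "event_search" background update to index all existing
+ # events, recording the full stream range as its initial progress,
+ # e.g. {"target_min_stream_id_inclusive": 1,
+ # "max_stream_id_exclusive": 1000, "rows_inserted": 0}.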
+ if min_stream_id is not None and max_stream_id is not None:
+ progress = {
+ "target_min_stream_id_inclusive": min_stream_id,
+ "max_stream_id_exclusive": max_stream_id + 1,
+ "rows_inserted": 0,
+ }
+ progress_json = ujson.dumps(progress)
+
+ sql = (
+ "INSERT into background_updates (update_name, progress_json)"
+ " VALUES (?, ?)"
+ )
+
+ sql = database_engine.convert_param_style(sql)
+
+ cur.execute(sql, ("event_search", progress_json))
diff --git a/synapse/storage/schema/delta/25/guest_access.sql b/synapse/storage/schema/delta/25/guest_access.sql
new file mode 100644
index 0000000000..bdb90e7118
--- /dev/null
+++ b/synapse/storage/schema/delta/25/guest_access.sql
@@ -0,0 +1,25 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This is a manual index of the guest_access content of state events,
+ * so that we can join on it in SELECT statements.
+ */
+CREATE TABLE IF NOT EXISTS guest_access(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ guest_access TEXT NOT NULL,
+ UNIQUE (event_id)
+);
diff --git a/synapse/storage/schema/delta/25/history_visibility.sql b/synapse/storage/schema/delta/25/history_visibility.sql
new file mode 100644
index 0000000000..532cb05151
--- /dev/null
+++ b/synapse/storage/schema/delta/25/history_visibility.sql
@@ -0,0 +1,25 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This is a manual index of the history_visibility content of state events,
+ * so that we can join on it in SELECT statements.
+ */
+CREATE TABLE IF NOT EXISTS history_visibility(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ history_visibility TEXT NOT NULL,
+ UNIQUE (event_id)
+);
diff --git a/synapse/storage/schema/delta/25/tags.sql b/synapse/storage/schema/delta/25/tags.sql
new file mode 100644
index 0000000000..527424c998
--- /dev/null
+++ b/synapse/storage/schema/delta/25/tags.sql
@@ -0,0 +1,38 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE IF NOT EXISTS room_tags(
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ tag TEXT NOT NULL, -- The name of the tag.
+ content TEXT NOT NULL, -- The JSON content of the tag.
+ CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag)
+);
+
+CREATE TABLE IF NOT EXISTS room_tags_revisions (
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ stream_id BIGINT NOT NULL, -- The current version of the room tags.
+ CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id)
+);
+
+CREATE TABLE IF NOT EXISTS private_user_data_max_stream_id(
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ stream_id BIGINT NOT NULL,
+ CHECK (Lock='X')
+);
+
+INSERT INTO private_user_data_max_stream_id (stream_id) VALUES (0);
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
new file mode 100644
index 0000000000..380270b009
--- /dev/null
+++ b/synapse/storage/search.py
@@ -0,0 +1,307 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from .background_updates import BackgroundUpdateStore
+from synapse.api.errors import SynapseError
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class SearchStore(BackgroundUpdateStore):
+
+ EVENT_SEARCH_UPDATE_NAME = "event_search"
+
+ def __init__(self, hs):
+ super(SearchStore, self).__init__(hs)
+ self.register_background_update_handler(
+ self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
+ )
+
+ @defer.inlineCallbacks
+ def _background_reindex_search(self, progress, batch_size):
+ target_min_stream_id = progress["target_min_stream_id_inclusive"]
+ max_stream_id = progress["max_stream_id_exclusive"]
+ rows_inserted = progress.get("rows_inserted", 0)
+
+ INSERT_CLUMP_SIZE = 1000
+ TYPES = ["m.room.name", "m.room.message", "m.room.topic"]
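+ # Each batch walks backwards from max_stream_id_exclusive towards
+ # target_min_stream_id_inclusive, then persists the new lower bound
+ # so the update can resume where it left off.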
+
+ def reindex_search_txn(txn):
+ sql = (
+ "SELECT stream_ordering, event_id FROM events"
+ " WHERE ? <= stream_ordering AND stream_ordering < ?"
+ " AND (%s)"
+ " ORDER BY stream_ordering DESC"
+ " LIMIT ?"
+ ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),)
+
+ txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
+ min_stream_id = rows[-1][0]
+ event_ids = [row[1] for row in rows]
+
+ events = self._get_events_txn(txn, event_ids)
+
+ event_search_rows = []
+ for event in events:
+ try:
+ event_id = event.event_id
+ room_id = event.room_id
+ content = event.content
+ if event.type == "m.room.message":
+ key = "content.body"
+ value = content["body"]
+ elif event.type == "m.room.topic":
+ key = "content.topic"
+ value = content["topic"]
+ elif event.type == "m.room.name":
+ key = "content.name"
+ value = content["name"]
+ except (KeyError, AttributeError):
+ # If the event is missing a necessary field then
+ # skip over it.
+ continue
+
+ event_search_rows.append((event_id, room_id, key, value))
+
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = (
+ "INSERT INTO event_search (event_id, room_id, key, vector)"
+ " VALUES (?,?,?,to_tsvector('english', ?))"
+ )
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ sql = (
+ "INSERT INTO event_search (event_id, room_id, key, value)"
+ " VALUES (?,?,?,?)"
+ )
+ else:
+ # This should be unreachable.
+ raise Exception("Unrecognized database engine")
+
+ for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE):
+ clump = event_search_rows[index:index + INSERT_CLUMP_SIZE]
+ txn.executemany(sql, clump)
+
+ progress = {
+ "target_min_stream_id_inclusive": target_min_stream_id,
+ "max_stream_id_exclusive": min_stream_id,
+ "rows_inserted": rows_inserted + len(event_search_rows)
+ }
+
+ self._background_update_progress_txn(
+ txn, self.EVENT_SEARCH_UPDATE_NAME, progress
+ )
+
+ return len(event_search_rows)
+
+ result = yield self.runInteraction(
+ self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
+ )
+
+ if not result:
+ yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)
+
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def search_msgs(self, room_ids, search_term, keys):
+ """Performs a full text search over events with given keys.
+
+ Args:
+ room_ids (list): List of room ids to search in
+ search_term (str): Search term to search for
+ keys (list): List of keys to search in, currently supports
+ "content.body", "content.name", "content.topic"
+
+ Returns:
+ A deferred list of dicts, each with an "event" and a "rank" key.
+ """
+ clauses = []
+ args = []
+
+ # Make sure we don't explode because the person is in too many rooms.
+ # We filter the results below regardless.
+ if len(room_ids) < 500:
+ clauses.append(
+ "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
+ )
+ args.extend(room_ids)
+
+ local_clauses = []
+ for key in keys:
+ local_clauses.append("key = ?")
+ args.append(key)
+
+ clauses.append(
+ "(%s)" % (" OR ".join(local_clauses),)
+ )
+
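+ # Postgres ranks matches with ts_rank_cd over the stored tsvector;
+ # SQLite derives a rank from the fts4 matchinfo() blob.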
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = (
+ "SELECT ts_rank_cd(vector, query) AS rank, room_id, event_id"
+ " FROM plainto_tsquery('english', ?) as query, event_search"
+ " WHERE vector @@ query"
+ )
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ sql = (
+ "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id"
+ " FROM event_search"
+ " WHERE value MATCH ?"
+ )
+ else:
+ # This should be unreachable.
+ raise Exception("Unrecognized database engine")
+
+ for clause in clauses:
+ sql += " AND " + clause
+
+ # We add an arbitrary limit here to ensure we don't try to pull the
+ # entire table from the database.
+ sql += " ORDER BY rank DESC LIMIT 500"
+
+ results = yield self._execute(
+ "search_msgs", self.cursor_to_dict, sql, *([search_term] + args)
+ )
+
+ results = filter(lambda row: row["room_id"] in room_ids, results)
+
+ events = yield self._get_events([r["event_id"] for r in results])
+
+ event_map = {
+ ev.event_id: ev
+ for ev in events
+ }
+
+ defer.returnValue([
+ {
+ "event": event_map[r["event_id"]],
+ "rank": r["rank"],
+ }
+ for r in results
+ if r["event_id"] in event_map
+ ])
+
+ @defer.inlineCallbacks
+ def search_room(self, room_id, search_term, keys, limit, pagination_token=None):
+ """Performs a full text search over events with given keys.
+
+ Args:
+ room_id (str): The room_id to search in
+ search_term (str): Search term to search for
+ keys (list): List of keys to search in, currently supports
+ "content.body", "content.name", "content.topic"
+ limit (int): The maximum number of events to return
+ pagination_token (str): A pagination token previously returned
+
+ Returns:
+ A deferred list of dicts, each with an "event", a "rank" and a
+ "pagination_token" key.
+ """
+ clauses = []
+ args = [search_term, room_id]
+
+ local_clauses = []
+ for key in keys:
+ local_clauses.append("key = ?")
+ args.append(key)
+
+ clauses.append(
+ "(%s)" % (" OR ".join(local_clauses),)
+ )
+
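+ # Pagination tokens are "<topological_ordering>,<stream_ordering>" of
+ # the last event returned, e.g. "42,1337".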
+ if pagination_token:
+ try:
+ topo, stream = pagination_token.split(",")
+ topo = int(topo)
+ stream = int(stream)
+ except:
+ raise SynapseError(400, "Invalid pagination token")
+
+ clauses.append(
+ "(topological_ordering < ?"
+ " OR (topological_ordering = ? AND stream_ordering < ?))"
+ )
+ args.extend([topo, topo, stream])
+
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = (
+ "SELECT ts_rank_cd(vector, query) as rank,"
+ " topological_ordering, stream_ordering, room_id, event_id"
+ " FROM plainto_tsquery('english', ?) as query, event_search"
+ " NATURAL JOIN events"
+ " WHERE vector @@ query AND room_id = ?"
+ )
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ # We use CROSS JOIN here to ensure we use the right indexes.
+ # https://sqlite.org/optoverview.html#crossjoin
+ #
+ # We want to use the full text search index on event_search to
+ # extract all possible matches first, then lookup those matches
+ # in the events table to get the topological ordering. We need
+ # to use the indexes in this order because sqlite refuses to
+ # MATCH unless it uses the full text search index
+ sql = (
+ "SELECT rank(matchinfo) as rank, room_id, event_id,"
+ " topological_ordering, stream_ordering"
+ " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo"
+ " FROM event_search"
+ " WHERE value MATCH ?"
+ " )"
+ " CROSS JOIN events USING (event_id)"
+ " WHERE room_id = ?"
+ )
+ else:
+ # This should be unreachable.
+ raise Exception("Unrecognized database engine")
+
+ for clause in clauses:
+ sql += " AND " + clause
+
+ # We add an arbitrary limit here to ensure we don't try to pull the
+ # entire table from the database.
+ sql += " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
+
+ args.append(limit)
+
+ results = yield self._execute(
+ "search_rooms", self.cursor_to_dict, sql, *args
+ )
+
+ events = yield self._get_events([r["event_id"] for r in results])
+
+ event_map = {
+ ev.event_id: ev
+ for ev in events
+ }
+
+ defer.returnValue([
+ {
+ "event": event_map[r["event_id"]],
+ "rank": r["rank"],
+ "pagination_token": "%s,%s" % (
+ r["topological_ordering"], r["stream_ordering"]
+ ),
+ }
+ for r in results
+ if r["event_id"] in event_map
+ ])
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 4f15e534b4..b070be504d 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -17,48 +17,13 @@ from twisted.internet import defer
from _base import SQLBaseStore
-from syutil.base64util import encode_base64
+from unpaddedbase64 import encode_base64
from synapse.crypto.event_signing import compute_event_reference_hash
class SignatureStore(SQLBaseStore):
"""Persistence for event signatures and hashes"""
- def _get_event_content_hashes_txn(self, txn, event_id):
- """Get all the hashes for a given Event.
- Args:
- txn (cursor):
- event_id (str): Id for the Event.
- Returns:
- A dict of algorithm -> hash.
- """
- query = (
- "SELECT algorithm, hash"
- " FROM event_content_hashes"
- " WHERE event_id = ?"
- )
- txn.execute(query, (event_id, ))
- return dict(txn.fetchall())
-
- def _store_event_content_hash_txn(self, txn, event_id, algorithm,
- hash_bytes):
- """Store a hash for a Event
- Args:
- txn (cursor):
- event_id (str): Id for the Event.
- algorithm (str): Hashing algorithm.
- hash_bytes (bytes): Hash function output bytes.
- """
- self._simple_insert_txn(
- txn,
- "event_content_hashes",
- {
- "event_id": event_id,
- "algorithm": algorithm,
- "hash": buffer(hash_bytes),
- },
- )
-
def get_event_reference_hashes(self, event_ids):
def f(txn):
return [
@@ -123,80 +88,3 @@ class SignatureStore(SQLBaseStore):
table="event_reference_hashes",
values=vals,
)
-
- def _get_event_signatures_txn(self, txn, event_id):
- """Get all the signatures for a given PDU.
- Args:
- txn (cursor):
- event_id (str): Id for the Event.
- Returns:
- A dict of sig name -> dict(key_id -> signature_bytes)
- """
- query = (
- "SELECT signature_name, key_id, signature"
- " FROM event_signatures"
- " WHERE event_id = ? "
- )
- txn.execute(query, (event_id, ))
- rows = txn.fetchall()
-
- res = {}
-
- for name, key, sig in rows:
- res.setdefault(name, {})[key] = sig
-
- return res
-
- def _store_event_signature_txn(self, txn, event_id, signature_name, key_id,
- signature_bytes):
- """Store a signature from the origin server for a PDU.
- Args:
- txn (cursor):
- event_id (str): Id for the Event.
- origin (str): origin of the Event.
- key_id (str): Id for the signing key.
- signature (bytes): The signature.
- """
- self._simple_insert_txn(
- txn,
- "event_signatures",
- {
- "event_id": event_id,
- "signature_name": signature_name,
- "key_id": key_id,
- "signature": buffer(signature_bytes),
- },
- )
-
- def _get_prev_event_hashes_txn(self, txn, event_id):
- """Get all the hashes for previous PDUs of a PDU
- Args:
- txn (cursor):
- event_id (str): Id for the Event.
- Returns:
- dict of (pdu_id, origin) -> dict of algorithm -> hash_bytes.
- """
- query = (
- "SELECT prev_event_id, algorithm, hash"
- " FROM event_edge_hashes"
- " WHERE event_id = ?"
- )
- txn.execute(query, (event_id, ))
- results = {}
- for prev_event_id, algorithm, hash_bytes in txn.fetchall():
- hashes = results.setdefault(prev_event_id, {})
- hashes[algorithm] = hash_bytes
- return results
-
- def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id,
- algorithm, hash_bytes):
- self._simple_insert_txn(
- txn,
- "event_edge_hashes",
- {
- "event_id": event_id,
- "prev_event_id": prev_event_id,
- "algorithm": algorithm,
- "hash": buffer(hash_bytes),
- },
- )
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 9630efcfcc..80e9b63f50 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -20,8 +20,6 @@ from synapse.util.caches.descriptors import (
from twisted.internet import defer
-from synapse.util.stringutils import random_string
-
import logging
logger = logging.getLogger(__name__)
@@ -56,7 +54,7 @@ class StateStore(SQLBaseStore):
defer.returnValue({})
event_to_groups = yield self._get_state_group_for_events(
- room_id, event_ids,
+ event_ids,
)
groups = set(event_to_groups.values())
@@ -210,13 +208,12 @@ class StateStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_state_for_events(self, room_id, event_ids, types):
+ def get_state_for_events(self, event_ids, types):
"""Given a list of event_ids and type tuples, return a list of state
dicts for each event. The state dicts will only have the type/state_keys
that are in the `types` list.
Args:
- room_id (str)
event_ids (list)
types (list): List of (type, state_key) tuples which are used to
filter the state fetched. `state_key` may be None, which matches
@@ -227,7 +224,7 @@ class StateStore(SQLBaseStore):
The dicts are mappings from (type, state_key) -> state_events
"""
event_to_groups = yield self._get_state_group_for_events(
- room_id, event_ids,
+ event_ids,
)
groups = set(event_to_groups.values())
@@ -240,6 +237,20 @@ class StateStore(SQLBaseStore):
defer.returnValue({event: event_to_state[event] for event in event_ids})
+ @defer.inlineCallbacks
+ def get_state_for_event(self, event_id, types=None):
+ """
+ Get the state dict corresponding to a particular event
+
+ :param str event_id: event whose state should be returned
+ :param list[(str, str)]|None types: List of (type, state_key) tuples
+ which are used to filter the state fetched. May be None, which
+ matches any key
+ :return: a deferred dict from (type, state_key) -> state_event
+ """
+ state_map = yield self.get_state_for_events([event_id], types)
+ defer.returnValue(state_map[event_id])
+
@cached(num_args=2, lru=True, max_entries=10000)
def _get_state_group_for_event(self, room_id, event_id):
return self._simple_select_one_onecol(
@@ -253,8 +264,8 @@ class StateStore(SQLBaseStore):
)
@cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids",
- num_args=2)
- def _get_state_group_for_events(self, room_id, event_ids):
+ num_args=1)
+ def _get_state_group_for_events(self, event_ids):
"""Returns mapping event_id -> state_group
"""
def f(txn):
@@ -428,7 +439,3 @@ class StateStore(SQLBaseStore):
}
defer.returnValue(results)
-
-
-def _make_group_id(clock):
- return str(int(clock.time_msec())) + random_string(5)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d7fe423f5a..be8ba76aae 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -23,7 +23,7 @@ paginate bacwards.
This is implemented by keeping two ordering columns: stream_ordering and
topological_ordering. Stream ordering is basically insertion/received order
-(except for events from backfill requests). The topolgical_ordering is a
+(except for events from backfill requests). The topological_ordering is a
weak ordering of events based on the pdu graph.
This means that we have to have two different types of tokens, depending on
@@ -158,16 +158,40 @@ class StreamStore(SQLBaseStore):
defer.returnValue(results)
@log_function
- def get_room_events_stream(self, user_id, from_key, to_key, room_id,
- limit=0, with_feedback=False):
- # TODO (erikj): Handle compressed feedback
-
- current_room_membership_sql = (
- "SELECT m.room_id FROM room_memberships as m "
- " INNER JOIN current_state_events as c"
- " ON m.event_id = c.event_id AND c.state_key = m.user_id"
- " WHERE m.user_id = ? AND m.membership = 'join'"
- )
+ def get_room_events_stream(
+ self,
+ user_id,
+ from_key,
+ to_key,
+ limit=0,
+ is_guest=False,
+ room_ids=None
+ ):
+ room_ids = room_ids or []
+ room_ids = [r for r in room_ids]
+ if is_guest:
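+ # Guests may only read rooms whose current history_visibility is
+ # 'world_readable', so restrict to those among the requested room_ids.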
+ current_room_membership_sql = (
+ "SELECT c.room_id FROM history_visibility AS h"
+ " INNER JOIN current_state_events AS c"
+ " ON h.event_id = c.event_id"
+ " WHERE c.room_id IN (%s) AND h.history_visibility = 'world_readable'" % (
+ ",".join(map(lambda _: "?", room_ids))
+ )
+ )
+ current_room_membership_args = room_ids
+ else:
+ current_room_membership_sql = (
+ "SELECT m.room_id FROM room_memberships as m "
+ " INNER JOIN current_state_events as c"
+ " ON m.event_id = c.event_id AND c.state_key = m.user_id"
+ " WHERE m.user_id = ? AND m.membership = 'join'"
+ )
+ current_room_membership_args = [user_id]
+ if room_ids:
+ current_room_membership_sql += " AND m.room_id in (%s)" % (
+ ",".join(map(lambda _: "?", room_ids))
+ )
+ current_room_membership_args = [user_id] + room_ids
# We also want to get any membership events about that user, e.g.
# invites or leave notifications.
@@ -176,6 +200,7 @@ class StreamStore(SQLBaseStore):
"INNER JOIN current_state_events as c ON m.event_id = c.event_id "
"WHERE m.user_id = ? "
)
+ membership_args = [user_id]
if limit:
limit = max(limit, MAX_STREAM_SIZE)
@@ -202,7 +227,9 @@ class StreamStore(SQLBaseStore):
}
def f(txn):
- txn.execute(sql, (False, user_id, user_id, from_id.stream, to_id.stream,))
+ args = ([False] + current_room_membership_args + membership_args +
+ [from_id.stream, to_id.stream])
+ txn.execute(sql, args)
rows = self.cursor_to_dict(txn)
@@ -227,10 +254,7 @@ class StreamStore(SQLBaseStore):
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
- direction='b', limit=-1,
- with_feedback=False):
- # TODO (erikj): Handle compressed feedback
-
+ direction='b', limit=-1):
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
@@ -302,7 +326,6 @@ class StreamStore(SQLBaseStore):
@cachedInlineCallbacks(num_args=4)
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
- # TODO (erikj): Handle compressed feedback
end_token = RoomStreamToken.parse_stream_token(end_token)
@@ -379,6 +402,38 @@ class StreamStore(SQLBaseStore):
)
defer.returnValue("t%d-%d" % (topo, token))
+ def get_stream_token_for_event(self, event_id):
+ """The stream token for an event
+ Args:
+ event_id(str): The id of the event to look up a stream token for.
+ Raises:
+ StoreError if the event wasn't in the database.
+ Returns:
+ A deferred "s%d" stream token.
+ """
+ return self._simple_select_one_onecol(
+ table="events",
+ keyvalues={"event_id": event_id},
+ retcol="stream_ordering",
+ ).addCallback(lambda row: "s%d" % (row,))
+
+ def get_topological_token_for_event(self, event_id):
+ """The stream token for an event
+ Args:
+ event_id(str): The id of the event to look up a stream token for.
+ Raises:
+ StoreError if the event wasn't in the database.
+ Returns:
+ A deferred "t%d-%d" topological token.
+ """
+ return self._simple_select_one(
+ table="events",
+ keyvalues={"event_id": event_id},
+ retcols=("stream_ordering", "topological_ordering"),
+ ).addCallback(lambda row: "t%d-%d" % (
+ row["topological_ordering"], row["stream_ordering"],)
+ )
+
def _get_max_topological_txn(self, txn):
txn.execute(
"SELECT MAX(topological_ordering) FROM events"
@@ -410,3 +465,138 @@ class StreamStore(SQLBaseStore):
internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1))
internal.after = str(RoomStreamToken(topo, stream))
+
+ @defer.inlineCallbacks
+ def get_events_around(self, room_id, event_id, before_limit, after_limit):
+ """Retrieve events and pagination tokens around a given event in a
+ room.
+
+ Args:
+ room_id (str)
+ event_id (str)
+ before_limit (int)
+ after_limit (int)
+
+ Returns:
+ A deferred dict with "events_before", "events_after", "start" and
+ "end" keys.
+ """
+
+ results = yield self.runInteraction(
+ "get_events_around", self._get_events_around_txn,
+ room_id, event_id, before_limit, after_limit
+ )
+
+ events_before = yield self._get_events(
+ [e for e in results["before"]["event_ids"]],
+ get_prev_content=True
+ )
+
+ events_after = yield self._get_events(
+ [e for e in results["after"]["event_ids"]],
+ get_prev_content=True
+ )
+
+ defer.returnValue({
+ "events_before": events_before,
+ "events_after": events_after,
+ "start": results["before"]["token"],
+ "end": results["after"]["token"],
+ })
+
+ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
+ """Retrieves event_ids and pagination tokens around a given event in a
+ room.
+
+ Args:
+ room_id (str)
+ event_id (str)
+ before_limit (int)
+ after_limit (int)
+
+ Returns:
+ A dict with "before" and "after" entries, each holding a list of
+ "event_ids" and a pagination "token".
+ """
+
+ results = self._simple_select_one_txn(
+ txn,
+ "events",
+ keyvalues={
+ "event_id": event_id,
+ "room_id": room_id,
+ },
+ retcols=["stream_ordering", "topological_ordering"],
+ )
+
+ stream_ordering = results["stream_ordering"]
+ topological_ordering = results["topological_ordering"]
+
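+ # Fetch events strictly before and strictly after the anchor event in
+ # (topological_ordering, stream_ordering) order, and build stream
+ # tokens for the outer edges of the window.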
+ query_before = (
+ "SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " WHERE room_id = ? AND (topological_ordering < ?"
+ " OR (topological_ordering = ? AND stream_ordering < ?))"
+ " ORDER BY topological_ordering DESC, stream_ordering DESC"
+ " LIMIT ?"
+ )
+
+ query_after = (
+ "SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " WHERE room_id = ? AND (topological_ordering > ?"
+ " OR (topological_ordering = ? AND stream_ordering > ?))"
+ " ORDER BY topological_ordering ASC, stream_ordering ASC"
+ " LIMIT ?"
+ )
+
+ txn.execute(
+ query_before,
+ (
+ room_id, topological_ordering, topological_ordering,
+ stream_ordering, before_limit,
+ )
+ )
+
+ rows = self.cursor_to_dict(txn)
+ events_before = [r["event_id"] for r in rows]
+
+ if rows:
+ start_token = str(RoomStreamToken(
+ rows[0]["topological_ordering"],
+ rows[0]["stream_ordering"] - 1,
+ ))
+ else:
+ start_token = str(RoomStreamToken(
+ topological_ordering,
+ stream_ordering - 1,
+ ))
+
+ txn.execute(
+ query_after,
+ (
+ room_id, topological_ordering, topological_ordering,
+ stream_ordering, after_limit,
+ )
+ )
+
+ rows = self.cursor_to_dict(txn)
+ events_after = [r["event_id"] for r in rows]
+
+ if rows:
+ end_token = str(RoomStreamToken(
+ rows[-1]["topological_ordering"],
+ rows[-1]["stream_ordering"],
+ ))
+ else:
+ end_token = str(RoomStreamToken(
+ topological_ordering,
+ stream_ordering,
+ ))
+
+ return {
+ "before": {
+ "event_ids": events_before,
+ "token": start_token,
+ },
+ "after": {
+ "event_ids": events_after,
+ "token": end_token,
+ },
+ }
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
new file mode 100644
index 0000000000..bf695b7800
--- /dev/null
+++ b/synapse/storage/tags.py
@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached
+from twisted.internet import defer
+from .util.id_generators import StreamIdGenerator
+
+import ujson as json
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class TagsStore(SQLBaseStore):
+ def __init__(self, hs):
+ super(TagsStore, self).__init__(hs)
+
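+ # Stream ids for tag changes are allocated from the single-row
+ # private_user_data_max_stream_id table created in the 25/tags.sql
+ # delta.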
+ self._private_user_data_id_gen = StreamIdGenerator(
+ "private_user_data_max_stream_id", "stream_id"
+ )
+
+ def get_max_private_user_data_stream_id(self):
+ """Get the current max stream id for the private user data stream
+
+ Returns:
+ A deferred int.
+ """
+ return self._private_user_data_id_gen.get_max_token(self)
+
+ @cached()
+ def get_tags_for_user(self, user_id):
+ """Get all the tags for a user.
+
+
+ Args:
+ user_id(str): The user to get the tags for.
+ Returns:
+ A deferred dict mapping from room_id strings to lists of tag
+ strings.
+ """
+
+ deferred = self._simple_select_list(
+ "room_tags", {"user_id": user_id}, ["room_id", "tag", "content"]
+ )
+
+ @deferred.addCallback
+ def tags_by_room(rows):
+ tags_by_room = {}
+ for row in rows:
+ room_tags = tags_by_room.setdefault(row["room_id"], {})
+ room_tags[row["tag"]] = json.loads(row["content"])
+ return tags_by_room
+
+ return deferred
+
+ @defer.inlineCallbacks
+ def get_updated_tags(self, user_id, stream_id):
+ """Get all the tags for the rooms where the tags have changed since the
+ given version.
+
+ Args:
+ user_id(str): The user to get the tags for.
+ stream_id(int): The earliest update to get for the user.
+ Returns:
+ A deferred dict mapping from room_id strings to dicts mapping from
+ tag strings to tag content, for all rooms that have changed since
+ the given stream_id token.
+ """
+ def get_updated_tags_txn(txn):
+ sql = (
+ "SELECT room_id from room_tags_revisions"
+ " WHERE user_id = ? AND stream_id > ?"
+ )
+ txn.execute(sql, (user_id, stream_id))
+ room_ids = [row[0] for row in txn.fetchall()]
+ return room_ids
+
+ room_ids = yield self.runInteraction(
+ "get_updated_tags", get_updated_tags_txn
+ )
+
+ results = {}
+ if room_ids:
+ tags_by_room = yield self.get_tags_for_user(user_id)
+ for room_id in room_ids:
+ results[room_id] = tags_by_room.get(room_id, {})
+
+ defer.returnValue(results)
+
+ def get_tags_for_room(self, user_id, room_id):
+ """Get all the tags for the given room
+ Args:
+ user_id(str): The user to get tags for
+ room_id(str): The room to get tags for
+ Returns:
+ A deferred dict mapping from tag strings to tag content.
+ """
+ return self._simple_select_list(
+ table="room_tags",
+ keyvalues={"user_id": user_id, "room_id": room_id},
+ retcols=("tag", "content"),
+ desc="get_tags_for_room",
+ ).addCallback(lambda rows: {
+ row["tag"]: json.loads(row["content"]) for row in rows
+ })
+
+ @defer.inlineCallbacks
+ def add_tag_to_room(self, user_id, room_id, tag, content):
+ """Add a tag to a room for a user.
+ Args:
+ user_id(str): The user to add a tag for.
+ room_id(str): The room to add a tag for.
+ tag(str): The tag name to add.
+ content(dict): A json object to associate with the tag.
+ Returns:
+ A deferred that completes once the tag has been added.
+ """
+ content_json = json.dumps(content)
+
+ def add_tag_txn(txn, next_id):
+ self._simple_upsert_txn(
+ txn,
+ table="room_tags",
+ keyvalues={
+ "user_id": user_id,
+ "room_id": room_id,
+ "tag": tag,
+ },
+ values={
+ "content": content_json,
+ }
+ )
+ self._update_revision_txn(txn, user_id, room_id, next_id)
+
+ with (yield self._private_user_data_id_gen.get_next(self)) as next_id:
+ yield self.runInteraction("add_tag", add_tag_txn, next_id)
+
+ self.get_tags_for_user.invalidate((user_id,))
+
+ result = yield self._private_user_data_id_gen.get_max_token(self)
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def remove_tag_from_room(self, user_id, room_id, tag):
+ """Remove a tag from a room for a user.
+ Returns:
+ A deferred that completes once the tag has been removed
+ """
+ def remove_tag_txn(txn, next_id):
+ sql = (
+ "DELETE FROM room_tags "
+ " WHERE user_id = ? AND room_id = ? AND tag = ?"
+ )
+ txn.execute(sql, (user_id, room_id, tag))
+ self._update_revision_txn(txn, user_id, room_id, next_id)
+
+ with (yield self._private_user_data_id_gen.get_next(self)) as next_id:
+ yield self.runInteraction("remove_tag", remove_tag_txn, next_id)
+
+ self.get_tags_for_user.invalidate((user_id,))
+
+ result = yield self._private_user_data_id_gen.get_max_token(self)
+ defer.returnValue(result)
+
+ def _update_revision_txn(self, txn, user_id, room_id, next_id):
+ """Update the latest revision of the tags for the given user and room.
+
+ Args:
+ txn: The database cursor
+ user_id(str): The ID of the user.
+ room_id(str): The ID of the room.
+ next_id(int): The revision to advance to.
+ """
+
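+ # Advance the global private user data stream id, then upsert this
+ # room's revision: try an UPDATE first and INSERT only if no row
+ # was updated.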
+ update_max_id_sql = (
+ "UPDATE private_user_data_max_stream_id"
+ " SET stream_id = ?"
+ " WHERE stream_id < ?"
+ )
+ txn.execute(update_max_id_sql, (next_id, next_id))
+
+ update_sql = (
+ "UPDATE room_tags_revisions"
+ " SET stream_id = ?"
+ " WHERE user_id = ?"
+ " AND room_id = ?"
+ )
+ txn.execute(update_sql, (next_id, user_id, room_id))
+
+ if txn.rowcount == 0:
+ insert_sql = (
+ "INSERT INTO room_tags_revisions (user_id, room_id, stream_id)"
+ " VALUES (?, ?, ?)"
+ )
+ try:
+ txn.execute(insert_sql, (user_id, room_id, next_id))
+ except self.database_engine.module.IntegrityError:
+ # Ignore insertion errors. It doesn't matter if the row wasn't
+ # inserted because if two updates happened concurrently the one
+ # with the higher stream_id will not be reported to a client
+ # unless the previous update has completed. It doesn't matter
+ # which stream_id ends up in the table, as long as it is higher
+ # than the id that the client has.
+ pass
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index c8c7e6591a..ad099775eb 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -18,7 +18,7 @@ from synapse.util.caches.descriptors import cached
from collections import namedtuple
-from syutil.jsonutil import encode_canonical_json
+from canonicaljson import encode_canonical_json
import logging
logger = logging.getLogger(__name__)
@@ -59,7 +59,7 @@ class TransactionStore(SQLBaseStore):
allow_none=True,
)
- if result and result.response_code:
+ if result and result["response_code"]:
return result["response_code"], result["response_json"]
else:
return None
@@ -253,16 +253,6 @@ class TransactionStore(SQLBaseStore):
retry_interval (int) - how long until next retry in ms
"""
- # As this is the new value, we might as well prefill the cache
- self.get_destination_retry_timings.prefill(
- destination,
- {
- "destination": destination,
- "retry_last_ts": retry_last_ts,
- "retry_interval": retry_interval
- },
- )
-
# XXX: we could chose to not bother persisting this if our cache thinks
# this is a NOOP
return self.runInteraction(
@@ -275,31 +265,25 @@ class TransactionStore(SQLBaseStore):
def _set_destination_retry_timings(self, txn, destination,
retry_last_ts, retry_interval):
- query = (
- "UPDATE destinations"
- " SET retry_last_ts = ?, retry_interval = ?"
- " WHERE destination = ?"
- )
+ txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
- txn.execute(
- query,
- (
- retry_last_ts, retry_interval, destination,
- )
+ self._simple_upsert_txn(
+ txn,
+ "destinations",
+ keyvalues={
+ "destination": destination,
+ },
+ values={
+ "retry_last_ts": retry_last_ts,
+ "retry_interval": retry_interval,
+ },
+ insertion_values={
+ "destination": destination,
+ "retry_last_ts": retry_last_ts,
+ "retry_interval": retry_interval,
+ }
)
- if txn.rowcount == 0:
- # destination wasn't already in table. Insert it.
- self._simple_insert_txn(
- txn,
- table="destinations",
- values={
- "destination": destination,
- "retry_last_ts": retry_last_ts,
- "retry_interval": retry_interval,
- }
- )
-
def get_destinations_needing_retry(self):
"""Get all destinations which are due a retry for sending a transaction.
|