diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
new file mode 100644
index 0000000000..45fccc2e5e
--- /dev/null
+++ b/synapse/storage/background_updates.py
@@ -0,0 +1,256 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+
+from twisted.internet import defer
+
+import ujson as json
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class BackgroundUpdatePerformance(object):
+    """Tracks how long a background update is taking to update its items"""
+
+ def __init__(self, name):
+ self.name = name
+ self.total_item_count = 0
+ self.total_duration_ms = 0
+ self.avg_item_count = 0
+ self.avg_duration_ms = 0
+
+ def update(self, item_count, duration_ms):
+ """Update the stats after doing an update"""
+ self.total_item_count += item_count
+ self.total_duration_ms += duration_ms
+
+ # Exponential moving averages for the number of items updated and
+ # the duration.
+ self.avg_item_count += 0.1 * (item_count - self.avg_item_count)
+ self.avg_duration_ms += 0.1 * (duration_ms - self.avg_duration_ms)
+
+ def average_items_per_ms(self):
+        """An estimate of how many items can be updated per millisecond.
+        Returns:
+            A rate in items per ms as a float, or None if nothing updated yet
+        """
+ if self.total_item_count == 0:
+ return None
+ else:
+ # Use the exponential moving average so that we can adapt to
+ # changes in how long the update process takes.
+ return float(self.avg_item_count) / float(self.avg_duration_ms)
+
+ def total_items_per_ms(self):
+        """The overall rate of updates since this update started running.
+        Returns:
+            A rate in items per ms as a float, or None if nothing updated yet
+        """
+ if self.total_item_count == 0:
+ return None
+ else:
+ return float(self.total_item_count) / float(self.total_duration_ms)
+
+
+class BackgroundUpdateStore(SQLBaseStore):
+ """ Background updates are updates to the database that run in the
+ background. Each update processes a batch of data at once. We attempt to
+ limit the impact of each update by monitoring how long each batch takes to
+ process and autotuning the batch size.
+ """
+
+ MINIMUM_BACKGROUND_BATCH_SIZE = 100
+ DEFAULT_BACKGROUND_BATCH_SIZE = 100
+ BACKGROUND_UPDATE_INTERVAL_MS = 1000
+ BACKGROUND_UPDATE_DURATION_MS = 100
+
+ def __init__(self, hs):
+ super(BackgroundUpdateStore, self).__init__(hs)
+ self._background_update_performance = {}
+ self._background_update_queue = []
+ self._background_update_handlers = {}
+ self._background_update_timer = None
+
+ @defer.inlineCallbacks
+ def start_doing_background_updates(self):
+ while True:
+ if self._background_update_timer is not None:
+ return
+
+ sleep = defer.Deferred()
+ self._background_update_timer = self._clock.call_later(
+ self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
+ )
+ try:
+ yield sleep
+ finally:
+ self._background_update_timer = None
+
+            try:
+                result = yield self.do_background_update(
+                    self.BACKGROUND_UPDATE_DURATION_MS
+                )
+            except Exception:
+                logger.exception("Error doing update")
+            else:
+                if result is None:
+                    logger.info(
+                        "No more background updates to do."
+                        " Unscheduling background update task."
+                    )
+                    return
+
+ @defer.inlineCallbacks
+ def do_background_update(self, desired_duration_ms):
+ """Does some amount of work on a background update
+ Args:
+ desired_duration_ms(float): How long we want to spend
+ updating.
+ Returns:
+ A deferred that completes once some amount of work is done.
+ The deferred will have a value of None if there is currently
+ no more work to do.
+ """
+ if not self._background_update_queue:
+ updates = yield self._simple_select_list(
+ "background_updates",
+ keyvalues=None,
+ retcols=("update_name",),
+ )
+ for update in updates:
+ self._background_update_queue.append(update['update_name'])
+
+ if not self._background_update_queue:
+ defer.returnValue(None)
+
+ update_name = self._background_update_queue.pop(0)
+ self._background_update_queue.append(update_name)
+
+ update_handler = self._background_update_handlers[update_name]
+
+ performance = self._background_update_performance.get(update_name)
+
+ if performance is None:
+ performance = BackgroundUpdatePerformance(update_name)
+ self._background_update_performance[update_name] = performance
+
+ items_per_ms = performance.average_items_per_ms()
+
+ if items_per_ms is not None:
+ batch_size = int(desired_duration_ms * items_per_ms)
+ # Clamp the batch size so that we always make progress
+ batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE)
+ else:
+ batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
+
+ progress_json = yield self._simple_select_one_onecol(
+ "background_updates",
+ keyvalues={"update_name": update_name},
+ retcol="progress_json"
+ )
+
+ progress = json.loads(progress_json)
+
+ time_start = self._clock.time_msec()
+ items_updated = yield update_handler(progress, batch_size)
+ time_stop = self._clock.time_msec()
+
+ duration_ms = time_stop - time_start
+
+ logger.info(
+ "Updating %r. Updated %r items in %rms."
+ " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)",
+ update_name, items_updated, duration_ms,
+ performance.total_items_per_ms(),
+ performance.average_items_per_ms(),
+ performance.total_item_count,
+ )
+
+ performance.update(items_updated, duration_ms)
+
+ defer.returnValue(len(self._background_update_performance))
+
+ def register_background_update_handler(self, update_name, update_handler):
+ """Register a handler for doing a background update.
+
+ The handler should take two arguments:
+
+ * A dict of the current progress
+ * An integer count of the number of items to update in this batch.
+
+ The handler should return a deferred integer count of items updated.
+        The handler is responsible for updating the progress of the update.
+
+ Args:
+ update_name(str): The name of the update that this code handles.
+ update_handler(function): The function that does the update.
+ """
+ self._background_update_handlers[update_name] = update_handler
+
+ def start_background_update(self, update_name, progress):
+ """Starts a background update running.
+
+ Args:
+            update_name(str): The name of the update to set running.
+            progress(dict): The initial state of the progress of the update.
+
+ Returns:
+ A deferred that completes once the task has been added to the
+ queue.
+ """
+ # Clear the background update queue so that we will pick up the new
+ # task on the next iteration of do_background_update.
+ self._background_update_queue = []
+ progress_json = json.dumps(progress)
+
+ return self._simple_insert(
+ "background_updates",
+ {"update_name": update_name, "progress_json": progress_json}
+ )
+
+ def _end_background_update(self, update_name):
+ """Removes a completed background update task from the queue.
+
+ Args:
+ update_name(str): The name of the completed task to remove
+ Returns:
+ A deferred that completes once the task is removed.
+ """
+ self._background_update_queue = [
+ name for name in self._background_update_queue if name != update_name
+ ]
+ return self._simple_delete_one(
+ "background_updates", keyvalues={"update_name": update_name}
+ )
+
+ def _background_update_progress_txn(self, txn, update_name, progress):
+ """Update the progress of a background update
+
+ Args:
+ txn(cursor): The transaction.
+ update_name(str): The name of the background update task
+ progress(dict): The progress of the update.
+ """
+
+ progress_json = json.dumps(progress)
+
+ self._simple_update_one_txn(
+ txn,
+ "background_updates",
+ keyvalues={"update_name": update_name},
+ updatevalues={"progress_json": progress_json},
+ )
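The batch sizing above works by keeping an exponential moving average of how many items each batch handled and how long it took, then choosing the next batch size so that it should take roughly BACKGROUND_UPDATE_DURATION_MS. A minimal, self-contained sketch of that arithmetic follows; the class and constant names are illustrative stand-ins for BackgroundUpdatePerformance and the constants in the patch, not part of it.

# Standalone sketch of the batch-size autotuning in BackgroundUpdatePerformance
# and do_background_update; names and numbers here are illustrative only.

DESIRED_DURATION_MS = 100   # stands in for BACKGROUND_UPDATE_DURATION_MS
MINIMUM_BATCH_SIZE = 100    # stands in for MINIMUM_BACKGROUND_BATCH_SIZE
DEFAULT_BATCH_SIZE = 100    # stands in for DEFAULT_BACKGROUND_BATCH_SIZE


class Performance(object):
    """Exponential moving average of items handled and time taken per batch."""

    def __init__(self):
        self.total_item_count = 0
        self.avg_item_count = 0
        self.avg_duration_ms = 0

    def update(self, item_count, duration_ms):
        # Recent batches are weighted at 10%, as in the patch.
        self.total_item_count += item_count
        self.avg_item_count += 0.1 * (item_count - self.avg_item_count)
        self.avg_duration_ms += 0.1 * (duration_ms - self.avg_duration_ms)

    def average_items_per_ms(self):
        if self.total_item_count == 0:
            return None
        return float(self.avg_item_count) / float(self.avg_duration_ms)


def next_batch_size(performance):
    """Aim for a batch that takes about DESIRED_DURATION_MS to process."""
    rate = performance.average_items_per_ms()
    if rate is None:
        return DEFAULT_BATCH_SIZE
    # Clamp so that a run of slow batches can never stall progress entirely.
    return max(int(DESIRED_DURATION_MS * rate), MINIMUM_BATCH_SIZE)


perf = Performance()
print(next_batch_size(perf))                     # 100: nothing measured yet

perf.update(item_count=1000, duration_ms=50)     # a fast batch: 20 items/ms
print(next_batch_size(perf))                     # 2000: can afford a bigger batch

perf.update(item_count=2000, duration_ms=1000)   # that bigger batch was slow
print(next_batch_size(perf))                     # 277: back the batch size off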
diff --git a/synapse/storage/schema/delta/25/00background_updates.sql b/synapse/storage/schema/delta/25/00background_updates.sql
new file mode 100644
index 0000000000..41a9b59b1b
--- /dev/null
+++ b/synapse/storage/schema/delta/25/00background_updates.sql
@@ -0,0 +1,21 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE IF NOT EXISTS background_updates(
+ update_name TEXT NOT NULL, -- The name of the background update.
+ progress_json TEXT NOT NULL, -- The current progress of the update as JSON.
+ CONSTRAINT background_updates_uniqueness UNIQUE (update_name)
+);
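For reference, the lifecycle of a row in this table, as driven by start_background_update, _background_update_progress_txn and _end_background_update in the patch above, looks roughly like the following sqlite3 sketch. The progress values are invented for illustration; synapse issues the equivalent statements through its storage layer rather than a raw connection.

# Lifecycle of a background_updates row, sketched with the stdlib sqlite3
# module; the progress values are made up for illustration.
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE IF NOT EXISTS background_updates("
    " update_name TEXT NOT NULL,"
    " progress_json TEXT NOT NULL,"
    " CONSTRAINT background_updates_uniqueness UNIQUE (update_name)"
    ")"
)

# start_background_update (or a schema delta): seed one row per pending update.
progress = {
    "target_min_stream_id_inclusive": 0,
    "max_stream_id_exclusive": 5000,
    "rows_inserted": 0,
}
conn.execute(
    "INSERT INTO background_updates (update_name, progress_json) VALUES (?, ?)",
    ("event_search", json.dumps(progress)),
)

# _background_update_progress_txn: checkpoint after each batch of work.
progress["max_stream_id_exclusive"] = 4000
progress["rows_inserted"] = 950
conn.execute(
    "UPDATE background_updates SET progress_json = ? WHERE update_name = ?",
    (json.dumps(progress), "event_search"),
)

# _end_background_update: remove the row once the handler reports no more work.
conn.execute(
    "DELETE FROM background_updates WHERE update_name = ?", ("event_search",)
)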
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index b7cd0ce3b8..5239d69073 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -22,7 +22,7 @@ import ujson
logger = logging.getLogger(__name__)
-POSTGRES_SQL = """
+POSTGRES_TABLE = """
CREATE TABLE IF NOT EXISTS event_search (
event_id TEXT,
room_id TEXT,
@@ -31,22 +31,6 @@ CREATE TABLE IF NOT EXISTS event_search (
vector tsvector
);
-INSERT INTO event_search SELECT
- event_id, room_id, json::json->>'sender', 'content.body',
- to_tsvector('english', json::json->'content'->>'body')
- FROM events NATURAL JOIN event_json WHERE type = 'm.room.message';
-
-INSERT INTO event_search SELECT
- event_id, room_id, json::json->>'sender', 'content.name',
- to_tsvector('english', json::json->'content'->>'name')
- FROM events NATURAL JOIN event_json WHERE type = 'm.room.name';
-
-INSERT INTO event_search SELECT
- event_id, room_id, json::json->>'sender', 'content.topic',
- to_tsvector('english', json::json->'content'->>'topic')
- FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic';
-
-
CREATE INDEX event_search_fts_idx ON event_search USING gin(vector);
CREATE INDEX event_search_ev_idx ON event_search(event_id);
CREATE INDEX event_search_ev_ridx ON event_search(room_id);
@@ -61,67 +45,34 @@ SQLITE_TABLE = (
def run_upgrade(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine):
- run_postgres_upgrade(cur)
- return
+ for statement in get_statements(POSTGRES_TABLE.splitlines()):
+ cur.execute(statement)
+ elif isinstance(database_engine, Sqlite3Engine):
+ cur.execute(SQLITE_TABLE)
+ else:
+ raise Exception("Unrecognized database engine")
- if isinstance(database_engine, Sqlite3Engine):
- run_sqlite_upgrade(cur)
- return
+ cur.execute("SELECT MIN(stream_ordering) FROM events")
+ rows = cur.fetchall()
+ min_stream_id = rows[0][0]
+ cur.execute("SELECT MAX(stream_ordering) FROM events")
+ rows = cur.fetchall()
+ max_stream_id = rows[0][0]
-def run_postgres_upgrade(cur):
- for statement in get_statements(POSTGRES_SQL.splitlines()):
- cur.execute(statement)
+ if min_stream_id is not None and max_stream_id is not None:
+ progress = {
+ "target_min_stream_id_inclusive": min_stream_id,
+ "max_stream_id_exclusive": max_stream_id + 1,
+ "rows_inserted": 0,
+ }
+ progress_json = ujson.dumps(progress)
+ sql = (
+ "INSERT into background_updates (update_name, progress_json)"
+ " VALUES (?, ?)"
+ )
-def run_sqlite_upgrade(cur):
- cur.execute(SQLITE_TABLE)
+ sql = database_engine.convert_param_style(sql)
- rowid = -1
- while True:
- cur.execute(
- "SELECT rowid, json FROM event_json"
- " WHERE rowid > ?"
- " ORDER BY rowid ASC LIMIT 100",
- (rowid,)
- )
-
- res = cur.fetchall()
-
- if not res:
- break
-
- events = [
- ujson.loads(js)
- for _, js in res
- ]
-
- rowid = max(rid for rid, _ in res)
-
- rows = []
- for ev in events:
- content = ev.get("content", {})
- body = content.get("body", None)
- name = content.get("name", None)
- topic = content.get("topic", None)
- sender = ev.get("sender", None)
- if ev["type"] == "m.room.message" and body:
- rows.append((
- ev["event_id"], ev["room_id"], sender, "content.body", body
- ))
- if ev["type"] == "m.room.name" and name:
- rows.append((
- ev["event_id"], ev["room_id"], sender, "content.name", name
- ))
- if ev["type"] == "m.room.topic" and topic:
- rows.append((
- ev["event_id"], ev["room_id"], sender, "content.topic", topic
- ))
-
- if rows:
- logger.info(rows)
- cur.executemany(
- "INSERT INTO event_search (event_id, room_id, sender, key, value)"
- " VALUES (?,?,?,?,?)",
- rows
- )
+ cur.execute(sql, ("event_search", progress_json))
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 91a96d2a83..2e88c51ad0 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -15,7 +15,7 @@
from twisted.internet import defer
-from _base import SQLBaseStore
+from .background_updates import BackgroundUpdateStore
from synapse.api.errors import SynapseError
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
@@ -25,7 +25,106 @@ import logging
logger = logging.getLogger(__name__)
-class SearchStore(SQLBaseStore):
+class SearchStore(BackgroundUpdateStore):
+
+ EVENT_SEARCH_UPDATE_NAME = "event_search"
+
+ def __init__(self, hs):
+ super(SearchStore, self).__init__(hs)
+ self.register_background_update_handler(
+ self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
+ )
+
+ @defer.inlineCallbacks
+ def _background_reindex_search(self, progress, batch_size):
+ target_min_stream_id = progress["target_min_stream_id_inclusive"]
+ max_stream_id = progress["max_stream_id_exclusive"]
+ rows_inserted = progress.get("rows_inserted", 0)
+
+ INSERT_CLUMP_SIZE = 1000
+ TYPES = ["m.room.name", "m.room.message", "m.room.topic"]
+
+ def reindex_search_txn(txn):
+ sql = (
+ "SELECT stream_ordering, event_id FROM events"
+ " WHERE ? <= stream_ordering AND stream_ordering < ?"
+ " AND (%s)"
+ " ORDER BY stream_ordering DESC"
+ " LIMIT ?"
+ ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),)
+
+ txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
+ min_stream_id = rows[-1][0]
+ event_ids = [row[1] for row in rows]
+
+ events = self._get_events_txn(txn, event_ids)
+
+ event_search_rows = []
+ for event in events:
+ try:
+ event_id = event.event_id
+ room_id = event.room_id
+ content = event.content
+ if event.type == "m.room.message":
+ key = "content.body"
+ value = content["body"]
+ elif event.type == "m.room.topic":
+ key = "content.topic"
+ value = content["topic"]
+ elif event.type == "m.room.name":
+ key = "content.name"
+ value = content["name"]
+ except (KeyError, AttributeError):
+ # If the event is missing a necessary field then
+ # skip over it.
+ continue
+
+ event_search_rows.append((event_id, room_id, key, value))
+
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = (
+ "INSERT INTO event_search (event_id, room_id, key, vector)"
+ " VALUES (?,?,?,to_tsvector('english', ?))"
+ )
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ sql = (
+ "INSERT INTO event_search (event_id, room_id, key, value)"
+ " VALUES (?,?,?,?)"
+ )
+ else:
+ # This should be unreachable.
+ raise Exception("Unrecognized database engine")
+
+ for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE):
+ clump = event_search_rows[index:index + INSERT_CLUMP_SIZE]
+ txn.executemany(sql, clump)
+
+ progress = {
+ "target_min_stream_id_inclusive": target_min_stream_id,
+ "max_stream_id_exclusive": min_stream_id,
+ "rows_inserted": rows_inserted + len(event_search_rows)
+ }
+
+ self._background_update_progress_txn(
+ txn, self.EVENT_SEARCH_UPDATE_NAME, progress
+ )
+
+ return len(event_search_rows)
+
+ result = yield self.runInteraction(
+ self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
+ )
+
+ if not result:
+ yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)
+
+ defer.returnValue(result)
+
@defer.inlineCallbacks
def search_msgs(self, room_ids, search_term, keys):
"""Performs a full text search over events with given keys.
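_background_reindex_search above paginates backwards through the events table: each batch covers stream orderings just below max_stream_id_exclusive, the checkpoint moves that exclusive bound down to the smallest stream ordering the batch touched, and once a batch comes back empty the update is retired via _end_background_update. A toy sketch of that loop, with an invented in-memory event list standing in for the events table:

# Toy model of the pagination in _background_reindex_search: each call handles
# up to batch_size events just below max_stream_id_exclusive, then moves that
# exclusive bound down to the smallest stream_ordering it touched. The event
# list and helper name here are invented for illustration.

EVENTS = list(range(1, 26))  # pretend stream_orderings 1..25 need reindexing


def reindex_batch(progress, batch_size):
    """Returns the number of items handled; 0 means the update has finished."""
    lo = progress["target_min_stream_id_inclusive"]
    hi = progress["max_stream_id_exclusive"]

    # Mirrors: SELECT ... WHERE ? <= stream_ordering AND stream_ordering < ?
    #          ORDER BY stream_ordering DESC LIMIT ?
    batch = sorted((s for s in EVENTS if lo <= s < hi), reverse=True)[:batch_size]
    if not batch:
        return 0

    # ... insert the matching event_search rows here ...

    # Checkpoint: the next batch starts just below the smallest row handled.
    progress["max_stream_id_exclusive"] = batch[-1]
    progress["rows_inserted"] += len(batch)
    return len(batch)


progress = {
    "target_min_stream_id_inclusive": 1,
    "max_stream_id_exclusive": 26,
    "rows_inserted": 0,
}
while reindex_batch(progress, batch_size=10):
    print(progress)
# Three batches (10, 10 and 5 events) cover every event exactly once.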