summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-11-11 13:59:40 +0000
committerMark Haines <mark.haines@matrix.org>2015-11-11 13:59:40 +0000
commit940a16119205115c02a3b23e9f3c67a08486bae0 (patch)
tree34d4eebc066d00b3fc0bfce76d35feb537129ed3
parentUse a background task to update databases to use the full text search (diff)
downloadsynapse-940a16119205115c02a3b23e9f3c67a08486bae0.tar.xz
Fix the background update
-rw-r--r--synapse/storage/background_updates.py13
-rw-r--r--synapse/storage/schema/delta/25/fts.py7
-rw-r--r--synapse/storage/search.py16
3 files changed, 19 insertions, 17 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index b6cdc6ec68..45fccc2e5e 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -93,16 +93,19 @@ class BackgroundUpdateStore(SQLBaseStore):
 
             sleep = defer.Deferred()
             self._background_update_timer = self._clock.call_later(
-                self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback
+                self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
             )
             try:
                 yield sleep
             finally:
                 self._background_update_timer = None
 
-            result = yield self.do_background_update(
-                self.BACKGROUND_UPDATE_DURATION_MS
-            )
+            try:
+                result = yield self.do_background_update(
+                    self.BACKGROUND_UPDATE_DURATION_MS
+                )
+            except:
+                logger.exception("Error doing update")
 
             if result is None:
                 logger.info(
@@ -169,7 +172,7 @@ class BackgroundUpdateStore(SQLBaseStore):
         duration_ms = time_stop - time_start
 
         logger.info(
-            "Updating %. Updated %r items in %rms."
+            "Updating %r. Updated %r items in %rms."
             " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)",
             update_name, items_updated, duration_ms,
             performance.total_items_per_ms(),
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index e408d36da0..ce92f59025 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -47,11 +47,10 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
     if isinstance(database_engine, PostgresEngine):
         for statement in get_statements(POSTGRES_TABLE.splitlines()):
             cur.execute(statement)
-        return
-
-    if isinstance(database_engine, Sqlite3Engine):
+    elif isinstance(database_engine, Sqlite3Engine):
         cur.execute(SQLITE_TABLE)
-        return
+    else:
+        raise Exception("Unrecognized database engine")
 
     cur.execute("SELECT MIN(stream_ordering) FROM events")
     rows = cur.fetchall()
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 3b19b118eb..3c0d671129 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -46,18 +46,18 @@ class SearchStore(BackgroundUpdateStore):
 
         def reindex_search_txn(txn):
             sql = (
-                "SELECT stream_id, event_id FROM events"
+                "SELECT stream_ordering, event_id FROM events"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"
                 " LIMIT ?"
-            ) % (" OR ".join("type = '%s'" % TYPES),)
+            ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),)
 
-            txn.execute(sql, target_min_stream_id, max_stream_id, batch_size)
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
 
-            rows = txn.fetch_all()
+            rows = txn.fetchall()
             if not rows:
-                return None
+                return 0
 
             min_stream_id = rows[-1][0]
             event_ids = [row[1] for row in rows]
@@ -102,7 +102,7 @@ class SearchStore(BackgroundUpdateStore):
 
             for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE):
                 clump = event_search_rows[index:index + INSERT_CLUMP_SIZE]
-                txn.execute_many(sql, clump)
+                txn.executemany(sql, clump)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
@@ -116,11 +116,11 @@ class SearchStore(BackgroundUpdateStore):
 
             return len(event_search_rows)
 
-        result = yield self.runInteration(
+        result = yield self.runInteraction(
             self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
         )
 
-        if result is None:
+        if not result:
             yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)
 
         defer.returnValue(result)