diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index b6cdc6ec68..45fccc2e5e 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -93,16 +93,19 @@ class BackgroundUpdateStore(SQLBaseStore):
sleep = defer.Deferred()
self._background_update_timer = self._clock.call_later(
- self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback
+ self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
)
try:
yield sleep
finally:
self._background_update_timer = None
- result = yield self.do_background_update(
- self.BACKGROUND_UPDATE_DURATION_MS
- )
+ try:
+ result = yield self.do_background_update(
+ self.BACKGROUND_UPDATE_DURATION_MS
+ )
+ except:
+ logger.exception("Error doing update")
if result is None:
logger.info(
@@ -169,7 +172,7 @@ class BackgroundUpdateStore(SQLBaseStore):
duration_ms = time_stop - time_start
logger.info(
- "Updating %. Updated %r items in %rms."
+ "Updating %r. Updated %r items in %rms."
" (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)",
update_name, items_updated, duration_ms,
performance.total_items_per_ms(),
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index e408d36da0..ce92f59025 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -47,11 +47,10 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine):
for statement in get_statements(POSTGRES_TABLE.splitlines()):
cur.execute(statement)
- return
-
- if isinstance(database_engine, Sqlite3Engine):
+ elif isinstance(database_engine, Sqlite3Engine):
cur.execute(SQLITE_TABLE)
- return
+ else:
+ raise Exception("Unrecognized database engine")
cur.execute("SELECT MIN(stream_ordering) FROM events")
rows = cur.fetchall()
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 3b19b118eb..3c0d671129 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -46,18 +46,18 @@ class SearchStore(BackgroundUpdateStore):
def reindex_search_txn(txn):
sql = (
- "SELECT stream_id, event_id FROM events"
+ "SELECT stream_ordering, event_id FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
" LIMIT ?"
- ) % (" OR ".join("type = '%s'" % TYPES),)
+ ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),)
- txn.execute(sql, target_min_stream_id, max_stream_id, batch_size)
+ txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
- rows = txn.fetch_all()
+ rows = txn.fetchall()
if not rows:
- return None
+ return 0
min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows]
@@ -102,7 +102,7 @@ class SearchStore(BackgroundUpdateStore):
for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE):
clump = event_search_rows[index:index + INSERT_CLUMP_SIZE]
- txn.execute_many(sql, clump)
+ txn.executemany(sql, clump)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
@@ -116,11 +116,11 @@ class SearchStore(BackgroundUpdateStore):
return len(event_search_rows)
- result = yield self.runInteration(
+ result = yield self.runInteraction(
self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
)
- if result is None:
+ if not result:
yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)
defer.returnValue(result)
|