From 05e01f21d7012c1853ff566c8a76aa66087bfbd7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Jun 2016 17:12:48 +0100 Subject: Remove event fetching from DB threads --- synapse/storage/search.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) (limited to 'synapse/storage/search.py') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 0224299625..12941d1775 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -21,6 +21,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging import re +import ujson as json logger = logging.getLogger(__name__) @@ -52,7 +53,7 @@ class SearchStore(BackgroundUpdateStore): def reindex_search_txn(txn): sql = ( - "SELECT stream_ordering, event_id FROM events" + "SELECT stream_ordering, event_id, room_id, type, content FROM events" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" @@ -61,28 +62,30 @@ class SearchStore(BackgroundUpdateStore): txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) - rows = txn.fetchall() + rows = self.cursor_to_dict(txn) if not rows: return 0 - min_stream_id = rows[-1][0] - event_ids = [row[1] for row in rows] - - events = self._get_events_txn(txn, event_ids) + min_stream_id = rows[-1]["stream_ordering"] event_search_rows = [] - for event in events: + for row in rows: try: - event_id = event.event_id - room_id = event.room_id - content = event.content - if event.type == "m.room.message": + event_id = row["event_id"] + room_id = row["room_id"] + etype = row["type"] + try: + content = json.loads(row["content"]) + except: + continue + + if etype == "m.room.message": key = "content.body" value = content["body"] - elif event.type == "m.room.topic": + elif etype == "m.room.topic": key = "content.topic" value = content["topic"] - elif event.type == "m.room.name": + elif etype == "m.room.name": key = "content.name" value = content["name"] except (KeyError, AttributeError): -- cgit 1.4.1