diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 48a0633746..a1bd9c4ce9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -40,6 +40,7 @@ from .filtering import FilteringStore
from .end_to_end_keys import EndToEndKeyStore
from .receipts import ReceiptsStore
+from .search import SearchStore
import logging
@@ -69,6 +70,7 @@ class DataStore(RoomMemberStore, RoomStore,
EventsStore,
ReceiptsStore,
EndToEndKeyStore,
+ SearchStore,
):
def __init__(self, hs):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 693784ad38..218e708054 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -519,7 +519,7 @@ class SQLBaseStore(object):
allow_none=False,
desc="_simple_select_one_onecol"):
"""Executes a SELECT query on the named table, which is expected to
- return a single row, returning a single column from it."
+ return a single row, returning a single column from it.
Args:
table : string giving the table name
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 416ef6af93..e6c1abfc27 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -307,6 +307,8 @@ class EventsStore(SQLBaseStore):
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
+ elif event.type == EventTypes.Message:
+ self._store_room_message_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 5e07b7e0e5..e4e830944a 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -175,6 +175,10 @@ class RoomStore(SQLBaseStore):
},
)
+ self._store_event_search_txn(
+ txn, event, "content.topic", event.content["topic"]
+ )
+
def _store_room_name_txn(self, txn, event):
if hasattr(event, "content") and "name" in event.content:
self._simple_insert_txn(
@@ -187,6 +191,24 @@ class RoomStore(SQLBaseStore):
}
)
+ self._store_event_search_txn(
+ txn, event, "content.name", event.content["name"]
+ )
+
+ def _store_room_message_txn(self, txn, event):
+ if hasattr(event, "content") and "body" in event.content:
+ self._store_event_search_txn(
+ txn, event, "content.body", event.content["body"]
+ )
+
+ def _store_event_search_txn(self, txn, event, key, value):
+ sql = (
+ "INSERT INTO event_search (event_id, room_id, key, vector)"
+ " VALUES (?,?,?,to_tsvector('english', ?))"
+ )
+
+ txn.execute(sql, (event.event_id, event.room_id, key, value,))
+
@cachedInlineCallbacks()
def get_room_name_and_aliases(self, room_id):
def f(txn):
diff --git a/synapse/storage/schema/delta/24/fts.py b/synapse/storage/schema/delta/24/fts.py
new file mode 100644
index 0000000000..b45a5fd820
--- /dev/null
+++ b/synapse/storage/schema/delta/24/fts.py
@@ -0,0 +1,117 @@
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage.prepare_database import get_statements
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+import ujson
+
+logger = logging.getLogger(__name__)
+
+
+POSTGRES_SQL = """
+CREATE TABLE event_search (
+ event_id TEXT,
+ room_id TEXT,
+ key TEXT,
+ vector tsvector
+);
+
+INSERT INTO event_search SELECT
+ event_id, room_id, 'content.body',
+ to_tsvector('english', json::json->'content'->>'body')
+ FROM events NATURAL JOIN event_json WHERE type = 'm.room.message';
+
+INSERT INTO event_search SELECT
+ event_id, room_id, 'content.name',
+ to_tsvector('english', json::json->'content'->>'name')
+ FROM events NATURAL JOIN event_json WHERE type = 'm.room.name';
+
+INSERT INTO event_search SELECT
+ event_id, room_id, 'content.topic',
+ to_tsvector('english', json::json->'content'->>'topic')
+ FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic';
+
+
+CREATE INDEX event_search_fts_idx ON event_search USING gin(vector);
+CREATE INDEX event_search_ev_idx ON event_search(event_id);
+CREATE INDEX event_search_ev_ridx ON event_search(room_id);
+"""
+
+
+SQLITE_TABLE = (
+ "CREATE VIRTUAL TABLE event_search USING fts3 ( event_id, room_id, key, value)"
+)
+SQLITE_INDEX = "CREATE INDEX event_search_ev_idx ON event_search(event_id)"
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ if isinstance(database_engine, PostgresEngine):
+ for statement in get_statements(POSTGRES_SQL.splitlines()):
+ cur.execute(statement)
+ return
+
+ if isinstance(database_engine, Sqlite3Engine):
+ cur.execute(SQLITE_TABLE)
+
+ rowid = -1
+ while True:
+ cur.execute(
+ "SELECT rowid, json FROM event_json"
+ " WHERE rowid > ?"
+ " ORDER BY rowid ASC LIMIT 100",
+ (rowid,)
+ )
+
+ res = cur.fetchall()
+
+ if not res:
+ break
+
+ events = [
+ ujson.loads(js)
+ for _, js in res
+ ]
+
+ rowid = max(rid for rid, _ in res)
+
+ rows = []
+ for ev in events:
+ if ev["type"] == "m.room.message":
+ rows.append((
+ ev["event_id"], ev["room_id"], "content.body",
+ ev["content"]["body"]
+ ))
+ if ev["type"] == "m.room.name":
+ rows.append((
+ ev["event_id"], ev["room_id"], "content.name",
+ ev["content"]["name"]
+ ))
+ if ev["type"] == "m.room.topic":
+ rows.append((
+ ev["event_id"], ev["room_id"], "content.topic",
+ ev["content"]["topic"]
+ ))
+
+ if rows:
+ logger.info(rows)
+ cur.executemany(
+ "INSERT INTO event_search (event_id, room_id, key, value)"
+ " VALUES (?,?,?,?)",
+ rows
+ )
+
+ # cur.execute(SQLITE_INDEX)
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
new file mode 100644
index 0000000000..5843f80876
--- /dev/null
+++ b/synapse/storage/search.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from _base import SQLBaseStore
+from synapse.api.constants import KnownRoomEventKeys, SearchConstraintTypes
+from synapse.storage.engines import PostgresEngine
+
+
+class SearchStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def search_msgs(self, room_ids, constraints):
+ clauses = []
+ args = []
+ fts = None
+
+ clauses.append(
+ "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
+ )
+ args.extend(room_ids)
+
+ for c in constraints:
+ local_clauses = []
+ if c.search_type == SearchConstraintTypes.FTS:
+ fts = c.value
+ for key in c.keys:
+ local_clauses.append("key = ?")
+ args.append(key)
+ elif c.search_type == SearchConstraintTypes.EXACT:
+ for key in c.keys:
+ if key == KnownRoomEventKeys.ROOM_ID:
+ for value in c.value:
+ local_clauses.append("room_id = ?")
+ args.append(value)
+ clauses.append(
+ "(%s)" % (" OR ".join(local_clauses),)
+ )
+
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = (
+ "SELECT ts_rank_cd(vector, query) AS rank, event_id"
+ " FROM plainto_tsquery('english', ?) as query, event_search"
+ " WHERE vector @@ query"
+ )
+ else:
+ sql = (
+ "SELECT 0 as rank, event_id FROM event_search"
+ " WHERE value MATCH ?"
+ )
+
+ for clause in clauses:
+ sql += " AND " + clause
+
+ sql += " ORDER BY rank DESC"
+
+ results = yield self._execute(
+ "search_msgs", self.cursor_to_dict, sql, *([fts] + args)
+ )
+
+ events = yield self._get_events([r["event_id"] for r in results])
+
+ event_map = {
+ ev.event_id: ev
+ for ev in events
+ }
+
+ defer.returnValue((
+ {
+ r["event_id"]: r["rank"]
+ for r in results
+ if r["event_id"] in event_map
+ },
+ event_map
+ ))
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index e935b9443b..acfb322a53 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -54,7 +54,7 @@ class StateStore(SQLBaseStore):
defer.returnValue({})
event_to_groups = yield self._get_state_group_for_events(
- room_id, event_ids,
+ event_ids,
)
groups = set(event_to_groups.values())
@@ -208,13 +208,12 @@ class StateStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_state_for_events(self, room_id, event_ids, types):
+ def get_state_for_events(self, event_ids, types):
"""Given a list of event_ids and type tuples, return a list of state
dicts for each event. The state dicts will only have the type/state_keys
that are in the `types` list.
Args:
- room_id (str)
event_ids (list)
types (list): List of (type, state_key) tuples which are used to
filter the state fetched. `state_key` may be None, which matches
@@ -225,7 +224,7 @@ class StateStore(SQLBaseStore):
The dicts are mappings from (type, state_key) -> state_events
"""
event_to_groups = yield self._get_state_group_for_events(
- room_id, event_ids,
+ event_ids,
)
groups = set(event_to_groups.values())
@@ -251,8 +250,8 @@ class StateStore(SQLBaseStore):
)
@cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids",
- num_args=2)
- def _get_state_group_for_events(self, room_id, event_ids):
+ num_args=1)
+ def _get_state_group_for_events(self, event_ids):
"""Returns mapping event_id -> state_group
"""
def f(txn):
|