summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/api/constants.py19
-rw-r--r--synapse/handlers/__init__.py2
-rw-r--r--synapse/handlers/federation.py2
-rw-r--r--synapse/handlers/message.py10
-rw-r--r--synapse/handlers/search.py164
-rw-r--r--synapse/handlers/sync.py2
-rw-r--r--synapse/rest/client/v1/room.py17
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/_base.py2
-rw-r--r--synapse/storage/events.py2
-rw-r--r--synapse/storage/room.py22
-rw-r--r--synapse/storage/schema/delta/24/fts.py117
-rw-r--r--synapse/storage/search.py87
-rw-r--r--synapse/storage/state.py11
14 files changed, 445 insertions, 14 deletions
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 008ee64727..7c7f9ff957 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -84,3 +84,22 @@ class RoomCreationPreset(object):
     PRIVATE_CHAT = "private_chat"
     PUBLIC_CHAT = "public_chat"
     TRUSTED_PRIVATE_CHAT = "trusted_private_chat"
+
+
+class SearchConstraintTypes(object):
+    FTS = "fts"
+    EXACT = "exact"
+    PREFIX = "prefix"
+    SUBSTRING = "substring"
+    RANGE = "range"
+
+
+class KnownRoomEventKeys(object):
+    CONTENT_BODY = "content.body"
+    CONTENT_MSGTYPE = "content.msgtype"
+    CONTENT_NAME = "content.name"
+    CONTENT_TOPIC = "content.topic"
+
+    SENDER = "sender"
+    ORIGIN_SERVER_TS = "origin_server_ts"
+    ROOM_ID = "room_id"
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 8725c3c420..87b4d381c7 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -32,6 +32,7 @@ from .sync import SyncHandler
 from .auth import AuthHandler
 from .identity import IdentityHandler
 from .receipts import ReceiptsHandler
+from .search import SearchHandler
 
 
 class Handlers(object):
@@ -68,3 +69,4 @@ class Handlers(object):
         self.sync_handler = SyncHandler(hs)
         self.auth_handler = AuthHandler(hs)
         self.identity_handler = IdentityHandler(hs)
+        self.search_handler = SearchHandler(hs)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3882ba79ed..a710bdcfdb 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -242,7 +242,7 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def _filter_events_for_server(self, server_name, room_id, events):
         event_to_state = yield self.store.get_state_for_events(
-            room_id, frozenset(e.event_id for e in events),
+            frozenset(e.event_id for e in events),
             types=(
                 (EventTypes.RoomHistoryVisibility, ""),
                 (EventTypes.Member, None),
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index b70258697b..dfeeae76db 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -164,7 +164,7 @@ class MessageHandler(BaseHandler):
     @defer.inlineCallbacks
     def _filter_events_for_client(self, user_id, room_id, events):
         event_id_to_state = yield self.store.get_state_for_events(
-            room_id, frozenset(e.event_id for e in events),
+            frozenset(e.event_id for e in events),
             types=(
                 (EventTypes.RoomHistoryVisibility, ""),
                 (EventTypes.Member, user_id),
@@ -290,7 +290,7 @@ class MessageHandler(BaseHandler):
         elif member_event.membership == Membership.LEAVE:
             key = (event_type, state_key)
             room_state = yield self.store.get_state_for_events(
-                room_id, [member_event.event_id], [key]
+                [member_event.event_id], [key]
             )
             data = room_state[member_event.event_id].get(key)
 
@@ -314,7 +314,7 @@ class MessageHandler(BaseHandler):
             room_state = yield self.state_handler.get_current_state(room_id)
         elif member_event.membership == Membership.LEAVE:
             room_state = yield self.store.get_state_for_events(
-                room_id, [member_event.event_id], None
+                [member_event.event_id], None
             )
             room_state = room_state[member_event.event_id]
 
@@ -406,7 +406,7 @@ class MessageHandler(BaseHandler):
                 elif event.membership == Membership.LEAVE:
                     room_end_token = "s%d" % (event.stream_ordering,)
                     deferred_room_state = self.store.get_state_for_events(
-                        event.room_id, [event.event_id], None
+                        [event.event_id], None
                     )
                     deferred_room_state.addCallback(
                         lambda states: states[event.event_id]
@@ -499,7 +499,7 @@ class MessageHandler(BaseHandler):
     def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
                                   member_event):
         room_state = yield self.store.get_state_for_events(
-            member_event.room_id, [member_event.event_id], None
+            [member_event.event_id], None
         )
 
         room_state = room_state[member_event.event_id]
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
new file mode 100644
index 0000000000..d5c395061c
--- /dev/null
+++ b/synapse/handlers/search.py
@@ -0,0 +1,164 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from ._base import BaseHandler
+
+from synapse.api.constants import (
+    EventTypes, KnownRoomEventKeys, Membership, SearchConstraintTypes
+)
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+KEYS_TO_ALLOWED_CONSTRAINT_TYPES = {
+    KnownRoomEventKeys.CONTENT_BODY: [SearchConstraintTypes.FTS],
+    KnownRoomEventKeys.CONTENT_MSGTYPE: [SearchConstraintTypes.EXACT],
+    KnownRoomEventKeys.CONTENT_NAME: [
+        SearchConstraintTypes.FTS,
+        SearchConstraintTypes.EXACT,
+        SearchConstraintTypes.SUBSTRING,
+    ],
+    KnownRoomEventKeys.CONTENT_TOPIC: [SearchConstraintTypes.FTS],
+    KnownRoomEventKeys.SENDER: [SearchConstraintTypes.EXACT],
+    KnownRoomEventKeys.ORIGIN_SERVER_TS: [SearchConstraintTypes.RANGE],
+    KnownRoomEventKeys.ROOM_ID: [SearchConstraintTypes.EXACT],
+}
+
+
+class RoomConstraint(object):
+    def __init__(self, search_type, keys, value):
+        self.search_type = search_type
+        self.keys = keys
+        self.value = value
+
+    @classmethod
+    def from_dict(cls, d):
+        search_type = d["type"]
+        keys = d["keys"]
+
+        for key in keys:
+            if key not in KEYS_TO_ALLOWED_CONSTRAINT_TYPES:
+                raise SynapseError(400, "Unrecognized key %r", key)
+
+            if search_type not in KEYS_TO_ALLOWED_CONSTRAINT_TYPES[key]:
+                raise SynapseError(
+                    400,
+                    "Disallowed constraint type %r for key %r", search_type, key
+                )
+
+        return cls(search_type, keys, d["value"])
+
+
+class SearchHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(SearchHandler, self).__init__(hs)
+
+    @defer.inlineCallbacks
+    def _filter_events_for_client(self, user_id, events):
+        event_id_to_state = yield self.store.get_state_for_events(
+            frozenset(e.event_id for e in events),
+            types=(
+                (EventTypes.RoomHistoryVisibility, ""),
+                (EventTypes.Member, user_id),
+            )
+        )
+
+        def allowed(event, state):
+            if event.type == EventTypes.RoomHistoryVisibility:
+                return True
+
+            membership_ev = state.get((EventTypes.Member, user_id), None)
+            if membership_ev:
+                membership = membership_ev.membership
+            else:
+                membership = Membership.LEAVE
+
+            if membership == Membership.JOIN:
+                return True
+
+            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
+            if history:
+                visibility = history.content.get("history_visibility", "shared")
+            else:
+                visibility = "shared"
+
+            if visibility == "public":
+                return True
+            elif visibility == "shared":
+                return True
+            elif visibility == "joined":
+                return membership == Membership.JOIN
+            elif visibility == "invited":
+                return membership == Membership.INVITE
+
+            return True
+
+        defer.returnValue([
+            event
+            for event in events
+            if allowed(event, event_id_to_state[event.event_id])
+        ])
+
+    @defer.inlineCallbacks
+    def search(self, user, content):
+        constraint_dicts = content["search_categories"]["room_events"]["constraints"]
+        constraints = [RoomConstraint.from_dict(c)for c in constraint_dicts]
+
+        fts = False
+        for c in constraints:
+            if c.search_type == SearchConstraintTypes.FTS:
+                if fts:
+                    raise SynapseError(400, "Only one constraint can be FTS")
+                fts = True
+
+        rooms = yield self.store.get_rooms_for_user_where_membership_is(
+            user.to_string(), membership_list=[Membership.JOIN, Membership.LEAVE],
+        )
+        room_ids = set(r.room_id for r in rooms)
+
+        rank_map, event_map = yield self.store.search_msgs(room_ids, constraints)
+
+        allowed_events = yield self._filter_events_for_client(
+            user.to_string(), event_map.values()
+        )
+
+        time_now = self.clock.time_msec()
+
+        results = {
+            e.event_id: {
+                "rank": rank_map[e.event_id],
+                "result": serialize_event(e, time_now)
+            }
+            for e in allowed_events
+        }
+
+        logger.info("returning: %r", results)
+
+        defer.returnValue({
+            "search_categories": {
+                "room_events": {
+                    "results": results,
+                    "count": len(results)
+                }
+            }
+        })
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9914ff6f9c..a8940de166 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -312,7 +312,7 @@ class SyncHandler(BaseHandler):
     @defer.inlineCallbacks
     def _filter_events_for_client(self, user_id, room_id, events):
         event_id_to_state = yield self.store.get_state_for_events(
-            room_id, frozenset(e.event_id for e in events),
+            frozenset(e.event_id for e in events),
             types=(
                 (EventTypes.RoomHistoryVisibility, ""),
                 (EventTypes.Member, user_id),
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 23871f161e..94adabca62 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -529,6 +529,22 @@ class RoomTypingRestServlet(ClientV1RestServlet):
         defer.returnValue((200, {}))
 
 
+class SearchRestServlet(ClientV1RestServlet):
+    PATTERN = client_path_pattern(
+        "/search$"
+    )
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        auth_user, _ = yield self.auth.get_user_by_req(request)
+
+        content = _parse_json(request)
+
+        results = yield self.handlers.search_handler.search(auth_user, content)
+
+        defer.returnValue((200, results))
+
+
 def _parse_json(request):
     try:
         content = json.loads(request.content.read())
@@ -585,3 +601,4 @@ def register_servlets(hs, http_server):
     RoomInitialSyncRestServlet(hs).register(http_server)
     RoomRedactEventRestServlet(hs).register(http_server)
     RoomTypingRestServlet(hs).register(http_server)
+    SearchRestServlet(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 48a0633746..a1bd9c4ce9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -40,6 +40,7 @@ from .filtering import FilteringStore
 from .end_to_end_keys import EndToEndKeyStore
 
 from .receipts import ReceiptsStore
+from .search import SearchStore
 
 
 import logging
@@ -69,6 +70,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 EventsStore,
                 ReceiptsStore,
                 EndToEndKeyStore,
+                SearchStore,
                 ):
 
     def __init__(self, hs):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 693784ad38..218e708054 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -519,7 +519,7 @@ class SQLBaseStore(object):
                                   allow_none=False,
                                   desc="_simple_select_one_onecol"):
         """Executes a SELECT query on the named table, which is expected to
-        return a single row, returning a single column from it."
+        return a single row, returning a single column from it.
 
         Args:
             table : string giving the table name
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 416ef6af93..e6c1abfc27 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -307,6 +307,8 @@ class EventsStore(SQLBaseStore):
                 self._store_room_name_txn(txn, event)
             elif event.type == EventTypes.Topic:
                 self._store_room_topic_txn(txn, event)
+            elif event.type == EventTypes.Message:
+                self._store_room_message_txn(txn, event)
             elif event.type == EventTypes.Redaction:
                 self._store_redaction(txn, event)
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 5e07b7e0e5..e4e830944a 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -175,6 +175,10 @@ class RoomStore(SQLBaseStore):
                 },
             )
 
+            self._store_event_search_txn(
+                txn, event, "content.topic", event.content["topic"]
+            )
+
     def _store_room_name_txn(self, txn, event):
         if hasattr(event, "content") and "name" in event.content:
             self._simple_insert_txn(
@@ -187,6 +191,24 @@ class RoomStore(SQLBaseStore):
                 }
             )
 
+            self._store_event_search_txn(
+                txn, event, "content.name", event.content["name"]
+            )
+
+    def _store_room_message_txn(self, txn, event):
+        if hasattr(event, "content") and "body" in event.content:
+            self._store_event_search_txn(
+                txn, event, "content.body", event.content["body"]
+            )
+
+    def _store_event_search_txn(self, txn, event, key, value):
+        sql = (
+            "INSERT INTO event_search (event_id, room_id, key, vector)"
+            " VALUES (?,?,?,to_tsvector('english', ?))"
+        )
+
+        txn.execute(sql, (event.event_id, event.room_id, key, value,))
+
     @cachedInlineCallbacks()
     def get_room_name_and_aliases(self, room_id):
         def f(txn):
diff --git a/synapse/storage/schema/delta/24/fts.py b/synapse/storage/schema/delta/24/fts.py
new file mode 100644
index 0000000000..b45a5fd820
--- /dev/null
+++ b/synapse/storage/schema/delta/24/fts.py
@@ -0,0 +1,117 @@
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage.prepare_database import get_statements
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+import ujson
+
+logger = logging.getLogger(__name__)
+
+
+POSTGRES_SQL = """
+CREATE TABLE event_search (
+    event_id TEXT,
+    room_id TEXT,
+    key TEXT,
+    vector tsvector
+);
+
+INSERT INTO event_search SELECT
+    event_id, room_id, 'content.body',
+    to_tsvector('english', json::json->'content'->>'body')
+    FROM events NATURAL JOIN event_json WHERE type = 'm.room.message';
+
+INSERT INTO event_search SELECT
+    event_id, room_id, 'content.name',
+    to_tsvector('english', json::json->'content'->>'name')
+    FROM events NATURAL JOIN event_json WHERE type = 'm.room.name';
+
+INSERT INTO event_search SELECT
+    event_id, room_id, 'content.topic',
+    to_tsvector('english', json::json->'content'->>'topic')
+    FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic';
+
+
+CREATE INDEX event_search_fts_idx ON event_search USING gin(vector);
+CREATE INDEX event_search_ev_idx ON event_search(event_id);
+CREATE INDEX event_search_ev_ridx ON event_search(room_id);
+"""
+
+
+SQLITE_TABLE = (
+    "CREATE VIRTUAL TABLE event_search USING fts3 ( event_id, room_id, key, value)"
+)
+SQLITE_INDEX = "CREATE INDEX event_search_ev_idx ON event_search(event_id)"
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    if isinstance(database_engine, PostgresEngine):
+        for statement in get_statements(POSTGRES_SQL.splitlines()):
+            cur.execute(statement)
+        return
+
+    if isinstance(database_engine, Sqlite3Engine):
+        cur.execute(SQLITE_TABLE)
+
+        rowid = -1
+        while True:
+            cur.execute(
+                "SELECT rowid, json FROM event_json"
+                " WHERE rowid > ?"
+                " ORDER BY rowid ASC LIMIT 100",
+                (rowid,)
+            )
+
+            res = cur.fetchall()
+
+            if not res:
+                break
+
+            events = [
+                ujson.loads(js)
+                for _, js in res
+            ]
+
+            rowid = max(rid for rid, _ in res)
+
+            rows = []
+            for ev in events:
+                if ev["type"] == "m.room.message":
+                    rows.append((
+                        ev["event_id"], ev["room_id"], "content.body",
+                        ev["content"]["body"]
+                    ))
+                if ev["type"] == "m.room.name":
+                    rows.append((
+                        ev["event_id"], ev["room_id"], "content.name",
+                        ev["content"]["name"]
+                    ))
+                if ev["type"] == "m.room.topic":
+                    rows.append((
+                        ev["event_id"], ev["room_id"], "content.topic",
+                        ev["content"]["topic"]
+                    ))
+
+            if rows:
+                logger.info(rows)
+                cur.executemany(
+                    "INSERT INTO event_search (event_id, room_id, key, value)"
+                    " VALUES (?,?,?,?)",
+                    rows
+                )
+
+        # cur.execute(SQLITE_INDEX)
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
new file mode 100644
index 0000000000..5843f80876
--- /dev/null
+++ b/synapse/storage/search.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from _base import SQLBaseStore
+from synapse.api.constants import KnownRoomEventKeys, SearchConstraintTypes
+from synapse.storage.engines import PostgresEngine
+
+
+class SearchStore(SQLBaseStore):
+    @defer.inlineCallbacks
+    def search_msgs(self, room_ids, constraints):
+        clauses = []
+        args = []
+        fts = None
+
+        clauses.append(
+            "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
+        )
+        args.extend(room_ids)
+
+        for c in constraints:
+            local_clauses = []
+            if c.search_type == SearchConstraintTypes.FTS:
+                fts = c.value
+                for key in c.keys:
+                    local_clauses.append("key = ?")
+                    args.append(key)
+            elif c.search_type == SearchConstraintTypes.EXACT:
+                for key in c.keys:
+                    if key == KnownRoomEventKeys.ROOM_ID:
+                        for value in c.value:
+                            local_clauses.append("room_id = ?")
+                            args.append(value)
+            clauses.append(
+                "(%s)" % (" OR ".join(local_clauses),)
+            )
+
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = (
+                "SELECT ts_rank_cd(vector, query) AS rank, event_id"
+                " FROM plainto_tsquery('english', ?) as query, event_search"
+                " WHERE vector @@ query"
+            )
+        else:
+            sql = (
+                "SELECT 0 as rank, event_id FROM event_search"
+                " WHERE value MATCH ?"
+            )
+
+        for clause in clauses:
+            sql += " AND " + clause
+
+        sql += " ORDER BY rank DESC"
+
+        results = yield self._execute(
+            "search_msgs", self.cursor_to_dict, sql, *([fts] + args)
+        )
+
+        events = yield self._get_events([r["event_id"] for r in results])
+
+        event_map = {
+            ev.event_id: ev
+            for ev in events
+        }
+
+        defer.returnValue((
+            {
+                r["event_id"]: r["rank"]
+                for r in results
+                if r["event_id"] in event_map
+            },
+            event_map
+        ))
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index e935b9443b..acfb322a53 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -54,7 +54,7 @@ class StateStore(SQLBaseStore):
             defer.returnValue({})
 
         event_to_groups = yield self._get_state_group_for_events(
-            room_id, event_ids,
+            event_ids,
         )
 
         groups = set(event_to_groups.values())
@@ -208,13 +208,12 @@ class StateStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_state_for_events(self, room_id, event_ids, types):
+    def get_state_for_events(self, event_ids, types):
         """Given a list of event_ids and type tuples, return a list of state
         dicts for each event. The state dicts will only have the type/state_keys
         that are in the `types` list.
 
         Args:
-            room_id (str)
             event_ids (list)
             types (list): List of (type, state_key) tuples which are used to
                 filter the state fetched. `state_key` may be None, which matches
@@ -225,7 +224,7 @@ class StateStore(SQLBaseStore):
             The dicts are mappings from (type, state_key) -> state_events
         """
         event_to_groups = yield self._get_state_group_for_events(
-            room_id, event_ids,
+            event_ids,
         )
 
         groups = set(event_to_groups.values())
@@ -251,8 +250,8 @@ class StateStore(SQLBaseStore):
         )
 
     @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids",
-                num_args=2)
-    def _get_state_group_for_events(self, room_id, event_ids):
+                num_args=1)
+    def _get_state_group_for_events(self, event_ids):
         """Returns mapping event_id -> state_group
         """
         def f(txn):