diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 48a0633746..a1bd9c4ce9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -40,6 +40,7 @@ from .filtering import FilteringStore
from .end_to_end_keys import EndToEndKeyStore
from .receipts import ReceiptsStore
+from .search import SearchStore
import logging
@@ -69,6 +70,7 @@ class DataStore(RoomMemberStore, RoomStore,
EventsStore,
ReceiptsStore,
EndToEndKeyStore,
+ SearchStore,
):
def __init__(self, hs):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 693784ad38..218e708054 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -519,7 +519,7 @@ class SQLBaseStore(object):
allow_none=False,
desc="_simple_select_one_onecol"):
"""Executes a SELECT query on the named table, which is expected to
- return a single row, returning a single column from it."
+ return a single row, returning a single column from it.
Args:
table : string giving the table name
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index bad3b5c5ac..a5a54ec011 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -17,6 +17,8 @@ from synapse.storage.prepare_database import (
prepare_database, prepare_sqlite3_database
)
+import struct
+
class Sqlite3Engine(object):
single_threaded = True
@@ -32,6 +34,7 @@ class Sqlite3Engine(object):
def on_new_connection(self, db_conn):
self.prepare_database(db_conn)
+ db_conn.create_function("rank", 1, _rank)
def prepare_database(self, db_conn):
prepare_sqlite3_database(db_conn)
@@ -45,3 +48,27 @@ class Sqlite3Engine(object):
def lock_table(self, txn, table):
return
+
+
+# Following functions taken from: https://github.com/coleifer/peewee
+
+def _parse_match_info(buf):
+ bufsize = len(buf)
+ return [struct.unpack('@I', buf[i:i+4])[0] for i in range(0, bufsize, 4)]
+
+
+def _rank(raw_match_info):
+ """Handle match_info called w/default args 'pcx' - based on the example rank
+ function http://sqlite.org/fts3.html#appendix_a
+ """
+ match_info = _parse_match_info(raw_match_info)
+ score = 0.0
+ p, c = match_info[:2]
+ for phrase_num in range(p):
+ phrase_info_idx = 2 + (phrase_num * c * 3)
+ for col_num in range(c):
+ col_idx = phrase_info_idx + (col_num * 3)
+ x1, x2 = match_info[col_idx:col_idx + 2]
+ if x1 > 0:
+ score += float(x1) / x2
+ return score
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 416ef6af93..e6c1abfc27 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -307,6 +307,8 @@ class EventsStore(SQLBaseStore):
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
+ elif event.type == EventTypes.Message:
+ self._store_room_message_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 1ddf55be4d..1a74d6e360 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 24
+SCHEMA_VERSION = 25
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 5e07b7e0e5..13441fcdce 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -19,6 +19,7 @@ from synapse.api.errors import StoreError
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
+from .engines import PostgresEngine, Sqlite3Engine
import collections
import logging
@@ -175,6 +176,10 @@ class RoomStore(SQLBaseStore):
},
)
+ self._store_event_search_txn(
+ txn, event, "content.topic", event.content["topic"]
+ )
+
def _store_room_name_txn(self, txn, event):
if hasattr(event, "content") and "name" in event.content:
self._simple_insert_txn(
@@ -187,6 +192,33 @@ class RoomStore(SQLBaseStore):
}
)
+ self._store_event_search_txn(
+ txn, event, "content.name", event.content["name"]
+ )
+
+ def _store_room_message_txn(self, txn, event):
+ if hasattr(event, "content") and "body" in event.content:
+ self._store_event_search_txn(
+ txn, event, "content.body", event.content["body"]
+ )
+
+ def _store_event_search_txn(self, txn, event, key, value):
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = (
+ "INSERT INTO event_search (event_id, room_id, key, vector)"
+ " VALUES (?,?,?,to_tsvector('english', ?))"
+ )
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ sql = (
+ "INSERT INTO event_search (event_id, room_id, key, value)"
+ " VALUES (?,?,?,?)"
+ )
+ else:
+ # This should be unreachable.
+ raise Exception("Unrecognized database engine")
+
+ txn.execute(sql, (event.event_id, event.room_id, key, value,))
+
@cachedInlineCallbacks()
def get_room_name_and_aliases(self, room_id):
def f(txn):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index dd98dcfda8..ae1ad56d9a 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -124,6 +124,19 @@ class RoomMemberStore(SQLBaseStore):
invites.event_id for invite in invites
]))
+ def get_leave_and_ban_events_for_user(self, user_id):
+ """ Get all the leave events for a user
+ Args:
+ user_id (str): The user ID.
+ Returns:
+ A deferred list of event objects.
+ """
+ return self.get_rooms_for_user_where_membership_is(
+ user_id, (Membership.LEAVE, Membership.BAN)
+ ).addCallback(lambda leaves: self._get_events([
+ leave.event_id for leave in leaves
+ ]))
+
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
matches one in the membership list.
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
new file mode 100644
index 0000000000..b7cd0ce3b8
--- /dev/null
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -0,0 +1,127 @@
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage.prepare_database import get_statements
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+import ujson
+
+logger = logging.getLogger(__name__)
+
+
+POSTGRES_SQL = """
+CREATE TABLE IF NOT EXISTS event_search (
+ event_id TEXT,
+ room_id TEXT,
+ sender TEXT,
+ key TEXT,
+ vector tsvector
+);
+
+INSERT INTO event_search SELECT
+ event_id, room_id, json::json->>'sender', 'content.body',
+ to_tsvector('english', json::json->'content'->>'body')
+ FROM events NATURAL JOIN event_json WHERE type = 'm.room.message';
+
+INSERT INTO event_search SELECT
+ event_id, room_id, json::json->>'sender', 'content.name',
+ to_tsvector('english', json::json->'content'->>'name')
+ FROM events NATURAL JOIN event_json WHERE type = 'm.room.name';
+
+INSERT INTO event_search SELECT
+ event_id, room_id, json::json->>'sender', 'content.topic',
+ to_tsvector('english', json::json->'content'->>'topic')
+ FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic';
+
+
+CREATE INDEX event_search_fts_idx ON event_search USING gin(vector);
+CREATE INDEX event_search_ev_idx ON event_search(event_id);
+CREATE INDEX event_search_ev_ridx ON event_search(room_id);
+"""
+
+
+SQLITE_TABLE = (
+ "CREATE VIRTUAL TABLE IF NOT EXISTS event_search"
+ " USING fts4 ( event_id, room_id, sender, key, value )"
+)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ if isinstance(database_engine, PostgresEngine):
+ run_postgres_upgrade(cur)
+ return
+
+ if isinstance(database_engine, Sqlite3Engine):
+ run_sqlite_upgrade(cur)
+ return
+
+
+def run_postgres_upgrade(cur):
+ for statement in get_statements(POSTGRES_SQL.splitlines()):
+ cur.execute(statement)
+
+
+def run_sqlite_upgrade(cur):
+ cur.execute(SQLITE_TABLE)
+
+ rowid = -1
+ while True:
+ cur.execute(
+ "SELECT rowid, json FROM event_json"
+ " WHERE rowid > ?"
+ " ORDER BY rowid ASC LIMIT 100",
+ (rowid,)
+ )
+
+ res = cur.fetchall()
+
+ if not res:
+ break
+
+ events = [
+ ujson.loads(js)
+ for _, js in res
+ ]
+
+ rowid = max(rid for rid, _ in res)
+
+ rows = []
+ for ev in events:
+ content = ev.get("content", {})
+ body = content.get("body", None)
+ name = content.get("name", None)
+ topic = content.get("topic", None)
+ sender = ev.get("sender", None)
+ if ev["type"] == "m.room.message" and body:
+ rows.append((
+ ev["event_id"], ev["room_id"], sender, "content.body", body
+ ))
+ if ev["type"] == "m.room.name" and name:
+ rows.append((
+ ev["event_id"], ev["room_id"], sender, "content.name", name
+ ))
+ if ev["type"] == "m.room.topic" and topic:
+ rows.append((
+ ev["event_id"], ev["room_id"], sender, "content.topic", topic
+ ))
+
+ if rows:
+ logger.info(rows)
+ cur.executemany(
+ "INSERT INTO event_search (event_id, room_id, sender, key, value)"
+ " VALUES (?,?,?,?,?)",
+ rows
+ )
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
new file mode 100644
index 0000000000..cdf003502f
--- /dev/null
+++ b/synapse/storage/search.py
@@ -0,0 +1,111 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from _base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+from collections import namedtuple
+
+"""The result of a search.
+
+Fields:
+ rank_map (dict): Mapping event_id -> rank
+ event_map (dict): Mapping event_id -> event
+ pagination_token (str): Pagination token
+"""
+SearchResult = namedtuple("SearchResult", ("rank_map", "event_map", "pagination_token"))
+
+
+class SearchStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def search_msgs(self, room_ids, search_term, keys):
+ """Performs a full text search over events with given keys.
+
+ Args:
+ room_ids (list): List of room ids to search in
+ search_term (str): Search term to search for
+ keys (list): List of keys to search in, currently supports
+ "content.body", "content.name", "content.topic"
+
+ Returns:
+ SearchResult
+ """
+ clauses = []
+ args = []
+
+ # Make sure we don't explode because the person is in too many rooms.
+ # We filter the results below regardless.
+ if len(room_ids) < 500:
+ clauses.append(
+ "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
+ )
+ args.extend(room_ids)
+
+ local_clauses = []
+ for key in keys:
+ local_clauses.append("key = ?")
+ args.append(key)
+
+ clauses.append(
+ "(%s)" % (" OR ".join(local_clauses),)
+ )
+
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = (
+ "SELECT ts_rank_cd(vector, query) AS rank, room_id, event_id"
+ " FROM plainto_tsquery('english', ?) as query, event_search"
+ " WHERE vector @@ query"
+ )
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ sql = (
+ "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id"
+ " FROM event_search"
+ " WHERE value MATCH ?"
+ )
+ else:
+ # This should be unreachable.
+ raise Exception("Unrecognized database engine")
+
+ for clause in clauses:
+ sql += " AND " + clause
+
+ # We add an arbitrary limit here to ensure we don't try to pull the
+ # entire table from the database.
+ sql += " ORDER BY rank DESC LIMIT 500"
+
+ results = yield self._execute(
+ "search_msgs", self.cursor_to_dict, sql, *([search_term] + args)
+ )
+
+ results = filter(lambda row: row["room_id"] in room_ids, results)
+
+ events = yield self._get_events([r["event_id"] for r in results])
+
+ event_map = {
+ ev.event_id: ev
+ for ev in events
+ }
+
+ defer.returnValue(SearchResult(
+ {
+ r["event_id"]: r["rank"]
+ for r in results
+ if r["event_id"] in event_map
+ },
+ event_map,
+ None
+ ))
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 6f2a50d585..acfb322a53 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -214,7 +214,6 @@ class StateStore(SQLBaseStore):
that are in the `types` list.
Args:
- room_id (str)
event_ids (list)
types (list): List of (type, state_key) tuples which are used to
filter the state fetched. `state_key` may be None, which matches
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 3cab06fdef..15d4c2bf68 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -23,7 +23,7 @@ paginate bacwards.
This is implemented by keeping two ordering columns: stream_ordering and
topological_ordering. Stream ordering is basically insertion/received order
-(except for events from backfill requests). The topolgical_ordering is a
+(except for events from backfill requests). The topological_ordering is a
weak ordering of events based on the pdu graph.
This means that we have to have two different types of tokens, depending on
@@ -436,3 +436,138 @@ class StreamStore(SQLBaseStore):
internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1))
internal.after = str(RoomStreamToken(topo, stream))
+
+ @defer.inlineCallbacks
+ def get_events_around(self, room_id, event_id, before_limit, after_limit):
+ """Retrieve events and pagination tokens around a given event in a
+ room.
+
+ Args:
+ room_id (str)
+ event_id (str)
+ before_limit (int)
+ after_limit (int)
+
+ Returns:
+ dict
+ """
+
+ results = yield self.runInteraction(
+ "get_events_around", self._get_events_around_txn,
+ room_id, event_id, before_limit, after_limit
+ )
+
+ events_before = yield self._get_events(
+ [e for e in results["before"]["event_ids"]],
+ get_prev_content=True
+ )
+
+ events_after = yield self._get_events(
+ [e for e in results["after"]["event_ids"]],
+ get_prev_content=True
+ )
+
+ defer.returnValue({
+ "events_before": events_before,
+ "events_after": events_after,
+ "start": results["before"]["token"],
+ "end": results["after"]["token"],
+ })
+
+ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
+ """Retrieves event_ids and pagination tokens around a given event in a
+ room.
+
+ Args:
+ room_id (str)
+ event_id (str)
+ before_limit (int)
+ after_limit (int)
+
+ Returns:
+ dict
+ """
+
+ results = self._simple_select_one_txn(
+ txn,
+ "events",
+ keyvalues={
+ "event_id": event_id,
+ "room_id": room_id,
+ },
+ retcols=["stream_ordering", "topological_ordering"],
+ )
+
+ stream_ordering = results["stream_ordering"]
+ topological_ordering = results["topological_ordering"]
+
+ query_before = (
+ "SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " WHERE room_id = ? AND (topological_ordering < ?"
+ " OR (topological_ordering = ? AND stream_ordering < ?))"
+ " ORDER BY topological_ordering DESC, stream_ordering DESC"
+ " LIMIT ?"
+ )
+
+ query_after = (
+ "SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " WHERE room_id = ? AND (topological_ordering > ?"
+ " OR (topological_ordering = ? AND stream_ordering > ?))"
+ " ORDER BY topological_ordering ASC, stream_ordering ASC"
+ " LIMIT ?"
+ )
+
+ txn.execute(
+ query_before,
+ (
+ room_id, topological_ordering, topological_ordering,
+ stream_ordering, before_limit,
+ )
+ )
+
+ rows = self.cursor_to_dict(txn)
+ events_before = [r["event_id"] for r in rows]
+
+ if rows:
+ start_token = str(RoomStreamToken(
+ rows[0]["topological_ordering"],
+ rows[0]["stream_ordering"] - 1,
+ ))
+ else:
+ start_token = str(RoomStreamToken(
+ topological_ordering,
+ stream_ordering - 1,
+ ))
+
+ txn.execute(
+ query_after,
+ (
+ room_id, topological_ordering, topological_ordering,
+ stream_ordering, after_limit,
+ )
+ )
+
+ rows = self.cursor_to_dict(txn)
+ events_after = [r["event_id"] for r in rows]
+
+ if rows:
+ end_token = str(RoomStreamToken(
+ rows[-1]["topological_ordering"],
+ rows[-1]["stream_ordering"],
+ ))
+ else:
+ end_token = str(RoomStreamToken(
+ topological_ordering,
+ stream_ordering,
+ ))
+
+ return {
+ "before": {
+ "event_ids": events_before,
+ "token": start_token,
+ },
+ "after": {
+ "event_ids": events_after,
+ "token": end_token,
+ },
+ }
|