summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-11-30 17:45:31 +0000
committerErik Johnston <erik@matrix.org>2015-11-30 17:45:31 +0000
commit4dcaa42b6d1fde87e29eb4f3c0080ea92fcc7fa2 (patch)
tree0f63de9a91447d13548456120a4f6c9e68fc523e /synapse
parentReturn words to highlight in search results (diff)
downloadsynapse-4dcaa42b6d1fde87e29eb4f3c0080ea92fcc7fa2.tar.xz
Allow paginating search ordered by recents
Diffstat (limited to '')
-rw-r--r--synapse/handlers/search.py128
-rw-r--r--synapse/storage/events.py77
-rw-r--r--synapse/storage/schema/delta/26/ts.py57
-rw-r--r--synapse/storage/search.py41
4 files changed, 219 insertions, 84 deletions
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 6d2197339e..671dbb61b8 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -131,6 +131,17 @@ class SearchHandler(BaseHandler):
         if batch_group == "room_id":
             room_ids.intersection_update({batch_group_key})
 
+        if not room_ids:
+            defer.returnValue({
+                "search_categories": {
+                    "room_events": {
+                        "results": {},
+                        "count": 0,
+                        "highlights": [],
+                    }
+                }
+            })
+
         rank_map = {}  # event_id -> rank of event
         allowed_events = []
         room_groups = {}  # Holds result of grouping by room, if applicable
@@ -178,85 +189,66 @@ class SearchHandler(BaseHandler):
                 s["results"].append(e.event_id)
 
         elif order_by == "recent":
-            # In this case we specifically loop through each room as the given
-            # limit applies to each room, rather than a global list.
-            # This is not necessarilly a good idea.
-            for room_id in room_ids:
-                room_events = []
-                if batch_group == "room_id" and batch_group_key == room_id:
-                    pagination_token = batch_token
-                else:
-                    pagination_token = None
-                i = 0
-
-                # We keep looping and we keep filtering until we reach the limit
-                # or we run out of things.
-                # But only go around 5 times since otherwise synapse will be sad.
-                while len(room_events) < search_filter.limit() and i < 5:
-                    i += 1
-                    search_result = yield self.store.search_room(
-                        room_id, search_term, keys, search_filter.limit() * 2,
-                        pagination_token=pagination_token,
-                    )
+            room_events = []
+            i = 0
+
+            pagination_token = batch_token
+
+            # We keep looping and we keep filtering until we reach the limit
+            # or we run out of things.
+            # But only go around 5 times since otherwise synapse will be sad.
+            while len(room_events) < search_filter.limit() and i < 5:
+                i += 1
+                search_result = yield self.store.search_rooms(
+                    room_ids, search_term, keys, search_filter.limit() * 2,
+                    pagination_token=pagination_token,
+                )
 
-                    if search_result["highlights"]:
-                        highlights.update(search_result["highlights"])
+                if search_result["highlights"]:
+                    highlights.update(search_result["highlights"])
 
-                    results = search_result["results"]
+                results = search_result["results"]
 
-                    results_map = {r["event"].event_id: r for r in results}
+                results_map = {r["event"].event_id: r for r in results}
 
-                    rank_map.update({r["event"].event_id: r["rank"] for r in results})
+                rank_map.update({r["event"].event_id: r["rank"] for r in results})
 
-                    filtered_events = search_filter.filter([
-                        r["event"] for r in results
-                    ])
+                filtered_events = search_filter.filter([
+                    r["event"] for r in results
+                ])
 
-                    events = yield self._filter_events_for_client(
-                        user.to_string(), filtered_events
-                    )
+                events = yield self._filter_events_for_client(
+                    user.to_string(), filtered_events
+                )
 
-                    room_events.extend(events)
-                    room_events = room_events[:search_filter.limit()]
+                room_events.extend(events)
+                room_events = room_events[:search_filter.limit()]
 
-                    if len(results) < search_filter.limit() * 2:
-                        pagination_token = None
-                        break
-                    else:
-                        pagination_token = results[-1]["pagination_token"]
-
-                if room_events:
-                    res = results_map[room_events[-1].event_id]
-                    pagination_token = res["pagination_token"]
-
-                    group = room_groups.setdefault(room_id, {})
-                    if pagination_token:
-                        next_batch = encode_base64("%s\n%s\n%s" % (
-                            "room_id", room_id, pagination_token
-                        ))
-                        group["next_batch"] = next_batch
-
-                        if batch_token:
-                            global_next_batch = next_batch
-
-                    group["results"] = [e.event_id for e in room_events]
-                    group["order"] = max(
-                        e.origin_server_ts/1000 for e in room_events
-                        if hasattr(e, "origin_server_ts")
-                    )
+                if len(results) < search_filter.limit() * 2:
+                    pagination_token = None
+                else:
+                    pagination_token = results[-1]["pagination_token"]
 
-                allowed_events.extend(room_events)
+            if room_events:
+                for event in room_events:
+                    group = room_groups.setdefault(event.room_id, {
+                        "results": [],
+                    })
+                    group["results"].append(event.event_id)
 
-            # Normalize the group orders
-            if room_groups:
-                if len(room_groups) > 1:
-                    mx = max(g["order"] for g in room_groups.values())
-                    mn = min(g["order"] for g in room_groups.values())
+            pagination_token = results_map[room_events[-1].event_id]["pagination_token"]
 
-                    for g in room_groups.values():
-                        g["order"] = (g["order"] - mn) * 1.0 / (mx - mn)
-                else:
-                    room_groups.values()[0]["order"] = 1
+            if pagination_token:
+                global_next_batch = encode_base64("%s\n%s\n%s" % (
+                    "all", "", pagination_token
+                ))
+
+                for room_id, group in room_groups.items():
+                    group["next_batch"] = encode_base64("%s\n%s\n%s" % (
+                        "room_id", room_id, pagination_token
+                    ))
+
+            allowed_events.extend(room_events)
 
         else:
             # We should never get here due to the guard earlier.
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5d35ca90b9..7088f2709b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -51,6 +51,14 @@ EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
 class EventsStore(SQLBaseStore):
+    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+
+    def __init__(self, hs):
+        super(EventsStore, self).__init__(hs)
+        self.register_background_update_handler(
+            self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
+        )
+
     @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False,
                        is_new_state=True):
@@ -365,6 +373,7 @@ class EventsStore(SQLBaseStore):
                     "processed": True,
                     "outlier": event.internal_metadata.is_outlier(),
                     "content": encode_json(event.content).decode("UTF-8"),
+                    "origin_server_ts": int(event.origin_server_ts),
                 }
                 for event, _ in events_and_contexts
             ],
@@ -964,3 +973,71 @@ class EventsStore(SQLBaseStore):
 
         ret = yield self.runInteraction("count_messages", _count_messages)
         defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def _background_reindex_origin_server_ts(self, progress, batch_size):
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        INSERT_CLUMP_SIZE = 1000
+
+        def reindex_search_txn(txn):
+            sql = (
+                "SELECT stream_ordering, event_id FROM events"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1][0]
+            event_ids = [row[1] for row in rows]
+
+            events = self._get_events_txn(txn, event_ids)
+
+            rows = []
+            for event in events:
+                try:
+                    event_id = event.event_id
+                    origin_server_ts = event.origin_server_ts
+                except (KeyError, AttributeError):
+                    # If the event is missing a necessary field then
+                    # skip over it.
+                    continue
+
+                rows.append((origin_server_ts, event_id))
+
+            sql = (
+                "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
+            )
+
+            for index in range(0, len(rows), INSERT_CLUMP_SIZE):
+                clump = rows[index:index + INSERT_CLUMP_SIZE]
+                txn.executemany(sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+                "rows_inserted": rows_inserted + len(rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
+
+        defer.returnValue(result)
diff --git a/synapse/storage/schema/delta/26/ts.py b/synapse/storage/schema/delta/26/ts.py
new file mode 100644
index 0000000000..8d4a981975
--- /dev/null
+++ b/synapse/storage/schema/delta/26/ts.py
@@ -0,0 +1,57 @@
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage.prepare_database import get_statements
+
+import ujson
+
+logger = logging.getLogger(__name__)
+
+
+ALTER_TABLE = (
+    "ALTER TABLE events ADD COLUMN origin_server_ts BIGINT;"
+    "CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);"
+)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    for statement in get_statements(ALTER_TABLE.splitlines()):
+        cur.execute(statement)
+
+    cur.execute("SELECT MIN(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    min_stream_id = rows[0][0]
+
+    cur.execute("SELECT MAX(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    max_stream_id = rows[0][0]
+
+    if min_stream_id is not None and max_stream_id is not None:
+        progress = {
+            "target_min_stream_id_inclusive": min_stream_id,
+            "max_stream_id_exclusive": max_stream_id + 1,
+            "rows_inserted": 0,
+        }
+        progress_json = ujson.dumps(progress)
+
+        sql = (
+            "INSERT into background_updates (update_name, progress_json)"
+            " VALUES (?, ?)"
+        )
+
+        sql = database_engine.convert_param_style(sql)
+
+        cur.execute(sql, ("event_origin_server_ts", progress_json))
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index c6386642df..20a62d07ff 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -212,11 +212,11 @@ class SearchStore(BackgroundUpdateStore):
         })
 
     @defer.inlineCallbacks
-    def search_room(self, room_id, search_term, keys, limit, pagination_token=None):
+    def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None):
         """Performs a full text search over events with given keys.
 
         Args:
-            room_id (str): The room_id to search in
+            room_id (list): The room_ids to search in
             search_term (str): Search term to search for
             keys (list): List of keys to search in, currently supports
                 "content.body", "content.name", "content.topic"
@@ -226,7 +226,15 @@ class SearchStore(BackgroundUpdateStore):
             list of dicts
         """
         clauses = []
-        args = [search_term, room_id]
+        args = [search_term]
+
+        # Make sure we don't explode because the person is in too many rooms.
+        # We filter the results below regardless.
+        if len(room_ids) < 500:
+            clauses.append(
+                "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
+            )
+            args.extend(room_ids)
 
         local_clauses = []
         for key in keys:
@@ -239,25 +247,25 @@ class SearchStore(BackgroundUpdateStore):
 
         if pagination_token:
             try:
-                topo, stream = pagination_token.split(",")
-                topo = int(topo)
+                origin_server_ts, stream = pagination_token.split(",")
+                origin_server_ts = int(origin_server_ts)
                 stream = int(stream)
             except:
                 raise SynapseError(400, "Invalid pagination token")
 
             clauses.append(
-                "(topological_ordering < ?"
-                " OR (topological_ordering = ? AND stream_ordering < ?))"
+                "(origin_server_ts < ?"
+                " OR (origin_server_ts = ? AND stream_ordering < ?))"
             )
-            args.extend([topo, topo, stream])
+            args.extend([origin_server_ts, origin_server_ts, stream])
 
         if isinstance(self.database_engine, PostgresEngine):
             sql = (
                 "SELECT ts_rank_cd(vector, query) as rank,"
-                " topological_ordering, stream_ordering, room_id, event_id"
+                " origin_server_ts, stream_ordering, room_id, event_id"
                 " FROM plainto_tsquery('english', ?) as query, event_search"
                 " NATURAL JOIN events"
-                " WHERE vector @@ query AND room_id = ?"
+                " WHERE vector @@ query AND "
             )
         elif isinstance(self.database_engine, Sqlite3Engine):
             # We use CROSS JOIN here to ensure we use the right indexes.
@@ -270,24 +278,23 @@ class SearchStore(BackgroundUpdateStore):
             # MATCH unless it uses the full text search index
             sql = (
                 "SELECT rank(matchinfo) as rank, room_id, event_id,"
-                " topological_ordering, stream_ordering"
+                " origin_server_ts, stream_ordering"
                 " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo"
                 " FROM event_search"
                 " WHERE value MATCH ?"
                 " )"
                 " CROSS JOIN events USING (event_id)"
-                " WHERE room_id = ?"
+                " WHERE "
             )
         else:
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
 
-        for clause in clauses:
-            sql += " AND " + clause
+        sql += " AND ".join(clauses)
 
         # We add an arbitrary limit here to ensure we don't try to pull the
         # entire table from the database.
-        sql += " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
+        sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"
 
         args.append(limit)
 
@@ -295,6 +302,8 @@ class SearchStore(BackgroundUpdateStore):
             "search_rooms", self.cursor_to_dict, sql, *args
         )
 
+        results = filter(lambda row: row["room_id"] in room_ids, results)
+
         events = yield self._get_events([r["event_id"] for r in results])
 
         event_map = {
@@ -312,7 +321,7 @@ class SearchStore(BackgroundUpdateStore):
                     "event": event_map[r["event_id"]],
                     "rank": r["rank"],
                     "pagination_token": "%s,%s" % (
-                        r["topological_ordering"], r["stream_ordering"]
+                        r["origin_server_ts"], r["stream_ordering"]
                     ),
                 }
                 for r in results