diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/search.py | 201 |
1 files changed, 183 insertions, 18 deletions
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 2718e9482e..b7545c111f 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -22,6 +22,8 @@ from synapse.api.filtering import Filter from synapse.api.errors import SynapseError from synapse.events.utils import serialize_event +from unpaddedbase64 import decode_base64, encode_base64 + import logging @@ -34,27 +36,59 @@ class SearchHandler(BaseHandler): super(SearchHandler, self).__init__(hs) @defer.inlineCallbacks - def search(self, user, content): + def search(self, user, content, batch=None): """Performs a full text search for a user. Args: user (UserID) content (dict): Search parameters + batch (str): The next_batch parameter. Used for pagination. Returns: dict to be returned to the client with results of search """ + batch_group = None + batch_group_key = None + batch_token = None + if batch: + try: + b = decode_base64(batch) + batch_group, batch_group_key, batch_token = b.split("\n") + + assert batch_group is not None + assert batch_group_key is not None + assert batch_token is not None + except: + raise SynapseError(400, "Invalid batch") + try: - search_term = content["search_categories"]["room_events"]["search_term"] - keys = content["search_categories"]["room_events"].get("keys", [ + room_cat = content["search_categories"]["room_events"] + + # The actual thing to query in FTS + search_term = room_cat["search_term"] + + # Which "keys" to search over in FTS query + keys = room_cat.get("keys", [ "content.body", "content.name", "content.topic", ]) - filter_dict = content["search_categories"]["room_events"].get("filter", {}) - event_context = content["search_categories"]["room_events"].get( + + # Filter to apply to results + filter_dict = room_cat.get("filter", {}) + + # What to order results by (impacts whether pagination can be doen) + order_by = room_cat.get("order_by", "rank") + + # Include context around each event? + event_context = room_cat.get( "event_context", None ) + # Group results together? May allow clients to paginate within a + # group + group_by = room_cat.get("groupings", {}).get("group_by", {}) + group_keys = [g["key"] for g in group_by] + if event_context is not None: before_limit = int(event_context.get( "before_limit", 5 @@ -65,6 +99,15 @@ class SearchHandler(BaseHandler): except KeyError: raise SynapseError(400, "Invalid search query") + if order_by not in ("rank", "recent"): + raise SynapseError(400, "Invalid order by: %r" % (order_by,)) + + if set(group_keys) - {"room_id", "sender"}: + raise SynapseError( + 400, + "Invalid group by keys: %r" % (set(group_keys) - {"room_id", "sender"},) + ) + search_filter = Filter(filter_dict) # TODO: Search through left rooms too @@ -77,19 +120,130 @@ class SearchHandler(BaseHandler): room_ids = search_filter.filter_rooms(room_ids) - rank_map, event_map, _ = yield self.store.search_msgs( - room_ids, search_term, keys - ) + if batch_group == "room_id": + room_ids.intersection_update({batch_group_key}) - filtered_events = search_filter.filter(event_map.values()) + rank_map = {} # event_id -> rank of event + allowed_events = [] + room_groups = {} # Holds result of grouping by room, if applicable + sender_group = {} # Holds result of grouping by sender, if applicable - allowed_events = yield self._filter_events_for_client( - user.to_string(), filtered_events - ) + # Holds the next_batch for the entire result set if one of those exists + global_next_batch = None - allowed_events.sort(key=lambda e: -rank_map[e.event_id]) - allowed_events = allowed_events[:search_filter.limit()] + if order_by == "rank": + results = yield self.store.search_msgs( + room_ids, search_term, keys + ) + + results_map = {r["event"].event_id: r for r in results} + + rank_map.update({r["event"].event_id: r["rank"] for r in results}) + + filtered_events = search_filter.filter([r["event"] for r in results]) + + events = yield self._filter_events_for_client( + user.to_string(), filtered_events + ) + + events.sort(key=lambda e: -rank_map[e.event_id]) + allowed_events = events[:search_filter.limit()] + + for e in allowed_events: + rm = room_groups.setdefault(e.room_id, { + "results": [], + "order": rank_map[e.event_id], + }) + rm["results"].append(e.event_id) + + s = sender_group.setdefault(e.sender, { + "results": [], + "order": rank_map[e.event_id], + }) + s["results"].append(e.event_id) + + elif order_by == "recent": + # In this case we specifically loop through each room as the given + # limit applies to each room, rather than a global list. + # This is not necessarilly a good idea. + for room_id in room_ids: + room_events = [] + if batch_group == "room_id" and batch_group_key == room_id: + pagination_token = batch_token + else: + pagination_token = None + i = 0 + + # We keep looping and we keep filtering until we reach the limit + # or we run out of things. + # But only go around 5 times since otherwise synapse will be sad. + while len(room_events) < search_filter.limit() and i < 5: + i += 1 + results = yield self.store.search_room( + room_id, search_term, keys, search_filter.limit() * 2, + pagination_token=pagination_token, + ) + + results_map = {r["event"].event_id: r for r in results} + + rank_map.update({r["event"].event_id: r["rank"] for r in results}) + + filtered_events = search_filter.filter([ + r["event"] for r in results + ]) + + events = yield self._filter_events_for_client( + user.to_string(), filtered_events + ) + + room_events.extend(events) + room_events = room_events[:search_filter.limit()] + + if len(results) < search_filter.limit() * 2: + pagination_token = None + break + else: + pagination_token = results[-1]["pagination_token"] + + if room_events: + res = results_map[room_events[-1].event_id] + pagination_token = res["pagination_token"] + + group = room_groups.setdefault(room_id, {}) + if pagination_token: + next_batch = encode_base64("%s\n%s\n%s" % ( + "room_id", room_id, pagination_token + )) + group["next_batch"] = next_batch + + if batch_token: + global_next_batch = next_batch + + group["results"] = [e.event_id for e in room_events] + group["order"] = max( + e.origin_server_ts/1000 for e in room_events + if hasattr(e, "origin_server_ts") + ) + + allowed_events.extend(room_events) + + # Normalize the group orders + if room_groups: + if len(room_groups) > 1: + mx = max(g["order"] for g in room_groups.values()) + mn = min(g["order"] for g in room_groups.values()) + + for g in room_groups.values(): + g["order"] = (g["order"] - mn) * 1.0 / (mx - mn) + else: + room_groups.values()[0]["order"] = 1 + else: + # We should never get here due to the guard earlier. + raise NotImplementedError() + + # If client has asked for "context" for each event (i.e. some surrounding + # events and state), fetch that if event_context is not None: now_token = yield self.hs.get_event_sources().get_current_token() @@ -144,11 +298,22 @@ class SearchHandler(BaseHandler): logger.info("Found %d results", len(results)) + rooms_cat_res = { + "results": results, + "count": len(results) + } + + if room_groups and "room_id" in group_keys: + rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups + + if sender_group and "sender" in group_keys: + rooms_cat_res.setdefault("groups", {})["sender"] = sender_group + + if global_next_batch: + rooms_cat_res["next_batch"] = global_next_batch + defer.returnValue({ "search_categories": { - "room_events": { - "results": results, - "count": len(results) - } + "room_events": rooms_cat_res } }) |