From ba26eb3d5d487edb90c21db7efec631b80adf24b Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Tue, 17 Nov 2015 17:17:30 -0500 Subject: Allow users to forget rooms --- synapse/storage/prepare_database.py | 2 +- synapse/storage/roommember.py | 36 ++++++++++++++++++++++ .../schema/delta/26/forgotten_memberships.sql | 24 +++++++++++++++ 3 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/26/forgotten_memberships.sql (limited to 'synapse/storage') diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 1a74d6e360..9800fd4203 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 25 +SCHEMA_VERSION = 26 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index ae1ad56d9a..183855ba40 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -269,3 +269,39 @@ class RoomMemberStore(SQLBaseStore): ret = len(room_id_lists.pop(0).intersection(*room_id_lists)) > 0 defer.returnValue(ret) + + def forget(self, user_id, room_id): + def f(txn): + sql = ( + "UPDATE" + " room_memberships" + " SET" + " forgotten = 1" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + ) + txn.execute(sql, (user_id, room_id)) + self.runInteraction("forget_membership", f) + + @defer.inlineCallbacks + def did_forget(self, user_id, room_id): + def f(txn): + sql = ( + "SELECT" + " COUNT(*)" + "FROM" + " room_memberships" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + " AND" + " forgotten = 1" + ) + txn.execute(sql, (user_id, room_id)) + rows = txn.fetchall() + return rows[0][0] + count = yield self.runInteraction("did_forget_membership", f) + defer.returnValue(count > 0) diff --git a/synapse/storage/schema/delta/26/forgotten_memberships.sql b/synapse/storage/schema/delta/26/forgotten_memberships.sql new file mode 100644 index 0000000000..df55b9c6f6 --- /dev/null +++ b/synapse/storage/schema/delta/26/forgotten_memberships.sql @@ -0,0 +1,24 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Keeps track of what rooms users have left and don't want to be able to + * access again. + * + * If all users on this server have left a room, we can delete the room + * entirely. + */ + + ALTER TABLE room_memberships ADD COLUMN forgotten INTEGER(1) DEFAULT 0; -- cgit 1.5.1 From bed7889703371dca893893d33f67e59e99cda111 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Wed, 18 Nov 2015 18:08:22 -0500 Subject: Apply forgetting properly to historical events --- synapse/handlers/_base.py | 10 +++++++++- synapse/storage/roommember.py | 34 ++++++++++++++++++++++++++++++++-- 2 files changed, 41 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 6519f183df..95bb06395a 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -92,7 +92,15 @@ class BaseHandler(object): membership_event = state.get((EventTypes.Member, user_id), None) if membership_event: - membership = membership_event.membership + was_forgotten_at_event = yield self.store.was_forgotten_at( + membership_event.user_id, + membership_event.room_id, + membership_event.event_id + ) + if was_forgotten_at_event: + membership = None + else: + membership = membership_event.membership else: membership = None diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 183855ba40..5e92cdc811 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -271,6 +271,7 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(ret) def forget(self, user_id, room_id): + """Indicate that user_id wishes to discard history for room_id.""" def f(txn): sql = ( "UPDATE" @@ -287,6 +288,9 @@ class RoomMemberStore(SQLBaseStore): @defer.inlineCallbacks def did_forget(self, user_id, room_id): + """Returns whether user_id has elected to discard history for room_id. + + Returns False if they have since re-joined.""" def f(txn): sql = ( "SELECT" @@ -298,10 +302,36 @@ class RoomMemberStore(SQLBaseStore): " AND" " room_id = ?" " AND" - " forgotten = 1" + " forgotten = 0" ) txn.execute(sql, (user_id, room_id)) rows = txn.fetchall() return rows[0][0] count = yield self.runInteraction("did_forget_membership", f) - defer.returnValue(count > 0) + defer.returnValue(count == 0) + + @defer.inlineCallbacks + def was_forgotten_at(self, user_id, room_id, event_id): + """Returns whether user_id has elected to discard history for room_id at event_id. + + event_id must be a membership event.""" + def f(txn): + sql = ( + "SELECT" + " COUNT(*)" + "FROM" + " room_memberships" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + " AND" + " event_id = ?" + " AND" + " forgotten = 1" + ) + txn.execute(sql, (user_id, room_id, event_id)) + rows = txn.fetchall() + return rows[0][0] + count = yield self.runInteraction("did_forget_membership_at", f) + defer.returnValue(count == 1) -- cgit 1.5.1 From 9da4c5340da7e5a8e03a3bd7e028a1c862ce9616 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 19 Nov 2015 10:07:21 -0500 Subject: Simplify code --- synapse/handlers/_base.py | 2 +- synapse/storage/roommember.py | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 95bb06395a..5fd20285d2 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -93,7 +93,7 @@ class BaseHandler(object): membership_event = state.get((EventTypes.Member, user_id), None) if membership_event: was_forgotten_at_event = yield self.store.was_forgotten_at( - membership_event.user_id, + membership_event.state_key, membership_event.room_id, membership_event.event_id ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 5e92cdc811..c3e11b91da 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -295,7 +295,7 @@ class RoomMemberStore(SQLBaseStore): sql = ( "SELECT" " COUNT(*)" - "FROM" + " FROM" " room_memberships" " WHERE" " user_id = ?" @@ -318,8 +318,8 @@ class RoomMemberStore(SQLBaseStore): def f(txn): sql = ( "SELECT" - " COUNT(*)" - "FROM" + " forgotten" + " FROM" " room_memberships" " WHERE" " user_id = ?" @@ -327,11 +327,9 @@ class RoomMemberStore(SQLBaseStore): " room_id = ?" " AND" " event_id = ?" - " AND" - " forgotten = 1" ) txn.execute(sql, (user_id, room_id, event_id)) rows = txn.fetchall() return rows[0][0] - count = yield self.runInteraction("did_forget_membership_at", f) - defer.returnValue(count == 1) + forgot = yield self.runInteraction("did_forget_membership_at", f) + defer.returnValue(forgot == 1) -- cgit 1.5.1 From df6824a00843b8eb944b1fe119f2ce84ea3525d9 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 19 Nov 2015 14:54:47 -0500 Subject: Ignore forgotten rooms in v2 sync --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index c3e11b91da..d32ce1ab1e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -160,7 +160,7 @@ class RoomMemberStore(SQLBaseStore): def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id, membership_list): - where_clause = "user_id = ? AND (%s)" % ( + where_clause = "user_id = ? AND (%s) AND NOT forgotten" % ( " OR ".join(["membership = ?" for _ in membership_list]), ) -- cgit 1.5.1 From 7dfa4555087049ed7e379d25880b5ffa660df8cb Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Mon, 23 Nov 2015 18:35:25 +0000 Subject: Remove size specifier for database column Postgres doesn't support them like this. We don't have a bool type in common between postgres and sqlite. --- synapse/storage/schema/delta/26/forgotten_memberships.sql | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/26/forgotten_memberships.sql b/synapse/storage/schema/delta/26/forgotten_memberships.sql index df55b9c6f6..beeb8a288b 100644 --- a/synapse/storage/schema/delta/26/forgotten_memberships.sql +++ b/synapse/storage/schema/delta/26/forgotten_memberships.sql @@ -19,6 +19,8 @@ * * If all users on this server have left a room, we can delete the room * entirely. + * + * This column should always contain either 0 or 1. */ - ALTER TABLE room_memberships ADD COLUMN forgotten INTEGER(1) DEFAULT 0; + ALTER TABLE room_memberships ADD COLUMN forgotten INTEGER DEFAULT 0; -- cgit 1.5.1 From 3e573a5c6b1206ceb6f2fc4a48e7a122ce468a89 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Mon, 23 Nov 2015 18:48:53 +0000 Subject: Fix SQL for postgres --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index d32ce1ab1e..c83c043f9b 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -160,7 +160,7 @@ class RoomMemberStore(SQLBaseStore): def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id, membership_list): - where_clause = "user_id = ? AND (%s) AND NOT forgotten" % ( + where_clause = "user_id = ? AND (%s) AND forgotten == 0" % ( " OR ".join(["membership = ?" for _ in membership_list]), ) -- cgit 1.5.1 From df7cf6c0ebee59b2fe17a450de01519ee3093195 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Mon, 23 Nov 2015 18:54:41 +0000 Subject: Fix SQL for postgres again --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index c83c043f9b..69398b7c8e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -160,7 +160,7 @@ class RoomMemberStore(SQLBaseStore): def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id, membership_list): - where_clause = "user_id = ? AND (%s) AND forgotten == 0" % ( + where_clause = "user_id = ? AND (%s) AND forgotten = 0" % ( " OR ".join(["membership = ?" for _ in membership_list]), ) -- cgit 1.5.1 From 76936f43ae0f88d4523fe07b7a9ccf8ddb5563ac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Nov 2015 16:40:42 +0000 Subject: Return words to highlight in search results --- synapse/handlers/search.py | 19 +++++-- synapse/storage/search.py | 123 ++++++++++++++++++++++++++++++++++++++------- 2 files changed, 120 insertions(+), 22 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 50688e51a8..6d2197339e 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -139,11 +139,18 @@ class SearchHandler(BaseHandler): # Holds the next_batch for the entire result set if one of those exists global_next_batch = None + highlights = set() + if order_by == "rank": - results = yield self.store.search_msgs( + search_result = yield self.store.search_msgs( room_ids, search_term, keys ) + if search_result["highlights"]: + highlights.update(search_result["highlights"]) + + results = search_result["results"] + results_map = {r["event"].event_id: r for r in results} rank_map.update({r["event"].event_id: r["rank"] for r in results}) @@ -187,11 +194,16 @@ class SearchHandler(BaseHandler): # But only go around 5 times since otherwise synapse will be sad. while len(room_events) < search_filter.limit() and i < 5: i += 1 - results = yield self.store.search_room( + search_result = yield self.store.search_room( room_id, search_term, keys, search_filter.limit() * 2, pagination_token=pagination_token, ) + if search_result["highlights"]: + highlights.update(search_result["highlights"]) + + results = search_result["results"] + results_map = {r["event"].event_id: r for r in results} rank_map.update({r["event"].event_id: r["rank"] for r in results}) @@ -347,7 +359,8 @@ class SearchHandler(BaseHandler): rooms_cat_res = { "results": results, - "count": len(results) + "count": len(results), + "highlights": list(highlights), } if state_results: diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 380270b009..c6386642df 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -20,6 +20,7 @@ from synapse.api.errors import SynapseError from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging +import re logger = logging.getLogger(__name__) @@ -194,14 +195,21 @@ class SearchStore(BackgroundUpdateStore): for ev in events } - defer.returnValue([ - { - "event": event_map[r["event_id"]], - "rank": r["rank"], - } - for r in results - if r["event_id"] in event_map - ]) + highlights = None + if isinstance(self.database_engine, PostgresEngine): + highlights = yield self._find_highlights_in_postgres(search_term, events) + + defer.returnValue({ + "results": [ + { + "event": event_map[r["event_id"]], + "rank": r["rank"], + } + for r in results + if r["event_id"] in event_map + ], + "highlights": highlights, + }) @defer.inlineCallbacks def search_room(self, room_id, search_term, keys, limit, pagination_token=None): @@ -294,14 +302,91 @@ class SearchStore(BackgroundUpdateStore): for ev in events } - defer.returnValue([ - { - "event": event_map[r["event_id"]], - "rank": r["rank"], - "pagination_token": "%s,%s" % ( - r["topological_ordering"], r["stream_ordering"] - ), - } - for r in results - if r["event_id"] in event_map - ]) + highlights = None + if isinstance(self.database_engine, PostgresEngine): + highlights = yield self._find_highlights_in_postgres(search_term, events) + + defer.returnValue({ + "results": [ + { + "event": event_map[r["event_id"]], + "rank": r["rank"], + "pagination_token": "%s,%s" % ( + r["topological_ordering"], r["stream_ordering"] + ), + } + for r in results + if r["event_id"] in event_map + ], + "highlights": highlights, + }) + + def _find_highlights_in_postgres(self, search_term, events): + """Given a list of events and a search term, return a list of words + that match from the content of the event. + + This is used to give a list of words that clients can match against to + highlight the matching parts. + + Args: + search_term (str) + events (list): A list of events + + Returns: + deferred : A set of strings. + """ + def f(txn): + highlight_words = set() + for event in events: + # As a hack we simply join values of all possible keys. This is + # fine since we're only using them to find possible highlights. + values = [] + for key in ("body", "name", "topic"): + v = event.content.get(key, None) + if v: + values.append(v) + + if not values: + continue + + value = " ".join(values) + + # We need to find some values for StartSel and StopSel that + # aren't in the value so that we can pick results out. + start_sel = "<" + stop_sel = ">" + + while start_sel in value: + start_sel += "<" + while stop_sel in value: + stop_sel += ">" + + query = "SELECT ts_headline(?, plainto_tsquery('english', ?), %s)" % ( + _to_postgres_options({ + "StartSel": start_sel, + "StopSel": stop_sel, + "MaxFragments": "50", + }) + ) + txn.execute(query, (value, search_term,)) + headline, = txn.fetchall()[0] + + # Now we need to pick the possible highlights out of the haedline + # result. + matcher_regex = "%s(.*?)%s" % ( + re.escape(start_sel), + re.escape(stop_sel), + ) + + res = re.findall(matcher_regex, headline) + highlight_words.update([r.lower() for r in res]) + + return highlight_words + + return self.runInteraction("_find_highlights", f) + + +def _to_postgres_options(options_dict): + return "'%s'" % ( + ",".join("%s=%s" % (k, v) for k, v in options_dict.items()), + ) -- cgit 1.5.1 From 4dcaa42b6d1fde87e29eb4f3c0080ea92fcc7fa2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Nov 2015 17:45:31 +0000 Subject: Allow paginating search ordered by recents --- synapse/handlers/search.py | 128 ++++++++++++++++------------------ synapse/storage/events.py | 77 ++++++++++++++++++++ synapse/storage/schema/delta/26/ts.py | 57 +++++++++++++++ synapse/storage/search.py | 41 ++++++----- 4 files changed, 219 insertions(+), 84 deletions(-) create mode 100644 synapse/storage/schema/delta/26/ts.py (limited to 'synapse/storage') diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 6d2197339e..671dbb61b8 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -131,6 +131,17 @@ class SearchHandler(BaseHandler): if batch_group == "room_id": room_ids.intersection_update({batch_group_key}) + if not room_ids: + defer.returnValue({ + "search_categories": { + "room_events": { + "results": {}, + "count": 0, + "highlights": [], + } + } + }) + rank_map = {} # event_id -> rank of event allowed_events = [] room_groups = {} # Holds result of grouping by room, if applicable @@ -178,85 +189,66 @@ class SearchHandler(BaseHandler): s["results"].append(e.event_id) elif order_by == "recent": - # In this case we specifically loop through each room as the given - # limit applies to each room, rather than a global list. - # This is not necessarilly a good idea. - for room_id in room_ids: - room_events = [] - if batch_group == "room_id" and batch_group_key == room_id: - pagination_token = batch_token - else: - pagination_token = None - i = 0 - - # We keep looping and we keep filtering until we reach the limit - # or we run out of things. - # But only go around 5 times since otherwise synapse will be sad. - while len(room_events) < search_filter.limit() and i < 5: - i += 1 - search_result = yield self.store.search_room( - room_id, search_term, keys, search_filter.limit() * 2, - pagination_token=pagination_token, - ) + room_events = [] + i = 0 + + pagination_token = batch_token + + # We keep looping and we keep filtering until we reach the limit + # or we run out of things. + # But only go around 5 times since otherwise synapse will be sad. + while len(room_events) < search_filter.limit() and i < 5: + i += 1 + search_result = yield self.store.search_rooms( + room_ids, search_term, keys, search_filter.limit() * 2, + pagination_token=pagination_token, + ) - if search_result["highlights"]: - highlights.update(search_result["highlights"]) + if search_result["highlights"]: + highlights.update(search_result["highlights"]) - results = search_result["results"] + results = search_result["results"] - results_map = {r["event"].event_id: r for r in results} + results_map = {r["event"].event_id: r for r in results} - rank_map.update({r["event"].event_id: r["rank"] for r in results}) + rank_map.update({r["event"].event_id: r["rank"] for r in results}) - filtered_events = search_filter.filter([ - r["event"] for r in results - ]) + filtered_events = search_filter.filter([ + r["event"] for r in results + ]) - events = yield self._filter_events_for_client( - user.to_string(), filtered_events - ) + events = yield self._filter_events_for_client( + user.to_string(), filtered_events + ) - room_events.extend(events) - room_events = room_events[:search_filter.limit()] + room_events.extend(events) + room_events = room_events[:search_filter.limit()] - if len(results) < search_filter.limit() * 2: - pagination_token = None - break - else: - pagination_token = results[-1]["pagination_token"] - - if room_events: - res = results_map[room_events[-1].event_id] - pagination_token = res["pagination_token"] - - group = room_groups.setdefault(room_id, {}) - if pagination_token: - next_batch = encode_base64("%s\n%s\n%s" % ( - "room_id", room_id, pagination_token - )) - group["next_batch"] = next_batch - - if batch_token: - global_next_batch = next_batch - - group["results"] = [e.event_id for e in room_events] - group["order"] = max( - e.origin_server_ts/1000 for e in room_events - if hasattr(e, "origin_server_ts") - ) + if len(results) < search_filter.limit() * 2: + pagination_token = None + else: + pagination_token = results[-1]["pagination_token"] - allowed_events.extend(room_events) + if room_events: + for event in room_events: + group = room_groups.setdefault(event.room_id, { + "results": [], + }) + group["results"].append(event.event_id) - # Normalize the group orders - if room_groups: - if len(room_groups) > 1: - mx = max(g["order"] for g in room_groups.values()) - mn = min(g["order"] for g in room_groups.values()) + pagination_token = results_map[room_events[-1].event_id]["pagination_token"] - for g in room_groups.values(): - g["order"] = (g["order"] - mn) * 1.0 / (mx - mn) - else: - room_groups.values()[0]["order"] = 1 + if pagination_token: + global_next_batch = encode_base64("%s\n%s\n%s" % ( + "all", "", pagination_token + )) + + for room_id, group in room_groups.items(): + group["next_batch"] = encode_base64("%s\n%s\n%s" % ( + "room_id", room_id, pagination_token + )) + + allowed_events.extend(room_events) else: # We should never get here due to the guard earlier. diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5d35ca90b9..7088f2709b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -51,6 +51,14 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events class EventsStore(SQLBaseStore): + EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" + + def __init__(self, hs): + super(EventsStore, self).__init__(hs) + self.register_background_update_handler( + self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts + ) + @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False, is_new_state=True): @@ -365,6 +373,7 @@ class EventsStore(SQLBaseStore): "processed": True, "outlier": event.internal_metadata.is_outlier(), "content": encode_json(event.content).decode("UTF-8"), + "origin_server_ts": int(event.origin_server_ts), } for event, _ in events_and_contexts ], @@ -964,3 +973,71 @@ class EventsStore(SQLBaseStore): ret = yield self.runInteraction("count_messages", _count_messages) defer.returnValue(ret) + + @defer.inlineCallbacks + def _background_reindex_origin_server_ts(self, progress, batch_size): + target_min_stream_id = progress["target_min_stream_id_inclusive"] + max_stream_id = progress["max_stream_id_exclusive"] + rows_inserted = progress.get("rows_inserted", 0) + + INSERT_CLUMP_SIZE = 1000 + + def reindex_search_txn(txn): + sql = ( + "SELECT stream_ordering, event_id FROM events" + " WHERE ? <= stream_ordering AND stream_ordering < ?" + " ORDER BY stream_ordering DESC" + " LIMIT ?" + ) + + txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + min_stream_id = rows[-1][0] + event_ids = [row[1] for row in rows] + + events = self._get_events_txn(txn, event_ids) + + rows = [] + for event in events: + try: + event_id = event.event_id + origin_server_ts = event.origin_server_ts + except (KeyError, AttributeError): + # If the event is missing a necessary field then + # skip over it. + continue + + rows.append((origin_server_ts, event_id)) + + sql = ( + "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" + ) + + for index in range(0, len(rows), INSERT_CLUMP_SIZE): + clump = rows[index:index + INSERT_CLUMP_SIZE] + txn.executemany(sql, clump) + + progress = { + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, + "rows_inserted": rows_inserted + len(rows) + } + + self._background_update_progress_txn( + txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress + ) + + return len(rows) + + result = yield self.runInteraction( + self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn + ) + + if not result: + yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME) + + defer.returnValue(result) diff --git a/synapse/storage/schema/delta/26/ts.py b/synapse/storage/schema/delta/26/ts.py new file mode 100644 index 0000000000..8d4a981975 --- /dev/null +++ b/synapse/storage/schema/delta/26/ts.py @@ -0,0 +1,57 @@ +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.storage.prepare_database import get_statements + +import ujson + +logger = logging.getLogger(__name__) + + +ALTER_TABLE = ( + "ALTER TABLE events ADD COLUMN origin_server_ts BIGINT;" + "CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);" +) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + for statement in get_statements(ALTER_TABLE.splitlines()): + cur.execute(statement) + + cur.execute("SELECT MIN(stream_ordering) FROM events") + rows = cur.fetchall() + min_stream_id = rows[0][0] + + cur.execute("SELECT MAX(stream_ordering) FROM events") + rows = cur.fetchall() + max_stream_id = rows[0][0] + + if min_stream_id is not None and max_stream_id is not None: + progress = { + "target_min_stream_id_inclusive": min_stream_id, + "max_stream_id_exclusive": max_stream_id + 1, + "rows_inserted": 0, + } + progress_json = ujson.dumps(progress) + + sql = ( + "INSERT into background_updates (update_name, progress_json)" + " VALUES (?, ?)" + ) + + sql = database_engine.convert_param_style(sql) + + cur.execute(sql, ("event_origin_server_ts", progress_json)) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index c6386642df..20a62d07ff 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -212,11 +212,11 @@ class SearchStore(BackgroundUpdateStore): }) @defer.inlineCallbacks - def search_room(self, room_id, search_term, keys, limit, pagination_token=None): + def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None): """Performs a full text search over events with given keys. Args: - room_id (str): The room_id to search in + room_id (list): The room_ids to search in search_term (str): Search term to search for keys (list): List of keys to search in, currently supports "content.body", "content.name", "content.topic" @@ -226,7 +226,15 @@ class SearchStore(BackgroundUpdateStore): list of dicts """ clauses = [] - args = [search_term, room_id] + args = [search_term] + + # Make sure we don't explode because the person is in too many rooms. + # We filter the results below regardless. + if len(room_ids) < 500: + clauses.append( + "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),) + ) + args.extend(room_ids) local_clauses = [] for key in keys: @@ -239,25 +247,25 @@ class SearchStore(BackgroundUpdateStore): if pagination_token: try: - topo, stream = pagination_token.split(",") - topo = int(topo) + origin_server_ts, stream = pagination_token.split(",") + origin_server_ts = int(origin_server_ts) stream = int(stream) except: raise SynapseError(400, "Invalid pagination token") clauses.append( - "(topological_ordering < ?" - " OR (topological_ordering = ? AND stream_ordering < ?))" + "(origin_server_ts < ?" + " OR (origin_server_ts = ? AND stream_ordering < ?))" ) - args.extend([topo, topo, stream]) + args.extend([origin_server_ts, origin_server_ts, stream]) if isinstance(self.database_engine, PostgresEngine): sql = ( "SELECT ts_rank_cd(vector, query) as rank," - " topological_ordering, stream_ordering, room_id, event_id" + " origin_server_ts, stream_ordering, room_id, event_id" " FROM plainto_tsquery('english', ?) as query, event_search" " NATURAL JOIN events" - " WHERE vector @@ query AND room_id = ?" + " WHERE vector @@ query AND " ) elif isinstance(self.database_engine, Sqlite3Engine): # We use CROSS JOIN here to ensure we use the right indexes. @@ -270,24 +278,23 @@ class SearchStore(BackgroundUpdateStore): # MATCH unless it uses the full text search index sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," - " topological_ordering, stream_ordering" + " origin_server_ts, stream_ordering" " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo" " FROM event_search" " WHERE value MATCH ?" " )" " CROSS JOIN events USING (event_id)" - " WHERE room_id = ?" + " WHERE " ) else: # This should be unreachable. raise Exception("Unrecognized database engine") - for clause in clauses: - sql += " AND " + clause + sql += " AND ".join(clauses) # We add an arbitrary limit here to ensure we don't try to pull the # entire table from the database. - sql += " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" + sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?" args.append(limit) @@ -295,6 +302,8 @@ class SearchStore(BackgroundUpdateStore): "search_rooms", self.cursor_to_dict, sql, *args ) + results = filter(lambda row: row["room_id"] in room_ids, results) + events = yield self._get_events([r["event_id"] for r in results]) event_map = { @@ -312,7 +321,7 @@ class SearchStore(BackgroundUpdateStore): "event": event_map[r["event_id"]], "rank": r["rank"], "pagination_token": "%s,%s" % ( - r["topological_ordering"], r["stream_ordering"] + r["origin_server_ts"], r["stream_ordering"] ), } for r in results -- cgit 1.5.1 From 95f30ecd1f90cd143c908589b600742148491c15 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 1 Dec 2015 18:41:32 +0000 Subject: Add API for setting account_data globaly or on a per room basis --- synapse/api/filtering.py | 9 +- synapse/handlers/account_data.py | 21 ++- synapse/handlers/message.py | 40 ++++- synapse/handlers/sync.py | 72 ++++++-- synapse/rest/client/v2_alpha/__init__.py | 2 + synapse/rest/client/v2_alpha/account_data.py | 111 ++++++++++++ synapse/rest/client/v2_alpha/sync.py | 6 + synapse/storage/__init__.py | 2 + synapse/storage/account_data.py | 211 +++++++++++++++++++++++ synapse/storage/schema/delta/26/account_data.sql | 23 +++ synapse/storage/tags.py | 4 +- 11 files changed, 476 insertions(+), 25 deletions(-) create mode 100644 synapse/rest/client/v2_alpha/account_data.py create mode 100644 synapse/storage/account_data.py (limited to 'synapse/storage') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 18f2ec3ae8..19f30c273c 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -50,7 +50,7 @@ class Filtering(object): # many definitions. top_level_definitions = [ - "presence" + "presence", "account_data" ] room_level_definitions = [ @@ -139,6 +139,10 @@ class FilterCollection(object): self.filter_json.get("presence", {}) ) + self.account_data = Filter( + self.filter_json.get("account_data", {}) + ) + def timeline_limit(self): return self.room_timeline_filter.limit() @@ -151,6 +155,9 @@ class FilterCollection(object): def filter_presence(self, events): return self.presence_filter.filter(events) + def filter_account_data(self, events): + return self.account_data.filter(events) + def filter_room_state(self, events): return self.room_state_filter.filter(events) diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 1d35d3b7dc..fe773bee9b 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -29,9 +29,10 @@ class AccountDataEventSource(object): last_stream_id = from_key current_stream_id = yield self.store.get_max_account_data_stream_id() - tags = yield self.store.get_updated_tags(user_id, last_stream_id) results = [] + tags = yield self.store.get_updated_tags(user_id, last_stream_id) + for room_id, room_tags in tags.items(): results.append({ "type": "m.tag", @@ -39,6 +40,24 @@ class AccountDataEventSource(object): "room_id": room_id, }) + account_data, room_account_data = ( + yield self.store.get_updated_account_data_for_user(user_id, last_stream_id) + ) + + for account_data_type, content in account_data.items(): + results.append({ + "type": account_data_type, + "content": content, + }) + + for room_id, account_data in room_account_data.items(): + for account_data_type, content in account_data.items(): + results.append({ + "type": account_data_type, + "content": content, + "room_id": room_id, + }) + defer.returnValue((results, current_stream_id)) @defer.inlineCallbacks diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 64c57375f7..e959ce50be 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -359,6 +359,10 @@ class MessageHandler(BaseHandler): tags_by_room = yield self.store.get_tags_for_user(user_id) + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user(user_id) + ) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -436,14 +440,22 @@ class MessageHandler(BaseHandler): for c in current_state.values() ] - account_data = [] + account_data_events = [] tags = tags_by_room.get(event.room_id) if tags: - account_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - d["account_data"] = account_data + + account_data = account_data_by_room.get(event.room_id, {}) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + d["account_data"] = account_data_events except: logger.exception("Failed to get snapshot") @@ -456,9 +468,17 @@ class MessageHandler(BaseHandler): consumeErrors=True ).addErrback(unwrapFirstError) + account_data_events = [] + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + ret = { "rooms": rooms_ret, "presence": presence, + "account_data": account_data_events, "receipts": receipt, "end": now_token.to_string(), } @@ -498,14 +518,22 @@ class MessageHandler(BaseHandler): user_id, room_id, pagin_config, membership, member_event_id, is_guest ) - account_data = [] + account_data_events = [] tags = yield self.store.get_tags_for_room(user_id, room_id) if tags: - account_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - result["account_data"] = account_data + + account_data = yield self.store.get_account_data_for_room(user_id, room_id) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + result["account_data"] = account_data_events defer.returnValue(result) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 877328b29e..943ce368ef 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -100,6 +100,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. + "account_data", # List of account_data events for the user. "joined", # JoinedSyncResult for each joined room. "invited", # InvitedSyncResult for each invited room. "archived", # ArchivedSyncResult for each archived room. @@ -195,6 +196,12 @@ class SyncHandler(BaseHandler): ) ) + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user( + sync_config.user.to_string() + ) + ) + tags_by_room = yield self.store.get_tags_for_user( sync_config.user.to_string() ) @@ -211,6 +218,7 @@ class SyncHandler(BaseHandler): timeline_since_token=timeline_since_token, ephemeral_by_room=ephemeral_by_room, tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, ) joined.append(room_sync) elif event.membership == Membership.INVITE: @@ -230,11 +238,13 @@ class SyncHandler(BaseHandler): leave_token=leave_token, timeline_since_token=timeline_since_token, tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, ) archived.append(room_sync) defer.returnValue(SyncResult( presence=presence, + account_data=self.account_data_for_user(account_data), joined=joined, invited=invited, archived=archived, @@ -244,7 +254,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_joined_room(self, room_id, sync_config, now_token, timeline_since_token, - ephemeral_by_room, tags_by_room): + ephemeral_by_room, tags_by_room, + account_data_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -262,19 +273,38 @@ class SyncHandler(BaseHandler): state=current_state, ephemeral=ephemeral_by_room.get(room_id, []), account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), )) - def account_data_for_room(self, room_id, tags_by_room): - account_data = [] + def account_data_for_user(self, account_data): + account_data_events = [] + + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + return account_data_events + + def account_data_for_room(self, room_id, tags_by_room, account_data_by_room): + account_data_events = [] tags = tags_by_room.get(room_id) if tags is not None: - account_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - return account_data + + account_data = account_data_by_room.get(room_id, {}) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + return account_data_events @defer.inlineCallbacks def ephemeral_by_room(self, sync_config, now_token, since_token=None): @@ -341,7 +371,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, leave_event_id, leave_token, - timeline_since_token, tags_by_room): + timeline_since_token, tags_by_room, + account_data_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -358,7 +389,7 @@ class SyncHandler(BaseHandler): timeline=batch, state=leave_state, account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), )) @@ -415,6 +446,13 @@ class SyncHandler(BaseHandler): since_token.account_data_key, ) + account_data, account_data_by_room = ( + yield self.store.get_updated_account_data_for_user( + sync_config.user.to_string(), + since_token.account_data_key, + ) + ) + joined = [] archived = [] if len(room_events) <= timeline_limit: @@ -469,7 +507,7 @@ class SyncHandler(BaseHandler): state=state, ephemeral=ephemeral_by_room.get(room_id, []), account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), ) logger.debug("Result for room %s: %r", room_id, room_sync) @@ -492,14 +530,15 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room + ephemeral_by_room, tags_by_room, account_data_by_room ) if room_sync: joined.append(room_sync) for leave_event in leave_events: room_sync = yield self.incremental_sync_for_archived_room( - sync_config, leave_event, since_token, tags_by_room + sync_config, leave_event, since_token, tags_by_room, + account_data_by_room ) archived.append(room_sync) @@ -510,6 +549,7 @@ class SyncHandler(BaseHandler): defer.returnValue(SyncResult( presence=presence, + account_data=self.account_data_for_user(account_data), joined=joined, invited=invited, archived=archived, @@ -566,7 +606,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room): + ephemeral_by_room, tags_by_room, + account_data_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. @@ -606,7 +647,7 @@ class SyncHandler(BaseHandler): state=state, ephemeral=ephemeral_by_room.get(room_id, []), account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), ) @@ -616,7 +657,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_for_archived_room(self, sync_config, leave_event, - since_token, tags_by_room): + since_token, tags_by_room, + account_data_by_room): """ Get the incremental delta needed to bring the client up to date for the archived room. Returns: @@ -654,7 +696,7 @@ class SyncHandler(BaseHandler): timeline=batch, state=state_events_delta, account_data=self.account_data_for_room( - leave_event.room_id, tags_by_room + leave_event.room_id, tags_by_room, account_data_by_room ), ) diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py index a108132346..d7b59c84d1 100644 --- a/synapse/rest/client/v2_alpha/__init__.py +++ b/synapse/rest/client/v2_alpha/__init__.py @@ -23,6 +23,7 @@ from . import ( keys, tokenrefresh, tags, + account_data, ) from synapse.http.server import JsonResource @@ -46,3 +47,4 @@ class ClientV2AlphaRestResource(JsonResource): keys.register_servlets(hs, client_resource) tokenrefresh.register_servlets(hs, client_resource) tags.register_servlets(hs, client_resource) + account_data.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py new file mode 100644 index 0000000000..5b8f454bf1 --- /dev/null +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import client_v2_patterns + +from synapse.http.servlet import RestServlet +from synapse.api.errors import AuthError, SynapseError + +from twisted.internet import defer + +import logging + +import simplejson as json + +logger = logging.getLogger(__name__) + + +class AccountDataServlet(RestServlet): + """ + PUT /user/{user_id}/account_data/{account_dataType} HTTP/1.1 + """ + PATTERNS = client_v2_patterns( + "/user/(?P[^/]*)/account_data/(?P[^/]*)" + ) + + def __init__(self, hs): + super(AccountDataServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + + @defer.inlineCallbacks + def on_PUT(self, request, user_id, account_data_type): + auth_user, _, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot add account data for other users.") + + try: + content_bytes = request.content.read() + body = json.loads(content_bytes) + except: + raise SynapseError(400, "Invalid JSON") + + max_id = yield self.store.add_account_data_for_user( + user_id, account_data_type, body + ) + + yield self.notifier.on_new_event( + "account_data_key", max_id, users=[user_id] + ) + + defer.returnValue((200, {})) + + +class RoomAccountDataServlet(RestServlet): + """ + PUT /user/{user_id}/rooms/{room_id}/account_data/{account_dataType} HTTP/1.1 + """ + PATTERNS = client_v2_patterns( + "/user/(?P[^/]*)" + "/rooms/(?P[^/]*)" + "/account_data/(?P[^/]*)" + ) + + def __init__(self, hs): + super(RoomAccountDataServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + + @defer.inlineCallbacks + def on_PUT(self, request, user_id, room_id, account_data_type): + auth_user, _, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot add account data for other users.") + + try: + content_bytes = request.content.read() + body = json.loads(content_bytes) + except: + raise SynapseError(400, "Invalid JSON") + + if not isinstance(body, dict): + raise ValueError("Expected a JSON object") + + max_id = yield self.store.add_account_data_to_room( + user_id, room_id, account_data_type, body + ) + + yield self.notifier.on_new_event( + "account_data_key", max_id, users=[user_id] + ) + + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + AccountDataServlet(hs).register(http_server) + RoomAccountDataServlet(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 09693bb435..4efe802487 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -144,6 +144,9 @@ class SyncRestServlet(RestServlet): ) response_content = { + "account_data": self.encode_account_data( + sync_result.account_data, filter, time_now + ), "presence": self.encode_presence( sync_result.presence, filter, time_now ), @@ -165,6 +168,9 @@ class SyncRestServlet(RestServlet): formatted.append(event) return {"events": filter.filter_presence(formatted)} + def encode_account_data(self, events, filter, time_now): + return {"events": filter.filter_account_data(events)} + def encode_joined(self, rooms, filter, time_now, token_id): """ Encode the joined rooms in a sync result diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index e7443f2838..c46b653f11 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -42,6 +42,7 @@ from .end_to_end_keys import EndToEndKeyStore from .receipts import ReceiptsStore from .search import SearchStore from .tags import TagsStore +from .account_data import AccountDataStore import logging @@ -73,6 +74,7 @@ class DataStore(RoomMemberStore, RoomStore, EndToEndKeyStore, SearchStore, TagsStore, + AccountDataStore, ): def __init__(self, hs): diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py new file mode 100644 index 0000000000..d1829f84e8 --- /dev/null +++ b/synapse/storage/account_data.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from twisted.internet import defer + +import ujson as json +import logging + +logger = logging.getLogger(__name__) + + +class AccountDataStore(SQLBaseStore): + + def get_account_data_for_user(self, user_id): + """Get all the client account_data for a user. + + Args: + user_id(str): The user to get the account_data for. + Returns: + A deferred pair of a dict of global account_data and a dict + mapping from room_id string to per room account_data dicts. + """ + + def get_account_data_for_user_txn(txn): + rows = self._simple_select_list_txn( + txn, "account_data", {"user_id": user_id}, + ["account_data_type", "content"] + ) + + global_account_data = { + row["account_data_type"]: json.loads(row["content"]) for row in rows + } + + rows = self._simple_select_list_txn( + txn, "room_account_data", {"user_id": user_id}, + ["room_id", "account_data_type", "content"] + ) + + by_room = {} + for row in rows: + room_data = by_room.setdefault(row["room_id"], {}) + room_data[row["account_data_type"]] = json.loads(row["content"]) + + return (global_account_data, by_room) + + return self.runInteraction( + "get_account_data_for_user", get_account_data_for_user_txn + ) + + def get_account_data_for_room(self, user_id, room_id): + """Get all the client account_data for a user for a room. + + Args: + user_id(str): The user to get the account_data for. + room_id(str): The room to get the account_data for. + Returns: + A deferred dict of the room account_data + """ + def get_account_data_for_room_txn(txn): + rows = self._simple_select_list_txn( + txn, "room_account_data", {"user_id": user_id, "room_id": room_id}, + ["account_data_type", "content"] + ) + + return { + row["account_data_type"]: json.loads(row["content"]) for row in rows + } + + return self.runInteraction( + "get_account_data_for_room", get_account_data_for_room_txn + ) + + def get_updated_account_data_for_user(self, user_id, stream_id): + """Get all the client account_data for a that's changed. + + Args: + user_id(str): The user to get the account_data for. + stream_id(int): The point in the stream since which to get updates + Returns: + A deferred pair of a dict of global account_data and a dict + mapping from room_id string to per room account_data dicts. + """ + + def get_updated_account_data_for_user_txn(txn): + sql = ( + "SELECT account_data_type, content FROM account_data" + " WHERE user_id = ? AND stream_id > ?" + ) + + txn.execute(sql, (user_id, stream_id)) + + global_account_data = { + row[0]: json.loads(row[1]) for row in txn.fetchall() + } + + sql = ( + "SELECT room_id, account_data_type, content FROM room_account_data" + " WHERE user_id = ? AND stream_id > ?" + ) + + txn.execute(sql, (user_id, stream_id)) + + account_data_by_room = {} + for row in txn.fetchall(): + room_account_data = account_data_by_room.setdefault(row[0], {}) + room_account_data[row[1]] = json.loads(row[2]) + + return (global_account_data, account_data_by_room) + + return self.runInteraction( + "get_updated_account_data_for_user", get_updated_account_data_for_user_txn + ) + + @defer.inlineCallbacks + def add_account_data_to_room(self, user_id, room_id, account_data_type, content): + """Add some account_data to a room for a user. + Args: + user_id(str): The user to add a tag for. + room_id(str): The room to add a tag for. + account_data_type(str): The type of account_data to add. + content(dict): A json object to associate with the tag. + Returns: + A deferred that completes once the account_data has been added. + """ + content_json = json.dumps(content) + + def add_account_data_txn(txn, next_id): + self._simple_upsert_txn( + txn, + table="room_account_data", + keyvalues={ + "user_id": user_id, + "room_id": room_id, + "account_data_type": account_data_type, + }, + values={ + "stream_id": next_id, + "content": content_json, + } + ) + self._update_max_stream_id(txn, next_id) + + with (yield self._account_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction( + "add_room_account_data", add_account_data_txn, next_id + ) + + result = yield self._account_data_id_gen.get_max_token(self) + defer.returnValue(result) + + @defer.inlineCallbacks + def add_account_data_for_user(self, user_id, account_data_type, content): + """Add some account_data to a room for a user. + Args: + user_id(str): The user to add a tag for. + account_data_type(str): The type of account_data to add. + content(dict): A json object to associate with the tag. + Returns: + A deferred that completes once the account_data has been added. + """ + content_json = json.dumps(content) + + def add_account_data_txn(txn, next_id): + self._simple_upsert_txn( + txn, + table="account_data", + keyvalues={ + "user_id": user_id, + "account_data_type": account_data_type, + }, + values={ + "stream_id": next_id, + "content": content_json, + } + ) + self._update_max_stream_id(txn, next_id) + + with (yield self._account_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction( + "add_user_account_data", add_account_data_txn, next_id + ) + + result = yield self._account_data_id_gen.get_max_token(self) + defer.returnValue(result) + + def _update_max_stream_id(self, txn, next_id): + """Update the max stream_id + + Args: + txn: The database cursor + next_id(int): The the revision to advance to. + """ + update_max_id_sql = ( + "UPDATE account_data_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(update_max_id_sql, (next_id, next_id)) diff --git a/synapse/storage/schema/delta/26/account_data.sql b/synapse/storage/schema/delta/26/account_data.sql index 3198a0d29c..48ad9cc6b8 100644 --- a/synapse/storage/schema/delta/26/account_data.sql +++ b/synapse/storage/schema/delta/26/account_data.sql @@ -15,3 +15,26 @@ ALTER TABLE private_user_data_max_stream_id RENAME TO account_data_max_stream_id; + + +CREATE TABLE IF NOT EXISTS account_data( + user_id TEXT NOT NULL, + account_data_type TEXT NOT NULL, -- The type of the account_data. + stream_id BIGINT NOT NULL, -- The version of the account_data. + content TEXT NOT NULL, -- The JSON content of the account_data + CONSTRAINT account_data_uniqueness UNIQUE (user_id, account_data_type) +); + + +CREATE TABLE IF NOT EXISTS room_account_data( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + account_data_type TEXT NOT NULL, -- The type of the account_data. + stream_id BIGINT NOT NULL, -- The version of the account_data. + content TEXT NOT NULL, -- The JSON content of the account_data + CONSTRAINT room_account_data_uniqueness UNIQUE (user_id, room_id, account_data_type) +); + + +CREATE INDEX account_data_stream_id on account_data(user_id, stream_id); +CREATE INDEX room_account_data_stream_id on room_account_data(user_id, stream_id); diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index f6d826cc59..f520f60c6c 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -48,8 +48,8 @@ class TagsStore(SQLBaseStore): Args: user_id(str): The user to get the tags for. Returns: - A deferred dict mapping from room_id strings to lists of tag - strings. + A deferred dict mapping from room_id strings to dicts mapping from + tag strings to tag content. """ deferred = self._simple_select_list( -- cgit 1.5.1 From 477da77b463baa9c2326c763911ecf8b46e1d84b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Dec 2015 11:38:51 +0000 Subject: Search: Add prefix matching support --- synapse/storage/search.py | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 20a62d07ff..0dfd7b9fb5 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -140,7 +140,10 @@ class SearchStore(BackgroundUpdateStore): list of dicts """ clauses = [] - args = [] + if isinstance(self.database_engine, PostgresEngine): + args = [_postgres_parse_query(search_term)] + else: + args = [_sqlite_parse_query(search_term)] # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. @@ -162,7 +165,7 @@ class SearchStore(BackgroundUpdateStore): if isinstance(self.database_engine, PostgresEngine): sql = ( "SELECT ts_rank_cd(vector, query) AS rank, room_id, event_id" - " FROM plainto_tsquery('english', ?) as query, event_search" + " FROM to_tsquery('english', ?) as query, event_search" " WHERE vector @@ query" ) elif isinstance(self.database_engine, Sqlite3Engine): @@ -183,7 +186,7 @@ class SearchStore(BackgroundUpdateStore): sql += " ORDER BY rank DESC LIMIT 500" results = yield self._execute( - "search_msgs", self.cursor_to_dict, sql, *([search_term] + args) + "search_msgs", self.cursor_to_dict, sql, *args ) results = filter(lambda row: row["room_id"] in room_ids, results) @@ -226,7 +229,11 @@ class SearchStore(BackgroundUpdateStore): list of dicts """ clauses = [] - args = [search_term] + + if isinstance(self.database_engine, PostgresEngine): + args = [_postgres_parse_query(search_term)] + else: + args = [_sqlite_parse_query(search_term)] # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. @@ -263,7 +270,7 @@ class SearchStore(BackgroundUpdateStore): sql = ( "SELECT ts_rank_cd(vector, query) as rank," " origin_server_ts, stream_ordering, room_id, event_id" - " FROM plainto_tsquery('english', ?) as query, event_search" + " FROM to_tsquery('english', ?) as query, event_search" " NATURAL JOIN events" " WHERE vector @@ query AND " ) @@ -399,3 +406,23 @@ def _to_postgres_options(options_dict): return "'%s'" % ( ",".join("%s=%s" % (k, v) for k, v in options_dict.items()), ) + + +def _postgres_parse_query(search_term): + """Takes a plain unicode string from the user and converts it into a form + that can be passed to `to_tsquery(..)` postgres func. We use this so that + we can add prefix matching, which isn't something `plainto_tsquery` supports. + """ + results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) + + return " & ".join(result + ":*" for result in results) + + +def _sqlite_parse_query(search_term): + """Takes a plain unicode string from the user and converts it into a form + that can be passed to sqlite `MATCH`. We use this so that we can do prefix + matching. + """ + results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) + + return " & ".join(result + "*" for result in results) -- cgit 1.5.1 From 7dd6e5efca99fc17fa13225ed3d235931da315c9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Dec 2015 13:09:37 +0000 Subject: Remove deuplication. Add comment about regex. --- synapse/storage/search.py | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 0dfd7b9fb5..4738bdd503 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -140,10 +140,7 @@ class SearchStore(BackgroundUpdateStore): list of dicts """ clauses = [] - if isinstance(self.database_engine, PostgresEngine): - args = [_postgres_parse_query(search_term)] - else: - args = [_sqlite_parse_query(search_term)] + args = [_parse_query(self.database_engine, search_term)] # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. @@ -230,10 +227,7 @@ class SearchStore(BackgroundUpdateStore): """ clauses = [] - if isinstance(self.database_engine, PostgresEngine): - args = [_postgres_parse_query(search_term)] - else: - args = [_sqlite_parse_query(search_term)] + args = [_parse_query(self.database_engine, search_term)] # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. @@ -408,21 +402,17 @@ def _to_postgres_options(options_dict): ) -def _postgres_parse_query(search_term): +def _parse_query(database_engine, search_term): """Takes a plain unicode string from the user and converts it into a form - that can be passed to `to_tsquery(..)` postgres func. We use this so that - we can add prefix matching, which isn't something `plainto_tsquery` supports. + that can be passed to database. + We use this so that we can add prefix matching, which isn't something + that is supported by default. """ - results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) - return " & ".join(result + ":*" for result in results) - - -def _sqlite_parse_query(search_term): - """Takes a plain unicode string from the user and converts it into a form - that can be passed to sqlite `MATCH`. We use this so that we can do prefix - matching. - """ + # Pull out the individual words, discarding any non-word characters. results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) - return " & ".join(result + "*" for result in results) + if isinstance(database_engine, PostgresEngine): + return " & ".join(result + ":*" for result in results) + else: + return " & ".join(result + "*" for result in results) -- cgit 1.5.1 From b9acef53015fb112c8b888d3d184388f9d030b01 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Dec 2015 13:28:13 +0000 Subject: Fix so highlight matching works again --- synapse/storage/search.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 4738bdd503..fd7b688cf5 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -140,7 +140,10 @@ class SearchStore(BackgroundUpdateStore): list of dicts """ clauses = [] - args = [_parse_query(self.database_engine, search_term)] + + search_query = search_query = _parse_query(self.database_engine, search_term) + + args = [search_query] # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. @@ -197,7 +200,7 @@ class SearchStore(BackgroundUpdateStore): highlights = None if isinstance(self.database_engine, PostgresEngine): - highlights = yield self._find_highlights_in_postgres(search_term, events) + highlights = yield self._find_highlights_in_postgres(search_query, events) defer.returnValue({ "results": [ @@ -227,7 +230,9 @@ class SearchStore(BackgroundUpdateStore): """ clauses = [] - args = [_parse_query(self.database_engine, search_term)] + search_query = search_query = _parse_query(self.database_engine, search_term) + + args = [search_query] # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. @@ -314,7 +319,7 @@ class SearchStore(BackgroundUpdateStore): highlights = None if isinstance(self.database_engine, PostgresEngine): - highlights = yield self._find_highlights_in_postgres(search_term, events) + highlights = yield self._find_highlights_in_postgres(search_query, events) defer.returnValue({ "results": [ @@ -331,7 +336,7 @@ class SearchStore(BackgroundUpdateStore): "highlights": highlights, }) - def _find_highlights_in_postgres(self, search_term, events): + def _find_highlights_in_postgres(self, search_query, events): """Given a list of events and a search term, return a list of words that match from the content of the event. @@ -339,7 +344,7 @@ class SearchStore(BackgroundUpdateStore): highlight the matching parts. Args: - search_term (str) + search_query (str) events (list): A list of events Returns: @@ -371,14 +376,14 @@ class SearchStore(BackgroundUpdateStore): while stop_sel in value: stop_sel += ">" - query = "SELECT ts_headline(?, plainto_tsquery('english', ?), %s)" % ( + query = "SELECT ts_headline(?, to_tsquery('english', ?), %s)" % ( _to_postgres_options({ "StartSel": start_sel, "StopSel": stop_sel, "MaxFragments": "50", }) ) - txn.execute(query, (value, search_term,)) + txn.execute(query, (value, search_query,)) headline, = txn.fetchall()[0] # Now we need to pick the possible highlights out of the haedline -- cgit 1.5.1 From b2def42bfdbb48152479cbb07411f9784c6871ac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Dec 2015 13:29:14 +0000 Subject: Older versions of SQLite don't like IF NOT EXISTS in virtual tables --- synapse/storage/schema/delta/25/fts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py index 5239d69073..ba48e43792 100644 --- a/synapse/storage/schema/delta/25/fts.py +++ b/synapse/storage/schema/delta/25/fts.py @@ -38,7 +38,7 @@ CREATE INDEX event_search_ev_ridx ON event_search(room_id); SQLITE_TABLE = ( - "CREATE VIRTUAL TABLE IF NOT EXISTS event_search" + "CREATE VIRTUAL TABLE event_search" " USING fts4 ( event_id, room_id, sender, key, value )" ) -- cgit 1.5.1 From 976cb5aaa8e052a27b1dc249798e12c80e8aa329 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Dec 2015 13:50:43 +0000 Subject: Throw if unrecognized DB type --- synapse/storage/search.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index fd7b688cf5..39f600f53c 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -419,5 +419,8 @@ def _parse_query(database_engine, search_term): if isinstance(database_engine, PostgresEngine): return " & ".join(result + ":*" for result in results) - else: + elif isinstance(database_engine, Sqlite3Engine): return " & ".join(result + "*" for result in results) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") -- cgit 1.5.1 From 8810eb8c399f84787ed45b22d7f1386cecf3ca87 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 2 Dec 2015 17:19:11 +0000 Subject: Fix schema delta 15 on postgres in the very unlikley event that anyone upgrades to 15... --- synapse/storage/schema/delta/15/v15.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/15/v15.sql b/synapse/storage/schema/delta/15/v15.sql index f5b2a08ca4..4d21ac61df 100644 --- a/synapse/storage/schema/delta/15/v15.sql +++ b/synapse/storage/schema/delta/15/v15.sql @@ -1,7 +1,7 @@ -- Drop, copy & recreate pushers table to change unique key -- Also add access_token column at the same time CREATE TABLE IF NOT EXISTS pushers2 ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, access_token INTEGER DEFAULT NULL, profile_tag varchar(32) NOT NULL, -- cgit 1.5.1 From e515b4892928d49965a4f44e7a575a3c4fa0c9e2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 2 Dec 2015 17:23:52 +0000 Subject: Just replace the table definition with the one from full_schema 16 --- synapse/storage/schema/delta/15/v15.sql | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/15/v15.sql b/synapse/storage/schema/delta/15/v15.sql index 4d21ac61df..9523d2bcc3 100644 --- a/synapse/storage/schema/delta/15/v15.sql +++ b/synapse/storage/schema/delta/15/v15.sql @@ -3,21 +3,20 @@ CREATE TABLE IF NOT EXISTS pushers2 ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, - access_token INTEGER DEFAULT NULL, - profile_tag varchar(32) NOT NULL, - kind varchar(8) NOT NULL, - app_id varchar(64) NOT NULL, - app_display_name varchar(64) NOT NULL, - device_display_name varchar(128) NOT NULL, - pushkey blob NOT NULL, + access_token BIGINT DEFAULT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey bytea NOT NULL, ts BIGINT NOT NULL, - lang varchar(8), - data blob, + lang VARCHAR(8), + data bytea, last_token TEXT, last_success BIGINT, failing_since BIGINT, - FOREIGN KEY(user_name) REFERENCES users(name), - UNIQUE (app_id, pushkey, user_name) + UNIQUE (app_id, pushkey) ); INSERT INTO pushers2 (id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since) SELECT id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers; -- cgit 1.5.1 From d57c5cda71efe58ddca4fd2ed05848f55cc54d59 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 4 Dec 2015 15:28:39 +0000 Subject: Bump schema version. As we released version 26 in v0.11.1 --- synapse/storage/prepare_database.py | 2 +- synapse/storage/schema/delta/26/account_data.sql | 23 --------- .../schema/delta/26/forgotten_memberships.sql | 26 ---------- synapse/storage/schema/delta/26/ts.py | 57 ---------------------- synapse/storage/schema/delta/27/account_data.sql | 36 ++++++++++++++ .../schema/delta/27/forgotten_memberships.sql | 26 ++++++++++ synapse/storage/schema/delta/27/ts.py | 57 ++++++++++++++++++++++ 7 files changed, 120 insertions(+), 107 deletions(-) delete mode 100644 synapse/storage/schema/delta/26/forgotten_memberships.sql delete mode 100644 synapse/storage/schema/delta/26/ts.py create mode 100644 synapse/storage/schema/delta/27/account_data.sql create mode 100644 synapse/storage/schema/delta/27/forgotten_memberships.sql create mode 100644 synapse/storage/schema/delta/27/ts.py (limited to 'synapse/storage') diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 9800fd4203..16eff62544 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 26 +SCHEMA_VERSION = 27 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/26/account_data.sql b/synapse/storage/schema/delta/26/account_data.sql index 48ad9cc6b8..3198a0d29c 100644 --- a/synapse/storage/schema/delta/26/account_data.sql +++ b/synapse/storage/schema/delta/26/account_data.sql @@ -15,26 +15,3 @@ ALTER TABLE private_user_data_max_stream_id RENAME TO account_data_max_stream_id; - - -CREATE TABLE IF NOT EXISTS account_data( - user_id TEXT NOT NULL, - account_data_type TEXT NOT NULL, -- The type of the account_data. - stream_id BIGINT NOT NULL, -- The version of the account_data. - content TEXT NOT NULL, -- The JSON content of the account_data - CONSTRAINT account_data_uniqueness UNIQUE (user_id, account_data_type) -); - - -CREATE TABLE IF NOT EXISTS room_account_data( - user_id TEXT NOT NULL, - room_id TEXT NOT NULL, - account_data_type TEXT NOT NULL, -- The type of the account_data. - stream_id BIGINT NOT NULL, -- The version of the account_data. - content TEXT NOT NULL, -- The JSON content of the account_data - CONSTRAINT room_account_data_uniqueness UNIQUE (user_id, room_id, account_data_type) -); - - -CREATE INDEX account_data_stream_id on account_data(user_id, stream_id); -CREATE INDEX room_account_data_stream_id on room_account_data(user_id, stream_id); diff --git a/synapse/storage/schema/delta/26/forgotten_memberships.sql b/synapse/storage/schema/delta/26/forgotten_memberships.sql deleted file mode 100644 index beeb8a288b..0000000000 --- a/synapse/storage/schema/delta/26/forgotten_memberships.sql +++ /dev/null @@ -1,26 +0,0 @@ -/* Copyright 2015 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Keeps track of what rooms users have left and don't want to be able to - * access again. - * - * If all users on this server have left a room, we can delete the room - * entirely. - * - * This column should always contain either 0 or 1. - */ - - ALTER TABLE room_memberships ADD COLUMN forgotten INTEGER DEFAULT 0; diff --git a/synapse/storage/schema/delta/26/ts.py b/synapse/storage/schema/delta/26/ts.py deleted file mode 100644 index 8d4a981975..0000000000 --- a/synapse/storage/schema/delta/26/ts.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging - -from synapse.storage.prepare_database import get_statements - -import ujson - -logger = logging.getLogger(__name__) - - -ALTER_TABLE = ( - "ALTER TABLE events ADD COLUMN origin_server_ts BIGINT;" - "CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);" -) - - -def run_upgrade(cur, database_engine, *args, **kwargs): - for statement in get_statements(ALTER_TABLE.splitlines()): - cur.execute(statement) - - cur.execute("SELECT MIN(stream_ordering) FROM events") - rows = cur.fetchall() - min_stream_id = rows[0][0] - - cur.execute("SELECT MAX(stream_ordering) FROM events") - rows = cur.fetchall() - max_stream_id = rows[0][0] - - if min_stream_id is not None and max_stream_id is not None: - progress = { - "target_min_stream_id_inclusive": min_stream_id, - "max_stream_id_exclusive": max_stream_id + 1, - "rows_inserted": 0, - } - progress_json = ujson.dumps(progress) - - sql = ( - "INSERT into background_updates (update_name, progress_json)" - " VALUES (?, ?)" - ) - - sql = database_engine.convert_param_style(sql) - - cur.execute(sql, ("event_origin_server_ts", progress_json)) diff --git a/synapse/storage/schema/delta/27/account_data.sql b/synapse/storage/schema/delta/27/account_data.sql new file mode 100644 index 0000000000..9f25416005 --- /dev/null +++ b/synapse/storage/schema/delta/27/account_data.sql @@ -0,0 +1,36 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS account_data( + user_id TEXT NOT NULL, + account_data_type TEXT NOT NULL, -- The type of the account_data. + stream_id BIGINT NOT NULL, -- The version of the account_data. + content TEXT NOT NULL, -- The JSON content of the account_data + CONSTRAINT account_data_uniqueness UNIQUE (user_id, account_data_type) +); + + +CREATE TABLE IF NOT EXISTS room_account_data( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + account_data_type TEXT NOT NULL, -- The type of the account_data. + stream_id BIGINT NOT NULL, -- The version of the account_data. + content TEXT NOT NULL, -- The JSON content of the account_data + CONSTRAINT room_account_data_uniqueness UNIQUE (user_id, room_id, account_data_type) +); + + +CREATE INDEX account_data_stream_id on account_data(user_id, stream_id); +CREATE INDEX room_account_data_stream_id on room_account_data(user_id, stream_id); diff --git a/synapse/storage/schema/delta/27/forgotten_memberships.sql b/synapse/storage/schema/delta/27/forgotten_memberships.sql new file mode 100644 index 0000000000..beeb8a288b --- /dev/null +++ b/synapse/storage/schema/delta/27/forgotten_memberships.sql @@ -0,0 +1,26 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Keeps track of what rooms users have left and don't want to be able to + * access again. + * + * If all users on this server have left a room, we can delete the room + * entirely. + * + * This column should always contain either 0 or 1. + */ + + ALTER TABLE room_memberships ADD COLUMN forgotten INTEGER DEFAULT 0; diff --git a/synapse/storage/schema/delta/27/ts.py b/synapse/storage/schema/delta/27/ts.py new file mode 100644 index 0000000000..8d4a981975 --- /dev/null +++ b/synapse/storage/schema/delta/27/ts.py @@ -0,0 +1,57 @@ +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.storage.prepare_database import get_statements + +import ujson + +logger = logging.getLogger(__name__) + + +ALTER_TABLE = ( + "ALTER TABLE events ADD COLUMN origin_server_ts BIGINT;" + "CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);" +) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + for statement in get_statements(ALTER_TABLE.splitlines()): + cur.execute(statement) + + cur.execute("SELECT MIN(stream_ordering) FROM events") + rows = cur.fetchall() + min_stream_id = rows[0][0] + + cur.execute("SELECT MAX(stream_ordering) FROM events") + rows = cur.fetchall() + max_stream_id = rows[0][0] + + if min_stream_id is not None and max_stream_id is not None: + progress = { + "target_min_stream_id_inclusive": min_stream_id, + "max_stream_id_exclusive": max_stream_id + 1, + "rows_inserted": 0, + } + progress_json = ujson.dumps(progress) + + sql = ( + "INSERT into background_updates (update_name, progress_json)" + " VALUES (?, ?)" + ) + + sql = database_engine.convert_param_style(sql) + + cur.execute(sql, ("event_origin_server_ts", progress_json)) -- cgit 1.5.1 From 6a5ff5f223c1b4311aa63574663c0335d0c6bd79 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 7 Dec 2015 17:56:11 +0000 Subject: Track the time spent in the database per request. and track the number of transactions that request started. --- synapse/app/homeserver.py | 7 ++++++- synapse/http/server.py | 15 +++++++++++++++ synapse/storage/_base.py | 9 +++++++-- synapse/storage/events.py | 2 +- synapse/util/logcontext.py | 9 +++++++++ 5 files changed, 38 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 58c679bbfd..56bc52e9ca 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -503,12 +503,15 @@ class SynapseRequest(Request): try: context = LoggingContext.current_context() ru_utime, ru_stime = context.get_resource_usage() + db_txn_count = context.db_txn_count + db_txn_duration = context.db_txn_duration except: ru_utime, ru_stime = (0, 0) + db_txn_count, db_txn_duration = (0, 0) self.site.access_logger.info( "%s - %s - {%s}" - " Processed request: %dms (%dms, %dms)" + " Processed request: %dms (%dms, %dms) (%dms/%d)" " %sB %s \"%s %s %s\" \"%s\"", self.getClientIP(), self.site.site_tag, @@ -516,6 +519,8 @@ class SynapseRequest(Request): int(time.time() * 1000) - self.start_time, int(ru_utime * 1000), int(ru_stime * 1000), + int(db_txn_duration * 1000), + int(db_txn_count), self.sentLength, self.code, self.method, diff --git a/synapse/http/server.py b/synapse/http/server.py index 06fb53707b..c44bdfc888 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -61,6 +61,15 @@ response_ru_stime = metrics.register_distribution( "response_ru_stime", labels=["method", "servlet"] ) +response_db_txn_count = metrics.register_distribution( + "response_db_txn_count", labels=["method", "servlet"] +) + +response_db_txn_duration = metrics.register_distribution( + "response_db_txn_duration", labels=["method", "servlet"] +) + + _next_request_id = 0 @@ -235,6 +244,12 @@ class JsonResource(HttpServer, resource.Resource): response_ru_utime.inc_by(ru_utime, request.method, servlet_classname) response_ru_stime.inc_by(ru_stime, request.method, servlet_classname) + response_db_txn_count.inc_by( + context.db_txn_count, request.method, servlet_classname + ) + response_db_txn_duration.inc_by( + context.db_txn_duration, request.method, servlet_classname + ) except: pass diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 218e708054..17a14e001c 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -214,7 +214,8 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) - def _new_transaction(self, conn, desc, after_callbacks, func, *args, **kwargs): + def _new_transaction(self, conn, desc, after_callbacks, logging_context, + func, *args, **kwargs): start = time.time() * 1000 txn_id = self._TXN_ID @@ -277,6 +278,9 @@ class SQLBaseStore(object): end = time.time() * 1000 duration = end - start + if logging_context is not None: + logging_context.add_database_transaction(duration) + transaction_logger.debug("[TXN END] {%s} %f", name, duration) self._current_txn_total_time += duration @@ -302,7 +306,8 @@ class SQLBaseStore(object): current_context.copy_to(context) return self._new_transaction( - conn, desc, after_callbacks, func, *args, **kwargs + conn, desc, after_callbacks, current_context, + func, *args, **kwargs ) result = yield preserve_context_over_fn( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7088f2709b..fc5725097c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -649,7 +649,7 @@ class EventsStore(SQLBaseStore): ] rows = self._new_transaction( - conn, "do_fetch", [], self._fetch_event_rows, event_ids + conn, "do_fetch", [], None, self._fetch_event_rows, event_ids ) row_dict = { diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index c20c89aa8f..d528ced55a 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -69,6 +69,9 @@ class LoggingContext(object): def stop(self): pass + def add_database_transaction(self, duration_ms): + pass + sentinel = Sentinel() def __init__(self, name=None): @@ -76,6 +79,8 @@ class LoggingContext(object): self.name = name self.ru_stime = 0. self.ru_utime = 0. + self.db_txn_count = 0 + self.db_txn_duration = 0. self.usage_start = None self.main_thread = threading.current_thread() @@ -171,6 +176,10 @@ class LoggingContext(object): return ru_utime, ru_stime + def add_database_transaction(self, duration_ms): + self.db_txn_count += 1 + self.db_txn_duration += duration_ms / 1000. + class LoggingContextFilter(logging.Filter): """Logging filter that adds values from the current logging context to each -- cgit 1.5.1 From 7d6b3133125aef802dad36d120ad23d5e33948bf Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 10 Dec 2015 17:49:34 +0000 Subject: Add caches for whether a room has been forgotten by a user --- synapse/handlers/room.py | 2 +- synapse/storage/roommember.py | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 116a998c42..a72c3fda9f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -755,7 +755,7 @@ class RoomMemberHandler(BaseHandler): defer.returnValue((token, public_key, key_validity_url, display_name)) def forget(self, user, room_id): - self.store.forget(user.to_string(), room_id) + return self.store.forget(user.to_string(), room_id) class RoomListHandler(BaseHandler): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 69398b7c8e..e1777d7afa 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -18,7 +18,7 @@ from twisted.internet import defer from collections import namedtuple from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.api.constants import Membership from synapse.types import UserID @@ -270,6 +270,7 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(ret) + @defer.inlineCallbacks def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" def f(txn): @@ -284,9 +285,11 @@ class RoomMemberStore(SQLBaseStore): " room_id = ?" ) txn.execute(sql, (user_id, room_id)) - self.runInteraction("forget_membership", f) + yield self.runInteraction("forget_membership", f) + self.was_forgotten_at.invalidate_all() + self.did_forget.invalidate((user_id, room_id)) - @defer.inlineCallbacks + @cachedInlineCallbacks(num_args=2) def did_forget(self, user_id, room_id): """Returns whether user_id has elected to discard history for room_id. @@ -310,7 +313,7 @@ class RoomMemberStore(SQLBaseStore): count = yield self.runInteraction("did_forget_membership", f) defer.returnValue(count == 0) - @defer.inlineCallbacks + @cachedInlineCallbacks(num_args=3) def was_forgotten_at(self, user_id, room_id, event_id): """Returns whether user_id has elected to discard history for room_id at event_id. -- cgit 1.5.1 From 51fb590c0e787c385bea1d595fa8bceea23c26e5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Dec 2015 11:12:57 +0000 Subject: Use more efficient query form --- synapse/storage/search.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 39f600f53c..c39d54a7ca 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -143,7 +143,7 @@ class SearchStore(BackgroundUpdateStore): search_query = search_query = _parse_query(self.database_engine, search_term) - args = [search_query] + args = [] # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. @@ -164,16 +164,19 @@ class SearchStore(BackgroundUpdateStore): if isinstance(self.database_engine, PostgresEngine): sql = ( - "SELECT ts_rank_cd(vector, query) AS rank, room_id, event_id" - " FROM to_tsquery('english', ?) as query, event_search" - " WHERE vector @@ query" + "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) AS rank," + " room_id, event_id" + " FROM event_search" + " WHERE vector @@ to_tsquery('english', ?)" ) + args = [search_query, search_query] + args elif isinstance(self.database_engine, Sqlite3Engine): sql = ( "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id" " FROM event_search" " WHERE value MATCH ?" ) + args = [search_query] + args else: # This should be unreachable. raise Exception("Unrecognized database engine") @@ -232,7 +235,7 @@ class SearchStore(BackgroundUpdateStore): search_query = search_query = _parse_query(self.database_engine, search_term) - args = [search_query] + args = [] # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. @@ -267,12 +270,13 @@ class SearchStore(BackgroundUpdateStore): if isinstance(self.database_engine, PostgresEngine): sql = ( - "SELECT ts_rank_cd(vector, query) as rank," + "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) as rank," " origin_server_ts, stream_ordering, room_id, event_id" - " FROM to_tsquery('english', ?) as query, event_search" + " FROM event_search" " NATURAL JOIN events" - " WHERE vector @@ query AND " + " WHERE vector @@ to_tsquery('english', ?) AND " ) + args = [search_term, search_term] + args elif isinstance(self.database_engine, Sqlite3Engine): # We use CROSS JOIN here to ensure we use the right indexes. # https://sqlite.org/optoverview.html#crossjoin @@ -292,6 +296,7 @@ class SearchStore(BackgroundUpdateStore): " CROSS JOIN events USING (event_id)" " WHERE " ) + args = [search_term] + args else: # This should be unreachable. raise Exception("Unrecognized database engine") -- cgit 1.5.1 From d9a5c56930c22b02268f5deca4df84eba345ec2c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Dec 2015 11:40:23 +0000 Subject: Include approximate count of search results --- synapse/handlers/search.py | 8 ++++++- synapse/storage/search.py | 56 ++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 61 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index bc79564287..99ef56871c 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -152,11 +152,15 @@ class SearchHandler(BaseHandler): highlights = set() + count = None + if order_by == "rank": search_result = yield self.store.search_msgs( room_ids, search_term, keys ) + count = search_result["count"] + if search_result["highlights"]: highlights.update(search_result["highlights"]) @@ -207,6 +211,8 @@ class SearchHandler(BaseHandler): if search_result["highlights"]: highlights.update(search_result["highlights"]) + count = search_result["count"] + results = search_result["results"] results_map = {r["event"].event_id: r for r in results} @@ -359,7 +365,7 @@ class SearchHandler(BaseHandler): rooms_cat_res = { "results": results, - "count": len(results), + "count": count, "highlights": list(highlights), } diff --git a/synapse/storage/search.py b/synapse/storage/search.py index c39d54a7ca..efd87d99bb 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -162,6 +162,9 @@ class SearchStore(BackgroundUpdateStore): "(%s)" % (" OR ".join(local_clauses),) ) + count_args = args + count_clauses = clauses + if isinstance(self.database_engine, PostgresEngine): sql = ( "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) AS rank," @@ -170,6 +173,12 @@ class SearchStore(BackgroundUpdateStore): " WHERE vector @@ to_tsquery('english', ?)" ) args = [search_query, search_query] + args + + count_sql = ( + "SELECT room_id, count(*) as count FROM event_search" + " WHERE vector @@ to_tsquery('english', ?)" + ) + count_args = [search_query] + count_args elif isinstance(self.database_engine, Sqlite3Engine): sql = ( "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id" @@ -177,6 +186,12 @@ class SearchStore(BackgroundUpdateStore): " WHERE value MATCH ?" ) args = [search_query] + args + + count_sql = ( + "SELECT room_id, count(*) as count FROM event_search" + " WHERE value MATCH ? AND " + ) + count_args = [search_term] + count_args else: # This should be unreachable. raise Exception("Unrecognized database engine") @@ -184,6 +199,9 @@ class SearchStore(BackgroundUpdateStore): for clause in clauses: sql += " AND " + clause + for clause in count_clauses: + count_sql += " AND " + clause + # We add an arbitrary limit here to ensure we don't try to pull the # entire table from the database. sql += " ORDER BY rank DESC LIMIT 500" @@ -205,6 +223,14 @@ class SearchStore(BackgroundUpdateStore): if isinstance(self.database_engine, PostgresEngine): highlights = yield self._find_highlights_in_postgres(search_query, events) + count_sql += " GROUP BY room_id" + + count_results = yield self._execute( + "search_rooms_count", self.cursor_to_dict, count_sql, *count_args + ) + + count = sum(row["count"] for row in count_results if row["room_id"] in room_ids) + defer.returnValue({ "results": [ { @@ -215,6 +241,7 @@ class SearchStore(BackgroundUpdateStore): if r["event_id"] in event_map ], "highlights": highlights, + "count": count, }) @defer.inlineCallbacks @@ -254,6 +281,9 @@ class SearchStore(BackgroundUpdateStore): "(%s)" % (" OR ".join(local_clauses),) ) + count_args = args + count_clauses = clauses + if pagination_token: try: origin_server_ts, stream = pagination_token.split(",") @@ -276,7 +306,13 @@ class SearchStore(BackgroundUpdateStore): " NATURAL JOIN events" " WHERE vector @@ to_tsquery('english', ?) AND " ) - args = [search_term, search_term] + args + args = [search_query, search_query] + args + + count_sql = ( + "SELECT room_id, count(*) as count FROM event_search" + " WHERE vector @@ to_tsquery('english', ?) AND " + ) + count_args = [search_query] + count_args elif isinstance(self.database_engine, Sqlite3Engine): # We use CROSS JOIN here to ensure we use the right indexes. # https://sqlite.org/optoverview.html#crossjoin @@ -296,12 +332,19 @@ class SearchStore(BackgroundUpdateStore): " CROSS JOIN events USING (event_id)" " WHERE " ) - args = [search_term] + args + args = [search_query] + args + + count_sql = ( + "SELECT room_id, count(*) as count FROM event_search" + " WHERE value MATCH ? AND " + ) + count_args = [search_term] + count_args else: # This should be unreachable. raise Exception("Unrecognized database engine") sql += " AND ".join(clauses) + count_sql += " AND ".join(count_clauses) # We add an arbitrary limit here to ensure we don't try to pull the # entire table from the database. @@ -326,6 +369,14 @@ class SearchStore(BackgroundUpdateStore): if isinstance(self.database_engine, PostgresEngine): highlights = yield self._find_highlights_in_postgres(search_query, events) + count_sql += " GROUP BY room_id" + + count_results = yield self._execute( + "search_rooms_count", self.cursor_to_dict, count_sql, *count_args + ) + + count = sum(row["count"] for row in count_results if row["room_id"] in room_ids) + defer.returnValue({ "results": [ { @@ -339,6 +390,7 @@ class SearchStore(BackgroundUpdateStore): if r["event_id"] in event_map ], "highlights": highlights, + "count": count, }) def _find_highlights_in_postgres(self, search_query, events): -- cgit 1.5.1 From bfc52a2342999a7887dcc5ba653b67454c0fc2c8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 14 Dec 2015 11:38:11 +0000 Subject: Fix typo in sql for full text search on sqlite3 --- synapse/storage/search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index efd87d99bb..00f89ff02f 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -189,7 +189,7 @@ class SearchStore(BackgroundUpdateStore): count_sql = ( "SELECT room_id, count(*) as count FROM event_search" - " WHERE value MATCH ? AND " + " WHERE value MATCH ?" ) count_args = [search_term] + count_args else: -- cgit 1.5.1 From 76e69cc8de186c42be5763be0492d074319060cc Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Mon, 14 Dec 2015 12:38:55 +0000 Subject: Fix typo --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index e1777d7afa..4e0e9ab59a 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -121,7 +121,7 @@ class RoomMemberStore(SQLBaseStore): return self.get_rooms_for_user_where_membership_is( user_id, [Membership.INVITE] ).addCallback(lambda invites: self._get_events([ - invites.event_id for invite in invites + invite.event_id for invite in invites ])) def get_leave_and_ban_events_for_user(self, user_id): -- cgit 1.5.1 From 98dfa7d24f91ff083b36f1379ce2426c8e6cdb75 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 14 Dec 2015 13:55:46 +0000 Subject: Skip events that where the body, name or topic isn't a string when back populating the FTS index --- synapse/storage/search.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 39f600f53c..04246101df 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -85,6 +85,11 @@ class SearchStore(BackgroundUpdateStore): # skip over it. continue + if not isinstance(value, basestring): + # If the event body, name or topic isn't a string + # then skip over it + continue + event_search_rows.append((event_id, room_id, key, value)) if isinstance(self.database_engine, PostgresEngine): -- cgit 1.5.1 From dcfc70e8ed263256b2a3cf59e7d21e54f39fc287 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 15 Dec 2015 17:02:21 +0000 Subject: Allow users to change which account a 3pid is bound to --- synapse/storage/registration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 2e5eddd259..09a05b08ef 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -258,10 +258,10 @@ class RegistrationStore(SQLBaseStore): @defer.inlineCallbacks def user_add_threepid(self, user_id, medium, address, validated_at, added_at): yield self._simple_upsert("user_threepids", { - "user_id": user_id, "medium": medium, "address": address, }, { + "user_id": user_id, "validated_at": validated_at, "added_at": added_at, }) -- cgit 1.5.1 From a64f9bbfe0fc592043a3da8979b7f2545187dbb6 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 17 Dec 2015 12:47:26 +0000 Subject: Fix 500 error when back-paginating search results We were mistakenly adding pagination clauses to the count query, which then failed because the count query doesn't join to the events table. --- synapse/storage/search.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 57c9cc1c5f..6cb5e73b6e 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -286,8 +286,10 @@ class SearchStore(BackgroundUpdateStore): "(%s)" % (" OR ".join(local_clauses),) ) - count_args = args - count_clauses = clauses + # take copies of the current args and clauses lists, before adding + # pagination clauses to main query. + count_args = list(args) + count_clauses = list(clauses) if pagination_token: try: -- cgit 1.5.1