diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 2 | ||||
-rw-r--r-- | synapse/storage/account_data.py | 211 | ||||
-rw-r--r-- | synapse/storage/events.py | 77 | ||||
-rw-r--r-- | synapse/storage/schema/delta/26/account_data.sql | 23 | ||||
-rw-r--r-- | synapse/storage/schema/delta/26/ts.py | 57 | ||||
-rw-r--r-- | synapse/storage/search.py | 41 | ||||
-rw-r--r-- | synapse/storage/tags.py | 4 |
7 files changed, 397 insertions, 18 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index e7443f2838..c46b653f11 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -42,6 +42,7 @@ from .end_to_end_keys import EndToEndKeyStore from .receipts import ReceiptsStore from .search import SearchStore from .tags import TagsStore +from .account_data import AccountDataStore import logging @@ -73,6 +74,7 @@ class DataStore(RoomMemberStore, RoomStore, EndToEndKeyStore, SearchStore, TagsStore, + AccountDataStore, ): def __init__(self, hs): diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py new file mode 100644 index 0000000000..d1829f84e8 --- /dev/null +++ b/synapse/storage/account_data.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from twisted.internet import defer + +import ujson as json +import logging + +logger = logging.getLogger(__name__) + + +class AccountDataStore(SQLBaseStore): + + def get_account_data_for_user(self, user_id): + """Get all the client account_data for a user. + + Args: + user_id(str): The user to get the account_data for. + Returns: + A deferred pair of a dict of global account_data and a dict + mapping from room_id string to per room account_data dicts. + """ + + def get_account_data_for_user_txn(txn): + rows = self._simple_select_list_txn( + txn, "account_data", {"user_id": user_id}, + ["account_data_type", "content"] + ) + + global_account_data = { + row["account_data_type"]: json.loads(row["content"]) for row in rows + } + + rows = self._simple_select_list_txn( + txn, "room_account_data", {"user_id": user_id}, + ["room_id", "account_data_type", "content"] + ) + + by_room = {} + for row in rows: + room_data = by_room.setdefault(row["room_id"], {}) + room_data[row["account_data_type"]] = json.loads(row["content"]) + + return (global_account_data, by_room) + + return self.runInteraction( + "get_account_data_for_user", get_account_data_for_user_txn + ) + + def get_account_data_for_room(self, user_id, room_id): + """Get all the client account_data for a user for a room. + + Args: + user_id(str): The user to get the account_data for. + room_id(str): The room to get the account_data for. + Returns: + A deferred dict of the room account_data + """ + def get_account_data_for_room_txn(txn): + rows = self._simple_select_list_txn( + txn, "room_account_data", {"user_id": user_id, "room_id": room_id}, + ["account_data_type", "content"] + ) + + return { + row["account_data_type"]: json.loads(row["content"]) for row in rows + } + + return self.runInteraction( + "get_account_data_for_room", get_account_data_for_room_txn + ) + + def get_updated_account_data_for_user(self, user_id, stream_id): + """Get all the client account_data for a that's changed. + + Args: + user_id(str): The user to get the account_data for. + stream_id(int): The point in the stream since which to get updates + Returns: + A deferred pair of a dict of global account_data and a dict + mapping from room_id string to per room account_data dicts. + """ + + def get_updated_account_data_for_user_txn(txn): + sql = ( + "SELECT account_data_type, content FROM account_data" + " WHERE user_id = ? AND stream_id > ?" + ) + + txn.execute(sql, (user_id, stream_id)) + + global_account_data = { + row[0]: json.loads(row[1]) for row in txn.fetchall() + } + + sql = ( + "SELECT room_id, account_data_type, content FROM room_account_data" + " WHERE user_id = ? AND stream_id > ?" + ) + + txn.execute(sql, (user_id, stream_id)) + + account_data_by_room = {} + for row in txn.fetchall(): + room_account_data = account_data_by_room.setdefault(row[0], {}) + room_account_data[row[1]] = json.loads(row[2]) + + return (global_account_data, account_data_by_room) + + return self.runInteraction( + "get_updated_account_data_for_user", get_updated_account_data_for_user_txn + ) + + @defer.inlineCallbacks + def add_account_data_to_room(self, user_id, room_id, account_data_type, content): + """Add some account_data to a room for a user. + Args: + user_id(str): The user to add a tag for. + room_id(str): The room to add a tag for. + account_data_type(str): The type of account_data to add. + content(dict): A json object to associate with the tag. + Returns: + A deferred that completes once the account_data has been added. + """ + content_json = json.dumps(content) + + def add_account_data_txn(txn, next_id): + self._simple_upsert_txn( + txn, + table="room_account_data", + keyvalues={ + "user_id": user_id, + "room_id": room_id, + "account_data_type": account_data_type, + }, + values={ + "stream_id": next_id, + "content": content_json, + } + ) + self._update_max_stream_id(txn, next_id) + + with (yield self._account_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction( + "add_room_account_data", add_account_data_txn, next_id + ) + + result = yield self._account_data_id_gen.get_max_token(self) + defer.returnValue(result) + + @defer.inlineCallbacks + def add_account_data_for_user(self, user_id, account_data_type, content): + """Add some account_data to a room for a user. + Args: + user_id(str): The user to add a tag for. + account_data_type(str): The type of account_data to add. + content(dict): A json object to associate with the tag. + Returns: + A deferred that completes once the account_data has been added. + """ + content_json = json.dumps(content) + + def add_account_data_txn(txn, next_id): + self._simple_upsert_txn( + txn, + table="account_data", + keyvalues={ + "user_id": user_id, + "account_data_type": account_data_type, + }, + values={ + "stream_id": next_id, + "content": content_json, + } + ) + self._update_max_stream_id(txn, next_id) + + with (yield self._account_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction( + "add_user_account_data", add_account_data_txn, next_id + ) + + result = yield self._account_data_id_gen.get_max_token(self) + defer.returnValue(result) + + def _update_max_stream_id(self, txn, next_id): + """Update the max stream_id + + Args: + txn: The database cursor + next_id(int): The the revision to advance to. + """ + update_max_id_sql = ( + "UPDATE account_data_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(update_max_id_sql, (next_id, next_id)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5d35ca90b9..7088f2709b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -51,6 +51,14 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events class EventsStore(SQLBaseStore): + EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" + + def __init__(self, hs): + super(EventsStore, self).__init__(hs) + self.register_background_update_handler( + self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts + ) + @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False, is_new_state=True): @@ -365,6 +373,7 @@ class EventsStore(SQLBaseStore): "processed": True, "outlier": event.internal_metadata.is_outlier(), "content": encode_json(event.content).decode("UTF-8"), + "origin_server_ts": int(event.origin_server_ts), } for event, _ in events_and_contexts ], @@ -964,3 +973,71 @@ class EventsStore(SQLBaseStore): ret = yield self.runInteraction("count_messages", _count_messages) defer.returnValue(ret) + + @defer.inlineCallbacks + def _background_reindex_origin_server_ts(self, progress, batch_size): + target_min_stream_id = progress["target_min_stream_id_inclusive"] + max_stream_id = progress["max_stream_id_exclusive"] + rows_inserted = progress.get("rows_inserted", 0) + + INSERT_CLUMP_SIZE = 1000 + + def reindex_search_txn(txn): + sql = ( + "SELECT stream_ordering, event_id FROM events" + " WHERE ? <= stream_ordering AND stream_ordering < ?" + " ORDER BY stream_ordering DESC" + " LIMIT ?" + ) + + txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + min_stream_id = rows[-1][0] + event_ids = [row[1] for row in rows] + + events = self._get_events_txn(txn, event_ids) + + rows = [] + for event in events: + try: + event_id = event.event_id + origin_server_ts = event.origin_server_ts + except (KeyError, AttributeError): + # If the event is missing a necessary field then + # skip over it. + continue + + rows.append((origin_server_ts, event_id)) + + sql = ( + "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" + ) + + for index in range(0, len(rows), INSERT_CLUMP_SIZE): + clump = rows[index:index + INSERT_CLUMP_SIZE] + txn.executemany(sql, clump) + + progress = { + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, + "rows_inserted": rows_inserted + len(rows) + } + + self._background_update_progress_txn( + txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress + ) + + return len(rows) + + result = yield self.runInteraction( + self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn + ) + + if not result: + yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME) + + defer.returnValue(result) diff --git a/synapse/storage/schema/delta/26/account_data.sql b/synapse/storage/schema/delta/26/account_data.sql index 3198a0d29c..48ad9cc6b8 100644 --- a/synapse/storage/schema/delta/26/account_data.sql +++ b/synapse/storage/schema/delta/26/account_data.sql @@ -15,3 +15,26 @@ ALTER TABLE private_user_data_max_stream_id RENAME TO account_data_max_stream_id; + + +CREATE TABLE IF NOT EXISTS account_data( + user_id TEXT NOT NULL, + account_data_type TEXT NOT NULL, -- The type of the account_data. + stream_id BIGINT NOT NULL, -- The version of the account_data. + content TEXT NOT NULL, -- The JSON content of the account_data + CONSTRAINT account_data_uniqueness UNIQUE (user_id, account_data_type) +); + + +CREATE TABLE IF NOT EXISTS room_account_data( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + account_data_type TEXT NOT NULL, -- The type of the account_data. + stream_id BIGINT NOT NULL, -- The version of the account_data. + content TEXT NOT NULL, -- The JSON content of the account_data + CONSTRAINT room_account_data_uniqueness UNIQUE (user_id, room_id, account_data_type) +); + + +CREATE INDEX account_data_stream_id on account_data(user_id, stream_id); +CREATE INDEX room_account_data_stream_id on room_account_data(user_id, stream_id); diff --git a/synapse/storage/schema/delta/26/ts.py b/synapse/storage/schema/delta/26/ts.py new file mode 100644 index 0000000000..8d4a981975 --- /dev/null +++ b/synapse/storage/schema/delta/26/ts.py @@ -0,0 +1,57 @@ +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.storage.prepare_database import get_statements + +import ujson + +logger = logging.getLogger(__name__) + + +ALTER_TABLE = ( + "ALTER TABLE events ADD COLUMN origin_server_ts BIGINT;" + "CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);" +) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + for statement in get_statements(ALTER_TABLE.splitlines()): + cur.execute(statement) + + cur.execute("SELECT MIN(stream_ordering) FROM events") + rows = cur.fetchall() + min_stream_id = rows[0][0] + + cur.execute("SELECT MAX(stream_ordering) FROM events") + rows = cur.fetchall() + max_stream_id = rows[0][0] + + if min_stream_id is not None and max_stream_id is not None: + progress = { + "target_min_stream_id_inclusive": min_stream_id, + "max_stream_id_exclusive": max_stream_id + 1, + "rows_inserted": 0, + } + progress_json = ujson.dumps(progress) + + sql = ( + "INSERT into background_updates (update_name, progress_json)" + " VALUES (?, ?)" + ) + + sql = database_engine.convert_param_style(sql) + + cur.execute(sql, ("event_origin_server_ts", progress_json)) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index c6386642df..20a62d07ff 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -212,11 +212,11 @@ class SearchStore(BackgroundUpdateStore): }) @defer.inlineCallbacks - def search_room(self, room_id, search_term, keys, limit, pagination_token=None): + def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None): """Performs a full text search over events with given keys. Args: - room_id (str): The room_id to search in + room_id (list): The room_ids to search in search_term (str): Search term to search for keys (list): List of keys to search in, currently supports "content.body", "content.name", "content.topic" @@ -226,7 +226,15 @@ class SearchStore(BackgroundUpdateStore): list of dicts """ clauses = [] - args = [search_term, room_id] + args = [search_term] + + # Make sure we don't explode because the person is in too many rooms. + # We filter the results below regardless. + if len(room_ids) < 500: + clauses.append( + "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),) + ) + args.extend(room_ids) local_clauses = [] for key in keys: @@ -239,25 +247,25 @@ class SearchStore(BackgroundUpdateStore): if pagination_token: try: - topo, stream = pagination_token.split(",") - topo = int(topo) + origin_server_ts, stream = pagination_token.split(",") + origin_server_ts = int(origin_server_ts) stream = int(stream) except: raise SynapseError(400, "Invalid pagination token") clauses.append( - "(topological_ordering < ?" - " OR (topological_ordering = ? AND stream_ordering < ?))" + "(origin_server_ts < ?" + " OR (origin_server_ts = ? AND stream_ordering < ?))" ) - args.extend([topo, topo, stream]) + args.extend([origin_server_ts, origin_server_ts, stream]) if isinstance(self.database_engine, PostgresEngine): sql = ( "SELECT ts_rank_cd(vector, query) as rank," - " topological_ordering, stream_ordering, room_id, event_id" + " origin_server_ts, stream_ordering, room_id, event_id" " FROM plainto_tsquery('english', ?) as query, event_search" " NATURAL JOIN events" - " WHERE vector @@ query AND room_id = ?" + " WHERE vector @@ query AND " ) elif isinstance(self.database_engine, Sqlite3Engine): # We use CROSS JOIN here to ensure we use the right indexes. @@ -270,24 +278,23 @@ class SearchStore(BackgroundUpdateStore): # MATCH unless it uses the full text search index sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," - " topological_ordering, stream_ordering" + " origin_server_ts, stream_ordering" " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo" " FROM event_search" " WHERE value MATCH ?" " )" " CROSS JOIN events USING (event_id)" - " WHERE room_id = ?" + " WHERE " ) else: # This should be unreachable. raise Exception("Unrecognized database engine") - for clause in clauses: - sql += " AND " + clause + sql += " AND ".join(clauses) # We add an arbitrary limit here to ensure we don't try to pull the # entire table from the database. - sql += " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" + sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?" args.append(limit) @@ -295,6 +302,8 @@ class SearchStore(BackgroundUpdateStore): "search_rooms", self.cursor_to_dict, sql, *args ) + results = filter(lambda row: row["room_id"] in room_ids, results) + events = yield self._get_events([r["event_id"] for r in results]) event_map = { @@ -312,7 +321,7 @@ class SearchStore(BackgroundUpdateStore): "event": event_map[r["event_id"]], "rank": r["rank"], "pagination_token": "%s,%s" % ( - r["topological_ordering"], r["stream_ordering"] + r["origin_server_ts"], r["stream_ordering"] ), } for r in results diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index f6d826cc59..f520f60c6c 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -48,8 +48,8 @@ class TagsStore(SQLBaseStore): Args: user_id(str): The user to get the tags for. Returns: - A deferred dict mapping from room_id strings to lists of tag - strings. + A deferred dict mapping from room_id strings to dicts mapping from + tag strings to tag content. """ deferred = self._simple_select_list( |