From f2c4ee41b91f1828d516df871adcfbaed46d5407 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Fri, 6 Nov 2015 14:27:49 +0000 Subject: Remove accidentally added ID column --- synapse/storage/schema/delta/25/history_visibility.sql | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/25/history_visibility.sql b/synapse/storage/schema/delta/25/history_visibility.sql index 9f387ed69f..532cb05151 100644 --- a/synapse/storage/schema/delta/25/history_visibility.sql +++ b/synapse/storage/schema/delta/25/history_visibility.sql @@ -18,7 +18,6 @@ * so that we can join on them in SELECT statements. */ CREATE TABLE IF NOT EXISTS history_visibility( - id INTEGER PRIMARY KEY, event_id TEXT NOT NULL, room_id TEXT NOT NULL, history_visibility TEXT NOT NULL, -- cgit 1.5.1 From 767c20a869933204b6e545c6f1495cd7cd298a87 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 6 Nov 2015 20:49:57 +0100 Subject: add a key existence check to tags_by_room to avoid /events 500'ing when testing against vector --- synapse/storage/tags.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 641ea250f0..73babd53d9 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -95,7 +95,8 @@ class TagsStore(SQLBaseStore): if room_ids: tags_by_room = yield self.get_tags_for_user(user_id) for room_id in room_ids: - results[room_id] = tags_by_room[room_id] + if room_id in tags_by_room: + results[room_id] = tags_by_room[room_id] defer.returnValue(results) -- cgit 1.5.1 From dd40fb68e455b9a736f1871a19119eb0391cd0d5 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 8 Nov 2015 16:04:37 +0000 Subject: fix comedy important missing comma breaking recent-ordered FTS on sqlite --- synapse/storage/search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 3cea2011fa..91a96d2a83 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -154,7 +154,7 @@ class SearchStore(SQLBaseStore): ) elif isinstance(self.database_engine, Sqlite3Engine): sql = ( - "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id" + "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id," " topological_ordering, stream_ordering" " FROM event_search" " NATURAL JOIN events" -- cgit 1.5.1 From c4135d85e116cecbc119c1243911a0b60a6452d7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 9 Nov 2015 14:52:18 +0000 Subject: SYN-513: Include updates for rooms that have had all their tags deleted --- synapse/handlers/sync.py | 2 +- synapse/storage/tags.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5294d96466..ff766e3af5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -272,7 +272,7 @@ class SyncHandler(BaseHandler): def private_user_data_for_room(self, room_id, tags_by_room): private_user_data = [] tags = tags_by_room.get(room_id) - if tags: + if tags is not None: private_user_data.append({ "type": "m.tag", "content": {"tags": tags}, diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 641ea250f0..bf695b7800 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -95,7 +95,7 @@ class TagsStore(SQLBaseStore): if room_ids: tags_by_room = yield self.get_tags_for_user(user_id) for room_id in room_ids: - results[room_id] = tags_by_room[room_id] + results[room_id] = tags_by_room.get(room_id, {}) defer.returnValue(results) -- cgit 1.5.1 From c6a01f2ed0da9f0db169307e7d1649021f0c2123 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 9 Nov 2015 14:37:28 +0000 Subject: Add storage module for tracking background updates. The progress for each background update is stored as a JSON blob in the database. Each background update is broken up into separate batches. The batch size is automatically tuned to try avoid blocking single threaded databases for too long. --- synapse/storage/background_updates.py | 210 +++++++++++++++++++++ .../schema/delta/25/00background_updates.sql | 21 +++ 2 files changed, 231 insertions(+) create mode 100644 synapse/storage/background_updates.py create mode 100644 synapse/storage/schema/delta/25/00background_updates.sql (limited to 'synapse/storage') diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py new file mode 100644 index 0000000000..32a233c213 --- /dev/null +++ b/synapse/storage/background_updates.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore + +from twisted.internet import defer + +import ujson as json +import logging + +logger = logging.getLogger(__name__) + + +class BackgroundUpdatePerformance(object): + """Tracks the how long a background update is taking to update its items""" + + def __init__(self, name): + self.name = name + self.total_item_count = 0 + self.total_duration_ms = 0 + self.avg_item_count = 0 + self.avg_duration_ms = 0 + + def update(self, item_count, duration_ms): + """Update the stats after doing an update""" + self.total_item_count += item_count + self.total_duration_ms += duration_ms + + # Exponential moving averages for the number of items updated and + # the duration. + self.avg_item_count += 0.1 * (item_count - self.avg_item_count) + self.avg_duration_ms += 0.1 * (duration_ms - self.avg_duration_ms) + + def average_duration_ms_per_item(self): + """An estimate of how long it takes to do a single update. + Returns: + A duration in ms as a float + """ + if self.total_item_count == 0: + return None + else: + # Use the exponential moving average so that we can adapt to + # changes in how long the update process takes. + return float(self.avg_duration_ms) / float(self.avg_item_count) + + +class BackgroundUpdateStore(SQLBaseStore): + """ Background updates are updates to the database that run in the + background. Each update processes a batch of data at once. We attempt to + limit the impact of each update by monitoring how long each batch takes to + process and autotuning the batch size. + """ + + MINIMUM_BACKGROUND_BATCH_SIZE = 100 + DEFAULT_BACKGROUND_BATCH_SIZE = 100 + + def __init__(self, hs): + super(BackgroundUpdateStore, self).__init__(hs) + self._background_update_performance = {} + self._background_update_queue = [] + self._background_update_handlers = {} + + @defer.inlineCallbacks + def do_background_update(self, desired_duration_ms): + """Does some amount of work on a background update + Args: + desired_duration_ms(float): How long we want to spend + updating. + Returns: + A deferred that completes once some amount of work is done. + The deferred will have a value of None if there is currently + no more work to do. + """ + if not self._background_update_queue: + updates = yield self._simple_select_list( + "background_updates", + keyvalues=None, + retcols=("update_name",), + ) + for update in updates: + self._background_update_queue.append(update['update_name']) + + if not self._background_update_queue: + defer.returnValue(None) + + update_name = self._background_update_queue.pop(0) + self._background_update_queue.append(update_name) + + update_handler = self._background_update_handlers[update_name] + + performance = self._background_update_performance.get(update_name) + + if performance is None: + performance = BackgroundUpdatePerformance(update_name) + self._background_update_performance[update_name] = performance + + duration_ms_per_item = performance.average_duration_ms_per_item() + + if duration_ms_per_item is not None: + batch_size = int(desired_duration_ms / duration_ms_per_item) + # Clamp the batch size so that we always make progress + batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE) + else: + batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE + + progress_json = yield self._simple_select_one_onecol( + "background_updates", + keyvalues={"update_name": update_name}, + retcol="progress_json" + ) + + progress = json.loads(progress_json) + + time_start = self._clock.time_msec() + items_updated = yield update_handler(progress, batch_size) + time_stop = self._clock.time_msec() + + duration_ms = time_stop - time_start + + logger.info( + "Updating %. Updated %r items in %rms", + update_name, items_updated, duration_ms + ) + + performance.update(items_updated, duration_ms) + + defer.returnValue(len(self._background_update_performance)) + + def register_background_update_handler(self, update_name, update_handler): + """Register a handler for doing a background update. + + The handler should take two arguments: + + * A dict of the current progress + * An integer count of the number of items to update in this batch. + + The handler should return a deferred integer count of items updated. + The hander is responsible for updating the progress of the update. + + Args: + update_name(str): The name of the update that this code handles. + update_handler(function): The function that does the update. + """ + self._background_update_handlers[update_name] = update_handler + + def start_background_update(self, update_name, progress): + """Starts a background update running. + + Args: + update_name: The update to set running. + progress: The initial state of the progress of the update. + + Returns: + A deferred that completes once the task has been added to the + queue. + """ + # Clear the background update queue so that we will pick up the new + # task on the next iteration of do_background_update. + self._background_update_queue = [] + progress_json = json.dumps(progress) + + return self._simple_insert( + "background_updates", + {"update_name": update_name, "progress_json": progress_json} + ) + + def _end_background_update(self, update_name): + """Removes a completed background update task from the queue. + + Args: + update_name(str): The name of the completed task to remove + Returns: + A deferred that completes once the task is removed. + """ + self._background_update_queue = [ + name for name in self._background_update_queue if name != update_name + ] + return self._simple_delete_one( + "background_updates", keyvalues={"update_name": update_name} + ) + + def _background_update_progress_txn(self, txn, update_name, progress): + """Update the progress of a background update + + Args: + txn(cursor): The transaction. + update_name(str): The name of the background update task + progress(dict): The progress of the update. + """ + + progress_json = json.dumps(progress) + + self._simple_update_one_txn( + txn, + "background_updates", + keyvalues={"update_name": update_name}, + updatevalues={"progress_json": progress_json}, + ) diff --git a/synapse/storage/schema/delta/25/00background_updates.sql b/synapse/storage/schema/delta/25/00background_updates.sql new file mode 100644 index 0000000000..41a9b59b1b --- /dev/null +++ b/synapse/storage/schema/delta/25/00background_updates.sql @@ -0,0 +1,21 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE IF NOT EXISTS background_updates( + update_name TEXT NOT NULL, -- The name of the background update. + progress_json TEXT NOT NULL, -- The current progress of the update as JSON. + CONSTRAINT background_updates_uniqueness UNIQUE (update_name) +); -- cgit 1.5.1 From 2ede7aa8a1da20b735797cc9230da7bbeb55efc3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 9 Nov 2015 19:29:32 +0000 Subject: Add background update task for reindexing event search --- synapse/storage/search.py | 98 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 3cea2011fa..f7c269865d 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from _base import SQLBaseStore +from .background_updates import BackgroundUpdateStore from synapse.api.errors import SynapseError from synapse.storage.engines import PostgresEngine, Sqlite3Engine @@ -25,7 +25,101 @@ import logging logger = logging.getLogger(__name__) -class SearchStore(SQLBaseStore): +class SearchStore(BackgroundUpdateStore): + + EVENT_SEARCH_UPDATE_NAME = "event_search" + + + @defer.inlineCallbacks + def _background_reindex_search(self, progress, batch_size): + target_min_stream_id = progress["target_min_stream_id"] + max_stream_id = progress["max_stream_id"] + rows_inserted = progress.get("rows_inserted", 0) + + INSERT_CLUMP_SIZE = 1000 + TYPES = ["m.room.name", "m.room.message", "m.room.topic"] + + def reindex_search_txn(txn): + sql = ( + "SELECT stream_id, event_id FROM events" + " WHERE ? <= stream_ordering AND stream_ordering < ?" + " AND (%s)" + " ORDER BY stream_ordering DESC" + " LIMIT ?" + ) % (" OR ".join("type = '%s'" % TYPES),) + + txn.execute(sql, target_min_stream_id, max_stream_id, batch_size) + + rows = txn.fetch_all() + if not rows: + return None + + min_stream_id = rows[-1][0] + event_ids = [row[1] for row in rows] + + events = self._get_events_txn(txn, event_ids) + + event_search_rows = [] + for event in events: + try: + event_id = event.event_id + room_id = event.room_id + content = event.content + if event.type == "m.room.message": + key = "content.body" + value = content["body"] + elif event.type == "m.room.topic": + key = "content.topic" + value = content["topic"] + elif event.type == "m.room.name": + key = "content.name" + value = content["name"] + except Exception: + # If the event is missing a necessary field then + # skip over it. + continue + + event_search_rows.append((event_id, room_id, key, value)) + + if isinstance(self.database_engine, PostgresEngine): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, vector)" + " VALUES (?,?,?,to_tsvector('english', ?))" + ) + elif isinstance(self.database_engine, Sqlite3Engine): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, value)" + " VALUES (?,?,?,?)" + ) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE): + clump = event_search_rows[index : index + INSERT_CLUMP_SIZE) + txn.execute_many(sql, clump) + + progress = { + "target_max_stream_id": target_min_stream_id, + "max_stream_id": min_stream_id, + "rows_inserted": rows_inserted + len(event_search_rows) + } + + self._background_update_progress_txn( + txn, self.EVENT_SEARCH_UPDATE_NAME, progress + ) + + return len(event_search_rows) + + result = yield self.runInteration( + self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn + ) + + if result is None: + yield _end_background_update(self.EVENT_SEARCH_UPDATE_NAME) + + defer.returnValue(result) + @defer.inlineCallbacks def search_msgs(self, room_ids, search_term, keys): """Performs a full text search over events with given keys. -- cgit 1.5.1 From a412b9a465cb6a64f44e3656d8645977f43c275f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 10 Nov 2015 15:50:58 +0000 Subject: Run the background updates when starting synapse. --- synapse/app/homeserver.py | 1 + synapse/storage/background_updates.py | 57 ++++++++++++++++++++++++++++++----- synapse/storage/search.py | 11 +++++-- synapse/util/__init__.py | 8 +++++ 4 files changed, 67 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index a77535a4ee..cd7a52ec07 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -439,6 +439,7 @@ def setup(config_options): hs.get_pusherpool().start() hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() + hs.get_datastore().start_doing_background_updates() hs.get_replication_layer().start_get_pdu_cache() return hs diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 32a233c213..b6cdc6ec68 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -43,7 +43,7 @@ class BackgroundUpdatePerformance(object): self.avg_item_count += 0.1 * (item_count - self.avg_item_count) self.avg_duration_ms += 0.1 * (duration_ms - self.avg_duration_ms) - def average_duration_ms_per_item(self): + def average_items_per_ms(self): """An estimate of how long it takes to do a single update. Returns: A duration in ms as a float @@ -53,7 +53,17 @@ class BackgroundUpdatePerformance(object): else: # Use the exponential moving average so that we can adapt to # changes in how long the update process takes. - return float(self.avg_duration_ms) / float(self.avg_item_count) + return float(self.avg_item_count) / float(self.avg_duration_ms) + + def total_items_per_ms(self): + """An estimate of how long it takes to do a single update. + Returns: + A duration in ms as a float + """ + if self.total_item_count == 0: + return None + else: + return float(self.total_item_count) / float(self.total_duration_ms) class BackgroundUpdateStore(SQLBaseStore): @@ -65,12 +75,41 @@ class BackgroundUpdateStore(SQLBaseStore): MINIMUM_BACKGROUND_BATCH_SIZE = 100 DEFAULT_BACKGROUND_BATCH_SIZE = 100 + BACKGROUND_UPDATE_INTERVAL_MS = 1000 + BACKGROUND_UPDATE_DURATION_MS = 100 def __init__(self, hs): super(BackgroundUpdateStore, self).__init__(hs) self._background_update_performance = {} self._background_update_queue = [] self._background_update_handlers = {} + self._background_update_timer = None + + @defer.inlineCallbacks + def start_doing_background_updates(self): + while True: + if self._background_update_timer is not None: + return + + sleep = defer.Deferred() + self._background_update_timer = self._clock.call_later( + self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback + ) + try: + yield sleep + finally: + self._background_update_timer = None + + result = yield self.do_background_update( + self.BACKGROUND_UPDATE_DURATION_MS + ) + + if result is None: + logger.info( + "No more background updates to do." + " Unscheduling background update task." + ) + return @defer.inlineCallbacks def do_background_update(self, desired_duration_ms): @@ -106,10 +145,10 @@ class BackgroundUpdateStore(SQLBaseStore): performance = BackgroundUpdatePerformance(update_name) self._background_update_performance[update_name] = performance - duration_ms_per_item = performance.average_duration_ms_per_item() + items_per_ms = performance.average_items_per_ms() - if duration_ms_per_item is not None: - batch_size = int(desired_duration_ms / duration_ms_per_item) + if items_per_ms is not None: + batch_size = int(desired_duration_ms * items_per_ms) # Clamp the batch size so that we always make progress batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE) else: @@ -130,8 +169,12 @@ class BackgroundUpdateStore(SQLBaseStore): duration_ms = time_stop - time_start logger.info( - "Updating %. Updated %r items in %rms", - update_name, items_updated, duration_ms + "Updating %. Updated %r items in %rms." + " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)", + update_name, items_updated, duration_ms, + performance.total_items_per_ms(), + performance.average_items_per_ms(), + performance.total_item_count, ) performance.update(items_updated, duration_ms) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index f7c269865d..d170c546b5 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -29,6 +29,11 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" + def __init__(self, hs): + super(SearchStore, self).__init__(hs) + self.register_background_update_handler( + self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search + ) @defer.inlineCallbacks def _background_reindex_search(self, progress, batch_size): @@ -74,7 +79,7 @@ class SearchStore(BackgroundUpdateStore): elif event.type == "m.room.name": key = "content.name" value = content["name"] - except Exception: + except (KeyError, AttributeError): # If the event is missing a necessary field then # skip over it. continue @@ -96,7 +101,7 @@ class SearchStore(BackgroundUpdateStore): raise Exception("Unrecognized database engine") for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE): - clump = event_search_rows[index : index + INSERT_CLUMP_SIZE) + clump = event_search_rows[index:index + INSERT_CLUMP_SIZE] txn.execute_many(sql, clump) progress = { @@ -116,7 +121,7 @@ class SearchStore(BackgroundUpdateStore): ) if result is None: - yield _end_background_update(self.EVENT_SEARCH_UPDATE_NAME) + yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME) defer.returnValue(result) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 1d123ccefc..d69c7cb991 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -53,6 +53,14 @@ class Clock(object): loop.stop() def call_later(self, delay, callback, *args, **kwargs): + """Call something later + + Args: + delay(float): How long to wait in seconds. + callback(function): Function to call + *args: Postional arguments to pass to function. + **kwargs: Key arguments to pass to function. + """ current_context = LoggingContext.current_context() def wrapped_callback(*args, **kwargs): -- cgit 1.5.1 From 90b503216c40974dc4925a9bbb673779a039143c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 10 Nov 2015 16:20:13 +0000 Subject: Use a background task to update databases to use the full text search --- synapse/storage/schema/delta/25/fts.py | 100 ++++++++------------------------- synapse/storage/search.py | 8 +-- 2 files changed, 28 insertions(+), 80 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py index b7cd0ce3b8..e408d36da0 100644 --- a/synapse/storage/schema/delta/25/fts.py +++ b/synapse/storage/schema/delta/25/fts.py @@ -22,7 +22,7 @@ import ujson logger = logging.getLogger(__name__) -POSTGRES_SQL = """ +POSTGRES_TABLE = """ CREATE TABLE IF NOT EXISTS event_search ( event_id TEXT, room_id TEXT, @@ -31,22 +31,6 @@ CREATE TABLE IF NOT EXISTS event_search ( vector tsvector ); -INSERT INTO event_search SELECT - event_id, room_id, json::json->>'sender', 'content.body', - to_tsvector('english', json::json->'content'->>'body') - FROM events NATURAL JOIN event_json WHERE type = 'm.room.message'; - -INSERT INTO event_search SELECT - event_id, room_id, json::json->>'sender', 'content.name', - to_tsvector('english', json::json->'content'->>'name') - FROM events NATURAL JOIN event_json WHERE type = 'm.room.name'; - -INSERT INTO event_search SELECT - event_id, room_id, json::json->>'sender', 'content.topic', - to_tsvector('english', json::json->'content'->>'topic') - FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic'; - - CREATE INDEX event_search_fts_idx ON event_search USING gin(vector); CREATE INDEX event_search_ev_idx ON event_search(event_id); CREATE INDEX event_search_ev_ridx ON event_search(room_id); @@ -61,67 +45,31 @@ SQLITE_TABLE = ( def run_upgrade(cur, database_engine, *args, **kwargs): if isinstance(database_engine, PostgresEngine): - run_postgres_upgrade(cur) + for statement in get_statements(POSTGRES_TABLE.splitlines()): + cur.execute(statement) return if isinstance(database_engine, Sqlite3Engine): - run_sqlite_upgrade(cur) - return - - -def run_postgres_upgrade(cur): - for statement in get_statements(POSTGRES_SQL.splitlines()): - cur.execute(statement) - - -def run_sqlite_upgrade(cur): cur.execute(SQLITE_TABLE) + return - rowid = -1 - while True: - cur.execute( - "SELECT rowid, json FROM event_json" - " WHERE rowid > ?" - " ORDER BY rowid ASC LIMIT 100", - (rowid,) - ) - - res = cur.fetchall() - - if not res: - break - - events = [ - ujson.loads(js) - for _, js in res - ] - - rowid = max(rid for rid, _ in res) - - rows = [] - for ev in events: - content = ev.get("content", {}) - body = content.get("body", None) - name = content.get("name", None) - topic = content.get("topic", None) - sender = ev.get("sender", None) - if ev["type"] == "m.room.message" and body: - rows.append(( - ev["event_id"], ev["room_id"], sender, "content.body", body - )) - if ev["type"] == "m.room.name" and name: - rows.append(( - ev["event_id"], ev["room_id"], sender, "content.name", name - )) - if ev["type"] == "m.room.topic" and topic: - rows.append(( - ev["event_id"], ev["room_id"], sender, "content.topic", topic - )) - - if rows: - logger.info(rows) - cur.executemany( - "INSERT INTO event_search (event_id, room_id, sender, key, value)" - " VALUES (?,?,?,?,?)", - rows - ) + cur.execute("SELECT MIN(stream_ordering) FROM events") + rows = cur.fetchall() + min_stream_id = rows[0][0] + + cur.execute("SELECT MAX(stream_ordering) FROM events") + rows = cur.fetchall() + max_stream_id = rows[0][0] + + if min_stream_id is not None and max_stream_id is not None: + progress = { + "target_min_stream_id_inclusive": min_stream_id, + "max_stream_id_exclusive": max_stream_id + 1, + "rows_inserted": 0, + } + progress_json = ujson.dumps(progress) + + cur.execute( + "INSERT into background_updates (update_name, progress_json)" + " VALUES (?, ?)", ("event_search", progress_json) + ) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index d170c546b5..3b19b118eb 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -37,8 +37,8 @@ class SearchStore(BackgroundUpdateStore): @defer.inlineCallbacks def _background_reindex_search(self, progress, batch_size): - target_min_stream_id = progress["target_min_stream_id"] - max_stream_id = progress["max_stream_id"] + target_min_stream_id = progress["target_min_stream_id_inclusive"] + max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) INSERT_CLUMP_SIZE = 1000 @@ -105,8 +105,8 @@ class SearchStore(BackgroundUpdateStore): txn.execute_many(sql, clump) progress = { - "target_max_stream_id": target_min_stream_id, - "max_stream_id": min_stream_id, + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, "rows_inserted": rows_inserted + len(event_search_rows) } -- cgit 1.5.1 From cf437900e0c689aad40f3da89866cf84c0f7ef65 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Tue, 10 Nov 2015 17:10:27 +0000 Subject: Return world_readable and guest_can_join in /publicRooms --- synapse/storage/events.py | 2 + synapse/storage/room.py | 71 ++++++++++++++---------- synapse/storage/schema/delta/25/guest_access.sql | 25 +++++++++ tests/storage/test_room.py | 2 + 4 files changed, 71 insertions(+), 29 deletions(-) create mode 100644 synapse/storage/schema/delta/25/guest_access.sql (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 59c9987202..4a365ff639 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -313,6 +313,8 @@ class EventsStore(SQLBaseStore): self._store_redaction(txn, event) elif event.type == EventTypes.RoomHistoryVisibility: self._store_history_visibility_txn(txn, event) + elif event.type == EventTypes.GuestAccess: + self._store_guest_access_txn(txn, event) self._store_room_members_txn( txn, diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 1c79626736..4f08df478c 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -99,34 +99,39 @@ class RoomStore(SQLBaseStore): """ def f(txn): - topic_subquery = ( - "SELECT topics.event_id as event_id, " - "topics.room_id as room_id, topic " - "FROM topics " - "INNER JOIN current_state_events as c " - "ON c.event_id = topics.event_id " - ) - - name_subquery = ( - "SELECT room_names.event_id as event_id, " - "room_names.room_id as room_id, name " - "FROM room_names " - "INNER JOIN current_state_events as c " - "ON c.event_id = room_names.event_id " - ) + def subquery(table_name, column_name=None): + column_name = column_name or table_name + return ( + "SELECT %(table_name)s.event_id as event_id, " + "%(table_name)s.room_id as room_id, %(column_name)s " + "FROM %(table_name)s " + "INNER JOIN current_state_events as c " + "ON c.event_id = %(table_name)s.event_id " % { + "column_name": column_name, + "table_name": table_name, + } + ) - # We use non printing ascii character US (\x1F) as a separator sql = ( - "SELECT r.room_id, max(n.name), max(t.topic)" + "SELECT" + " r.room_id," + " max(n.name)," + " max(t.topic)," + " max(v.history_visibility)," + " max(g.guest_access)" " FROM rooms AS r" " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id" " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id" + " LEFT JOIN (%(history_visibility)s) AS v ON v.room_id = r.room_id" + " LEFT JOIN (%(guest_access)s) AS g ON g.room_id = r.room_id" " WHERE r.is_public = ?" - " GROUP BY r.room_id" - ) % { - "topic": topic_subquery, - "name": name_subquery, - } + " GROUP BY r.room_id" % { + "topic": subquery("topics", "topic"), + "name": subquery("room_names", "name"), + "history_visibility": subquery("history_visibility"), + "guest_access": subquery("guest_access"), + } + ) txn.execute(sql, (is_public,)) @@ -156,10 +161,12 @@ class RoomStore(SQLBaseStore): "room_id": r[0], "name": r[1], "topic": r[2], - "aliases": r[3], + "world_readable": r[3] == "world_readable", + "guest_can_join": r[4] == "can_join", + "aliases": r[5], } for r in rows - if r[3] # We only return rooms that have at least one alias. + if r[5] # We only return rooms that have at least one alias. ] defer.returnValue(ret) @@ -203,16 +210,22 @@ class RoomStore(SQLBaseStore): ) def _store_history_visibility_txn(self, txn, event): - if hasattr(event, "content") and "history_visibility" in event.content: + self._store_content_index_txn(txn, event, "history_visibility") + + def _store_guest_access_txn(self, txn, event): + self._store_content_index_txn(txn, event, "guest_access") + + def _store_content_index_txn(self, txn, event, key): + if hasattr(event, "content") and key in event.content: sql = ( - "INSERT INTO history_visibility" - " (event_id, room_id, history_visibility)" - " VALUES (?, ?, ?)" + "INSERT INTO %(key)s" + " (event_id, room_id, %(key)s)" + " VALUES (?, ?, ?)" % {"key": key} ) txn.execute(sql, ( event.event_id, event.room_id, - event.content["history_visibility"] + event.content[key] )) def _store_event_search_txn(self, txn, event, key, value): diff --git a/synapse/storage/schema/delta/25/guest_access.sql b/synapse/storage/schema/delta/25/guest_access.sql new file mode 100644 index 0000000000..bdb90e7118 --- /dev/null +++ b/synapse/storage/schema/delta/25/guest_access.sql @@ -0,0 +1,25 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This is a manual index of guest_access content of state events, + * so that we can join on them in SELECT statements. + */ +CREATE TABLE IF NOT EXISTS guest_access( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + guest_access TEXT NOT NULL, + UNIQUE (event_id) +); diff --git a/tests/storage/test_room.py b/tests/storage/test_room.py index caffce64e3..91c967548d 100644 --- a/tests/storage/test_room.py +++ b/tests/storage/test_room.py @@ -73,6 +73,8 @@ class RoomStoreTestCase(unittest.TestCase): "room_id": self.room.to_string(), "topic": None, "aliases": [self.alias.to_string()], + "world_readable": False, + "guest_can_join": False, }, rooms[0]) -- cgit 1.5.1 From 940a16119205115c02a3b23e9f3c67a08486bae0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 11 Nov 2015 13:59:40 +0000 Subject: Fix the background update --- synapse/storage/background_updates.py | 13 ++++++++----- synapse/storage/schema/delta/25/fts.py | 7 +++---- synapse/storage/search.py | 16 ++++++++-------- 3 files changed, 19 insertions(+), 17 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index b6cdc6ec68..45fccc2e5e 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -93,16 +93,19 @@ class BackgroundUpdateStore(SQLBaseStore): sleep = defer.Deferred() self._background_update_timer = self._clock.call_later( - self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback + self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None ) try: yield sleep finally: self._background_update_timer = None - result = yield self.do_background_update( - self.BACKGROUND_UPDATE_DURATION_MS - ) + try: + result = yield self.do_background_update( + self.BACKGROUND_UPDATE_DURATION_MS + ) + except: + logger.exception("Error doing update") if result is None: logger.info( @@ -169,7 +172,7 @@ class BackgroundUpdateStore(SQLBaseStore): duration_ms = time_stop - time_start logger.info( - "Updating %. Updated %r items in %rms." + "Updating %r. Updated %r items in %rms." " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)", update_name, items_updated, duration_ms, performance.total_items_per_ms(), diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py index e408d36da0..ce92f59025 100644 --- a/synapse/storage/schema/delta/25/fts.py +++ b/synapse/storage/schema/delta/25/fts.py @@ -47,11 +47,10 @@ def run_upgrade(cur, database_engine, *args, **kwargs): if isinstance(database_engine, PostgresEngine): for statement in get_statements(POSTGRES_TABLE.splitlines()): cur.execute(statement) - return - - if isinstance(database_engine, Sqlite3Engine): + elif isinstance(database_engine, Sqlite3Engine): cur.execute(SQLITE_TABLE) - return + else: + raise Exception("Unrecognized database engine") cur.execute("SELECT MIN(stream_ordering) FROM events") rows = cur.fetchall() diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 3b19b118eb..3c0d671129 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -46,18 +46,18 @@ class SearchStore(BackgroundUpdateStore): def reindex_search_txn(txn): sql = ( - "SELECT stream_id, event_id FROM events" + "SELECT stream_ordering, event_id FROM events" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" " LIMIT ?" - ) % (" OR ".join("type = '%s'" % TYPES),) + ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),) - txn.execute(sql, target_min_stream_id, max_stream_id, batch_size) + txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) - rows = txn.fetch_all() + rows = txn.fetchall() if not rows: - return None + return 0 min_stream_id = rows[-1][0] event_ids = [row[1] for row in rows] @@ -102,7 +102,7 @@ class SearchStore(BackgroundUpdateStore): for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE): clump = event_search_rows[index:index + INSERT_CLUMP_SIZE] - txn.execute_many(sql, clump) + txn.executemany(sql, clump) progress = { "target_min_stream_id_inclusive": target_min_stream_id, @@ -116,11 +116,11 @@ class SearchStore(BackgroundUpdateStore): return len(event_search_rows) - result = yield self.runInteration( + result = yield self.runInteraction( self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn ) - if result is None: + if not result: yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME) defer.returnValue(result) -- cgit 1.5.1 From e1627388d15fac534e22b80c2f7cb1a4e1afacc3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 11 Nov 2015 17:14:56 +0000 Subject: Fix param style to work on both sqlite and postgres --- synapse/storage/schema/delta/25/fts.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py index ce92f59025..5239d69073 100644 --- a/synapse/storage/schema/delta/25/fts.py +++ b/synapse/storage/schema/delta/25/fts.py @@ -68,7 +68,11 @@ def run_upgrade(cur, database_engine, *args, **kwargs): } progress_json = ujson.dumps(progress) - cur.execute( + sql = ( "INSERT into background_updates (update_name, progress_json)" - " VALUES (?, ?)", ("event_search", progress_json) + " VALUES (?, ?)" ) + + sql = database_engine.convert_param_style(sql) + + cur.execute(sql, ("event_search", progress_json)) -- cgit 1.5.1 From 39de87869c9ad966f382e597986e860d03f3fef2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 14:06:31 +0000 Subject: Fix bug where assumed dict was namedtuple --- synapse/storage/transactions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 4e0d7c9774..ad099775eb 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -59,7 +59,7 @@ class TransactionStore(SQLBaseStore): allow_none=True, ) - if result and result.response_code: + if result and result["response_code"]: return result["response_code"], result["response_json"] else: return None -- cgit 1.5.1 From 14a9d805b959b5d2b26fea0d57794cb3ceb28958 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 14:07:25 +0000 Subject: Use a (hopefully) more efficient SQL query for doing recency based room search --- synapse/storage/search.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 2e88c51ad0..eac265b543 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -253,11 +253,13 @@ class SearchStore(BackgroundUpdateStore): ) elif isinstance(self.database_engine, Sqlite3Engine): sql = ( - "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id," + "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" - " FROM event_search" - " NATURAL JOIN events" - " WHERE value MATCH ? AND room_id = ?" + " FROM (SELECT event_id, matchinfo(event_search) FROM event_search" + " WHERE value MATCH" + " )" + " CROSS JOIN events USING (event_id)" + " WHERE room_id = ?" ) else: # This should be unreachable. -- cgit 1.5.1 From 320408ef47eb373c2b8edad7ab3d08c3fa0cb459 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 15:09:45 +0000 Subject: Fix SQL syntax --- synapse/storage/search.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index eac265b543..e1911e2480 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -255,8 +255,9 @@ class SearchStore(BackgroundUpdateStore): sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" - " FROM (SELECT event_id, matchinfo(event_search) FROM event_search" - " WHERE value MATCH" + " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo" + " FROM event_search" + " WHERE value MATCH ?" " )" " CROSS JOIN events USING (event_id)" " WHERE room_id = ?" -- cgit 1.5.1 From 764e79d0514a0cce9dc456df0355108d40dbeda8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 15:19:56 +0000 Subject: Comment --- synapse/storage/search.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index e1911e2480..0b00ddb9db 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -252,6 +252,8 @@ class SearchStore(BackgroundUpdateStore): " WHERE vector @@ query AND room_id = ?" ) elif isinstance(self.database_engine, Sqlite3Engine): + # We use CROSS JOIN here to ensure we use the right indexes. + # https://sqlite.org/optoverview.html#crossjoin sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" -- cgit 1.5.1 From 8fd8e72cec8df94403441664d8eecc25fa8d363f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 15:33:47 +0000 Subject: Expand comment --- synapse/storage/search.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 0b00ddb9db..dcc5ac65a3 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -254,6 +254,12 @@ class SearchStore(BackgroundUpdateStore): elif isinstance(self.database_engine, Sqlite3Engine): # We use CROSS JOIN here to ensure we use the right indexes. # https://sqlite.org/optoverview.html#crossjoin + # + # We want to use the full text search index on event_search to + # extract all possible matches first, then lookup those matches + # in the events table to get the topological ordering. We need + # to use the indexes in this order because sqlite refuses to + # MATCH unless it uses the full text search index sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" -- cgit 1.5.1 From 3de46c7755ac5e061fc13fb916c13b841b65f2b5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Nov 2015 15:36:43 +0000 Subject: Trailing whitespace --- synapse/storage/search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index dcc5ac65a3..380270b009 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -258,7 +258,7 @@ class SearchStore(BackgroundUpdateStore): # We want to use the full text search index on event_search to # extract all possible matches first, then lookup those matches # in the events table to get the topological ordering. We need - # to use the indexes in this order because sqlite refuses to + # to use the indexes in this order because sqlite refuses to # MATCH unless it uses the full text search index sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," -- cgit 1.5.1 From fddedd51d9ca44f385d412e8c2e3e8d99325ba67 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 10 Nov 2015 18:27:23 +0000 Subject: Fix a few race conditions in the state calculation Be a bit more careful about how we calculate the state to be returned by /sync. In a few places, it was possible for /sync to return slightly later state than that represented by the next_batch token and the timeline. In particular, the following cases were susceptible: * On a full state sync, for an active room * During a per-room incremental sync with a timeline gap * When the user has just joined a room. (Refactor check_joined_room to make it less magical) Also, use store.get_state_for_events() (and thus the existing stategroups) to calculate the state corresponding to a particular sync position, rather than state_handler.compute_event_context(), which recalculates from first principles (and tends to miss some state). Merged from PR https://github.com/matrix-org/synapse/pull/372 --- synapse/handlers/sync.py | 123 ++++++++++++++++++++++++----------------------- synapse/storage/state.py | 14 ++++++ 2 files changed, 77 insertions(+), 60 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8b154fa7e7..6dc9d0fb92 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -254,9 +254,7 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token=timeline_since_token ) - current_state = yield self.state_handler.get_current_state( - room_id - ) + current_state = yield self.get_state_at(room_id, now_token) defer.returnValue(JoinedSyncResult( room_id=room_id, @@ -353,14 +351,12 @@ class SyncHandler(BaseHandler): room_id, sync_config, leave_token, since_token=timeline_since_token ) - leave_state = yield self.store.get_state_for_events( - [leave_event_id], None - ) + leave_state = yield self.store.get_state_for_event(leave_event_id) defer.returnValue(ArchivedSyncResult( room_id=room_id, timeline=batch, - state=leave_state[leave_event_id], + state=leave_state, private_user_data=self.private_user_data_for_room( room_id, tags_by_room ), @@ -424,6 +420,9 @@ class SyncHandler(BaseHandler): if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. + logger.debug("Got %i events for incremental sync - not limited", + len(room_events)) + invite_events = [] leave_events = [] events_by_room_id = {} @@ -439,9 +438,11 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: recents = events_by_room_id.get(room_id, []) + logger.debug("Events for room %s: %r", room_id, recents) state = { (event.type, event.state_key): event for event in recents if event.is_state()} + limited = False if recents: prev_batch = now_token.copy_and_replace( @@ -450,9 +451,13 @@ class SyncHandler(BaseHandler): else: prev_batch = now_token - state, limited = yield self.check_joined_room( - sync_config, room_id, state - ) + just_joined = yield self.check_joined_room(sync_config, state) + if just_joined: + logger.debug("User has just joined %s: needs full state", + room_id) + state = yield self.get_state_at(room_id, now_token) + # the timeline is inherently limited if we've just joined + limited = True room_sync = JoinedSyncResult( room_id=room_id, @@ -467,10 +472,15 @@ class SyncHandler(BaseHandler): room_id, tags_by_room ), ) + logger.debug("Result for room %s: %r", room_id, room_sync) + if room_sync: joined.append(room_sync) else: + logger.debug("Got %i events for incremental sync - hit limit", + len(room_events)) + invite_events = yield self.store.get_invites_for_user( sync_config.user.to_string() ) @@ -563,6 +573,8 @@ class SyncHandler(BaseHandler): Returns: A Deferred JoinedSyncResult """ + logger.debug("Doing incremental sync for room %s between %s and %s", + room_id, since_token, now_token) # TODO(mjark): Check for redactions we might have missed. @@ -572,30 +584,26 @@ class SyncHandler(BaseHandler): logging.debug("Recents %r", batch) - # TODO(mjark): This seems racy since this isn't being passed a - # token to indicate what point in the stream this is - current_state = yield self.state_handler.get_current_state( - room_id - ) + current_state = yield self.get_state_at(room_id, now_token) - state_at_previous_sync = yield self.get_state_at_previous_sync( - room_id, since_token=since_token + state_at_previous_sync = yield self.get_state_at( + room_id, stream_position=since_token ) - state_events_delta = yield self.compute_state_delta( + state = yield self.compute_state_delta( since_token=since_token, previous_state=state_at_previous_sync, current_state=current_state, ) - state_events_delta, _ = yield self.check_joined_room( - sync_config, room_id, state_events_delta - ) + just_joined = yield self.check_joined_room(sync_config, state) + if just_joined: + state = yield self.get_state_at(room_id, now_token) room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, - state=state_events_delta, + state=state, ephemeral=ephemeral_by_room.get(room_id, []), private_user_data=self.private_user_data_for_room( room_id, tags_by_room @@ -627,16 +635,12 @@ class SyncHandler(BaseHandler): logging.debug("Recents %r", batch) - # TODO(mjark): This seems racy since this isn't being passed a - # token to indicate what point in the stream this is - leave_state = yield self.store.get_state_for_events( - [leave_event.event_id], None + state_events_at_leave = yield self.store.get_state_for_event( + leave_event.event_id ) - state_events_at_leave = leave_state[leave_event.event_id] - - state_at_previous_sync = yield self.get_state_at_previous_sync( - leave_event.room_id, since_token=since_token + state_at_previous_sync = yield self.get_state_at( + leave_event.room_id, stream_position=since_token ) state_events_delta = yield self.compute_state_delta( @@ -659,26 +663,36 @@ class SyncHandler(BaseHandler): defer.returnValue(room_sync) @defer.inlineCallbacks - def get_state_at_previous_sync(self, room_id, since_token): - """ Get the room state at the previous sync the client made. - Returns: - A Deferred map from ((type, state_key)->Event) + def get_state_after_event(self, event): + """ + Get the room state after the given event + + :param synapse.events.EventBase event: event of interest + :return: A Deferred map from ((type, state_key)->Event) + """ + state = yield self.store.get_state_for_event(event.event_id) + if event.is_state(): + state = state.copy() + state[(event.type, event.state_key)] = event + defer.returnValue(state) + + @defer.inlineCallbacks + def get_state_at(self, room_id, stream_position): + """ Get the room state at a particular stream position + :param str room_id: room for which to get state + :param StreamToken stream_position: point at which to get state + :returns: A Deferred map from ((type, state_key)->Event) """ last_events, token = yield self.store.get_recent_events_for_room( - room_id, end_token=since_token.room_key, limit=1, + room_id, end_token=stream_position.room_key, limit=1, ) if last_events: - last_event = last_events[0] - last_context = yield self.state_handler.compute_event_context( - last_event - ) - if last_event.is_state(): - state = last_context.current_state.copy() - state[(last_event.type, last_event.state_key)] = last_event - else: - state = last_context.current_state + last_event = last_events[-1] + state = yield self.get_state_after_event(last_event) + else: + # no events in this room - so presumably no state state = {} defer.returnValue(state) @@ -706,31 +720,20 @@ class SyncHandler(BaseHandler): state_delta[key] = event return state_delta - @defer.inlineCallbacks - def check_joined_room(self, sync_config, room_id, state_delta): + def check_joined_room(self, sync_config, state_delta): """ - Check if the user has just joined the given room. If so, return the - full state for the room, instead of the delta since the last sync. + Check if the user has just joined the given room (so should + be given the full state) :param sync_config: - :param room_id: :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the difference in state since the last sync :returns A deferred Tuple (state_delta, limited) """ - joined = False - limited = False - join_event = state_delta.get(( EventTypes.Member, sync_config.user.to_string()), None) if join_event is not None: if join_event.content["membership"] == Membership.JOIN: - joined = True - - if joined: - state_delta = yield self.state_handler.get_current_state(room_id) - # the timeline is inherently limited if we've just joined - limited = True - - defer.returnValue((state_delta, limited)) + return True + return False diff --git a/synapse/storage/state.py b/synapse/storage/state.py index acfb322a53..80e9b63f50 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -237,6 +237,20 @@ class StateStore(SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) + @defer.inlineCallbacks + def get_state_for_event(self, event_id, types=None): + """ + Get the state dict corresponding to a particular event + + :param str event_id: event whose state should be returned + :param list[(str, str)]|None types: List of (type, state_key) tuples + which are used to filter the state fetched. May be None, which + matches any key + :return: a deferred dict from (type, state_key) -> state_event + """ + state_map = yield self.get_state_for_events([event_id], types) + defer.returnValue(state_map[event_id]) + @cached(num_args=2, lru=True, max_entries=10000) def _get_state_group_for_event(self, room_id, event_id): return self._simple_select_one_onecol( -- cgit 1.5.1 From e4d622aaaf0df503f942d016a5bf798dd52899d1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 10 Nov 2015 18:29:25 +0000 Subject: Implementation of state rollback in /sync Implementation of SPEC-254: roll back the state dictionary to how it looked at the start of the timeline. Merged PR https://github.com/matrix-org/synapse/pull/373 --- synapse/rest/client/v2_alpha/sync.py | 67 ++++++++++++++++++++++++++++++++++-- synapse/storage/events.py | 6 ++-- 2 files changed, 69 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 272a00bc85..efd8281558 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -20,6 +20,7 @@ from synapse.http.servlet import ( ) from synapse.handlers.sync import SyncConfig from synapse.types import StreamToken +from synapse.events import FrozenEvent from synapse.events.utils import ( serialize_event, format_event_for_client_v2_without_event_id, ) @@ -256,7 +257,13 @@ class SyncRestServlet(RestServlet): :rtype: dict[str, object] """ event_map = {} - state_events = filter.filter_room_state(room.state.values()) + state_dict = room.state + timeline_events = filter.filter_room_timeline(room.timeline.events) + + state_dict = SyncRestServlet._rollback_state_for_timeline( + state_dict, timeline_events) + + state_events = filter.filter_room_state(state_dict.values()) state_event_ids = [] for event in state_events: # TODO(mjark): Respect formatting requirements in the filter. @@ -266,7 +273,6 @@ class SyncRestServlet(RestServlet): ) state_event_ids.append(event.event_id) - timeline_events = filter.filter_room_timeline(room.timeline.events) timeline_event_ids = [] for event in timeline_events: # TODO(mjark): Respect formatting requirements in the filter. @@ -297,6 +303,63 @@ class SyncRestServlet(RestServlet): return result + @staticmethod + def _rollback_state_for_timeline(state, timeline): + """ + Wind the state dictionary backwards, so that it represents the + state at the start of the timeline, rather than at the end. + + :param dict[(str, str), synapse.events.EventBase] state: the + state dictionary. Will be updated to the state before the timeline. + :param list[synapse.events.EventBase] timeline: the event timeline + :return: updated state dictionary + """ + logger.debug("Processing state dict %r; timeline %r", state, + [e.get_dict() for e in timeline]) + + result = state.copy() + + for timeline_event in reversed(timeline): + if not timeline_event.is_state(): + continue + + event_key = (timeline_event.type, timeline_event.state_key) + + logger.debug("Considering %s for removal", event_key) + + state_event = result.get(event_key) + if (state_event is None or + state_event.event_id != timeline_event.event_id): + # the event in the timeline isn't present in the state + # dictionary. + # + # the most likely cause for this is that there was a fork in + # the event graph, and the state is no longer valid. Really, + # the event shouldn't be in the timeline. We're going to ignore + # it for now, however. + logger.warn("Found state event %r in timeline which doesn't " + "match state dictionary", timeline_event) + continue + + prev_event_id = timeline_event.unsigned.get("replaces_state", None) + logger.debug("Replacing %s with %s in state dict", + timeline_event.event_id, prev_event_id) + + if prev_event_id is None: + del result[event_key] + else: + result[event_key] = FrozenEvent({ + "type": timeline_event.type, + "state_key": timeline_event.state_key, + "content": timeline_event.unsigned['prev_content'], + "sender": timeline_event.unsigned['prev_sender'], + "event_id": prev_event_id, + "room_id": timeline_event.room_id, + }) + logger.debug("New value: %r", result.get(event_key)) + + return result + def register_servlets(hs, http_server): SyncRestServlet(hs).register(http_server) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4a365ff639..5d35ca90b9 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -831,7 +831,8 @@ class EventsStore(SQLBaseStore): allow_none=True, ) if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] + ev.unsigned["prev_content"] = prev.content + ev.unsigned["prev_sender"] = prev.sender self._get_event_cache.prefill( (ev.event_id, check_redacted, get_prev_content), ev @@ -888,7 +889,8 @@ class EventsStore(SQLBaseStore): get_prev_content=False, ) if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] + ev.unsigned["prev_content"] = prev.content + ev.unsigned["prev_sender"] = prev.sender self._get_event_cache.prefill( (ev.event_id, check_redacted, get_prev_content), ev -- cgit 1.5.1