From 9e25443db84f16bca36d1ba605e5b5ea09d1f8c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Sep 2016 14:31:26 +0100 Subject: Move to storing state_groups_state as deltas --- synapse/storage/prepare_database.py | 2 +- synapse/storage/schema/delta/35/state.sql | 21 ++++ synapse/storage/state.py | 161 +++++++++++++++++++----------- 3 files changed, 124 insertions(+), 60 deletions(-) create mode 100644 synapse/storage/schema/delta/35/state.sql (limited to 'synapse/storage') diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index b94ce7bea1..b1fbc4ffa5 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 34 +SCHEMA_VERSION = 35 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/35/state.sql b/synapse/storage/schema/delta/35/state.sql new file mode 100644 index 0000000000..c4c244c169 --- /dev/null +++ b/synapse/storage/schema/delta/35/state.sql @@ -0,0 +1,21 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +CREATE TABLE state_group_edges( + state_group BIGINT NOT NULL, + prev_state_group BIGINT NOT NULL +); + +CREATE INDEX state_group_edges_idx ON state_group_edges(state_group); diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ec551b0b4f..73cebc7383 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches import intern_string +from synapse.storage.engines import PostgresEngine from twisted.internet import defer @@ -118,20 +119,45 @@ class StateStore(SQLBaseStore): }, ) - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { + if context.prev_group: + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in state_event_ids.items() - ], - ) + "prev_state_group": context.prev_group, + }, + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.delta_ids.items() + ], + ) + else: + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in state_event_ids.items() + ], + ) self._simple_insert_many_txn( txn, @@ -214,26 +240,70 @@ class StateStore(SQLBaseStore): else: where_clause = "" - sql = ( - "SELECT state_group, event_id, type, state_key" - " FROM state_groups_state WHERE" - " state_group IN (%s) %s" % ( - ",".join("?" 
for _ in groups), - where_clause, - ) - ) - - args = list(groups) - if types is not None: - args.extend([i for typ in types for i in typ]) - - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - results = {group: {} for group in groups} - for row in rows: - key = (row["type"], row["state_key"]) - results[row["state_group"]][key] = row["event_id"] + if isinstance(self.database_engine, PostgresEngine): + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT type, state_key, event_id FROM state_groups_state + WHERE ROW(type, state_key, state_group) IN ( + SELECT type, state_key, max(state_group) FROM state + INNER JOIN state_groups_state USING (state_group) + GROUP BY type, state_key + ) + %s; + """) % (where_clause,) + + for group in groups: + args = [group] + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] + else: + for group in groups: + group_tree = [group] + next_group = group + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + group_tree.append(next_group) + + sql = (""" + SELECT type, state_key, event_id FROM state_groups_state + INNER JOIN ( + SELECT type, state_key, max(state_group) as state_group + FROM state_groups_state + WHERE state_group IN (%s) %s + GROUP BY type, state_key + ) USING (type, state_key, state_group); + """) % (",".join("?" for _ in group_tree), where_clause,) + + args = list(group_tree) + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] + return results results = {} @@ -504,32 +574,5 @@ class StateStore(SQLBaseStore): defer.returnValue(results) - def get_all_new_state_groups(self, last_id, current_id, limit): - def get_all_new_state_groups_txn(txn): - sql = ( - "SELECT id, room_id, event_id FROM state_groups" - " WHERE ? < id AND id <= ? ORDER BY id LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - groups = txn.fetchall() - - if not groups: - return ([], []) - - lower_bound = groups[0][0] - upper_bound = groups[-1][0] - sql = ( - "SELECT state_group, type, state_key, event_id" - " FROM state_groups_state" - " WHERE ? <= state_group AND state_group <= ?" 
- ) - - txn.execute(sql, (lower_bound, upper_bound)) - state_group_state = txn.fetchall() - return (groups, state_group_state) - return self.runInteraction( - "get_all_new_state_groups", get_all_new_state_groups_txn - ) - def get_next_state_group(self): return self._state_groups_id_gen.get_next() -- cgit 1.4.1 From 598317927cb8f741528d639f3ce875299fde478e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 2 Sep 2016 10:41:38 +0100 Subject: Limit the length of state chains --- synapse/storage/events.py | 49 +++++++++++++---------- synapse/storage/state.py | 100 ++++++++++++++++++++++++++++++++++++---------- 2 files changed, 106 insertions(+), 43 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 1a7d4c5199..7e9b351513 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -497,7 +497,11 @@ class EventsStore(SQLBaseStore): # insert into the state_group, state_groups_state and # event_to_state_groups tables. - self._store_mult_state_groups_txn(txn, ((event, context),)) + try: + self._store_mult_state_groups_txn(txn, ((event, context),)) + except Exception: + logger.exception("") + raise metadata_json = encode_json( event.internal_metadata.get_dict() @@ -1543,6 +1547,9 @@ class EventsStore(SQLBaseStore): ) event_rows = txn.fetchall() + for event_id, state_key in event_rows: + txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) + # We calculate the new entries for the backward extremeties by finding # all events that point to events that are to be purged txn.execute( @@ -1571,26 +1578,26 @@ class EventsStore(SQLBaseStore): # Get all state groups that are only referenced by events that are # to be deleted. - txn.execute( - "SELECT state_group FROM event_to_state_groups" - " INNER JOIN events USING (event_id)" - " WHERE state_group IN (" - " SELECT DISTINCT state_group FROM events" - " INNER JOIN event_to_state_groups USING (event_id)" - " WHERE room_id = ? AND topological_ordering < ?" - " )" - " GROUP BY state_group HAVING MAX(topological_ordering) < ?", - (room_id, topological_ordering, topological_ordering) - ) - state_rows = txn.fetchall() - txn.executemany( - "DELETE FROM state_groups_state WHERE state_group = ?", - state_rows - ) - txn.executemany( - "DELETE FROM state_groups WHERE id = ?", - state_rows - ) + # txn.execute( + # "SELECT state_group FROM event_to_state_groups" + # " INNER JOIN events USING (event_id)" + # " WHERE state_group IN (" + # " SELECT DISTINCT state_group FROM events" + # " INNER JOIN event_to_state_groups USING (event_id)" + # " WHERE room_id = ? AND topological_ordering < ?" + # " )" + # " GROUP BY state_group HAVING MAX(topological_ordering) < ?", + # (room_id, topological_ordering, topological_ordering) + # ) + # state_rows = txn.fetchall() + # txn.executemany( + # "DELETE FROM state_groups_state WHERE state_group = ?", + # state_rows + # ) + # txn.executemany( + # "DELETE FROM state_groups WHERE id = ?", + # state_rows + # ) # Delete all non-state txn.executemany( "DELETE FROM event_to_state_groups WHERE event_id = ?", diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 73cebc7383..7f45c0cd99 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -25,6 +25,9 @@ import logging logger = logging.getLogger(__name__) +MAX_STATE_DELTA_HOPS = 100 + + class StateStore(SQLBaseStore): """ Keeps track of the state at a given event. 
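[Note: the patches in this series store a state group either in full or as a
delta against a prev_state_group recorded in state_group_edges, so reading a
group means walking the edge chain and layering deltas, newest last. The
Postgres queries above do that walk with a WITH RECURSIVE CTE; the sqlite
path builds the same chain (group_tree) in Python. A minimal standalone
sketch of the resolution, using plain dicts in place of the real tables
(illustrative only, not Synapse code):

    def resolve_state(state_group, edges, deltas):
        # edges:  state_group -> prev_state_group  (state_group_edges rows)
        # deltas: state_group -> {(type, state_key): event_id}
        #         (that group's state_groups_state rows)
        chain = []
        group = state_group
        while group is not None:
            chain.append(group)
            group = edges.get(group)              # follow prev_state_group
        state = {}
        for group in reversed(chain):             # oldest first...
            state.update(deltas.get(group, {}))   # ...newer deltas win
        return state

The MAX_STATE_DELTA_HOPS constant added above bounds the length of `chain`.]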
@@ -104,7 +107,6 @@ class StateStore(SQLBaseStore): state_groups[event.event_id] = context.state_group if self._have_persisted_state_group_txn(txn, context.state_group): - logger.info("Already persisted state_group: %r", context.state_group) continue state_event_ids = dict(context.current_state_ids) @@ -120,29 +122,48 @@ class StateStore(SQLBaseStore): ) if context.prev_group: - self._simple_insert_txn( - txn, - table="state_group_edges", - values={ - "state_group": context.state_group, - "prev_state_group": context.prev_group, - }, + potential_hops = self._count_state_group_hops_txn( + txn, context.prev_group ) - - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { + if potential_hops < MAX_STATE_DELTA_HOPS: + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in context.delta_ids.items() - ], - ) + "prev_state_group": context.prev_group, + }, + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.delta_ids.items() + ], + ) + else: + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.current_state_ids.items() + ], + ) else: self._simple_insert_many_txn( txn, @@ -171,6 +192,41 @@ class StateStore(SQLBaseStore): ], ) + def _count_state_group_hops_txn(self, txn, state_group): + if isinstance(self.database_engine, PostgresEngine): + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT count(*) FROM state; + """) + + txn.execute(sql, (state_group,)) + row = txn.fetchone() + if row and row[0]: + return row[0] + else: + return 0 + else: + next_group = state_group + count = 0 + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + count += 1 + + return count + @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): if event_type and state_key is not None: -- cgit 1.4.1 From a99e9335502df3389ff6f16ef52c43ce391b6955 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 09:34:24 +0100 Subject: Add upgrade script that will slowly prune state_groups_state entries --- synapse/replication/slave/storage/events.py | 3 + synapse/storage/schema/delta/35/state_dedupe.sql | 17 ++ synapse/storage/state.py | 278 +++++++++++++++++------ 3 files changed, 223 insertions(+), 75 deletions(-) create mode 100644 synapse/storage/schema/delta/35/state_dedupe.sql (limited to 'synapse/storage') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index cbebd5b2f7..15c52774a2 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -86,6 +86,9 @@ class SlavedEventStore(BaseSlavedStore): _get_state_groups_from_groups = ( StateStore.__dict__["_get_state_groups_from_groups"] ) + 
_get_state_groups_from_groups_txn = ( + DataStore._get_state_groups_from_groups_txn.__func__ + ) _get_state_group_from_group = ( StateStore.__dict__["_get_state_group_from_group"] ) diff --git a/synapse/storage/schema/delta/35/state_dedupe.sql b/synapse/storage/schema/delta/35/state_dedupe.sql new file mode 100644 index 0000000000..97e5067ef4 --- /dev/null +++ b/synapse/storage/schema/delta/35/state_dedupe.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT into background_updates (update_name, progress_json) + VALUES ('state_group_state_deduplication', '{}'); diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 7f45c0cd99..968b68f462 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -47,6 +47,15 @@ class StateStore(SQLBaseStore): * `state_groups_state`: Maps state group to state events. """ + STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" + + def __init__(self, hs): + super(StateStore, self).__init__(hs) + self.register_background_update_handler( + self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, + self._background_deduplicate_state, + ) + @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): if not event_ids: @@ -288,92 +297,92 @@ class StateStore(SQLBaseStore): def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> (dict of (type, state_key) -> event id) """ - def f(txn, groups): - if types is not None: - where_clause = "AND (%s)" % ( - " OR ".join(["(type = ? 
AND state_key = ?)"] * len(types)), - ) - else: - where_clause = "" - - results = {group: {} for group in groups} - if isinstance(self.database_engine, PostgresEngine): - sql = (""" - WITH RECURSIVE state(state_group) AS ( - VALUES(?::bigint) - UNION ALL - SELECT prev_state_group FROM state_group_edges e, state s - WHERE s.state_group = e.state_group - ) - SELECT type, state_key, event_id FROM state_groups_state - WHERE ROW(type, state_key, state_group) IN ( - SELECT type, state_key, max(state_group) FROM state - INNER JOIN state_groups_state USING (state_group) - GROUP BY type, state_key - ) - %s; - """) % (where_clause,) - - for group in groups: - args = [group] - if types is not None: - args.extend([i for typ in types for i in typ]) - - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - for row in rows: - key = (row["type"], row["state_key"]) - results[group][key] = row["event_id"] - else: - for group in groups: - group_tree = [group] - next_group = group - - while next_group: - next_group = self._simple_select_one_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"state_group": next_group}, - retcol="prev_state_group", - allow_none=True, - ) - if next_group: - group_tree.append(next_group) - - sql = (""" - SELECT type, state_key, event_id FROM state_groups_state - INNER JOIN ( - SELECT type, state_key, max(state_group) as state_group - FROM state_groups_state - WHERE state_group IN (%s) %s - GROUP BY type, state_key - ) USING (type, state_key, state_group); - """) % (",".join("?" for _ in group_tree), where_clause,) - - args = list(group_tree) - if types is not None: - args.extend([i for typ in types for i in typ]) - - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - for row in rows: - key = (row["type"], row["state_key"]) - results[group][key] = row["event_id"] - - return results - results = {} chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", - f, chunk + self._get_state_groups_from_groups_txn, chunk, types, ) results.update(res) defer.returnValue(results) + def _get_state_groups_from_groups_txn(self, txn, groups, types=None): + if types is not None: + where_clause = "AND (%s)" % ( + " OR ".join(["(type = ? 
AND state_key = ?)"] * len(types)), + ) + else: + where_clause = "" + + results = {group: {} for group in groups} + if isinstance(self.database_engine, PostgresEngine): + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT type, state_key, event_id FROM state_groups_state + WHERE ROW(type, state_key, state_group) IN ( + SELECT type, state_key, max(state_group) FROM state + INNER JOIN state_groups_state USING (state_group) + GROUP BY type, state_key + ) + %s; + """) % (where_clause,) + + for group in groups: + args = [group] + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] + else: + for group in groups: + group_tree = [group] + next_group = group + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + group_tree.append(next_group) + + sql = (""" + SELECT type, state_key, event_id FROM state_groups_state + INNER JOIN ( + SELECT type, state_key, max(state_group) as state_group + FROM state_groups_state + WHERE state_group IN (%s) %s + GROUP BY type, state_key + ) USING (type, state_key, state_group); + """) % (",".join("?" for _ in group_tree), where_clause,) + + args = list(group_tree) + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] + + return results + @defer.inlineCallbacks def get_state_for_events(self, event_ids, types): """Given a list of event_ids and type tuples, return a list of state @@ -632,3 +641,122 @@ class StateStore(SQLBaseStore): def get_next_state_group(self): return self._state_groups_id_gen.get_next() + + @defer.inlineCallbacks + def _background_deduplicate_state(self, progress, batch_size): + last_state_group = progress.get("last_state_group", 0) + rows_inserted = progress.get("rows_inserted", 0) + max_group = progress.get("max_group", None) + + if max_group is None: + rows = yield self._execute( + "_background_deduplicate_state", None, + "SELECT coalesce(max(id), 0) FROM state_groups", + ) + max_group = rows[0][0] + + def reindex_txn(txn): + new_last_state_group = last_state_group + for count in xrange(batch_size): + txn.execute( + "SELECT id, room_id FROM state_groups" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC" + " LIMIT 1", + (new_last_state_group, max_group,) + ) + row = txn.fetchone() + if row: + state_group, room_id = row + + if not row or not state_group: + return True, count + + txn.execute( + "SELECT coalesce(max(id), 0) FROM state_groups" + " WHERE id < ? AND room_id = ?", + (state_group, room_id,) + ) + prev_group, = txn.fetchone() + new_last_state_group = state_group + + if prev_group: + potential_hops = self._count_state_group_hops_txn( + txn, prev_group + ) + if potential_hops >= MAX_STATE_DELTA_HOPS: + # We want to ensure chains are at most this long,# + # otherwise read performance degrades. 
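                        # [Note: with MAX_STATE_DELTA_HOPS = 100 a read
                        # resolves at most ~101 groups; skipping here leaves
                        # this group stored in full, starting a fresh chain
                        # and trading disk space for read speed.]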
+ continue + + prev_state = self._get_state_groups_from_groups_txn( + txn, [prev_group], types=None + ) + prev_state = prev_state.values()[0] + + curr_state = self._get_state_groups_from_groups_txn( + txn, [state_group], types=None + ) + curr_state = curr_state.values()[0] + + if not set(prev_state.keys()) - set(curr_state.keys()): + # We can only do a delta if the current has a strict super set + # of keys + + delta_state = { + key: value for key, value in curr_state.items() + if prev_state.get(key, None) != value + } + + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ + "state_group": state_group, + "prev_state_group": prev_group, + } + ) + + self._simple_delete_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": state_group, + } + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": state_group, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in delta_state.items() + ], + ) + + progress = { + "last_state_group": state_group, + "rows_inserted": rows_inserted + batch_size, + "max_group": max_group, + } + + self._background_update_progress_txn( + txn, self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, progress + ) + + return False, batch_size + + finished, result = yield self.runInteraction( + self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, reindex_txn + ) + + if finished: + yield self._end_background_update(self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME) + + defer.returnValue(result) -- cgit 1.4.1 From 628e65721bdf1fb39e78a833d757a38e614b652d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 10:41:27 +0100 Subject: Add comments --- synapse/events/snapshot.py | 5 +++ synapse/storage/state.py | 79 ++++++++++++++++++++++------------------------ 2 files changed, 43 insertions(+), 41 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index ec32008d5a..11605b34a3 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -27,12 +27,17 @@ class EventContext(object): ] def __init__(self): + # The current state including the current event self.current_state_ids = None + # The current state excluding the current event self.prev_state_ids = None self.state_group = None + self.rejected = False self.push_actions = [] + # A previously persisted state group and a delta between that + # and this state. self.prev_group = None self.delta_ids = None diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 968b68f462..ee8b763008 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -118,8 +118,6 @@ class StateStore(SQLBaseStore): if self._have_persisted_state_group_txn(txn, context.state_group): continue - state_event_ids = dict(context.current_state_ids) - self._simple_insert_txn( txn, table="state_groups", @@ -130,49 +128,36 @@ class StateStore(SQLBaseStore): }, ) + # We persist as a delta if we can, while also ensuring the chain + # of deltas isn't tooo long, as otherwise read performance degrades. 
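            # [Note, illustrative ids: if prev_group resolves to
            #   {("m.room.name", ""): "$A", ("m.room.topic", ""): "$B"}
            # and this event only changed the topic, context.delta_ids is
            # just {("m.room.topic", ""): "$C"}, so a single
            # state_groups_state row is written below instead of one row
            # per (type, state_key) pair.]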
if context.prev_group: potential_hops = self._count_state_group_hops_txn( txn, context.prev_group ) - if potential_hops < MAX_STATE_DELTA_HOPS: - self._simple_insert_txn( - txn, - table="state_group_edges", - values={ - "state_group": context.state_group, - "prev_state_group": context.prev_group, - }, - ) + if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS: + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ + "state_group": context.state_group, + "prev_state_group": context.prev_group, + }, + ) - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { - "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in context.delta_ids.items() - ], - ) - else: - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { - "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in context.current_state_ids.items() - ], - ) + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.delta_ids.items() + ], + ) else: self._simple_insert_many_txn( txn, @@ -185,7 +170,7 @@ class StateStore(SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in state_event_ids.items() + for key, state_id in context.current_state_ids.items() ], ) @@ -202,6 +187,10 @@ class StateStore(SQLBaseStore): ) def _count_state_group_hops_txn(self, txn, state_group): + """Given a state group, count how many hops there are in the tree. + + This is used to ensure the delta chains don't get too long. + """ if isinstance(self.database_engine, PostgresEngine): sql = (""" WITH RECURSIVE state(state_group) AS ( @@ -319,6 +308,11 @@ class StateStore(SQLBaseStore): results = {group: {} for group in groups} if isinstance(self.database_engine, PostgresEngine): + # The below query walks the state_group tree so that the "state" + # table includes all state_groups in the tree. It then joins + # against `state_groups_state` to fetch the latest state. + # It assumes that previous state groups are always numerically + # lesser. sql = (""" WITH RECURSIVE state(state_group) AS ( VALUES(?::bigint) @@ -644,6 +638,9 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def _background_deduplicate_state(self, progress, batch_size): + """This background update will slowly deduplicate state by reencoding + them as deltas. 
+ """ last_state_group = progress.get("last_state_group", 0) rows_inserted = progress.get("rows_inserted", 0) max_group = progress.get("max_group", None) -- cgit 1.4.1 From 485d999c8a95f8fdc6425a00e906e86efc77a917 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 14:49:08 +0100 Subject: Correctly delete old state groups in purge history API --- synapse/storage/events.py | 99 ++++++++++++++++++++++++------- synapse/storage/schema/delta/35/state.sql | 1 + 2 files changed, 80 insertions(+), 20 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7e9b351513..bec35ea68d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1578,26 +1578,85 @@ class EventsStore(SQLBaseStore): # Get all state groups that are only referenced by events that are # to be deleted. - # txn.execute( - # "SELECT state_group FROM event_to_state_groups" - # " INNER JOIN events USING (event_id)" - # " WHERE state_group IN (" - # " SELECT DISTINCT state_group FROM events" - # " INNER JOIN event_to_state_groups USING (event_id)" - # " WHERE room_id = ? AND topological_ordering < ?" - # " )" - # " GROUP BY state_group HAVING MAX(topological_ordering) < ?", - # (room_id, topological_ordering, topological_ordering) - # ) - # state_rows = txn.fetchall() - # txn.executemany( - # "DELETE FROM state_groups_state WHERE state_group = ?", - # state_rows - # ) - # txn.executemany( - # "DELETE FROM state_groups WHERE id = ?", - # state_rows - # ) + txn.execute( + "SELECT state_group FROM event_to_state_groups" + " INNER JOIN events USING (event_id)" + " WHERE state_group IN (" + " SELECT DISTINCT state_group FROM events" + " INNER JOIN event_to_state_groups USING (event_id)" + " WHERE room_id = ? AND topological_ordering < ?" + " )" + " GROUP BY state_group HAVING MAX(topological_ordering) < ?", + (room_id, topological_ordering, topological_ordering) + ) + + state_rows = txn.fetchall() + state_groups_to_delete = [sg for sg, in state_rows] + + # Now we get all the state groups that rely on these state groups + new_state_edges = [] + chunks = [ + state_groups_to_delete[i:i + 100] + for i in xrange(0, len(state_groups_to_delete), 100) + ] + for chunk in chunks: + rows = self._simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=chunk, + retcols=["state_group"], + keyvalues={}, + ) + new_state_edges.extend(row["state_group"] for row in rows) + + # Now we turn the state groups that reference to-be-deleted state groups + # to non delta versions. 
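        # [Note: their ancestors are about to be purged, so each such group
        # is first re-written in full (its delta rows and edge are deleted
        # and its fully resolved state re-inserted); only then are the
        # ancestor groups' rows dropped safely below.]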
+ for new_state_edge in new_state_edges: + curr_state = self._get_state_groups_from_groups_txn( + txn, [new_state_edge], types=None + ) + curr_state = curr_state.values()[0] + + self._simple_delete_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": new_state_edge, + } + ) + + self._simple_delete_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": new_state_edge, + } + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": new_state_edge, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in curr_state.items() + ], + ) + + txn.executemany( + "DELETE FROM state_groups_state WHERE state_group = ?", + state_rows + ) + txn.executemany( + "DELETE FROM state_groups WHERE id = ?", + state_rows + ) # Delete all non-state txn.executemany( "DELETE FROM event_to_state_groups WHERE event_id = ?", diff --git a/synapse/storage/schema/delta/35/state.sql b/synapse/storage/schema/delta/35/state.sql index c4c244c169..0f1fa68a89 100644 --- a/synapse/storage/schema/delta/35/state.sql +++ b/synapse/storage/schema/delta/35/state.sql @@ -19,3 +19,4 @@ CREATE TABLE state_group_edges( ); CREATE INDEX state_group_edges_idx ON state_group_edges(state_group); +CREATE INDEX state_group_edges_prev_idx ON state_group_edges(prev_state_group); -- cgit 1.4.1 From 373654c6354c04b08a6f4dcb0ff7fa9ccae02f55 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 14:50:36 +0100 Subject: Comment about sqlite and WITH RECURSIVE --- synapse/storage/state.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ee8b763008..e790793370 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -209,6 +209,8 @@ class StateStore(SQLBaseStore): else: return 0 else: + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) next_group = state_group count = 0 @@ -340,6 +342,8 @@ class StateStore(SQLBaseStore): key = (row["type"], row["state_key"]) results[group][key] = row["event_id"] else: + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. 
wheezy) for group in groups: group_tree = [group] next_group = group -- cgit 1.4.1 From 70332a12dd0a2ea01e1f8f835dcb5ca15526a5f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 14:57:14 +0100 Subject: Take value in a better way --- synapse/storage/events.py | 2 +- synapse/storage/state.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index bec35ea68d..ed182c8d11 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1616,7 +1616,7 @@ class EventsStore(SQLBaseStore): curr_state = self._get_state_groups_from_groups_txn( txn, [new_state_edge], types=None ) - curr_state = curr_state.values()[0] + curr_state = curr_state[new_state_edge] self._simple_delete_txn( txn, diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e790793370..589a4fec6e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -693,12 +693,12 @@ class StateStore(SQLBaseStore): prev_state = self._get_state_groups_from_groups_txn( txn, [prev_group], types=None ) - prev_state = prev_state.values()[0] + prev_state = prev_state[prev_group] curr_state = self._get_state_groups_from_groups_txn( txn, [state_group], types=None ) - curr_state = curr_state.values()[0] + curr_state = curr_state[state_group] if not set(prev_state.keys()) - set(curr_state.keys()): # We can only do a delta if the current has a strict super set -- cgit 1.4.1 From a7032abb2e64f79be5823b770230cb223cc22ff1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 15:07:23 +0100 Subject: Correctly handle reindexing state groups that already have an edge --- synapse/storage/state.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 589a4fec6e..af3ddd962d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -673,6 +673,17 @@ class StateStore(SQLBaseStore): if not row or not state_group: return True, count + txn.execute( + "SELECT state_group FROM state_group_edges" + " WHERE state_group = ?", + (state_group,) + ) + + # If we reach a point where we've already started inserting + # edges we should stop. + if txn.fetchall(): + return True, count + txn.execute( "SELECT coalesce(max(id), 0) FROM state_groups" " WHERE id < ? 
AND room_id = ?", @@ -709,6 +720,14 @@ class StateStore(SQLBaseStore): if prev_state.get(key, None) != value } + self._simple_delete_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": state_group, + } + ) + self._simple_insert_txn( txn, table="state_group_edges", -- cgit 1.4.1 From 0595413c0fe51d4f400f597bf57cd13d5e3450e3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 15:49:57 +0100 Subject: Scale the batch size so that we're not bitten by the minimum --- synapse/storage/state.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index af3ddd962d..0730399b80 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -649,6 +649,10 @@ class StateStore(SQLBaseStore): rows_inserted = progress.get("rows_inserted", 0) max_group = progress.get("max_group", None) + BATCH_SIZE_SCALE_FACTOR = 100 + + batch_size = max(1, int(batch_size / BATCH_SIZE_SCALE_FACTOR)) + if max_group is None: rows = yield self._execute( "_background_deduplicate_state", None, @@ -779,4 +783,4 @@ class StateStore(SQLBaseStore): if finished: yield self._end_background_update(self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME) - defer.returnValue(result) + defer.returnValue(result * BATCH_SIZE_SCALE_FACTOR) -- cgit 1.4.1 From f4164edb7020e4031a285e3723e7ad57d0486df9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Sep 2016 11:26:37 +0100 Subject: Move _add_messages_to_device_inbox_txn into a separate method --- synapse/storage/deviceinbox.py | 69 ++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 37 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 68116b0394..57202a5bda 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -37,9 +37,21 @@ class DeviceInboxStore(SQLBaseStore): inserted. """ - def select_devices_txn(txn, user_id, devices): - if not devices: - return [] + with self._device_inbox_id_gen.get_next() as stream_id: + yield self.runInteraction( + "add_messages_to_device_inbox", + self._add_messages_to_device_inbox_txn, + stream_id, + messages_by_user_then_device, + ) + + defer.returnValue(self._device_inbox_id_gen.get_current_token()) + + def _add_messages_to_device_inbox_txn(self, txn, stream_id, + messages_by_user_then_device): + local_users_and_devices = set() + for user_id, messages_by_device in messages_by_user_then_device.items(): + devices = messages_by_device.keys() sql = ( "SELECT user_id, device_id FROM devices" " WHERE user_id = ? AND device_id IN (" @@ -48,41 +60,24 @@ class DeviceInboxStore(SQLBaseStore): ) # TODO: Maybe this needs to be done in batches if there are # too many local devices for a given user. 
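            # [Note: a batched variant could reuse the 100-per-chunk pattern
            # from _get_state_groups_from_groups, issuing one SELECT per
            # devices[i:i + 100] slice; a sketch only, not in the patch.]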
- args = [user_id] + devices - txn.execute(sql, args) - return [tuple(row) for row in txn.fetchall()] - - def add_messages_to_device_inbox_txn(txn, stream_id): - local_users_and_devices = set() - for user_id, messages_by_device in messages_by_user_then_device.items(): - local_users_and_devices.update( - select_devices_txn(txn, user_id, messages_by_device.keys()) - ) - - sql = ( - "INSERT INTO device_inbox" - " (user_id, device_id, stream_id, message_json)" - " VALUES (?,?,?,?)" - ) - rows = [] - for user_id, messages_by_device in messages_by_user_then_device.items(): - for device_id, message in messages_by_device.items(): - message_json = ujson.dumps(message) - # Only insert into the local inbox if the device exists on - # this server - if (user_id, device_id) in local_users_and_devices: - rows.append((user_id, device_id, stream_id, message_json)) - - txn.executemany(sql, rows) - - with self._device_inbox_id_gen.get_next() as stream_id: - yield self.runInteraction( - "add_messages_to_device_inbox", - add_messages_to_device_inbox_txn, - stream_id - ) + txn.execute(sql, [user_id] + devices) + local_users_and_devices.update(map(tuple, txn.fetchall())) - defer.returnValue(self._device_inbox_id_gen.get_current_token()) + sql = ( + "INSERT INTO device_inbox" + " (user_id, device_id, stream_id, message_json)" + " VALUES (?,?,?,?)" + ) + rows = [] + for user_id, messages_by_device in messages_by_user_then_device.items(): + for device_id, message in messages_by_device.items(): + message_json = ujson.dumps(message) + # Only insert into the local inbox if the device exists on + # this server + if (user_id, device_id) in local_users_and_devices: + rows.append((user_id, device_id, stream_id, message_json)) + + txn.executemany(sql, rows) def get_new_messages_for_device( self, user_id, device_id, last_stream_id, current_stream_id, limit=100 -- cgit 1.4.1 From 2ad72da93181db7850f6ba8f0635bdc0924e6d8c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Sep 2016 15:10:29 +0100 Subject: Add tables for federated device messages Adds tables for storing the messages that need to be sent to a remote device and for deduplicating messages received. --- synapse/storage/schema/delta/34/device_outbox.sql | 38 +++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 synapse/storage/schema/delta/34/device_outbox.sql (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/34/device_outbox.sql b/synapse/storage/schema/delta/34/device_outbox.sql new file mode 100644 index 0000000000..a319f73e47 --- /dev/null +++ b/synapse/storage/schema/delta/34/device_outbox.sql @@ -0,0 +1,38 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +CREATE TABLE device_federation_outbox ( + destination TEXT NOT NULL, + stream_id BIGINT NOT NULL, + sender TEXT NOT NULL, + message_id TEXT NOT NULL, + sent_ts BIGINT NOT NULL, + messages_json TEXT NOT NULL +); + + +CREATE INDEX device_federation_outbox_destination_id + ON device_federation_outbox(destination, stream_id); + + +CREATE TABLE device_federation_inbox ( + origin TEXT NOT NULL, + message_id TEXT NOT NULL, + received_ts BIGINT NOT NULL +); + + +CREATE INDEX device_federation_inbox_sender_id + ON device_federation_inbox(origin, message_id); -- cgit 1.4.1 From e020834e4fcce9ba0c7d60d270bbdf40a1d2a336 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Sep 2016 15:12:13 +0100 Subject: Add storage methods for federated device messages --- synapse/storage/deviceinbox.py | 139 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 132 insertions(+), 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 57202a5bda..988577a334 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -27,28 +27,89 @@ logger = logging.getLogger(__name__) class DeviceInboxStore(SQLBaseStore): @defer.inlineCallbacks - def add_messages_to_device_inbox(self, messages_by_user_then_device): - """ + def add_messages_to_device_inbox(self, local_messages_by_user_then_device, + remote_messages_by_destination): + """Used to send messages from this server. + Args: - messages_by_user_and_device(dict): + sender_user_id(str): The ID of the user sending these messages. + local_messages_by_user_and_device(dict): Dictionary of user_id to device_id to message. + remote_messages_by_destination(dict): + Dictionary of destination server_name to the EDU JSON to send. Returns: A deferred stream_id that resolves when the messages have been inserted. 
""" + def add_messages_to_device_federation_outbox(txn, now_ms, stream_id): + sql = ( + "INSERT INTO device_federation_outbox" + " (destination, stream_id, queued_ts, messages_json)" + " VALUES (?,?,?,?)" + ) + rows = [] + for destination, edu in remote_messages_by_destination.items(): + edu_json = ujson.dumps(edu) + rows.append((destination, stream_id, now_ms, edu_json)) + + txn.executemany(sql, rows) + + def add_messages_txn(txn, now_ms, stream_id): + self._add_messages_to_local_device_inbox_txn( + txn, stream_id, local_messages_by_user_then_device + ) + add_messages_to_device_federation_outbox(now_ms, stream_id) + with self._device_inbox_id_gen.get_next() as stream_id: + now_ms = self.clock.time_now_ms() yield self.runInteraction( "add_messages_to_device_inbox", - self._add_messages_to_device_inbox_txn, + add_messages_txn, + now_ms, stream_id, - messages_by_user_then_device, ) defer.returnValue(self._device_inbox_id_gen.get_current_token()) - def _add_messages_to_device_inbox_txn(self, txn, stream_id, - messages_by_user_then_device): + @defer.inlineCallbacks + def add_messages_from_remote_to_device_inbox( + self, origin, message_id, local_messages_by_user_then_device + ): + def add_messages_txn(txn, now_ms, stream_id): + already_inserted = self._simple_select_one_txn( + txn, table="device_federation_inbox", + keyvalues={"origin": origin, "message_id": message_id}, + retcols=("message_id",), + allow_none=True, + ) + if already_inserted is not None: + return + + self._simple_insert_txn( + txn, table="device_federation_inbox", + values={ + "origin": origin, + "message_id": message_id, + "received_ts": now_ms, + }, + ) + + self._add_messages_to_local_device_inbox_txn( + txn, stream_id, local_messages_by_user_then_device + ) + + with self._device_inbox_id_gen.get_next() as stream_id: + now_ms = self.clock.time_now_ms() + yield self.runInteraction( + "add_messages_from_remote_to_device_inbox", + add_messages_txn, + now_ms, + stream_id, + ) + + def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, + messages_by_user_then_device): local_users_and_devices = set() for user_id, messages_by_device in messages_by_user_then_device.items(): devices = messages_by_device.keys() @@ -177,3 +238,67 @@ class DeviceInboxStore(SQLBaseStore): def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() + + @defer.inlineCallbacks + def get_new_device_messages_for_remote_destination( + self, destination, last_stream_id, current_stream_id, limit=100 + ): + """ + Args: + destination(str): The name of the remote server. + last_stream_id(int): The last position of the device message stream + that the server sent up to. + current_stream_id(int): The current position of the device + message stream. + Returns: + Deferred ([dict], int): List of messages for the device and where + in the stream the messages got to. + """ + def get_new_messages_for_remote_destination_txn(txn): + sql = ( + "SELECT stream_id, messages_json FROM device_federation_outbox" + " WHERE destination = ?" + " AND ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + " LIMIT ?" 
+ ) + txn.execute(sql, ( + destination, last_stream_id, current_stream_id, limit + )) + messages = [] + for row in txn.fetchall(): + stream_pos = row[0] + messages.append(ujson.loads(row[1])) + if len(messages) < limit: + stream_pos = current_stream_id + return (messages, stream_pos) + + return self.runInteraction( + "get_new_device_messages_for_remote_destination", + get_new_messages_for_remote_destination_txn, + ) + + @defer.inlineCallbacks + def delete_device_messages_for_remote_destination(self, destination, + up_to_stream_id): + """Used to delete messages when the remote destination acknowledges + their receipt. + + Args: + destination(str): The destination server_name + up_to_stream_id(int): Where to delete messages up to. + Returns: + A deferred that resolves when the messages have been deleted. + """ + def delete_messages_for_remote_destination_txn(txn): + sql = ( + "DELETE FROM device_federation_outbox" + " WHERE destination = ? AND" + " AND stream_id <= ?" + ) + txn.execute(sql, (destination, up_to_stream_id)) + + return self.runInteraction( + "delete_device_messages_for_remote_destination", + delete_messages_for_remote_destination_txn + ) -- cgit 1.4.1 From d4a35ada28302e096efd42e1a2a28542ed7ebd6f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Sep 2016 18:16:20 +0100 Subject: Send device messages over federation --- synapse/federation/federation_server.py | 2 +- synapse/federation/transaction_queue.py | 43 ++++++-- synapse/handlers/devicemessage.py | 121 ++++++++++++++++++++++ synapse/rest/client/v2_alpha/sendtodevice.py | 33 ++---- synapse/server.py | 5 + synapse/storage/deviceinbox.py | 19 ++-- synapse/storage/schema/delta/34/device_outbox.sql | 4 +- 7 files changed, 179 insertions(+), 48 deletions(-) create mode 100644 synapse/handlers/devicemessage.py (limited to 'synapse/storage') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5621655098..3fa7b2315c 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -188,7 +188,7 @@ class FederationServer(FederationBase): except SynapseError as e: logger.info("Failed to handle edu %r: %r", edu_type, e) except Exception as e: - logger.exception("Failed to handle edu %r", edu_type, e) + logger.exception("Failed to handle edu %r", edu_type) else: logger.warn("Received EDU of type %s with no handler", edu_type) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index cb2ef0210c..5e86141f86 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -17,7 +17,7 @@ from twisted.internet import defer from .persistence import TransactionActions -from .units import Transaction +from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor @@ -187,6 +187,24 @@ class TransactionQueue(object): destination, pending_pdus, pending_edus, pending_failures ) + @defer.inlineCallbacks + def _get_new_device_messages(self, destination): + last_device_stream_id = 0 + to_device_stream_id = self.store.get_to_device_stream_token() + contents, stream_id = yield self.store.get_new_device_msgs_for_remote( + destination, last_device_stream_id, to_device_stream_id + ) + edus = [ + Edu( + origin=self.server_name, + destination=destination, + edu_type="m.direct_to_device", + content=content, + ) + for content in contents + ] + defer.returnValue((edus, stream_id)) + @measure_func("_send_new_transaction") 
@defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, @@ -211,13 +229,19 @@ class TransactionQueue(object): self.store, ) + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) + + edus.extend(device_message_edus) + logger.debug( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", destination, txn_id, - len(pending_pdus), - len(pending_edus), - len(pending_failures) + len(pdus), + len(edus), + len(failures) ) logger.debug("TX [%s] Persisting transaction...", destination) @@ -242,9 +266,9 @@ class TransactionQueue(object): " (PDUs: %d, EDUs: %d, failures: %d)", destination, txn_id, transaction.transaction_id, - len(pending_pdus), - len(pending_edus), - len(pending_failures), + len(pdus), + len(edus), + len(failures), ) with limiter: @@ -299,6 +323,11 @@ class TransactionQueue(object): logger.info( "Failed to send event %s to %s", p.event_id, destination ) + else: + # Remove the acknowledged device messages from the database + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) except NotRetryingDestination: logger.info( "TX [%s] not ready for retry yet - " diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py new file mode 100644 index 0000000000..7e59c0d487 --- /dev/null +++ b/synapse/handlers/devicemessage.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
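# [Note on the flow this new module implements: messages for local users go
# straight into device_inbox; messages for remote users are queued in
# device_federation_outbox and bundled into the next federation transaction
# to each destination; incoming EDUs are deduplicated on receipt via
# (origin, message_id) rows in device_federation_inbox.]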
+ +import logging + +from twisted.internet import defer + +from synapse.types import get_domain_from_id +from synapse.util.stringutils import random_string + + +logger = logging.getLogger(__name__) + + +class DeviceMessageHandler(object): + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id + self.federation = hs.get_replication_layer() + + self.federation.register_edu_handler( + "m.direct_to_device", self.on_direct_to_device_edu + ) + + @defer.inlineCallbacks + def on_direct_to_device_edu(self, origin, content): + local_messages = {} + sender_user_id = content["sender"] + if origin != get_domain_from_id(sender_user_id): + logger.warn( + "Dropping device message from %r with spoofed sender %r", + origin, sender_user_id + ) + message_type = content["type"] + message_id = content["message_id"] + for user_id, by_device in content["messages"].items(): + messages_by_device = { + device_id: { + "content": message_content, + "type": message_type, + "sender": sender_user_id, + } + for device_id, message_content in by_device.items() + } + if messages_by_device: + local_messages[user_id] = messages_by_device + + stream_id = yield self.store.add_messages_from_remote_to_device_inbox( + origin, message_id, local_messages + ) + + self.notifier.on_new_event( + "to_device_key", stream_id, users=local_messages.keys() + ) + + @defer.inlineCallbacks + def send_device_message(self, sender_user_id, message_type, messages): + + local_messages = {} + remote_messages = {} + for user_id, by_device in messages.items(): + if self.is_mine_id(user_id): + messages_by_device = { + device_id: { + "content": message_content, + "type": message_type, + "sender": sender_user_id, + } + for device_id, message_content in by_device.items() + } + if messages_by_device: + local_messages[user_id] = messages_by_device + else: + destination = get_domain_from_id(user_id) + remote_messages.setdefault(destination, {})[user_id] = by_device + + message_id = random_string(16) + + remote_edu_contents = {} + for destination, messages in remote_messages.items(): + remote_edu_contents[destination] = { + "messages": messages, + "sender": sender_user_id, + "type": message_type, + "message_id": message_id, + } + + stream_id = yield self.store.add_messages_to_device_inbox( + local_messages, remote_edu_contents + ) + + self.notifier.on_new_event( + "to_device_key", stream_id, users=local_messages.keys() + ) + + for destination in remote_messages.keys(): + # Hack to send make synapse send a federation transaction + # to the remote servers. 
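            # [Note: the m.ping EDU content is ignored by the receiver (no
            # handler is registered for it); queueing any EDU simply wakes
            # the per-destination transaction queue, whose
            # _get_new_device_messages then bundles the queued messages
            # from device_federation_outbox into the same transaction.]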
+ self.federation.send_edu( + destination=destination, + edu_type="m.ping", + content={}, + ) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 9c10a99acf..5975164b37 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -16,10 +16,11 @@ import logging from twisted.internet import defer -from synapse.http.servlet import parse_json_object_from_request from synapse.http import servlet +from synapse.http.servlet import parse_json_object_from_request from synapse.rest.client.v1.transactions import HttpTransactionStore + from ._base import client_v2_patterns logger = logging.getLogger(__name__) @@ -39,10 +40,8 @@ class SendToDeviceRestServlet(servlet.RestServlet): super(SendToDeviceRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() - self.store = hs.get_datastore() - self.notifier = hs.get_notifier() - self.is_mine_id = hs.is_mine_id self.txns = HttpTransactionStore() + self.device_message_handler = hs.get_device_message_handler() @defer.inlineCallbacks def on_PUT(self, request, message_type, txn_id): @@ -57,28 +56,10 @@ class SendToDeviceRestServlet(servlet.RestServlet): content = parse_json_object_from_request(request) - # TODO: Prod the notifier to wake up sync streams. - # TODO: Implement replication for the messages. - # TODO: Send the messages to remote servers if needed. - - local_messages = {} - for user_id, by_device in content["messages"].items(): - if self.is_mine_id(user_id): - messages_by_device = { - device_id: { - "content": message_content, - "type": message_type, - "sender": requester.user.to_string(), - } - for device_id, message_content in by_device.items() - } - if messages_by_device: - local_messages[user_id] = messages_by_device - - stream_id = yield self.store.add_messages_to_device_inbox(local_messages) - - self.notifier.on_new_event( - "to_device_key", stream_id, users=local_messages.keys() + sender_user_id = requester.user.to_string() + + yield self.device_message_handler.send_device_message( + sender_user_id, message_type, content["messages"] ) response = (200, {}) diff --git a/synapse/server.py b/synapse/server.py index af3246504b..f516f08167 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -35,6 +35,7 @@ from synapse.federation import initialize_http_replication from synapse.handlers import Handlers from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler +from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.device import DeviceHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler @@ -100,6 +101,7 @@ class HomeServer(object): 'application_service_api', 'application_service_scheduler', 'application_service_handler', + 'device_message_handler', 'notifier', 'distributor', 'client_resource', @@ -205,6 +207,9 @@ class HomeServer(object): def build_device_handler(self): return DeviceHandler(self) + def build_device_message_handler(self): + return DeviceMessageHandler(self) + def build_e2e_keys_handler(self): return E2eKeysHandler(self) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 988577a334..d9f91ccc4e 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -59,10 +59,10 @@ class DeviceInboxStore(SQLBaseStore): self._add_messages_to_local_device_inbox_txn( txn, stream_id, local_messages_by_user_then_device ) - 
add_messages_to_device_federation_outbox(now_ms, stream_id) + add_messages_to_device_federation_outbox(txn, now_ms, stream_id) with self._device_inbox_id_gen.get_next() as stream_id: - now_ms = self.clock.time_now_ms() + now_ms = self.clock.time_msec() yield self.runInteraction( "add_messages_to_device_inbox", add_messages_txn, @@ -100,7 +100,7 @@ class DeviceInboxStore(SQLBaseStore): ) with self._device_inbox_id_gen.get_next() as stream_id: - now_ms = self.clock.time_now_ms() + now_ms = self.clock.time_msec() yield self.runInteraction( "add_messages_from_remote_to_device_inbox", add_messages_txn, @@ -239,8 +239,7 @@ class DeviceInboxStore(SQLBaseStore): def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() - @defer.inlineCallbacks - def get_new_device_messages_for_remote_destination( + def get_new_device_msgs_for_remote( self, destination, last_stream_id, current_stream_id, limit=100 ): """ @@ -274,13 +273,11 @@ class DeviceInboxStore(SQLBaseStore): return (messages, stream_pos) return self.runInteraction( - "get_new_device_messages_for_remote_destination", + "get_new_device_msgs_for_remote", get_new_messages_for_remote_destination_txn, ) - @defer.inlineCallbacks - def delete_device_messages_for_remote_destination(self, destination, - up_to_stream_id): + def delete_device_msgs_for_remote(self, destination, up_to_stream_id): """Used to delete messages when the remote destination acknowledges their receipt. @@ -293,12 +290,12 @@ class DeviceInboxStore(SQLBaseStore): def delete_messages_for_remote_destination_txn(txn): sql = ( "DELETE FROM device_federation_outbox" - " WHERE destination = ? AND" + " WHERE destination = ?" " AND stream_id <= ?" ) txn.execute(sql, (destination, up_to_stream_id)) return self.runInteraction( - "delete_device_messages_for_remote_destination", + "delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn ) diff --git a/synapse/storage/schema/delta/34/device_outbox.sql b/synapse/storage/schema/delta/34/device_outbox.sql index a319f73e47..e87066d9a1 100644 --- a/synapse/storage/schema/delta/34/device_outbox.sql +++ b/synapse/storage/schema/delta/34/device_outbox.sql @@ -16,9 +16,7 @@ CREATE TABLE device_federation_outbox ( destination TEXT NOT NULL, stream_id BIGINT NOT NULL, - sender TEXT NOT NULL, - message_id TEXT NOT NULL, - sent_ts BIGINT NOT NULL, + queued_ts BIGINT NOT NULL, messages_json TEXT NOT NULL ); -- cgit 1.4.1 From 7d893beebe9d2fbab5a9b105992433efe8fab417 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 12:03:37 +0100 Subject: Comment the add_messages storage functions --- synapse/storage/deviceinbox.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index d9f91ccc4e..61da0e89e6 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -42,7 +42,15 @@ class DeviceInboxStore(SQLBaseStore): inserted. """ - def add_messages_to_device_federation_outbox(txn, now_ms, stream_id): + def add_messages_txn(txn, now_ms, stream_id): + # Add the local messages directly to the local inbox. + self._add_messages_to_local_device_inbox_txn( + txn, stream_id, local_messages_by_user_then_device + ) + + # Add the remote messages to the federation outbox. + # We'll send them to a remote server when we next send a + # federation transaction to that destination. 
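+            # For illustration, each queued EDU is the dict assembled by
+            # DeviceMessageHandler.send_device_message, shaped like
+            # (field names from the handler; concrete values assumed):
+            #   {"messages": {user_id: {device_id: content}},
+            #    "sender": sender_user_id, "type": message_type,
+            #    "message_id": message_id}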
sql = ( "INSERT INTO device_federation_outbox" " (destination, stream_id, queued_ts, messages_json)" @@ -52,15 +60,8 @@ class DeviceInboxStore(SQLBaseStore): for destination, edu in remote_messages_by_destination.items(): edu_json = ujson.dumps(edu) rows.append((destination, stream_id, now_ms, edu_json)) - txn.executemany(sql, rows) - def add_messages_txn(txn, now_ms, stream_id): - self._add_messages_to_local_device_inbox_txn( - txn, stream_id, local_messages_by_user_then_device - ) - add_messages_to_device_federation_outbox(txn, now_ms, stream_id) - with self._device_inbox_id_gen.get_next() as stream_id: now_ms = self.clock.time_msec() yield self.runInteraction( @@ -77,6 +78,9 @@ class DeviceInboxStore(SQLBaseStore): self, origin, message_id, local_messages_by_user_then_device ): def add_messages_txn(txn, now_ms, stream_id): + # Check if we've already inserted a matching message_id for that + # origin. This can happen if the origin doesn't receive our + # acknowledgement from the first time we received the message. already_inserted = self._simple_select_one_txn( txn, table="device_federation_inbox", keyvalues={"origin": origin, "message_id": message_id}, @@ -86,6 +90,8 @@ class DeviceInboxStore(SQLBaseStore): if already_inserted is not None: return + # Add an entry for this message_id so that we know we've processed + # it. self._simple_insert_txn( txn, table="device_federation_inbox", values={ @@ -95,6 +101,8 @@ class DeviceInboxStore(SQLBaseStore): }, ) + # Add the messages to the approriate local device inboxes so that + # they'll be sent to the devices when they next sync. self._add_messages_to_local_device_inbox_txn( txn, stream_id, local_messages_by_user_then_device ) -- cgit 1.4.1 From d25c20ccbe0f10fe5d6c0cef2156db7e8d76049c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Sep 2016 14:22:22 +0100 Subject: Use windowing function to make use of index --- synapse/storage/state.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 0730399b80..26ecad5907 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -322,11 +322,11 @@ class StateStore(SQLBaseStore): SELECT prev_state_group FROM state_group_edges e, state s WHERE s.state_group = e.state_group ) - SELECT type, state_key, event_id FROM state_groups_state - WHERE ROW(type, state_key, state_group) IN ( - SELECT type, state_key, max(state_group) FROM state - INNER JOIN state_groups_state USING (state_group) - GROUP BY type, state_key + SELECT type, state_key, last_value(event_id) OVER ( + PARTITION BY type, state_key ORDER BY state_group ASC + ) AS event_id FROM state_groups_state + WHERE state_group IN ( + SELECT state_group FROM state ) %s; """) % (where_clause,) -- cgit 1.4.1 From fadb01551a897fdf1a2cbe43ff463c9616bd11ad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Sep 2016 14:39:01 +0100 Subject: Add appopriate framing clause --- synapse/storage/state.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 26ecad5907..382f308a60 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -324,6 +324,7 @@ class StateStore(SQLBaseStore): ) SELECT type, state_key, last_value(event_id) OVER ( PARTITION BY type, state_key ORDER BY state_group ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) AS event_id FROM state_groups_state WHERE state_group IN ( SELECT state_group 
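                    -- (the recursive CTE above collects this state group
                    -- and all of its ancestors via state_group_edges)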
FROM state -- cgit 1.4.1 From 513188aa56bc680a54dbdf6d40657da72c5c6877 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Sep 2016 14:53:19 +0100 Subject: Comment --- synapse/storage/state.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 382f308a60..d6643473db 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -315,6 +315,10 @@ class StateStore(SQLBaseStore): # against `state_groups_state` to fetch the latest state. # It assumes that previous state groups are always numerically # lesser. + # The PARTITION is used to get the event_id in the greatest state + # group for the given type, state_key. + # This may return multiple rows per (type, state_key), but last_value + # should be the same. sql = (""" WITH RECURSIVE state(state_group) AS ( VALUES(?::bigint) -- cgit 1.4.1 From 31a07d2335dd628afb32f71167849ad88685525a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 15:27:07 +0100 Subject: Add stream change caches for device messages --- synapse/federation/transaction_queue.py | 5 ++++- synapse/storage/__init__.py | 24 ++++++++++++++++++++++++ synapse/storage/deviceinbox.py | 25 +++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5e86141f86..233c6606a9 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -81,6 +81,8 @@ class TransactionQueue(object): # destination -> list of tuple(failure, deferred) self.pending_failures_by_dest = {} + self.last_device_stream_id_by_dest = {} + # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) @@ -189,7 +191,7 @@ class TransactionQueue(object): @defer.inlineCallbacks def _get_new_device_messages(self, destination): - last_device_stream_id = 0 + last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0) to_device_stream_id = self.store.get_to_device_stream_token() contents, stream_id = yield self.store.get_new_device_msgs_for_remote( destination, last_device_stream_id, to_device_stream_id @@ -328,6 +330,7 @@ class TransactionQueue(object): yield self.store.delete_device_msgs_for_remote( destination, device_stream_id ) + self.last_device_stream_id_by_dest[destination] = device_stream_id except NotRetryingDestination: logger.info( "TX [%s] not ready for retry yet - " diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6c32773f25..6965daddc5 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -182,6 +182,30 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + max_device_inbox_id = self._device_inbox_id_gen.get_current_token() + device_inbox_prefill, min_device_inbox_id = self._get_cache_dict( + db_conn, "device_inbox", + entity_column="user_id", + stream_column="stream_id", + max_value=max_device_inbox_id + ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", min_device_inbox_id, + prefilled_cache=device_inbox_prefill, + ) + # The federation outbox and the local device inbox uses the same + # stream_id generator. 
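+        # (Because the generator is shared, one stream_id from get_next()
+        # stamps both the inbox rows and any outbox rows written in the
+        # same transaction, so positions stay comparable across the two
+        # stream change caches.)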
+ device_outbox_prefill, min_device_outbox_id = self._get_cache_dict( + db_conn, "device_federation_outbox", + entity_column="destination", + stream_column="stream_id", + max_value=max_device_inbox_id, + ) + self._device_federation_outbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", min_device_outbox_id, + prefilled_cache=device_outbox_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 61da0e89e6..0d37bb961b 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -70,6 +70,14 @@ class DeviceInboxStore(SQLBaseStore): now_ms, stream_id, ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) + for destination in remote_messages_by_destination.keys(): + self._device_federation_outbox_stream_cache.entity_has_changed( + destination, stream_id + ) defer.returnValue(self._device_inbox_id_gen.get_current_token()) @@ -115,6 +123,10 @@ class DeviceInboxStore(SQLBaseStore): now_ms, stream_id, ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, messages_by_user_then_device): @@ -161,6 +173,12 @@ class DeviceInboxStore(SQLBaseStore): Deferred ([dict], int): List of messages for the device and where in the stream the messages got to. """ + has_changed = self._device_inbox_stream_cache.has_entity_changed( + user_id, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + def get_new_messages_for_device_txn(txn): sql = ( "SELECT stream_id, message_json FROM device_inbox" @@ -261,6 +279,13 @@ class DeviceInboxStore(SQLBaseStore): Deferred ([dict], int): List of messages for the device and where in the stream the messages got to. 
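            Note: when the outbox stream change cache shows no new rows
            for this destination since last_stream_id, the database is not
            queried and an empty list is returned immediately.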
""" + + has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( + destination, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" -- cgit 1.4.1 From 2a0159b8aeaf8dce808345e2266c6d3301fa055a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 15:58:00 +0100 Subject: Fix the stream change cache to work over replication --- synapse/replication/slave/storage/deviceinbox.py | 11 +++++++++++ synapse/storage/__init__.py | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 64d8eb2af1..251078ba57 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -16,6 +16,7 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker from synapse.storage import DataStore +from synapse.util.caches.stream_change_cache import StreamChangeCache class SlavedDeviceInboxStore(BaseSlavedStore): @@ -24,6 +25,10 @@ class SlavedDeviceInboxStore(BaseSlavedStore): self._device_inbox_id_gen = SlavedIdTracker( db_conn, "device_inbox", "stream_id", ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", + self._device_inbox_id_gen.get_current_token() + ) get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__ get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__ @@ -38,5 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore): stream = result.get("to_device") if stream: self._device_inbox_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + stream_id = row[0] + user_id = row[1] + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) return super(SlavedDeviceInboxStore, self).process_replication(result) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6965daddc5..828e5ca60b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -202,7 +202,7 @@ class DataStore(RoomMemberStore, RoomStore, max_value=max_device_inbox_id, ) self._device_federation_outbox_stream_cache = StreamChangeCache( - "DeviceInboxStreamChangeCache", min_device_outbox_id, + "DeviceFederationOutboxStreamChangeCache", min_device_outbox_id, prefilled_cache=device_outbox_prefill, ) -- cgit 1.4.1 From 85b51fdd6bc16b0b673130da760eb930e414af5c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 17:19:18 +0100 Subject: Log the types and values when failing to store devices --- synapse/storage/devices.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index afd6530cab..17920d4480 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -54,8 +54,12 @@ class DeviceStore(SQLBaseStore): or_ignore=ignore_if_known, ) except Exception as e: - logger.error("store_device with device_id=%s failed: %s", - device_id, e) + logger.error("store_device with device_id=%s(%r) user_id=%s(%r)" + " display_name=%s(%r) failed: %s", + type(device_id).__name__, device_id, + type(user_id).__name__, user_id, + type(initial_device_display_name).__name__, + initial_device_display_name, e) raise StoreError(500, "Problem storing device.") def 
get_device(self, user_id, device_id): -- cgit 1.4.1 From b568ca309c5724d28b6ebd9c0a3cd8179fa6d6d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 09:38:54 +0100 Subject: Temporarily disable sequential scans for state fetching --- synapse/storage/state.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index d6643473db..fef87834ca 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -310,6 +310,10 @@ class StateStore(SQLBaseStore): results = {group: {} for group in groups} if isinstance(self.database_engine, PostgresEngine): + # Temporarily disable sequential scans in this transaction. This is + # a temporary hack until we can add the right indices in + txn.execute("SET LOCAL enable_seqscan=off") + # The below query walks the state_group tree so that the "state" # table includes all state_groups in the tree. It then joins # against `state_groups_state` to fetch the latest state. -- cgit 1.4.1 From c5b49eb7ca7327cba0e3658b1ec84cca823c8b54 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 8 Sep 2016 09:40:10 +0100 Subject: Fix /notifications API when used with `from` param --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index eb15fb751b..56dce4b616 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -342,7 +342,7 @@ class EventPushActionsStore(SQLBaseStore): def f(txn): before_clause = "" if before: - before_clause = "AND stream_ordering < ?" + before_clause = "AND epa.stream_ordering < ?" args = [user_id, before, limit] else: args = [user_id, limit] -- cgit 1.4.1 From 61cd9af09bc66f29d6a740f445047624d48fda8c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 13:40:46 +0100 Subject: Log delta files we're applying --- synapse/storage/prepare_database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index b1fbc4ffa5..7efbe51cda 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -242,7 +242,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, module = imp.load_source( module_name, absolute_path, python_file ) - logger.debug("Running script %s", relative_path) + logger.info("Running script %s", relative_path) module.run_create(cur, database_engine) if not is_empty: module.run_upgrade(cur, database_engine, config=config) @@ -253,7 +253,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, pass elif ext == ".sql": # A plain old .sql file, just read and execute it - logger.debug("Applying schema %s", relative_path) + logger.info("Applying schema %s", relative_path) executescript(cur, absolute_path) else: # Not a valid delta file. 
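As an aside on the `SET LOCAL enable_seqscan=off` call added a few commits
above: the override is transaction-scoped, so the planner hint cannot leak
onto unrelated queries sharing the connection. A minimal sketch of the same
trick against a raw psycopg2 connection (connection handling assumed;
Synapse itself issues the statement inside runInteraction):

import psycopg2

def fetch_state_rows(conn, state_group):
    # "with conn:" opens a transaction and commits (or rolls back) on
    # exit; SET LOCAL resets automatically when that transaction ends.
    with conn:
        with conn.cursor() as cur:
            cur.execute("SET LOCAL enable_seqscan = off")
            cur.execute(
                "SELECT type, state_key, event_id"
                " FROM state_groups_state"
                " WHERE state_group = %s",
                (state_group,),
            )
            return cur.fetchall()

# usage (DSN assumed):
#   rows = fetch_state_rows(psycopg2.connect("dbname=synapse"), 42)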
-- cgit 1.4.1 From 4ef222ab6142942eb76d4e0cdea57c1114b0c28a Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 8 Sep 2016 13:43:35 +0100 Subject: Implement `only=highlight` on `/notifications` --- synapse/rest/client/v2_alpha/notifications.py | 3 ++- synapse/storage/event_push_actions.py | 9 ++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/rest/client/v2_alpha/notifications.py b/synapse/rest/client/v2_alpha/notifications.py index f1a48acf07..fd2a3d69d4 100644 --- a/synapse/rest/client/v2_alpha/notifications.py +++ b/synapse/rest/client/v2_alpha/notifications.py @@ -45,11 +45,12 @@ class NotificationsServlet(RestServlet): from_token = parse_string(request, "from", required=False) limit = parse_integer(request, "limit", default=50) + only = parse_string(request, "only", required=False) limit = min(limit, 500) push_actions = yield self.store.get_push_actions_for_user( - user_id, from_token, limit + user_id, from_token, limit, only_highlight=(only == "highlight") ) receipts_by_room = yield self.store.get_receipts_for_user_with_orderings( diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index eb15fb751b..dedf517cfa 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -338,7 +338,8 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue(notifs[:limit]) @defer.inlineCallbacks - def get_push_actions_for_user(self, user_id, before=None, limit=50): + def get_push_actions_for_user(self, user_id, before=None, limit=50, + only_highlight=False): def f(txn): before_clause = "" if before: @@ -346,6 +347,12 @@ class EventPushActionsStore(SQLBaseStore): args = [user_id, before, limit] else: args = [user_id, limit] + + if only_highlight: + if len(before_clause) > 0: + before_clause += " " + before_clause += "AND epa.highlight = 1" + sql = ( "SELECT epa.event_id, epa.room_id," " epa.stream_ordering, epa.topological_ordering," -- cgit 1.4.1 From a1c8f268e5948d6466d64ef983b98fce287ec907 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Sep 2016 15:13:05 +0100 Subject: Support wildcard device_ids for direct to device messages --- synapse/storage/deviceinbox.py | 54 ++++++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 18 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 0d37bb961b..658fbef27b 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -130,19 +130,41 @@ class DeviceInboxStore(SQLBaseStore): def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, messages_by_user_then_device): - local_users_and_devices = set() + local_by_user_then_device = {} for user_id, messages_by_device in messages_by_user_then_device.items(): + messages_json_for_user = {} devices = messages_by_device.keys() - sql = ( - "SELECT user_id, device_id FROM devices" - " WHERE user_id = ? AND device_id IN (" - + ",".join("?" * len(devices)) - + ")" - ) - # TODO: Maybe this needs to be done in batches if there are - # too many local devices for a given user. - txn.execute(sql, [user_id] + devices) - local_users_and_devices.update(map(tuple, txn.fetchall())) + if len(devices) == 1 and devices[0] == "*": + # Handle wildcard device_ids. + sql = ( + "SELECT device_id FROM devices" + " WHERE user_id = ?" 
+ ) + txn.execute(sql, (user_id,)) + message_json = ujson.dumps(messages_by_device["*"]) + for row in txn.fetchall(): + # Add the message for all devices for this user on this + # server. + device = row[0] + messages_json_for_user[device] = message_json + else: + sql = ( + "SELECT device_id FROM devices" + " WHERE user_id = ? AND device_id IN (" + + ",".join("?" * len(devices)) + + ")" + ) + # TODO: Maybe this needs to be done in batches if there are + # too many local devices for a given user. + txn.execute(sql, [user_id] + devices) + for row in txn.fetchall(): + # Only insert into the local inbox if the device exists on + # this server + device = row[0] + message_json = ujson.dumps(messages_by_device[device]) + messages_json_for_user[device] = message_json + + local_by_user_then_device[user_id] = messages_json_for_user sql = ( "INSERT INTO device_inbox" @@ -150,13 +172,9 @@ class DeviceInboxStore(SQLBaseStore): " VALUES (?,?,?,?)" ) rows = [] - for user_id, messages_by_device in messages_by_user_then_device.items(): - for device_id, message in messages_by_device.items(): - message_json = ujson.dumps(message) - # Only insert into the local inbox if the device exists on - # this server - if (user_id, device_id) in local_users_and_devices: - rows.append((user_id, device_id, stream_id, message_json)) + for user_id, messages_by_device in local_by_user_then_device.items(): + for device_id, message_json in messages_by_device.items(): + rows.append((user_id, device_id, stream_id, message_json)) txn.executemany(sql, rows) -- cgit 1.4.1 From 5beda10bbdeeed4d5535c726f32e18d5c09f2553 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 16:18:01 +0100 Subject: Reindex state_groups_state after pruning --- synapse/storage/background_updates.py | 6 ++++-- synapse/storage/state.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 30d0e4c5dc..003f5ba203 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -133,10 +133,12 @@ class BackgroundUpdateStore(SQLBaseStore): updates = yield self._simple_select_list( "background_updates", keyvalues=None, - retcols=("update_name",), + retcols=("update_name", "depends_on"), ) + in_flight = set(update["update_name"] for update in updates) for update in updates: - self._background_update_queue.append(update['update_name']) + if update["depends_on"] not in in_flight: + self._background_update_queue.append(update['update_name']) if not self._background_update_queue: # no work left to do diff --git a/synapse/storage/state.py b/synapse/storage/state.py index fef87834ca..0cff0a0cda 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -48,6 +48,7 @@ class StateStore(SQLBaseStore): """ STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" + STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" def __init__(self, hs): super(StateStore, self).__init__(hs) @@ -55,6 +56,10 @@ class StateStore(SQLBaseStore): self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, self._background_deduplicate_state, ) + self.register_background_update_handler( + self.STATE_GROUP_INDEX_UPDATE_NAME, + self._background_index_state, + ) @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): @@ -793,3 +798,31 @@ class StateStore(SQLBaseStore): yield self._end_background_update(self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME) 
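        # (A background update handler returns how many items it processed,
        # which the updater uses to size the next batch;
        # _end_background_update then marks the job as finished.)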
defer.returnValue(result * BATCH_SIZE_SCALE_FACTOR) + + @defer.inlineCallbacks + def _background_index_state(self, progress, batch_size): + def reindex_txn(txn): + if isinstance(self.database_engine, PostgresEngine): + txn.execute( + "CREATE INDEX state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) + else: + txn.execute( + "CREATE INDEX state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) + + yield self.runInteraction( + self.STATE_GROUP_INDEX_UPDATE_NAME, reindex_txn + ) + + yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME) + + defer.returnValue(1) -- cgit 1.4.1 From ebb46497ba622983b31a3c5aad943b8922b97e89 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 16:38:54 +0100 Subject: Add delta file --- synapse/storage/schema/delta/35/add_state_index.sql | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 synapse/storage/schema/delta/35/add_state_index.sql (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/35/add_state_index.sql b/synapse/storage/schema/delta/35/add_state_index.sql new file mode 100644 index 0000000000..0fce26345b --- /dev/null +++ b/synapse/storage/schema/delta/35/add_state_index.sql @@ -0,0 +1,20 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +ALTER TABLE background_updates ADD COLUMN depends_on TEXT; + +INSERT into background_updates (update_name, progress_json, depends_on) + VALUES ('state_group_state_type_index', '{}', 'state_group_state_deduplication'); -- cgit 1.4.1 From fa722a699cd2637546f02451b8ee969c7bc1a84d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Sep 2016 17:35:16 +0100 Subject: Reapply 34/device_outbox in 35/device_outbox_again.py since the schema was bumped before it landed on develop --- .../storage/schema/delta/35/device_outbox_again.py | 30 ++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 synapse/storage/schema/delta/35/device_outbox_again.py (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/35/device_outbox_again.py b/synapse/storage/schema/delta/35/device_outbox_again.py new file mode 100644 index 0000000000..46da12a93c --- /dev/null +++ b/synapse/storage/schema/delta/35/device_outbox_again.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# Re-apply 34/device_outbox.sql since the schema version was bumped before it +# was added to develop. + +import synapse.storage.prepare_database +import os + + +def run_create(cur, database_engine, *args, **kwargs): + try: + delta_dir = os.path.join(os.path.dirname(__file__), "..") + synapse.storage.prepare_database.executescript( + cur, os.path.join(delta_dir, "34", "device_outbox.sql") + ) + except: + pass -- cgit 1.4.1 From 7d5b1425478f7d7a7e06b11579b107f9e7c8c6a0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Sep 2016 17:39:11 +0100 Subject: Add a stub run_upgrade --- synapse/storage/schema/delta/35/device_outbox_again.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/35/device_outbox_again.py b/synapse/storage/schema/delta/35/device_outbox_again.py index 46da12a93c..d6d2260393 100644 --- a/synapse/storage/schema/delta/35/device_outbox_again.py +++ b/synapse/storage/schema/delta/35/device_outbox_again.py @@ -28,3 +28,7 @@ def run_create(cur, database_engine, *args, **kwargs): ) except: pass + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass -- cgit 1.4.1 From 43b77c5d97a3119296c0f26030140b28e8d25f04 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Sep 2016 17:44:21 +0100 Subject: Only catch databas errors --- synapse/storage/schema/delta/35/device_outbox_again.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/35/device_outbox_again.py b/synapse/storage/schema/delta/35/device_outbox_again.py index d6d2260393..5f950a4a8a 100644 --- a/synapse/storage/schema/delta/35/device_outbox_again.py +++ b/synapse/storage/schema/delta/35/device_outbox_again.py @@ -26,7 +26,7 @@ def run_create(cur, database_engine, *args, **kwargs): synapse.storage.prepare_database.executescript( cur, os.path.join(delta_dir, "34", "device_outbox.sql") ) - except: + except database_engine.module.DatabaseError: pass -- cgit 1.4.1 From 0877157353c5610e8ede2f205a84ae80bef7983b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 9 Sep 2016 11:04:47 +0100 Subject: Just move the schema and add some DROPs --- synapse/storage/schema/delta/34/device_outbox.sql | 36 -------------------- synapse/storage/schema/delta/35/device_outbox.sql | 39 ++++++++++++++++++++++ .../storage/schema/delta/35/device_outbox_again.py | 34 ------------------- 3 files changed, 39 insertions(+), 70 deletions(-) delete mode 100644 synapse/storage/schema/delta/34/device_outbox.sql create mode 100644 synapse/storage/schema/delta/35/device_outbox.sql delete mode 100644 synapse/storage/schema/delta/35/device_outbox_again.py (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/34/device_outbox.sql b/synapse/storage/schema/delta/34/device_outbox.sql deleted file mode 100644 index e87066d9a1..0000000000 --- a/synapse/storage/schema/delta/34/device_outbox.sql +++ /dev/null @@ -1,36 +0,0 @@ -/* Copyright 2016 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE TABLE device_federation_outbox ( - destination TEXT NOT NULL, - stream_id BIGINT NOT NULL, - queued_ts BIGINT NOT NULL, - messages_json TEXT NOT NULL -); - - -CREATE INDEX device_federation_outbox_destination_id - ON device_federation_outbox(destination, stream_id); - - -CREATE TABLE device_federation_inbox ( - origin TEXT NOT NULL, - message_id TEXT NOT NULL, - received_ts BIGINT NOT NULL -); - - -CREATE INDEX device_federation_inbox_sender_id - ON device_federation_inbox(origin, message_id); diff --git a/synapse/storage/schema/delta/35/device_outbox.sql b/synapse/storage/schema/delta/35/device_outbox.sql new file mode 100644 index 0000000000..17e6c43105 --- /dev/null +++ b/synapse/storage/schema/delta/35/device_outbox.sql @@ -0,0 +1,39 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +DROP TABLE IF EXISTS device_federation_outbox; +CREATE TABLE device_federation_outbox ( + destination TEXT NOT NULL, + stream_id BIGINT NOT NULL, + queued_ts BIGINT NOT NULL, + messages_json TEXT NOT NULL +); + + +DROP INDEX IF EXISTS device_federation_outbox_destination_id; +CREATE INDEX device_federation_outbox_destination_id + ON device_federation_outbox(destination, stream_id); + + +DROP TABLE IF EXISTS device_federation_inbox; +CREATE TABLE device_federation_inbox ( + origin TEXT NOT NULL, + message_id TEXT NOT NULL, + received_ts BIGINT NOT NULL +); + +DROP INDEX IF EXISTS device_federation_inbox_sender_id; +CREATE INDEX device_federation_inbox_sender_id + ON device_federation_inbox(origin, message_id); diff --git a/synapse/storage/schema/delta/35/device_outbox_again.py b/synapse/storage/schema/delta/35/device_outbox_again.py deleted file mode 100644 index 5f950a4a8a..0000000000 --- a/synapse/storage/schema/delta/35/device_outbox_again.py +++ /dev/null @@ -1,34 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Re-apply 34/device_outbox.sql since the schema version was bumped before it -# was added to develop. 
- -import synapse.storage.prepare_database -import os - - -def run_create(cur, database_engine, *args, **kwargs): - try: - delta_dir = os.path.join(os.path.dirname(__file__), "..") - synapse.storage.prepare_database.executescript( - cur, os.path.join(delta_dir, "34", "device_outbox.sql") - ) - except database_engine.module.DatabaseError: - pass - - -def run_upgrade(cur, database_engine, *args, **kwargs): - pass -- cgit 1.4.1 From 6a6cbfcf1e12c0f34a280764f892eaa23e720d57 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 9 Sep 2016 11:48:23 +0100 Subject: Track the max_stream_device_id in a separate table, since we delete from the inbox table --- synapse/replication/slave/storage/deviceinbox.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/deviceinbox.py | 17 +++++++++++++++-- synapse/storage/schema/delta/35/device_stream_id.sql | 20 ++++++++++++++++++++ 4 files changed, 37 insertions(+), 4 deletions(-) create mode 100644 synapse/storage/schema/delta/35/device_stream_id.sql (limited to 'synapse/storage') diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 251078ba57..3bfd5e8213 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -23,7 +23,7 @@ class SlavedDeviceInboxStore(BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedDeviceInboxStore, self).__init__(db_conn, hs) self._device_inbox_id_gen = SlavedIdTracker( - db_conn, "device_inbox", "stream_id", + db_conn, "device_max_stream_id", "stream_id", ) self._device_inbox_stream_cache = StreamChangeCache( "DeviceInboxStreamChangeCache", diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 828e5ca60b..a61e83d5de 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -111,7 +111,7 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "presence_stream", "stream_id" ) self._device_inbox_id_gen = StreamIdGenerator( - db_conn, "device_inbox", "stream_id" + db_conn, "device_max_stream_id", "stream_id" ) self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id") diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 658fbef27b..b729b7106e 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -130,6 +130,13 @@ class DeviceInboxStore(SQLBaseStore): def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, messages_by_user_then_device): + sql = ( + "UPDATE device_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(sql, (stream_id, stream_id)) + local_by_user_then_device = {} for user_id, messages_by_device in messages_by_user_then_device.items(): messages_json_for_user = {} @@ -148,6 +155,8 @@ class DeviceInboxStore(SQLBaseStore): device = row[0] messages_json_for_user[device] = message_json else: + if not devices: + continue sql = ( "SELECT device_id FROM devices" " WHERE user_id = ? 
AND device_id IN (" @@ -164,7 +173,11 @@ class DeviceInboxStore(SQLBaseStore): message_json = ujson.dumps(messages_by_device[device]) messages_json_for_user[device] = message_json - local_by_user_then_device[user_id] = messages_json_for_user + if messages_json_for_user: + local_by_user_then_device[user_id] = messages_json_for_user + + if not local_by_user_then_device: + return sql = ( "INSERT INTO device_inbox" @@ -301,7 +314,7 @@ class DeviceInboxStore(SQLBaseStore): has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( destination, last_stream_id ) - if not has_changed: + if not has_changed or last_stream_id == current_stream_id: return defer.succeed(([], current_stream_id)) def get_new_messages_for_remote_destination_txn(txn): diff --git a/synapse/storage/schema/delta/35/device_stream_id.sql b/synapse/storage/schema/delta/35/device_stream_id.sql new file mode 100644 index 0000000000..1ce6336f33 --- /dev/null +++ b/synapse/storage/schema/delta/35/device_stream_id.sql @@ -0,0 +1,20 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE device_max_stream_id ( + stream_id BIGINT NOT NULL +); + +INSERT INTO device_max_stream_id (stream_id) VALUES (0); -- cgit 1.4.1 From 647c7245733a72b9b71decb2b321869a942dbb88 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 9 Sep 2016 11:52:44 +0100 Subject: Use the previous MAX value if any to set the stream_id --- synapse/storage/schema/delta/35/device_stream_id.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/35/device_stream_id.sql b/synapse/storage/schema/delta/35/device_stream_id.sql index 1ce6336f33..7ab7d942e2 100644 --- a/synapse/storage/schema/delta/35/device_stream_id.sql +++ b/synapse/storage/schema/delta/35/device_stream_id.sql @@ -17,4 +17,5 @@ CREATE TABLE device_max_stream_id ( stream_id BIGINT NOT NULL ); -INSERT INTO device_max_stream_id (stream_id) VALUES (0); +INSERT INTO device_max_stream_id (stream_id) + SELECT COALESCE(MAX(stream_id), 0) FROM device_inbox; -- cgit 1.4.1 From f2acc3dcf9cf213948ce3d2ebf12c3202abe97fd Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 9 Sep 2016 18:54:54 +0100 Subject: Add index to event_push_actions and remove room_id caluse so it uses it Mostly from @negativemjark --- synapse/storage/event_push_actions.py | 4 +++- .../schema/delta/35/event_push_actions_index.sql | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/35/event_push_actions_index.sql (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index dedf517cfa..a67c886f9a 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -353,12 +353,14 @@ class EventPushActionsStore(SQLBaseStore): before_clause += " " before_clause += "AND epa.highlight = 1" + # NB. 
This assumes event_ids are globally unique since + # it makes the query easier to index sql = ( "SELECT epa.event_id, epa.room_id," " epa.stream_ordering, epa.topological_ordering," " epa.actions, epa.profile_tag, e.received_ts" " FROM event_push_actions epa, events e" - " WHERE epa.room_id = e.room_id AND epa.event_id = e.event_id" + " WHERE epa.event_id = e.event_id" " AND epa.user_id = ? %s" " ORDER BY epa.stream_ordering DESC" " LIMIT ?" diff --git a/synapse/storage/schema/delta/35/event_push_actions_index.sql b/synapse/storage/schema/delta/35/event_push_actions_index.sql new file mode 100644 index 0000000000..4fc32c351a --- /dev/null +++ b/synapse/storage/schema/delta/35/event_push_actions_index.sql @@ -0,0 +1,18 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + CREATE INDEX event_push_actions_user_id_highlight_stream_ordering on event_push_actions( + user_id, highlight, stream_ordering + ); -- cgit 1.4.1 From 897d57bc58579b5dd253b3294f31bedd43edf0f1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 10:05:07 +0100 Subject: Change state fetch query for postgres to be faster It turns out that postgres doesn't like doing a list of OR's and is about 1000x slower, so we just issue a query for each specific type seperately. --- synapse/storage/state.py | 54 +++++++++++++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 19 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 0cff0a0cda..f98d5d53ee 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -306,13 +306,6 @@ class StateStore(SQLBaseStore): defer.returnValue(results) def _get_state_groups_from_groups_txn(self, txn, groups, types=None): - if types is not None: - where_clause = "AND (%s)" % ( - " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), - ) - else: - where_clause = "" - results = {group: {} for group in groups} if isinstance(self.database_engine, PostgresEngine): # Temporarily disable sequential scans in this transaction. This is @@ -342,20 +335,43 @@ class StateStore(SQLBaseStore): WHERE state_group IN ( SELECT state_group FROM state ) - %s; - """) % (where_clause,) - - for group in groups: - args = [group] - if types is not None: - args.extend([i for typ in types for i in typ]) + %s + """) - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - for row in rows: - key = (row["type"], row["state_key"]) - results[group][key] = row["event_id"] + # Turns out that postgres doesn't like doing a list of OR's and + # is about 1000x slower, so we just issue a query for each specific + # type seperately. + if types: + clause_to_args = [ + ( + "AND type = ? AND state_key = ?", + (etype, state_key) + ) + for etype, state_key in types + ] + else: + # If types is None we fetch all the state, and so just use an + # empty where clause with no extra args. 
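+                # ("", []) keeps the loop below uniform: the %s in the SQL
+                # template expands to nothing and no extra bind arguments
+                # are appended, so each group is still queried exactly once.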
+ clause_to_args = [("", [])] + + for where_clause, where_args in clause_to_args: + for group in groups: + args = [group] + args.extend(where_args) + + txn.execute(sql % (where_clause,), args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] else: + if types is not None: + where_clause = "AND (%s)" % ( + " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), + ) + else: + where_clause = "" + # We don't use WITH RECURSIVE on sqlite3 as there are distributions # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) for group in groups: -- cgit 1.4.1 From 54417999b692a8dd0f8f4edd62598c80835a4212 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 10:39:55 +0100 Subject: Revert "Add index to event_push_actions" --- synapse/storage/event_push_actions.py | 4 +--- .../schema/delta/35/event_push_actions_index.sql | 18 ------------------ 2 files changed, 1 insertion(+), 21 deletions(-) delete mode 100644 synapse/storage/schema/delta/35/event_push_actions_index.sql (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index a87d90741a..10e9305f7b 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -353,14 +353,12 @@ class EventPushActionsStore(SQLBaseStore): before_clause += " " before_clause += "AND epa.highlight = 1" - # NB. This assumes event_ids are globally unique since - # it makes the query easier to index sql = ( "SELECT epa.event_id, epa.room_id," " epa.stream_ordering, epa.topological_ordering," " epa.actions, epa.profile_tag, e.received_ts" " FROM event_push_actions epa, events e" - " WHERE epa.event_id = e.event_id" + " WHERE epa.room_id = e.room_id AND epa.event_id = e.event_id" " AND epa.user_id = ? %s" " ORDER BY epa.stream_ordering DESC" " LIMIT ?" diff --git a/synapse/storage/schema/delta/35/event_push_actions_index.sql b/synapse/storage/schema/delta/35/event_push_actions_index.sql deleted file mode 100644 index 4fc32c351a..0000000000 --- a/synapse/storage/schema/delta/35/event_push_actions_index.sql +++ /dev/null @@ -1,18 +0,0 @@ -/* Copyright 2016 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - CREATE INDEX event_push_actions_user_id_highlight_stream_ordering on event_push_actions( - user_id, highlight, stream_ordering - ); -- cgit 1.4.1 From 31f85f9db9d78ce2e201e7adb7128131377b376d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 12 Sep 2016 11:00:26 +0100 Subject: Add comments to existing schema deltas that used "CREATE INDEX" directly --- synapse/storage/schema/delta/22/receipts_index.sql | 4 ++++ synapse/storage/schema/delta/28/events_room_stream.sql | 4 ++++ synapse/storage/schema/delta/28/public_roms_index.sql | 4 ++++ synapse/storage/schema/delta/28/receipts_user_id_index.sql | 4 ++++ synapse/storage/schema/delta/29/push_actions.sql | 4 ++++ synapse/storage/schema/delta/31/pushers_index.sql | 4 ++++ 6 files changed, 24 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/22/receipts_index.sql b/synapse/storage/schema/delta/22/receipts_index.sql index 7bc061dff6..bfc0b3bcaa 100644 --- a/synapse/storage/schema/delta/22/receipts_index.sql +++ b/synapse/storage/schema/delta/22/receipts_index.sql @@ -13,6 +13,10 @@ * limitations under the License. */ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id ); diff --git a/synapse/storage/schema/delta/28/events_room_stream.sql b/synapse/storage/schema/delta/28/events_room_stream.sql index 200c35e6e2..36609475f1 100644 --- a/synapse/storage/schema/delta/28/events_room_stream.sql +++ b/synapse/storage/schema/delta/28/events_room_stream.sql @@ -13,4 +13,8 @@ * limitations under the License. */ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX events_room_stream on events(room_id, stream_ordering); diff --git a/synapse/storage/schema/delta/28/public_roms_index.sql b/synapse/storage/schema/delta/28/public_roms_index.sql index ba62a974a4..6c1fd68c5b 100644 --- a/synapse/storage/schema/delta/28/public_roms_index.sql +++ b/synapse/storage/schema/delta/28/public_roms_index.sql @@ -13,4 +13,8 @@ * limitations under the License. */ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX public_room_index on rooms(is_public); diff --git a/synapse/storage/schema/delta/28/receipts_user_id_index.sql b/synapse/storage/schema/delta/28/receipts_user_id_index.sql index 452a1b3c6c..cb84c69baa 100644 --- a/synapse/storage/schema/delta/28/receipts_user_id_index.sql +++ b/synapse/storage/schema/delta/28/receipts_user_id_index.sql @@ -13,6 +13,10 @@ * limitations under the License. 
*/ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id ); diff --git a/synapse/storage/schema/delta/29/push_actions.sql b/synapse/storage/schema/delta/29/push_actions.sql index 7e7b09820a..84b21cf813 100644 --- a/synapse/storage/schema/delta/29/push_actions.sql +++ b/synapse/storage/schema/delta/29/push_actions.sql @@ -26,6 +26,10 @@ UPDATE event_push_actions SET stream_ordering = ( UPDATE event_push_actions SET notif = 1, highlight = 0; +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering ); diff --git a/synapse/storage/schema/delta/31/pushers_index.sql b/synapse/storage/schema/delta/31/pushers_index.sql index 9027bccc69..a82add88fd 100644 --- a/synapse/storage/schema/delta/31/pushers_index.sql +++ b/synapse/storage/schema/delta/31/pushers_index.sql @@ -13,6 +13,10 @@ * limitations under the License. */ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id ); -- cgit 1.4.1 From a232e06100630699d375deb43e07c961f5a26237 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 12 Sep 2016 12:30:46 +0100 Subject: Fix direct to device messages recieved over federation to notify sync --- synapse/storage/deviceinbox.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index b729b7106e..f640e73714 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -128,6 +128,8 @@ class DeviceInboxStore(SQLBaseStore): user_id, stream_id ) + defer.returnValue(stream_id) + def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, messages_by_user_then_device): sql = ( -- cgit 1.4.1 From 15ca0c6a4d4b7ddbff8e3348ba949177e1562108 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 12:36:36 +0100 Subject: Make reindex happen in bg --- synapse/storage/event_push_actions.py | 30 ++++++++++++++++++++++ .../schema/delta/35/event_push_actions_index.sql | 5 ++-- 2 files changed, 32 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index a87d90741a..40bfe754b5 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -17,6 +17,7 @@ from ._base import SQLBaseStore from twisted.internet import defer from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.types import RoomStreamToken +from synapse.storage.engines import PostgresEngine from .stream import lower_bound import logging @@ -26,10 +27,17 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): + EPA_HIGHLIGHT_INDEX = "epa_highlight_index" + def __init__(self, hs): 
self.stream_ordering_month_ago = None super(EventPushActionsStore, self).__init__(hs) + self.register_background_update_handler( + self.EPA_HIGHLIGHT_INDEX, + self._background_index_epa_highlight, + ) + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ Args: @@ -500,6 +508,28 @@ class EventPushActionsStore(SQLBaseStore): return range_end + @defer.inlineCallbacks + def _background_index_epa_highlight(self, progress, batch_size): + def reindex_txn(txn): + if isinstance(self.database_engine, PostgresEngine): + txn.execute( + "CREATE INDEX CONCURRENTLY event_push_actions_u_highlight" + " on event_push_actions(user_id, highlight, stream_ordering)" + ) + else: + txn.execute( + "CREATE INDEX event_push_actions_u_highlight" + " on event_push_actions(user_id, highlight, stream_ordering)" + ) + + yield self.runInteraction( + self.EPA_HIGHLIGHT_INDEX, reindex_txn + ) + + yield self._end_background_update(self.EPA_HIGHLIGHT_INDEX) + + defer.returnValue(1) + def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/schema/delta/35/event_push_actions_index.sql b/synapse/storage/schema/delta/35/event_push_actions_index.sql index 4fc32c351a..2e836d8e9c 100644 --- a/synapse/storage/schema/delta/35/event_push_actions_index.sql +++ b/synapse/storage/schema/delta/35/event_push_actions_index.sql @@ -13,6 +13,5 @@ * limitations under the License. */ - CREATE INDEX event_push_actions_user_id_highlight_stream_ordering on event_push_actions( - user_id, highlight, stream_ordering - ); + INSERT into background_updates (update_name, progress_json) + VALUES ('epa_highlight_index', '{}'); -- cgit 1.4.1 From 0294c14ec43bed0b116c7ff531482539fb713443 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 12:43:56 +0100 Subject: Add back in query change --- synapse/storage/event_push_actions.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 8632b2f936..40bfe754b5 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -361,12 +361,14 @@ class EventPushActionsStore(SQLBaseStore): before_clause += " " before_clause += "AND epa.highlight = 1" + # NB. This assumes event_ids are globally unique since + # it makes the query easier to index sql = ( "SELECT epa.event_id, epa.room_id," " epa.stream_ordering, epa.topological_ordering," " epa.actions, epa.profile_tag, e.received_ts" " FROM event_push_actions epa, events e" - " WHERE epa.room_id = e.room_id AND epa.event_id = e.event_id" + " WHERE epa.event_id = e.event_id" " AND epa.user_id = ? %s" " ORDER BY epa.stream_ordering DESC" " LIMIT ?" 
-- cgit 1.4.1 From 7cd6edb9470ed949b2c63317624dbc9e38950c95 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 12:54:38 +0100 Subject: Use register_background_index_update --- synapse/storage/event_push_actions.py | 28 ++++------------------------ 1 file changed, 4 insertions(+), 24 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 40bfe754b5..7974a108ad 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -33,9 +33,11 @@ class EventPushActionsStore(SQLBaseStore): self.stream_ordering_month_ago = None super(EventPushActionsStore, self).__init__(hs) - self.register_background_update_handler( + self.register_background_index_update( self.EPA_HIGHLIGHT_INDEX, - self._background_index_epa_highlight, + index_name="event_push_actions_u_highlight", + table="event_push_actions", + columns=["user_id", "highlight", "stream_ordering"], ) def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): @@ -508,28 +510,6 @@ class EventPushActionsStore(SQLBaseStore): return range_end - @defer.inlineCallbacks - def _background_index_epa_highlight(self, progress, batch_size): - def reindex_txn(txn): - if isinstance(self.database_engine, PostgresEngine): - txn.execute( - "CREATE INDEX CONCURRENTLY event_push_actions_u_highlight" - " on event_push_actions(user_id, highlight, stream_ordering)" - ) - else: - txn.execute( - "CREATE INDEX event_push_actions_u_highlight" - " on event_push_actions(user_id, highlight, stream_ordering)" - ) - - yield self.runInteraction( - self.EPA_HIGHLIGHT_INDEX, reindex_txn - ) - - yield self._end_background_update(self.EPA_HIGHLIGHT_INDEX) - - defer.returnValue(1) - def _action_has_highlight(actions): for action in actions: -- cgit 1.4.1 From 5ef5435529778d6bf0bf19f36b7fb6febc26a718 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 13:32:58 +0100 Subject: Remove unused import --- synapse/storage/event_push_actions.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 7974a108ad..e02afdb1df 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -17,7 +17,6 @@ from ._base import SQLBaseStore from twisted.internet import defer from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.types import RoomStreamToken -from synapse.storage.engines import PostgresEngine from .stream import lower_bound import logging -- cgit 1.4.1 From fa20c9ce9410d71d7144932c33156b9dacd554f5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 14:04:08 +0100 Subject: Change the index to be stream_ordering, highlight --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index e02afdb1df..51b13e9498 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -36,7 +36,7 @@ class EventPushActionsStore(SQLBaseStore): self.EPA_HIGHLIGHT_INDEX, index_name="event_push_actions_u_highlight", table="event_push_actions", - columns=["user_id", "highlight", "stream_ordering"], + columns=["user_id", "stream_ordering", "highlight"], ) def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): -- cgit 1.4.1 From 
03a98aff3c4803b163802b784e05f1eacd03c444 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 14:26:03 +0100 Subject: Create new index concurrently --- synapse/storage/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f98d5d53ee..fdbdade536 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -820,7 +820,7 @@ class StateStore(SQLBaseStore): def reindex_txn(txn): if isinstance(self.database_engine, PostgresEngine): txn.execute( - "CREATE INDEX state_groups_state_type_idx" + "CREATE INDEX CONCURRENTLY state_groups_state_type_idx" " ON state_groups_state(state_group, type, state_key)" ) txn.execute( -- cgit 1.4.1 From c94de0ab606e5bbe7331a58456204a5775df779b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 16:55:01 +0100 Subject: Add WHERE clause support to index creation --- synapse/storage/background_updates.py | 24 +++++++++++++++--------- synapse/storage/event_push_actions.py | 3 ++- 2 files changed, 17 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 003f5ba203..94b2bcc54a 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -219,7 +219,7 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_handlers[update_name] = update_handler def register_background_index_update(self, update_name, index_name, - table, columns): + table, columns, where_clause=None): """Helper for store classes to do a background index addition To use: @@ -243,14 +243,20 @@ class BackgroundUpdateStore(SQLBaseStore): conc = True else: conc = False - - sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \ - % { - "conc": "CONCURRENTLY" if conc else "", - "name": index_name, - "table": table, - "columns": ", ".join(columns), - } + # We don't use partial indices on SQLite as it wasn't introduced + # until 3.8, and wheezy has 3.7 + where_clause = None + + sql = ( + "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" + " %(where_clause)s" + ) % { + "conc": "CONCURRENTLY" if conc else "", + "name": index_name, + "table": table, + "columns": ", ".join(columns), + "where_clause": "WHERE " + where_clause if where_clause else "" + } def create_index_concurrently(conn): conn.rollback() diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 51b13e9498..efa1db3b94 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -36,7 +36,8 @@ class EventPushActionsStore(SQLBaseStore): self.EPA_HIGHLIGHT_INDEX, index_name="event_push_actions_u_highlight", table="event_push_actions", - columns=["user_id", "stream_ordering", "highlight"], + columns=["user_id", "stream_ordering"], + where_clause="highlight = 1", ) def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): -- cgit 1.4.1 From 0b32bb20bbf025b6e69bae23a6d4a96903f41885 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 16:57:05 +0100 Subject: Index contains_url for file search queries --- synapse/storage/events.py | 8 ++++++++ synapse/storage/schema/delta/35/contains_url.sql | 17 +++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 synapse/storage/schema/delta/35/contains_url.sql (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 
ed182c8d11..6dc46fa50f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -189,6 +189,14 @@ class EventsStore(SQLBaseStore): self._background_reindex_fields_sender, ) + self.register_background_index_update( + "event_contains_url_index", + index_name="event_contains_url_index", + table="events", + columns=["room_id", "topological_ordering", "stream_ordering"], + where_clause="contains_url = true AND outlier = false", + ) + self._event_persist_queue = _EventPeristenceQueue() def persist_events(self, events_and_contexts, backfilled=False): diff --git a/synapse/storage/schema/delta/35/contains_url.sql b/synapse/storage/schema/delta/35/contains_url.sql new file mode 100644 index 0000000000..6cd123027b --- /dev/null +++ b/synapse/storage/schema/delta/35/contains_url.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + INSERT into background_updates (update_name, progress_json) + VALUES ('event_contains_url_index', '{}'); -- cgit 1.4.1 From b17af156c71b9859edbd27516eacd2c55f488e31 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 17:05:54 +0100 Subject: Remove where clause --- synapse/storage/event_push_actions.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index efa1db3b94..9cd923eb93 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -37,7 +37,6 @@ class EventPushActionsStore(SQLBaseStore): index_name="event_push_actions_u_highlight", table="event_push_actions", columns=["user_id", "stream_ordering"], - where_clause="highlight = 1", ) def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): -- cgit 1.4.1 From d5ae1f129143d6436a238fd7882e39168f944846 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Sep 2016 10:03:48 +0100 Subject: Ensure we don't mutate state cache entries --- synapse/handlers/federation.py | 4 ++++ synapse/state.py | 6 ++++-- synapse/storage/state.py | 45 +++++++++++++++++++++++------------------- 3 files changed, 33 insertions(+), 22 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8a1038c44a..f7cb3c1bb2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1585,10 +1585,12 @@ class FederationHandler(BaseHandler): current_state = set(e.event_id for e in auth_events.values()) different_auth = event_auth_events - current_state + context.current_state_ids = dict(context.current_state_ids) context.current_state_ids.update({ k: a.event_id for k, a in auth_events.items() if k != event_key }) + context.prev_state_ids = dict(context.prev_state_ids) context.prev_state_ids.update({ k: a.event_id for k, a in auth_events.items() }) @@ -1670,10 +1672,12 @@ class FederationHandler(BaseHandler): # 4. Look at rejects and their proofs. # TODO. 
+ context.current_state_ids = dict(context.current_state_ids) context.current_state_ids.update({ k: a.event_id for k, a in auth_events.items() if k != event_key }) + context.prev_state_ids = dict(context.prev_state_ids) context.prev_state_ids.update({ k: a.event_id for k, a in auth_events.items() }) diff --git a/synapse/state.py b/synapse/state.py index 4520fa0415..617db8d2e2 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -26,6 +26,7 @@ from synapse.events.snapshot import EventContext from synapse.util.async import Linearizer from collections import namedtuple +from frozendict import frozendict import logging import hashlib @@ -58,11 +59,11 @@ class _StateCacheEntry(object): __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"] def __init__(self, state, state_group, prev_group=None, delta_ids=None): - self.state = state + self.state = frozendict(state) self.state_group = state_group self.prev_group = prev_group - self.delta_ids = delta_ids + self.delta_ids = frozendict(delta_ids) if delta_ids is not None else None # The `state_id` is a unique ID we generate that can be used as ID for # this collection of state. Usually this would be the same as the @@ -255,6 +256,7 @@ class StateHandler(object): context.prev_group = entry.prev_group context.delta_ids = entry.delta_ids if context.delta_ids is not None: + context.delta_ids = dict(context.delta_ids) context.delta_ids[key] = event.event_id else: context.current_state_ids = context.prev_state_ids diff --git a/synapse/storage/state.py b/synapse/storage/state.py index fdbdade536..ec8c62b653 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -817,27 +817,32 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def _background_index_state(self, progress, batch_size): - def reindex_txn(txn): - if isinstance(self.database_engine, PostgresEngine): - txn.execute( - "CREATE INDEX CONCURRENTLY state_groups_state_type_idx" - " ON state_groups_state(state_group, type, state_key)" - ) - txn.execute( - "DROP INDEX IF EXISTS state_groups_state_id" - ) - else: - txn.execute( - "CREATE INDEX state_groups_state_type_idx" - " ON state_groups_state(state_group, type, state_key)" - ) - txn.execute( - "DROP INDEX IF EXISTS state_groups_state_id" - ) + def reindex_txn(conn): + conn.rollback() + # postgres insists on autocommit for the index + conn.set_session(autocommit=True) + try: + txn = conn.cursor() + if isinstance(self.database_engine, PostgresEngine): + txn.execute( + "CREATE INDEX CONCURRENTLY state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) + else: + txn.execute( + "CREATE INDEX state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) + finally: + conn.set_session(autocommit=False) - yield self.runInteraction( - self.STATE_GROUP_INDEX_UPDATE_NAME, reindex_txn - ) + yield self.runWithConnection(reindex_txn) yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME) -- cgit 1.4.1 From 00f51493f5726210bf649889ed3c03b56fddbe1d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Sep 2016 10:18:30 +0100 Subject: Fix reindex --- synapse/storage/state.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ec8c62b653..7eb342674c 100644 --- 
a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -819,11 +819,11 @@ class StateStore(SQLBaseStore): def _background_index_state(self, progress, batch_size): def reindex_txn(conn): conn.rollback() - # postgres insists on autocommit for the index - conn.set_session(autocommit=True) - try: - txn = conn.cursor() - if isinstance(self.database_engine, PostgresEngine): + if isinstance(self.database_engine, PostgresEngine): + # postgres insists on autocommit for the index + conn.set_session(autocommit=True) + try: + txn = conn.cursor() txn.execute( "CREATE INDEX CONCURRENTLY state_groups_state_type_idx" " ON state_groups_state(state_group, type, state_key)" @@ -831,16 +831,17 @@ class StateStore(SQLBaseStore): txn.execute( "DROP INDEX IF EXISTS state_groups_state_id" ) - else: - txn.execute( - "CREATE INDEX state_groups_state_type_idx" - " ON state_groups_state(state_group, type, state_key)" - ) - txn.execute( - "DROP INDEX IF EXISTS state_groups_state_id" - ) - finally: - conn.set_session(autocommit=False) + finally: + conn.set_session(autocommit=False) + else: + txn = conn.cursor() + txn.execute( + "CREATE INDEX state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) yield self.runWithConnection(reindex_txn) -- cgit 1.4.1 From ed992ae6ba7e0f97a526339a9782e10a410a6a2b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Sep 2016 16:09:32 +0100 Subject: Add a DB index to figure out past state at a stream ordering in a room --- synapse/storage/event_federation.py | 81 ++++++++++++++++++++++ .../schema/delta/35/stream_order_to_extrem.sql | 37 ++++++++++ 2 files changed, 118 insertions(+) create mode 100644 synapse/storage/schema/delta/35/stream_order_to_extrem.sql (limited to 'synapse/storage') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 0827946207..9ec67ad0c4 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -16,6 +16,7 @@ from twisted.internet import defer from ._base import SQLBaseStore +from synapse.api.errors import StoreError from synapse.util.caches.descriptors import cached from unpaddedbase64 import encode_base64 @@ -36,6 +37,13 @@ class EventFederationStore(SQLBaseStore): and backfilling from another server respectively. """ + def __init__(self, hs): + super(EventFederationStore, self).__init__(hs) + + hs.get_clock().looping_call( + self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + ) + def get_auth_chain(self, event_ids): return self.get_auth_chain_ids(event_ids).addCallback(self._get_events) @@ -270,6 +278,37 @@ class EventFederationStore(SQLBaseStore): ] ) + # We now insert into stream_ordering_to_exterm a mapping from room_id, + # new stream_ordering to new forward extremeties in the room. 
+ # This allows us to later efficiently look up the forward extremeties + # for a room before a given stream_ordering + max_stream_ord = max( + ev.internal_metadata.stream_ordering for ev in events + ) + new_extrem = {} + for room_id in events_by_room: + event_ids = self._simple_select_onecol_txn( + txn, + table="event_forward_extremities", + keyvalues={"room_id": room_id}, + retcol="event_id", + ) + new_extrem[room_id] = event_ids + + self._simple_insert_many_txn( + txn, + table="stream_ordering_to_exterm", + values=[ + { + "room_id": room_id, + "event_id": event_id, + "stream_ordering": max_stream_ord, + } + for room_id, extrem_evs in new_extrem.items() + for event_id in extrem_evs + ] + ) + query = ( "INSERT INTO event_backward_extremities (event_id, room_id)" " SELECT ?, ? WHERE NOT EXISTS (" @@ -305,6 +344,48 @@ class EventFederationStore(SQLBaseStore): self.get_latest_event_ids_in_room.invalidate, (room_id,) ) + def get_forward_extremeties_for_room(self, room_id, stream_ordering): + """For a given room_id and stream_ordering, return the forward + extremeties of the room at that point in "time". + + Throws a StoreError if we have since purged the index for + stream_orderings from that point. + """ + + if stream_ordering <= self.stream_ordering_month_ago: + raise StoreError(400, "stream_ordering too old") + + sql = (""" + SELECT event_id FROM stream_ordering_to_exterm + INNER JOIN ( + SELECT room_id, MAX(stream_ordering) AS stream_ordering + FROM stream_ordering_to_exterm + WHERE stream_ordering < ? GROUP BY room_id + ) AS rms USING (room_id, stream_ordering) + WHERE room_id = ? + """) + + def get_forward_extremeties_for_room_txn(txn): + txn.execute(sql, (room_id, stream_ordering,)) + rows = txn.fetchall() + return [event_id for event_id, in rows] + + return self.runInteraction( + "get_forward_extremeties_for_room", + get_forward_extremeties_for_room_txn + ) + + def _delete_old_forward_extrem_cache(self): + def _delete_old_forward_extrem_cache_txn(txn): + txn.execute( + "DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ?", + (self.stream_ordering_month_ago,) + ) + return self.runInteraction( + "_delete_old_forward_extrem_cache", + _delete_old_forward_extrem_cache_txn + ) + def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and including) the events in event_list. Return a list of max size `limit` diff --git a/synapse/storage/schema/delta/35/stream_order_to_extrem.sql b/synapse/storage/schema/delta/35/stream_order_to_extrem.sql new file mode 100644 index 0000000000..2b945d8a57 --- /dev/null +++ b/synapse/storage/schema/delta/35/stream_order_to_extrem.sql @@ -0,0 +1,37 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +CREATE TABLE stream_ordering_to_exterm ( + stream_ordering BIGINT NOT NULL, + room_id TEXT NOT NULL, + event_id TEXT NOT NULL +); + +INSERT INTO stream_ordering_to_exterm (stream_ordering, room_id, event_id) + SELECT stream_ordering, room_id, event_id FROM event_forward_extremities + INNER JOIN ( + SELECT room_id, max(stream_ordering) as stream_ordering FROM events + INNER JOIN event_forward_extremities USING (room_id, event_id) + GROUP BY room_id + ) AS rms USING (room_id); + +CREATE INDEX stream_ordering_to_exterm_idx on stream_ordering_to_exterm( + stream_ordering +); + +CREATE INDEX stream_ordering_to_exterm_rm_idx on stream_ordering_to_exterm( + room_id, stream_ordering +); -- cgit 1.4.1 From baffe96d95f31f0217be5fbc8c03c5f6b7485d53 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Sep 2016 17:01:02 +0100 Subject: Add a room visibility stream --- synapse/storage/__init__.py | 3 + synapse/storage/event_federation.py | 2 +- synapse/storage/room.py | 78 +++++++++++++++++----- .../delta/35/public_room_list_change_stream.sql | 33 +++++++++ 4 files changed, 100 insertions(+), 16 deletions(-) create mode 100644 synapse/storage/schema/delta/35/public_room_list_change_stream.sql (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a61e83d5de..0099a3f5bb 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -113,6 +113,9 @@ class DataStore(RoomMemberStore, RoomStore, self._device_inbox_id_gen = StreamIdGenerator( db_conn, "device_max_stream_id", "stream_id" ) + self._public_room_id_gen = StreamIdGenerator( + db_conn, "public_room_list_stream", "stream_id" + ) self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id") self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id") diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 9ec67ad0c4..ec6dbe5492 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -366,7 +366,7 @@ class EventFederationStore(SQLBaseStore): """) def get_forward_extremeties_for_room_txn(txn): - txn.execute(sql, (room_id, stream_ordering,)) + txn.execute(sql, (stream_ordering, room_id)) rows = txn.fetchall() return [event_id for event_id, in rows] diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 8251f58670..ef0d79891e 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -48,15 +48,31 @@ class RoomStore(SQLBaseStore): StoreError if the room could not be stored. 
""" try: - yield self._simple_insert( - "rooms", - { - "room_id": room_id, - "creator": room_creator_user_id, - "is_public": is_public, - }, - desc="store_room", - ) + def store_room_txn(txn, next_id): + self._simple_insert_txn( + txn, + "rooms", + { + "room_id": room_id, + "creator": room_creator_user_id, + "is_public": is_public, + }, + ) + if is_public: + self._simple_insert_txn( + txn, + table="public_room_list_stream", + values={ + "stream_id": next_id, + "room_id": room_id, + "visibility": is_public, + } + ) + with self._public_room_id_gen.get_next() as next_id: + yield self.runInteraction( + "store_room_txn", + store_room_txn, next_id, + ) except Exception as e: logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") @@ -77,13 +93,45 @@ class RoomStore(SQLBaseStore): allow_none=True, ) + @defer.inlineCallbacks def set_room_is_public(self, room_id, is_public): - return self._simple_update_one( - table="rooms", - keyvalues={"room_id": room_id}, - updatevalues={"is_public": is_public}, - desc="set_room_is_public", - ) + def set_room_is_public_txn(txn, next_id): + self._simple_update_one_txn( + txn, + table="rooms", + keyvalues={"room_id": room_id}, + updatevalues={"is_public": is_public}, + ) + + entries = self._simple_select_list_txn( + txn, + table="public_room_list_stream", + keyvalues={"room_id": room_id}, + retcols=("stream_id", "visibility"), + ) + + entries.sort(key=lambda r: r["stream_id"]) + + add_to_stream = True + if entries: + add_to_stream = bool(entries[-1]["visibility"]) != is_public + + if add_to_stream: + self._simple_insert_txn( + txn, + table="public_room_list_stream", + values={ + "stream_id": next_id, + "room_id": room_id, + "visibility": is_public, + } + ) + + with self._public_room_id_gen.get_next() as next_id: + yield self.runInteraction( + "set_room_is_public", + set_room_is_public_txn, next_id, + ) def get_public_room_ids(self): return self._simple_select_onecol( diff --git a/synapse/storage/schema/delta/35/public_room_list_change_stream.sql b/synapse/storage/schema/delta/35/public_room_list_change_stream.sql new file mode 100644 index 0000000000..dd2bf2e28a --- /dev/null +++ b/synapse/storage/schema/delta/35/public_room_list_change_stream.sql @@ -0,0 +1,33 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +CREATE TABLE public_room_list_stream ( + stream_id BIGINT NOT NULL, + room_id TEXT NOT NULL, + visibility BOOLEAN NOT NULL +); + +INSERT INTO public_room_list_stream (stream_id, room_id, visibility) + SELECT 1, room_id, is_public FROM rooms + WHERE is_public = CAST(1 AS BOOLEAN); + +CREATE INDEX public_room_list_stream_idx on public_room_list_stream( + stream_id +); + +CREATE INDEX public_room_list_stream_rm_idx on public_room_list_stream( + room_id, stream_id +); -- cgit 1.4.1 From c566f0ee17ee015e1a841e209f202c6e8aefdfcd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Sep 2016 17:28:52 +0100 Subject: Calculate the public room list from a stream_ordering --- synapse/handlers/room_list.py | 43 ++++++++++++++++++++++++++++++++++++++++--- synapse/storage/stream.py | 3 +++ 2 files changed, 43 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index a3d554ff2c..2b5a382052 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -52,12 +52,49 @@ class RoomListHandler(BaseHandler): def _get_public_room_list(self): room_ids = yield self.store.get_public_room_ids() + rooms_to_order_value = {} + rooms_to_num_joined = {} + rooms_to_latest_event_ids = {} + + current_stream_token = yield self.store.get_room_max_stream_ordering() + + # We want to return rooms in a particular order: the number of joined + # users. We then arbitrarily use the room_id as a tie breaker. + + @defer.inlineCallbacks + def get_order_for_room(room_id): + latest_event_ids = rooms_to_latest_event_ids.get(room_id, None) + if not latest_event_ids: + latest_event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, current_stream_token + ) + rooms_to_latest_event_ids[room_id] = latest_event_ids + + if not latest_event_ids: + return + + joined_users = yield self.state_handler.get_current_user_in_room( + room_id, latest_event_ids, + ) + num_joined_users = len(joined_users) + rooms_to_num_joined[room_id] = num_joined_users + + if num_joined_users == 0: + return + + # We want larger rooms to be first, hence negating num_joined_users + rooms_to_order_value[room_id] = (-num_joined_users, room_id) + + yield concurrently_execute(get_order_for_room, room_ids, 10) + + sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) + sorted_rooms = [room_id for room_id, _ in sorted_entries] + results = [] @defer.inlineCallbacks def handle_room(room_id): - joined_users = yield self.state_handler.get_current_user_in_room(room_id) - num_joined_users = len(joined_users) + num_joined_users = rooms_to_num_joined[room_id] if num_joined_users == 0: return @@ -135,7 +172,7 @@ class RoomListHandler(BaseHandler): results.append(result) - yield concurrently_execute(handle_room, room_ids, 10) + yield concurrently_execute(handle_room, sorted_rooms, 10) # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": results}) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 0577a0525b..07ea969d4d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -531,6 +531,9 @@ class StreamStore(SQLBaseStore): ) defer.returnValue("t%d-%d" % (topo, token)) + def get_room_max_stream_ordering(self): + return self._stream_id_gen.get_current_token() + def get_stream_token_for_event(self, event_id): """The stream token for an event Args: -- cgit 1.4.1 From 4fb65a10916481e0600d506d4c7e9bcfbffb7092 Mon Sep 17 00:00:00 2001 From: Erik 
Johnston Date: Thu, 15 Sep 2016 11:27:04 +0100 Subject: Base public room list off of public_rooms stream --- synapse/handlers/room_list.py | 34 ++++++++++++++++++++++------ synapse/storage/room.py | 52 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 6a62f3c27e..28bc35f8a3 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -55,16 +55,26 @@ class RoomListHandler(BaseHandler): else: since_token = None - room_ids = yield self.store.get_public_room_ids() - rooms_to_order_value = {} rooms_to_num_joined = {} rooms_to_latest_event_ids = {} + newly_visible = [] + newly_unpublished = [] if since_token: - current_stream_token = since_token.stream_ordering + stream_token = since_token.stream_ordering + current_public_id = yield self.store.get_current_public_room_stream_id() + public_room_stream_id = since_token.public_room_stream_id + newly_visible, newly_unpublished = yield self.store.get_public_room_changes( + public_room_stream_id, current_public_id + ) else: - current_stream_token = yield self.store.get_room_max_stream_ordering() + stream_token = yield self.store.get_room_max_stream_ordering() + public_room_stream_id = yield self.store.get_current_public_room_stream_id() + + room_ids = yield self.store.get_public_room_ids_at_stream_id( + public_room_stream_id + ) # We want to return rooms in a particular order: the number of joined # users. We then arbitrarily use the room_id as a tie breaker. @@ -74,7 +84,7 @@ class RoomListHandler(BaseHandler): latest_event_ids = rooms_to_latest_event_ids.get(room_id, None) if not latest_event_ids: latest_event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, current_stream_token + room_id, stream_token ) rooms_to_latest_event_ids[room_id] = latest_event_ids @@ -125,6 +135,9 @@ class RoomListHandler(BaseHandler): if num_joined_users == 0: return + if room_id in newly_unpublished: + return + result = { "room_id": room_id, "num_joined_members": num_joined_users, @@ -207,10 +220,14 @@ class RoomListHandler(BaseHandler): "chunk": chunk, } + if since_token: + results["new_rooms"] = bool(newly_visible) + if not since_token or since_token.direction_is_forward: if new_limit: results["next_batch"] = RoomListNextBatch( - stream_ordering=current_stream_token, + stream_ordering=stream_token, + public_room_stream_id=public_room_stream_id, current_limit=new_limit, direction_is_forward=True, ).to_token() @@ -222,7 +239,8 @@ class RoomListHandler(BaseHandler): else: if new_limit: results["prev_batch"] = RoomListNextBatch( - stream_ordering=current_stream_token, + stream_ordering=stream_token, + public_room_stream_id=public_room_stream_id, current_limit=new_limit, direction_is_forward=False, ).to_token() @@ -245,12 +263,14 @@ class RoomListHandler(BaseHandler): class RoomListNextBatch(namedtuple("RoomListNextBatch", ( "stream_ordering", # stream_ordering of the first public room list + "public_room_stream_id", # public room stream id for first public room list "current_limit", # The number of previous rooms returned "direction_is_forward", # Bool if this is a next_batch, false if prev_batch ))): KEY_DICT = { "stream_ordering": "s", + "public_room_stream_id": "p", "current_limit": "n", "direction_is_forward": "d", } diff --git a/synapse/storage/room.py b/synapse/storage/room.py index ef0d79891e..8aa4545939 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ 
-255,3 +255,55 @@ class RoomStore(SQLBaseStore): }, desc="add_event_report" ) + + def get_current_public_room_stream_id(self): + return self._public_room_id_gen.get_current_token() + + def get_public_room_ids_at_stream_id(self, stream_id): + return self.runInteraction( + "get_public_room_ids_at_stream_id", + self.get_public_room_ids_at_stream_id_txn, stream_id + ) + + def get_public_room_ids_at_stream_id_txn(self, txn, stream_id): + return { + rm + for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items() + if vis + } + + def get_published_at_stream_id_txn(self, txn, stream_id): + sql = (""" + SELECT room_id, visibility FROM public_room_list_stream + INNER JOIN ( + SELECT room_id, max(stream_id) AS stream_id + FROM public_room_list_stream + WHERE stream_id <= ? + GROUP BY room_id + ) grouped USING (room_id, stream_id) + """) + + txn.execute(sql, (stream_id,)) + return dict(txn.fetchall()) + + def get_public_room_changes(self, prev_stream_id, new_stream_id): + def get_public_room_changes_txn(txn): + then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id) + + now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id) + + now_rooms_visible = set( + rm for rm, vis in now_rooms_dict.items() if vis + ) + now_rooms_not_visible = set( + rm for rm, vis in now_rooms_dict.items() if not vis + ) + + newly_visible = now_rooms_visible - then_rooms + newly_unpublished = now_rooms_not_visible & then_rooms + + return newly_visible, newly_unpublished + + return self.runInteraction( + "get_public_room_changes", get_public_room_changes_txn + ) -- cgit 1.4.1 From 211786ecd629588f2481c94217a4a388b090c993 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 11:47:23 +0100 Subject: Stream public room changes down replication --- synapse/replication/resource.py | 20 ++++++++++++++++++- synapse/replication/slave/storage/events.py | 8 ++++++++ synapse/replication/slave/storage/room.py | 31 +++++++++++++++++++++++++++++ synapse/storage/room.py | 16 +++++++++++++++ 4 files changed, 74 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 299e9419a4..9aab3ce23c 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -42,6 +42,7 @@ STREAM_NAMES = ( ("pushers",), ("caches",), ("to_device",), + ("public_rooms",), ) @@ -131,6 +132,7 @@ class ReplicationResource(Resource): push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() pushers_token = self.store.get_pushers_stream_token() caches_token = self.store.get_cache_stream_token() + public_rooms_token = self.store.get_current_public_room_stream_id() defer.returnValue(_ReplicationToken( room_stream_token, @@ -144,6 +146,7 @@ class ReplicationResource(Resource): 0, # State stream is no longer a thing caches_token, int(stream_token.to_device_key), + int(public_rooms_token), )) @request_handler() @@ -193,6 +196,7 @@ class ReplicationResource(Resource): yield self.pushers(writer, current_token, limit, request_streams) yield self.caches(writer, current_token, limit, request_streams) yield self.to_device(writer, current_token, limit, request_streams) + yield self.public_rooms(writer, current_token, limit, request_streams) self.streams(writer, current_token, request_streams) logger.debug("Replicated %d rows", writer.total) @@ -400,6 +404,20 @@ class ReplicationResource(Resource): "position", "user_id", "device_id", "message_json" )) + @defer.inlineCallbacks + def 
public_rooms(self, writer, current_token, limit, request_streams): + current_position = current_token.public_rooms + + public_rooms = request_streams.get("public_rooms") + + if public_rooms is not None: + public_rooms_rows = yield self.store.get_all_new_public_rooms( + public_rooms, current_position, limit + ) + writer.write_header_and_rows("public_rooms", public_rooms_rows, ( + "position", "room_id", "visibility" + )) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -428,7 +446,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", - "push_rules", "pushers", "state", "caches", "to_device", + "push_rules", "pushers", "state", "caches", "to_device", "public_rooms", ))): __slots__ = [] diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 15c52774a2..f8965c73a0 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -61,6 +61,8 @@ class SlavedEventStore(BaseSlavedStore): "MembershipStreamChangeCache", events_max, ) + self.stream_ordering_month_ago = 0 + # Cached functions can't be accessed through a class instance so we need # to reach inside the __dict__ to extract them. get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] @@ -168,6 +170,12 @@ class SlavedEventStore(BaseSlavedStore): get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__ _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__ + get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__ + + get_forward_extremeties_for_room = ( + DataStore.get_forward_extremeties_for_room.__func__ + ) + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py index d5bb0f98ea..81743941dc 100644 --- a/synapse/replication/slave/storage/room.py +++ b/synapse/replication/slave/storage/room.py @@ -15,7 +15,38 @@ from ._base import BaseSlavedStore from synapse.storage import DataStore +from ._slaved_id_tracker import SlavedIdTracker class RoomStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(RoomStore, self).__init__(db_conn, hs) + self._public_room_id_gen = SlavedIdTracker( + db_conn, "public_room_list_stream", "stream_id" + ) + get_public_room_ids = DataStore.get_public_room_ids.__func__ + get_current_public_room_stream_id = ( + DataStore.get_current_public_room_stream_id.__func__ + ) + get_public_room_ids_at_stream_id = ( + DataStore.get_public_room_ids_at_stream_id.__func__ + ) + get_public_room_ids_at_stream_id_txn = ( + DataStore.get_public_room_ids_at_stream_id_txn.__func__ + ) + get_published_at_stream_id_txn = ( + DataStore.get_published_at_stream_id_txn.__func__ + ) + + def stream_positions(self): + result = super(RoomStore, self).stream_positions() + result["public_rooms"] = self._public_room_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("public_rooms") + if stream: + self._public_room_id_gen.advance(int(stream["position"])) + + return super(RoomStore, self).process_replication(result) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 8aa4545939..2ef13d7403 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -307,3 
+307,19 @@ class RoomStore(SQLBaseStore): return self.runInteraction( "get_public_room_changes", get_public_room_changes_txn ) + + def get_all_new_public_rooms(self, prev_id, current_id, limit): + def get_all_new_public_rooms(txn): + sql = (""" + SELECT stream_id, room_id, visibility FROM public_room_list_stream + WHERE stream_id > ? AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """) + + txn.execute(sql, (prev_id, current_id, limit,)) + return txn.fetchall() + + return self.runInteraction( + "get_all_new_public_rooms", get_all_new_public_rooms + ) -- cgit 1.4.1 From 55e6fc917c5c6b93b200706b3ef24cb27d80ff93 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 14:04:28 +0100 Subject: Add cache to get_forward_extremeties_for_room --- synapse/replication/slave/storage/events.py | 2 +- synapse/storage/event_federation.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index f8965c73a0..842ced02d6 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -173,7 +173,7 @@ class SlavedEventStore(BaseSlavedStore): get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__ get_forward_extremeties_for_room = ( - DataStore.get_forward_extremeties_for_room.__func__ + EventFederationStore.__dict__["get_forward_extremeties_for_room"] ) def stream_positions(self): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index ec6dbe5492..050b78d652 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -344,6 +344,7 @@ class EventFederationStore(SQLBaseStore): self.get_latest_event_ids_in_room.invalidate, (room_id,) ) + @cached(max_entries=5000, num_args=2) def get_forward_extremeties_for_room(self, room_id, stream_ordering): """For a given room_id and stream_ordering, return the forward extremeties of the room at that point in "time". 
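
Caching on the raw (room_id, stream_ordering) pair alone has a poor hit rate, because callers keep asking at ever-newer positions; the follow-up patch below therefore clamps the requested position to the room's last change before consulting the cache. A standalone sketch of why clamping helps, using functools.lru_cache in place of Synapse's @cached descriptor (the room id and positions are invented):

    from functools import lru_cache

    # Stand-in for the stream-change cache: the last stream position at
    # which each room's forward extremities actually changed.
    last_change_by_room = {"!room:example.org": 105}

    @lru_cache(maxsize=5000)
    def extremities_at(room_id, stream_ordering):
        # Stand-in for the expensive database lookup.
        return ("$extremity_of_%s_at_%d" % (room_id, stream_ordering),)

    def get_extremities(room_id, stream_ordering):
        # The extremities cannot change between the room's last change and
        # the requested position, so every position in that window shares
        # a single cache entry.
        last_change = last_change_by_room.get(room_id, 0)
        return extremities_at(room_id, min(last_change, stream_ordering))

With this clamp, requests at positions 106, 150, and 300 all resolve to the one cached entry for position 105.
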
-- cgit 1.4.1 From cb3edec6af55efb126f5e7ee66c4d895ef35a66e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 14:27:15 +0100 Subject: Use stream_change cache to make get_forward_extremeties_for_room cache more effective --- synapse/replication/slave/storage/events.py | 5 ++++- synapse/storage/event_federation.py | 11 ++++++++++- synapse/util/caches/stream_change_cache.py | 5 +++++ 3 files changed, 19 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 842ced02d6..cc32c66792 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -173,7 +173,10 @@ class SlavedEventStore(BaseSlavedStore): get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__ get_forward_extremeties_for_room = ( - EventFederationStore.__dict__["get_forward_extremeties_for_room"] + DataStore.get_forward_extremeties_for_room.__func__ + ) + _get_forward_extremeties_for_room = ( + EventFederationStore.__dict__["_get_forward_extremeties_for_room"] ) def stream_positions(self): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 050b78d652..97d0c26475 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -344,8 +344,17 @@ class EventFederationStore(SQLBaseStore): self.get_latest_event_ids_in_room.invalidate, (room_id,) ) - @cached(max_entries=5000, num_args=2) def get_forward_extremeties_for_room(self, room_id, stream_ordering): + # We want to make the cache more effective, so we clamp to the last + # change before the given ordering. + last_change = self._events_stream_cache.get_pos_of_last_change(room_id) + if last_change: + stream_ordering = min(last_change, stream_ordering) + + return self._get_forward_extremeties_for_room(room_id, stream_ordering) + + @cached(max_entries=5000, num_args=2) + def _get_forward_extremeties_for_room(self, room_id, stream_ordering): """For a given room_id and stream_ordering, return the forward extremeties of the room at that point in "time". diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 3c051dabc4..5c2a433e41 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -121,3 +121,8 @@ class StreamChangeCache(object): k, r = self._cache.popitem() self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos) self._entity_to_key.pop(r, None) + + def get_pos_of_last_change(self, entity): + """Returns the stream pos of the last change for an entitiy, if known. 
+ """ + return self._entity_to_key.get(entity, None) -- cgit 1.4.1 From 955f34d23e03c30c5c85df542e3b9b8bf9970110 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 15:12:07 +0100 Subject: Change get_pos_of_last_change to return upper bound --- synapse/storage/event_federation.py | 5 ++--- synapse/util/caches/stream_change_cache.py | 7 ++++--- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 97d0c26475..59b4cf1e53 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -347,9 +347,8 @@ class EventFederationStore(SQLBaseStore): def get_forward_extremeties_for_room(self, room_id, stream_ordering): # We want to make the cache more effective, so we clamp to the last # change before the given ordering. - last_change = self._events_stream_cache.get_pos_of_last_change(room_id) - if last_change: - stream_ordering = min(last_change, stream_ordering) + last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id) + stream_ordering = min(last_change, stream_ordering) return self._get_forward_extremeties_for_room(room_id, stream_ordering) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 5c2a433e41..b72bb0ff02 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -122,7 +122,8 @@ class StreamChangeCache(object): self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos) self._entity_to_key.pop(r, None) - def get_pos_of_last_change(self, entity): - """Returns the stream pos of the last change for an entitiy, if known. + def get_max_pos_of_last_change(self, entity): + """Returns an upper bound of the stream id of the last change to an + entity. """ - return self._entity_to_key.get(entity, None) + return self._entity_to_key.get(entity, self._earliest_known_stream_pos) -- cgit 1.4.1 From de4f798f01e2ad478639b103d293b86079aaf0bc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 17:34:59 +0100 Subject: Handling expiring stream extrems correctly. --- synapse/storage/__init__.py | 2 ++ synapse/storage/event_federation.py | 23 ++++++++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 0099a3f5bb..9996f195a0 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -222,6 +222,8 @@ class DataStore(RoomMemberStore, RoomStore, self._find_stream_orderings_for_times, 60 * 60 * 1000 ) + self._stream_order_on_start = self.get_room_max_stream_ordering() + super(DataStore, self).__init__(hs) def take_presence_startup_info(self): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 59b4cf1e53..765b5a5bcb 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -348,7 +348,14 @@ class EventFederationStore(SQLBaseStore): # We want to make the cache more effective, so we clamp to the last # change before the given ordering. last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id) - stream_ordering = min(last_change, stream_ordering) + + # We don't always have a full stream_to_exterm_id table, e.g. 
after + # the upgrade that introduced it, so we make sure we never ask for a + # stream_ordering from before a restart + last_change = max(self._stream_order_on_start, last_change) + + if last_change > self.stream_ordering_month_ago: + stream_ordering = min(last_change, stream_ordering) return self._get_forward_extremeties_for_room(room_id, stream_ordering) @@ -386,9 +393,19 @@ class EventFederationStore(SQLBaseStore): def _delete_old_forward_extrem_cache(self): def _delete_old_forward_extrem_cache_txn(txn): + sql = (""" + DELETE FROM stream_ordering_to_exterm + WHERE + ( + SELECT max(stream_ordering) AS stream_ordering + FROM stream_ordering_to_exterm + WHERE room_id = stream_ordering_to_exterm.room_id + ) > ? + AND stream_ordering < ? + """) txn.execute( - "DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ?", - (self.stream_ordering_month_ago,) + sql, + (self.stream_ordering_month_ago, self.stream_ordering_month_ago,) ) return self.runInteraction( "_delete_old_forward_extrem_cache", -- cgit 1.4.1 From e58a9d781c7808b66f6eda221c9ce91ccd3cd8d6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Sep 2016 10:19:32 +0100 Subject: Filter remote rooms lists locally --- synapse/handlers/room_list.py | 34 ++++++++++++++++++++++---------- synapse/storage/event_federation.py | 2 +- 2 files changed, 25 insertions(+), 11 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 37213f4bd8..9383f2486c 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -212,16 +212,7 @@ class RoomListHandler(BaseHandler): if avatar_url: result["avatar_url"] = avatar_url - logger.info("search_filter: %r", search_filter) - if search_filter and search_filter.get("generic_search_term", None): - generic_search_term = search_filter["generic_search_term"] - if generic_search_term in result.get("name", ""): - chunk.append(result) - elif generic_search_term in result.get("topic", ""): - chunk.append(result) - elif generic_search_term in result.get("canonical_alias", ""): - chunk.append(result) - else: + if _matches_room_entry(result, search_filter): chunk.append(result) yield concurrently_execute(handle_room, rooms_to_scan, 10) @@ -291,8 +282,16 @@ class RoomListHandler(BaseHandler): search_filter=None): res = yield self.hs.get_replication_layer().get_public_rooms( server_name, limit=limit, since_token=since_token, + search_filter=search_filter, ) + if search_filter: + res["chunk"] = [ + entry + for entry in dict(res.get("chunk", [])) + if _matches_room_entry(entry, search_filter) + ] + defer.returnValue(res) @@ -329,3 +328,18 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", ( return self._replace( **kwds ) + + +def _matches_room_entry(room_entry, search_filter): + if search_filter and search_filter.get("generic_search_term", None): + generic_search_term = search_filter["generic_search_term"] + if generic_search_term in room_entry.get("name", ""): + return True + elif generic_search_term in room_entry.get("topic", ""): + return True + elif generic_search_term in room_entry.get("canonical_alias", ""): + return True + else: + return True + + return False diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 765b5a5bcb..53289f556b 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -376,7 +376,7 @@ class EventFederationStore(SQLBaseStore): INNER JOIN ( SELECT room_id, MAX(stream_ordering) AS stream_ordering
FROM stream_ordering_to_exterm - WHERE stream_ordering < ? GROUP BY room_id + WHERE stream_ordering <= ? GROUP BY room_id ) AS rms USING (room_id, stream_ordering) WHERE room_id = ? """) -- cgit 1.4.1 From a68807d4260778cff706fc37ef1badd4fd0d8bf2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Sep 2016 11:34:01 +0100 Subject: Comment --- synapse/storage/event_federation.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 53289f556b..3d62451de9 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -393,6 +393,8 @@ class EventFederationStore(SQLBaseStore): def _delete_old_forward_extrem_cache(self): def _delete_old_forward_extrem_cache_txn(txn): + # Delete entries older than a month, while making sure we don't delete + # the only entries for a room. sql = (""" DELETE FROM stream_ordering_to_exterm WHERE -- cgit 1.4.1
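
Both stream_ordering_to_exterm and public_room_list_stream are append-only streams, and the patches above read both with the same query shape: join each room against its maximum stream position at or below the requested point. A self-contained illustration of that shape against an in-memory SQLite table, with invented rows:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE stream_ordering_to_exterm (
            stream_ordering BIGINT NOT NULL,
            room_id TEXT NOT NULL,
            event_id TEXT NOT NULL
        );
        INSERT INTO stream_ordering_to_exterm VALUES
            (1, '!a:hs', '$e1'),
            (5, '!a:hs', '$e2'),
            (5, '!a:hs', '$e3'),
            (9, '!a:hs', '$e4');
    """)

    # The forward extremities "as of" stream position 7 are the rows tagged
    # with the highest stream_ordering at or below that position.
    sql = """
        SELECT event_id FROM stream_ordering_to_exterm
        INNER JOIN (
            SELECT room_id, MAX(stream_ordering) AS stream_ordering
            FROM stream_ordering_to_exterm
            WHERE stream_ordering <= ? GROUP BY room_id
        ) AS rms USING (room_id, stream_ordering)
        WHERE room_id = ?
    """
    print(conn.execute(sql, (7, "!a:hs")).fetchall())
    # -> [('$e2',), ('$e3',)] (row order may vary)

The `<=` comparison matters here: the final patch in the series changed the lookup from `<` to `<=`, presumably so that a lookup at a given stream position also sees extremities recorded at exactly that position.
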