From 17d585753f1df2b2c2b13ddb8171e174cef97aac Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 4 Oct 2018 15:18:52 +0100
Subject: Delete unreferenced state groups during purge

---
 synapse/storage/events.py | 33 +++++++++++++++++++++++++------
 synapse/storage/state.py  | 50 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 77 insertions(+), 6 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e7487311ce..0fb190530a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2025,6 +2025,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         logger.info("[purge] finding state groups which depend on redundant"
                     " state groups")
         remaining_state_groups = []
+        unreferenced_state_groups = 0
         for i in range(0, len(state_rows), 100):
             chunk = [sg for sg, in state_rows[i:i + 100]]
             # look for state groups whose prev_state_group is one we are about
@@ -2037,13 +2038,33 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                 retcols=["state_group"],
                 keyvalues={},
             )
-            remaining_state_groups.extend(
-                row["state_group"] for row in rows
-                # exclude state groups we are about to delete: no point in
-                # updating them
-                if row["state_group"] not in state_groups_to_delete
-            )
+            for row in rows:
+                sg = row["state_group"]
+
+                if sg in state_groups_to_delete:
+                    # exclude state groups we are about to delete: no point in
+                    # updating them
+                    continue
+
+                if not self._is_state_group_referenced(txn, sg):
+                    # Let's also delete unreferenced state groups while we're
+                    # here, since otherwise we'd need to de-delta them
+                    state_groups_to_delete.add(sg)
+                    unreferenced_state_groups += 1
+                    continue
+
+                remaining_state_groups.append(sg)
+
+        logger.info(
+            "[purge] found %i extra unreferenced state groups to delete",
+            unreferenced_state_groups,
+        )
+
+        logger.info(
+            "[purge] de-delta-ing %i remaining state groups",
+            len(remaining_state_groups),
+        )
 
         # Now we turn the state groups that reference to-be-deleted state
         # groups to non delta versions.
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 3f4cbd61c4..b88c7dc091 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -1041,6 +1041,56 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return count
 
+    def _is_state_group_referenced(self, txn, state_group):
+        """Checks if a given state group is referenced, or is safe to delete.
+
+        A state groups is referenced if it or any of its descendants are
+        pointed at by an event. (A descendant is a group which has the given
+        state_group as a prev group)
+        """
+
+        # We check this by doing a depth first search to look for any
+        # descendant referenced by `event_to_state_groups`.
+
+        # State groups we need to check, contains state groups that are
+        # descendants of `state_group`
+        state_groups_to_search = [state_group]
+
+        # Set of state groups we've already checked
+        state_groups_searched = set()
+
+        while state_groups_to_search:
+            state_group = state_groups_to_search.pop()  # Next state group to check
+
+            is_referenced = self._simple_select_one_onecol_txn(
+                txn,
+                table="event_to_state_groups",
+                keyvalues={"state_group": state_group},
+                retcol="event_id",
+                allow_none=True,
+            )
+            if is_referenced:
+                # A descendant is referenced by event_to_state_groups, so
+                # original state group is referenced.
+                return True
+
+            state_groups_searched.add(state_group)
+
+            # Find all children of current state group and add to search
+            references = self._simple_select_onecol_txn(
+                txn,
+                table="state_group_edges",
+                keyvalues={"prev_state_group": state_group},
+                retcol="state_group",
+            )
+            state_groups_to_search.extend(references)
+
+            # Lets be paranoid and check for cycles
+            if state_groups_searched.intersection(references):
+                raise Exception("State group %s has cyclic dependency", state_group)
+
+        return False
+
 
 class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
     """ Keeps track of the state at a given event.
--
cgit 1.4.1


From 4917ff55234718c0e650c6dc2a1117304465b9be Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 4 Oct 2018 15:24:01 +0100
Subject: Add state_group index to event_to_state_groups

This is needed to efficiently check for unreferenced state groups during
purge.
---
 synapse/storage/prepare_database.py                 |  2 +-
 .../delta/52/add_event_to_state_group_index.sql     | 19 +++++++++++++++++++
 synapse/storage/state.py                            |  7 +++++++
 3 files changed, 27 insertions(+), 1 deletion(-)
 create mode 100644 synapse/storage/schema/delta/52/add_event_to_state_group_index.sql

(limited to 'synapse/storage')

diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index b364719312..bd740e1e45 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 51
+SCHEMA_VERSION = 52
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/schema/delta/52/add_event_to_state_group_index.sql b/synapse/storage/schema/delta/52/add_event_to_state_group_index.sql
new file mode 100644
index 0000000000..91e03d13e1
--- /dev/null
+++ b/synapse/storage/schema/delta/52/add_event_to_state_group_index.sql
@@ -0,0 +1,19 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is needed to efficiently check for unreferenced state groups during
+-- purge.
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('event_to_state_groups_sg_index', '{}');
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index b88c7dc091..3f08f447a3 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -1114,6 +1114,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
     STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
     STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
     CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
+    EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
 
     def __init__(self, db_conn, hs):
         super(StateStore, self).__init__(db_conn, hs)
@@ -1132,6 +1133,12 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
             columns=["state_key"],
             where_clause="type='m.room.member'",
         )
+        self.register_background_index_update(
+            self.EVENT_STATE_GROUP_INDEX_UPDATE_NAME,
+            index_name="event_to_state_groups_sg_index",
+            table="event_to_state_groups",
+            columns=["state_group"],
+        )
 
     def _store_event_state_mappings_txn(self, txn, events_and_contexts):
         state_groups = {}
--
cgit 1.4.1


From 67a1e315cc7f8b270ec87e0ab6e58aa2adaee32d Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 12 Oct 2018 13:49:48 +0100
Subject: Fix up comments

---
 synapse/storage/state.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 3f08f447a3..f7cf5c86c9 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -1044,9 +1044,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
     def _is_state_group_referenced(self, txn, state_group):
         """Checks if a given state group is referenced, or is safe to delete.
 
-        A state groups is referenced if it or any of its descendants are
-        pointed at by an event. (A descendant is a group which has the given
-        state_group as a prev group)
+        A state group is referenced if it or any of its descendants are
+        pointed at by an event. (A descendant is a state_group whose chain of
+        prev_groups includes the given state_group.)
""" # We check this by doing a depth first search to look for any -- cgit 1.4.1 From 47a9da28caa6a4f27d2df31f043971a2c9c7b555 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Oct 2018 20:43:18 +0100 Subject: Batch process handling state groups --- synapse/storage/events.py | 81 +++++++++------------------------ synapse/storage/state.py | 112 +++++++++++++++++++++++++++++----------------- 2 files changed, 92 insertions(+), 101 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0fb190530a..e4d0f8b1a9 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -37,6 +37,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.events_worker import EventsWorkerStore +from synapse.storage.state import StateGroupWorkerStore from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks @@ -203,7 +204,8 @@ def _retry_on_integrity_error(func): # inherits from EventFederationStore so that we can call _update_backward_extremities # and _handle_mult_prev_events (though arguably those could both be moved in here) -class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore): +class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore, + BackgroundUpdateStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" @@ -1995,70 +1997,29 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore logger.info("[purge] finding redundant state groups") - # Get all state groups that are only referenced by events that are - # to be deleted. - # This works by first getting state groups that we may want to delete, - # joining against event_to_state_groups to get events that use that - # state group, then left joining against events_to_purge again. Any - # state group where the left join produce *no nulls* are referenced - # only by events that are going to be purged. + # Get all state groups that are referenced by events that are to be + # deleted. We then go and check if they are referenced by other events + # or state groups, and if not we delete them. 
txn.execute(""" - SELECT state_group FROM - ( - SELECT DISTINCT state_group FROM events_to_purge - INNER JOIN event_to_state_groups USING (event_id) - ) AS sp - INNER JOIN event_to_state_groups USING (state_group) - LEFT JOIN events_to_purge AS ep USING (event_id) - GROUP BY state_group - HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0 + SELECT DISTINCT state_group FROM events_to_purge + INNER JOIN event_to_state_groups USING (event_id) """) - state_rows = txn.fetchall() - logger.info("[purge] found %i redundant state groups", len(state_rows)) - - # make a set of the redundant state groups, so that we can look them up - # efficiently - state_groups_to_delete = set([sg for sg, in state_rows]) - - # Now we get all the state groups that rely on these state groups - logger.info("[purge] finding state groups which depend on redundant" - " state groups") - remaining_state_groups = [] - unreferenced_state_groups = 0 - for i in range(0, len(state_rows), 100): - chunk = [sg for sg, in state_rows[i:i + 100]] - # look for state groups whose prev_state_group is one we are about - # to delete - rows = self._simple_select_many_txn( - txn, - table="state_group_edges", - column="prev_state_group", - iterable=chunk, - retcols=["state_group"], - keyvalues={}, - ) - - for row in rows: - sg = row["state_group"] - - if sg in state_groups_to_delete: - # exclude state groups we are about to delete: no point in - # updating them - continue + referenced_state_groups = set(sg for sg, in txn) + logger.info( + "[purge] found %i referenced state groups", + len(referenced_state_groups), + ) - if not self._is_state_group_referenced(txn, sg): - # Let's also delete unreferenced state groups while we're - # here, since otherwise we'd need to de-delta them - state_groups_to_delete.add(sg) - unreferenced_state_groups += 1 - continue + logger.info("[purge] finding state groups that can be deleted") - remaining_state_groups.append(sg) + state_groups_to_delete, remaining_state_groups = self._find_unreferenced_groups( + txn, referenced_state_groups, + ) logger.info( - "[purge] found %i extra unreferenced state groups to delete", - unreferenced_state_groups, + "[purge] found %i state groups to delete", + len(state_groups_to_delete), ) logger.info( @@ -2109,11 +2070,11 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore logger.info("[purge] removing redundant state groups") txn.executemany( "DELETE FROM state_groups_state WHERE state_group = ?", - state_rows + ((sg,) for sg in state_groups_to_delete), ) txn.executemany( "DELETE FROM state_groups WHERE id = ?", - state_rows + ((sg,) for sg in state_groups_to_delete), ) logger.info("[purge] removing events from event_to_state_groups") diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f7cf5c86c9..0f86311ed4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1041,55 +1041,85 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return count - def _is_state_group_referenced(self, txn, state_group): - """Checks if a given state group is referenced, or is safe to delete. + def _find_unreferenced_groups(self, txn, state_groups): + """Used when purging history to figure out which state groups can be + deleted and which need to be de-delta'ed (due to one of its prev groups + being scheduled for deletion). - A state group is referenced if it or any of its descendants are - pointed at by an event. (A descendant is a state_group whose chain of - prev_groups includes the given state_group.) 
- """ - - # We check this by doing a depth first search to look for any - # descendant referenced by `event_to_state_groups`. - - # State groups we need to check, contains state groups that are - # descendants of `state_group` - state_groups_to_search = [state_group] - - # Set of state groups we've already checked - state_groups_searched = set() - - while state_groups_to_search: - state_group = state_groups_to_search.pop() # Next state group to check + Args: + txn + state_groups (set[int]): Set of state groups referenced by events + that are going to be deleted. - is_referenced = self._simple_select_one_onecol_txn( + Returns: + tuple[set[int], set[int]]: The set of state groups that can be + deleted and the set of state groups that need to be de-delta'ed + """ + # Graph of state group -> previous group + graph = {} + + # Set of events that we have found to be referenced by events + referenced_groups = set() + + # Set of state groups we've already seen + state_groups_seen = set(state_groups) + + # Set of state groups to handle next. + next_to_search = set(state_groups) + while next_to_search: + # We bound size of groups we're looking up at once, to stop the + # SQL query getting too big + if len(next_to_search) < 100: + current_search = next_to_search + next_to_search = set() + else: + lst = list(next_to_search) + current_search = set(lst[:100]) + next_to_search = set(lst[100:]) + + # Check if state groups are referenced + sql = """ + SELECT state_group, count(*) FROM event_to_state_groups + LEFT JOIN events_to_purge AS ep USING (event_id) + WHERE state_group IN (%s) AND ep.event_id IS NULL + GROUP BY state_group + """ % (",".join("?" for _ in current_search),) + txn.execute(sql, list(current_search)) + + referenced = set(sg for sg, cnt in txn if cnt > 0) + referenced_groups |= referenced + + # We don't continue iterating up the state group graphs for state + # groups that are referenced. + current_search -= referenced + + rows = self._simple_select_many_txn( txn, - table="event_to_state_groups", - keyvalues={"state_group": state_group}, - retcol="event_id", - allow_none=True, + table="state_group_edges", + column="prev_state_group", + iterable=current_search, + keyvalues={}, + retcols=("prev_state_group", "state_group",), ) - if is_referenced: - # A descendant is referenced by event_to_state_groups, so - # original state group is referenced. 
- return True - state_groups_searched.add(state_group) + next_to_search.update(row["state_group"] for row in rows) + # We don't bother re-handling groups we've already seen + next_to_search -= state_groups_seen + state_groups_seen |= next_to_search - # Find all children of current state group and add to search - references = self._simple_select_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"prev_state_group": state_group}, - retcol="state_group", - ) - state_groups_to_search.extend(references) + for row in rows: + # Note: Each state group can have at most one prev group + graph[row["state_group"]] = row["prev_state_group"] + + to_delete = state_groups_seen - referenced_groups - # Lets be paranoid and check for cycles - if state_groups_searched.intersection(references): - raise Exception("State group %s has cyclic dependency", state_group) + to_dedelta = set() + for sg in referenced_groups: + prev_sg = graph.get(sg) + if prev_sg and prev_sg in to_delete: + to_dedelta.add(sg) - return False + return to_delete, to_dedelta class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): -- cgit 1.4.1 From 67f7b9cb50c246226b94027e3c1d4b958ff9f840 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Oct 2018 16:06:59 +0100 Subject: pep8 --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index af822fb69d..379b5c514f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2041,7 +2041,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore INNER JOIN event_to_state_groups USING (event_id) """) - referenced_state_groups = set(sg for sg, in txn) + referenced_state_groups = set(sg for sg, in txn) logger.info( "[purge] found %i referenced state groups", len(referenced_state_groups), -- cgit 1.4.1 From b2399f6281d7cd11e7762b683bdd5a4f0c24927e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 29 Oct 2018 14:01:11 +0000 Subject: Make SQL a bit cleaner --- synapse/storage/state.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 59a50a5df9..45afd42b3f 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1272,14 +1272,13 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): # Check if state groups are referenced sql = """ - SELECT state_group, count(*) FROM event_to_state_groups + SELECT DISTINCT state_group FROM event_to_state_groups LEFT JOIN events_to_purge AS ep USING (event_id) WHERE state_group IN (%s) AND ep.event_id IS NULL - GROUP BY state_group """ % (",".join("?" 
for _ in current_search),) txn.execute(sql, list(current_search)) - referenced = set(sg for sg, cnt in txn if cnt > 0) + referenced = set(sg for sg, in txn) referenced_groups |= referenced # We don't continue iterating up the state group graphs for state -- cgit 1.4.1 From f4f223aa4455bea3aa642c23ae957932b1168ba3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 29 Oct 2018 14:01:49 +0000 Subject: Don't make temporary list --- synapse/storage/state.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 45afd42b3f..947d3fc177 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1266,9 +1266,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): current_search = next_to_search next_to_search = set() else: - lst = list(next_to_search) - current_search = set(lst[:100]) - next_to_search = set(lst[100:]) + current_search = set(islice(next_to_search, 100)) + next_to_search -= current_search # Check if state groups are referenced sql = """ -- cgit 1.4.1 From 664b192a3b4aa57597d9832361b025bc078ea87a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 29 Oct 2018 14:21:43 +0000 Subject: Fix set operations thinko --- synapse/storage/state.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 947d3fc177..dfec57c045 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1293,10 +1293,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): retcols=("prev_state_group", "state_group",), ) - next_to_search.update(row["state_group"] for row in rows) + prevs = set(row["state_group"] for row in rows) # We don't bother re-handling groups we've already seen - next_to_search -= state_groups_seen - state_groups_seen |= next_to_search + prevs -= state_groups_seen + next_to_search |= prevs + state_groups_seen |= prevs for row in rows: # Note: Each state group can have at most one prev group -- cgit 1.4.1 From ad88460e0d2e0a7c2cf39ec5539d5c4ff030bbbd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 29 Oct 2018 14:23:34 +0000 Subject: Move _find_unreferenced_groups --- synapse/storage/events.py | 85 +++++++++++++++++++++++++++++++++++++++++++++-- synapse/storage/state.py | 79 ------------------------------------------- 2 files changed, 83 insertions(+), 81 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e791922733..919e855f3b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2052,8 +2052,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore logger.info("[purge] finding state groups that can be deleted") - state_groups_to_delete, remaining_state_groups = self._find_unreferenced_groups( - txn, referenced_state_groups, + state_groups_to_delete, remaining_state_groups = ( + self._find_unreferenced_groups_during_purge( + txn, referenced_state_groups, + ) ) logger.info( @@ -2209,6 +2211,85 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore logger.info("[purge] done") + def _find_unreferenced_groups_during_purge(self, txn, state_groups): + """Used when purging history to figure out which state groups can be + deleted and which need to be de-delta'ed (due to one of its prev groups + being scheduled for deletion). 
+ + Args: + txn + state_groups (set[int]): Set of state groups referenced by events + that are going to be deleted. + + Returns: + tuple[set[int], set[int]]: The set of state groups that can be + deleted and the set of state groups that need to be de-delta'ed + """ + # Graph of state group -> previous group + graph = {} + + # Set of events that we have found to be referenced by events + referenced_groups = set() + + # Set of state groups we've already seen + state_groups_seen = set(state_groups) + + # Set of state groups to handle next. + next_to_search = set(state_groups) + while next_to_search: + # We bound size of groups we're looking up at once, to stop the + # SQL query getting too big + if len(next_to_search) < 100: + current_search = next_to_search + next_to_search = set() + else: + current_search = set(itertools.islice(next_to_search, 100)) + next_to_search -= current_search + + # Check if state groups are referenced + sql = """ + SELECT DISTINCT state_group FROM event_to_state_groups + LEFT JOIN events_to_purge AS ep USING (event_id) + WHERE state_group IN (%s) AND ep.event_id IS NULL + """ % (",".join("?" for _ in current_search),) + txn.execute(sql, list(current_search)) + + referenced = set(sg for sg, in txn) + referenced_groups |= referenced + + # We don't continue iterating up the state group graphs for state + # groups that are referenced. + current_search -= referenced + + rows = self._simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=current_search, + keyvalues={}, + retcols=("prev_state_group", "state_group",), + ) + + prevs = set(row["state_group"] for row in rows) + # We don't bother re-handling groups we've already seen + prevs -= state_groups_seen + next_to_search |= prevs + state_groups_seen |= prevs + + for row in rows: + # Note: Each state group can have at most one prev group + graph[row["state_group"]] = row["prev_state_group"] + + to_delete = state_groups_seen - referenced_groups + + to_dedelta = set() + for sg in referenced_groups: + prev_sg = graph.get(sg) + if prev_sg and prev_sg in to_delete: + to_dedelta.add(sg) + + return to_delete, to_dedelta + @defer.inlineCallbacks def is_event_after(self, event_id1, event_id2): """Returns True if event_id1 is after event_id2 in the stream diff --git a/synapse/storage/state.py b/synapse/storage/state.py index dfec57c045..d737bd6778 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1234,85 +1234,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return count - def _find_unreferenced_groups(self, txn, state_groups): - """Used when purging history to figure out which state groups can be - deleted and which need to be de-delta'ed (due to one of its prev groups - being scheduled for deletion). - - Args: - txn - state_groups (set[int]): Set of state groups referenced by events - that are going to be deleted. - - Returns: - tuple[set[int], set[int]]: The set of state groups that can be - deleted and the set of state groups that need to be de-delta'ed - """ - # Graph of state group -> previous group - graph = {} - - # Set of events that we have found to be referenced by events - referenced_groups = set() - - # Set of state groups we've already seen - state_groups_seen = set(state_groups) - - # Set of state groups to handle next. 
- next_to_search = set(state_groups) - while next_to_search: - # We bound size of groups we're looking up at once, to stop the - # SQL query getting too big - if len(next_to_search) < 100: - current_search = next_to_search - next_to_search = set() - else: - current_search = set(islice(next_to_search, 100)) - next_to_search -= current_search - - # Check if state groups are referenced - sql = """ - SELECT DISTINCT state_group FROM event_to_state_groups - LEFT JOIN events_to_purge AS ep USING (event_id) - WHERE state_group IN (%s) AND ep.event_id IS NULL - """ % (",".join("?" for _ in current_search),) - txn.execute(sql, list(current_search)) - - referenced = set(sg for sg, in txn) - referenced_groups |= referenced - - # We don't continue iterating up the state group graphs for state - # groups that are referenced. - current_search -= referenced - - rows = self._simple_select_many_txn( - txn, - table="state_group_edges", - column="prev_state_group", - iterable=current_search, - keyvalues={}, - retcols=("prev_state_group", "state_group",), - ) - - prevs = set(row["state_group"] for row in rows) - # We don't bother re-handling groups we've already seen - prevs -= state_groups_seen - next_to_search |= prevs - state_groups_seen |= prevs - - for row in rows: - # Note: Each state group can have at most one prev group - graph[row["state_group"]] = row["prev_state_group"] - - to_delete = state_groups_seen - referenced_groups - - to_dedelta = set() - for sg in referenced_groups: - prev_sg = graph.get(sg) - if prev_sg and prev_sg in to_delete: - to_dedelta.add(sg) - - return to_delete, to_dedelta - class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): """ Keeps track of the state at a given event. -- cgit 1.4.1 From 50e328d1e7a61268dbf274f10798ea5994c6c25a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 1 Nov 2018 19:01:29 +0000 Subject: Remove redundant database locks for device list updates We can rely on the application-level per-user linearizer. --- synapse/storage/devices.py | 45 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 40 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 62497ab63f..bc3f575b1e 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -239,7 +239,19 @@ class DeviceStore(SQLBaseStore): def update_remote_device_list_cache_entry(self, user_id, device_id, content, stream_id): - """Updates a single user's device in the cache. + """Updates a single device in the cache of a remote user's devicelist. + + Note: assumes that we are the only thread that can be updating this user's + device list. + + Args: + user_id (str): User to update device list for + device_id (str): ID of decivice being updated + content (dict): new data on this device + stream_id (int): the version of the device list + + Returns: + Deferred[None] """ return self.runInteraction( "update_remote_device_list_cache_entry", @@ -272,7 +284,11 @@ class DeviceStore(SQLBaseStore): }, values={ "content": json.dumps(content), - } + }, + + # we don't need to lock, because we assume we are the only thread + # updating this user's devices. + lock=False, ) txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,)) @@ -289,11 +305,26 @@ class DeviceStore(SQLBaseStore): }, values={ "stream_id": stream_id, - } + }, + + # again, we can assume we are the only thread updating this user's + # extremity. 
+ lock=False, ) def update_remote_device_list_cache(self, user_id, devices, stream_id): - """Replace the cache of the remote user's devices. + """Replace the entire cache of the remote user's devices. + + Note: assumes that we are the only thread that can be updating this user's + device list. + + Args: + user_id (str): User to update device list for + devices (list[dict]): list of device objects supplied over federation + stream_id (int): the version of the device list + + Returns: + Deferred[None] """ return self.runInteraction( "update_remote_device_list_cache", @@ -338,7 +369,11 @@ class DeviceStore(SQLBaseStore): }, values={ "stream_id": stream_id, - } + }, + + # we don't need to lock, because we can assume we are the only thread + # updating this user's extremity. + lock=False, ) def get_devices_by_remote(self, destination, from_stream_id): -- cgit 1.4.1 From 350f654e7b80ff19d1fdf3861093cef09ccf3ab1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 1 Nov 2018 19:10:33 +0000 Subject: Add unique indexes to a couple of tables The indexes on device_lists_remote_extremeties can be unique, and they therefore should, to ensure that the db remains consistent. --- synapse/storage/devices.py | 49 +++++++++++++++++++++- .../schema/delta/40/device_list_streams.sql | 9 ++-- .../delta/52/device_list_streams_unique_idx.sql | 36 ++++++++++++++++ 3 files changed, 88 insertions(+), 6 deletions(-) create mode 100644 synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql (limited to 'synapse/storage') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index bc3f575b1e..ecdab34e7d 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -22,14 +22,19 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList -from ._base import Cache, SQLBaseStore, db_to_json +from ._base import Cache, db_to_json logger = logging.getLogger(__name__) +DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( + "drop_device_list_streams_non_unique_indexes" +) -class DeviceStore(SQLBaseStore): + +class DeviceStore(BackgroundUpdateStore): def __init__(self, db_conn, hs): super(DeviceStore, self).__init__(db_conn, hs) @@ -52,6 +57,30 @@ class DeviceStore(SQLBaseStore): columns=["user_id", "device_id"], ) + # create a unique index on device_lists_remote_cache + self.register_background_index_update( + "device_lists_remote_cache_unique_idx", + index_name="device_lists_remote_cache_unique_id", + table="device_lists_remote_cache", + columns=["user_id", "device_id"], + unique=True, + ) + + # And one on device_lists_remote_extremeties + self.register_background_index_update( + "device_lists_remote_extremeties_unique_idx", + index_name="device_lists_remote_extremeties_unique_idx", + table="device_lists_remote_extremeties", + columns=["user_id"], + unique=True, + ) + + # once they complete, we can remove the old non-unique indexes. 
+ self.register_background_update_handler( + DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES, + self._drop_device_list_streams_non_unique_indexes, + ) + @defer.inlineCallbacks def store_device(self, user_id, device_id, initial_device_display_name): @@ -757,3 +786,19 @@ class DeviceStore(SQLBaseStore): "_prune_old_outbound_device_pokes", _prune_txn, ) + + @defer.inlineCallbacks + def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size): + def f(conn): + txn = conn.cursor() + txn.execute( + "DROP INDEX IF EXISTS device_lists_remote_cache_id" + ) + txn.execute( + "DROP INDEX IF EXISTS device_lists_remote_extremeties_id" + ) + txn.close() + + yield self.runWithConnection(f) + yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES) + defer.returnValue(1) diff --git a/synapse/storage/schema/delta/40/device_list_streams.sql b/synapse/storage/schema/delta/40/device_list_streams.sql index 54841b3843..dd6dcb65f1 100644 --- a/synapse/storage/schema/delta/40/device_list_streams.sql +++ b/synapse/storage/schema/delta/40/device_list_streams.sql @@ -20,9 +20,6 @@ CREATE TABLE device_lists_remote_cache ( content TEXT NOT NULL ); -CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id); - - -- The last update we got for a user. Empty if we're not receiving updates for -- that user. CREATE TABLE device_lists_remote_extremeties ( @@ -30,7 +27,11 @@ CREATE TABLE device_lists_remote_extremeties ( stream_id TEXT NOT NULL ); -CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id); +-- we used to create non-unique indexes on these tables, but as of update 52 we create +-- unique indexes concurrently: +-- +-- CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id); +-- CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id); -- Stream of device lists updates. Includes both local and remotes diff --git a/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql b/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql new file mode 100644 index 0000000000..bfa49e6f92 --- /dev/null +++ b/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql @@ -0,0 +1,36 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- register a background update which will create a unique index on +-- device_lists_remote_cache +INSERT into background_updates (update_name, progress_json) + VALUES ('device_lists_remote_cache_unique_idx', '{}'); + +-- and one on device_lists_remote_extremeties +INSERT into background_updates (update_name, progress_json, depends_on) + VALUES ( + 'device_lists_remote_extremeties_unique_idx', '{}', + + -- doesn't really depend on this, but we need to make sure both happen + -- before we drop the old indexes. + 'device_lists_remote_cache_unique_idx' + ); + +-- once they complete, we can drop the old indexes. 
+INSERT into background_updates (update_name, progress_json, depends_on) + VALUES ( + 'drop_device_list_streams_non_unique_indexes', '{}', + 'device_lists_remote_extremeties_unique_idx' + ); -- cgit 1.4.1 From bc80b3f454aa9b9ca8bc710ff502b83892ac0a91 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Nov 2018 13:35:15 +0000 Subject: Add helpers for getting prev and auth events (#4139) * Add helpers for getting prev and auth events This is in preparation for allowing the event format to change between room versions. --- changelog.d/4139.misc | 1 + synapse/event_auth.py | 4 +-- synapse/events/__init__.py | 18 +++++++++++++ synapse/federation/transaction_queue.py | 4 +-- synapse/handlers/federation.py | 48 ++++++++++++++++----------------- synapse/state/__init__.py | 2 +- synapse/state/v2.py | 16 +++++------ synapse/storage/event_federation.py | 4 +-- synapse/storage/events.py | 8 +++--- tests/state/test_v2.py | 2 +- 10 files changed, 62 insertions(+), 45 deletions(-) create mode 100644 changelog.d/4139.misc (limited to 'synapse/storage') diff --git a/changelog.d/4139.misc b/changelog.d/4139.misc new file mode 100644 index 0000000000..d63d9e7003 --- /dev/null +++ b/changelog.d/4139.misc @@ -0,0 +1 @@ +Add helpers functions for getting prev and auth events of an event diff --git a/synapse/event_auth.py b/synapse/event_auth.py index d4d4474847..c81d8e6729 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -200,11 +200,11 @@ def _is_membership_change_allowed(event, auth_events): membership = event.content["membership"] # Check if this is the room creator joining: - if len(event.prev_events) == 1 and Membership.JOIN == membership: + if len(event.prev_event_ids()) == 1 and Membership.JOIN == membership: # Get room creation event: key = (EventTypes.Create, "", ) create = auth_events.get(key) - if create and event.prev_events[0][0] == create.event_id: + if create and event.prev_event_ids()[0] == create.event_id: if create.content["creator"] == event.state_key: return diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 12f1eb0a3e..84c75495d5 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -159,6 +159,24 @@ class EventBase(object): def keys(self): return six.iterkeys(self._event_dict) + def prev_event_ids(self): + """Returns the list of prev event IDs. The order matches the order + specified in the event, though there is no meaning to it. + + Returns: + list[str]: The list of event IDs of this event's prev_events + """ + return [e for e, _ in self.prev_events] + + def auth_event_ids(self): + """Returns the list of auth event IDs. The order matches the order + specified in the event, though there is no meaning to it. + + Returns: + list[str]: The list of event IDs of this event's auth_events + """ + return [e for e, _ in self.auth_events] + class FrozenEvent(EventBase): def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 3fdd63be95..099ace28c1 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -183,9 +183,7 @@ class TransactionQueue(object): # banned then it won't receive the event because it won't # be in the room after the ban. 
destinations = yield self.state.get_current_hosts_in_room( - event.room_id, latest_event_ids=[ - prev_id for prev_id, _ in event.prev_events - ], + event.room_id, latest_event_ids=event.prev_event_ids(), ) except Exception: logger.exception( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cd5b9bbb19..9ca5fd8724 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -239,7 +239,7 @@ class FederationHandler(BaseHandler): room_id, event_id, min_depth, ) - prevs = {e_id for e_id, _ in pdu.prev_events} + prevs = set(pdu.prev_event_ids()) seen = yield self.store.have_seen_events(prevs) if min_depth and pdu.depth < min_depth: @@ -607,7 +607,7 @@ class FederationHandler(BaseHandler): if e.event_id in seen_ids: continue e.internal_metadata.outlier = True - auth_ids = [e_id for e_id, _ in e.auth_events] + auth_ids = e.auth_event_ids() auth = { (e.type, e.state_key): e for e in auth_chain if e.event_id in auth_ids or e.type == EventTypes.Create @@ -726,7 +726,7 @@ class FederationHandler(BaseHandler): edges = [ ev.event_id for ev in events - if set(e_id for e_id, _ in ev.prev_events) - event_ids + if set(ev.prev_event_ids()) - event_ids ] logger.info( @@ -753,7 +753,7 @@ class FederationHandler(BaseHandler): required_auth = set( a_id for event in events + list(state_events.values()) + list(auth_events.values()) - for a_id, _ in event.auth_events + for a_id in event.auth_event_ids() ) auth_events.update({ e_id: event_map[e_id] for e_id in required_auth if e_id in event_map @@ -769,7 +769,7 @@ class FederationHandler(BaseHandler): auth_events.update(ret_events) required_auth.update( - a_id for event in ret_events.values() for a_id, _ in event.auth_events + a_id for event in ret_events.values() for a_id in event.auth_event_ids() ) missing_auth = required_auth - set(auth_events) @@ -796,7 +796,7 @@ class FederationHandler(BaseHandler): required_auth.update( a_id for event in results if event - for a_id, _ in event.auth_events + for a_id in event.auth_event_ids() ) missing_auth = required_auth - set(auth_events) @@ -816,7 +816,7 @@ class FederationHandler(BaseHandler): "auth_events": { (auth_events[a_id].type, auth_events[a_id].state_key): auth_events[a_id] - for a_id, _ in a.auth_events + for a_id in a.auth_event_ids() if a_id in auth_events } }) @@ -828,7 +828,7 @@ class FederationHandler(BaseHandler): "auth_events": { (auth_events[a_id].type, auth_events[a_id].state_key): auth_events[a_id] - for a_id, _ in event_map[e_id].auth_events + for a_id in event_map[e_id].auth_event_ids() if a_id in auth_events } }) @@ -1041,17 +1041,17 @@ class FederationHandler(BaseHandler): Raises: SynapseError if the event does not pass muster """ - if len(ev.prev_events) > 20: + if len(ev.prev_event_ids()) > 20: logger.warn("Rejecting event %s which has %i prev_events", - ev.event_id, len(ev.prev_events)) + ev.event_id, len(ev.prev_event_ids())) raise SynapseError( http_client.BAD_REQUEST, "Too many prev_events", ) - if len(ev.auth_events) > 10: + if len(ev.auth_event_ids()) > 10: logger.warn("Rejecting event %s which has %i auth_events", - ev.event_id, len(ev.auth_events)) + ev.event_id, len(ev.auth_event_ids())) raise SynapseError( http_client.BAD_REQUEST, "Too many auth_events", @@ -1076,7 +1076,7 @@ class FederationHandler(BaseHandler): def on_event_auth(self, event_id): event = yield self.store.get_event(event_id) auth = yield self.store.get_auth_chain( - [auth_id for auth_id, _ in event.auth_events], + [auth_id for auth_id in event.auth_event_ids()], 
include_given=True ) defer.returnValue([e for e in auth]) @@ -1698,7 +1698,7 @@ class FederationHandler(BaseHandler): missing_auth_events = set() for e in itertools.chain(auth_events, state, [event]): - for e_id, _ in e.auth_events: + for e_id in e.auth_event_ids(): if e_id not in event_map: missing_auth_events.add(e_id) @@ -1717,7 +1717,7 @@ class FederationHandler(BaseHandler): for e in itertools.chain(auth_events, state, [event]): auth_for_e = { (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id] - for e_id, _ in e.auth_events + for e_id in e.auth_event_ids() if e_id in event_map } if create_event: @@ -1785,10 +1785,10 @@ class FederationHandler(BaseHandler): # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. - if event.type == EventTypes.Member and not event.auth_events: - if len(event.prev_events) == 1 and event.depth < 5: + if event.type == EventTypes.Member and not event.auth_event_ids(): + if len(event.prev_event_ids()) == 1 and event.depth < 5: c = yield self.store.get_event( - event.prev_events[0][0], + event.prev_event_ids()[0], allow_none=True, ) if c and c.type == EventTypes.Create: @@ -1835,7 +1835,7 @@ class FederationHandler(BaseHandler): # Now get the current auth_chain for the event. local_auth_chain = yield self.store.get_auth_chain( - [auth_id for auth_id, _ in event.auth_events], + [auth_id for auth_id in event.auth_event_ids()], include_given=True ) @@ -1891,7 +1891,7 @@ class FederationHandler(BaseHandler): """ # Check if we have all the auth events. current_state = set(e.event_id for e in auth_events.values()) - event_auth_events = set(e_id for e_id, _ in event.auth_events) + event_auth_events = set(event.auth_event_ids()) if event.is_state(): event_key = (event.type, event.state_key) @@ -1935,7 +1935,7 @@ class FederationHandler(BaseHandler): continue try: - auth_ids = [e_id for e_id, _ in e.auth_events] + auth_ids = e.auth_event_ids() auth = { (e.type, e.state_key): e for e in remote_auth_chain if e.event_id in auth_ids or e.type == EventTypes.Create @@ -1956,7 +1956,7 @@ class FederationHandler(BaseHandler): pass have_events = yield self.store.get_seen_events_with_rejections( - [e_id for e_id, _ in event.auth_events] + event.auth_event_ids() ) seen_events = set(have_events.keys()) except Exception: @@ -2058,7 +2058,7 @@ class FederationHandler(BaseHandler): continue try: - auth_ids = [e_id for e_id, _ in ev.auth_events] + auth_ids = ev.auth_event_ids() auth = { (e.type, e.state_key): e for e in result["auth_chain"] @@ -2250,7 +2250,7 @@ class FederationHandler(BaseHandler): missing_remote_ids = [e.event_id for e in missing_remotes] base_remote_rejected = list(missing_remotes) for e in missing_remotes: - for e_id, _ in e.auth_events: + for e_id in e.auth_event_ids(): if e_id in missing_remote_ids: try: base_remote_rejected.remove(e) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 943d5d6bb5..70048b0c09 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -261,7 +261,7 @@ class StateHandler(object): logger.debug("calling resolve_state_groups from compute_event_context") entry = yield self.resolve_state_groups_for_events( - event.room_id, [e for e, _ in event.prev_events], + event.room_id, event.prev_event_ids(), ) prev_state_ids = entry.state diff --git a/synapse/state/v2.py b/synapse/state/v2.py index dbc9688c56..3573bb0028 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -159,7 +159,7 @@ def 
_get_power_level_for_sender(event_id, event_map, state_res_store): event = yield _get_event(event_id, event_map, state_res_store) pl = None - for aid, _ in event.auth_events: + for aid in event.auth_event_ids(): aev = yield _get_event(aid, event_map, state_res_store) if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""): pl = aev @@ -167,7 +167,7 @@ def _get_power_level_for_sender(event_id, event_map, state_res_store): if pl is None: # Couldn't find power level. Check if they're the creator of the room - for aid, _ in event.auth_events: + for aid in event.auth_event_ids(): aev = yield _get_event(aid, event_map, state_res_store) if (aev.type, aev.state_key) == (EventTypes.Create, ""): if aev.content.get("creator") == event.sender: @@ -299,7 +299,7 @@ def _add_event_and_auth_chain_to_graph(graph, event_id, event_map, graph.setdefault(eid, set()) event = yield _get_event(eid, event_map, state_res_store) - for aid, _ in event.auth_events: + for aid in event.auth_event_ids(): if aid in auth_diff: if aid not in graph: state.append(aid) @@ -369,7 +369,7 @@ def _iterative_auth_checks(event_ids, base_state, event_map, state_res_store): event = event_map[event_id] auth_events = {} - for aid, _ in event.auth_events: + for aid in event.auth_event_ids(): ev = yield _get_event(aid, event_map, state_res_store) if ev.rejected_reason is None: @@ -417,9 +417,9 @@ def _mainline_sort(event_ids, resolved_power_event_id, event_map, while pl: mainline.append(pl) pl_ev = yield _get_event(pl, event_map, state_res_store) - auth_events = pl_ev.auth_events + auth_events = pl_ev.auth_event_ids() pl = None - for aid, _ in auth_events: + for aid in auth_events: ev = yield _get_event(aid, event_map, state_res_store) if (ev.type, ev.state_key) == (EventTypes.PowerLevels, ""): pl = aid @@ -464,10 +464,10 @@ def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_stor if depth is not None: defer.returnValue(depth) - auth_events = event.auth_events + auth_events = event.auth_event_ids() event = None - for aid, _ in auth_events: + for aid in auth_events: aev = yield _get_event(aid, event_map, state_res_store) if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""): event = aev diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 3faca2a042..d3b9dea1d6 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -477,7 +477,7 @@ class EventFederationStore(EventFederationWorkerStore): "is_state": False, } for ev in events - for e_id, _ in ev.prev_events + for e_id in ev.prev_event_ids() ], ) @@ -510,7 +510,7 @@ class EventFederationStore(EventFederationWorkerStore): txn.executemany(query, [ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) - for ev in events for e_id, _ in ev.prev_events + for ev in events for e_id in ev.prev_event_ids() if not ev.internal_metadata.is_outlier() ]) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 919e855f3b..2047110b1d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -416,7 +416,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore ) if len_1: all_single_prev_not_state = all( - len(event.prev_events) == 1 + len(event.prev_event_ids()) == 1 and not event.is_state() for event, ctx in ev_ctx_rm ) @@ -440,7 +440,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # guess this by looking at the prev_events and checking # if they match the current forward extremities. 
                 for ev, _ in ev_ctx_rm:
-                    prev_event_ids = set(e for e, _ in ev.prev_events)
+                    prev_event_ids = set(ev.prev_event_ids())
                     if latest_event_ids == prev_event_ids:
                         state_delta_reuse_delta_counter.inc()
                         break
@@ -551,7 +551,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
         result.difference_update(
             e_id
             for event in new_events
-            for e_id, _ in event.prev_events
+            for e_id in event.prev_event_ids()
         )
 
         # Finally, remove any events which are prev_events of any existing events.
@@ -869,7 +869,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
                     "auth_id": auth_id,
                 }
                 for event, _ in events_and_contexts
-                for auth_id, _ in event.auth_events
+                for auth_id in event.auth_event_ids()
                 if event.is_state()
             ],
         )
diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py
index d67f59b2c7..2e073a3afc 100644
--- a/tests/state/test_v2.py
+++ b/tests/state/test_v2.py
@@ -753,7 +753,7 @@ class TestStateResolutionStore(object):
             result.add(event_id)
 
             event = self.event_map[event_id]
-            for aid, _ in event.auth_events:
+            for aid in event.auth_event_ids():
                 stack.append(aid)
 
         return list(result)
--
cgit 1.4.1


From b1a22b24ab7532da993e673f353dd87eeef49151 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 8 Nov 2018 12:14:20 +0000
Subject: Fix noop checks when updating device keys

Clients often reupload their device keys (for some reason), so it's
important for the server to check for no-ops before sending out device
list update notifications.

The check is broken in python 3 due to the fact that comparing bytes and
unicode always fails, and that we write bytes to the DB but get unicode
when we read.
---
 synapse/storage/end_to_end_keys.py | 5 +++++
 1 file changed, 5 insertions(+)

(limited to 'synapse/storage')

diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 1f1721e820..29281630c0 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -40,6 +40,11 @@ class EndToEndKeyStore(SQLBaseStore):
             allow_none=True,
         )
 
+        if old_key_json and not isinstance(old_key_json, bytes):
+            # In py3 we need old_key_json to match new_key_json type. The DB
+            # returns unicode while encode_canonical_json returns bytes
+            old_key_json = old_key_json.encode("utf-8")
+
         new_key_json = encode_canonical_json(device_keys)
         if old_key_json == new_key_json:
             return False
--
cgit 1.4.1


From 5ebed186926eee77844730f5270a926417a0be09 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 8 Nov 2018 12:33:13 +0000
Subject: Let's convert bytes to unicode instead

---
 synapse/storage/end_to_end_keys.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 29281630c0..2a0f6cfca9 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -40,12 +40,10 @@ class EndToEndKeyStore(SQLBaseStore):
             allow_none=True,
         )
 
-        if old_key_json and not isinstance(old_key_json, bytes):
-            # In py3 we need old_key_json to match new_key_json type. The DB
-            # returns unicode while encode_canonical_json returns bytes
-            old_key_json = old_key_json.encode("utf-8")
+        # In py3 we need old_key_json to match new_key_json type. The DB
+        # returns unicode while encode_canonical_json returns bytes.
+        new_key_json = encode_canonical_json(device_keys).decode("utf-8")
 
-        new_key_json = encode_canonical_json(device_keys)
         if old_key_json == new_key_json:
             return False
--
cgit 1.4.1
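

Footnote: the graph walk used by `_find_unreferenced_groups_during_purge`
above can be sketched without a database. The following standalone Python
sketch uses hypothetical names: plain sets and dicts stand in for the
`event_to_state_groups` and `state_group_edges` tables, and the SQL batching
is omitted, so it illustrates only the partitioning logic, not Synapse's
actual storage API.

    # Sketch of the purge-time traversal (hypothetical, simplified names).
    # `referenced_by_events` models event_to_state_groups restricted to
    # events that survive the purge; `children` maps a state group to the
    # groups that use it as their prev_state_group.
    def find_unreferenced_groups(state_groups, referenced_by_events, children):
        graph = {}          # discovered edges: group -> its prev group
        referenced = set()  # groups kept alive by surviving events
        seen = set(state_groups)
        to_search = set(state_groups)

        while to_search:
            group = to_search.pop()

            if group in referenced_by_events:
                # A surviving event points at this group: keep it, and don't
                # descend further, since its descendants keep their delta base.
                referenced.add(group)
                continue

            # Walk down to the groups that delta off `group`.
            for child in children.get(group, ()):
                graph[child] = group
                if child not in seen:
                    seen.add(child)
                    to_search.add(child)

        to_delete = seen - referenced

        # A referenced group whose prev group is being deleted loses its
        # delta base, so it must first be de-delta'ed to a full snapshot.
        to_dedelta = set(sg for sg in referenced if graph.get(sg) in to_delete)

        return to_delete, to_dedelta

    # Example: group 1 is referenced only by purged events; group 2 deltas
    # off group 1 but is still referenced, so 1 is deleted, 2 de-delta'ed.
    to_delete, to_dedelta = find_unreferenced_groups(
        state_groups={1},
        referenced_by_events={2},
        children={1: {2}},
    )
    assert to_delete == {1} and to_dedelta == {2}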