From dae6dc1e776cc6198dc1f71575876fc16693c170 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Jul 2018 17:13:34 +0100 Subject: Remove redundant checks on room forgottenness Fixes #3550 --- synapse/storage/push_rule.py | 13 ------------- synapse/storage/roommember.py | 15 --------------- 2 files changed, 28 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index be655d287b..d25b39ec02 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -21,7 +21,6 @@ from canonicaljson import json from twisted.internet import defer -from synapse.api.constants import EventTypes from synapse.push.baserules import list_with_base_rules from synapse.storage.appservice import ApplicationServiceWorkerStore from synapse.storage.pusher import PusherWorkerStore @@ -247,18 +246,6 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore, if uid in local_users_in_room: user_ids.add(uid) - forgotten = yield self.who_forgot_in_room( - event.room_id, on_invalidate=cache_context.invalidate, - ) - - for row in forgotten: - user_id = row["user_id"] - event_id = row["event_id"] - - mem_id = current_state_ids.get((EventTypes.Member, user_id), None) - if event_id == mem_id: - user_ids.discard(user_id) - rules_by_user = yield self.bulk_get_push_rules( user_ids, on_invalidate=cache_context.invalidate, ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 02a802bed9..9eb5b18144 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -458,18 +458,6 @@ class RoomMemberWorkerStore(EventsWorkerStore): def _get_joined_hosts_cache(self, room_id): return _JoinedHostsCache(self, room_id) - @cached() - def who_forgot_in_room(self, room_id): - return self._simple_select_list( - table="room_memberships", - retcols=("user_id", "event_id"), - keyvalues={ - "room_id": room_id, - "forgotten": 1, - }, - desc="who_forgot" - ) - class RoomMemberStore(RoomMemberWorkerStore): def __init__(self, db_conn, hs): @@ -578,9 +566,6 @@ class RoomMemberStore(RoomMemberWorkerStore): txn.execute(sql, (user_id, room_id)) txn.call_after(self.did_forget.invalidate, (user_id, room_id)) - self._invalidate_cache_and_stream( - txn, self.who_forgot_in_room, (room_id,) - ) return self.runInteraction("forget_membership", f) @cachedInlineCallbacks(num_args=2) -- cgit 1.4.1 From 223341205edf725c5583a73d5680245a24790452 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 11:59:16 +0100 Subject: Don't require to_delete to have event_ids --- synapse/storage/events.py | 91 ++++++++++++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 36 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 19c05dc8d6..87d0b78a2d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ import logging from collections import OrderedDict, deque, namedtuple from functools import wraps -from six import iteritems, itervalues +from six import iteritems from six.moves import range from canonicaljson import json @@ -347,8 +347,9 @@ class EventsStore(EventsWorkerStore): # state in each room after adding these events current_state_for_room = {} - # map room_id->(to_delete, to_insert) where each entry is - # a map (type,key)->event_id giving the state delta in each + # map room_id->(to_delete, to_insert) where to_delete is a list + # of type/state keys to remove from current state, and to_insert + # is a map (type,key)->event_id giving the state delta in each # room state_delta_for_room = {} @@ -647,17 +648,16 @@ class EventsStore(EventsWorkerStore): Assumes that we are only persisting events for one room at a time. Returns: - 2-tuple (to_delete, to_insert) where both are state dicts, - i.e. (type, state_key) -> event_id. `to_delete` are the entries to - first be deleted from current_state_events, `to_insert` are entries - to insert. + tuple[list, dict] (to_delete, to_insert): where to_delete are the + type/state_keys to remove from current_state_events and `to_insert` + are the updates to current_state_events. """ existing_state = yield self.get_current_state_ids(room_id) - to_delete = { - key: ev_id for key, ev_id in iteritems(existing_state) - if ev_id != current_state.get(key) - } + to_delete = [ + key for key, ev_id in iteritems(existing_state) + if key not in current_state + ] to_insert = { key: ev_id for key, ev_id in iteritems(current_state) @@ -684,10 +684,10 @@ class EventsStore(EventsWorkerStore): delete_existing (bool): True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. - state_delta_for_room (dict[str, (list[str], list[str])]): + state_delta_for_room (dict[str, (list, dict)]): The current-state delta for each room. For each room, a tuple - (to_delete, to_insert), being a list of event ids to be removed - from the current state, and a list of event ids to be added to + (to_delete, to_insert), being a list of type/state keys to be + removed from the current state, and a state set to be added to the current state. new_forward_extremeties (dict[str, list[str]]): The new forward extremities for each room. For each room, a @@ -765,9 +765,46 @@ class EventsStore(EventsWorkerStore): def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple + + # First we add entries to the current_state_delta_stream. We + # do this before updating the current_state_events table so + # that we can use it to calculate the `prev_event_id`. (This + # allows us to not have to pull out the existing state + # unnecessarily). + sql = """ + INSERT INTO current_state_delta_stream + (stream_id, room_id, type, state_key, event_id, prev_event_id) + SELECT ?, ?, ?, ?, ?, ( + SELECT event_id FROM current_state_events + WHERE room_id = ? AND type = ? AND state_key = ? + ) + """ + txn.executemany(sql, ( + ( + max_stream_order, room_id, etype, state_key, None, + room_id, etype, state_key, + ) + for etype, state_key in to_delete + # We sanity check that we're deleting rather than updating + if (etype, state_key) not in to_insert + )) + txn.executemany(sql, ( + ( + max_stream_order, room_id, etype, state_key, ev_id, + room_id, etype, state_key, + ) + for (etype, state_key), ev_id in iteritems(to_insert) + )) + + # Now we actually update the current_state_events table + txn.executemany( - "DELETE FROM current_state_events WHERE event_id = ?", - [(ev_id,) for ev_id in itervalues(to_delete)], + "DELETE FROM current_state_events" + " WHERE room_id = ? AND type = ? AND state_key = ?", + ( + (room_id, etype, state_key) + for etype, state_key in itertools.chain(to_delete, to_insert) + ), ) self._simple_insert_many_txn( @@ -784,25 +821,6 @@ class EventsStore(EventsWorkerStore): ], ) - state_deltas = {key: None for key in to_delete} - state_deltas.update(to_insert) - - self._simple_insert_many_txn( - txn, - table="current_state_delta_stream", - values=[ - { - "stream_id": max_stream_order, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": ev_id, - "prev_event_id": to_delete.get(key, None), - } - for key, ev_id in iteritems(state_deltas) - ] - ) - txn.call_after( self._curr_state_delta_stream_cache.entity_has_changed, room_id, max_stream_order, @@ -816,7 +834,8 @@ class EventsStore(EventsWorkerStore): # and which we have added, then we invlidate the caches for all # those users. members_changed = set( - state_key for ev_type, state_key in state_deltas + state_key + for ev_type, state_key in itertools.chain(to_delete, to_insert) if ev_type == EventTypes.Member ) -- cgit 1.4.1 From a79410e7b8dce3fb8b4b40901a66fd9fde9c47a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 11:13:47 +0100 Subject: Have _get_new_state_after_events return delta If we have a delta from the existing to new current state, then we can reuse that rather than manually working it out by fetching both lots of state. --- synapse/storage/events.py | 65 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 17 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 87d0b78a2d..6d2aae9c4a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -344,7 +344,9 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties = {} # map room_id->(type,state_key)->event_id tracking the full - # state in each room after adding these events + # state in each room after adding these events. + # This is simply used to prefill the get_current_state_ids + # cache current_state_for_room = {} # map room_id->(to_delete, to_insert) where to_delete is a list @@ -419,28 +421,40 @@ class EventsStore(EventsWorkerStore): logger.info( "Calculating state delta for room %s", room_id, ) - with Measure( - self._clock, - "persist_events.get_new_state_after_events", + self._clock, + "persist_events.get_new_state_after_events", ): - current_state = yield self._get_new_state_after_events( + res = yield self._get_new_state_after_events( room_id, ev_ctx_rm, latest_event_ids, new_latest_event_ids, ) - - if current_state is not None: - current_state_for_room[room_id] = current_state + current_state, delta_ids = res + + # If either are not None then there has been a change, + # and we need to work out the delta (or use that + # given) + if delta_ids is not None: + # If there is a delta we know that we've + # only added or replaced state, never + # removed keys entirely. + state_delta_for_room[room_id] = ([], delta_ids) + elif current_state is not None: with Measure( - self._clock, - "persist_events.calculate_state_delta", + self._clock, + "persist_events.calculate_state_delta", ): delta = yield self._calculate_state_delta( room_id, current_state, ) - state_delta_for_room[room_id] = delta + state_delta_for_room[room_id] = delta + + # If we have the current_state then lets prefill + # the cache with it. + if current_state is not None: + current_state_for_room[room_id] = current_state yield self.runInteraction( "persist_events", @@ -539,9 +553,10 @@ class EventsStore(EventsWorkerStore): the new forward extremities for the room. Returns: - Deferred[dict[(str,str), str]|None]: - None if there are no changes to the room state, or - a dict of (type, state_key) -> event_id]. + Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]: + Returns a tuple of two state maps, the first being the full new current + state and the second being the delta to the existing current state. + If both are None then there has been no change. """ if not new_latest_event_ids: @@ -549,6 +564,9 @@ class EventsStore(EventsWorkerStore): # map from state_group to ((type, key) -> event_id) state map state_groups_map = {} + + state_group_deltas = {} + for ev, ctx in events_context: if ctx.state_group is None: # I don't think this can happen, but let's double-check @@ -567,6 +585,9 @@ class EventsStore(EventsWorkerStore): if current_state_ids is not None: state_groups_map[ctx.state_group] = current_state_ids + if ctx.prev_group: + state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids + # We need to map the event_ids to their state groups. First, let's # check if the event is one we're persisting, in which case we can # pull the state group from its context. @@ -608,7 +629,7 @@ class EventsStore(EventsWorkerStore): # If they old and new groups are the same then we don't need to do # anything. if old_state_groups == new_state_groups: - return + defer.returnValue((None, None)) # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. @@ -620,7 +641,17 @@ class EventsStore(EventsWorkerStore): if len(new_state_groups) == 1: # If there is only one state group, then we know what the current # state is. - defer.returnValue(state_groups_map[new_state_groups.pop()]) + new_state_group = new_state_groups.pop() + + delta_ids = None + if len(old_state_groups) == 1: + old_state_group = old_state_groups.pop() + + delta_ids = state_group_deltas.get( + (old_state_group, new_state_group,), None + ) + + defer.returnValue((state_groups_map[new_state_group], delta_ids)) # Ok, we need to defer to the state handler to resolve our state sets. @@ -639,7 +670,7 @@ class EventsStore(EventsWorkerStore): room_id, state_groups, events_map, get_events ) - defer.returnValue(res.state) + defer.returnValue((res.state, None)) @defer.inlineCallbacks def _calculate_state_delta(self, room_id, current_state): -- cgit 1.4.1 From 811ac73a42db16db7211a7517ad5651a1f6aee71 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 13:40:42 +0100 Subject: Don't fetch state from the database unless needed --- synapse/storage/events.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6d2aae9c4a..eb8bbe13ab 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -631,27 +631,33 @@ class EventsStore(EventsWorkerStore): if old_state_groups == new_state_groups: defer.returnValue((None, None)) - # Now that we have calculated new_state_groups we need to get - # their state IDs so we can resolve to a single state set. - missing_state = new_state_groups - set(state_groups_map) - if missing_state: - group_to_state = yield self._get_state_for_groups(missing_state) - state_groups_map.update(group_to_state) - if len(new_state_groups) == 1: # If there is only one state group, then we know what the current # state is. - new_state_group = new_state_groups.pop() - - delta_ids = None if len(old_state_groups) == 1: - old_state_group = old_state_groups.pop() + new_state_group = next(iter(new_state_groups)) + old_state_group = next(iter(old_state_groups)) delta_ids = state_group_deltas.get( (old_state_group, new_state_group,), None ) + if delta_ids is not None: + # We have a delta from the existing to new current state, + # so lets just return that. If we happen to already have + # the current state in memory then lets also return that, + # but it doesn't matter if we don't. + new_state = state_groups_map.get(new_state_group) + defer.returnValue((new_state, delta_ids)) - defer.returnValue((state_groups_map[new_state_group], delta_ids)) + # Now that we have calculated new_state_groups we need to get + # their state IDs so we can resolve to a single state set. + missing_state = new_state_groups - set(state_groups_map) + if missing_state: + group_to_state = yield self._get_state_for_groups(missing_state) + state_groups_map.update(group_to_state) + + if len(new_state_groups) == 1: + defer.returnValue((state_groups_map[new_state_groups.pop()], None)) # Ok, we need to defer to the state handler to resolve our state sets. -- cgit 1.4.1 From ed0dd68731fa1549b4ea9b2dab08c2afb6ccb64e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 14:31:38 +0100 Subject: Fixup comment (and indent) --- synapse/storage/events.py | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index eb8bbe13ab..76cfbc90fe 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -565,6 +565,7 @@ class EventsStore(EventsWorkerStore): # map from state_group to ((type, key) -> event_id) state map state_groups_map = {} + # Map from (prev state group, new state group) -> delta state dict state_group_deltas = {} for ev, ctx in events_context: @@ -631,23 +632,24 @@ class EventsStore(EventsWorkerStore): if old_state_groups == new_state_groups: defer.returnValue((None, None)) - if len(new_state_groups) == 1: - # If there is only one state group, then we know what the current - # state is. - if len(old_state_groups) == 1: - new_state_group = next(iter(new_state_groups)) - old_state_group = next(iter(old_state_groups)) + if len(new_state_groups) == 1 and len(old_state_groups) == 1: + # If we're going from one state group to another, lets check if + # we have a delta for that transition. If we do then we can just + # return that. - delta_ids = state_group_deltas.get( - (old_state_group, new_state_group,), None - ) - if delta_ids is not None: - # We have a delta from the existing to new current state, - # so lets just return that. If we happen to already have - # the current state in memory then lets also return that, - # but it doesn't matter if we don't. - new_state = state_groups_map.get(new_state_group) - defer.returnValue((new_state, delta_ids)) + new_state_group = next(iter(new_state_groups)) + old_state_group = next(iter(old_state_groups)) + + delta_ids = state_group_deltas.get( + (old_state_group, new_state_group,), None + ) + if delta_ids is not None: + # We have a delta from the existing to new current state, + # so lets just return that. If we happen to already have + # the current state in memory then lets also return that, + # but it doesn't matter if we don't. + new_state = state_groups_map.get(new_state_group) + defer.returnValue((new_state, delta_ids)) # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. @@ -657,6 +659,8 @@ class EventsStore(EventsWorkerStore): state_groups_map.update(group_to_state) if len(new_state_groups) == 1: + # If there is only one state group, then we know what the current + # state is. defer.returnValue((state_groups_map[new_state_groups.pop()], None)) # Ok, we need to defer to the state handler to resolve our state sets. -- cgit 1.4.1 From 8f65ab98d2ab35bbb29c23cc7978d54b12fd3db0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 15:08:01 +0100 Subject: Remove unnecessary iteritems --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 76cfbc90fe..9c94d3454f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -696,7 +696,7 @@ class EventsStore(EventsWorkerStore): existing_state = yield self.get_current_state_ids(room_id) to_delete = [ - key for key, ev_id in iteritems(existing_state) + key for key in existing_state if key not in current_state ] -- cgit 1.4.1 From 709c309b0e1756afd8b4514b95b0e901c567a0f1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 15:12:50 +0100 Subject: Expand on docstring comment about return value --- synapse/storage/events.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9c94d3454f..906a405031 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -557,6 +557,11 @@ class EventsStore(EventsWorkerStore): Returns a tuple of two state maps, the first being the full new current state and the second being the delta to the existing current state. If both are None then there has been no change. + + If there has been a change then we only return the delta if its + already been calculated. Conversely if we do know the delta then + the new current state is only returned if we've already calculated + it. """ if not new_latest_event_ids: -- cgit 1.4.1 From 3188973857d5bab5c1faf968917ad0ced42b5a0d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 16:20:14 +0100 Subject: Pull out did_forget to worker store --- synapse/app/synchrotron.py | 5 +---- synapse/storage/roommember.py | 52 ++++++++++++++++++++++--------------------- 2 files changed, 28 insertions(+), 29 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 26b9ec85f2..e201f18efd 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -55,7 +55,6 @@ from synapse.rest.client.v2_alpha import sync from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState -from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole @@ -81,9 +80,7 @@ class SynchrotronSlavedStore( RoomStore, BaseSlavedStore, ): - did_forget = ( - RoomMemberStore.__dict__["did_forget"] - ) + pass UPDATE_SYNCING_USERS_MS = 10 * 1000 diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 01697ab2c9..027bf8c85e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -461,6 +461,30 @@ class RoomMemberWorkerStore(EventsWorkerStore): def _get_joined_hosts_cache(self, room_id): return _JoinedHostsCache(self, room_id) + @cachedInlineCallbacks(num_args=2) + def did_forget(self, user_id, room_id): + """Returns whether user_id has elected to discard history for room_id. + + Returns False if they have since re-joined.""" + def f(txn): + sql = ( + "SELECT" + " COUNT(*)" + " FROM" + " room_memberships" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + " AND" + " forgotten = 0" + ) + txn.execute(sql, (user_id, room_id)) + rows = txn.fetchall() + return rows[0][0] + count = yield self.runInteraction("did_forget_membership", f) + defer.returnValue(count == 0) + class RoomMemberStore(RoomMemberWorkerStore): def __init__(self, db_conn, hs): @@ -568,32 +592,10 @@ class RoomMemberStore(RoomMemberWorkerStore): ) txn.execute(sql, (user_id, room_id)) - txn.call_after(self.did_forget.invalidate, (user_id, room_id)) - return self.runInteraction("forget_membership", f) - - @cachedInlineCallbacks(num_args=2) - def did_forget(self, user_id, room_id): - """Returns whether user_id has elected to discard history for room_id. - - Returns False if they have since re-joined.""" - def f(txn): - sql = ( - "SELECT" - " COUNT(*)" - " FROM" - " room_memberships" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - " AND" - " forgotten = 0" + self._invalidate_cache_and_stream( + txn, self.did_forget, (user_id, room_id,), ) - txn.execute(sql, (user_id, room_id)) - rows = txn.fetchall() - return rows[0][0] - count = yield self.runInteraction("did_forget_membership", f) - defer.returnValue(count == 0) + return self.runInteraction("forget_membership", f) @defer.inlineCallbacks def _background_add_membership_profile(self, progress, batch_size): -- cgit 1.4.1 From ec56121b0d099824a1455260e05904f081b6d14a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 Jul 2018 09:35:02 +0100 Subject: Correctly handle outliers during persist events We incorrectly asserted that all contexts must have a non None state group without consider outliers. This would usually be fine as the assertion would never be hit, as there is a shortcut during persistence if the forward extremities don't change. However, if the outlier is being persisted with non-outlier events, the function would be called and the assertion would be hit. Fixes #3601 --- synapse/storage/events.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 906a405031..bd05f23b06 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -575,11 +575,12 @@ class EventsStore(EventsWorkerStore): for ev, ctx in events_context: if ctx.state_group is None: - # I don't think this can happen, but let's double-check - raise Exception( - "Context for new extremity event %s has no state " - "group" % (ev.event_id, ), - ) + # This should only happen for outlier events. + if not event.internal_metadata.is_outlier(): + raise Exception( + "Context for new event %s has no state " + "group" % (ev.event_id, ), + ) if ctx.state_group in state_groups_map: continue @@ -607,7 +608,7 @@ class EventsStore(EventsWorkerStore): for event_id in new_latest_event_ids: # First search in the list of new events we're adding. for ev, ctx in events_context: - if event_id == ev.event_id: + if event_id == ev.event_id and ctx.state_group is not None: event_id_to_state_group[event_id] = ctx.state_group break else: -- cgit 1.4.1 From 371da42ae4ec093f3887e9485d149cdfbedd4e58 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 25 Jul 2018 09:41:12 +0100 Subject: Wrap a number of things that run in the background This will reduce the number of "Starting db connection from sentinel context" warnings, and will help with our metrics. --- synapse/app/homeserver.py | 13 ++++++++++--- synapse/groups/attestations.py | 6 +++++- synapse/replication/tcp/resource.py | 14 ++++++++------ synapse/rest/media/v1/media_repository.py | 8 +++++++- synapse/rest/media/v1/preview_url_resource.py | 8 +++++++- synapse/storage/devices.py | 8 ++++++-- synapse/storage/event_federation.py | 9 ++++++--- synapse/storage/event_push_actions.py | 13 +++++++++---- synapse/storage/transactions.py | 6 +++++- 9 files changed, 63 insertions(+), 22 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 2ad1beb8d8..b7e7718290 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -49,6 +49,7 @@ from synapse.http.additional_resource import AdditionalResource from synapse.http.server import RootRedirect from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.module_api import ModuleApi from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements @@ -427,6 +428,9 @@ def run(hs): # currently either 0 or 1 stats_process = [] + def start_phone_stats_home(): + run_as_background_process("phone_stats_home", phone_stats_home) + @defer.inlineCallbacks def phone_stats_home(): logger.info("Gathering stats for reporting") @@ -498,7 +502,10 @@ def run(hs): ) def generate_user_daily_visit_stats(): - hs.get_datastore().generate_user_daily_visits() + run_as_background_process( + "generate_user_daily_visits", + hs.get_datastore().generate_user_daily_visits, + ) # Rather than update on per session basis, batch up the requests. # If you increase the loop period, the accuracy of user_daily_visits @@ -507,7 +514,7 @@ def run(hs): if hs.config.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") - clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000) + clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) # We need to defer this init for the cases that we daemonize # otherwise the process ID we get is that of the non-daemon process @@ -515,7 +522,7 @@ def run(hs): # We wait 5 minutes to send the first set of stats as the server can # be quite busy the first few minutes - clock.call_later(5 * 60, phone_stats_home) + clock.call_later(5 * 60, start_phone_stats_home) if hs.config.daemonize and hs.config.print_pidfile: print (hs.config.pid_file) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 47452700a8..4216af0a27 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -43,6 +43,7 @@ from signedjson.sign import sign_json from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import get_domain_from_id from synapse.util.logcontext import run_in_background @@ -129,7 +130,7 @@ class GroupAttestionRenewer(object): self.attestations = hs.get_groups_attestation_signing() self._renew_attestations_loop = self.clock.looping_call( - self._renew_attestations, 30 * 60 * 1000, + self._start_renew_attestations, 30 * 60 * 1000, ) @defer.inlineCallbacks @@ -151,6 +152,9 @@ class GroupAttestionRenewer(object): defer.returnValue({}) + def _start_renew_attestations(self): + run_as_background_process("renew_attestations", self._renew_attestations) + @defer.inlineCallbacks def _renew_attestations(self): """Called periodically to check if we need to update any of our attestations diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 611fb66e1d..fd59f1595f 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -25,6 +25,7 @@ from twisted.internet import defer from twisted.internet.protocol import Factory from synapse.metrics import LaterGauge +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.metrics import Measure, measure_func from .protocol import ServerReplicationStreamProtocol @@ -117,7 +118,6 @@ class ReplicationStreamer(object): for conn in self.connections: conn.send_error("server shutting down") - @defer.inlineCallbacks def on_notifier_poke(self): """Checks if there is actually any new data and sends it to the connections if there are. @@ -132,14 +132,16 @@ class ReplicationStreamer(object): stream.discard_updates_and_advance() return - # If we're in the process of checking for new updates, mark that fact - # and return + self.pending_updates = True + if self.is_looping: - logger.debug("Noitifier poke loop already running") - self.pending_updates = True + logger.debug("Notifier poke loop already running") return - self.pending_updates = True + run_as_background_process("replication_notifier", self._run_notifier_loop) + + @defer.inlineCallbacks + def _run_notifier_loop(self): self.is_looping = True try: diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 30242c525a..5b13378caa 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -35,6 +35,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.http.matrixfederationclient import MatrixFederationHttpClient +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.async import Linearizer from synapse.util.logcontext import make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination @@ -100,10 +101,15 @@ class MediaRepository(object): ) self.clock.looping_call( - self._update_recently_accessed, + self._start_update_recently_accessed, UPDATE_RECENTLY_ACCESSED_TS, ) + def _start_update_recently_accessed(self): + run_as_background_process( + "update_recently_accessed_media", self._update_recently_accessed, + ) + @defer.inlineCallbacks def _update_recently_accessed(self): remote_media = self.recently_accessed_remotes diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index b70b15c4c2..4efd5339a4 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -41,6 +41,7 @@ from synapse.http.server import ( wrap_json_request_handler, ) from synapse.http.servlet import parse_integer, parse_string +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.async import ObservableDeferred from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background @@ -81,7 +82,7 @@ class PreviewUrlResource(Resource): self._cache.start() self._cleaner_loop = self.clock.looping_call( - self._expire_url_cache_data, 10 * 1000 + self._start_expire_url_cache_data, 10 * 1000, ) def render_OPTIONS(self, request): @@ -371,6 +372,11 @@ class PreviewUrlResource(Resource): "etag": headers["ETag"][0] if "ETag" in headers else None, }) + def _start_expire_url_cache_data(self): + run_as_background_process( + "expire_url_cache_data", self._expire_url_cache_data, + ) + @defer.inlineCallbacks def _expire_url_cache_data(self): """Clean up expired url cache content, media and thumbnails. diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index cc3cdf2ebc..52dccb1507 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -21,6 +21,7 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from ._base import Cache, SQLBaseStore @@ -711,6 +712,9 @@ class DeviceStore(SQLBaseStore): logger.info("Pruned %d device list outbound pokes", txn.rowcount) - return self.runInteraction( - "_prune_old_outbound_device_pokes", _prune_txn + run_as_background_process( + "prune_old_outbound_device_pokes", + self.runInteraction, + "_prune_old_outbound_device_pokes", + _prune_txn, ) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 8d366d1b91..65f2d19e20 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -23,6 +23,7 @@ from unpaddedbase64 import encode_base64 from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.events import EventsWorkerStore from synapse.storage.signatures import SignatureWorkerStore @@ -446,7 +447,7 @@ class EventFederationStore(EventFederationWorkerStore): ) hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + self._delete_old_forward_extrem_cache, 60 * 60 * 1000, ) def _update_min_depth_for_room_txn(self, txn, room_id, depth): @@ -548,9 +549,11 @@ class EventFederationStore(EventFederationWorkerStore): sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago,) ) - return self.runInteraction( + run_as_background_process( + "delete_old_forward_extrem_cache", + self.runInteraction, "_delete_old_forward_extrem_cache", - _delete_old_forward_extrem_cache_txn + _delete_old_forward_extrem_cache_txn, ) def clean_room_for_join(self, room_id): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 29b511ae5e..4f44b0ad47 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,6 +22,7 @@ from canonicaljson import json from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import LoggingTransaction, SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks @@ -458,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore): "Error removing push actions after event persistence failure", ) - @defer.inlineCallbacks def _find_stream_orderings_for_times(self): - yield self.runInteraction( + run_as_background_process( + "event_push_action_stream_orderings", + self.runInteraction, "_find_stream_orderings_for_times", - self._find_stream_orderings_for_times_txn + self._find_stream_orderings_for_times_txn, ) def _find_stream_orderings_for_times_txn(self, txn): @@ -604,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): self._doing_notif_rotation = False self._rotate_notif_loop = self._clock.looping_call( - self._rotate_notifs, 30 * 60 * 1000 + self._start_rotate_notifs, 30 * 60 * 1000, ) def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts, @@ -787,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore): WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? """, (room_id, user_id, stream_ordering)) + def _start_rotate_notifs(self): + run_as_background_process("rotate_notifs", self._rotate_notifs) + @defer.inlineCallbacks def _rotate_notifs(self): if self._doing_notif_rotation or self.stream_ordering_day_ago is None: diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index c3bc94f56d..b4b479d94c 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore @@ -57,7 +58,7 @@ class TransactionStore(SQLBaseStore): def __init__(self, db_conn, hs): super(TransactionStore, self).__init__(db_conn, hs) - self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) + self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have @@ -271,6 +272,9 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + def _start_cleanup_transactions(self): + run_as_background_process("cleanup_transactions", self._cleanup_transactions) + def _cleanup_transactions(self): now = self._clock.time_msec() month_ago = now - 30 * 24 * 60 * 60 * 1000 -- cgit 1.4.1 From a297ff2b16094f358cc8432f1db660ca539f8b4d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 Jul 2018 09:48:01 +0100 Subject: Fix typo in conditional --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index bd05f23b06..5024c4714d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -576,7 +576,7 @@ class EventsStore(EventsWorkerStore): for ev, ctx in events_context: if ctx.state_group is None: # This should only happen for outlier events. - if not event.internal_metadata.is_outlier(): + if not ev.internal_metadata.is_outlier(): raise Exception( "Context for new event %s has no state " "group" % (ev.event_id, ), -- cgit 1.4.1 From 07defd5fe6cc6f645195ddf0d679290bb214ca73 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 25 Jul 2018 10:47:57 +0100 Subject: Fix another logcontext leak in _persist_events We need to run the errback in the sentinel context to avoid losing our own context. Also: add logging to runInteraction to help identify where "Starting db connection from sentinel context" warnings are coming from --- changelog.d/3606.misc | 1 + synapse/storage/_base.py | 6 ++++++ synapse/storage/events.py | 9 ++++----- 3 files changed, 11 insertions(+), 5 deletions(-) create mode 100644 changelog.d/3606.misc (limited to 'synapse/storage') diff --git a/changelog.d/3606.misc b/changelog.d/3606.misc new file mode 100644 index 0000000000..f0137766a0 --- /dev/null +++ b/changelog.d/3606.misc @@ -0,0 +1 @@ +Fix some random logcontext leaks. \ No newline at end of file diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1d41d8d445..44f37b4c1e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -311,6 +311,12 @@ class SQLBaseStore(object): after_callbacks = [] exception_callbacks = [] + if LoggingContext.current_context() == LoggingContext.sentinel: + logger.warn( + "Starting db txn '%s' from sentinel context", + desc, + ) + try: result = yield self.runWithConnection( self._new_transaction, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 906a405031..ee9e4d4b65 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -142,15 +142,14 @@ class _EventPeristenceQueue(object): try: queue = self._get_drainining_queue(room_id) for item in queue: - # handle_queue_loop runs in the sentinel logcontext, so - # there is no need to preserve_fn when running the - # callbacks on the deferred. try: ret = yield per_item_callback(item) + except Exception: + with PreserveLoggingContext(): + item.deferred.errback() + else: with PreserveLoggingContext(): item.deferred.callback(ret) - except Exception: - item.deferred.errback() finally: queue = self._event_persist_queues.pop(room_id, None) if queue: -- cgit 1.4.1 From 1be94440d38ad6af64486ce31c650d0540d4049c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 25 Jul 2018 11:01:36 +0100 Subject: Fix occasional 'tuple index out of range' error This fixes a bug in _delete_existing_rows_txn which was introduced in #3435 (though it's been on matrix-org-hotfixes for *years*). This code is only called when there is some sort of conflict the first time we try to persist an event, so it only happens rarely. Still, the exceptions are annoying. --- changelog.d/3607.bugfix | 1 + synapse/storage/events.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/3607.bugfix (limited to 'synapse/storage') diff --git a/changelog.d/3607.bugfix b/changelog.d/3607.bugfix new file mode 100644 index 0000000000..7ad64593b8 --- /dev/null +++ b/changelog.d/3607.bugfix @@ -0,0 +1 @@ +Fix 'tuple index out of range' error \ No newline at end of file diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 906a405031..f7c4226ea2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1137,7 +1137,7 @@ class EventsStore(EventsWorkerStore): ): txn.executemany( "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,), - [(ev.event_id,) for ev, _ in events_and_contexts] + [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts] ) def _store_event_txn(self, txn, events_and_contexts): -- cgit 1.4.1 From 7780a7b47ca0fa039886dda8f6ca84bb197e9425 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 Jul 2018 11:13:20 +0100 Subject: Actually fix it by adding continue --- synapse/storage/events.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5024c4714d..d7fea0986b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -581,6 +581,7 @@ class EventsStore(EventsWorkerStore): "Context for new event %s has no state " "group" % (ev.event_id, ), ) + continue if ctx.state_group in state_groups_map: continue -- cgit 1.4.1