diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/events.py | 189 |
1 files changed, 100 insertions, 89 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index af56f1ee57..bbb6aa992c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -27,7 +27,6 @@ from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError -from synapse.state import resolve_events_with_factory from synapse.util.caches.descriptors import cached from synapse.types import get_domain_from_id @@ -237,6 +236,8 @@ class EventsStore(SQLBaseStore): self._event_persist_queue = _EventPeristenceQueue() + self._state_resolution_handler = hs.get_state_resolution_handler() + def persist_events(self, events_and_contexts, backfilled=False): """ Write events to the database @@ -402,6 +403,7 @@ class EventsStore(SQLBaseStore): "Calculating state delta for room %s", room_id, ) current_state = yield self._get_new_state_after_events( + room_id, ev_ctx_rm, new_latest_event_ids, ) if current_state is not None: @@ -487,11 +489,14 @@ class EventsStore(SQLBaseStore): defer.returnValue(new_latest_event_ids) @defer.inlineCallbacks - def _get_new_state_after_events(self, events_context, new_latest_event_ids): + def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids): """Calculate the current state dict after adding some new events to a room Args: + room_id (str): + room to which the events are being added. Used for logging etc + events_context (list[(EventBase, EventContext)]): events and contexts which are being added to the room @@ -503,8 +508,12 @@ class EventsStore(SQLBaseStore): None if there are no changes to the room state, or a dict of (type, state_key) -> event_id]. """ - state_sets = [] - state_groups = set() + + if not new_latest_event_ids: + defer.returnValue({}) + + # map from state_group to ((type, key) -> event_id) state map + state_groups = {} missing_event_ids = [] was_updated = False for event_id in new_latest_event_ids: @@ -515,16 +524,19 @@ class EventsStore(SQLBaseStore): if ctx.current_state_ids is None: raise Exception("Unknown current state") + if ctx.state_group is None: + # I don't think this can happen, but let's double-check + raise Exception( + "Context for new extremity event %s has no state " + "group" % (event_id, ), + ) + # If we've already seen the state group don't bother adding # it to the state sets again if ctx.state_group not in state_groups: - state_sets.append(ctx.current_state_ids) + state_groups[ctx.state_group] = ctx.current_state_ids if ctx.delta_ids or hasattr(ev, "state_key"): was_updated = True - if ctx.state_group: - # Add this as a seen state group (if it has a state - # group) - state_groups.add(ctx.state_group) break else: # If we couldn't find it, then we'll need to pull @@ -532,65 +544,37 @@ class EventsStore(SQLBaseStore): was_updated = True missing_event_ids.append(event_id) + if not was_updated: + return + if missing_event_ids: # Now pull out the state for any missing events from DB event_to_groups = yield self._get_state_group_for_events( missing_event_ids, ) - groups = set(event_to_groups.itervalues()) - state_groups + groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys()) if groups: group_to_state = yield self._get_state_for_groups(groups) - state_sets.extend(group_to_state.itervalues()) + state_groups.update(group_to_state) - if not new_latest_event_ids: - defer.returnValue({}) - elif was_updated: - if len(state_sets) == 1: - # If there is only one state set, then we know what the current - # state is. - defer.returnValue(state_sets[0]) - else: - # We work out the current state by passing the state sets to the - # state resolution algorithm. It may ask for some events, including - # the events we have yet to persist, so we need a slightly more - # complicated event lookup function than simply looking the events - # up in the db. - - logger.info( - "Resolving state with %i state sets", len(state_sets), - ) + if len(state_groups) == 1: + # If there is only one state group, then we know what the current + # state is. + defer.returnValue(state_groups.values()[0]) - events_map = {ev.event_id: ev for ev, _ in events_context} - - @defer.inlineCallbacks - def get_events(ev_ids): - # We get the events by first looking at the list of events we - # are trying to persist, and then fetching the rest from the DB. - db = [] - to_return = {} - for ev_id in ev_ids: - ev = events_map.get(ev_id, None) - if ev: - to_return[ev_id] = ev - else: - db.append(ev_id) - - if db: - evs = yield self.get_events( - ev_ids, get_prev_content=False, check_redacted=False, - ) - to_return.update(evs) - defer.returnValue(to_return) + def get_events(ev_ids): + return self.get_events( + ev_ids, get_prev_content=False, check_redacted=False, + ) + events_map = {ev.event_id: ev for ev, _ in events_context} + logger.debug("calling resolve_state_groups from preserve_events") + res = yield self._state_resolution_handler.resolve_state_groups( + room_id, state_groups, events_map, get_events + ) - current_state = yield resolve_events_with_factory( - state_sets, - state_map_factory=get_events, - ) - defer.returnValue(current_state) - else: - return + defer.returnValue(res.state) @defer.inlineCallbacks def _calculate_state_delta(self, room_id, current_state): @@ -2063,16 +2047,32 @@ class EventsStore(SQLBaseStore): ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) - def delete_old_state(self, room_id, topological_ordering): - return self.runInteraction( - "delete_old_state", - self._delete_old_state_txn, room_id, topological_ordering - ) + def purge_history( + self, room_id, topological_ordering, delete_local_events, + ): + """Deletes room history before a certain point + + Args: + room_id (str): + + topological_ordering (int): + minimum topo ordering to preserve - def _delete_old_state_txn(self, txn, room_id, topological_ordering): - """Deletes old room state + delete_local_events (bool): + if True, we will delete local events as well as remote ones + (instead of just marking them as outliers and deleting their + state groups). """ + return self.runInteraction( + "purge_history", + self._purge_history_txn, room_id, topological_ordering, + delete_local_events, + ) + + def _purge_history_txn( + self, txn, room_id, topological_ordering, delete_local_events, + ): # Tables that should be pruned: # event_auth # event_backward_extremities @@ -2113,7 +2113,7 @@ class EventsStore(SQLBaseStore): 400, "topological_ordering is greater than forward extremeties" ) - logger.debug("[purge] looking for events to delete") + logger.info("[purge] looking for events to delete") txn.execute( "SELECT event_id, state_key FROM events" @@ -2125,16 +2125,16 @@ class EventsStore(SQLBaseStore): to_delete = [ (event_id,) for event_id, state_key in event_rows - if state_key is None and not self.hs.is_mine_id(event_id) + if state_key is None and ( + delete_local_events or not self.hs.is_mine_id(event_id) + ) ] logger.info( - "[purge] found %i events before cutoff, of which %i are remote" - " non-state events to delete", len(event_rows), len(to_delete)) - - for event_id, state_key in event_rows: - txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) + "[purge] found %i events before cutoff, of which %i can be deleted", + len(event_rows), len(to_delete), + ) - logger.debug("[purge] Finding new backward extremities") + logger.info("[purge] Finding new backward extremities") # We calculate the new entries for the backward extremeties by finding # all events that point to events that are to be purged @@ -2148,7 +2148,7 @@ class EventsStore(SQLBaseStore): ) new_backwards_extrems = txn.fetchall() - logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems) + logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems) txn.execute( "DELETE FROM event_backward_extremities WHERE room_id = ?", @@ -2164,7 +2164,7 @@ class EventsStore(SQLBaseStore): ] ) - logger.debug("[purge] finding redundant state groups") + logger.info("[purge] finding redundant state groups") # Get all state groups that are only referenced by events that are # to be deleted. @@ -2181,15 +2181,15 @@ class EventsStore(SQLBaseStore): ) state_rows = txn.fetchall() - logger.debug("[purge] found %i redundant state groups", len(state_rows)) + logger.info("[purge] found %i redundant state groups", len(state_rows)) # make a set of the redundant state groups, so that we can look them up # efficiently state_groups_to_delete = set([sg for sg, in state_rows]) # Now we get all the state groups that rely on these state groups - logger.debug("[purge] finding state groups which depend on redundant" - " state groups") + logger.info("[purge] finding state groups which depend on redundant" + " state groups") remaining_state_groups = [] for i in xrange(0, len(state_rows), 100): chunk = [sg for sg, in state_rows[i:i + 100]] @@ -2214,7 +2214,7 @@ class EventsStore(SQLBaseStore): # Now we turn the state groups that reference to-be-deleted state # groups to non delta versions. for sg in remaining_state_groups: - logger.debug("[purge] de-delta-ing remaining state group %s", sg) + logger.info("[purge] de-delta-ing remaining state group %s", sg) curr_state = self._get_state_groups_from_groups_txn( txn, [sg], types=None ) @@ -2251,7 +2251,7 @@ class EventsStore(SQLBaseStore): ], ) - logger.debug("[purge] removing redundant state groups") + logger.info("[purge] removing redundant state groups") txn.executemany( "DELETE FROM state_groups_state WHERE state_group = ?", state_rows @@ -2261,18 +2261,15 @@ class EventsStore(SQLBaseStore): state_rows ) - # Delete all non-state - logger.debug("[purge] removing events from event_to_state_groups") + logger.info("[purge] removing events from event_to_state_groups") txn.executemany( "DELETE FROM event_to_state_groups WHERE event_id = ?", [(event_id,) for event_id, _ in event_rows] ) - - logger.debug("[purge] updating room_depth") - txn.execute( - "UPDATE room_depth SET min_depth = ? WHERE room_id = ?", - (topological_ordering, room_id,) - ) + for event_id, _ in event_rows: + txn.call_after(self._get_state_group_for_event.invalidate, ( + event_id, + )) # Delete all remote non-state events for table in ( @@ -2290,7 +2287,7 @@ class EventsStore(SQLBaseStore): "event_signatures", "rejections", ): - logger.debug("[purge] removing remote non-state events from %s", table) + logger.info("[purge] removing events from %s", table) txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), @@ -2298,16 +2295,30 @@ class EventsStore(SQLBaseStore): ) # Mark all state and own events as outliers - logger.debug("[purge] marking remaining events as outliers") + logger.info("[purge] marking remaining events as outliers") txn.executemany( "UPDATE events SET outlier = ?" " WHERE event_id = ?", [ (True, event_id,) for event_id, state_key in event_rows - if state_key is not None or self.hs.is_mine_id(event_id) + if state_key is not None or ( + not delete_local_events and self.hs.is_mine_id(event_id) + ) ] ) + # synapse tries to take out an exclusive lock on room_depth whenever it + # persists events (because upsert), and once we run this update, we + # will block that for the rest of our transaction. + # + # So, let's stick it at the end so that we don't block event + # persistence. + logger.info("[purge] updating room_depth") + txn.execute( + "UPDATE room_depth SET min_depth = ? WHERE room_id = ?", + (topological_ordering, room_id,) + ) + logger.info("[purge] done") @defer.inlineCallbacks |