diff options
author | Neil Johnson <neil@fragile.org.uk> | 2018-03-26 14:51:11 +0100 |
---|---|---|
committer | Neil Johnson <neil@fragile.org.uk> | 2018-03-26 14:51:11 +0100 |
commit | aa3587fdd111d01b3bd0a610dab6e102d79f230c (patch) | |
tree | 1fad5548eab845afcb1fe5f0c5e26694b322d6d3 /synapse/storage/events.py | |
parent | Merge branch 'hotfixes-v0.26.1' of github.com:matrix-org/synapse (diff) | |
parent | version bump (diff) | |
download | synapse-aa3587fdd111d01b3bd0a610dab6e102d79f230c.tar.xz |
Merge branch 'release-v0.27.0' of https://github.com/matrix-org/synapse v0.27.0
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 791 |
1 files changed, 276 insertions, 515 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 73658a9927..85ce6bea1a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,23 +13,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore -from twisted.internet import defer, reactor +from synapse.storage.events_worker import EventsWorkerStore -from synapse.events import FrozenEvent, USE_FROZEN_DICTS -from synapse.events.utils import prune_event +from twisted.internet import defer + +from synapse.events import USE_FROZEN_DICTS from synapse.util.async import ObservableDeferred from synapse.util.logcontext import ( - preserve_fn, PreserveLoggingContext, make_deferred_yieldable + PreserveLoggingContext, make_deferred_yieldable ) from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError -from synapse.state import resolve_events -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.types import get_domain_from_id from canonicaljson import encode_canonical_json @@ -61,16 +61,6 @@ def encode_json(json_object): return json.dumps(json_object, ensure_ascii=False) -# These values are used in the `enqueus_event` and `_do_fetch` methods to -# control how we batch/bulk fetch events from the database. -# The values are plucked out of thing air to make initial sync run faster -# on jki.re -# TODO: Make these configurable. -EVENT_QUEUE_THREADS = 3 # Max number of threads that will fetch events -EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events -EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events - - class _EventPeristenceQueue(object): """Queues up events so that they can be persisted in bulk with only one concurrent transaction per room. @@ -109,7 +99,7 @@ class _EventPeristenceQueue(object): end_item.events_and_contexts.extend(events_and_contexts) return end_item.deferred.observe() - deferred = ObservableDeferred(defer.Deferred()) + deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) queue.append(self._EventPersistQueueItem( events_and_contexts=events_and_contexts, @@ -145,18 +135,25 @@ class _EventPeristenceQueue(object): try: queue = self._get_drainining_queue(room_id) for item in queue: + # handle_queue_loop runs in the sentinel logcontext, so + # there is no need to preserve_fn when running the + # callbacks on the deferred. try: ret = yield per_item_callback(item) item.deferred.callback(ret) - except Exception as e: - item.deferred.errback(e) + except Exception: + item.deferred.errback() finally: queue = self._event_persist_queues.pop(room_id, None) if queue: self._event_persist_queues[room_id] = queue self._currently_persisting_rooms.discard(room_id) - preserve_fn(handle_queue_loop)() + # set handle_queue_loop off on the background. We don't want to + # attribute work done in it to the current request, so we drop the + # logcontext altogether. + with PreserveLoggingContext(): + handle_queue_loop() def _get_drainining_queue(self, room_id): queue = self._event_persist_queues.setdefault(room_id, deque()) @@ -192,13 +189,12 @@ def _retry_on_integrity_error(func): return f -class EventsStore(SQLBaseStore): +class EventsStore(EventsWorkerStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" def __init__(self, db_conn, hs): super(EventsStore, self).__init__(db_conn, hs) - self._clock = hs.get_clock() self.register_background_update_handler( self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts ) @@ -229,6 +225,8 @@ class EventsStore(SQLBaseStore): self._event_persist_queue = _EventPeristenceQueue() + self._state_resolution_handler = hs.get_state_resolution_handler() + def persist_events(self, events_and_contexts, backfilled=False): """ Write events to the database @@ -284,10 +282,11 @@ class EventsStore(SQLBaseStore): def _maybe_start_persisting(self, room_id): @defer.inlineCallbacks def persisting_queue(item): - yield self._persist_events( - item.events_and_contexts, - backfilled=item.backfilled, - ) + with Measure(self._clock, "persist_events"): + yield self._persist_events( + item.events_and_contexts, + backfilled=item.backfilled, + ) self._event_persist_queue.handle_queue(room_id, persisting_queue) @@ -334,8 +333,20 @@ class EventsStore(SQLBaseStore): # NB: Assumes that we are only persisting events for one room # at a time. + + # map room_id->list[event_ids] giving the new forward + # extremities in each room new_forward_extremeties = {} + + # map room_id->(type,state_key)->event_id tracking the full + # state in each room after adding these events current_state_for_room = {} + + # map room_id->(to_delete, to_insert) where each entry is + # a map (type,key)->event_id giving the state delta in each + # room + state_delta_for_room = {} + if not backfilled: with Measure(self._clock, "_calculate_state_and_extrem"): # Work out the new "current state" for each room. @@ -378,11 +389,20 @@ class EventsStore(SQLBaseStore): if all_single_prev_not_state: continue - state = yield self._calculate_state_delta( - room_id, ev_ctx_rm, new_latest_event_ids + logger.info( + "Calculating state delta for room %s", room_id, ) - if state: - current_state_for_room[room_id] = state + current_state = yield self._get_new_state_after_events( + room_id, + ev_ctx_rm, new_latest_event_ids, + ) + if current_state is not None: + current_state_for_room[room_id] = current_state + delta = yield self._calculate_state_delta( + room_id, current_state, + ) + if delta is not None: + state_delta_for_room[room_id] = delta yield self.runInteraction( "persist_events", @@ -390,7 +410,7 @@ class EventsStore(SQLBaseStore): events_and_contexts=chunk, backfilled=backfilled, delete_existing=delete_existing, - current_state_for_room=current_state_for_room, + state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) @@ -407,7 +427,7 @@ class EventsStore(SQLBaseStore): event_counter.inc(event.type, origin_type, origin_entity) - for room_id, (_, _, new_state) in current_state_for_room.iteritems(): + for room_id, new_state in current_state_for_room.iteritems(): self.get_current_state_ids.prefill( (room_id, ), new_state ) @@ -459,22 +479,31 @@ class EventsStore(SQLBaseStore): defer.returnValue(new_latest_event_ids) @defer.inlineCallbacks - def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids): - """Calculate the new state deltas for a room. + def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids): + """Calculate the current state dict after adding some new events to + a room - Assumes that we are only persisting events for one room at a time. + Args: + room_id (str): + room to which the events are being added. Used for logging etc + + events_context (list[(EventBase, EventContext)]): + events and contexts which are being added to the room + + new_latest_event_ids (iterable[str]): + the new forward extremities for the room. Returns: - 3-tuple (to_delete, to_insert, new_state) where both are state dicts, - i.e. (type, state_key) -> event_id. `to_delete` are the entries to - first be deleted from current_state_events, `to_insert` are entries - to insert. `new_state` is the full set of state. - May return None if there are no changes to be applied. + Deferred[dict[(str,str), str]|None]: + None if there are no changes to the room state, or + a dict of (type, state_key) -> event_id]. """ - # Now we need to work out the different state sets for - # each state extremities - state_sets = [] - state_groups = set() + + if not new_latest_event_ids: + defer.returnValue({}) + + # map from state_group to ((type, key) -> event_id) state map + state_groups = {} missing_event_ids = [] was_updated = False for event_id in new_latest_event_ids: @@ -485,16 +514,19 @@ class EventsStore(SQLBaseStore): if ctx.current_state_ids is None: raise Exception("Unknown current state") + if ctx.state_group is None: + # I don't think this can happen, but let's double-check + raise Exception( + "Context for new extremity event %s has no state " + "group" % (event_id, ), + ) + # If we've already seen the state group don't bother adding # it to the state sets again if ctx.state_group not in state_groups: - state_sets.append(ctx.current_state_ids) + state_groups[ctx.state_group] = ctx.current_state_ids if ctx.delta_ids or hasattr(ev, "state_key"): was_updated = True - if ctx.state_group: - # Add this as a seen state group (if it has a state - # group) - state_groups.add(ctx.state_group) break else: # If we couldn't find it, then we'll need to pull @@ -502,60 +534,50 @@ class EventsStore(SQLBaseStore): was_updated = True missing_event_ids.append(event_id) + if not was_updated: + return + if missing_event_ids: # Now pull out the state for any missing events from DB event_to_groups = yield self._get_state_group_for_events( missing_event_ids, ) - groups = set(event_to_groups.itervalues()) - state_groups + groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys()) if groups: group_to_state = yield self._get_state_for_groups(groups) - state_sets.extend(group_to_state.itervalues()) + state_groups.update(group_to_state) - if not new_latest_event_ids: - current_state = {} - elif was_updated: - if len(state_sets) == 1: - # If there is only one state set, then we know what the current - # state is. - current_state = state_sets[0] - else: - # We work out the current state by passing the state sets to the - # state resolution algorithm. It may ask for some events, including - # the events we have yet to persist, so we need a slightly more - # complicated event lookup function than simply looking the events - # up in the db. - events_map = {ev.event_id: ev for ev, _ in events_context} - - @defer.inlineCallbacks - def get_events(ev_ids): - # We get the events by first looking at the list of events we - # are trying to persist, and then fetching the rest from the DB. - db = [] - to_return = {} - for ev_id in ev_ids: - ev = events_map.get(ev_id, None) - if ev: - to_return[ev_id] = ev - else: - db.append(ev_id) - - if db: - evs = yield self.get_events( - ev_ids, get_prev_content=False, check_redacted=False, - ) - to_return.update(evs) - defer.returnValue(to_return) + if len(state_groups) == 1: + # If there is only one state group, then we know what the current + # state is. + defer.returnValue(state_groups.values()[0]) - current_state = yield resolve_events( - state_sets, - state_map_factory=get_events, - ) - else: - return + def get_events(ev_ids): + return self.get_events( + ev_ids, get_prev_content=False, check_redacted=False, + ) + events_map = {ev.event_id: ev for ev, _ in events_context} + logger.debug("calling resolve_state_groups from preserve_events") + res = yield self._state_resolution_handler.resolve_state_groups( + room_id, state_groups, events_map, get_events + ) + + defer.returnValue(res.state) + + @defer.inlineCallbacks + def _calculate_state_delta(self, room_id, current_state): + """Calculate the new state deltas for a room. + Assumes that we are only persisting events for one room at a time. + + Returns: + 2-tuple (to_delete, to_insert) where both are state dicts, + i.e. (type, state_key) -> event_id. `to_delete` are the entries to + first be deleted from current_state_events, `to_insert` are entries + to insert. + """ existing_state = yield self.get_current_state_ids(room_id) existing_events = set(existing_state.itervalues()) @@ -575,67 +597,11 @@ class EventsStore(SQLBaseStore): if ev_id in events_to_insert } - defer.returnValue((to_delete, to_insert, current_state)) - - @defer.inlineCallbacks - def get_event(self, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False, - allow_none=False): - """Get an event from the database by event_id. - - Args: - event_id (str): The event_id of the event to fetch - check_redacted (bool): If True, check if event has been redacted - and redact it. - get_prev_content (bool): If True and event is a state event, - include the previous states content in the unsigned field. - allow_rejected (bool): If True return rejected events. - allow_none (bool): If True, return None if no event found, if - False throw an exception. - - Returns: - Deferred : A FrozenEvent. - """ - events = yield self._get_events( - [event_id], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - if not events and not allow_none: - raise SynapseError(404, "Could not find event %s" % (event_id,)) - - defer.returnValue(events[0] if events else None) - - @defer.inlineCallbacks - def get_events(self, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False): - """Get events from the database - - Args: - event_ids (list): The event_ids of the events to fetch - check_redacted (bool): If True, check if event has been redacted - and redact it. - get_prev_content (bool): If True and event is a state event, - include the previous states content in the unsigned field. - allow_rejected (bool): If True return rejected events. - - Returns: - Deferred : Dict from event_id to event. - """ - events = yield self._get_events( - event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - defer.returnValue({e.event_id: e for e in events}) + defer.returnValue((to_delete, to_insert)) @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled, - delete_existing=False, current_state_for_room={}, + delete_existing=False, state_delta_for_room={}, new_forward_extremeties={}): """Insert some number of room events into the necessary database tables. @@ -651,7 +617,7 @@ class EventsStore(SQLBaseStore): delete_existing (bool): True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. - current_state_for_room (dict[str, (list[str], list[str])]): + state_delta_for_room (dict[str, (list[str], list[str])]): The current-state delta for each room. For each room, a tuple (to_delete, to_insert), being a list of event ids to be removed from the current state, and a list of event ids to be added to @@ -661,9 +627,11 @@ class EventsStore(SQLBaseStore): list of the event ids which are the forward extremities. """ + all_events_and_contexts = events_and_contexts + max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering - self._update_current_state_txn(txn, current_state_for_room, max_stream_order) + self._update_current_state_txn(txn, state_delta_for_room, max_stream_order) self._update_forward_extremities_txn( txn, @@ -707,9 +675,8 @@ class EventsStore(SQLBaseStore): events_and_contexts=events_and_contexts, ) - # Insert into the state_groups, state_groups_state, and - # event_to_state_groups tables. - self._store_mult_state_groups_txn(txn, events_and_contexts) + # Insert into event_to_state_groups. + self._store_event_state_mappings_txn(txn, events_and_contexts) # _store_rejected_events_txn filters out any events which were # rejected, and returns the filtered list. @@ -724,12 +691,13 @@ class EventsStore(SQLBaseStore): self._update_metadata_tables_txn( txn, events_and_contexts=events_and_contexts, + all_events_and_contexts=all_events_and_contexts, backfilled=backfilled, ) def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): for room_id, current_state_tuple in state_delta_by_room.iteritems(): - to_delete, to_insert, _ = current_state_tuple + to_delete, to_insert = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", [(ev_id,) for ev_id in to_delete.itervalues()], @@ -786,7 +754,7 @@ class EventsStore(SQLBaseStore): for member in members_changed: self._invalidate_cache_and_stream( - txn, self.get_rooms_for_user, (member,) + txn, self.get_rooms_for_user_with_stream_ordering, (member,) ) for host in set(get_domain_from_id(u) for u in members_changed): @@ -944,10 +912,9 @@ class EventsStore(SQLBaseStore): # an outlier in the database. We now have some state at that # so we need to update the state_groups table with that state. - # insert into the state_group, state_groups_state and - # event_to_state_groups tables. + # insert into event_to_state_groups. try: - self._store_mult_state_groups_txn(txn, ((event, context),)) + self._store_event_state_mappings_txn(txn, ((event, context),)) except Exception: logger.exception("") raise @@ -1122,27 +1089,33 @@ class EventsStore(SQLBaseStore): ec for ec in events_and_contexts if ec[0] not in to_remove ] - def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled): + def _update_metadata_tables_txn(self, txn, events_and_contexts, + all_events_and_contexts, backfilled): """Update all the miscellaneous tables for new events Args: txn (twisted.enterprise.adbapi.Connection): db connection events_and_contexts (list[(EventBase, EventContext)]): events we are persisting + all_events_and_contexts (list[(EventBase, EventContext)]): all + events that we were going to persist. This includes events + we've already persisted, etc, that wouldn't appear in + events_and_context. backfilled (bool): True if the events were backfilled """ + # Insert all the push actions into the event_push_actions table. + self._set_push_actions_for_event_and_users_txn( + txn, + events_and_contexts=events_and_contexts, + all_events_and_contexts=all_events_and_contexts, + ) + if not events_and_contexts: # nothing to do here return for event, context in events_and_contexts: - # Insert all the push actions into the event_push_actions table. - if context.push_actions: - self._set_push_actions_for_event_and_users_txn( - txn, event, context.push_actions - ) - if event.type == EventTypes.Redaction and event.redacts is not None: # Remove the entries in the event_push_actions table for the # redacted event. @@ -1347,292 +1320,6 @@ class EventsStore(SQLBaseStore): ) @defer.inlineCallbacks - def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not event_ids: - defer.returnValue([]) - - event_id_list = event_ids - event_ids = set(event_ids) - - event_entry_map = self._get_events_from_cache( - event_ids, - allow_rejected=allow_rejected, - ) - - missing_events_ids = [e for e in event_ids if e not in event_entry_map] - - if missing_events_ids: - missing_events = yield self._enqueue_events( - missing_events_ids, - check_redacted=check_redacted, - allow_rejected=allow_rejected, - ) - - event_entry_map.update(missing_events) - - events = [] - for event_id in event_id_list: - entry = event_entry_map.get(event_id, None) - if not entry: - continue - - if allow_rejected or not entry.event.rejected_reason: - if check_redacted and entry.redacted_event: - event = entry.redacted_event - else: - event = entry.event - - events.append(event) - - if get_prev_content: - if "replaces_state" in event.unsigned: - prev = yield self.get_event( - event.unsigned["replaces_state"], - get_prev_content=False, - allow_none=True, - ) - if prev: - event.unsigned = dict(event.unsigned) - event.unsigned["prev_content"] = prev.content - event.unsigned["prev_sender"] = prev.sender - - defer.returnValue(events) - - def _invalidate_get_event_cache(self, event_id): - self._get_event_cache.invalidate((event_id,)) - - def _get_events_from_cache(self, events, allow_rejected, update_metrics=True): - """Fetch events from the caches - - Args: - events (list(str)): list of event_ids to fetch - allow_rejected (bool): Whether to teturn events that were rejected - update_metrics (bool): Whether to update the cache hit ratio metrics - - Returns: - dict of event_id -> _EventCacheEntry for each event_id in cache. If - allow_rejected is `False` then there will still be an entry but it - will be `None` - """ - event_map = {} - - for event_id in events: - ret = self._get_event_cache.get( - (event_id,), None, - update_metrics=update_metrics, - ) - if not ret: - continue - - if allow_rejected or not ret.event.rejected_reason: - event_map[event_id] = ret - else: - event_map[event_id] = None - - return event_map - - def _do_fetch(self, conn): - """Takes a database connection and waits for requests for events from - the _event_fetch_list queue. - """ - event_list = [] - i = 0 - while True: - try: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: - single_threaded = self.database_engine.single_threaded - if single_threaded or i > EVENT_QUEUE_ITERATIONS: - self._event_fetch_ongoing -= 1 - return - else: - self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) - i += 1 - continue - i = 0 - - event_id_lists = zip(*event_list)[0] - event_ids = [ - item for sublist in event_id_lists for item in sublist - ] - - rows = self._new_transaction( - conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids - ) - - row_dict = { - r["event_id"]: r - for r in rows - } - - # We only want to resolve deferreds from the main thread - def fire(lst, res): - for ids, d in lst: - if not d.called: - try: - with PreserveLoggingContext(): - d.callback([ - res[i] - for i in ids - if i in res - ]) - except Exception: - logger.exception("Failed to callback") - with PreserveLoggingContext(): - reactor.callFromThread(fire, event_list, row_dict) - except Exception as e: - logger.exception("do_fetch") - - # We only want to resolve deferreds from the main thread - def fire(evs): - for _, d in evs: - if not d.called: - with PreserveLoggingContext(): - d.errback(e) - - if event_list: - with PreserveLoggingContext(): - reactor.callFromThread(fire, event_list) - - @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): - """Fetches events from the database using the _event_fetch_list. This - allows batch and bulk fetching of events - it allows us to fetch events - without having to create a new transaction for each request for events. - """ - if not events: - defer.returnValue({}) - - events_d = defer.Deferred() - with self._event_fetch_lock: - self._event_fetch_list.append( - (events, events_d) - ) - - self._event_fetch_lock.notify() - - if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: - self._event_fetch_ongoing += 1 - should_start = True - else: - should_start = False - - if should_start: - with PreserveLoggingContext(): - self.runWithConnection( - self._do_fetch - ) - - logger.debug("Loading %d events", len(events)) - with PreserveLoggingContext(): - rows = yield events_d - logger.debug("Loaded %d events (%d rows)", len(events), len(rows)) - - if not allow_rejected: - rows[:] = [r for r in rows if not r["rejects"]] - - res = yield make_deferred_yieldable(defer.gatherResults( - [ - preserve_fn(self._get_event_from_row)( - row["internal_metadata"], row["json"], row["redacts"], - rejected_reason=row["rejects"], - ) - for row in rows - ], - consumeErrors=True - )) - - defer.returnValue({ - e.event.event_id: e - for e in res if e - }) - - def _fetch_event_rows(self, txn, events): - rows = [] - N = 200 - for i in range(1 + len(events) / N): - evs = events[i * N:(i + 1) * N] - if not evs: - break - - sql = ( - "SELECT " - " e.event_id as event_id, " - " e.internal_metadata," - " e.json," - " r.redacts as redacts," - " rej.event_id as rejects " - " FROM event_json as e" - " LEFT JOIN rejections as rej USING (event_id)" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE e.event_id IN (%s)" - ) % (",".join(["?"] * len(evs)),) - - txn.execute(sql, evs) - rows.extend(self.cursor_to_dict(txn)) - - return rows - - @defer.inlineCallbacks - def _get_event_from_row(self, internal_metadata, js, redacted, - rejected_reason=None): - with Measure(self._clock, "_get_event_from_row"): - d = json.loads(js) - internal_metadata = json.loads(internal_metadata) - - if rejected_reason: - rejected_reason = yield self._simple_select_one_onecol( - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - desc="_get_event_from_row_rejected_reason", - ) - - original_ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) - - redacted_event = None - if redacted: - redacted_event = prune_event(original_ev) - - redaction_id = yield self._simple_select_one_onecol( - table="redactions", - keyvalues={"redacts": redacted_event.event_id}, - retcol="event_id", - desc="_get_event_from_row_redactions", - ) - - redacted_event.unsigned["redacted_by"] = redaction_id - # Get the redaction event. - - because = yield self.get_event( - redaction_id, - check_redacted=False, - allow_none=True, - ) - - if because: - # It's fine to do add the event directly, since get_pdu_json - # will serialise this field correctly - redacted_event.unsigned["redacted_because"] = because - - cache_entry = _EventCacheEntry( - event=original_ev, - redacted_event=redacted_event, - ) - - self._get_event_cache.prefill((original_ev.event_id,), cache_entry) - - defer.returnValue(cache_entry) - - @defer.inlineCallbacks def count_daily_messages(self): """ Returns an estimate of the number of messages sent in the last day. @@ -2017,16 +1704,32 @@ class EventsStore(SQLBaseStore): ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) - def delete_old_state(self, room_id, topological_ordering): - return self.runInteraction( - "delete_old_state", - self._delete_old_state_txn, room_id, topological_ordering - ) + def purge_history( + self, room_id, topological_ordering, delete_local_events, + ): + """Deletes room history before a certain point + + Args: + room_id (str): + + topological_ordering (int): + minimum topo ordering to preserve - def _delete_old_state_txn(self, txn, room_id, topological_ordering): - """Deletes old room state + delete_local_events (bool): + if True, we will delete local events as well as remote ones + (instead of just marking them as outliers and deleting their + state groups). """ + return self.runInteraction( + "purge_history", + self._purge_history_txn, room_id, topological_ordering, + delete_local_events, + ) + + def _purge_history_txn( + self, txn, room_id, topological_ordering, delete_local_events, + ): # Tables that should be pruned: # event_auth # event_backward_extremities @@ -2047,6 +1750,30 @@ class EventsStore(SQLBaseStore): # state_groups # state_groups_state + # we will build a temporary table listing the events so that we don't + # have to keep shovelling the list back and forth across the + # connection. Annoyingly the python sqlite driver commits the + # transaction on CREATE, so let's do this first. + # + # furthermore, we might already have the table from a previous (failed) + # purge attempt, so let's drop the table first. + + txn.execute("DROP TABLE IF EXISTS events_to_purge") + + txn.execute( + "CREATE TEMPORARY TABLE events_to_purge (" + " event_id TEXT NOT NULL," + " should_delete BOOLEAN NOT NULL" + ")" + ) + + # create an index on should_delete because later we'll be looking for + # the should_delete / shouldn't_delete subsets + txn.execute( + "CREATE INDEX events_to_purge_should_delete" + " ON events_to_purge(should_delete)", + ) + # First ensure that we're not about to delete all the forward extremeties txn.execute( "SELECT e.event_id, e.depth FROM events as e " @@ -2067,42 +1794,48 @@ class EventsStore(SQLBaseStore): 400, "topological_ordering is greater than forward extremeties" ) - logger.debug("[purge] looking for events to delete") + logger.info("[purge] looking for events to delete") + + should_delete_expr = "state_key IS NULL" + should_delete_params = () + if not delete_local_events: + should_delete_expr += " AND event_id NOT LIKE ?" + should_delete_params += ("%:" + self.hs.hostname, ) + + should_delete_params += (room_id, topological_ordering) txn.execute( - "SELECT event_id, state_key FROM events" - " LEFT JOIN state_events USING (room_id, event_id)" - " WHERE room_id = ? AND topological_ordering < ?", - (room_id, topological_ordering,) + "INSERT INTO events_to_purge" + " SELECT event_id, %s" + " FROM events AS e LEFT JOIN state_events USING (event_id)" + " WHERE e.room_id = ? AND topological_ordering < ?" % ( + should_delete_expr, + ), + should_delete_params, + ) + txn.execute( + "SELECT event_id, should_delete FROM events_to_purge" ) event_rows = txn.fetchall() - - to_delete = [ - (event_id,) for event_id, state_key in event_rows - if state_key is None and not self.hs.is_mine_id(event_id) - ] logger.info( - "[purge] found %i events before cutoff, of which %i are remote" - " non-state events to delete", len(event_rows), len(to_delete)) - - for event_id, state_key in event_rows: - txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) + "[purge] found %i events before cutoff, of which %i can be deleted", + len(event_rows), sum(1 for e in event_rows if e[1]), + ) - logger.debug("[purge] Finding new backward extremities") + logger.info("[purge] Finding new backward extremities") # We calculate the new entries for the backward extremeties by finding # all events that point to events that are to be purged txn.execute( - "SELECT DISTINCT e.event_id FROM events as e" - " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id" - " INNER JOIN events as e2 ON e2.event_id = ed.event_id" - " WHERE e.room_id = ? AND e.topological_ordering < ?" - " AND e2.topological_ordering >= ?", - (room_id, topological_ordering, topological_ordering) + "SELECT DISTINCT e.event_id FROM events_to_purge AS e" + " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id" + " INNER JOIN events AS e2 ON e2.event_id = ed.event_id" + " WHERE e2.topological_ordering >= ?", + (topological_ordering, ) ) new_backwards_extrems = txn.fetchall() - logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems) + logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems) txn.execute( "DELETE FROM event_backward_extremities WHERE room_id = ?", @@ -2118,7 +1851,7 @@ class EventsStore(SQLBaseStore): ] ) - logger.debug("[purge] finding redundant state groups") + logger.info("[purge] finding redundant state groups") # Get all state groups that are only referenced by events that are # to be deleted. @@ -2126,24 +1859,23 @@ class EventsStore(SQLBaseStore): "SELECT state_group FROM event_to_state_groups" " INNER JOIN events USING (event_id)" " WHERE state_group IN (" - " SELECT DISTINCT state_group FROM events" + " SELECT DISTINCT state_group FROM events_to_purge" " INNER JOIN event_to_state_groups USING (event_id)" - " WHERE room_id = ? AND topological_ordering < ?" " )" " GROUP BY state_group HAVING MAX(topological_ordering) < ?", - (room_id, topological_ordering, topological_ordering) + (topological_ordering, ) ) state_rows = txn.fetchall() - logger.debug("[purge] found %i redundant state groups", len(state_rows)) + logger.info("[purge] found %i redundant state groups", len(state_rows)) # make a set of the redundant state groups, so that we can look them up # efficiently state_groups_to_delete = set([sg for sg, in state_rows]) # Now we get all the state groups that rely on these state groups - logger.debug("[purge] finding state groups which depend on redundant" - " state groups") + logger.info("[purge] finding state groups which depend on redundant" + " state groups") remaining_state_groups = [] for i in xrange(0, len(state_rows), 100): chunk = [sg for sg, in state_rows[i:i + 100]] @@ -2168,7 +1900,7 @@ class EventsStore(SQLBaseStore): # Now we turn the state groups that reference to-be-deleted state # groups to non delta versions. for sg in remaining_state_groups: - logger.debug("[purge] de-delta-ing remaining state group %s", sg) + logger.info("[purge] de-delta-ing remaining state group %s", sg) curr_state = self._get_state_groups_from_groups_txn( txn, [sg], types=None ) @@ -2205,7 +1937,7 @@ class EventsStore(SQLBaseStore): ], ) - logger.debug("[purge] removing redundant state groups") + logger.info("[purge] removing redundant state groups") txn.executemany( "DELETE FROM state_groups_state WHERE state_group = ?", state_rows @@ -2215,18 +1947,15 @@ class EventsStore(SQLBaseStore): state_rows ) - # Delete all non-state - logger.debug("[purge] removing events from event_to_state_groups") - txn.executemany( - "DELETE FROM event_to_state_groups WHERE event_id = ?", - [(event_id,) for event_id, _ in event_rows] - ) - - logger.debug("[purge] updating room_depth") + logger.info("[purge] removing events from event_to_state_groups") txn.execute( - "UPDATE room_depth SET min_depth = ? WHERE room_id = ?", - (topological_ordering, room_id,) + "DELETE FROM event_to_state_groups " + "WHERE event_id IN (SELECT event_id from events_to_purge)" ) + for event_id, _ in event_rows: + txn.call_after(self._get_state_group_for_event.invalidate, ( + event_id, + )) # Delete all remote non-state events for table in ( @@ -2238,28 +1967,60 @@ class EventsStore(SQLBaseStore): "event_edge_hashes", "event_edges", "event_forward_extremities", - "event_push_actions", "event_reference_hashes", "event_search", "event_signatures", "rejections", ): - logger.debug("[purge] removing remote non-state events from %s", table) + logger.info("[purge] removing events from %s", table) - txn.executemany( - "DELETE FROM %s WHERE event_id = ?" % (table,), - to_delete + txn.execute( + "DELETE FROM %s WHERE event_id IN (" + " SELECT event_id FROM events_to_purge WHERE should_delete" + ")" % (table,), + ) + + # event_push_actions lacks an index on event_id, and has one on + # (room_id, event_id) instead. + for table in ( + "event_push_actions", + ): + logger.info("[purge] removing events from %s", table) + + txn.execute( + "DELETE FROM %s WHERE room_id = ? AND event_id IN (" + " SELECT event_id FROM events_to_purge WHERE should_delete" + ")" % (table,), + (room_id, ) ) # Mark all state and own events as outliers - logger.debug("[purge] marking remaining events as outliers") - txn.executemany( + logger.info("[purge] marking remaining events as outliers") + txn.execute( "UPDATE events SET outlier = ?" - " WHERE event_id = ?", - [ - (True, event_id,) for event_id, state_key in event_rows - if state_key is not None or self.hs.is_mine_id(event_id) - ] + " WHERE event_id IN (" + " SELECT event_id FROM events_to_purge " + " WHERE NOT should_delete" + ")", + (True,), + ) + + # synapse tries to take out an exclusive lock on room_depth whenever it + # persists events (because upsert), and once we run this update, we + # will block that for the rest of our transaction. + # + # So, let's stick it at the end so that we don't block event + # persistence. + logger.info("[purge] updating room_depth") + txn.execute( + "UPDATE room_depth SET min_depth = ? WHERE room_id = ?", + (topological_ordering, room_id,) + ) + + # finally, drop the temp table. this will commit the txn in sqlite, + # so make sure to keep this actually last. + txn.execute( + "DROP TABLE events_to_purge" ) logger.info("[purge] done") @@ -2272,7 +2033,7 @@ class EventsStore(SQLBaseStore): to_2, so_2 = yield self._get_event_ordering(event_id2) defer.returnValue((to_1, so_1) > (to_2, so_2)) - @defer.inlineCallbacks + @cachedInlineCallbacks(max_entries=5000) def _get_event_ordering(self, event_id): res = yield self._simple_select_one( table="events", |