From 27b094f382a33b7d69eb592951a463c3e53af5b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Feb 2018 11:41:48 +0000 Subject: Split out get_events and co into a worker store --- synapse/replication/slave/storage/events.py | 13 +- synapse/storage/events.py | 2403 ++++++++++++++------------- 2 files changed, 1206 insertions(+), 1210 deletions(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index f8c164b48b..74a81a0a51 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -18,6 +18,7 @@ from synapse.api.constants import EventTypes from synapse.storage import DataStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.event_push_actions import EventPushActionsStore +from synapse.storage.events import EventsWorkerStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.state import StateGroupWorkerStore from synapse.storage.stream import StreamStore @@ -38,7 +39,7 @@ logger = logging.getLogger(__name__) # the method descriptor on the DataStore and chuck them into our class. -class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore): +class SlavedEventStore(EventsWorkerStore, StateGroupWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedEventStore, self).__init__(db_conn, hs) @@ -104,8 +105,6 @@ class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore): get_push_action_users_in_range = ( DataStore.get_push_action_users_in_range.__func__ ) - get_event = DataStore.get_event.__func__ - get_events = DataStore.get_events.__func__ get_rooms_for_user_where_membership_is = ( DataStore.get_rooms_for_user_where_membership_is.__func__ ) @@ -135,14 +134,6 @@ class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore): _set_before_and_after = staticmethod(DataStore._set_before_and_after) - _get_events = DataStore._get_events.__func__ - _get_events_from_cache = DataStore._get_events_from_cache.__func__ - - _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__ - _enqueue_events = DataStore._enqueue_events.__func__ - _do_fetch = DataStore._do_fetch.__func__ - _fetch_event_rows = DataStore._fetch_event_rows.__func__ - _get_event_from_row = DataStore._get_event_from_row.__func__ _get_rooms_for_user_where_membership_is_txn = ( DataStore._get_rooms_for_user_where_membership_is_txn.__func__ ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 73177e0bc2..681a33314d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -199,1467 +199,1472 @@ def _retry_on_integrity_error(func): return f -class EventsStore(SQLBaseStore): - EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" - EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" - +class EventsWorkerStore(SQLBaseStore): def __init__(self, db_conn, hs): - super(EventsStore, self).__init__(db_conn, hs) - self._clock = hs.get_clock() - self.register_background_update_handler( - self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts - ) - self.register_background_update_handler( - self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, - self._background_reindex_fields_sender, - ) - - self.register_background_index_update( - "event_contains_url_index", - index_name="event_contains_url_index", - table="events", - columns=["room_id", "topological_ordering", "stream_ordering"], - where_clause="contains_url = true AND outlier = false", - ) - - # an event_id index on 
event_search is useful for the purge_history - # api. Plus it means we get to enforce some integrity with a UNIQUE - # clause - self.register_background_index_update( - "event_search_event_id_idx", - index_name="event_search_event_id_idx", - table="event_search", - columns=["event_id"], - unique=True, - psql_only=True, - ) + super(EventsWorkerStore, self).__init__(db_conn, hs) self._event_persist_queue = _EventPeristenceQueue() - self._state_resolution_handler = hs.get_state_resolution_handler() + @defer.inlineCallbacks + def get_event(self, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False, + allow_none=False): + """Get an event from the database by event_id. - def persist_events(self, events_and_contexts, backfilled=False): - """ - Write events to the database Args: - events_and_contexts: list of tuples of (event, context) - backfilled: ? - """ - partitioned = {} - for event, ctx in events_and_contexts: - partitioned.setdefault(event.room_id, []).append((event, ctx)) + event_id (str): The event_id of the event to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + allow_none (bool): If True, return None if no event found, if + False throw an exception. - deferreds = [] - for room_id, evs_ctxs in partitioned.iteritems(): - d = self._event_persist_queue.add_to_queue( - room_id, evs_ctxs, - backfilled=backfilled, - ) - deferreds.append(d) + Returns: + Deferred : A FrozenEvent. + """ + events = yield self._get_events( + [event_id], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) - for room_id in partitioned: - self._maybe_start_persisting(room_id) + if not events and not allow_none: + raise SynapseError(404, "Could not find event %s" % (event_id,)) - return make_deferred_yieldable( - defer.gatherResults(deferreds, consumeErrors=True) - ) + defer.returnValue(events[0] if events else None) @defer.inlineCallbacks - @log_function - def persist_event(self, event, context, backfilled=False): - """ + def get_events(self, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False): + """Get events from the database Args: - event (EventBase): - context (EventContext): - backfilled (bool): + event_ids (list): The event_ids of the events to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. Returns: - Deferred: resolves to (int, int): the stream ordering of ``event``, - and the stream ordering of the latest persisted event + Deferred : Dict from event_id to event. 
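+
+        Example (an illustrative sketch, not part of the original API docs;
+        assumes we are inside another inlineCallbacks method on this store,
+        and the event ids are made up):
+
+            events = yield self.get_events(["$a:example.com", "$b:example.com"])
+            # events maps event_id -> FrozenEvent; ids that were not found
+            # (or were rejected, since allow_rejected defaults to False) are
+            # simply absent from the dict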
""" - deferred = self._event_persist_queue.add_to_queue( - event.room_id, [(event, context)], - backfilled=backfilled, + events = yield self._get_events( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, ) - self._maybe_start_persisting(event.room_id) + defer.returnValue({e.event_id: e for e in events}) - yield make_deferred_yieldable(deferred) + @defer.inlineCallbacks + def _get_events(self, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not event_ids: + defer.returnValue([]) - max_persisted_id = yield self._stream_id_gen.get_current_token() - defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) + event_id_list = event_ids + event_ids = set(event_ids) - def _maybe_start_persisting(self, room_id): - @defer.inlineCallbacks - def persisting_queue(item): - yield self._persist_events( - item.events_and_contexts, - backfilled=item.backfilled, + event_entry_map = self._get_events_from_cache( + event_ids, + allow_rejected=allow_rejected, + ) + + missing_events_ids = [e for e in event_ids if e not in event_entry_map] + + if missing_events_ids: + missing_events = yield self._enqueue_events( + missing_events_ids, + check_redacted=check_redacted, + allow_rejected=allow_rejected, ) - self._event_persist_queue.handle_queue(room_id, persisting_queue) + event_entry_map.update(missing_events) - @_retry_on_integrity_error - @defer.inlineCallbacks - def _persist_events(self, events_and_contexts, backfilled=False, - delete_existing=False): - """Persist events to db + events = [] + for event_id in event_id_list: + entry = event_entry_map.get(event_id, None) + if not entry: + continue + + if allow_rejected or not entry.event.rejected_reason: + if check_redacted and entry.redacted_event: + event = entry.redacted_event + else: + event = entry.event + + events.append(event) + + if get_prev_content: + if "replaces_state" in event.unsigned: + prev = yield self.get_event( + event.unsigned["replaces_state"], + get_prev_content=False, + allow_none=True, + ) + if prev: + event.unsigned = dict(event.unsigned) + event.unsigned["prev_content"] = prev.content + event.unsigned["prev_sender"] = prev.sender + + defer.returnValue(events) + + def _invalidate_get_event_cache(self, event_id): + self._get_event_cache.invalidate((event_id,)) + + def _get_events_from_cache(self, events, allow_rejected, update_metrics=True): + """Fetch events from the caches Args: - events_and_contexts (list[(EventBase, EventContext)]): - backfilled (bool): - delete_existing (bool): + events (list(str)): list of event_ids to fetch + allow_rejected (bool): Whether to teturn events that were rejected + update_metrics (bool): Whether to update the cache hit ratio metrics Returns: - Deferred: resolves when the events have been persisted + dict of event_id -> _EventCacheEntry for each event_id in cache. 
If + allow_rejected is `False` then there will still be an entry but it + will be `None` """ - if not events_and_contexts: - return + event_map = {} - if backfilled: - stream_ordering_manager = self._backfill_id_gen.get_next_mult( - len(events_and_contexts) - ) - else: - stream_ordering_manager = self._stream_id_gen.get_next_mult( - len(events_and_contexts) + for event_id in events: + ret = self._get_event_cache.get( + (event_id,), None, + update_metrics=update_metrics, ) + if not ret: + continue - with stream_ordering_manager as stream_orderings: - for (event, context), stream, in zip( - events_and_contexts, stream_orderings - ): - event.internal_metadata.stream_ordering = stream + if allow_rejected or not ret.event.rejected_reason: + event_map[event_id] = ret + else: + event_map[event_id] = None - chunks = [ - events_and_contexts[x:x + 100] - for x in xrange(0, len(events_and_contexts), 100) - ] + return event_map - for chunk in chunks: - # We can't easily parallelize these since different chunks - # might contain the same event. :( + def _do_fetch(self, conn): + """Takes a database connection and waits for requests for events from + the _event_fetch_list queue. + """ + event_list = [] + i = 0 + while True: + try: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] - # NB: Assumes that we are only persisting events for one room - # at a time. + if not event_list: + single_threaded = self.database_engine.single_threaded + if single_threaded or i > EVENT_QUEUE_ITERATIONS: + self._event_fetch_ongoing -= 1 + return + else: + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) + i += 1 + continue + i = 0 - # map room_id->list[event_ids] giving the new forward - # extremities in each room - new_forward_extremeties = {} + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] - # map room_id->(type,state_key)->event_id tracking the full - # state in each room after adding these events - current_state_for_room = {} + rows = self._new_transaction( + conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids + ) - # map room_id->(to_delete, to_insert) where each entry is - # a map (type,key)->event_id giving the state delta in each - # room - state_delta_for_room = {} + row_dict = { + r["event_id"]: r + for r in rows + } - if not backfilled: - with Measure(self._clock, "_calculate_state_and_extrem"): - # Work out the new "current state" for each room. - # We do this by working out what the new extremities are and then - # calculating the state from that. - events_by_room = {} - for event, context in chunk: - events_by_room.setdefault(event.room_id, []).append( - (event, context) - ) + # We only want to resolve deferreds from the main thread + def fire(lst, res): + for ids, d in lst: + if not d.called: + try: + with PreserveLoggingContext(): + d.callback([ + res[i] + for i in ids + if i in res + ]) + except Exception: + logger.exception("Failed to callback") + with PreserveLoggingContext(): + reactor.callFromThread(fire, event_list, row_dict) + except Exception as e: + logger.exception("do_fetch") - for room_id, ev_ctx_rm in events_by_room.iteritems(): - # Work out new extremities by recursively adding and removing - # the new events. 
- latest_event_ids = yield self.get_latest_event_ids_in_room( - room_id - ) - new_latest_event_ids = yield self._calculate_new_extremeties( - room_id, ev_ctx_rm, latest_event_ids - ) + # We only want to resolve deferreds from the main thread + def fire(evs): + for _, d in evs: + if not d.called: + with PreserveLoggingContext(): + d.errback(e) - if new_latest_event_ids == set(latest_event_ids): - # No change in extremities, so no change in state - continue + if event_list: + with PreserveLoggingContext(): + reactor.callFromThread(fire, event_list) - new_forward_extremeties[room_id] = new_latest_event_ids + @defer.inlineCallbacks + def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): + """Fetches events from the database using the _event_fetch_list. This + allows batch and bulk fetching of events - it allows us to fetch events + without having to create a new transaction for each request for events. + """ + if not events: + defer.returnValue({}) - len_1 = ( - len(latest_event_ids) == 1 - and len(new_latest_event_ids) == 1 - ) - if len_1: - all_single_prev_not_state = all( - len(event.prev_events) == 1 - and not event.is_state() - for event, ctx in ev_ctx_rm - ) - # Don't bother calculating state if they're just - # a long chain of single ancestor non-state events. - if all_single_prev_not_state: - continue + events_d = defer.Deferred() + with self._event_fetch_lock: + self._event_fetch_list.append( + (events, events_d) + ) - logger.info( - "Calculating state delta for room %s", room_id, - ) - current_state = yield self._get_new_state_after_events( - room_id, - ev_ctx_rm, new_latest_event_ids, - ) - if current_state is not None: - current_state_for_room[room_id] = current_state - delta = yield self._calculate_state_delta( - room_id, current_state, - ) - if delta is not None: - state_delta_for_room[room_id] = delta + self._event_fetch_lock.notify() - yield self.runInteraction( - "persist_events", - self._persist_events_txn, - events_and_contexts=chunk, - backfilled=backfilled, - delete_existing=delete_existing, - state_delta_for_room=state_delta_for_room, - new_forward_extremeties=new_forward_extremeties, + if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: + self._event_fetch_ongoing += 1 + should_start = True + else: + should_start = False + + if should_start: + with PreserveLoggingContext(): + self.runWithConnection( + self._do_fetch ) - persist_event_counter.inc_by(len(chunk)) - for event, context in chunk: - if context.app_service: - origin_type = "local" - origin_entity = context.app_service.id - elif self.hs.is_mine_id(event.sender): - origin_type = "local" - origin_entity = "*client*" - else: - origin_type = "remote" - origin_entity = get_domain_from_id(event.sender) - event_counter.inc(event.type, origin_type, origin_entity) + logger.debug("Loading %d events", len(events)) + with PreserveLoggingContext(): + rows = yield events_d + logger.debug("Loaded %d events (%d rows)", len(events), len(rows)) - for room_id, new_state in current_state_for_room.iteritems(): - self.get_current_state_ids.prefill( - (room_id, ), new_state - ) + if not allow_rejected: + rows[:] = [r for r in rows if not r["rejects"]] - for room_id, latest_event_ids in new_forward_extremeties.iteritems(): - self.get_latest_event_ids_in_room.prefill( - (room_id,), list(latest_event_ids) - ) + res = yield make_deferred_yieldable(defer.gatherResults( + [ + preserve_fn(self._get_event_from_row)( + row["internal_metadata"], row["json"], row["redacts"], + rejected_reason=row["rejects"], + ) + for row 
in rows + ], + consumeErrors=True + )) - @defer.inlineCallbacks - def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): - """Calculates the new forward extremeties for a room given events to - persist. + defer.returnValue({ + e.event.event_id: e + for e in res if e + }) - Assumes that we are only persisting events for one room at a time. - """ - new_latest_event_ids = set(latest_event_ids) - # First, add all the new events to the list - new_latest_event_ids.update( - event.event_id for event, ctx in event_contexts - if not event.internal_metadata.is_outlier() and not ctx.rejected - ) - # Now remove all events that are referenced by the to-be-added events - new_latest_event_ids.difference_update( - e_id - for event, ctx in event_contexts - for e_id, _ in event.prev_events - if not event.internal_metadata.is_outlier() and not ctx.rejected - ) + def _fetch_event_rows(self, txn, events): + rows = [] + N = 200 + for i in range(1 + len(events) / N): + evs = events[i * N:(i + 1) * N] + if not evs: + break - # And finally remove any events that are referenced by previously added - # events. - rows = yield self._simple_select_many_batch( - table="event_edges", - column="prev_event_id", - iterable=list(new_latest_event_ids), - retcols=["prev_event_id"], - keyvalues={ - "room_id": room_id, - "is_state": False, - }, - desc="_calculate_new_extremeties", - ) + sql = ( + "SELECT " + " e.event_id as event_id, " + " e.internal_metadata," + " e.json," + " r.redacts as redacts," + " rej.event_id as rejects " + " FROM event_json as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"] * len(evs)),) - new_latest_event_ids.difference_update( - row["prev_event_id"] for row in rows - ) + txn.execute(sql, evs) + rows.extend(self.cursor_to_dict(txn)) - defer.returnValue(new_latest_event_ids) + return rows @defer.inlineCallbacks - def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids): - """Calculate the current state dict after adding some new events to - a room + def _get_event_from_row(self, internal_metadata, js, redacted, + rejected_reason=None): + with Measure(self._clock, "_get_event_from_row"): + d = json.loads(js) + internal_metadata = json.loads(internal_metadata) - Args: - room_id (str): - room to which the events are being added. Used for logging etc + if rejected_reason: + rejected_reason = yield self._simple_select_one_onecol( + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + desc="_get_event_from_row_rejected_reason", + ) - events_context (list[(EventBase, EventContext)]): - events and contexts which are being added to the room + original_ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) - new_latest_event_ids (iterable[str]): - the new forward extremities for the room. + redacted_event = None + if redacted: + redacted_event = prune_event(original_ev) - Returns: - Deferred[dict[(str,str), str]|None]: - None if there are no changes to the room state, or - a dict of (type, state_key) -> event_id]. 
- """ + redaction_id = yield self._simple_select_one_onecol( + table="redactions", + keyvalues={"redacts": redacted_event.event_id}, + retcol="event_id", + desc="_get_event_from_row_redactions", + ) - if not new_latest_event_ids: - defer.returnValue({}) - - # map from state_group to ((type, key) -> event_id) state map - state_groups = {} - missing_event_ids = [] - was_updated = False - for event_id in new_latest_event_ids: - # First search in the list of new events we're adding, - # and then use the current state from that - for ev, ctx in events_context: - if event_id == ev.event_id: - if ctx.current_state_ids is None: - raise Exception("Unknown current state") - - if ctx.state_group is None: - # I don't think this can happen, but let's double-check - raise Exception( - "Context for new extremity event %s has no state " - "group" % (event_id, ), - ) + redacted_event.unsigned["redacted_by"] = redaction_id + # Get the redaction event. - # If we've already seen the state group don't bother adding - # it to the state sets again - if ctx.state_group not in state_groups: - state_groups[ctx.state_group] = ctx.current_state_ids - if ctx.delta_ids or hasattr(ev, "state_key"): - was_updated = True - break - else: - # If we couldn't find it, then we'll need to pull - # the state from the database - was_updated = True - missing_event_ids.append(event_id) + because = yield self.get_event( + redaction_id, + check_redacted=False, + allow_none=True, + ) - if not was_updated: - return + if because: + # It's fine to do add the event directly, since get_pdu_json + # will serialise this field correctly + redacted_event.unsigned["redacted_because"] = because - if missing_event_ids: - # Now pull out the state for any missing events from DB - event_to_groups = yield self._get_state_group_for_events( - missing_event_ids, + cache_entry = _EventCacheEntry( + event=original_ev, + redacted_event=redacted_event, ) - groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys()) + self._get_event_cache.prefill((original_ev.event_id,), cache_entry) - if groups: - group_to_state = yield self._get_state_for_groups(groups) - state_groups.update(group_to_state) + defer.returnValue(cache_entry) - if len(state_groups) == 1: - # If there is only one state group, then we know what the current - # state is. 
- defer.returnValue(state_groups.values()[0]) - def get_events(ev_ids): - return self.get_events( - ev_ids, get_prev_content=False, check_redacted=False, - ) - events_map = {ev.event_id: ev for ev, _ in events_context} - logger.debug("calling resolve_state_groups from preserve_events") - res = yield self._state_resolution_handler.resolve_state_groups( - room_id, state_groups, events_map, get_events +class EventsStore(EventsWorkerStore): + EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" + EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" + + def __init__(self, db_conn, hs): + super(EventsStore, self).__init__(db_conn, hs) + self._clock = hs.get_clock() + self.register_background_update_handler( + self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts + ) + self.register_background_update_handler( + self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, + self._background_reindex_fields_sender, ) - defer.returnValue(res.state) + self.register_background_index_update( + "event_contains_url_index", + index_name="event_contains_url_index", + table="events", + columns=["room_id", "topological_ordering", "stream_ordering"], + where_clause="contains_url = true AND outlier = false", + ) - @defer.inlineCallbacks - def _calculate_state_delta(self, room_id, current_state): - """Calculate the new state deltas for a room. + # an event_id index on event_search is useful for the purge_history + # api. Plus it means we get to enforce some integrity with a UNIQUE + # clause + self.register_background_index_update( + "event_search_event_id_idx", + index_name="event_search_event_id_idx", + table="event_search", + columns=["event_id"], + unique=True, + psql_only=True, + ) - Assumes that we are only persisting events for one room at a time. + self._state_resolution_handler = hs.get_state_resolution_handler() - Returns: - 2-tuple (to_delete, to_insert) where both are state dicts, - i.e. (type, state_key) -> event_id. `to_delete` are the entries to - first be deleted from current_state_events, `to_insert` are entries - to insert. + def persist_events(self, events_and_contexts, backfilled=False): """ - existing_state = yield self.get_current_state_ids(room_id) - - existing_events = set(existing_state.itervalues()) - new_events = set(ev_id for ev_id in current_state.itervalues()) - changed_events = existing_events ^ new_events + Write events to the database + Args: + events_and_contexts: list of tuples of (event, context) + backfilled: ? + """ + partitioned = {} + for event, ctx in events_and_contexts: + partitioned.setdefault(event.room_id, []).append((event, ctx)) - if not changed_events: - return + deferreds = [] + for room_id, evs_ctxs in partitioned.iteritems(): + d = self._event_persist_queue.add_to_queue( + room_id, evs_ctxs, + backfilled=backfilled, + ) + deferreds.append(d) - to_delete = { - key: ev_id for key, ev_id in existing_state.iteritems() - if ev_id in changed_events - } - events_to_insert = (new_events - existing_events) - to_insert = { - key: ev_id for key, ev_id in current_state.iteritems() - if ev_id in events_to_insert - } + for room_id in partitioned: + self._maybe_start_persisting(room_id) - defer.returnValue((to_delete, to_insert)) + return make_deferred_yieldable( + defer.gatherResults(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks - def get_event(self, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False, - allow_none=False): - """Get an event from the database by event_id. 
+ @log_function + def persist_event(self, event, context, backfilled=False): + """ Args: - event_id (str): The event_id of the event to fetch - check_redacted (bool): If True, check if event has been redacted - and redact it. - get_prev_content (bool): If True and event is a state event, - include the previous states content in the unsigned field. - allow_rejected (bool): If True return rejected events. - allow_none (bool): If True, return None if no event found, if - False throw an exception. + event (EventBase): + context (EventContext): + backfilled (bool): Returns: - Deferred : A FrozenEvent. + Deferred: resolves to (int, int): the stream ordering of ``event``, + and the stream ordering of the latest persisted event """ - events = yield self._get_events( - [event_id], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, + deferred = self._event_persist_queue.add_to_queue( + event.room_id, [(event, context)], + backfilled=backfilled, ) - if not events and not allow_none: - raise SynapseError(404, "Could not find event %s" % (event_id,)) + self._maybe_start_persisting(event.room_id) - defer.returnValue(events[0] if events else None) + yield make_deferred_yieldable(deferred) + + max_persisted_id = yield self._stream_id_gen.get_current_token() + defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) + + def _maybe_start_persisting(self, room_id): + @defer.inlineCallbacks + def persisting_queue(item): + yield self._persist_events( + item.events_and_contexts, + backfilled=item.backfilled, + ) + + self._event_persist_queue.handle_queue(room_id, persisting_queue) + @_retry_on_integrity_error @defer.inlineCallbacks - def get_events(self, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False): - """Get events from the database + def _persist_events(self, events_and_contexts, backfilled=False, + delete_existing=False): + """Persist events to db Args: - event_ids (list): The event_ids of the events to fetch - check_redacted (bool): If True, check if event has been redacted - and redact it. - get_prev_content (bool): If True and event is a state event, - include the previous states content in the unsigned field. - allow_rejected (bool): If True return rejected events. + events_and_contexts (list[(EventBase, EventContext)]): + backfilled (bool): + delete_existing (bool): Returns: - Deferred : Dict from event_id to event. + Deferred: resolves when the events have been persisted """ - events = yield self._get_events( - event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + if not events_and_contexts: + return - defer.returnValue({e.event_id: e for e in events}) + if backfilled: + stream_ordering_manager = self._backfill_id_gen.get_next_mult( + len(events_and_contexts) + ) + else: + stream_ordering_manager = self._stream_id_gen.get_next_mult( + len(events_and_contexts) + ) - @log_function - def _persist_events_txn(self, txn, events_and_contexts, backfilled, - delete_existing=False, state_delta_for_room={}, - new_forward_extremeties={}): - """Insert some number of room events into the necessary database tables. + with stream_ordering_manager as stream_orderings: + for (event, context), stream, in zip( + events_and_contexts, stream_orderings + ): + event.internal_metadata.stream_ordering = stream - Rejected events are only inserted into the events table, the events_json table, - and the rejections table. 
Things reading from those table will need to check - whether the event was rejected. + chunks = [ + events_and_contexts[x:x + 100] + for x in xrange(0, len(events_and_contexts), 100) + ] - Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): - events to persist - backfilled (bool): True if the events were backfilled - delete_existing (bool): True to purge existing table rows for the - events from the database. This is useful when retrying due to - IntegrityError. - state_delta_for_room (dict[str, (list[str], list[str])]): - The current-state delta for each room. For each room, a tuple - (to_delete, to_insert), being a list of event ids to be removed - from the current state, and a list of event ids to be added to - the current state. - new_forward_extremeties (dict[str, list[str]]): - The new forward extremities for each room. For each room, a - list of the event ids which are the forward extremities. - - """ - max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering - - self._update_current_state_txn(txn, state_delta_for_room, max_stream_order) - - self._update_forward_extremities_txn( - txn, - new_forward_extremities=new_forward_extremeties, - max_stream_order=max_stream_order, - ) - - # Ensure that we don't have the same event twice. - events_and_contexts = self._filter_events_and_contexts_for_duplicates( - events_and_contexts, - ) - - self._update_room_depths_txn( - txn, - events_and_contexts=events_and_contexts, - backfilled=backfilled, - ) - - # _update_outliers_txn filters out any events which have already been - # persisted, and returns the filtered list. - events_and_contexts = self._update_outliers_txn( - txn, - events_and_contexts=events_and_contexts, - ) - - # From this point onwards the events are only events that we haven't - # seen before. + for chunk in chunks: + # We can't easily parallelize these since different chunks + # might contain the same event. :( - if delete_existing: - # For paranoia reasons, we go and delete all the existing entries - # for these events so we can reinsert them. - # This gets around any problems with some tables already having - # entries. - self._delete_existing_rows_txn( - txn, - events_and_contexts=events_and_contexts, - ) + # NB: Assumes that we are only persisting events for one room + # at a time. - self._store_event_txn( - txn, - events_and_contexts=events_and_contexts, - ) + # map room_id->list[event_ids] giving the new forward + # extremities in each room + new_forward_extremeties = {} - # Insert into event_to_state_groups. - self._store_event_state_mappings_txn(txn, events_and_contexts) + # map room_id->(type,state_key)->event_id tracking the full + # state in each room after adding these events + current_state_for_room = {} - # _store_rejected_events_txn filters out any events which were - # rejected, and returns the filtered list. - events_and_contexts = self._store_rejected_events_txn( - txn, - events_and_contexts=events_and_contexts, - ) + # map room_id->(to_delete, to_insert) where each entry is + # a map (type,key)->event_id giving the state delta in each + # room + state_delta_for_room = {} - # From this point onwards the events are only ones that weren't - # rejected. + if not backfilled: + with Measure(self._clock, "_calculate_state_and_extrem"): + # Work out the new "current state" for each room. + # We do this by working out what the new extremities are and then + # calculating the state from that. 
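+                        # (For example, persisting a state event B on top of a
+                        # single extremity A leaves {B} as the new extremity
+                        # set, and the current state is recalculated from B.)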
+ events_by_room = {} + for event, context in chunk: + events_by_room.setdefault(event.room_id, []).append( + (event, context) + ) - self._update_metadata_tables_txn( - txn, - events_and_contexts=events_and_contexts, - backfilled=backfilled, - ) + for room_id, ev_ctx_rm in events_by_room.iteritems(): + # Work out new extremities by recursively adding and removing + # the new events. + latest_event_ids = yield self.get_latest_event_ids_in_room( + room_id + ) + new_latest_event_ids = yield self._calculate_new_extremeties( + room_id, ev_ctx_rm, latest_event_ids + ) - def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): - for room_id, current_state_tuple in state_delta_by_room.iteritems(): - to_delete, to_insert = current_state_tuple - txn.executemany( - "DELETE FROM current_state_events WHERE event_id = ?", - [(ev_id,) for ev_id in to_delete.itervalues()], - ) + if new_latest_event_ids == set(latest_event_ids): + # No change in extremities, so no change in state + continue - self._simple_insert_many_txn( - txn, - table="current_state_events", - values=[ - { - "event_id": ev_id, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - } - for key, ev_id in to_insert.iteritems() - ], - ) + new_forward_extremeties[room_id] = new_latest_event_ids - state_deltas = {key: None for key in to_delete} - state_deltas.update(to_insert) + len_1 = ( + len(latest_event_ids) == 1 + and len(new_latest_event_ids) == 1 + ) + if len_1: + all_single_prev_not_state = all( + len(event.prev_events) == 1 + and not event.is_state() + for event, ctx in ev_ctx_rm + ) + # Don't bother calculating state if they're just + # a long chain of single ancestor non-state events. + if all_single_prev_not_state: + continue - self._simple_insert_many_txn( - txn, - table="current_state_delta_stream", - values=[ - { - "stream_id": max_stream_order, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": ev_id, - "prev_event_id": to_delete.get(key, None), - } - for key, ev_id in state_deltas.iteritems() - ] - ) + logger.info( + "Calculating state delta for room %s", room_id, + ) + current_state = yield self._get_new_state_after_events( + room_id, + ev_ctx_rm, new_latest_event_ids, + ) + if current_state is not None: + current_state_for_room[room_id] = current_state + delta = yield self._calculate_state_delta( + room_id, current_state, + ) + if delta is not None: + state_delta_for_room[room_id] = delta - self._curr_state_delta_stream_cache.entity_has_changed( - room_id, max_stream_order, + yield self.runInteraction( + "persist_events", + self._persist_events_txn, + events_and_contexts=chunk, + backfilled=backfilled, + delete_existing=delete_existing, + state_delta_for_room=state_delta_for_room, + new_forward_extremeties=new_forward_extremeties, ) + persist_event_counter.inc_by(len(chunk)) + for event, context in chunk: + if context.app_service: + origin_type = "local" + origin_entity = context.app_service.id + elif self.hs.is_mine_id(event.sender): + origin_type = "local" + origin_entity = "*client*" + else: + origin_type = "remote" + origin_entity = get_domain_from_id(event.sender) - # Invalidate the various caches - - # Figure out the changes of membership to invalidate the - # `get_rooms_for_user` cache. - # We find out which membership events we may have deleted - # and which we have added, then we invlidate the caches for all - # those users. 
- members_changed = set( - state_key for ev_type, state_key in state_deltas - if ev_type == EventTypes.Member - ) + event_counter.inc(event.type, origin_type, origin_entity) - for member in members_changed: - self._invalidate_cache_and_stream( - txn, self.get_rooms_for_user, (member,) + for room_id, new_state in current_state_for_room.iteritems(): + self.get_current_state_ids.prefill( + (room_id, ), new_state ) - for host in set(get_domain_from_id(u) for u in members_changed): - self._invalidate_cache_and_stream( - txn, self.is_host_joined, (room_id, host) - ) - self._invalidate_cache_and_stream( - txn, self.was_host_joined, (room_id, host) + for room_id, latest_event_ids in new_forward_extremeties.iteritems(): + self.get_latest_event_ids_in_room.prefill( + (room_id,), list(latest_event_ids) ) - self._invalidate_cache_and_stream( - txn, self.get_users_in_room, (room_id,) - ) - - self._invalidate_cache_and_stream( - txn, self.get_current_state_ids, (room_id,) - ) - - def _update_forward_extremities_txn(self, txn, new_forward_extremities, - max_stream_order): - for room_id, new_extrem in new_forward_extremities.iteritems(): - self._simple_delete_txn( - txn, - table="event_forward_extremities", - keyvalues={"room_id": room_id}, - ) - txn.call_after( - self.get_latest_event_ids_in_room.invalidate, (room_id,) - ) - - self._simple_insert_many_txn( - txn, - table="event_forward_extremities", - values=[ - { - "event_id": ev_id, - "room_id": room_id, - } - for room_id, new_extrem in new_forward_extremities.iteritems() - for ev_id in new_extrem - ], - ) - # We now insert into stream_ordering_to_exterm a mapping from room_id, - # new stream_ordering to new forward extremeties in the room. - # This allows us to later efficiently look up the forward extremeties - # for a room before a given stream_ordering - self._simple_insert_many_txn( - txn, - table="stream_ordering_to_exterm", - values=[ - { - "room_id": room_id, - "event_id": event_id, - "stream_ordering": max_stream_order, - } - for room_id, new_extrem in new_forward_extremities.iteritems() - for event_id in new_extrem - ] - ) - - @classmethod - def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts): - """Ensure that we don't have the same event twice. - - Pick the earliest non-outlier if there is one, else the earliest one. - - Args: - events_and_contexts (list[(EventBase, EventContext)]): - Returns: - list[(EventBase, EventContext)]: filtered list - """ - new_events_and_contexts = OrderedDict() - for event, context in events_and_contexts: - prev_event_context = new_events_and_contexts.get(event.event_id) - if prev_event_context: - if not event.internal_metadata.is_outlier(): - if prev_event_context[0].internal_metadata.is_outlier(): - # To ensure correct ordering we pop, as OrderedDict is - # ordered by first insertion. 
- new_events_and_contexts.pop(event.event_id, None) - new_events_and_contexts[event.event_id] = (event, context) - else: - new_events_and_contexts[event.event_id] = (event, context) - return new_events_and_contexts.values() - - def _update_room_depths_txn(self, txn, events_and_contexts, backfilled): - """Update min_depth for each room - - Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting - backfilled (bool): True if the events were backfilled - """ - depth_updates = {} - for event, context in events_and_contexts: - # Remove the any existing cache entries for the event_ids - txn.call_after(self._invalidate_get_event_cache, event.event_id) - if not backfilled: - txn.call_after( - self._events_stream_cache.entity_has_changed, - event.room_id, event.internal_metadata.stream_ordering, - ) - - if not event.internal_metadata.is_outlier() and not context.rejected: - depth_updates[event.room_id] = max( - event.depth, depth_updates.get(event.room_id, event.depth) - ) - - for room_id, depth in depth_updates.iteritems(): - self._update_min_depth_for_room_txn(txn, room_id, depth) - - def _update_outliers_txn(self, txn, events_and_contexts): - """Update any outliers with new event info. - - This turns outliers into ex-outliers (unless the new event was - rejected). - - Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting + @defer.inlineCallbacks + def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): + """Calculates the new forward extremeties for a room given events to + persist. - Returns: - list[(EventBase, EventContext)] new list, without events which - are already in the events table. + Assumes that we are only persisting events for one room at a time. """ - txn.execute( - "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % ( - ",".join(["?"] * len(events_and_contexts)), - ), - [event.event_id for event, _ in events_and_contexts] + new_latest_event_ids = set(latest_event_ids) + # First, add all the new events to the list + new_latest_event_ids.update( + event.event_id for event, ctx in event_contexts + if not event.internal_metadata.is_outlier() and not ctx.rejected + ) + # Now remove all events that are referenced by the to-be-added events + new_latest_event_ids.difference_update( + e_id + for event, ctx in event_contexts + for e_id, _ in event.prev_events + if not event.internal_metadata.is_outlier() and not ctx.rejected ) - have_persisted = { - event_id: outlier - for event_id, outlier in txn - } + # And finally remove any events that are referenced by previously added + # events. + rows = yield self._simple_select_many_batch( + table="event_edges", + column="prev_event_id", + iterable=list(new_latest_event_ids), + retcols=["prev_event_id"], + keyvalues={ + "room_id": room_id, + "is_state": False, + }, + desc="_calculate_new_extremeties", + ) - to_remove = set() - for event, context in events_and_contexts: - if event.event_id not in have_persisted: - continue + new_latest_event_ids.difference_update( + row["prev_event_id"] for row in rows + ) - to_remove.add(event) + defer.returnValue(new_latest_event_ids) - if context.rejected: - # If the event is rejected then we don't care if the event - # was an outlier or not. 
- continue + @defer.inlineCallbacks + def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids): + """Calculate the current state dict after adding some new events to + a room - outlier_persisted = have_persisted[event.event_id] - if not event.internal_metadata.is_outlier() and outlier_persisted: - # We received a copy of an event that we had already stored as - # an outlier in the database. We now have some state at that - # so we need to update the state_groups table with that state. + Args: + room_id (str): + room to which the events are being added. Used for logging etc - # insert into event_to_state_groups. - try: - self._store_event_state_mappings_txn(txn, ((event, context),)) - except Exception: - logger.exception("") - raise + events_context (list[(EventBase, EventContext)]): + events and contexts which are being added to the room - metadata_json = encode_json( - event.internal_metadata.get_dict() - ).decode("UTF-8") + new_latest_event_ids (iterable[str]): + the new forward extremities for the room. - sql = ( - "UPDATE event_json SET internal_metadata = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (metadata_json, event.event_id,) - ) + Returns: + Deferred[dict[(str,str), str]|None]: + None if there are no changes to the room state, or + a dict of (type, state_key) -> event_id]. + """ - # Add an entry to the ex_outlier_stream table to replicate the - # change in outlier status to our workers. - stream_order = event.internal_metadata.stream_ordering - state_group_id = context.state_group - self._simple_insert_txn( - txn, - table="ex_outlier_stream", - values={ - "event_stream_ordering": stream_order, - "event_id": event.event_id, - "state_group": state_group_id, - } - ) + if not new_latest_event_ids: + defer.returnValue({}) - sql = ( - "UPDATE events SET outlier = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (False, event.event_id,) - ) + # map from state_group to ((type, key) -> event_id) state map + state_groups = {} + missing_event_ids = [] + was_updated = False + for event_id in new_latest_event_ids: + # First search in the list of new events we're adding, + # and then use the current state from that + for ev, ctx in events_context: + if event_id == ev.event_id: + if ctx.current_state_ids is None: + raise Exception("Unknown current state") - # Update the event_backward_extremities table now that this - # event isn't an outlier any more. 
- self._update_backward_extremeties(txn, [event]) + if ctx.state_group is None: + # I don't think this can happen, but let's double-check + raise Exception( + "Context for new extremity event %s has no state " + "group" % (event_id, ), + ) - return [ - ec for ec in events_and_contexts if ec[0] not in to_remove - ] + # If we've already seen the state group don't bother adding + # it to the state sets again + if ctx.state_group not in state_groups: + state_groups[ctx.state_group] = ctx.current_state_ids + if ctx.delta_ids or hasattr(ev, "state_key"): + was_updated = True + break + else: + # If we couldn't find it, then we'll need to pull + # the state from the database + was_updated = True + missing_event_ids.append(event_id) - @classmethod - def _delete_existing_rows_txn(cls, txn, events_and_contexts): - if not events_and_contexts: - # nothing to do here + if not was_updated: return - logger.info("Deleting existing") - - for table in ( - "events", - "event_auth", - "event_json", - "event_content_hashes", - "event_destinations", - "event_edge_hashes", - "event_edges", - "event_forward_extremities", - "event_push_actions", - "event_reference_hashes", - "event_search", - "event_signatures", - "event_to_state_groups", - "guest_access", - "history_visibility", - "local_invites", - "room_names", - "state_events", - "rejections", - "redactions", - "room_memberships", - "topics" - ): - txn.executemany( - "DELETE FROM %s WHERE event_id = ?" % (table,), - [(ev.event_id,) for ev, _ in events_and_contexts] + if missing_event_ids: + # Now pull out the state for any missing events from DB + event_to_groups = yield self._get_state_group_for_events( + missing_event_ids, ) - def _store_event_txn(self, txn, events_and_contexts): - """Insert new events into the event and event_json tables - - Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting - """ + groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys()) - if not events_and_contexts: - # nothing to do here - return + if groups: + group_to_state = yield self._get_state_for_groups(groups) + state_groups.update(group_to_state) - def event_dict(event): - d = event.get_dict() - d.pop("redacted", None) - d.pop("redacted_because", None) - return d + if len(state_groups) == 1: + # If there is only one state group, then we know what the current + # state is. 
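+            # (i.e. every new extremity resolved to the same state group, so
+            # no state resolution is needed)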
+ defer.returnValue(state_groups.values()[0]) - self._simple_insert_many_txn( - txn, - table="event_json", - values=[ - { - "event_id": event.event_id, - "room_id": event.room_id, - "internal_metadata": encode_json( - event.internal_metadata.get_dict() - ).decode("UTF-8"), - "json": encode_json(event_dict(event)).decode("UTF-8"), - } - for event, _ in events_and_contexts - ], + def get_events(ev_ids): + return self.get_events( + ev_ids, get_prev_content=False, check_redacted=False, + ) + events_map = {ev.event_id: ev for ev, _ in events_context} + logger.debug("calling resolve_state_groups from preserve_events") + res = yield self._state_resolution_handler.resolve_state_groups( + room_id, state_groups, events_map, get_events ) - self._simple_insert_many_txn( - txn, - table="events", - values=[ - { - "stream_ordering": event.internal_metadata.stream_ordering, - "topological_ordering": event.depth, - "depth": event.depth, - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "processed": True, - "outlier": event.internal_metadata.is_outlier(), - "content": encode_json(event.content).decode("UTF-8"), - "origin_server_ts": int(event.origin_server_ts), - "received_ts": self._clock.time_msec(), - "sender": event.sender, - "contains_url": ( - "url" in event.content - and isinstance(event.content["url"], basestring) - ), - } - for event, _ in events_and_contexts - ], - ) + defer.returnValue(res.state) + + @defer.inlineCallbacks + def _calculate_state_delta(self, room_id, current_state): + """Calculate the new state deltas for a room. + + Assumes that we are only persisting events for one room at a time. + + Returns: + 2-tuple (to_delete, to_insert) where both are state dicts, + i.e. (type, state_key) -> event_id. `to_delete` are the entries to + first be deleted from current_state_events, `to_insert` are entries + to insert. + """ + existing_state = yield self.get_current_state_ids(room_id) + + existing_events = set(existing_state.itervalues()) + new_events = set(ev_id for ev_id in current_state.itervalues()) + changed_events = existing_events ^ new_events - def _store_rejected_events_txn(self, txn, events_and_contexts): - """Add rows to the 'rejections' table for received events which were - rejected + if not changed_events: + return - Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting + to_delete = { + key: ev_id for key, ev_id in existing_state.iteritems() + if ev_id in changed_events + } + events_to_insert = (new_events - existing_events) + to_insert = { + key: ev_id for key, ev_id in current_state.iteritems() + if ev_id in events_to_insert + } - Returns: - list[(EventBase, EventContext)] new list, without the rejected - events. - """ - # Remove the rejected events from the list now that we've added them - # to the events table and the events_json table. - to_remove = set() - for event, context in events_and_contexts: - if context.rejected: - # Insert the event_id into the rejections table - self._store_rejections_txn( - txn, event.event_id, context.rejected - ) - to_remove.add(event) + defer.returnValue((to_delete, to_insert)) - return [ - ec for ec in events_and_contexts if ec[0] not in to_remove - ] + @log_function + def _persist_events_txn(self, txn, events_and_contexts, backfilled, + delete_existing=False, state_delta_for_room={}, + new_forward_extremeties={}): + """Insert some number of room events into the necessary database tables. 
- def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled): - """Update all the miscellaneous tables for new events + Rejected events are only inserted into the events table, the events_json table, + and the rejections table. Things reading from those table will need to check + whether the event was rejected. Args: txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting + events_and_contexts (list[(EventBase, EventContext)]): + events to persist backfilled (bool): True if the events were backfilled + delete_existing (bool): True to purge existing table rows for the + events from the database. This is useful when retrying due to + IntegrityError. + state_delta_for_room (dict[str, (list[str], list[str])]): + The current-state delta for each room. For each room, a tuple + (to_delete, to_insert), being a list of event ids to be removed + from the current state, and a list of event ids to be added to + the current state. + new_forward_extremeties (dict[str, list[str]]): + The new forward extremities for each room. For each room, a + list of the event ids which are the forward extremities. + """ + max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering - if not events_and_contexts: - # nothing to do here - return + self._update_current_state_txn(txn, state_delta_for_room, max_stream_order) - for event, context in events_and_contexts: - # Insert all the push actions into the event_push_actions table. - self._set_push_actions_for_event_and_users_txn( - txn, event, - ) + self._update_forward_extremities_txn( + txn, + new_forward_extremities=new_forward_extremeties, + max_stream_order=max_stream_order, + ) - if event.type == EventTypes.Redaction and event.redacts is not None: - # Remove the entries in the event_push_actions table for the - # redacted event. - self._remove_push_actions_for_event_id_txn( - txn, event.room_id, event.redacts - ) + # Ensure that we don't have the same event twice. + events_and_contexts = self._filter_events_and_contexts_for_duplicates( + events_and_contexts, + ) - self._simple_insert_many_txn( + self._update_room_depths_txn( txn, - table="event_auth", - values=[ - { - "event_id": event.event_id, - "room_id": event.room_id, - "auth_id": auth_id, - } - for event, _ in events_and_contexts - for auth_id, _ in event.auth_events - if event.is_state() - ], + events_and_contexts=events_and_contexts, + backfilled=backfilled, ) - # Update the event_forward_extremities, event_backward_extremities and - # event_edges tables. - self._handle_mult_prev_events( + # _update_outliers_txn filters out any events which have already been + # persisted, and returns the filtered list. + events_and_contexts = self._update_outliers_txn( txn, - events=[event for event, _ in events_and_contexts], + events_and_contexts=events_and_contexts, ) - for event, _ in events_and_contexts: - if event.type == EventTypes.Name: - # Insert into the room_names and event_search tables. - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - # Insert into the topics table and event_search table. - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Message: - # Insert into the event_search table. - self._store_room_message_txn(txn, event) - elif event.type == EventTypes.Redaction: - # Insert into the redactions table. - self._store_redaction(txn, event) - elif event.type == EventTypes.RoomHistoryVisibility: - # Insert into the event_search table. 
- self._store_history_visibility_txn(txn, event) - elif event.type == EventTypes.GuestAccess: - # Insert into the event_search table. - self._store_guest_access_txn(txn, event) + # From this point onwards the events are only events that we haven't + # seen before. - # Insert into the room_memberships table. - self._store_room_members_txn( + if delete_existing: + # For paranoia reasons, we go and delete all the existing entries + # for these events so we can reinsert them. + # This gets around any problems with some tables already having + # entries. + self._delete_existing_rows_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + self._store_event_txn( txn, - [ - event - for event, _ in events_and_contexts - if event.type == EventTypes.Member - ], - backfilled=backfilled, + events_and_contexts=events_and_contexts, ) - # Insert event_reference_hashes table. - self._store_event_reference_hashes_txn( - txn, [event for event, _ in events_and_contexts] + # Insert into event_to_state_groups. + self._store_event_state_mappings_txn(txn, events_and_contexts) + + # _store_rejected_events_txn filters out any events which were + # rejected, and returns the filtered list. + events_and_contexts = self._store_rejected_events_txn( + txn, + events_and_contexts=events_and_contexts, ) - state_events_and_contexts = [ - ec for ec in events_and_contexts if ec[0].is_state() - ] + # From this point onwards the events are only ones that weren't + # rejected. - state_values = [] - for event, context in state_events_and_contexts: - vals = { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - } + self._update_metadata_tables_txn( + txn, + events_and_contexts=events_and_contexts, + backfilled=backfilled, + ) + + def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): + for room_id, current_state_tuple in state_delta_by_room.iteritems(): + to_delete, to_insert = current_state_tuple + txn.executemany( + "DELETE FROM current_state_events WHERE event_id = ?", + [(ev_id,) for ev_id in to_delete.itervalues()], + ) + + self._simple_insert_many_txn( + txn, + table="current_state_events", + values=[ + { + "event_id": ev_id, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + } + for key, ev_id in to_insert.iteritems() + ], + ) + + state_deltas = {key: None for key in to_delete} + state_deltas.update(to_insert) + + self._simple_insert_many_txn( + txn, + table="current_state_delta_stream", + values=[ + { + "stream_id": max_stream_order, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": ev_id, + "prev_event_id": to_delete.get(key, None), + } + for key, ev_id in state_deltas.iteritems() + ] + ) + + self._curr_state_delta_stream_cache.entity_has_changed( + room_id, max_stream_order, + ) + + # Invalidate the various caches + + # Figure out the changes of membership to invalidate the + # `get_rooms_for_user` cache. + # We find out which membership events we may have deleted + # and which we have added, then we invlidate the caches for all + # those users. 
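+            # (e.g. if the delta replaces @alice:example.com's join with a
+            # leave, her entry in `get_rooms_for_user` must be invalidated)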
+ members_changed = set( + state_key for ev_type, state_key in state_deltas + if ev_type == EventTypes.Member + ) + + for member in members_changed: + self._invalidate_cache_and_stream( + txn, self.get_rooms_for_user, (member,) + ) + + for host in set(get_domain_from_id(u) for u in members_changed): + self._invalidate_cache_and_stream( + txn, self.is_host_joined, (room_id, host) + ) + self._invalidate_cache_and_stream( + txn, self.was_host_joined, (room_id, host) + ) + + self._invalidate_cache_and_stream( + txn, self.get_users_in_room, (room_id,) + ) - # TODO: How does this work with backfilling? - if hasattr(event, "replaces_state"): - vals["prev_state"] = event.replaces_state + self._invalidate_cache_and_stream( + txn, self.get_current_state_ids, (room_id,) + ) - state_values.append(vals) + def _update_forward_extremities_txn(self, txn, new_forward_extremities, + max_stream_order): + for room_id, new_extrem in new_forward_extremities.iteritems(): + self._simple_delete_txn( + txn, + table="event_forward_extremities", + keyvalues={"room_id": room_id}, + ) + txn.call_after( + self.get_latest_event_ids_in_room.invalidate, (room_id,) + ) self._simple_insert_many_txn( txn, - table="state_events", - values=state_values, + table="event_forward_extremities", + values=[ + { + "event_id": ev_id, + "room_id": room_id, + } + for room_id, new_extrem in new_forward_extremities.iteritems() + for ev_id in new_extrem + ], ) - + # We now insert into stream_ordering_to_exterm a mapping from room_id, + # new stream_ordering to new forward extremeties in the room. + # This allows us to later efficiently look up the forward extremeties + # for a room before a given stream_ordering self._simple_insert_many_txn( txn, - table="event_edges", + table="stream_ordering_to_exterm", values=[ { - "event_id": event.event_id, - "prev_event_id": prev_id, - "room_id": event.room_id, - "is_state": True, + "room_id": room_id, + "event_id": event_id, + "stream_ordering": max_stream_order, } - for event, _ in state_events_and_contexts - for prev_id, _ in event.prev_state - ], + for room_id, new_extrem in new_forward_extremities.iteritems() + for event_id in new_extrem + ] ) - # Prefill the event cache - self._add_to_cache(txn, events_and_contexts) + @classmethod + def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts): + """Ensure that we don't have the same event twice. - def _add_to_cache(self, txn, events_and_contexts): - to_prefill = [] + Pick the earliest non-outlier if there is one, else the earliest one. - rows = [] - N = 200 - for i in range(0, len(events_and_contexts), N): - ev_map = { - e[0].event_id: e[0] - for e in events_and_contexts[i:i + N] - } - if not ev_map: - break + Args: + events_and_contexts (list[(EventBase, EventContext)]): + Returns: + list[(EventBase, EventContext)]: filtered list + """ + new_events_and_contexts = OrderedDict() + for event, context in events_and_contexts: + prev_event_context = new_events_and_contexts.get(event.event_id) + if prev_event_context: + if not event.internal_metadata.is_outlier(): + if prev_event_context[0].internal_metadata.is_outlier(): + # To ensure correct ordering we pop, as OrderedDict is + # ordered by first insertion. 
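+                        # (A concrete illustration of the OrderedDict
+                        # behaviour relied on here: starting from
+                        #     OrderedDict([("a", 1), ("b", 2)])
+                        # popping "a" and re-inserting it yields the order
+                        #     [("b", 2), ("a", 1)]
+                        # so the re-inserted entry takes the later slot.)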
+                        new_events_and_contexts.pop(event.event_id, None)
+                        new_events_and_contexts[event.event_id] = (event, context)
+            else:
+                new_events_and_contexts[event.event_id] = (event, context)
+        return new_events_and_contexts.values()
 
-        sql = (
-            "SELECT "
-            " e.event_id as event_id, "
-            " r.redacts as redacts,"
-            " rej.event_id as rejects "
-            " FROM events as e"
-            " LEFT JOIN rejections as rej USING (event_id)"
-            " LEFT JOIN redactions as r ON e.event_id = r.redacts"
-            " WHERE e.event_id IN (%s)"
-        ) % (",".join(["?"] * len(ev_map)),)
+    def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
+        """Update min_depth for each room
 
-        txn.execute(sql, ev_map.keys())
-        rows = self.cursor_to_dict(txn)
-        for row in rows:
-            event = ev_map[row["event_id"]]
-            if not row["rejects"] and not row["redacts"]:
-                to_prefill.append(_EventCacheEntry(
-                    event=event,
-                    redacted_event=None,
-                ))
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+            backfilled (bool): True if the events were backfilled
+        """
+        depth_updates = {}
+        for event, context in events_and_contexts:
+            # Remove any existing cache entries for the event_ids
+            txn.call_after(self._invalidate_get_event_cache, event.event_id)
+            if not backfilled:
+                txn.call_after(
+                    self._events_stream_cache.entity_has_changed,
+                    event.room_id, event.internal_metadata.stream_ordering,
+                )
 
-        def prefill():
-            for cache_entry in to_prefill:
-                self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
-        txn.call_after(prefill)
+            if not event.internal_metadata.is_outlier() and not context.rejected:
+                depth_updates[event.room_id] = max(
+                    event.depth, depth_updates.get(event.room_id, event.depth)
+                )
 
-    def _store_redaction(self, txn, event):
-        # invalidate the cache for the redacted event
-        txn.call_after(self._invalidate_get_event_cache, event.redacts)
-        txn.execute(
-            "INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
-            (event.event_id, event.redacts)
-        )
+        for room_id, depth in depth_updates.iteritems():
+            self._update_min_depth_for_room_txn(txn, room_id, depth)
 
-    @defer.inlineCallbacks
-    def have_events_in_timeline(self, event_ids):
-        """Given a list of event ids, check if we have already processed and
-        stored them as non outliers.
-        """
-        rows = yield self._simple_select_many_batch(
-            table="events",
-            retcols=("event_id",),
-            column="event_id",
-            iterable=list(event_ids),
-            keyvalues={"outlier": False},
-            desc="have_events_in_timeline",
-        )
+    def _update_outliers_txn(self, txn, events_and_contexts):
+        """Update any outliers with new event info.
 
-        defer.returnValue(set(r["event_id"] for r in rows))
+        This turns outliers into ex-outliers (unless the new event was
+        rejected).
 
-    def have_events(self, event_ids):
-        """Given a list of event ids, check if we have already processed them.
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
 
         Returns:
-            dict: Has an entry for each event id we already have seen. Maps to
-            the rejected reason string if we rejected the event, else maps to
-            None.
+            list[(EventBase, EventContext)] new list, without events which
+            are already in the events table.
""" - if not event_ids: - return defer.succeed({}) + txn.execute( + "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % ( + ",".join(["?"] * len(events_and_contexts)), + ), + [event.event_id for event, _ in events_and_contexts] + ) - def f(txn): - sql = ( - "SELECT e.event_id, reason FROM events as e " - "LEFT JOIN rejections as r ON e.event_id = r.event_id " - "WHERE e.event_id = ?" - ) + have_persisted = { + event_id: outlier + for event_id, outlier in txn + } - res = {} - for event_id in event_ids: - txn.execute(sql, (event_id,)) - row = txn.fetchone() - if row: - _, rejected = row - res[event_id] = rejected + to_remove = set() + for event, context in events_and_contexts: + if event.event_id not in have_persisted: + continue - return res + to_remove.add(event) - return self.runInteraction( - "have_events", f, - ) + if context.rejected: + # If the event is rejected then we don't care if the event + # was an outlier or not. + continue - @defer.inlineCallbacks - def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not event_ids: - defer.returnValue([]) + outlier_persisted = have_persisted[event.event_id] + if not event.internal_metadata.is_outlier() and outlier_persisted: + # We received a copy of an event that we had already stored as + # an outlier in the database. We now have some state at that + # so we need to update the state_groups table with that state. + + # insert into event_to_state_groups. + try: + self._store_event_state_mappings_txn(txn, ((event, context),)) + except Exception: + logger.exception("") + raise + + metadata_json = encode_json( + event.internal_metadata.get_dict() + ).decode("UTF-8") + + sql = ( + "UPDATE event_json SET internal_metadata = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (metadata_json, event.event_id,) + ) + + # Add an entry to the ex_outlier_stream table to replicate the + # change in outlier status to our workers. + stream_order = event.internal_metadata.stream_ordering + state_group_id = context.state_group + self._simple_insert_txn( + txn, + table="ex_outlier_stream", + values={ + "event_stream_ordering": stream_order, + "event_id": event.event_id, + "state_group": state_group_id, + } + ) + + sql = ( + "UPDATE events SET outlier = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (False, event.event_id,) + ) - event_id_list = event_ids - event_ids = set(event_ids) + # Update the event_backward_extremities table now that this + # event isn't an outlier any more. 
+ self._update_backward_extremeties(txn, [event]) - event_entry_map = self._get_events_from_cache( - event_ids, - allow_rejected=allow_rejected, - ) + return [ + ec for ec in events_and_contexts if ec[0] not in to_remove + ] - missing_events_ids = [e for e in event_ids if e not in event_entry_map] + @classmethod + def _delete_existing_rows_txn(cls, txn, events_and_contexts): + if not events_and_contexts: + # nothing to do here + return - if missing_events_ids: - missing_events = yield self._enqueue_events( - missing_events_ids, - check_redacted=check_redacted, - allow_rejected=allow_rejected, - ) + logger.info("Deleting existing") - event_entry_map.update(missing_events) + for table in ( + "events", + "event_auth", + "event_json", + "event_content_hashes", + "event_destinations", + "event_edge_hashes", + "event_edges", + "event_forward_extremities", + "event_push_actions", + "event_reference_hashes", + "event_search", + "event_signatures", + "event_to_state_groups", + "guest_access", + "history_visibility", + "local_invites", + "room_names", + "state_events", + "rejections", + "redactions", + "room_memberships", + "topics" + ): + txn.executemany( + "DELETE FROM %s WHERE event_id = ?" % (table,), + [(ev.event_id,) for ev, _ in events_and_contexts] + ) - events = [] - for event_id in event_id_list: - entry = event_entry_map.get(event_id, None) - if not entry: - continue + def _store_event_txn(self, txn, events_and_contexts): + """Insert new events into the event and event_json tables - if allow_rejected or not entry.event.rejected_reason: - if check_redacted and entry.redacted_event: - event = entry.redacted_event - else: - event = entry.event + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + """ - events.append(event) + if not events_and_contexts: + # nothing to do here + return - if get_prev_content: - if "replaces_state" in event.unsigned: - prev = yield self.get_event( - event.unsigned["replaces_state"], - get_prev_content=False, - allow_none=True, - ) - if prev: - event.unsigned = dict(event.unsigned) - event.unsigned["prev_content"] = prev.content - event.unsigned["prev_sender"] = prev.sender + def event_dict(event): + d = event.get_dict() + d.pop("redacted", None) + d.pop("redacted_because", None) + return d - defer.returnValue(events) + self._simple_insert_many_txn( + txn, + table="event_json", + values=[ + { + "event_id": event.event_id, + "room_id": event.room_id, + "internal_metadata": encode_json( + event.internal_metadata.get_dict() + ).decode("UTF-8"), + "json": encode_json(event_dict(event)).decode("UTF-8"), + } + for event, _ in events_and_contexts + ], + ) - def _invalidate_get_event_cache(self, event_id): - self._get_event_cache.invalidate((event_id,)) + self._simple_insert_many_txn( + txn, + table="events", + values=[ + { + "stream_ordering": event.internal_metadata.stream_ordering, + "topological_ordering": event.depth, + "depth": event.depth, + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "processed": True, + "outlier": event.internal_metadata.is_outlier(), + "content": encode_json(event.content).decode("UTF-8"), + "origin_server_ts": int(event.origin_server_ts), + "received_ts": self._clock.time_msec(), + "sender": event.sender, + "contains_url": ( + "url" in event.content + and isinstance(event.content["url"], basestring) + ), + } + for event, _ in events_and_contexts + ], + ) - def _get_events_from_cache(self, events, 
allow_rejected, update_metrics=True): - """Fetch events from the caches + def _store_rejected_events_txn(self, txn, events_and_contexts): + """Add rows to the 'rejections' table for received events which were + rejected Args: - events (list(str)): list of event_ids to fetch - allow_rejected (bool): Whether to teturn events that were rejected - update_metrics (bool): Whether to update the cache hit ratio metrics + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting Returns: - dict of event_id -> _EventCacheEntry for each event_id in cache. If - allow_rejected is `False` then there will still be an entry but it - will be `None` + list[(EventBase, EventContext)] new list, without the rejected + events. """ - event_map = {} - - for event_id in events: - ret = self._get_event_cache.get( - (event_id,), None, - update_metrics=update_metrics, - ) - if not ret: - continue + # Remove the rejected events from the list now that we've added them + # to the events table and the events_json table. + to_remove = set() + for event, context in events_and_contexts: + if context.rejected: + # Insert the event_id into the rejections table + self._store_rejections_txn( + txn, event.event_id, context.rejected + ) + to_remove.add(event) - if allow_rejected or not ret.event.rejected_reason: - event_map[event_id] = ret - else: - event_map[event_id] = None + return [ + ec for ec in events_and_contexts if ec[0] not in to_remove + ] - return event_map + def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled): + """Update all the miscellaneous tables for new events - def _do_fetch(self, conn): - """Takes a database connection and waits for requests for events from - the _event_fetch_list queue. + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + backfilled (bool): True if the events were backfilled """ - event_list = [] - i = 0 - while True: - try: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - if not event_list: - single_threaded = self.database_engine.single_threaded - if single_threaded or i > EVENT_QUEUE_ITERATIONS: - self._event_fetch_ongoing -= 1 - return - else: - self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) - i += 1 - continue - i = 0 + if not events_and_contexts: + # nothing to do here + return - event_id_lists = zip(*event_list)[0] - event_ids = [ - item for sublist in event_id_lists for item in sublist - ] + for event, context in events_and_contexts: + # Insert all the push actions into the event_push_actions table. + self._set_push_actions_for_event_and_users_txn( + txn, event, + ) - rows = self._new_transaction( - conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids + if event.type == EventTypes.Redaction and event.redacts is not None: + # Remove the entries in the event_push_actions table for the + # redacted event. 
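+                # (For an m.room.redaction event, `event.redacts` is the id
+                # of the event being redacted, so this clears any push
+                # notifications that the redacted event had generated.)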
+ self._remove_push_actions_for_event_id_txn( + txn, event.room_id, event.redacts ) - row_dict = { - r["event_id"]: r - for r in rows + self._simple_insert_many_txn( + txn, + table="event_auth", + values=[ + { + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, } + for event, _ in events_and_contexts + for auth_id, _ in event.auth_events + if event.is_state() + ], + ) - # We only want to resolve deferreds from the main thread - def fire(lst, res): - for ids, d in lst: - if not d.called: - try: - with PreserveLoggingContext(): - d.callback([ - res[i] - for i in ids - if i in res - ]) - except Exception: - logger.exception("Failed to callback") - with PreserveLoggingContext(): - reactor.callFromThread(fire, event_list, row_dict) - except Exception as e: - logger.exception("do_fetch") - - # We only want to resolve deferreds from the main thread - def fire(evs): - for _, d in evs: - if not d.called: - with PreserveLoggingContext(): - d.errback(e) + # Update the event_forward_extremities, event_backward_extremities and + # event_edges tables. + self._handle_mult_prev_events( + txn, + events=[event for event, _ in events_and_contexts], + ) - if event_list: - with PreserveLoggingContext(): - reactor.callFromThread(fire, event_list) + for event, _ in events_and_contexts: + if event.type == EventTypes.Name: + # Insert into the room_names and event_search tables. + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + # Insert into the topics table and event_search table. + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Message: + # Insert into the event_search table. + self._store_room_message_txn(txn, event) + elif event.type == EventTypes.Redaction: + # Insert into the redactions table. + self._store_redaction(txn, event) + elif event.type == EventTypes.RoomHistoryVisibility: + # Insert into the event_search table. + self._store_history_visibility_txn(txn, event) + elif event.type == EventTypes.GuestAccess: + # Insert into the event_search table. + self._store_guest_access_txn(txn, event) - @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): - """Fetches events from the database using the _event_fetch_list. This - allows batch and bulk fetching of events - it allows us to fetch events - without having to create a new transaction for each request for events. - """ - if not events: - defer.returnValue({}) + # Insert into the room_memberships table. + self._store_room_members_txn( + txn, + [ + event + for event, _ in events_and_contexts + if event.type == EventTypes.Member + ], + backfilled=backfilled, + ) - events_d = defer.Deferred() - with self._event_fetch_lock: - self._event_fetch_list.append( - (events, events_d) - ) + # Insert event_reference_hashes table. + self._store_event_reference_hashes_txn( + txn, [event for event, _ in events_and_contexts] + ) - self._event_fetch_lock.notify() + state_events_and_contexts = [ + ec for ec in events_and_contexts if ec[0].is_state() + ] - if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: - self._event_fetch_ongoing += 1 - should_start = True - else: - should_start = False + state_values = [] + for event, context in state_events_and_contexts: + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } - if should_start: - with PreserveLoggingContext(): - self.runWithConnection( - self._do_fetch - ) + # TODO: How does this work with backfilling? 
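+            # (Where present, `replaces_state` is the id of the previous
+            # state event with the same (type, state_key) pair; it is what
+            # ends up in the prev_state column below.)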
+ if hasattr(event, "replaces_state"): + vals["prev_state"] = event.replaces_state - logger.debug("Loading %d events", len(events)) - with PreserveLoggingContext(): - rows = yield events_d - logger.debug("Loaded %d events (%d rows)", len(events), len(rows)) + state_values.append(vals) - if not allow_rejected: - rows[:] = [r for r in rows if not r["rejects"]] + self._simple_insert_many_txn( + txn, + table="state_events", + values=state_values, + ) - res = yield make_deferred_yieldable(defer.gatherResults( - [ - preserve_fn(self._get_event_from_row)( - row["internal_metadata"], row["json"], row["redacts"], - rejected_reason=row["rejects"], - ) - for row in rows + self._simple_insert_many_txn( + txn, + table="event_edges", + values=[ + { + "event_id": event.event_id, + "prev_event_id": prev_id, + "room_id": event.room_id, + "is_state": True, + } + for event, _ in state_events_and_contexts + for prev_id, _ in event.prev_state ], - consumeErrors=True - )) + ) - defer.returnValue({ - e.event.event_id: e - for e in res if e - }) + # Prefill the event cache + self._add_to_cache(txn, events_and_contexts) + + def _add_to_cache(self, txn, events_and_contexts): + to_prefill = [] - def _fetch_event_rows(self, txn, events): rows = [] N = 200 - for i in range(1 + len(events) / N): - evs = events[i * N:(i + 1) * N] - if not evs: + for i in range(0, len(events_and_contexts), N): + ev_map = { + e[0].event_id: e[0] + for e in events_and_contexts[i:i + N] + } + if not ev_map: break sql = ( "SELECT " " e.event_id as event_id, " - " e.internal_metadata," - " e.json," " r.redacts as redacts," " rej.event_id as rejects " - " FROM event_json as e" + " FROM events as e" " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" - ) % (",".join(["?"] * len(evs)),) - - txn.execute(sql, evs) - rows.extend(self.cursor_to_dict(txn)) - - return rows - - @defer.inlineCallbacks - def _get_event_from_row(self, internal_metadata, js, redacted, - rejected_reason=None): - with Measure(self._clock, "_get_event_from_row"): - d = json.loads(js) - internal_metadata = json.loads(internal_metadata) + ) % (",".join(["?"] * len(ev_map)),) - if rejected_reason: - rejected_reason = yield self._simple_select_one_onecol( - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - desc="_get_event_from_row_rejected_reason", - ) + txn.execute(sql, ev_map.keys()) + rows = self.cursor_to_dict(txn) + for row in rows: + event = ev_map[row["event_id"]] + if not row["rejects"] and not row["redacts"]: + to_prefill.append(_EventCacheEntry( + event=event, + redacted_event=None, + )) - original_ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) + def prefill(): + for cache_entry in to_prefill: + self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry) + txn.call_after(prefill) - redacted_event = None - if redacted: - redacted_event = prune_event(original_ev) + def _store_redaction(self, txn, event): + # invalidate the cache for the redacted event + txn.call_after(self._invalidate_get_event_cache, event.redacts) + txn.execute( + "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", + (event.event_id, event.redacts) + ) - redaction_id = yield self._simple_select_one_onecol( - table="redactions", - keyvalues={"redacts": redacted_event.event_id}, - retcol="event_id", - desc="_get_event_from_row_redactions", - ) + @defer.inlineCallbacks + def have_events_in_timeline(self, event_ids): + 
"""Given a list of event ids, check if we have already processed and + stored them as non outliers. + """ + rows = yield self._simple_select_many_batch( + table="events", + retcols=("event_id",), + column="event_id", + iterable=list(event_ids), + keyvalues={"outlier": False}, + desc="have_events_in_timeline", + ) - redacted_event.unsigned["redacted_by"] = redaction_id - # Get the redaction event. + defer.returnValue(set(r["event_id"] for r in rows)) - because = yield self.get_event( - redaction_id, - check_redacted=False, - allow_none=True, - ) + def have_events(self, event_ids): + """Given a list of event ids, check if we have already processed them. - if because: - # It's fine to do add the event directly, since get_pdu_json - # will serialise this field correctly - redacted_event.unsigned["redacted_because"] = because + Returns: + dict: Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps to + None. + """ + if not event_ids: + return defer.succeed({}) - cache_entry = _EventCacheEntry( - event=original_ev, - redacted_event=redacted_event, + def f(txn): + sql = ( + "SELECT e.event_id, reason FROM events as e " + "LEFT JOIN rejections as r ON e.event_id = r.event_id " + "WHERE e.event_id = ?" ) - self._get_event_cache.prefill((original_ev.event_id,), cache_entry) + res = {} + for event_id in event_ids: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + _, rejected = row + res[event_id] = rejected - defer.returnValue(cache_entry) + return res + + return self.runInteraction( + "have_events", f, + ) @defer.inlineCallbacks def count_daily_messages(self): -- cgit 1.4.1 From 3bd760628bc0b178f6709b9ea3439a44ebcebab2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Feb 2018 10:49:18 +0000 Subject: _event_persist_queue shouldn't be in worker store --- synapse/storage/events.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 681a33314d..32da81c47b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -200,10 +200,6 @@ def _retry_on_integrity_error(func): class EventsWorkerStore(SQLBaseStore): - def __init__(self, db_conn, hs): - super(EventsWorkerStore, self).__init__(db_conn, hs) - - self._event_persist_queue = _EventPeristenceQueue() @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, @@ -583,6 +579,10 @@ class EventsStore(EventsWorkerStore): psql_only=True, ) + + + self._event_persist_queue = _EventPeristenceQueue() + self._state_resolution_handler = hs.get_state_resolution_handler() def persist_events(self, events_and_contexts, backfilled=False): -- cgit 1.4.1 From 5d0f6658489287dc19ce9e089fc61b06208f3fc4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Feb 2018 10:49:58 +0000 Subject: Remove redundant clock --- synapse/storage/events.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 32da81c47b..f6aa3612e1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -550,7 +550,6 @@ class EventsStore(EventsWorkerStore): def __init__(self, db_conn, hs): super(EventsStore, self).__init__(db_conn, hs) - self._clock = hs.get_clock() self.register_background_update_handler( self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts ) @@ -579,8 +578,6 @@ class EventsStore(EventsWorkerStore): psql_only=True, ) - - self._event_persist_queue = _EventPeristenceQueue() 
self._state_resolution_handler = hs.get_state_resolution_handler() -- cgit 1.4.1 From bf8a36e0805a1626d335f78ddb90ba25220bbfa1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Feb 2018 10:52:10 +0000 Subject: Update copyright --- synapse/replication/slave/storage/events.py | 1 + synapse/storage/events.py | 1 + 2 files changed, 2 insertions(+) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 74a81a0a51..f35cba2899 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f6aa3612e1..84a6f6782a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. -- cgit 1.4.1 From eba93b05bfaa1e6cb5bd66621021f6fff750ab97 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Feb 2018 11:01:21 +0000 Subject: Split EventsWorkerStore into separate file --- synapse/replication/slave/storage/events.py | 2 +- synapse/storage/events.py | 365 +------------------------ synapse/storage/events_worker.py | 395 ++++++++++++++++++++++++++++ 3 files changed, 401 insertions(+), 361 deletions(-) create mode 100644 synapse/storage/events_worker.py diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index f35cba2899..5edfacc9ec 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -19,7 +19,7 @@ from synapse.api.constants import EventTypes from synapse.storage import DataStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.event_push_actions import EventPushActionsStore -from synapse.storage.events import EventsWorkerStore +from synapse.storage.events_worker import EventsWorkerStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.state import StateGroupWorkerStore from synapse.storage.stream import StreamStore diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 84a6f6782a..99d6cca585 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -13,16 +13,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from ._base import SQLBaseStore -from twisted.internet import defer, reactor +from synapse.storage.events_worker import EventsWorkerStore -from synapse.events import FrozenEvent, USE_FROZEN_DICTS -from synapse.events.utils import prune_event +from twisted.internet import defer + +from synapse.events import USE_FROZEN_DICTS from synapse.util.async import ObservableDeferred from synapse.util.logcontext import ( - preserve_fn, PreserveLoggingContext, make_deferred_yieldable + PreserveLoggingContext, make_deferred_yieldable ) from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -62,16 +62,6 @@ def encode_json(json_object): return json.dumps(json_object, ensure_ascii=False) -# These values are used in the `enqueus_event` and `_do_fetch` methods to -# control how we batch/bulk fetch events from the database. -# The values are plucked out of thing air to make initial sync run faster -# on jki.re -# TODO: Make these configurable. -EVENT_QUEUE_THREADS = 3 # Max number of threads that will fetch events -EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events -EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events - - class _EventPeristenceQueue(object): """Queues up events so that they can be persisted in bulk with only one concurrent transaction per room. @@ -200,351 +190,6 @@ def _retry_on_integrity_error(func): return f -class EventsWorkerStore(SQLBaseStore): - - @defer.inlineCallbacks - def get_event(self, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False, - allow_none=False): - """Get an event from the database by event_id. - - Args: - event_id (str): The event_id of the event to fetch - check_redacted (bool): If True, check if event has been redacted - and redact it. - get_prev_content (bool): If True and event is a state event, - include the previous states content in the unsigned field. - allow_rejected (bool): If True return rejected events. - allow_none (bool): If True, return None if no event found, if - False throw an exception. - - Returns: - Deferred : A FrozenEvent. - """ - events = yield self._get_events( - [event_id], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - if not events and not allow_none: - raise SynapseError(404, "Could not find event %s" % (event_id,)) - - defer.returnValue(events[0] if events else None) - - @defer.inlineCallbacks - def get_events(self, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False): - """Get events from the database - - Args: - event_ids (list): The event_ids of the events to fetch - check_redacted (bool): If True, check if event has been redacted - and redact it. - get_prev_content (bool): If True and event is a state event, - include the previous states content in the unsigned field. - allow_rejected (bool): If True return rejected events. - - Returns: - Deferred : Dict from event_id to event. 
- """ - events = yield self._get_events( - event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - defer.returnValue({e.event_id: e for e in events}) - - @defer.inlineCallbacks - def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not event_ids: - defer.returnValue([]) - - event_id_list = event_ids - event_ids = set(event_ids) - - event_entry_map = self._get_events_from_cache( - event_ids, - allow_rejected=allow_rejected, - ) - - missing_events_ids = [e for e in event_ids if e not in event_entry_map] - - if missing_events_ids: - missing_events = yield self._enqueue_events( - missing_events_ids, - check_redacted=check_redacted, - allow_rejected=allow_rejected, - ) - - event_entry_map.update(missing_events) - - events = [] - for event_id in event_id_list: - entry = event_entry_map.get(event_id, None) - if not entry: - continue - - if allow_rejected or not entry.event.rejected_reason: - if check_redacted and entry.redacted_event: - event = entry.redacted_event - else: - event = entry.event - - events.append(event) - - if get_prev_content: - if "replaces_state" in event.unsigned: - prev = yield self.get_event( - event.unsigned["replaces_state"], - get_prev_content=False, - allow_none=True, - ) - if prev: - event.unsigned = dict(event.unsigned) - event.unsigned["prev_content"] = prev.content - event.unsigned["prev_sender"] = prev.sender - - defer.returnValue(events) - - def _invalidate_get_event_cache(self, event_id): - self._get_event_cache.invalidate((event_id,)) - - def _get_events_from_cache(self, events, allow_rejected, update_metrics=True): - """Fetch events from the caches - - Args: - events (list(str)): list of event_ids to fetch - allow_rejected (bool): Whether to teturn events that were rejected - update_metrics (bool): Whether to update the cache hit ratio metrics - - Returns: - dict of event_id -> _EventCacheEntry for each event_id in cache. If - allow_rejected is `False` then there will still be an entry but it - will be `None` - """ - event_map = {} - - for event_id in events: - ret = self._get_event_cache.get( - (event_id,), None, - update_metrics=update_metrics, - ) - if not ret: - continue - - if allow_rejected or not ret.event.rejected_reason: - event_map[event_id] = ret - else: - event_map[event_id] = None - - return event_map - - def _do_fetch(self, conn): - """Takes a database connection and waits for requests for events from - the _event_fetch_list queue. 
- """ - event_list = [] - i = 0 - while True: - try: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: - single_threaded = self.database_engine.single_threaded - if single_threaded or i > EVENT_QUEUE_ITERATIONS: - self._event_fetch_ongoing -= 1 - return - else: - self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) - i += 1 - continue - i = 0 - - event_id_lists = zip(*event_list)[0] - event_ids = [ - item for sublist in event_id_lists for item in sublist - ] - - rows = self._new_transaction( - conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids - ) - - row_dict = { - r["event_id"]: r - for r in rows - } - - # We only want to resolve deferreds from the main thread - def fire(lst, res): - for ids, d in lst: - if not d.called: - try: - with PreserveLoggingContext(): - d.callback([ - res[i] - for i in ids - if i in res - ]) - except Exception: - logger.exception("Failed to callback") - with PreserveLoggingContext(): - reactor.callFromThread(fire, event_list, row_dict) - except Exception as e: - logger.exception("do_fetch") - - # We only want to resolve deferreds from the main thread - def fire(evs): - for _, d in evs: - if not d.called: - with PreserveLoggingContext(): - d.errback(e) - - if event_list: - with PreserveLoggingContext(): - reactor.callFromThread(fire, event_list) - - @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): - """Fetches events from the database using the _event_fetch_list. This - allows batch and bulk fetching of events - it allows us to fetch events - without having to create a new transaction for each request for events. - """ - if not events: - defer.returnValue({}) - - events_d = defer.Deferred() - with self._event_fetch_lock: - self._event_fetch_list.append( - (events, events_d) - ) - - self._event_fetch_lock.notify() - - if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: - self._event_fetch_ongoing += 1 - should_start = True - else: - should_start = False - - if should_start: - with PreserveLoggingContext(): - self.runWithConnection( - self._do_fetch - ) - - logger.debug("Loading %d events", len(events)) - with PreserveLoggingContext(): - rows = yield events_d - logger.debug("Loaded %d events (%d rows)", len(events), len(rows)) - - if not allow_rejected: - rows[:] = [r for r in rows if not r["rejects"]] - - res = yield make_deferred_yieldable(defer.gatherResults( - [ - preserve_fn(self._get_event_from_row)( - row["internal_metadata"], row["json"], row["redacts"], - rejected_reason=row["rejects"], - ) - for row in rows - ], - consumeErrors=True - )) - - defer.returnValue({ - e.event.event_id: e - for e in res if e - }) - - def _fetch_event_rows(self, txn, events): - rows = [] - N = 200 - for i in range(1 + len(events) / N): - evs = events[i * N:(i + 1) * N] - if not evs: - break - - sql = ( - "SELECT " - " e.event_id as event_id, " - " e.internal_metadata," - " e.json," - " r.redacts as redacts," - " rej.event_id as rejects " - " FROM event_json as e" - " LEFT JOIN rejections as rej USING (event_id)" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE e.event_id IN (%s)" - ) % (",".join(["?"] * len(evs)),) - - txn.execute(sql, evs) - rows.extend(self.cursor_to_dict(txn)) - - return rows - - @defer.inlineCallbacks - def _get_event_from_row(self, internal_metadata, js, redacted, - rejected_reason=None): - with Measure(self._clock, "_get_event_from_row"): - d = json.loads(js) - internal_metadata = 
json.loads(internal_metadata) - - if rejected_reason: - rejected_reason = yield self._simple_select_one_onecol( - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - desc="_get_event_from_row_rejected_reason", - ) - - original_ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) - - redacted_event = None - if redacted: - redacted_event = prune_event(original_ev) - - redaction_id = yield self._simple_select_one_onecol( - table="redactions", - keyvalues={"redacts": redacted_event.event_id}, - retcol="event_id", - desc="_get_event_from_row_redactions", - ) - - redacted_event.unsigned["redacted_by"] = redaction_id - # Get the redaction event. - - because = yield self.get_event( - redaction_id, - check_redacted=False, - allow_none=True, - ) - - if because: - # It's fine to do add the event directly, since get_pdu_json - # will serialise this field correctly - redacted_event.unsigned["redacted_because"] = because - - cache_entry = _EventCacheEntry( - event=original_ev, - redacted_event=redacted_event, - ) - - self._get_event_cache.prefill((original_ev.event_id,), cache_entry) - - defer.returnValue(cache_entry) - - class EventsStore(EventsWorkerStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py new file mode 100644 index 0000000000..86c3b48ad4 --- /dev/null +++ b/synapse/storage/events_worker.py @@ -0,0 +1,395 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ._base import SQLBaseStore + +from twisted.internet import defer, reactor + +from synapse.events import FrozenEvent +from synapse.events.utils import prune_event + +from synapse.util.logcontext import ( + preserve_fn, PreserveLoggingContext, make_deferred_yieldable +) +from synapse.util.metrics import Measure +from synapse.api.errors import SynapseError + +from collections import namedtuple + +import logging +import ujson as json + +# these are only included to make the type annotations work +from synapse.events import EventBase # noqa: F401 +from synapse.events.snapshot import EventContext # noqa: F401 + +logger = logging.getLogger(__name__) + + +# These values are used in the `enqueus_event` and `_do_fetch` methods to +# control how we batch/bulk fetch events from the database. +# The values are plucked out of thing air to make initial sync run faster +# on jki.re +# TODO: Make these configurable. +EVENT_QUEUE_THREADS = 3 # Max number of threads that will fetch events +EVENT_QUEUE_ITERATIONS = 3 # No. 
times we block waiting for requests for events +EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events + + +_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) + + +class EventsWorkerStore(SQLBaseStore): + + @defer.inlineCallbacks + def get_event(self, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False, + allow_none=False): + """Get an event from the database by event_id. + + Args: + event_id (str): The event_id of the event to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + allow_none (bool): If True, return None if no event found, if + False throw an exception. + + Returns: + Deferred : A FrozenEvent. + """ + events = yield self._get_events( + [event_id], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + if not events and not allow_none: + raise SynapseError(404, "Could not find event %s" % (event_id,)) + + defer.returnValue(events[0] if events else None) + + @defer.inlineCallbacks + def get_events(self, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False): + """Get events from the database + + Args: + event_ids (list): The event_ids of the events to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + + Returns: + Deferred : Dict from event_id to event. 
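+            Omits any events that could not be found, so callers should not
+            assume an entry exists for every requested id.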
+ """ + events = yield self._get_events( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + defer.returnValue({e.event_id: e for e in events}) + + @defer.inlineCallbacks + def _get_events(self, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not event_ids: + defer.returnValue([]) + + event_id_list = event_ids + event_ids = set(event_ids) + + event_entry_map = self._get_events_from_cache( + event_ids, + allow_rejected=allow_rejected, + ) + + missing_events_ids = [e for e in event_ids if e not in event_entry_map] + + if missing_events_ids: + missing_events = yield self._enqueue_events( + missing_events_ids, + check_redacted=check_redacted, + allow_rejected=allow_rejected, + ) + + event_entry_map.update(missing_events) + + events = [] + for event_id in event_id_list: + entry = event_entry_map.get(event_id, None) + if not entry: + continue + + if allow_rejected or not entry.event.rejected_reason: + if check_redacted and entry.redacted_event: + event = entry.redacted_event + else: + event = entry.event + + events.append(event) + + if get_prev_content: + if "replaces_state" in event.unsigned: + prev = yield self.get_event( + event.unsigned["replaces_state"], + get_prev_content=False, + allow_none=True, + ) + if prev: + event.unsigned = dict(event.unsigned) + event.unsigned["prev_content"] = prev.content + event.unsigned["prev_sender"] = prev.sender + + defer.returnValue(events) + + def _invalidate_get_event_cache(self, event_id): + self._get_event_cache.invalidate((event_id,)) + + def _get_events_from_cache(self, events, allow_rejected, update_metrics=True): + """Fetch events from the caches + + Args: + events (list(str)): list of event_ids to fetch + allow_rejected (bool): Whether to teturn events that were rejected + update_metrics (bool): Whether to update the cache hit ratio metrics + + Returns: + dict of event_id -> _EventCacheEntry for each event_id in cache. If + allow_rejected is `False` then there will still be an entry but it + will be `None` + """ + event_map = {} + + for event_id in events: + ret = self._get_event_cache.get( + (event_id,), None, + update_metrics=update_metrics, + ) + if not ret: + continue + + if allow_rejected or not ret.event.rejected_reason: + event_map[event_id] = ret + else: + event_map[event_id] = None + + return event_map + + def _do_fetch(self, conn): + """Takes a database connection and waits for requests for events from + the _event_fetch_list queue. 
+ """ + event_list = [] + i = 0 + while True: + try: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + single_threaded = self.database_engine.single_threaded + if single_threaded or i > EVENT_QUEUE_ITERATIONS: + self._event_fetch_ongoing -= 1 + return + else: + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) + i += 1 + continue + i = 0 + + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + + rows = self._new_transaction( + conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids + ) + + row_dict = { + r["event_id"]: r + for r in rows + } + + # We only want to resolve deferreds from the main thread + def fire(lst, res): + for ids, d in lst: + if not d.called: + try: + with PreserveLoggingContext(): + d.callback([ + res[i] + for i in ids + if i in res + ]) + except Exception: + logger.exception("Failed to callback") + with PreserveLoggingContext(): + reactor.callFromThread(fire, event_list, row_dict) + except Exception as e: + logger.exception("do_fetch") + + # We only want to resolve deferreds from the main thread + def fire(evs): + for _, d in evs: + if not d.called: + with PreserveLoggingContext(): + d.errback(e) + + if event_list: + with PreserveLoggingContext(): + reactor.callFromThread(fire, event_list) + + @defer.inlineCallbacks + def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): + """Fetches events from the database using the _event_fetch_list. This + allows batch and bulk fetching of events - it allows us to fetch events + without having to create a new transaction for each request for events. + """ + if not events: + defer.returnValue({}) + + events_d = defer.Deferred() + with self._event_fetch_lock: + self._event_fetch_list.append( + (events, events_d) + ) + + self._event_fetch_lock.notify() + + if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: + self._event_fetch_ongoing += 1 + should_start = True + else: + should_start = False + + if should_start: + with PreserveLoggingContext(): + self.runWithConnection( + self._do_fetch + ) + + logger.debug("Loading %d events", len(events)) + with PreserveLoggingContext(): + rows = yield events_d + logger.debug("Loaded %d events (%d rows)", len(events), len(rows)) + + if not allow_rejected: + rows[:] = [r for r in rows if not r["rejects"]] + + res = yield make_deferred_yieldable(defer.gatherResults( + [ + preserve_fn(self._get_event_from_row)( + row["internal_metadata"], row["json"], row["redacts"], + rejected_reason=row["rejects"], + ) + for row in rows + ], + consumeErrors=True + )) + + defer.returnValue({ + e.event.event_id: e + for e in res if e + }) + + def _fetch_event_rows(self, txn, events): + rows = [] + N = 200 + for i in range(1 + len(events) / N): + evs = events[i * N:(i + 1) * N] + if not evs: + break + + sql = ( + "SELECT " + " e.event_id as event_id, " + " e.internal_metadata," + " e.json," + " r.redacts as redacts," + " rej.event_id as rejects " + " FROM event_json as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"] * len(evs)),) + + txn.execute(sql, evs) + rows.extend(self.cursor_to_dict(txn)) + + return rows + + @defer.inlineCallbacks + def _get_event_from_row(self, internal_metadata, js, redacted, + rejected_reason=None): + with Measure(self._clock, "_get_event_from_row"): + d = json.loads(js) + internal_metadata = 
json.loads(internal_metadata)
+
+            if rejected_reason:
+                rejected_reason = yield self._simple_select_one_onecol(
+                    table="rejections",
+                    keyvalues={"event_id": rejected_reason},
+                    retcol="reason",
+                    desc="_get_event_from_row_rejected_reason",
+                )
+
+            original_ev = FrozenEvent(
+                d,
+                internal_metadata_dict=internal_metadata,
+                rejected_reason=rejected_reason,
+            )
+
+            redacted_event = None
+            if redacted:
+                redacted_event = prune_event(original_ev)
+
+                redaction_id = yield self._simple_select_one_onecol(
+                    table="redactions",
+                    keyvalues={"redacts": redacted_event.event_id},
+                    retcol="event_id",
+                    desc="_get_event_from_row_redactions",
+                )
+
+                redacted_event.unsigned["redacted_by"] = redaction_id
+                # Get the redaction event.
+
+                because = yield self.get_event(
+                    redaction_id,
+                    check_redacted=False,
+                    allow_none=True,
+                )
+
+                if because:
+                    # It's fine to add the event directly, since get_pdu_json
+                    # will serialise this field correctly
+                    redacted_event.unsigned["redacted_because"] = because
+
+            cache_entry = _EventCacheEntry(
+                event=original_ev,
+                redacted_event=redacted_event,
+            )
+
+            self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
+
+            defer.returnValue(cache_entry)
-- cgit 1.4.1
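
For reference, the shape of the refactor in this series can be sketched in
miniature as follows. This is an illustrative toy rather than Synapse code:
the names (ReadStore, WriteStore, get_thing, store_thing) are invented for
the example and a plain dict stands in for the database, but the split
mirrors the one made above: read paths live on a base "worker" class that
any process can use, while write paths live on a subclass that only the
process which persists events instantiates.

    class ReadStore(object):
        """Read-only accessors, safe to use on any worker process."""

        def __init__(self, db):
            self.db = db
            self._cache = {}

        def get_thing(self, key):
            # Serve from the cache where possible, falling back to the db.
            if key in self._cache:
                return self._cache[key]
            value = self.db.get(key)
            self._cache[key] = value
            return value

    class WriteStore(ReadStore):
        """Adds the write paths on top of the read paths."""

        def store_thing(self, key, value):
            self.db[key] = value
            # Keep the read-side cache coherent after a write.
            self._cache.pop(key, None)

    db = {}
    writer = WriteStore(db)
    writer.store_thing("a", 1)
    assert writer.get_thing("a") == 1

    # A worker process only ever instantiates the read-only half.
    reader = ReadStore(db)
    assert reader.get_thing("a") == 1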