diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 291 |
1 files changed, 104 insertions, 187 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2b3f79577b..6d978ffcd5 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -27,6 +27,9 @@ from synapse.api.constants import EventTypes from canonicaljson import encode_canonical_json from collections import deque, namedtuple +import synapse +import synapse.metrics + import logging import math @@ -35,6 +38,10 @@ import ujson as json logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) +persist_event_counter = metrics.register_counter("persisted_events") + + def encode_json(json_object): if USE_FROZEN_DICTS: # ujson doesn't like frozen_dicts @@ -139,6 +146,9 @@ class _EventPeristenceQueue(object): pass +_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) + + class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" @@ -258,6 +268,7 @@ class EventsStore(SQLBaseStore): events_and_contexts=chunk, backfilled=backfilled, ) + persist_event_counter.inc_by(len(chunk)) @defer.inlineCallbacks @log_function @@ -275,6 +286,7 @@ class EventsStore(SQLBaseStore): current_state=current_state, backfilled=backfilled, ) + persist_event_counter.inc() except _RollbackButIsFineException: pass @@ -632,6 +644,8 @@ class EventsStore(SQLBaseStore): ], ) + self._add_to_cache(txn, events_and_contexts) + if backfilled: # Backfilled events come before the current state so we don't need # to update the current state table @@ -673,6 +687,45 @@ class EventsStore(SQLBaseStore): return + def _add_to_cache(self, txn, events_and_contexts): + to_prefill = [] + + rows = [] + N = 200 + for i in range(0, len(events_and_contexts), N): + ev_map = { + e[0].event_id: e[0] + for e in events_and_contexts[i:i + N] + } + if not ev_map: + break + + sql = ( + "SELECT " + " e.event_id as event_id, " + " r.redacts as redacts," + " rej.event_id as rejects " + " FROM events as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"] * len(ev_map)),) + + txn.execute(sql, ev_map.keys()) + rows = self.cursor_to_dict(txn) + for row in rows: + event = ev_map[row["event_id"]] + if not row["rejects"] and not row["redacts"]: + to_prefill.append(_EventCacheEntry( + event=event, + redacted_event=None, + )) + + def prefill(): + for cache_entry in to_prefill: + self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry) + txn.call_after(prefill) + def _store_redaction(self, txn, event): # invalidate the cache for the redacted event txn.call_after(self._invalidate_get_event_cache, event.redacts) @@ -738,100 +791,65 @@ class EventsStore(SQLBaseStore): event_id_list = event_ids event_ids = set(event_ids) - event_map = self._get_events_from_cache( + event_entry_map = self._get_events_from_cache( event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) - missing_events_ids = [e for e in event_ids if e not in event_map] + missing_events_ids = [e for e in event_ids if e not in event_entry_map] if missing_events_ids: missing_events = yield self._enqueue_events( missing_events_ids, check_redacted=check_redacted, - get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) - event_map.update(missing_events) + event_entry_map.update(missing_events) - defer.returnValue([ - event_map[e_id] for e_id in event_id_list - if e_id in event_map and event_map[e_id] - ]) - - def _get_events_txn(self, txn, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not event_ids: - return [] - - event_map = self._get_events_from_cache( - event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - missing_events_ids = [e for e in event_ids if e not in event_map] + events = [] + for event_id in event_id_list: + entry = event_entry_map.get(event_id, None) + if not entry: + continue - if not missing_events_ids: - return [ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ] + if allow_rejected or not entry.event.rejected_reason: + if check_redacted and entry.redacted_event: + event = entry.redacted_event + else: + event = entry.event - missing_events = self._fetch_events_txn( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + events.append(event) - event_map.update(missing_events) + if get_prev_content: + if "replaces_state" in event.unsigned: + prev = yield self.get_event( + event.unsigned["replaces_state"], + get_prev_content=False, + allow_none=True, + ) + if prev: + event.unsigned = dict(event.unsigned) + event.unsigned["prev_content"] = prev.content + event.unsigned["prev_sender"] = prev.sender - return [ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ] + defer.returnValue(events) def _invalidate_get_event_cache(self, event_id): - for check_redacted in (False, True): - for get_prev_content in (False, True): - self._get_event_cache.invalidate( - (event_id, check_redacted, get_prev_content) - ) - - def _get_event_txn(self, txn, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False): - - events = self._get_events_txn( - txn, [event_id], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - return events[0] if events else None + self._get_event_cache.invalidate((event_id,)) - def _get_events_from_cache(self, events, check_redacted, get_prev_content, - allow_rejected): + def _get_events_from_cache(self, events, allow_rejected): event_map = {} for event_id in events: - try: - ret = self._get_event_cache.get( - (event_id, check_redacted, get_prev_content,) - ) + ret = self._get_event_cache.get((event_id,), None) + if not ret: + continue - if allow_rejected or not ret.rejected_reason: - event_map[event_id] = ret - else: - event_map[event_id] = None - except KeyError: - pass + if allow_rejected or not ret.event.rejected_reason: + event_map[event_id] = ret + else: + event_map[event_id] = None return event_map @@ -902,8 +920,7 @@ class EventsStore(SQLBaseStore): reactor.callFromThread(fire, event_list) @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): """Fetches events from the database using the _event_fetch_list. This allows batch and bulk fetching of events - it allows us to fetch events without having to create a new transaction for each request for events. @@ -941,8 +958,6 @@ class EventsStore(SQLBaseStore): [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], - check_redacted=check_redacted, - get_prev_content=get_prev_content, rejected_reason=row["rejects"], ) for row in rows @@ -951,7 +966,7 @@ class EventsStore(SQLBaseStore): ) defer.returnValue({ - e.event_id: e + e.event.event_id: e for e in res if e }) @@ -981,37 +996,8 @@ class EventsStore(SQLBaseStore): return rows - def _fetch_events_txn(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - return {} - - rows = self._fetch_event_rows( - txn, events, - ) - - if not allow_rejected: - rows[:] = [r for r in rows if not r["rejects"]] - - res = [ - self._get_event_from_row_txn( - txn, - row["internal_metadata"], row["json"], row["redacts"], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row["rejects"], - ) - for row in rows - ] - - return { - r.event_id: r - for r in res - } - @defer.inlineCallbacks def _get_event_from_row(self, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, rejected_reason=None): d = json.loads(js) internal_metadata = json.loads(internal_metadata) @@ -1021,26 +1007,27 @@ class EventsStore(SQLBaseStore): table="rejections", keyvalues={"event_id": rejected_reason}, retcol="reason", - desc="_get_event_from_row", + desc="_get_event_from_row_rejected_reason", ) - ev = FrozenEvent( + original_ev = FrozenEvent( d, internal_metadata_dict=internal_metadata, rejected_reason=rejected_reason, ) - if check_redacted and redacted: - ev = prune_event(ev) + redacted_event = None + if redacted: + redacted_event = prune_event(original_ev) redaction_id = yield self._simple_select_one_onecol( table="redactions", - keyvalues={"redacts": ev.event_id}, + keyvalues={"redacts": redacted_event.event_id}, retcol="event_id", - desc="_get_event_from_row", + desc="_get_event_from_row_redactions", ) - ev.unsigned["redacted_by"] = redaction_id + redacted_event.unsigned["redacted_by"] = redaction_id # Get the redaction event. because = yield self.get_event( @@ -1052,86 +1039,16 @@ class EventsStore(SQLBaseStore): if because: # It's fine to do add the event directly, since get_pdu_json # will serialise this field correctly - ev.unsigned["redacted_because"] = because - - if get_prev_content and "replaces_state" in ev.unsigned: - prev = yield self.get_event( - ev.unsigned["replaces_state"], - get_prev_content=False, - allow_none=True, - ) - if prev: - ev.unsigned["prev_content"] = prev.content - ev.unsigned["prev_sender"] = prev.sender - - self._get_event_cache.prefill( - (ev.event_id, check_redacted, get_prev_content), ev - ) - - defer.returnValue(ev) - - def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): - d = json.loads(js) - internal_metadata = json.loads(internal_metadata) - - if rejected_reason: - rejected_reason = self._simple_select_one_onecol_txn( - txn, - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - ) - - ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) - - if check_redacted and redacted: - ev = prune_event(ev) - - redaction_id = self._simple_select_one_onecol_txn( - txn, - table="redactions", - keyvalues={"redacts": ev.event_id}, - retcol="event_id", - ) - - ev.unsigned["redacted_by"] = redaction_id - # Get the redaction event. - - because = self._get_event_txn( - txn, - redaction_id, - check_redacted=False - ) - - if because: - ev.unsigned["redacted_because"] = because + redacted_event.unsigned["redacted_because"] = because - if get_prev_content and "replaces_state" in ev.unsigned: - prev = self._get_event_txn( - txn, - ev.unsigned["replaces_state"], - get_prev_content=False, - ) - if prev: - ev.unsigned["prev_content"] = prev.content - ev.unsigned["prev_sender"] = prev.sender - - self._get_event_cache.prefill( - (ev.event_id, check_redacted, get_prev_content), ev + cache_entry = _EventCacheEntry( + event=original_ev, + redacted_event=redacted_event, ) - return ev - - def _parse_events_txn(self, txn, rows): - event_ids = [r["event_id"] for r in rows] + self._get_event_cache.prefill((original_ev.event_id,), cache_entry) - return self._get_events_txn(txn, event_ids) + defer.returnValue(cache_entry) @defer.inlineCallbacks def count_daily_messages(self): |