From 10f1bdb9a27abe34c8b022c32209a4abfc8cb443 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 10:21:40 +0100 Subject: Move get_events functions to storage.events --- synapse/storage/events.py | 132 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9242b0a84e..afdf0f7193 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -17,6 +17,9 @@ from _base import SQLBaseStore, _RollbackButIsFineException from twisted.internet import defer +from synapse.events import FrozenEvent +from synapse.events.utils import prune_event + from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from synapse.crypto.event_signing import compute_event_reference_hash @@ -26,6 +29,7 @@ from syutil.jsonutil import encode_canonical_json from contextlib import contextmanager import logging +import simplejson as json logger = logging.getLogger(__name__) @@ -393,3 +397,131 @@ class EventsStore(SQLBaseStore): return self.runInteraction( "have_events", f, ) + + def _get_events(self, event_ids, check_redacted=True, + get_prev_content=False): + return self.runInteraction( + "_get_events", self._get_events_txn, event_ids, + check_redacted=check_redacted, get_prev_content=get_prev_content, + ) + + def _get_events_txn(self, txn, event_ids, check_redacted=True, + get_prev_content=False): + if not event_ids: + return [] + + events = [ + self._get_event_txn( + txn, event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content + ) + for event_id in event_ids + ] + + return [e for e in events if e] + + def _invalidate_get_event_cache(self, event_id): + for check_redacted in (False, True): + for get_prev_content in (False, True): + self._get_event_cache.invalidate(event_id, check_redacted, + get_prev_content) + + def _get_event_txn(self, txn, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False): + + try: + ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) + + if allow_rejected or not ret.rejected_reason: + return ret + else: + return None + except KeyError: + pass + + sql = ( + "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " + "FROM event_json as e " + "LEFT JOIN redactions as r ON e.event_id = r.redacts " + "LEFT JOIN rejections as rej on rej.event_id = e.event_id " + "WHERE e.event_id = ? " + "LIMIT 1 " + ) + + txn.execute(sql, (event_id,)) + + res = txn.fetchone() + + if not res: + return None + + internal_metadata, js, redacted, rejected_reason = res + + result = self._get_event_from_row_txn( + txn, internal_metadata, js, redacted, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=rejected_reason, + ) + self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result) + + if allow_rejected or not rejected_reason: + return result + else: + return None + + def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, + check_redacted=True, get_prev_content=False, + rejected_reason=None): + + d = json.loads(js) + internal_metadata = json.loads(internal_metadata) + + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) + + if check_redacted and redacted: + ev = prune_event(ev) + + ev.unsigned["redacted_by"] = redacted + # Get the redaction event. + + because = self._get_event_txn( + txn, + redacted, + check_redacted=False + ) + + if because: + ev.unsigned["redacted_because"] = because + + if get_prev_content and "replaces_state" in ev.unsigned: + prev = self._get_event_txn( + txn, + ev.unsigned["replaces_state"], + get_prev_content=False, + ) + if prev: + ev.unsigned["prev_content"] = prev.get_dict()["content"] + + return ev + + def _parse_events(self, rows): + return self.runInteraction( + "_parse_events", self._parse_events_txn, rows + ) + + def _parse_events_txn(self, txn, rows): + event_ids = [r["event_id"] for r in rows] + + return self._get_events_txn(txn, event_ids) + + def _has_been_redacted_txn(self, txn, event): + sql = "SELECT event_id FROM redactions WHERE redacts = ?" + txn.execute(sql, (event.event_id,)) + result = txn.fetchone() + return result[0] if result else None -- cgit 1.4.1