summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py155
-rw-r--r--synapse/storage/events.py132
2 files changed, 132 insertions, 155 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 81052409b7..ec80169c5b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -15,8 +15,6 @@
 import logging
 
 from synapse.api.errors import StoreError
-from synapse.events import FrozenEvent
-from synapse.events.utils import prune_event
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import preserve_context_over_fn, LoggingContext
 from synapse.util.lrucache import LruCache
@@ -28,7 +26,6 @@ from twisted.internet import defer
 
 from collections import namedtuple, OrderedDict
 import functools
-import simplejson as json
 import sys
 import time
 import threading
@@ -867,158 +864,6 @@ class SQLBaseStore(object):
 
         return self.runInteraction("_simple_max_id", func)
 
-    def _get_events(self, event_ids, check_redacted=True,
-                    get_prev_content=False):
-        return self.runInteraction(
-            "_get_events", self._get_events_txn, event_ids,
-            check_redacted=check_redacted, get_prev_content=get_prev_content,
-        )
-
-    def _get_events_txn(self, txn, event_ids, check_redacted=True,
-                        get_prev_content=False):
-        if not event_ids:
-            return []
-
-        events = [
-            self._get_event_txn(
-                txn, event_id,
-                check_redacted=check_redacted,
-                get_prev_content=get_prev_content
-            )
-            for event_id in event_ids
-        ]
-
-        return [e for e in events if e]
-
-    def _invalidate_get_event_cache(self, event_id):
-        for check_redacted in (False, True):
-            for get_prev_content in (False, True):
-                self._get_event_cache.invalidate(event_id, check_redacted,
-                                                 get_prev_content)
-
-    def _get_event_txn(self, txn, event_id, check_redacted=True,
-                       get_prev_content=False, allow_rejected=False):
-
-        start_time = time.time() * 1000
-
-        def update_counter(desc, last_time):
-            curr_time = self._get_event_counters.update(desc, last_time)
-            sql_getevents_timer.inc_by(curr_time - last_time, desc)
-            return curr_time
-
-        try:
-            ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content)
-
-            if allow_rejected or not ret.rejected_reason:
-                return ret
-            else:
-                return None
-        except KeyError:
-            pass
-        finally:
-            start_time = update_counter("event_cache", start_time)
-
-        sql = (
-            "SELECT e.internal_metadata, e.json, r.event_id, rej.reason "
-            "FROM event_json as e "
-            "LEFT JOIN redactions as r ON e.event_id = r.redacts "
-            "LEFT JOIN rejections as rej on rej.event_id = e.event_id  "
-            "WHERE e.event_id = ? "
-            "LIMIT 1 "
-        )
-
-        txn.execute(sql, (event_id,))
-
-        res = txn.fetchone()
-
-        if not res:
-            return None
-
-        internal_metadata, js, redacted, rejected_reason = res
-
-        start_time = update_counter("select_event", start_time)
-
-        result = self._get_event_from_row_txn(
-            txn, internal_metadata, js, redacted,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            rejected_reason=rejected_reason,
-        )
-        self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result)
-
-        if allow_rejected or not rejected_reason:
-            return result
-        else:
-            return None
-
-    def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
-                                check_redacted=True, get_prev_content=False,
-                                rejected_reason=None):
-
-        start_time = time.time() * 1000
-
-        def update_counter(desc, last_time):
-            curr_time = self._get_event_counters.update(desc, last_time)
-            sql_getevents_timer.inc_by(curr_time - last_time, desc)
-            return curr_time
-
-        d = json.loads(js)
-        start_time = update_counter("decode_json", start_time)
-
-        internal_metadata = json.loads(internal_metadata)
-        start_time = update_counter("decode_internal", start_time)
-
-        ev = FrozenEvent(
-            d,
-            internal_metadata_dict=internal_metadata,
-            rejected_reason=rejected_reason,
-        )
-        start_time = update_counter("build_frozen_event", start_time)
-
-        if check_redacted and redacted:
-            ev = prune_event(ev)
-
-            ev.unsigned["redacted_by"] = redacted
-            # Get the redaction event.
-
-            because = self._get_event_txn(
-                txn,
-                redacted,
-                check_redacted=False
-            )
-
-            if because:
-                ev.unsigned["redacted_because"] = because
-            start_time = update_counter("redact_event", start_time)
-
-        if get_prev_content and "replaces_state" in ev.unsigned:
-            prev = self._get_event_txn(
-                txn,
-                ev.unsigned["replaces_state"],
-                get_prev_content=False,
-            )
-            if prev:
-                ev.unsigned["prev_content"] = prev.get_dict()["content"]
-            start_time = update_counter("get_prev_content", start_time)
-
-        return ev
-
-    def _parse_events(self, rows):
-        return self.runInteraction(
-            "_parse_events", self._parse_events_txn, rows
-        )
-
-    def _parse_events_txn(self, txn, rows):
-        event_ids = [r["event_id"] for r in rows]
-
-        return self._get_events_txn(txn, event_ids)
-
-    def _has_been_redacted_txn(self, txn, event):
-        sql = "SELECT event_id FROM redactions WHERE redacts = ?"
-        txn.execute(sql, (event.event_id,))
-        result = txn.fetchone()
-        return result[0] if result else None
-
     def get_next_stream_id(self):
         with self._next_stream_id_lock:
             i = self._next_stream_id
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 9242b0a84e..afdf0f7193 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,6 +17,9 @@ from _base import SQLBaseStore, _RollbackButIsFineException
 
 from twisted.internet import defer
 
+from synapse.events import FrozenEvent
+from synapse.events.utils import prune_event
+
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 from synapse.crypto.event_signing import compute_event_reference_hash
@@ -26,6 +29,7 @@ from syutil.jsonutil import encode_canonical_json
 from contextlib import contextmanager
 
 import logging
+import simplejson as json
 
 logger = logging.getLogger(__name__)
 
@@ -393,3 +397,131 @@ class EventsStore(SQLBaseStore):
         return self.runInteraction(
             "have_events", f,
         )
+
+    def _get_events(self, event_ids, check_redacted=True,
+                    get_prev_content=False):
+        return self.runInteraction(
+            "_get_events", self._get_events_txn, event_ids,
+            check_redacted=check_redacted, get_prev_content=get_prev_content,
+        )
+
+    def _get_events_txn(self, txn, event_ids, check_redacted=True,
+                        get_prev_content=False):
+        if not event_ids:
+            return []
+
+        events = [
+            self._get_event_txn(
+                txn, event_id,
+                check_redacted=check_redacted,
+                get_prev_content=get_prev_content
+            )
+            for event_id in event_ids
+        ]
+
+        return [e for e in events if e]
+
+    def _invalidate_get_event_cache(self, event_id):
+        for check_redacted in (False, True):
+            for get_prev_content in (False, True):
+                self._get_event_cache.invalidate(event_id, check_redacted,
+                                                 get_prev_content)
+
+    def _get_event_txn(self, txn, event_id, check_redacted=True,
+                       get_prev_content=False, allow_rejected=False):
+
+        try:
+            ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content)
+
+            if allow_rejected or not ret.rejected_reason:
+                return ret
+            else:
+                return None
+        except KeyError:
+            pass
+
+        sql = (
+            "SELECT e.internal_metadata, e.json, r.event_id, rej.reason "
+            "FROM event_json as e "
+            "LEFT JOIN redactions as r ON e.event_id = r.redacts "
+            "LEFT JOIN rejections as rej on rej.event_id = e.event_id  "
+            "WHERE e.event_id = ? "
+            "LIMIT 1 "
+        )
+
+        txn.execute(sql, (event_id,))
+
+        res = txn.fetchone()
+
+        if not res:
+            return None
+
+        internal_metadata, js, redacted, rejected_reason = res
+
+        result = self._get_event_from_row_txn(
+            txn, internal_metadata, js, redacted,
+            check_redacted=check_redacted,
+            get_prev_content=get_prev_content,
+            rejected_reason=rejected_reason,
+        )
+        self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result)
+
+        if allow_rejected or not rejected_reason:
+            return result
+        else:
+            return None
+
+    def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
+                                check_redacted=True, get_prev_content=False,
+                                rejected_reason=None):
+
+        d = json.loads(js)
+        internal_metadata = json.loads(internal_metadata)
+
+        ev = FrozenEvent(
+            d,
+            internal_metadata_dict=internal_metadata,
+            rejected_reason=rejected_reason,
+        )
+
+        if check_redacted and redacted:
+            ev = prune_event(ev)
+
+            ev.unsigned["redacted_by"] = redacted
+            # Get the redaction event.
+
+            because = self._get_event_txn(
+                txn,
+                redacted,
+                check_redacted=False
+            )
+
+            if because:
+                ev.unsigned["redacted_because"] = because
+
+        if get_prev_content and "replaces_state" in ev.unsigned:
+            prev = self._get_event_txn(
+                txn,
+                ev.unsigned["replaces_state"],
+                get_prev_content=False,
+            )
+            if prev:
+                ev.unsigned["prev_content"] = prev.get_dict()["content"]
+
+        return ev
+
+    def _parse_events(self, rows):
+        return self.runInteraction(
+            "_parse_events", self._parse_events_txn, rows
+        )
+
+    def _parse_events_txn(self, txn, rows):
+        event_ids = [r["event_id"] for r in rows]
+
+        return self._get_events_txn(txn, event_ids)
+
+    def _has_been_redacted_txn(self, txn, event):
+        sql = "SELECT event_id FROM redactions WHERE redacts = ?"
+        txn.execute(sql, (event.event_id,))
+        result = txn.fetchone()
+        return result[0] if result else None