summary refs log tree commit diff
path: root/synapse/storage/events_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events_worker.py')
-rw-r--r--synapse/storage/events_worker.py214
1 files changed, 129 insertions, 85 deletions
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 6d6bb1d5d3..e2fc7bc047 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -37,6 +37,7 @@ from synapse.logging.context import (
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import get_domain_from_id
+from synapse.util import batch_iter
 from synapse.util.metrics import Measure
 
 from ._base import SQLBaseStore
@@ -240,27 +241,8 @@ class EventsWorkerStore(SQLBaseStore):
             # instead.
             if not allow_rejected and entry.event.type == EventTypes.Redaction:
                 if entry.event.internal_metadata.need_to_check_redaction():
-                    # XXX: we need to avoid calling get_event here.
-                    #
-                    # The problem is that we end up at this point when an event
-                    # which has been redacted is pulled out of the database by
-                    # _enqueue_events, because _enqueue_events needs to check
-                    # the redaction before it can cache the redacted event. So
-                    # obviously, calling get_event to get the redacted event out
-                    # of the database gives us an infinite loop.
-                    #
-                    # For now (quick hack to fix during 0.99 release cycle), we
-                    # just go and fetch the relevant row from the db, but it
-                    # would be nice to think about how we can cache this rather
-                    # than hit the db every time we access a redaction event.
-                    #
-                    # One thought on how to do this:
-                    #  1. split get_events_as_list up so that it is divided into
-                    #     (a) get the rawish event from the db/cache, (b) do the
-                    #     redaction/rejection filtering
-                    #  2. have _get_event_from_row just call the first half of
-                    #     that
-
+                    # XXX: we should probably use _get_events_from_cache_or_db here,
+                    # so that we can benefit from caching.
                     orig_sender = yield self._simple_select_one_onecol(
                         table="events",
                         keyvalues={"event_id": entry.event.redacts},
@@ -410,19 +392,16 @@ class EventsWorkerStore(SQLBaseStore):
                 The fetch requests. Each entry consists of a list of event
                 ids to be fetched, and a deferred to be completed once the
                 events have been fetched.
-
         """
         with Measure(self._clock, "_fetch_event_list"):
             try:
                 event_id_lists = list(zip(*event_list))[0]
                 event_ids = [item for sublist in event_id_lists for item in sublist]
 
-                rows = self._new_transaction(
+                row_dict = self._new_transaction(
                     conn, "do_fetch", [], [], self._fetch_event_rows, event_ids
                 )
 
-                row_dict = {r["event_id"]: r for r in rows}
-
                 # We only want to resolve deferreds from the main thread
                 def fire(lst, res):
                     for ids, d in lst:
@@ -480,7 +459,7 @@ class EventsWorkerStore(SQLBaseStore):
         logger.debug("Loaded %d events (%d rows)", len(events), len(rows))
 
         if not allow_rejected:
-            rows[:] = [r for r in rows if not r["rejects"]]
+            rows[:] = [r for r in rows if r["rejected_reason"] is None]
 
         res = yield make_deferred_yieldable(
             defer.gatherResults(
@@ -489,8 +468,8 @@ class EventsWorkerStore(SQLBaseStore):
                         self._get_event_from_row,
                         row["internal_metadata"],
                         row["json"],
-                        row["redacts"],
-                        rejected_reason=row["rejects"],
+                        row["redactions"],
+                        rejected_reason=row["rejected_reason"],
                         format_version=row["format_version"],
                     )
                     for row in rows
@@ -501,49 +480,98 @@ class EventsWorkerStore(SQLBaseStore):
 
         defer.returnValue({e.event.event_id: e for e in res if e})
 
-    def _fetch_event_rows(self, txn, events):
-        rows = []
-        N = 200
-        for i in range(1 + len(events) // N):
-            evs = events[i * N : (i + 1) * N]
-            if not evs:
-                break
+    def _fetch_event_rows(self, txn, event_ids):
+        """Fetch event rows from the database
+
+        Events which are not found are omitted from the result.
+
+        The returned per-event dicts contain the following keys:
+
+         * event_id (str)
+
+         * json (str): json-encoded event structure
+
+         * internal_metadata (str): json-encoded internal metadata dict
+
+         * format_version (int|None): The format of the event. Hopefully one
+           of EventFormatVersions. 'None' means the event predates
+           EventFormatVersions (so the event is format V1).
+
+         * rejected_reason (str|None): if the event was rejected, the reason
+           why.
+
+         * redactions (List[str]): a list of event-ids which (claim to) redact
+           this event.
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection):
+            event_ids (Iterable[str]): event IDs to fetch
 
+        Returns:
+            Dict[str, Dict]: a map from event id to event info.
+        """
+        event_dict = {}
+        for evs in batch_iter(event_ids, 200):
             sql = (
                 "SELECT "
-                " e.event_id as event_id, "
+                " e.event_id, "
                 " e.internal_metadata,"
                 " e.json,"
                 " e.format_version, "
-                " r.redacts as redacts,"
-                " rej.event_id as rejects "
+                " rej.reason "
                 " FROM event_json as e"
                 " LEFT JOIN rejections as rej USING (event_id)"
-                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                 " WHERE e.event_id IN (%s)"
             ) % (",".join(["?"] * len(evs)),)
 
             txn.execute(sql, evs)
-            rows.extend(self.cursor_to_dict(txn))
 
-        return rows
+            for row in txn:
+                event_id = row[0]
+                event_dict[event_id] = {
+                    "event_id": event_id,
+                    "internal_metadata": row[1],
+                    "json": row[2],
+                    "format_version": row[3],
+                    "rejected_reason": row[4],
+                    "redactions": [],
+                }
+
+            # check for redactions
+            redactions_sql = (
+                "SELECT event_id, redacts FROM redactions WHERE redacts IN (%s)"
+            ) % (",".join(["?"] * len(evs)),)
+
+            txn.execute(redactions_sql, evs)
+
+            for (redacter, redacted) in txn:
+                d = event_dict.get(redacted)
+                if d:
+                    d["redactions"].append(redacter)
+
+        return event_dict
 
     @defer.inlineCallbacks
     def _get_event_from_row(
-        self, internal_metadata, js, redacted, format_version, rejected_reason=None
+        self, internal_metadata, js, redactions, format_version, rejected_reason=None
     ):
+        """Parse an event row which has been read from the database
+
+        Args:
+            internal_metadata (str): json-encoded internal_metadata column
+            js (str): json-encoded event body from event_json
+            redactions (list[str]): a list of the events which claim to have redacted
+                this event, from the redactions table
+            format_version: (str): the 'format_version' column
+            rejected_reason (str|None): the reason this event was rejected, if any
+
+        Returns:
+            _EventCacheEntry
+        """
         with Measure(self._clock, "_get_event_from_row"):
             d = json.loads(js)
             internal_metadata = json.loads(internal_metadata)
 
-            if rejected_reason:
-                rejected_reason = yield self._simple_select_one_onecol(
-                    table="rejections",
-                    keyvalues={"event_id": rejected_reason},
-                    retcol="reason",
-                    desc="_get_event_from_row_rejected_reason",
-                )
-
             if format_version is None:
                 # This means that we stored the event before we had the concept
                 # of a event format version, so it must be a V1 event.
@@ -555,41 +583,7 @@ class EventsWorkerStore(SQLBaseStore):
                 rejected_reason=rejected_reason,
             )
 
-            redacted_event = None
-            if redacted:
-                redacted_event = prune_event(original_ev)
-
-                redaction_id = yield self._simple_select_one_onecol(
-                    table="redactions",
-                    keyvalues={"redacts": redacted_event.event_id},
-                    retcol="event_id",
-                    desc="_get_event_from_row_redactions",
-                )
-
-                redacted_event.unsigned["redacted_by"] = redaction_id
-                # Get the redaction event.
-
-                because = yield self.get_event(
-                    redaction_id, check_redacted=False, allow_none=True
-                )
-
-                if because:
-                    # It's fine to do add the event directly, since get_pdu_json
-                    # will serialise this field correctly
-                    redacted_event.unsigned["redacted_because"] = because
-
-                    # Starting in room version v3, some redactions need to be
-                    # rechecked if we didn't have the redacted event at the
-                    # time, so we recheck on read instead.
-                    if because.internal_metadata.need_to_check_redaction():
-                        expected_domain = get_domain_from_id(original_ev.sender)
-                        if get_domain_from_id(because.sender) == expected_domain:
-                            # This redaction event is allowed. Mark as not needing a
-                            # recheck.
-                            because.internal_metadata.recheck_redaction = False
-                        else:
-                            # Senders don't match, so the event isn't actually redacted
-                            redacted_event = None
+            redacted_event = yield self._maybe_redact_event_row(original_ev, redactions)
 
             cache_entry = _EventCacheEntry(
                 event=original_ev, redacted_event=redacted_event
@@ -600,6 +594,56 @@ class EventsWorkerStore(SQLBaseStore):
         defer.returnValue(cache_entry)
 
     @defer.inlineCallbacks
+    def _maybe_redact_event_row(self, original_ev, redactions):
+        """Given an event object and a list of possible redacting event ids,
+        determine whether to honour any of those redactions and if so return a redacted
+        event.
+
+        Args:
+             original_ev (EventBase):
+             redactions (iterable[str]): list of event ids of potential redaction events
+
+        Returns:
+            Deferred[EventBase|None]: if the event should be redacted, a pruned
+                event object. Otherwise, None.
+        """
+        redaction_map = yield self._get_events_from_cache_or_db(redactions)
+
+        for redaction_id in redactions:
+            redaction_entry = redaction_map.get(redaction_id)
+            if not redaction_entry:
+                # we don't have the redaction event, or the redaction event was not
+                # authorized.
+                continue
+
+            redaction_event = redaction_entry.event
+
+            # Starting in room version v3, some redactions need to be
+            # rechecked if we didn't have the redacted event at the
+            # time, so we recheck on read instead.
+            if redaction_event.internal_metadata.need_to_check_redaction():
+                expected_domain = get_domain_from_id(original_ev.sender)
+                if get_domain_from_id(redaction_event.sender) == expected_domain:
+                    # This redaction event is allowed. Mark as not needing a recheck.
+                    redaction_event.internal_metadata.recheck_redaction = False
+                else:
+                    # Senders don't match, so the event isn't actually redacted
+                    continue
+
+            # we found a good redaction event. Redact!
+            redacted_event = prune_event(original_ev)
+            redacted_event.unsigned["redacted_by"] = redaction_id
+
+            # It's fine to add the event directly, since get_pdu_json
+            # will serialise this field correctly
+            redacted_event.unsigned["redacted_because"] = redaction_event
+
+            return redacted_event
+
+        # no valid redaction found for this event
+        return None
+
+    @defer.inlineCallbacks
     def have_events_in_timeline(self, event_ids):
         """Given a list of event ids, check if we have already processed and
         stored them as non outliers.