summary refs log tree commit diff
path: root/synapse/storage/events_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events_worker.py')
-rw-r--r--synapse/storage/events_worker.py86
1 files changed, 78 insertions, 8 deletions
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index a8326f5296..1716be529a 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -21,13 +21,14 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.api.constants import EventFormatVersions, EventTypes
 from synapse.api.errors import NotFoundError
+from synapse.events import FrozenEvent, event_type_from_format_version  # noqa: F401
 # these are only included to make the type annotations work
-from synapse.events import EventBase  # noqa: F401
-from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import get_domain_from_id
 from synapse.util.logcontext import (
     LoggingContext,
     PreserveLoggingContext,
@@ -160,9 +161,14 @@ class EventsWorkerStore(SQLBaseStore):
             log_ctx = LoggingContext.current_context()
             log_ctx.record_event_fetch(len(missing_events_ids))
 
+            # Note that _enqueue_events is also responsible for turning db rows
+            # into FrozenEvents (via _get_event_from_row), which involves seeing if
+            # the events have been redacted, and if so pulling the redaction event out
+            # of the database to check it.
+            #
+            # _enqueue_events is a bit of a rubbish name but naming is hard.
             missing_events = yield self._enqueue_events(
                 missing_events_ids,
-                check_redacted=check_redacted,
                 allow_rejected=allow_rejected,
             )
 
@@ -174,6 +180,50 @@ class EventsWorkerStore(SQLBaseStore):
             if not entry:
                 continue
 
+            # Starting in room version v3, some redactions need to be rechecked if we
+            # didn't have the redacted event at the time, so we recheck on read
+            # instead.
+            if not allow_rejected and entry.event.type == EventTypes.Redaction:
+                if entry.event.internal_metadata.need_to_check_redaction():
+                    # XXX: we need to avoid calling get_event here.
+                    #
+                    # The problem is that we end up at this point when an event
+                    # which has been redacted is pulled out of the database by
+                    # _enqueue_events, because _enqueue_events needs to check the
+                    # redaction before it can cache the redacted event. So obviously,
+                    # calling get_event to get the redacted event out of the database
+                    # gives us an infinite loop.
+                    #
+                    # For now (quick hack to fix during 0.99 release cycle), we just
+                    # go and fetch the relevant row from the db, but it would be nice
+                    # to think about how we can cache this rather than hit the db
+                    # every time we access a redaction event.
+                    #
+                    # One thought on how to do this:
+                    #  1. split _get_events up so that it is divided into (a) get the
+                    #     rawish event from the db/cache, (b) do the redaction/rejection
+                    #     filtering
+                    #  2. have _get_event_from_row just call the first half of that
+
+                    orig_sender = yield self._simple_select_one_onecol(
+                        table="events",
+                        keyvalues={"event_id": entry.event.redacts},
+                        retcol="sender",
+                        allow_none=True,
+                    )
+
+                    expected_domain = get_domain_from_id(entry.event.sender)
+                    if orig_sender and get_domain_from_id(orig_sender) == expected_domain:
+                        # This redaction event is allowed. Mark as not needing a
+                        # recheck.
+                        entry.event.internal_metadata.recheck_redaction = False
+                    else:
+                        # We don't have the event that is being redacted, so we
+                        # assume that the event isn't authorized for now. (If we
+                        # later receive the event, then we will always redact
+                        # it anyway, since we have this redaction)
+                        continue
+
             if allow_rejected or not entry.event.rejected_reason:
                 if check_redacted and entry.redacted_event:
                     event = entry.redacted_event
@@ -197,7 +247,7 @@ class EventsWorkerStore(SQLBaseStore):
         defer.returnValue(events)
 
     def _invalidate_get_event_cache(self, event_id):
-            self._get_event_cache.invalidate((event_id,))
+        self._get_event_cache.invalidate((event_id,))
 
     def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
         """Fetch events from the caches
@@ -310,7 +360,7 @@ class EventsWorkerStore(SQLBaseStore):
                     self.hs.get_reactor().callFromThread(fire, event_list, e)
 
     @defer.inlineCallbacks
-    def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
+    def _enqueue_events(self, events, allow_rejected=False):
         """Fetches events from the database using the _event_fetch_list. This
         allows batch and bulk fetching of events - it allows us to fetch events
         without having to create a new transaction for each request for events.
@@ -353,6 +403,7 @@ class EventsWorkerStore(SQLBaseStore):
                     self._get_event_from_row,
                     row["internal_metadata"], row["json"], row["redacts"],
                     rejected_reason=row["rejects"],
+                    format_version=row["format_version"],
                 )
                 for row in rows
             ],
@@ -377,6 +428,7 @@ class EventsWorkerStore(SQLBaseStore):
                 " e.event_id as event_id, "
                 " e.internal_metadata,"
                 " e.json,"
+                " e.format_version, "
                 " r.redacts as redacts,"
                 " rej.event_id as rejects "
                 " FROM event_json as e"
@@ -392,7 +444,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def _get_event_from_row(self, internal_metadata, js, redacted,
-                            rejected_reason=None):
+                            format_version, rejected_reason=None):
         with Measure(self._clock, "_get_event_from_row"):
             d = json.loads(js)
             internal_metadata = json.loads(internal_metadata)
@@ -405,8 +457,13 @@ class EventsWorkerStore(SQLBaseStore):
                     desc="_get_event_from_row_rejected_reason",
                 )
 
-            original_ev = FrozenEvent(
-                d,
+            if format_version is None:
+                # This means that we stored the event before we had the concept
+                # of a event format version, so it must be a V1 event.
+                format_version = EventFormatVersions.V1
+
+            original_ev = event_type_from_format_version(format_version)(
+                event_dict=d,
                 internal_metadata_dict=internal_metadata,
                 rejected_reason=rejected_reason,
             )
@@ -436,6 +493,19 @@ class EventsWorkerStore(SQLBaseStore):
                     # will serialise this field correctly
                     redacted_event.unsigned["redacted_because"] = because
 
+                    # Starting in room version v3, some redactions need to be
+                    # rechecked if we didn't have the redacted event at the
+                    # time, so we recheck on read instead.
+                    if because.internal_metadata.need_to_check_redaction():
+                        expected_domain = get_domain_from_id(original_ev.sender)
+                        if get_domain_from_id(because.sender) == expected_domain:
+                            # This redaction event is allowed. Mark as not needing a
+                            # recheck.
+                            because.internal_metadata.recheck_redaction = False
+                        else:
+                            # Senders don't match, so the event isn't actually redacted
+                            redacted_event = None
+
             cache_entry = _EventCacheEntry(
                 event=original_ev,
                 redacted_event=redacted_event,