summary refs log tree commit diff
path: root/synapse/storage/events_worker.py
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2019-07-17 16:49:19 +0100
committerGitHub <noreply@github.com>2019-07-17 16:49:19 +0100
commit65c5592b8ed11e5ed4c3d6e29aa530d089c463ac (patch)
tree6e5d967940412c8451c749c3328dfc8549107fea /synapse/storage/events_worker.py
parentMerge pull request #5597 from matrix-org/erikj/admin_api_cmd (diff)
downloadsynapse-65c5592b8ed11e5ed4c3d6e29aa530d089c463ac.tar.xz
Refactor `get_events_as_list` (#5699)
A couple of changes here:

* get rid of a redundant `allow_rejected` condition - we should already have filtered out any rejected
  events before we get to that point in the code, and the redundancy is confusing. Instead, let's stick in
  an assertion just to make double-sure we aren't leaking rejected events by mistake.

* factor out a `_get_events_from_cache_or_db` method, which is going to be important for a 
  forthcoming fix to redactions.
Diffstat (limited to 'synapse/storage/events_worker.py')
-rw-r--r--synapse/storage/events_worker.py122
1 files changed, 74 insertions, 48 deletions
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 874d0a56bc..6d6bb1d5d3 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -218,37 +218,23 @@ class EventsWorkerStore(SQLBaseStore):
         if not event_ids:
             defer.returnValue([])
 
-        event_id_list = event_ids
-        event_ids = set(event_ids)
-
-        event_entry_map = self._get_events_from_cache(
-            event_ids, allow_rejected=allow_rejected
+        # there may be duplicates so we cast the list to a set
+        event_entry_map = yield self._get_events_from_cache_or_db(
+            set(event_ids), allow_rejected=allow_rejected
         )
 
-        missing_events_ids = [e for e in event_ids if e not in event_entry_map]
-
-        if missing_events_ids:
-            log_ctx = LoggingContext.current_context()
-            log_ctx.record_event_fetch(len(missing_events_ids))
-
-            # Note that _enqueue_events is also responsible for turning db rows
-            # into FrozenEvents (via _get_event_from_row), which involves seeing if
-            # the events have been redacted, and if so pulling the redaction event out
-            # of the database to check it.
-            #
-            # _enqueue_events is a bit of a rubbish name but naming is hard.
-            missing_events = yield self._enqueue_events(
-                missing_events_ids, allow_rejected=allow_rejected
-            )
-
-            event_entry_map.update(missing_events)
-
         events = []
-        for event_id in event_id_list:
+        for event_id in event_ids:
             entry = event_entry_map.get(event_id, None)
             if not entry:
                 continue
 
+            if not allow_rejected:
+                assert not entry.event.rejected_reason, (
+                    "rejected event returned from _get_events_from_cache_or_db despite "
+                    "allow_rejected=False"
+                )
+
             # Starting in room version v3, some redactions need to be rechecked if we
             # didn't have the redacted event at the time, so we recheck on read
             # instead.
@@ -291,34 +277,74 @@ class EventsWorkerStore(SQLBaseStore):
                         # recheck.
                         entry.event.internal_metadata.recheck_redaction = False
                     else:
-                        # We don't have the event that is being redacted, so we
-                        # assume that the event isn't authorized for now. (If we
-                        # later receive the event, then we will always redact
-                        # it anyway, since we have this redaction)
+                        # We either don't have the event that is being redacted (so we
+                        # assume that the event isn't authorised for now), or the
+                        # senders don't match (so it will never be authorised). Either
+                        # way, we shouldn't return it.
+                        #
+                        # (If we later receive the event, then we will redact it anyway,
+                        # since we have this redaction)
                         continue
 
-            if allow_rejected or not entry.event.rejected_reason:
-                if check_redacted and entry.redacted_event:
-                    event = entry.redacted_event
-                else:
-                    event = entry.event
-
-                events.append(event)
-
-                if get_prev_content:
-                    if "replaces_state" in event.unsigned:
-                        prev = yield self.get_event(
-                            event.unsigned["replaces_state"],
-                            get_prev_content=False,
-                            allow_none=True,
-                        )
-                        if prev:
-                            event.unsigned = dict(event.unsigned)
-                            event.unsigned["prev_content"] = prev.content
-                            event.unsigned["prev_sender"] = prev.sender
+            if check_redacted and entry.redacted_event:
+                event = entry.redacted_event
+            else:
+                event = entry.event
+
+            events.append(event)
+
+            if get_prev_content:
+                if "replaces_state" in event.unsigned:
+                    prev = yield self.get_event(
+                        event.unsigned["replaces_state"],
+                        get_prev_content=False,
+                        allow_none=True,
+                    )
+                    if prev:
+                        event.unsigned = dict(event.unsigned)
+                        event.unsigned["prev_content"] = prev.content
+                        event.unsigned["prev_sender"] = prev.sender
 
         defer.returnValue(events)
 
+    @defer.inlineCallbacks
+    def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):
+        """Fetch a bunch of events from the cache or the database.
+
+        If events are pulled from the database, they will be cached for future lookups.
+
+        Args:
+            event_ids (Iterable[str]): The event_ids of the events to fetch
+            allow_rejected (bool): Whether to include rejected events
+
+        Returns:
+            Deferred[Dict[str, _EventCacheEntry]]:
+                map from event id to result
+        """
+        event_entry_map = self._get_events_from_cache(
+            event_ids, allow_rejected=allow_rejected
+        )
+
+        missing_events_ids = [e for e in event_ids if e not in event_entry_map]
+
+        if missing_events_ids:
+            log_ctx = LoggingContext.current_context()
+            log_ctx.record_event_fetch(len(missing_events_ids))
+
+            # Note that _enqueue_events is also responsible for turning db rows
+            # into FrozenEvents (via _get_event_from_row), which involves seeing if
+            # the events have been redacted, and if so pulling the redaction event out
+            # of the database to check it.
+            #
+            # _enqueue_events is a bit of a rubbish name but naming is hard.
+            missing_events = yield self._enqueue_events(
+                missing_events_ids, allow_rejected=allow_rejected
+            )
+
+            event_entry_map.update(missing_events)
+
+        return event_entry_map
+
     def _invalidate_get_event_cache(self, event_id):
         self._get_event_cache.invalidate((event_id,))
 
@@ -326,7 +352,7 @@ class EventsWorkerStore(SQLBaseStore):
         """Fetch events from the caches
 
         Args:
-            events (list(str)): list of event_ids to fetch
+            events (Iterable[str]): list of event_ids to fetch
             allow_rejected (bool): Whether to return events that were rejected
             update_metrics (bool): Whether to update the cache hit ratio metrics