summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/federation_client.py123
-rw-r--r--synapse/handlers/federation_event.py22
-rw-r--r--synapse/storage/databases/main/events.py23
3 files changed, 119 insertions, 49 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 7c450ecad0..842f5327c2 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -53,7 +53,7 @@ from synapse.api.room_versions import (
     RoomVersion,
     RoomVersions,
 )
-from synapse.events import EventBase, builder
+from synapse.events import EventBase, builder, make_event_from_dict
 from synapse.federation.federation_base import (
     FederationBase,
     InvalidEventSignatureError,
@@ -299,7 +299,8 @@ class FederationClient(FederationBase):
                 moving to the next destination. None indicates no timeout.
 
         Returns:
-            The requested PDU, or None if we were unable to find it.
+            A copy of the requested PDU that is safe to modify, or None if we
+            were unable to find it.
 
         Raises:
             SynapseError, NotRetryingDestination, FederationDeniedError
@@ -309,7 +310,7 @@ class FederationClient(FederationBase):
         )
 
         logger.debug(
-            "retrieved event id %s from %s: %r",
+            "get_pdu_from_destination_raw: retrieved event id %s from %s: %r",
             event_id,
             destination,
             transaction_data,
@@ -358,54 +359,92 @@ class FederationClient(FederationBase):
             The requested PDU, or None if we were unable to find it.
         """
 
-        # TODO: Rate limit the number of times we try and get the same event.
+        logger.debug(
+            "get_pdu: event_id=%s from destinations=%s", event_id, destinations
+        )
 
-        ev = self._get_pdu_cache.get(event_id)
-        if ev:
-            return ev
+        # TODO: Rate limit the number of times we try and get the same event.
 
-        pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
+        # We might need the same event multiple times in quick succession (before
+        # it gets persisted to the database), so we cache the results of the lookup.
+        # Note that this is separate to the regular get_event cache which caches
+        # events once they have been persisted.
+        event = self._get_pdu_cache.get(event_id)
+
+        # If we don't see the event in the cache, go try to fetch it from the
+        # provided remote federated destinations
+        if not event:
+            pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
+
+            for destination in destinations:
+                now = self._clock.time_msec()
+                last_attempt = pdu_attempts.get(destination, 0)
+                if last_attempt + PDU_RETRY_TIME_MS > now:
+                    logger.debug(
+                        "get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
+                        destination,
+                        last_attempt,
+                        PDU_RETRY_TIME_MS,
+                        now,
+                    )
+                    continue
+
+                try:
+                    event = await self.get_pdu_from_destination_raw(
+                        destination=destination,
+                        event_id=event_id,
+                        room_version=room_version,
+                        timeout=timeout,
+                    )
 
-        signed_pdu = None
-        for destination in destinations:
-            now = self._clock.time_msec()
-            last_attempt = pdu_attempts.get(destination, 0)
-            if last_attempt + PDU_RETRY_TIME_MS > now:
-                continue
+                    pdu_attempts[destination] = now
 
-            try:
-                signed_pdu = await self.get_pdu_from_destination_raw(
-                    destination=destination,
-                    event_id=event_id,
-                    room_version=room_version,
-                    timeout=timeout,
-                )
+                    if event:
+                        # Prime the cache
+                        self._get_pdu_cache[event.event_id] = event
 
-                pdu_attempts[destination] = now
+                        # FIXME: We should add a `break` here to avoid calling every
+                        # destination after we already found a PDU (will follow-up
+                        # in a separate PR)
 
-            except SynapseError as e:
-                logger.info(
-                    "Failed to get PDU %s from %s because %s", event_id, destination, e
-                )
-                continue
-            except NotRetryingDestination as e:
-                logger.info(str(e))
-                continue
-            except FederationDeniedError as e:
-                logger.info(str(e))
-                continue
-            except Exception as e:
-                pdu_attempts[destination] = now
+                except SynapseError as e:
+                    logger.info(
+                        "Failed to get PDU %s from %s because %s",
+                        event_id,
+                        destination,
+                        e,
+                    )
+                    continue
+                except NotRetryingDestination as e:
+                    logger.info(str(e))
+                    continue
+                except FederationDeniedError as e:
+                    logger.info(str(e))
+                    continue
+                except Exception as e:
+                    pdu_attempts[destination] = now
+
+                    logger.info(
+                        "Failed to get PDU %s from %s because %s",
+                        event_id,
+                        destination,
+                        e,
+                    )
+                    continue
 
-                logger.info(
-                    "Failed to get PDU %s from %s because %s", event_id, destination, e
-                )
-                continue
+        if not event:
+            return None
 
-        if signed_pdu:
-            self._get_pdu_cache[event_id] = signed_pdu
+        # `event` now refers to an object stored in `get_pdu_cache`. Our
+        # callers may need to modify the returned object (eg to set
+        # `event.internal_metadata.outlier = true`), so we return a copy
+        # rather than the original object.
+        event_copy = make_event_from_dict(
+            event.get_pdu_json(),
+            event.room_version,
+        )
 
-        return signed_pdu
+        return event_copy
 
     async def get_room_state_ids(
         self, destination: str, room_id: str, event_id: str
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index e4a5b64d10..a5f4ce7c8a 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -766,10 +766,24 @@ class FederationEventHandler:
         """
         logger.info("Processing pulled event %s", event)
 
-        # these should not be outliers.
-        assert (
-            not event.internal_metadata.is_outlier()
-        ), "pulled event unexpectedly flagged as outlier"
+        # This function should not be used to persist outliers (use something
+        # else) because this does a bunch of operations that aren't necessary
+        # (extra work; in particular, it makes sure we have all the prev_events
+        # and resolves the state across those prev events). If you happen to run
+        # into a situation where the event you're trying to process/backfill is
+        # marked as an `outlier`, then you should update that spot to return an
+        # `EventBase` copy that doesn't have `outlier` flag set.
+        #
+        # `EventBase` is used to represent both an event we have not yet
+        # persisted, and one that we have persisted and now keep in the cache.
+        # In an ideal world this method would only be called with the first type
+        # of event, but it turns out that's not actually the case and for
+        # example, you could get an event from cache that is marked as an
+        # `outlier` (fix up that spot though).
+        assert not event.internal_metadata.is_outlier(), (
+            "Outlier event passed to _process_pulled_event. "
+            "To persist an event as a non-outlier, make sure to pass in a copy without `event.internal_metadata.outlier = true`."
+        )
 
         event_id = event.event_id
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 156e1bd5ab..1f600f1190 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1346,9 +1346,24 @@ class PersistEventsStore:
             event_id: outlier for event_id, outlier in txn
         }
 
+        logger.debug(
+            "_update_outliers_txn: events=%s have_persisted=%s",
+            [ev.event_id for ev, _ in events_and_contexts],
+            have_persisted,
+        )
+
         to_remove = set()
         for event, context in events_and_contexts:
-            if event.event_id not in have_persisted:
+            outlier_persisted = have_persisted.get(event.event_id)
+            logger.debug(
+                "_update_outliers_txn: event=%s outlier=%s outlier_persisted=%s",
+                event.event_id,
+                event.internal_metadata.is_outlier(),
+                outlier_persisted,
+            )
+
+            # Ignore events which we haven't persisted at all
+            if outlier_persisted is None:
                 continue
 
             to_remove.add(event)
@@ -1358,7 +1373,6 @@ class PersistEventsStore:
                 # was an outlier or not - what we have is at least as good.
                 continue
 
-            outlier_persisted = have_persisted[event.event_id]
             if not event.internal_metadata.is_outlier() and outlier_persisted:
                 # We received a copy of an event that we had already stored as
                 # an outlier in the database. We now have some state at that event
@@ -1369,7 +1383,10 @@ class PersistEventsStore:
                 # events down /sync. In general they will be historical events, so that
                 # doesn't matter too much, but that is not always the case.
 
-                logger.info("Updating state for ex-outlier event %s", event.event_id)
+                logger.info(
+                    "_update_outliers_txn: Updating state for ex-outlier event %s",
+                    event.event_id,
+                )
 
                 # insert into event_to_state_groups.
                 try: