summary refs log tree commit diff
path: root/synapse/federation/federation_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r--synapse/federation/federation_client.py80
1 files changed, 79 insertions, 1 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index b5538bc07a..8c6b839478 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -33,6 +33,7 @@ from typing import (
 from prometheus_client import Counter
 
 from twisted.internet import defer
+from twisted.internet.defer import Deferred
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import (
@@ -51,7 +52,7 @@ from synapse.api.room_versions import (
 )
 from synapse.events import EventBase, builder
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
-from synapse.logging.context import make_deferred_yieldable
+from synapse.logging.context import make_deferred_yieldable, preserve_fn
 from synapse.logging.utils import log_function
 from synapse.types import JsonDict
 from synapse.util import unwrapFirstError
@@ -345,6 +346,83 @@ class FederationClient(FederationBase):
 
         return state_event_ids, auth_event_ids
 
+    async def _check_sigs_and_hash_and_fetch(
+        self,
+        origin: str,
+        pdus: List[EventBase],
+        room_version: str,
+        outlier: bool = False,
+        include_none: bool = False,
+    ) -> List[EventBase]:
+        """Takes a list of PDUs and checks the signatures and hashs of each
+        one. If a PDU fails its signature check then we check if we have it in
+        the database and if not then request if from the originating server of
+        that PDU.
+
+        If a PDU fails its content hash check then it is redacted.
+
+        The given list of PDUs are not modified, instead the function returns
+        a new list.
+
+        Args:
+            origin
+            pdu
+            room_version
+            outlier: Whether the events are outliers or not
+            include_none: Whether to include None in the returned list
+                for events that have failed their checks
+
+        Returns:
+            Deferred : A list of PDUs that have valid signatures and hashes.
+        """
+        deferreds = self._check_sigs_and_hashes(room_version, pdus)
+
+        @defer.inlineCallbacks
+        def handle_check_result(pdu: EventBase, deferred: Deferred):
+            try:
+                res = yield make_deferred_yieldable(deferred)
+            except SynapseError:
+                res = None
+
+            if not res:
+                # Check local db.
+                res = yield self.store.get_event(
+                    pdu.event_id, allow_rejected=True, allow_none=True
+                )
+
+            if not res and pdu.origin != origin:
+                try:
+                    res = yield defer.ensureDeferred(
+                        self.get_pdu(
+                            destinations=[pdu.origin],
+                            event_id=pdu.event_id,
+                            room_version=room_version,  # type: ignore
+                            outlier=outlier,
+                            timeout=10000,
+                        )
+                    )
+                except SynapseError:
+                    pass
+
+            if not res:
+                logger.warning(
+                    "Failed to find copy of %s with valid signature", pdu.event_id
+                )
+
+            return res
+
+        handle = preserve_fn(handle_check_result)
+        deferreds2 = [handle(pdu, deferred) for pdu, deferred in zip(pdus, deferreds)]
+
+        valid_pdus = await make_deferred_yieldable(
+            defer.gatherResults(deferreds2, consumeErrors=True)
+        ).addErrback(unwrapFirstError)
+
+        if include_none:
+            return valid_pdus
+        else:
+            return [p for p in valid_pdus if p]
+
     async def get_event_auth(self, destination, room_id, event_id):
         res = await self.transport_layer.get_event_auth(destination, room_id, event_id)