summary refs log tree commit diff
path: root/synapse/federation/federation_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r--synapse/federation/federation_client.py217
1 files changed, 108 insertions, 109 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 70573746d6..3883eb525e 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -57,6 +57,7 @@ class InvalidResponseError(RuntimeError):
     """Helper for _try_destination_list: indicates that the server returned a response
     we couldn't parse
     """
+
     pass
 
 
@@ -65,9 +66,7 @@ class FederationClient(FederationBase):
         super(FederationClient, self).__init__(hs)
 
         self.pdu_destination_tried = {}
-        self._clock.looping_call(
-            self._clear_tried_cache, 60 * 1000,
-        )
+        self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
         self.state = hs.get_state_handler()
         self.transport_layer = hs.get_federation_transport_client()
 
@@ -99,8 +98,14 @@ class FederationClient(FederationBase):
                 self.pdu_destination_tried[event_id] = destination_dict
 
     @log_function
-    def make_query(self, destination, query_type, args,
-                   retry_on_dns_fail=False, ignore_backoff=False):
+    def make_query(
+        self,
+        destination,
+        query_type,
+        args,
+        retry_on_dns_fail=False,
+        ignore_backoff=False,
+    ):
         """Sends a federation Query to a remote homeserver of the given type
         and arguments.
 
@@ -120,7 +125,10 @@ class FederationClient(FederationBase):
         sent_queries_counter.labels(query_type).inc()
 
         return self.transport_layer.make_query(
-            destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail,
+            destination,
+            query_type,
+            args,
+            retry_on_dns_fail=retry_on_dns_fail,
             ignore_backoff=ignore_backoff,
         )
 
@@ -137,9 +145,7 @@ class FederationClient(FederationBase):
             response
         """
         sent_queries_counter.labels("client_device_keys").inc()
-        return self.transport_layer.query_client_keys(
-            destination, content, timeout
-        )
+        return self.transport_layer.query_client_keys(destination, content, timeout)
 
     @log_function
     def query_user_devices(self, destination, user_id, timeout=30000):
@@ -147,9 +153,7 @@ class FederationClient(FederationBase):
         server.
         """
         sent_queries_counter.labels("user_devices").inc()
-        return self.transport_layer.query_user_devices(
-            destination, user_id, timeout
-        )
+        return self.transport_layer.query_user_devices(destination, user_id, timeout)
 
     @log_function
     def claim_client_keys(self, destination, content, timeout):
@@ -164,9 +168,7 @@ class FederationClient(FederationBase):
             response
         """
         sent_queries_counter.labels("client_one_time_keys").inc()
-        return self.transport_layer.claim_client_keys(
-            destination, content, timeout
-        )
+        return self.transport_layer.claim_client_keys(destination, content, timeout)
 
     @defer.inlineCallbacks
     @log_function
@@ -191,7 +193,8 @@ class FederationClient(FederationBase):
             return
 
         transaction_data = yield self.transport_layer.backfill(
-            dest, room_id, extremities, limit)
+            dest, room_id, extremities, limit
+        )
 
         logger.debug("backfill transaction_data=%s", repr(transaction_data))
 
@@ -204,17 +207,19 @@ class FederationClient(FederationBase):
         ]
 
         # FIXME: We should handle signature failures more gracefully.
-        pdus[:] = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            self._check_sigs_and_hashes(room_version, pdus),
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError))
+        pdus[:] = yield logcontext.make_deferred_yieldable(
+            defer.gatherResults(
+                self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True
+            ).addErrback(unwrapFirstError)
+        )
 
         defer.returnValue(pdus)
 
     @defer.inlineCallbacks
     @log_function
-    def get_pdu(self, destinations, event_id, room_version, outlier=False,
-                timeout=None):
+    def get_pdu(
+        self, destinations, event_id, room_version, outlier=False, timeout=None
+    ):
         """Requests the PDU with given origin and ID from the remote home
         servers.
 
@@ -255,7 +260,7 @@ class FederationClient(FederationBase):
 
             try:
                 transaction_data = yield self.transport_layer.get_event(
-                    destination, event_id, timeout=timeout,
+                    destination, event_id, timeout=timeout
                 )
 
                 logger.debug(
@@ -282,8 +287,7 @@ class FederationClient(FederationBase):
 
             except SynapseError as e:
                 logger.info(
-                    "Failed to get PDU %s from %s because %s",
-                    event_id, destination, e,
+                    "Failed to get PDU %s from %s because %s", event_id, destination, e
                 )
                 continue
             except NotRetryingDestination as e:
@@ -296,8 +300,7 @@ class FederationClient(FederationBase):
                 pdu_attempts[destination] = now
 
                 logger.info(
-                    "Failed to get PDU %s from %s because %s",
-                    event_id, destination, e,
+                    "Failed to get PDU %s from %s because %s", event_id, destination, e
                 )
                 continue
 
@@ -326,7 +329,7 @@ class FederationClient(FederationBase):
             # we have most of the state and auth_chain already.
             # However, this may 404 if the other side has an old synapse.
             result = yield self.transport_layer.get_room_state_ids(
-                destination, room_id, event_id=event_id,
+                destination, room_id, event_id=event_id
             )
 
             state_event_ids = result["pdu_ids"]
@@ -340,12 +343,10 @@ class FederationClient(FederationBase):
                 logger.warning(
                     "Failed to fetch missing state/auth events for %s: %s",
                     room_id,
-                    failed_to_fetch
+                    failed_to_fetch,
                 )
 
-            event_map = {
-                ev.event_id: ev for ev in fetched_events
-            }
+            event_map = {ev.event_id: ev for ev in fetched_events}
 
             pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map]
             auth_chain = [
@@ -362,15 +363,14 @@ class FederationClient(FederationBase):
                 raise e
 
         result = yield self.transport_layer.get_room_state(
-            destination, room_id, event_id=event_id,
+            destination, room_id, event_id=event_id
         )
 
         room_version = yield self.store.get_room_version(room_id)
         format_ver = room_version_to_event_format(room_version)
 
         pdus = [
-            event_from_pdu_json(p, format_ver, outlier=True)
-            for p in result["pdus"]
+            event_from_pdu_json(p, format_ver, outlier=True) for p in result["pdus"]
         ]
 
         auth_chain = [
@@ -378,9 +378,9 @@ class FederationClient(FederationBase):
             for p in result.get("auth_chain", [])
         ]
 
-        seen_events = yield self.store.get_events([
-            ev.event_id for ev in itertools.chain(pdus, auth_chain)
-        ])
+        seen_events = yield self.store.get_events(
+            [ev.event_id for ev in itertools.chain(pdus, auth_chain)]
+        )
 
         signed_pdus = yield self._check_sigs_and_hash_and_fetch(
             destination,
@@ -442,7 +442,7 @@ class FederationClient(FederationBase):
         batch_size = 20
         missing_events = list(missing_events)
         for i in range(0, len(missing_events), batch_size):
-            batch = set(missing_events[i:i + batch_size])
+            batch = set(missing_events[i : i + batch_size])
 
             deferreds = [
                 run_in_background(
@@ -470,21 +470,17 @@ class FederationClient(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def get_event_auth(self, destination, room_id, event_id):
-        res = yield self.transport_layer.get_event_auth(
-            destination, room_id, event_id,
-        )
+        res = yield self.transport_layer.get_event_auth(destination, room_id, event_id)
 
         room_version = yield self.store.get_room_version(room_id)
         format_ver = room_version_to_event_format(room_version)
 
         auth_chain = [
-            event_from_pdu_json(p, format_ver, outlier=True)
-            for p in res["auth_chain"]
+            event_from_pdu_json(p, format_ver, outlier=True) for p in res["auth_chain"]
         ]
 
         signed_auth = yield self._check_sigs_and_hash_and_fetch(
-            destination, auth_chain,
-            outlier=True, room_version=room_version,
+            destination, auth_chain, outlier=True, room_version=room_version
         )
 
         signed_auth.sort(key=lambda e: e.depth)
@@ -527,28 +523,26 @@ class FederationClient(FederationBase):
                 res = yield callback(destination)
                 defer.returnValue(res)
             except InvalidResponseError as e:
-                logger.warn(
-                    "Failed to %s via %s: %s",
-                    description, destination, e,
-                )
+                logger.warn("Failed to %s via %s: %s", description, destination, e)
             except HttpResponseException as e:
                 if not 500 <= e.code < 600:
                     raise e.to_synapse_error()
                 else:
                     logger.warn(
                         "Failed to %s via %s: %i %s",
-                        description, destination, e.code, e.args[0],
+                        description,
+                        destination,
+                        e.code,
+                        e.args[0],
                     )
             except Exception:
-                logger.warn(
-                    "Failed to %s via %s",
-                    description, destination, exc_info=1,
-                )
+                logger.warn("Failed to %s via %s", description, destination, exc_info=1)
 
-        raise RuntimeError("Failed to %s via any server" % (description, ))
+        raise RuntimeError("Failed to %s via any server" % (description,))
 
-    def make_membership_event(self, destinations, room_id, user_id, membership,
-                              content, params):
+    def make_membership_event(
+        self, destinations, room_id, user_id, membership, content, params
+    ):
         """
         Creates an m.room.member event, with context, without participating in the room.
 
@@ -584,14 +578,14 @@ class FederationClient(FederationBase):
         valid_memberships = {Membership.JOIN, Membership.LEAVE}
         if membership not in valid_memberships:
             raise RuntimeError(
-                "make_membership_event called with membership='%s', must be one of %s" %
-                (membership, ",".join(valid_memberships))
+                "make_membership_event called with membership='%s', must be one of %s"
+                % (membership, ",".join(valid_memberships))
             )
 
         @defer.inlineCallbacks
         def send_request(destination):
             ret = yield self.transport_layer.make_membership_event(
-                destination, room_id, user_id, membership, params,
+                destination, room_id, user_id, membership, params
             )
 
             # Note: If not supplied, the room version may be either v1 or v2,
@@ -614,16 +608,17 @@ class FederationClient(FederationBase):
                 pdu_dict["prev_state"] = []
 
             ev = builder.create_local_event_from_event_dict(
-                self._clock, self.hostname, self.signing_key,
-                format_version=event_format, event_dict=pdu_dict,
+                self._clock,
+                self.hostname,
+                self.signing_key,
+                format_version=event_format,
+                event_dict=pdu_dict,
             )
 
-            defer.returnValue(
-                (destination, ev, event_format)
-            )
+            defer.returnValue((destination, ev, event_format))
 
         return self._try_destination_list(
-            "make_" + membership, destinations, send_request,
+            "make_" + membership, destinations, send_request
         )
 
     def send_join(self, destinations, pdu, event_format_version):
@@ -655,9 +650,7 @@ class FederationClient(FederationBase):
                     create_event = e
                     break
             else:
-                raise InvalidResponseError(
-                    "no %s in auth chain" % (EventTypes.Create,),
-                )
+                raise InvalidResponseError("no %s in auth chain" % (EventTypes.Create,))
 
             # the room version should be sane.
             room_version = create_event.content.get("room_version", "1")
@@ -665,9 +658,8 @@ class FederationClient(FederationBase):
                 # This shouldn't be possible, because the remote server should have
                 # rejected the join attempt during make_join.
                 raise InvalidResponseError(
-                    "room appears to have unsupported version %s" % (
-                        room_version,
-                    ))
+                    "room appears to have unsupported version %s" % (room_version,)
+                )
 
         @defer.inlineCallbacks
         def send_request(destination):
@@ -691,10 +683,7 @@ class FederationClient(FederationBase):
                 for p in content.get("auth_chain", [])
             ]
 
-            pdus = {
-                p.event_id: p
-                for p in itertools.chain(state, auth_chain)
-            }
+            pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)}
 
             room_version = None
             for e in state:
@@ -710,15 +699,13 @@ class FederationClient(FederationBase):
                 raise SynapseError(400, "No create event in state")
 
             valid_pdus = yield self._check_sigs_and_hash_and_fetch(
-                destination, list(pdus.values()),
+                destination,
+                list(pdus.values()),
                 outlier=True,
                 room_version=room_version,
             )
 
-            valid_pdus_map = {
-                p.event_id: p
-                for p in valid_pdus
-            }
+            valid_pdus_map = {p.event_id: p for p in valid_pdus}
 
             # NB: We *need* to copy to ensure that we don't have multiple
             # references being passed on, as that causes... issues.
@@ -741,11 +728,14 @@ class FederationClient(FederationBase):
 
             check_authchain_validity(signed_auth)
 
-            defer.returnValue({
-                "state": signed_state,
-                "auth_chain": signed_auth,
-                "origin": destination,
-            })
+            defer.returnValue(
+                {
+                    "state": signed_state,
+                    "auth_chain": signed_auth,
+                    "origin": destination,
+                }
+            )
+
         return self._try_destination_list("send_join", destinations, send_request)
 
     @defer.inlineCallbacks
@@ -854,6 +844,7 @@ class FederationClient(FederationBase):
 
             Fails with a ``RuntimeError`` if no servers were reachable.
         """
+
         @defer.inlineCallbacks
         def send_request(destination):
             time_now = self._clock.time_msec()
@@ -869,14 +860,23 @@ class FederationClient(FederationBase):
 
         return self._try_destination_list("send_leave", destinations, send_request)
 
-    def get_public_rooms(self, destination, limit=None, since_token=None,
-                         search_filter=None, include_all_networks=False,
-                         third_party_instance_id=None):
+    def get_public_rooms(
+        self,
+        destination,
+        limit=None,
+        since_token=None,
+        search_filter=None,
+        include_all_networks=False,
+        third_party_instance_id=None,
+    ):
         if destination == self.server_name:
             return
 
         return self.transport_layer.get_public_rooms(
-            destination, limit, since_token, search_filter,
+            destination,
+            limit,
+            since_token,
+            search_filter,
             include_all_networks=include_all_networks,
             third_party_instance_id=third_party_instance_id,
         )
@@ -891,9 +891,7 @@ class FederationClient(FederationBase):
         """
         time_now = self._clock.time_msec()
 
-        send_content = {
-            "auth_chain": [e.get_pdu_json(time_now) for e in local_auth],
-        }
+        send_content = {"auth_chain": [e.get_pdu_json(time_now) for e in local_auth]}
 
         code, content = yield self.transport_layer.send_query_auth(
             destination=destination,
@@ -905,13 +903,10 @@ class FederationClient(FederationBase):
         room_version = yield self.store.get_room_version(room_id)
         format_ver = room_version_to_event_format(room_version)
 
-        auth_chain = [
-            event_from_pdu_json(e, format_ver)
-            for e in content["auth_chain"]
-        ]
+        auth_chain = [event_from_pdu_json(e, format_ver) for e in content["auth_chain"]]
 
         signed_auth = yield self._check_sigs_and_hash_and_fetch(
-            destination, auth_chain, outlier=True, room_version=room_version,
+            destination, auth_chain, outlier=True, room_version=room_version
         )
 
         signed_auth.sort(key=lambda e: e.depth)
@@ -925,8 +920,16 @@ class FederationClient(FederationBase):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def get_missing_events(self, destination, room_id, earliest_events_ids,
-                           latest_events, limit, min_depth, timeout):
+    def get_missing_events(
+        self,
+        destination,
+        room_id,
+        earliest_events_ids,
+        latest_events,
+        limit,
+        min_depth,
+        timeout,
+    ):
         """Tries to fetch events we are missing. This is called when we receive
         an event without having received all of its ancestors.
 
@@ -957,12 +960,11 @@ class FederationClient(FederationBase):
             format_ver = room_version_to_event_format(room_version)
 
             events = [
-                event_from_pdu_json(e, format_ver)
-                for e in content.get("events", [])
+                event_from_pdu_json(e, format_ver) for e in content.get("events", [])
             ]
 
             signed_events = yield self._check_sigs_and_hash_and_fetch(
-                destination, events, outlier=False, room_version=room_version,
+                destination, events, outlier=False, room_version=room_version
             )
         except HttpResponseException as e:
             if not e.code == 400:
@@ -982,17 +984,14 @@ class FederationClient(FederationBase):
 
             try:
                 yield self.transport_layer.exchange_third_party_invite(
-                    destination=destination,
-                    room_id=room_id,
-                    event_dict=event_dict,
+                    destination=destination, room_id=room_id, event_dict=event_dict
                 )
                 defer.returnValue(None)
             except CodeMessageException:
                 raise
             except Exception as e:
                 logger.exception(
-                    "Failed to send_third_party_invite via %s: %s",
-                    destination, str(e)
+                    "Failed to send_third_party_invite via %s: %s", destination, str(e)
                 )
 
         raise RuntimeError("Failed to send to any server.")