diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 70573746d6..3883eb525e 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -57,6 +57,7 @@ class InvalidResponseError(RuntimeError):
"""Helper for _try_destination_list: indicates that the server returned a response
we couldn't parse
"""
+
pass
@@ -65,9 +66,7 @@ class FederationClient(FederationBase):
super(FederationClient, self).__init__(hs)
self.pdu_destination_tried = {}
- self._clock.looping_call(
- self._clear_tried_cache, 60 * 1000,
- )
+ self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()
@@ -99,8 +98,14 @@ class FederationClient(FederationBase):
self.pdu_destination_tried[event_id] = destination_dict
@log_function
- def make_query(self, destination, query_type, args,
- retry_on_dns_fail=False, ignore_backoff=False):
+ def make_query(
+ self,
+ destination,
+ query_type,
+ args,
+ retry_on_dns_fail=False,
+ ignore_backoff=False,
+ ):
"""Sends a federation Query to a remote homeserver of the given type
and arguments.
@@ -120,7 +125,10 @@ class FederationClient(FederationBase):
sent_queries_counter.labels(query_type).inc()
return self.transport_layer.make_query(
- destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail,
+ destination,
+ query_type,
+ args,
+ retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
)
@@ -137,9 +145,7 @@ class FederationClient(FederationBase):
response
"""
sent_queries_counter.labels("client_device_keys").inc()
- return self.transport_layer.query_client_keys(
- destination, content, timeout
- )
+ return self.transport_layer.query_client_keys(destination, content, timeout)
@log_function
def query_user_devices(self, destination, user_id, timeout=30000):
@@ -147,9 +153,7 @@ class FederationClient(FederationBase):
server.
"""
sent_queries_counter.labels("user_devices").inc()
- return self.transport_layer.query_user_devices(
- destination, user_id, timeout
- )
+ return self.transport_layer.query_user_devices(destination, user_id, timeout)
@log_function
def claim_client_keys(self, destination, content, timeout):
@@ -164,9 +168,7 @@ class FederationClient(FederationBase):
response
"""
sent_queries_counter.labels("client_one_time_keys").inc()
- return self.transport_layer.claim_client_keys(
- destination, content, timeout
- )
+ return self.transport_layer.claim_client_keys(destination, content, timeout)
@defer.inlineCallbacks
@log_function
@@ -191,7 +193,8 @@ class FederationClient(FederationBase):
return
transaction_data = yield self.transport_layer.backfill(
- dest, room_id, extremities, limit)
+ dest, room_id, extremities, limit
+ )
logger.debug("backfill transaction_data=%s", repr(transaction_data))
@@ -204,17 +207,19 @@ class FederationClient(FederationBase):
]
# FIXME: We should handle signature failures more gracefully.
- pdus[:] = yield logcontext.make_deferred_yieldable(defer.gatherResults(
- self._check_sigs_and_hashes(room_version, pdus),
- consumeErrors=True,
- ).addErrback(unwrapFirstError))
+ pdus[:] = yield logcontext.make_deferred_yieldable(
+ defer.gatherResults(
+ self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True
+ ).addErrback(unwrapFirstError)
+ )
defer.returnValue(pdus)
@defer.inlineCallbacks
@log_function
- def get_pdu(self, destinations, event_id, room_version, outlier=False,
- timeout=None):
+ def get_pdu(
+ self, destinations, event_id, room_version, outlier=False, timeout=None
+ ):
"""Requests the PDU with given origin and ID from the remote home
servers.
@@ -255,7 +260,7 @@ class FederationClient(FederationBase):
try:
transaction_data = yield self.transport_layer.get_event(
- destination, event_id, timeout=timeout,
+ destination, event_id, timeout=timeout
)
logger.debug(
@@ -282,8 +287,7 @@ class FederationClient(FederationBase):
except SynapseError as e:
logger.info(
- "Failed to get PDU %s from %s because %s",
- event_id, destination, e,
+ "Failed to get PDU %s from %s because %s", event_id, destination, e
)
continue
except NotRetryingDestination as e:
@@ -296,8 +300,7 @@ class FederationClient(FederationBase):
pdu_attempts[destination] = now
logger.info(
- "Failed to get PDU %s from %s because %s",
- event_id, destination, e,
+ "Failed to get PDU %s from %s because %s", event_id, destination, e
)
continue
@@ -326,7 +329,7 @@ class FederationClient(FederationBase):
# we have most of the state and auth_chain already.
# However, this may 404 if the other side has an old synapse.
result = yield self.transport_layer.get_room_state_ids(
- destination, room_id, event_id=event_id,
+ destination, room_id, event_id=event_id
)
state_event_ids = result["pdu_ids"]
@@ -340,12 +343,10 @@ class FederationClient(FederationBase):
logger.warning(
"Failed to fetch missing state/auth events for %s: %s",
room_id,
- failed_to_fetch
+ failed_to_fetch,
)
- event_map = {
- ev.event_id: ev for ev in fetched_events
- }
+ event_map = {ev.event_id: ev for ev in fetched_events}
pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map]
auth_chain = [
@@ -362,15 +363,14 @@ class FederationClient(FederationBase):
raise e
result = yield self.transport_layer.get_room_state(
- destination, room_id, event_id=event_id,
+ destination, room_id, event_id=event_id
)
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
pdus = [
- event_from_pdu_json(p, format_ver, outlier=True)
- for p in result["pdus"]
+ event_from_pdu_json(p, format_ver, outlier=True) for p in result["pdus"]
]
auth_chain = [
@@ -378,9 +378,9 @@ class FederationClient(FederationBase):
for p in result.get("auth_chain", [])
]
- seen_events = yield self.store.get_events([
- ev.event_id for ev in itertools.chain(pdus, auth_chain)
- ])
+ seen_events = yield self.store.get_events(
+ [ev.event_id for ev in itertools.chain(pdus, auth_chain)]
+ )
signed_pdus = yield self._check_sigs_and_hash_and_fetch(
destination,
@@ -442,7 +442,7 @@ class FederationClient(FederationBase):
batch_size = 20
missing_events = list(missing_events)
for i in range(0, len(missing_events), batch_size):
- batch = set(missing_events[i:i + batch_size])
+ batch = set(missing_events[i : i + batch_size])
deferreds = [
run_in_background(
@@ -470,21 +470,17 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
def get_event_auth(self, destination, room_id, event_id):
- res = yield self.transport_layer.get_event_auth(
- destination, room_id, event_id,
- )
+ res = yield self.transport_layer.get_event_auth(destination, room_id, event_id)
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
auth_chain = [
- event_from_pdu_json(p, format_ver, outlier=True)
- for p in res["auth_chain"]
+ event_from_pdu_json(p, format_ver, outlier=True) for p in res["auth_chain"]
]
signed_auth = yield self._check_sigs_and_hash_and_fetch(
- destination, auth_chain,
- outlier=True, room_version=room_version,
+ destination, auth_chain, outlier=True, room_version=room_version
)
signed_auth.sort(key=lambda e: e.depth)
@@ -527,28 +523,26 @@ class FederationClient(FederationBase):
res = yield callback(destination)
defer.returnValue(res)
except InvalidResponseError as e:
- logger.warn(
- "Failed to %s via %s: %s",
- description, destination, e,
- )
+ logger.warn("Failed to %s via %s: %s", description, destination, e)
except HttpResponseException as e:
if not 500 <= e.code < 600:
raise e.to_synapse_error()
else:
logger.warn(
"Failed to %s via %s: %i %s",
- description, destination, e.code, e.args[0],
+ description,
+ destination,
+ e.code,
+ e.args[0],
)
except Exception:
- logger.warn(
- "Failed to %s via %s",
- description, destination, exc_info=1,
- )
+ logger.warn("Failed to %s via %s", description, destination, exc_info=1)
- raise RuntimeError("Failed to %s via any server" % (description, ))
+ raise RuntimeError("Failed to %s via any server" % (description,))
- def make_membership_event(self, destinations, room_id, user_id, membership,
- content, params):
+ def make_membership_event(
+ self, destinations, room_id, user_id, membership, content, params
+ ):
"""
Creates an m.room.member event, with context, without participating in the room.
@@ -584,14 +578,14 @@ class FederationClient(FederationBase):
valid_memberships = {Membership.JOIN, Membership.LEAVE}
if membership not in valid_memberships:
raise RuntimeError(
- "make_membership_event called with membership='%s', must be one of %s" %
- (membership, ",".join(valid_memberships))
+ "make_membership_event called with membership='%s', must be one of %s"
+ % (membership, ",".join(valid_memberships))
)
@defer.inlineCallbacks
def send_request(destination):
ret = yield self.transport_layer.make_membership_event(
- destination, room_id, user_id, membership, params,
+ destination, room_id, user_id, membership, params
)
# Note: If not supplied, the room version may be either v1 or v2,
@@ -614,16 +608,17 @@ class FederationClient(FederationBase):
pdu_dict["prev_state"] = []
ev = builder.create_local_event_from_event_dict(
- self._clock, self.hostname, self.signing_key,
- format_version=event_format, event_dict=pdu_dict,
+ self._clock,
+ self.hostname,
+ self.signing_key,
+ format_version=event_format,
+ event_dict=pdu_dict,
)
- defer.returnValue(
- (destination, ev, event_format)
- )
+ defer.returnValue((destination, ev, event_format))
return self._try_destination_list(
- "make_" + membership, destinations, send_request,
+ "make_" + membership, destinations, send_request
)
def send_join(self, destinations, pdu, event_format_version):
@@ -655,9 +650,7 @@ class FederationClient(FederationBase):
create_event = e
break
else:
- raise InvalidResponseError(
- "no %s in auth chain" % (EventTypes.Create,),
- )
+ raise InvalidResponseError("no %s in auth chain" % (EventTypes.Create,))
# the room version should be sane.
room_version = create_event.content.get("room_version", "1")
@@ -665,9 +658,8 @@ class FederationClient(FederationBase):
# This shouldn't be possible, because the remote server should have
# rejected the join attempt during make_join.
raise InvalidResponseError(
- "room appears to have unsupported version %s" % (
- room_version,
- ))
+ "room appears to have unsupported version %s" % (room_version,)
+ )
@defer.inlineCallbacks
def send_request(destination):
@@ -691,10 +683,7 @@ class FederationClient(FederationBase):
for p in content.get("auth_chain", [])
]
- pdus = {
- p.event_id: p
- for p in itertools.chain(state, auth_chain)
- }
+ pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)}
room_version = None
for e in state:
@@ -710,15 +699,13 @@ class FederationClient(FederationBase):
raise SynapseError(400, "No create event in state")
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
- destination, list(pdus.values()),
+ destination,
+ list(pdus.values()),
outlier=True,
room_version=room_version,
)
- valid_pdus_map = {
- p.event_id: p
- for p in valid_pdus
- }
+ valid_pdus_map = {p.event_id: p for p in valid_pdus}
# NB: We *need* to copy to ensure that we don't have multiple
# references being passed on, as that causes... issues.
@@ -741,11 +728,14 @@ class FederationClient(FederationBase):
check_authchain_validity(signed_auth)
- defer.returnValue({
- "state": signed_state,
- "auth_chain": signed_auth,
- "origin": destination,
- })
+ defer.returnValue(
+ {
+ "state": signed_state,
+ "auth_chain": signed_auth,
+ "origin": destination,
+ }
+ )
+
return self._try_destination_list("send_join", destinations, send_request)
@defer.inlineCallbacks
@@ -854,6 +844,7 @@ class FederationClient(FederationBase):
Fails with a ``RuntimeError`` if no servers were reachable.
"""
+
@defer.inlineCallbacks
def send_request(destination):
time_now = self._clock.time_msec()
@@ -869,14 +860,23 @@ class FederationClient(FederationBase):
return self._try_destination_list("send_leave", destinations, send_request)
- def get_public_rooms(self, destination, limit=None, since_token=None,
- search_filter=None, include_all_networks=False,
- third_party_instance_id=None):
+ def get_public_rooms(
+ self,
+ destination,
+ limit=None,
+ since_token=None,
+ search_filter=None,
+ include_all_networks=False,
+ third_party_instance_id=None,
+ ):
if destination == self.server_name:
return
return self.transport_layer.get_public_rooms(
- destination, limit, since_token, search_filter,
+ destination,
+ limit,
+ since_token,
+ search_filter,
include_all_networks=include_all_networks,
third_party_instance_id=third_party_instance_id,
)
@@ -891,9 +891,7 @@ class FederationClient(FederationBase):
"""
time_now = self._clock.time_msec()
- send_content = {
- "auth_chain": [e.get_pdu_json(time_now) for e in local_auth],
- }
+ send_content = {"auth_chain": [e.get_pdu_json(time_now) for e in local_auth]}
code, content = yield self.transport_layer.send_query_auth(
destination=destination,
@@ -905,13 +903,10 @@ class FederationClient(FederationBase):
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
- auth_chain = [
- event_from_pdu_json(e, format_ver)
- for e in content["auth_chain"]
- ]
+ auth_chain = [event_from_pdu_json(e, format_ver) for e in content["auth_chain"]]
signed_auth = yield self._check_sigs_and_hash_and_fetch(
- destination, auth_chain, outlier=True, room_version=room_version,
+ destination, auth_chain, outlier=True, room_version=room_version
)
signed_auth.sort(key=lambda e: e.depth)
@@ -925,8 +920,16 @@ class FederationClient(FederationBase):
defer.returnValue(ret)
@defer.inlineCallbacks
- def get_missing_events(self, destination, room_id, earliest_events_ids,
- latest_events, limit, min_depth, timeout):
+ def get_missing_events(
+ self,
+ destination,
+ room_id,
+ earliest_events_ids,
+ latest_events,
+ limit,
+ min_depth,
+ timeout,
+ ):
"""Tries to fetch events we are missing. This is called when we receive
an event without having received all of its ancestors.
@@ -957,12 +960,11 @@ class FederationClient(FederationBase):
format_ver = room_version_to_event_format(room_version)
events = [
- event_from_pdu_json(e, format_ver)
- for e in content.get("events", [])
+ event_from_pdu_json(e, format_ver) for e in content.get("events", [])
]
signed_events = yield self._check_sigs_and_hash_and_fetch(
- destination, events, outlier=False, room_version=room_version,
+ destination, events, outlier=False, room_version=room_version
)
except HttpResponseException as e:
if not e.code == 400:
@@ -982,17 +984,14 @@ class FederationClient(FederationBase):
try:
yield self.transport_layer.exchange_third_party_invite(
- destination=destination,
- room_id=room_id,
- event_dict=event_dict,
+ destination=destination, room_id=room_id, event_dict=event_dict
)
defer.returnValue(None)
except CodeMessageException:
raise
except Exception as e:
logger.exception(
- "Failed to send_third_party_invite via %s: %s",
- destination, str(e)
+ "Failed to send_third_party_invite via %s: %s", destination, str(e)
)
raise RuntimeError("Failed to send to any server.")
|