diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index fb57e42287..86a611c49c 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -70,7 +70,7 @@ class FederationSender(object):
self._transaction_manager = TransactionManager(hs)
self._instance_name = hs.get_instance_name()
- self._federation_shard_config = hs.config.federation.federation_shard_config
+ self._federation_shard_config = hs.config.worker.federation_shard_config
# map from destination to PerDestinationQueue
self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
@@ -306,8 +306,7 @@ class FederationSender(object):
for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu, order)
- @defer.inlineCallbacks
- def send_read_receipt(self, receipt: ReadReceipt):
+ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
Args:
@@ -348,9 +347,7 @@ class FederationSender(object):
room_id = receipt.room_id
# Work out which remote servers should be poked and poke them.
- domains = yield defer.ensureDeferred(
- self.state.get_current_hosts_in_room(room_id)
- )
+ domains = await self.state.get_current_hosts_in_room(room_id)
domains = [
d
for d in domains
@@ -405,8 +402,7 @@ class FederationSender(object):
queue.flush_read_receipts_for_room(room_id)
@preserve_fn # the caller should not yield on this
- @defer.inlineCallbacks
- def send_presence(self, states: List[UserPresenceState]):
+ async def send_presence(self, states: List[UserPresenceState]):
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
@@ -441,7 +437,7 @@ class FederationSender(object):
if not states_map:
break
- yield self._process_presence_inner(list(states_map.values()))
+ await self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
@@ -468,14 +464,11 @@ class FederationSender(object):
self._get_per_destination_queue(destination).send_presence(states)
@measure_func("txnqueue._process_presence")
- @defer.inlineCallbacks
- def _process_presence_inner(self, states: List[UserPresenceState]):
+ async def _process_presence_inner(self, states: List[UserPresenceState]):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
- hosts_and_states = yield defer.ensureDeferred(
- get_interested_remotes(self.store, states, self.state)
- )
+ hosts_and_states = await get_interested_remotes(self.store, states, self.state)
for destinations, states in hosts_and_states:
for destination in destinations:
|