diff options
author | Brendan Abolivier <babolivier@matrix.org> | 2020-07-30 19:00:29 +0100 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2020-07-30 19:00:29 +0100 |
commit | 69158e554f30ac8b6b646a62fa496a2c0005dea6 (patch) | |
tree | 42fdb177abede9c0128906d4e6661cde0ee9cd6c /synapse/federation/sender/__init__.py | |
parent | Changelog (diff) | |
parent | Update workers docs (#7990) (diff) | |
download | synapse-69158e554f30ac8b6b646a62fa496a2c0005dea6.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into babolivier/new_push_rules
Diffstat (limited to 'synapse/federation/sender/__init__.py')
-rw-r--r-- | synapse/federation/sender/__init__.py | 17 |
1 files changed, 7 insertions, 10 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index b328a4df09..94cc63001e 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -70,7 +70,7 @@ class FederationSender(object): self._transaction_manager = TransactionManager(hs) self._instance_name = hs.get_instance_name() - self._federation_shard_config = hs.config.federation.federation_shard_config + self._federation_shard_config = hs.config.worker.federation_shard_config # map from destination to PerDestinationQueue self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] @@ -288,8 +288,7 @@ class FederationSender(object): for destination in destinations: self._get_per_destination_queue(destination).send_pdu(pdu, order) - @defer.inlineCallbacks - def send_read_receipt(self, receipt: ReadReceipt): + async def send_read_receipt(self, receipt: ReadReceipt) -> None: """Send a RR to any other servers in the room Args: @@ -330,7 +329,7 @@ class FederationSender(object): room_id = receipt.room_id # Work out which remote servers should be poked and poke them. - domains = yield self.state.get_current_hosts_in_room(room_id) + domains = await self.state.get_current_hosts_in_room(room_id) domains = [ d for d in domains @@ -385,8 +384,7 @@ class FederationSender(object): queue.flush_read_receipts_for_room(room_id) @preserve_fn # the caller should not yield on this - @defer.inlineCallbacks - def send_presence(self, states: List[UserPresenceState]): + async def send_presence(self, states: List[UserPresenceState]): """Send the new presence states to the appropriate destinations. This actually queues up the presence states ready for sending and @@ -421,7 +419,7 @@ class FederationSender(object): if not states_map: break - yield self._process_presence_inner(list(states_map.values())) + await self._process_presence_inner(list(states_map.values())) except Exception: logger.exception("Error sending presence states to servers") finally: @@ -448,12 +446,11 @@ class FederationSender(object): self._get_per_destination_queue(destination).send_presence(states) @measure_func("txnqueue._process_presence") - @defer.inlineCallbacks - def _process_presence_inner(self, states: List[UserPresenceState]): + async def _process_presence_inner(self, states: List[UserPresenceState]): """Given a list of states populate self.pending_presence_by_dest and poke to send a new transaction to each destination """ - hosts_and_states = yield get_interested_remotes(self.store, states, self.state) + hosts_and_states = await get_interested_remotes(self.store, states, self.state) for destinations, states in hosts_and_states: for destination in destinations: |