diff options
author | Erik Johnston <erik@matrix.org> | 2016-11-16 14:28:03 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-11-16 14:47:52 +0000 |
commit | 59ef517e6bc63b2613f18c9b85356a0f973f5698 (patch) | |
tree | 277cafe08a8b1dff46acbee2273c59a859386a53 /synapse/federation | |
parent | Add transaction queue and transport layer to DI (diff) | |
download | synapse-59ef517e6bc63b2613f18c9b85356a0f973f5698.tar.xz |
Use new federation_sender DI
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_client.py | 49 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 10 |
2 files changed, 10 insertions, 49 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 0fe21ac8d7..b255709165 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -44,10 +44,6 @@ logger = logging.getLogger(__name__) # synapse.federation.federation_client is a silly name metrics = synapse.metrics.get_metrics_for("synapse.federation.client") -sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations") - -sent_edus_counter = metrics.register_counter("sent_edus") - sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) @@ -92,51 +88,6 @@ class FederationClient(FederationBase): self._get_pdu_cache.start() @log_function - def send_pdu(self, pdu, destinations): - """Informs the replication layer about a new PDU generated within the - home server that should be transmitted to others. - - TODO: Figure out when we should actually resolve the deferred. - - Args: - pdu (Pdu): The new Pdu. - - Returns: - Deferred: Completes when we have successfully processed the PDU - and replicated it to any interested remote home servers. - """ - sent_pdus_destination_dist.inc_by(len(destinations)) - - logger.debug("[%s] transaction_layer.send_pdu... ", pdu.event_id) - - # TODO, add errback, etc. - self._transaction_queue.send_pdu(pdu, destinations) - - logger.debug( - "[%s] transaction_layer.send_pdu... done", - pdu.event_id - ) - - def send_presence(self, destination, states): - if destination != self.server_name: - self._transaction_queue.send_presence(destination, states) - - @log_function - def send_edu(self, destination, edu_type, content, key=None): - self._transaction_queue.send_edu(destination, edu_type, content, key=key) - - @log_function - def send_device_messages(self, destination): - """Sends the device messages in the local database to the remote - destination""" - self._transaction_queue.send_device_messages(destination) - - @log_function - def send_failure(self, failure, destination): - self._transaction_queue.send_failure(failure, destination) - return defer.succeed(None) - - @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail=False): """Sends a federation Query to a remote homeserver of the given type diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index eb504055f8..5d4f244377 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -36,6 +36,12 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) +client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client") +sent_pdus_destination_dist = client_metrics.register_distribution( + "sent_pdu_destinations" +) +sent_edus_counter = client_metrics.register_counter("sent_edus") + class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -135,6 +141,8 @@ class TransactionQueue(object): if not destinations: return + sent_pdus_destination_dist.inc_by(len(destinations)) + for destination in destinations: self.pending_pdus_by_dest.setdefault(destination, []).append( (pdu, order) @@ -167,6 +175,8 @@ class TransactionQueue(object): if not self.can_send_to(destination): return + sent_edus_counter.inc() + if key: self.pending_edus_keyed_by_dest.setdefault( destination, {} |