diff options
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r-- | synapse/federation/transaction_queue.py | 67 |
1 files changed, 32 insertions, 35 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f0aeb5a0d3..6996d6b695 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -13,37 +13,38 @@ # See the License for the specific language governing permissions and # limitations under the License. import datetime +import logging -from twisted.internet import defer +from six import itervalues -from .persistence import TransactionActions -from .units import Transaction, Edu +from prometheus_client import Counter + +from twisted.internet import defer -from synapse.api.errors import HttpResponseException, FederationDeniedError -from synapse.util import logcontext, PreserveLoggingContext -from synapse.util.async import run_on_reactor -from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter -from synapse.util.metrics import measure_func -from synapse.handlers.presence import format_user_presence_state, get_interested_remotes import synapse.metrics -from synapse.metrics import LaterGauge +from synapse.api.errors import FederationDeniedError, HttpResponseException +from synapse.handlers.presence import format_user_presence_state, get_interested_remotes from synapse.metrics import ( + LaterGauge, + events_processed_counter, sent_edus_counter, sent_transactions_counter, - events_processed_counter, ) +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util import logcontext +from synapse.util.metrics import measure_func +from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter -from prometheus_client import Counter - -from six import itervalues - -import logging - +from .persistence import TransactionActions +from .units import Edu, Transaction logger = logging.getLogger(__name__) -sent_pdus_destination_dist = Counter( - "synapse_federation_transaction_queue_sent_pdu_destinations", "" +sent_pdus_destination_dist_count = Counter( + "synapse_federation_client_sent_pdu_destinations:count", "" +) +sent_pdus_destination_dist_total = Counter( + "synapse_federation_client_sent_pdu_destinations:total", "" ) @@ -165,10 +166,11 @@ class TransactionQueue(object): if self._is_processing: return - # fire off a processing loop in the background. It's likely it will - # outlast the current request, so run it in the sentinel logcontext. - with PreserveLoggingContext(): - self._process_event_queue_loop() + # fire off a processing loop in the background + run_as_background_process( + "process_event_queue_for_federation", + self._process_event_queue_loop, + ) @defer.inlineCallbacks def _process_event_queue_loop(self): @@ -280,7 +282,8 @@ class TransactionQueue(object): if not destinations: return - sent_pdus_destination_dist.inc(len(destinations)) + sent_pdus_destination_dist_total.inc(len(destinations)) + sent_pdus_destination_dist_count.inc() for destination in destinations: self.pending_pdus_by_dest.setdefault(destination, []).append( @@ -431,14 +434,11 @@ class TransactionQueue(object): logger.debug("TX [%s] Starting transaction loop", destination) - # Drop the logcontext before starting the transaction. It doesn't - # really make sense to log all the outbound transactions against - # whatever path led us to this point: that's pretty arbitrary really. - # - # (this also means we can fire off _perform_transaction without - # yielding) - with logcontext.PreserveLoggingContext(): - self._transaction_transmission_loop(destination) + run_as_background_process( + "federation_transaction_transmission_loop", + self._transaction_transmission_loop, + destination, + ) @defer.inlineCallbacks def _transaction_transmission_loop(self, destination): @@ -451,9 +451,6 @@ class TransactionQueue(object): # hence why we throw the result away. yield get_retry_limiter(destination, self.clock, self.store) - # XXX: what's this for? - yield run_on_reactor() - pending_pdus = [] while True: device_message_edus, device_stream_id, dev_list_id = ( |