diff options
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r-- | synapse/federation/transaction_queue.py | 18 |
1 files changed, 15 insertions, 3 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 7a3c9cbb70..a141ec9953 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -19,8 +19,8 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction, Edu -from synapse.api.errors import HttpResponseException -from synapse.util import logcontext +from synapse.api.errors import HttpResponseException, FederationDeniedError +from synapse.util import logcontext, PreserveLoggingContext from synapse.util.async import run_on_reactor from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func @@ -42,6 +42,8 @@ sent_edus_counter = client_metrics.register_counter("sent_edus") sent_transactions_counter = client_metrics.register_counter("sent_transactions") +events_processed_counter = client_metrics.register_counter("events_processed") + class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -146,7 +148,6 @@ class TransactionQueue(object): else: return not destination.startswith("localhost") - @defer.inlineCallbacks def notify_new_events(self, current_id): """This gets called when we have some new events we might want to send out to other servers. @@ -156,6 +157,13 @@ class TransactionQueue(object): if self._is_processing: return + # fire off a processing loop in the background. It's likely it will + # outlast the current request, so run it in the sentinel logcontext. + with PreserveLoggingContext(): + self._process_event_queue_loop() + + @defer.inlineCallbacks + def _process_event_queue_loop(self): try: self._is_processing = True while True: @@ -199,6 +207,8 @@ class TransactionQueue(object): self._send_pdu(event, destinations) + events_processed_counter.inc_by(len(events)) + yield self.store.update_federation_out_pos( "events", next_token ) @@ -480,6 +490,8 @@ class TransactionQueue(object): (e.retry_last_ts + e.retry_interval) / 1000.0 ), ) + except FederationDeniedError as e: + logger.info(e) except Exception as e: logger.warn( "TX [%s] Failed to send transaction: %s", |