summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py52
1 files changed, 39 insertions, 13 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 3fdd63be95..30941f5ad6 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -22,14 +22,17 @@ from prometheus_client import Counter
 from twisted.internet import defer
 
 import synapse.metrics
-from synapse.api.errors import FederationDeniedError, HttpResponseException
+from synapse.api.errors import (
+    FederationDeniedError,
+    HttpResponseException,
+    RequestSendFailed,
+)
 from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
 from synapse.metrics import (
     LaterGauge,
     event_processing_loop_counter,
     event_processing_loop_room_count,
     events_processed_counter,
-    sent_edus_counter,
     sent_transactions_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -43,10 +46,24 @@ from .units import Edu, Transaction
 logger = logging.getLogger(__name__)
 
 sent_pdus_destination_dist_count = Counter(
-    "synapse_federation_client_sent_pdu_destinations:count", ""
+    "synapse_federation_client_sent_pdu_destinations:count",
+    "Number of PDUs queued for sending to one or more destinations",
 )
+
 sent_pdus_destination_dist_total = Counter(
     "synapse_federation_client_sent_pdu_destinations:total", ""
+    "Total number of PDUs queued for sending across all destinations",
+)
+
+sent_edus_counter = Counter(
+    "synapse_federation_client_sent_edus",
+    "Total number of EDUs successfully sent",
+)
+
+sent_edus_by_type = Counter(
+    "synapse_federation_client_sent_edus_by_type",
+    "Number of sent EDUs successfully sent, by event type",
+    ["type"],
 )
 
 
@@ -171,7 +188,7 @@ class TransactionQueue(object):
                 def handle_event(event):
                     # Only send events for this server.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
-                    is_mine = self.is_mine_id(event.event_id)
+                    is_mine = self.is_mine_id(event.sender)
                     if not is_mine and send_on_behalf_of is None:
                         return
 
@@ -183,9 +200,7 @@ class TransactionQueue(object):
                         # banned then it won't receive the event because it won't
                         # be in the room after the ban.
                         destinations = yield self.state.get_current_hosts_in_room(
-                            event.room_id, latest_event_ids=[
-                                prev_id for prev_id, _ in event.prev_events
-                            ],
+                            event.room_id, latest_event_ids=event.prev_event_ids(),
                         )
                     except Exception:
                         logger.exception(
@@ -358,8 +373,6 @@ class TransactionQueue(object):
             logger.info("Not sending EDU to ourselves")
             return
 
-        sent_edus_counter.inc()
-
         if key:
             self.pending_edus_keyed_by_dest.setdefault(
                 destination, {}
@@ -494,6 +507,9 @@ class TransactionQueue(object):
                 )
                 if success:
                     sent_transactions_counter.inc()
+                    sent_edus_counter.inc(len(pending_edus))
+                    for edu in pending_edus:
+                        sent_edus_by_type.labels(edu.edu_type).inc()
                     # Remove the acknowledged device messages from the database
                     # Only bother if we actually sent some device messages
                     if device_message_edus:
@@ -520,11 +536,21 @@ class TransactionQueue(object):
             )
         except FederationDeniedError as e:
             logger.info(e)
-        except Exception as e:
-            logger.warn(
-                "TX [%s] Failed to send transaction: %s",
+        except HttpResponseException as e:
+            logger.warning(
+                "TX [%s] Received %d response to transaction: %s",
+                destination, e.code, e,
+            )
+        except RequestSendFailed as e:
+            logger.warning("TX [%s] Failed to send transaction: %s", destination, e)
+
+            for p, _ in pending_pdus:
+                logger.info("Failed to send event %s to %s", p.event_id,
+                            destination)
+        except Exception:
+            logger.exception(
+                "TX [%s] Failed to send transaction",
                 destination,
-                e,
             )
             for p, _ in pending_pdus:
                 logger.info("Failed to send event %s to %s", p.event_id,