summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/send_queue.py30
-rw-r--r--synapse/federation/transaction_queue.py3
2 files changed, 27 insertions, 6 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 19cb757acc..93e5acebc1 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -271,9 +271,12 @@ class FederationRemoteSendQueue(object):
         keys = self.keyed_edu_changed.keys()
         i = keys.bisect_right(from_token)
         j = keys.bisect_right(to_token) + 1
-        keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])
+        # We purposefully clobber based on the key here, python dict comprehensions
+        # always use the last value, so this will correctly point to the last
+        # stream position.
+        keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
 
-        for (pos, (destination, edu_key)) in keyed_edus:
+        for ((destination, edu_key), pos) in keyed_edus.iteritems():
             rows.append((pos, KeyedEduRow(
                 key=edu_key,
                 edu=self.keyed_edu[(destination, edu_key)],
@@ -283,7 +286,7 @@ class FederationRemoteSendQueue(object):
         keys = self.edus.keys()
         i = keys.bisect_right(from_token)
         j = keys.bisect_right(to_token) + 1
-        edus = set((k, self.edus[k]) for k in keys[i:j])
+        edus = ((k, self.edus[k]) for k in keys[i:j])
 
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
@@ -292,7 +295,7 @@ class FederationRemoteSendQueue(object):
         keys = self.failures.keys()
         i = keys.bisect_right(from_token)
         j = keys.bisect_right(to_token) + 1
-        failures = set((k, self.failures[k]) for k in keys[i:j])
+        failures = ((k, self.failures[k]) for k in keys[i:j])
 
         for (pos, (destination, failure)) in failures:
             rows.append((pos, FailureRow(
@@ -304,9 +307,9 @@ class FederationRemoteSendQueue(object):
         keys = self.device_messages.keys()
         i = keys.bisect_right(from_token)
         j = keys.bisect_right(to_token) + 1
-        device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
+        device_messages = {self.device_messages[k]: k for k in keys[i:j]}
 
-        for (pos, destination) in device_messages:
+        for (destination, pos) in device_messages.iteritems():
             rows.append((pos, DeviceRow(
                 destination=destination,
             )))
@@ -379,6 +382,10 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
     "key",  # tuple(str) - the edu key passed to send_edu
     "edu",  # Edu
 ))):
+    """Streams EDUs that have an associated key that is ued to clobber. For example,
+    typing EDUs clobber based on room_id.
+    """
+
     TypeId = "k"
 
     @staticmethod
@@ -403,6 +410,8 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
 class EduRow(BaseFederationRow, namedtuple("EduRow", (
     "edu",  # Edu
 ))):
+    """Streams EDUs that don't have keys. See KeyedEduRow
+    """
     TypeId = "e"
 
     @staticmethod
@@ -420,6 +429,11 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
     "destination",  # str
     "failure",
 ))):
+    """Streams failures to a remote server. Failures are issued when there was
+    something wrong with a transaction the remote sent us, e.g. it included
+    an event that was invalid.
+    """
+
     TypeId = "f"
 
     @staticmethod
@@ -442,6 +456,10 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
 class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
     "destination",  # str
 ))):
+    """Streams the fact that either a) there is pending to device messages for
+    users on the remote, or b) a local users device has changed and needs to
+    be sent to the remote.
+    """
     TypeId = "d"
 
     @staticmethod
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 2919c2351a..dee387eb7f 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -41,6 +41,8 @@ sent_pdus_destination_dist = client_metrics.register_distribution(
 )
 sent_edus_counter = client_metrics.register_counter("sent_edus")
 
+sent_transactions_counter = client_metrics.register_counter("sent_transactions")
+
 
 class TransactionQueue(object):
     """This class makes sure we only have one transaction in flight at
@@ -440,6 +442,7 @@ class TransactionQueue(object):
                     destination, pending_pdus, pending_edus, pending_failures,
                 )
                 if success:
+                    sent_transactions_counter.inc()
                     # Remove the acknowledged device messages from the database
                     # Only bother if we actually sent some device messages
                     if device_message_edus: