diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/send_queue.py | 30 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 3 |
2 files changed, 27 insertions, 6 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 19cb757acc..93e5acebc1 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -271,9 +271,12 @@ class FederationRemoteSendQueue(object): keys = self.keyed_edu_changed.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) + # We purposefully clobber based on the key here, python dict comprehensions + # always use the last value, so this will correctly point to the last + # stream position. + keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]} - for (pos, (destination, edu_key)) in keyed_edus: + for ((destination, edu_key), pos) in keyed_edus.iteritems(): rows.append((pos, KeyedEduRow( key=edu_key, edu=self.keyed_edu[(destination, edu_key)], @@ -283,7 +286,7 @@ class FederationRemoteSendQueue(object): keys = self.edus.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - edus = set((k, self.edus[k]) for k in keys[i:j]) + edus = ((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: rows.append((pos, EduRow(edu))) @@ -292,7 +295,7 @@ class FederationRemoteSendQueue(object): keys = self.failures.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - failures = set((k, self.failures[k]) for k in keys[i:j]) + failures = ((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: rows.append((pos, FailureRow( @@ -304,9 +307,9 @@ class FederationRemoteSendQueue(object): keys = self.device_messages.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) + device_messages = {self.device_messages[k]: k for k in keys[i:j]} - for (pos, destination) in device_messages: + for (destination, pos) in device_messages.iteritems(): rows.append((pos, DeviceRow( destination=destination, ))) @@ -379,6 +382,10 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( "key", # tuple(str) - the edu key passed to send_edu "edu", # Edu ))): + """Streams EDUs that have an associated key that is ued to clobber. For example, + typing EDUs clobber based on room_id. + """ + TypeId = "k" @staticmethod @@ -403,6 +410,8 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( class EduRow(BaseFederationRow, namedtuple("EduRow", ( "edu", # Edu ))): + """Streams EDUs that don't have keys. See KeyedEduRow + """ TypeId = "e" @staticmethod @@ -420,6 +429,11 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( "destination", # str "failure", ))): + """Streams failures to a remote server. Failures are issued when there was + something wrong with a transaction the remote sent us, e.g. it included + an event that was invalid. + """ + TypeId = "f" @staticmethod @@ -442,6 +456,10 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( "destination", # str ))): + """Streams the fact that either a) there is pending to device messages for + users on the remote, or b) a local users device has changed and needs to + be sent to the remote. + """ TypeId = "d" @staticmethod diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 2919c2351a..dee387eb7f 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -41,6 +41,8 @@ sent_pdus_destination_dist = client_metrics.register_distribution( ) sent_edus_counter = client_metrics.register_counter("sent_edus") +sent_transactions_counter = client_metrics.register_counter("sent_transactions") + class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -440,6 +442,7 @@ class TransactionQueue(object): destination, pending_pdus, pending_edus, pending_failures, ) if success: + sent_transactions_counter.inc() # Remove the acknowledged device messages from the database # Only bother if we actually sent some device messages if device_message_edus: |