diff options
author | Erik Johnston <erikj@jki.re> | 2017-04-12 10:57:24 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-12 10:57:24 +0100 |
commit | c06c00190f1d80197e3bd45b856c78d8b028d15a (patch) | |
tree | 84f9a34c2b4d93698a566e4402b4850a62165a0d /synapse/federation | |
parent | Merge pull request #2117 from matrix-org/erikj/remove_http_replication (diff) | |
parent | Add some comments (diff) | |
download | synapse-c06c00190f1d80197e3bd45b856c78d8b028d15a.tar.xz |
Merge pull request #2116 from matrix-org/erikj/dedupe_federation_repl2
Dedupe KeyedEdu and Devices federation repl traffic
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/send_queue.py | 30 |
1 files changed, 24 insertions, 6 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 748548bbe2..b952e59518 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -267,9 +267,12 @@ class FederationRemoteSendQueue(object): keys = self.keyed_edu_changed.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) + # We purposefully clobber based on the key here, python dict comprehensions + # always use the last value, so this will correctly point to the last + # stream position. + keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]} - for (pos, (destination, edu_key)) in keyed_edus: + for ((destination, edu_key), pos) in keyed_edus.iteritems(): rows.append((pos, KeyedEduRow( key=edu_key, edu=self.keyed_edu[(destination, edu_key)], @@ -279,7 +282,7 @@ class FederationRemoteSendQueue(object): keys = self.edus.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - edus = set((k, self.edus[k]) for k in keys[i:j]) + edus = ((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: rows.append((pos, EduRow(edu))) @@ -288,7 +291,7 @@ class FederationRemoteSendQueue(object): keys = self.failures.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - failures = set((k, self.failures[k]) for k in keys[i:j]) + failures = ((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: rows.append((pos, FailureRow( @@ -300,9 +303,9 @@ class FederationRemoteSendQueue(object): keys = self.device_messages.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) + device_messages = {self.device_messages[k]: k for k in keys[i:j]} - for (pos, destination) in device_messages: + for (destination, pos) in device_messages.iteritems(): rows.append((pos, DeviceRow( destination=destination, ))) @@ -380,6 +383,10 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( "key", # tuple(str) - the edu key passed to send_edu "edu", # Edu ))): + """Streams EDUs that have an associated key that is ued to clobber. For example, + typing EDUs clobber based on room_id. + """ + TypeId = "k" @staticmethod @@ -404,6 +411,8 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( class EduRow(BaseFederationRow, namedtuple("EduRow", ( "edu", # Edu ))): + """Streams EDUs that don't have keys. See KeyedEduRow + """ TypeId = "e" @staticmethod @@ -421,6 +430,11 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( "destination", # str "failure", ))): + """Streams failures to a remote server. Failures are issued when there was + something wrong with a transaction the remote sent us, e.g. it included + an event that was invalid. + """ + TypeId = "f" @staticmethod @@ -443,6 +457,10 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( "destination", # str ))): + """Streams the fact that either a) there is pending to device messages for + users on the remote, or b) a local users device has changed and needs to + be sent to the remote. + """ TypeId = "d" @staticmethod |