summary refs log tree commit diff
path: root/synapse/federation/send_queue.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-06-24 10:00:13 +0100
committerRichard van der Hoff <richard@matrix.org>2019-06-24 10:00:13 +0100
commit5097aee740b542407e5bb13d19a3e3e6c2227316 (patch)
tree09a03650256e09cd0b5df59dbf2d7bb2ba14df6c /synapse/federation/send_queue.py
parentchangelog (diff)
parentImprove help and cmdline option names for --generate-config options (#5512) (diff)
downloadsynapse-5097aee740b542407e5bb13d19a3e3e6c2227316.tar.xz
Merge branch 'develop' into rav/cleanup_metrics
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r--synapse/federation/send_queue.py145
1 files changed, 72 insertions, 73 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 0240b339b0..454456a52d 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -77,12 +77,22 @@ class FederationRemoteSendQueue(object):
         # lambda binds to the queue rather than to the name of the queue which
         # changes. ARGH.
         def register(name, queue):
-            LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,),
-                       "", [], lambda: len(queue))
+            LaterGauge(
+                "synapse_federation_send_queue_%s_size" % (queue_name,),
+                "",
+                [],
+                lambda: len(queue),
+            )
 
         for queue_name in [
-            "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
-            "edus", "device_messages", "pos_time", "presence_destinations",
+            "presence_map",
+            "presence_changed",
+            "keyed_edu",
+            "keyed_edu_changed",
+            "edus",
+            "device_messages",
+            "pos_time",
+            "presence_destinations",
         ]:
             register(queue_name, getattr(self, queue_name))
 
@@ -121,9 +131,7 @@ class FederationRemoteSendQueue(object):
                 del self.presence_changed[key]
 
             user_ids = set(
-                user_id
-                for uids in self.presence_changed.values()
-                for user_id in uids
+                user_id for uids in self.presence_changed.values() for user_id in uids
             )
 
             keys = self.presence_destinations.keys()
@@ -285,19 +293,21 @@ class FederationRemoteSendQueue(object):
         ]
 
         for (key, user_id) in dest_user_ids:
-            rows.append((key, PresenceRow(
-                state=self.presence_map[user_id],
-            )))
+            rows.append((key, PresenceRow(state=self.presence_map[user_id])))
 
         # Fetch presence to send to destinations
         i = self.presence_destinations.bisect_right(from_token)
         j = self.presence_destinations.bisect_right(to_token) + 1
 
         for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
-            rows.append((pos, PresenceDestinationsRow(
-                state=self.presence_map[user_id],
-                destinations=list(dests),
-            )))
+            rows.append(
+                (
+                    pos,
+                    PresenceDestinationsRow(
+                        state=self.presence_map[user_id], destinations=list(dests)
+                    ),
+                )
+            )
 
         # Fetch changes keyed edus
         i = self.keyed_edu_changed.bisect_right(from_token)
@@ -308,10 +318,14 @@ class FederationRemoteSendQueue(object):
         keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
 
         for ((destination, edu_key), pos) in iteritems(keyed_edus):
-            rows.append((pos, KeyedEduRow(
-                key=edu_key,
-                edu=self.keyed_edu[(destination, edu_key)],
-            )))
+            rows.append(
+                (
+                    pos,
+                    KeyedEduRow(
+                        key=edu_key, edu=self.keyed_edu[(destination, edu_key)]
+                    ),
+                )
+            )
 
         # Fetch changed edus
         i = self.edus.bisect_right(from_token)
@@ -327,9 +341,7 @@ class FederationRemoteSendQueue(object):
         device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
 
         for (destination, pos) in iteritems(device_messages):
-            rows.append((pos, DeviceRow(
-                destination=destination,
-            )))
+            rows.append((pos, DeviceRow(destination=destination)))
 
         # Sort rows based on pos
         rows.sort()
@@ -377,16 +389,14 @@ class BaseFederationRow(object):
         raise NotImplementedError()
 
 
-class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
-    "state",  # UserPresenceState
-))):
+class PresenceRow(
+    BaseFederationRow, namedtuple("PresenceRow", ("state",))  # UserPresenceState
+):
     TypeId = "p"
 
     @staticmethod
     def from_data(data):
-        return PresenceRow(
-            state=UserPresenceState.from_dict(data)
-        )
+        return PresenceRow(state=UserPresenceState.from_dict(data))
 
     def to_data(self):
         return self.state.as_dict()
@@ -395,33 +405,35 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
         buff.presence.append(self.state)
 
 
-class PresenceDestinationsRow(BaseFederationRow, namedtuple("PresenceDestinationsRow", (
-    "state",  # UserPresenceState
-    "destinations",  # list[str]
-))):
+class PresenceDestinationsRow(
+    BaseFederationRow,
+    namedtuple(
+        "PresenceDestinationsRow",
+        ("state", "destinations"),  # UserPresenceState  # list[str]
+    ),
+):
     TypeId = "pd"
 
     @staticmethod
     def from_data(data):
         return PresenceDestinationsRow(
-            state=UserPresenceState.from_dict(data["state"]),
-            destinations=data["dests"],
+            state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"]
         )
 
     def to_data(self):
-        return {
-            "state": self.state.as_dict(),
-            "dests": self.destinations,
-        }
+        return {"state": self.state.as_dict(), "dests": self.destinations}
 
     def add_to_buffer(self, buff):
         buff.presence_destinations.append((self.state, self.destinations))
 
 
-class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
-    "key",  # tuple(str) - the edu key passed to send_edu
-    "edu",  # Edu
-))):
+class KeyedEduRow(
+    BaseFederationRow,
+    namedtuple(
+        "KeyedEduRow",
+        ("key", "edu"),  # tuple(str) - the edu key passed to send_edu  # Edu
+    ),
+):
     """Streams EDUs that have an associated key that is ued to clobber. For example,
     typing EDUs clobber based on room_id.
     """
@@ -430,28 +442,19 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
 
     @staticmethod
     def from_data(data):
-        return KeyedEduRow(
-            key=tuple(data["key"]),
-            edu=Edu(**data["edu"]),
-        )
+        return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"]))
 
     def to_data(self):
-        return {
-            "key": self.key,
-            "edu": self.edu.get_internal_dict(),
-        }
+        return {"key": self.key, "edu": self.edu.get_internal_dict()}
 
     def add_to_buffer(self, buff):
-        buff.keyed_edus.setdefault(
-            self.edu.destination, {}
-        )[self.key] = self.edu
+        buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu
 
 
-class EduRow(BaseFederationRow, namedtuple("EduRow", (
-    "edu",  # Edu
-))):
+class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))):  # Edu
     """Streams EDUs that don't have keys. See KeyedEduRow
     """
+
     TypeId = "e"
 
     @staticmethod
@@ -465,13 +468,12 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", (
         buff.edus.setdefault(self.edu.destination, []).append(self.edu)
 
 
-class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
-    "destination",  # str
-))):
+class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ("destination",))):  # str
     """Streams the fact that either a) there is pending to device messages for
     users on the remote, or b) a local users device has changed and needs to
     be sent to the remote.
     """
+
     TypeId = "d"
 
     @staticmethod
@@ -487,23 +489,20 @@ class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
 
 TypeToRow = {
     Row.TypeId: Row
-    for Row in (
-        PresenceRow,
-        PresenceDestinationsRow,
-        KeyedEduRow,
-        EduRow,
-        DeviceRow,
-    )
+    for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow, DeviceRow)
 }
 
 
-ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
-    "presence",  # list(UserPresenceState)
-    "presence_destinations",  # list of tuples of UserPresenceState and destinations
-    "keyed_edus",  # dict of destination -> { key -> Edu }
-    "edus",  # dict of destination -> [Edu]
-    "device_destinations",  # set of destinations
-))
+ParsedFederationStreamData = namedtuple(
+    "ParsedFederationStreamData",
+    (
+        "presence",  # list(UserPresenceState)
+        "presence_destinations",  # list of tuples of UserPresenceState and destinations
+        "keyed_edus",  # dict of destination -> { key -> Edu }
+        "edus",  # dict of destination -> [Edu]
+        "device_destinations",  # set of destinations
+    ),
+)
 
 
 def process_rows_for_federation(transaction_queue, rows):
@@ -542,7 +541,7 @@ def process_rows_for_federation(transaction_queue, rows):
 
     for state, destinations in buff.presence_destinations:
         transaction_queue.send_presence_to_destinations(
-            states=[state], destinations=destinations,
+            states=[state], destinations=destinations
         )
 
     for destination, edu_map in iteritems(buff.keyed_edus):