diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 0240b339b0..454456a52d 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -77,12 +77,22 @@ class FederationRemoteSendQueue(object):
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name, queue):
- LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,),
- "", [], lambda: len(queue))
+ LaterGauge(
+ "synapse_federation_send_queue_%s_size" % (queue_name,),
+ "",
+ [],
+ lambda: len(queue),
+ )
for queue_name in [
- "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
- "edus", "device_messages", "pos_time", "presence_destinations",
+ "presence_map",
+ "presence_changed",
+ "keyed_edu",
+ "keyed_edu_changed",
+ "edus",
+ "device_messages",
+ "pos_time",
+ "presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
@@ -121,9 +131,7 @@ class FederationRemoteSendQueue(object):
del self.presence_changed[key]
user_ids = set(
- user_id
- for uids in self.presence_changed.values()
- for user_id in uids
+ user_id for uids in self.presence_changed.values() for user_id in uids
)
keys = self.presence_destinations.keys()
@@ -285,19 +293,21 @@ class FederationRemoteSendQueue(object):
]
for (key, user_id) in dest_user_ids:
- rows.append((key, PresenceRow(
- state=self.presence_map[user_id],
- )))
+ rows.append((key, PresenceRow(state=self.presence_map[user_id])))
# Fetch presence to send to destinations
i = self.presence_destinations.bisect_right(from_token)
j = self.presence_destinations.bisect_right(to_token) + 1
for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
- rows.append((pos, PresenceDestinationsRow(
- state=self.presence_map[user_id],
- destinations=list(dests),
- )))
+ rows.append(
+ (
+ pos,
+ PresenceDestinationsRow(
+ state=self.presence_map[user_id], destinations=list(dests)
+ ),
+ )
+ )
# Fetch changes keyed edus
i = self.keyed_edu_changed.bisect_right(from_token)
@@ -308,10 +318,14 @@ class FederationRemoteSendQueue(object):
keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
for ((destination, edu_key), pos) in iteritems(keyed_edus):
- rows.append((pos, KeyedEduRow(
- key=edu_key,
- edu=self.keyed_edu[(destination, edu_key)],
- )))
+ rows.append(
+ (
+ pos,
+ KeyedEduRow(
+ key=edu_key, edu=self.keyed_edu[(destination, edu_key)]
+ ),
+ )
+ )
# Fetch changed edus
i = self.edus.bisect_right(from_token)
@@ -327,9 +341,7 @@ class FederationRemoteSendQueue(object):
device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
for (destination, pos) in iteritems(device_messages):
- rows.append((pos, DeviceRow(
- destination=destination,
- )))
+ rows.append((pos, DeviceRow(destination=destination)))
# Sort rows based on pos
rows.sort()
@@ -377,16 +389,14 @@ class BaseFederationRow(object):
raise NotImplementedError()
-class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
- "state", # UserPresenceState
-))):
+class PresenceRow(
+ BaseFederationRow, namedtuple("PresenceRow", ("state",)) # UserPresenceState
+):
TypeId = "p"
@staticmethod
def from_data(data):
- return PresenceRow(
- state=UserPresenceState.from_dict(data)
- )
+ return PresenceRow(state=UserPresenceState.from_dict(data))
def to_data(self):
return self.state.as_dict()
@@ -395,33 +405,35 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
buff.presence.append(self.state)
-class PresenceDestinationsRow(BaseFederationRow, namedtuple("PresenceDestinationsRow", (
- "state", # UserPresenceState
- "destinations", # list[str]
-))):
+class PresenceDestinationsRow(
+ BaseFederationRow,
+ namedtuple(
+ "PresenceDestinationsRow",
+ ("state", "destinations"), # UserPresenceState # list[str]
+ ),
+):
TypeId = "pd"
@staticmethod
def from_data(data):
return PresenceDestinationsRow(
- state=UserPresenceState.from_dict(data["state"]),
- destinations=data["dests"],
+ state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"]
)
def to_data(self):
- return {
- "state": self.state.as_dict(),
- "dests": self.destinations,
- }
+ return {"state": self.state.as_dict(), "dests": self.destinations}
def add_to_buffer(self, buff):
buff.presence_destinations.append((self.state, self.destinations))
-class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
- "key", # tuple(str) - the edu key passed to send_edu
- "edu", # Edu
-))):
+class KeyedEduRow(
+ BaseFederationRow,
+ namedtuple(
+ "KeyedEduRow",
+ ("key", "edu"), # tuple(str) - the edu key passed to send_edu # Edu
+ ),
+):
"""Streams EDUs that have an associated key that is ued to clobber. For example,
typing EDUs clobber based on room_id.
"""
@@ -430,28 +442,19 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
@staticmethod
def from_data(data):
- return KeyedEduRow(
- key=tuple(data["key"]),
- edu=Edu(**data["edu"]),
- )
+ return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"]))
def to_data(self):
- return {
- "key": self.key,
- "edu": self.edu.get_internal_dict(),
- }
+ return {"key": self.key, "edu": self.edu.get_internal_dict()}
def add_to_buffer(self, buff):
- buff.keyed_edus.setdefault(
- self.edu.destination, {}
- )[self.key] = self.edu
+ buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu
-class EduRow(BaseFederationRow, namedtuple("EduRow", (
- "edu", # Edu
-))):
+class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
"""Streams EDUs that don't have keys. See KeyedEduRow
"""
+
TypeId = "e"
@staticmethod
@@ -465,13 +468,12 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", (
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
-class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
- "destination", # str
-))):
+class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ("destination",))): # str
"""Streams the fact that either a) there is pending to device messages for
users on the remote, or b) a local users device has changed and needs to
be sent to the remote.
"""
+
TypeId = "d"
@staticmethod
@@ -487,23 +489,20 @@ class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
TypeToRow = {
Row.TypeId: Row
- for Row in (
- PresenceRow,
- PresenceDestinationsRow,
- KeyedEduRow,
- EduRow,
- DeviceRow,
- )
+ for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow, DeviceRow)
}
-ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
- "presence", # list(UserPresenceState)
- "presence_destinations", # list of tuples of UserPresenceState and destinations
- "keyed_edus", # dict of destination -> { key -> Edu }
- "edus", # dict of destination -> [Edu]
- "device_destinations", # set of destinations
-))
+ParsedFederationStreamData = namedtuple(
+ "ParsedFederationStreamData",
+ (
+ "presence", # list(UserPresenceState)
+ "presence_destinations", # list of tuples of UserPresenceState and destinations
+ "keyed_edus", # dict of destination -> { key -> Edu }
+ "edus", # dict of destination -> [Edu]
+ "device_destinations", # set of destinations
+ ),
+)
def process_rows_for_federation(transaction_queue, rows):
@@ -542,7 +541,7 @@ def process_rows_for_federation(transaction_queue, rows):
for state, destinations in buff.presence_destinations:
transaction_queue.send_presence_to_destinations(
- states=[state], destinations=destinations,
+ states=[state], destinations=destinations
)
for destination, edu_map in iteritems(buff.keyed_edus):
|