diff options
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r-- | synapse/federation/send_queue.py | 145 |
1 files changed, 72 insertions, 73 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 0240b339b0..454456a52d 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -77,12 +77,22 @@ class FederationRemoteSendQueue(object): # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. def register(name, queue): - LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,), - "", [], lambda: len(queue)) + LaterGauge( + "synapse_federation_send_queue_%s_size" % (queue_name,), + "", + [], + lambda: len(queue), + ) for queue_name in [ - "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", - "edus", "device_messages", "pos_time", "presence_destinations", + "presence_map", + "presence_changed", + "keyed_edu", + "keyed_edu_changed", + "edus", + "device_messages", + "pos_time", + "presence_destinations", ]: register(queue_name, getattr(self, queue_name)) @@ -121,9 +131,7 @@ class FederationRemoteSendQueue(object): del self.presence_changed[key] user_ids = set( - user_id - for uids in self.presence_changed.values() - for user_id in uids + user_id for uids in self.presence_changed.values() for user_id in uids ) keys = self.presence_destinations.keys() @@ -285,19 +293,21 @@ class FederationRemoteSendQueue(object): ] for (key, user_id) in dest_user_ids: - rows.append((key, PresenceRow( - state=self.presence_map[user_id], - ))) + rows.append((key, PresenceRow(state=self.presence_map[user_id]))) # Fetch presence to send to destinations i = self.presence_destinations.bisect_right(from_token) j = self.presence_destinations.bisect_right(to_token) + 1 for pos, (user_id, dests) in self.presence_destinations.items()[i:j]: - rows.append((pos, PresenceDestinationsRow( - state=self.presence_map[user_id], - destinations=list(dests), - ))) + rows.append( + ( + pos, + PresenceDestinationsRow( + state=self.presence_map[user_id], destinations=list(dests) + ), + ) + ) # Fetch changes keyed edus i = self.keyed_edu_changed.bisect_right(from_token) @@ -308,10 +318,14 @@ class FederationRemoteSendQueue(object): keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]} for ((destination, edu_key), pos) in iteritems(keyed_edus): - rows.append((pos, KeyedEduRow( - key=edu_key, - edu=self.keyed_edu[(destination, edu_key)], - ))) + rows.append( + ( + pos, + KeyedEduRow( + key=edu_key, edu=self.keyed_edu[(destination, edu_key)] + ), + ) + ) # Fetch changed edus i = self.edus.bisect_right(from_token) @@ -327,9 +341,7 @@ class FederationRemoteSendQueue(object): device_messages = {v: k for k, v in self.device_messages.items()[i:j]} for (destination, pos) in iteritems(device_messages): - rows.append((pos, DeviceRow( - destination=destination, - ))) + rows.append((pos, DeviceRow(destination=destination))) # Sort rows based on pos rows.sort() @@ -377,16 +389,14 @@ class BaseFederationRow(object): raise NotImplementedError() -class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( - "state", # UserPresenceState -))): +class PresenceRow( + BaseFederationRow, namedtuple("PresenceRow", ("state",)) # UserPresenceState +): TypeId = "p" @staticmethod def from_data(data): - return PresenceRow( - state=UserPresenceState.from_dict(data) - ) + return PresenceRow(state=UserPresenceState.from_dict(data)) def to_data(self): return self.state.as_dict() @@ -395,33 +405,35 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( buff.presence.append(self.state) -class PresenceDestinationsRow(BaseFederationRow, namedtuple("PresenceDestinationsRow", ( - "state", # UserPresenceState - "destinations", # list[str] -))): +class PresenceDestinationsRow( + BaseFederationRow, + namedtuple( + "PresenceDestinationsRow", + ("state", "destinations"), # UserPresenceState # list[str] + ), +): TypeId = "pd" @staticmethod def from_data(data): return PresenceDestinationsRow( - state=UserPresenceState.from_dict(data["state"]), - destinations=data["dests"], + state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"] ) def to_data(self): - return { - "state": self.state.as_dict(), - "dests": self.destinations, - } + return {"state": self.state.as_dict(), "dests": self.destinations} def add_to_buffer(self, buff): buff.presence_destinations.append((self.state, self.destinations)) -class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( - "key", # tuple(str) - the edu key passed to send_edu - "edu", # Edu -))): +class KeyedEduRow( + BaseFederationRow, + namedtuple( + "KeyedEduRow", + ("key", "edu"), # tuple(str) - the edu key passed to send_edu # Edu + ), +): """Streams EDUs that have an associated key that is ued to clobber. For example, typing EDUs clobber based on room_id. """ @@ -430,28 +442,19 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( @staticmethod def from_data(data): - return KeyedEduRow( - key=tuple(data["key"]), - edu=Edu(**data["edu"]), - ) + return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"])) def to_data(self): - return { - "key": self.key, - "edu": self.edu.get_internal_dict(), - } + return {"key": self.key, "edu": self.edu.get_internal_dict()} def add_to_buffer(self, buff): - buff.keyed_edus.setdefault( - self.edu.destination, {} - )[self.key] = self.edu + buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu -class EduRow(BaseFederationRow, namedtuple("EduRow", ( - "edu", # Edu -))): +class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu """Streams EDUs that don't have keys. See KeyedEduRow """ + TypeId = "e" @staticmethod @@ -465,13 +468,12 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ( buff.edus.setdefault(self.edu.destination, []).append(self.edu) -class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( - "destination", # str -))): +class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ("destination",))): # str """Streams the fact that either a) there is pending to device messages for users on the remote, or b) a local users device has changed and needs to be sent to the remote. """ + TypeId = "d" @staticmethod @@ -487,23 +489,20 @@ class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( TypeToRow = { Row.TypeId: Row - for Row in ( - PresenceRow, - PresenceDestinationsRow, - KeyedEduRow, - EduRow, - DeviceRow, - ) + for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow, DeviceRow) } -ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( - "presence", # list(UserPresenceState) - "presence_destinations", # list of tuples of UserPresenceState and destinations - "keyed_edus", # dict of destination -> { key -> Edu } - "edus", # dict of destination -> [Edu] - "device_destinations", # set of destinations -)) +ParsedFederationStreamData = namedtuple( + "ParsedFederationStreamData", + ( + "presence", # list(UserPresenceState) + "presence_destinations", # list of tuples of UserPresenceState and destinations + "keyed_edus", # dict of destination -> { key -> Edu } + "edus", # dict of destination -> [Edu] + "device_destinations", # set of destinations + ), +) def process_rows_for_federation(transaction_queue, rows): @@ -542,7 +541,7 @@ def process_rows_for_federation(transaction_queue, rows): for state, destinations in buff.presence_destinations: transaction_queue.send_presence_to_destinations( - states=[state], destinations=destinations, + states=[state], destinations=destinations ) for destination, edu_map in iteritems(buff.keyed_edus): |