diff options
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r-- | synapse/federation/send_queue.py | 98 |
1 files changed, 45 insertions, 53 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 93e5acebc1..5157c3860d 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -29,23 +29,22 @@ dead worker doesn't cause the queues to grow limitlessly. Events are replicated via a separate events stream. """ -from .units import Edu +import logging +from collections import namedtuple +from six import iteritems, itervalues + +from sortedcontainers import SortedDict + +from synapse.metrics import LaterGauge from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure -import synapse.metrics - -from blist import sorteddict -from collections import namedtuple -import logging +from .units import Edu logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - - class FederationRemoteSendQueue(object): """A drop in replacement for TransactionQueue""" @@ -56,29 +55,27 @@ class FederationRemoteSendQueue(object): self.is_mine_id = hs.is_mine_id self.presence_map = {} # Pending presence map user_id -> UserPresenceState - self.presence_changed = sorteddict() # Stream position -> user_id + self.presence_changed = SortedDict() # Stream position -> user_id self.keyed_edu = {} # (destination, key) -> EDU - self.keyed_edu_changed = sorteddict() # stream position -> (destination, key) + self.keyed_edu_changed = SortedDict() # stream position -> (destination, key) - self.edus = sorteddict() # stream position -> Edu + self.edus = SortedDict() # stream position -> Edu - self.failures = sorteddict() # stream position -> (destination, Failure) + self.failures = SortedDict() # stream position -> (destination, Failure) - self.device_messages = sorteddict() # stream position -> destination + self.device_messages = SortedDict() # stream position -> destination self.pos = 1 - self.pos_time = sorteddict() + self.pos_time = SortedDict() # EVERYTHING IS SAD. In particular, python only makes new scopes when # we make a new function, so we need to make a new function so the inner # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. def register(name, queue): - metrics.register_callback( - queue_name + "_size", - lambda: len(queue), - ) + LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,), + "", [], lambda: len(queue)) for queue_name in [ "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", @@ -101,7 +98,7 @@ class FederationRemoteSendQueue(object): now = self.clock.time_msec() keys = self.pos_time.keys() - time = keys.bisect_left(now - FIVE_MINUTES_AGO) + time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO) if not keys[:time]: return @@ -116,13 +113,13 @@ class FederationRemoteSendQueue(object): with Measure(self.clock, "send_queue._clear"): # Delete things out of presence maps keys = self.presence_changed.keys() - i = keys.bisect_left(position_to_delete) + i = self.presence_changed.bisect_left(position_to_delete) for key in keys[:i]: del self.presence_changed[key] user_ids = set( user_id - for uids in self.presence_changed.itervalues() + for uids in itervalues(self.presence_changed) for user_id in uids ) @@ -134,7 +131,7 @@ class FederationRemoteSendQueue(object): # Delete things out of keyed edus keys = self.keyed_edu_changed.keys() - i = keys.bisect_left(position_to_delete) + i = self.keyed_edu_changed.bisect_left(position_to_delete) for key in keys[:i]: del self.keyed_edu_changed[key] @@ -148,19 +145,19 @@ class FederationRemoteSendQueue(object): # Delete things out of edu map keys = self.edus.keys() - i = keys.bisect_left(position_to_delete) + i = self.edus.bisect_left(position_to_delete) for key in keys[:i]: del self.edus[key] # Delete things out of failure map keys = self.failures.keys() - i = keys.bisect_left(position_to_delete) + i = self.failures.bisect_left(position_to_delete) for key in keys[:i]: del self.failures[key] # Delete things out of device map keys = self.device_messages.keys() - i = keys.bisect_left(position_to_delete) + i = self.device_messages.bisect_left(position_to_delete) for key in keys[:i]: del self.device_messages[key] @@ -200,7 +197,7 @@ class FederationRemoteSendQueue(object): # We only want to send presence for our own users, so lets always just # filter here just in case. - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states)) self.presence_map.update({state.user_id: state for state in local_states}) self.presence_changed[pos] = [state.user_id for state in local_states] @@ -253,13 +250,12 @@ class FederationRemoteSendQueue(object): self._clear_queue_before_pos(federation_ack) # Fetch changed presence - keys = self.presence_changed.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 + i = self.presence_changed.bisect_right(from_token) + j = self.presence_changed.bisect_right(to_token) + 1 dest_user_ids = [ (pos, user_id) - for pos in keys[i:j] - for user_id in self.presence_changed[pos] + for pos, user_id_list in self.presence_changed.items()[i:j] + for user_id in user_id_list ] for (key, user_id) in dest_user_ids: @@ -268,34 +264,31 @@ class FederationRemoteSendQueue(object): ))) # Fetch changes keyed edus - keys = self.keyed_edu_changed.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 + i = self.keyed_edu_changed.bisect_right(from_token) + j = self.keyed_edu_changed.bisect_right(to_token) + 1 # We purposefully clobber based on the key here, python dict comprehensions # always use the last value, so this will correctly point to the last # stream position. - keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]} + keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]} - for ((destination, edu_key), pos) in keyed_edus.iteritems(): + for ((destination, edu_key), pos) in iteritems(keyed_edus): rows.append((pos, KeyedEduRow( key=edu_key, edu=self.keyed_edu[(destination, edu_key)], ))) # Fetch changed edus - keys = self.edus.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 - edus = ((k, self.edus[k]) for k in keys[i:j]) + i = self.edus.bisect_right(from_token) + j = self.edus.bisect_right(to_token) + 1 + edus = self.edus.items()[i:j] for (pos, edu) in edus: rows.append((pos, EduRow(edu))) # Fetch changed failures - keys = self.failures.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 - failures = ((k, self.failures[k]) for k in keys[i:j]) + i = self.failures.bisect_right(from_token) + j = self.failures.bisect_right(to_token) + 1 + failures = self.failures.items()[i:j] for (pos, (destination, failure)) in failures: rows.append((pos, FailureRow( @@ -304,12 +297,11 @@ class FederationRemoteSendQueue(object): ))) # Fetch changed device messages - keys = self.device_messages.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 - device_messages = {self.device_messages[k]: k for k in keys[i:j]} + i = self.device_messages.bisect_right(from_token) + j = self.device_messages.bisect_right(to_token) + 1 + device_messages = {v: k for k, v in self.device_messages.items()[i:j]} - for (destination, pos) in device_messages.iteritems(): + for (destination, pos) in iteritems(device_messages): rows.append((pos, DeviceRow( destination=destination, ))) @@ -528,19 +520,19 @@ def process_rows_for_federation(transaction_queue, rows): if buff.presence: transaction_queue.send_presence(buff.presence) - for destination, edu_map in buff.keyed_edus.iteritems(): + for destination, edu_map in iteritems(buff.keyed_edus): for key, edu in edu_map.items(): transaction_queue.send_edu( edu.destination, edu.edu_type, edu.content, key=key, ) - for destination, edu_list in buff.edus.iteritems(): + for destination, edu_list in iteritems(buff.edus): for edu in edu_list: transaction_queue.send_edu( edu.destination, edu.edu_type, edu.content, key=None, ) - for destination, failure_list in buff.failures.iteritems(): + for destination, failure_list in iteritems(buff.failures): for failure in failure_list: transaction_queue.send_failure(destination, failure) |