diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/send_queue.py | 116 |
1 files changed, 68 insertions, 48 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 98cf125cb5..76e4c5cd80 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -31,10 +31,16 @@ Events are replicated via a separate events stream. from .units import Edu +from synapse.util.metrics import Measure +import synapse.metrics + from blist import sorteddict import ujson +metrics = synapse.metrics.get_metrics_for(__name__) + + PRESENCE_TYPE = "p" KEYED_EDU_TYPE = "k" EDU_TYPE = "e" @@ -49,8 +55,6 @@ class FederationRemoteSendQueue(object): self.server_name = hs.hostname self.clock = hs.get_clock() - # TODO: Add metrics for size of lists below - self.presence_map = {} self.presence_changed = sorteddict() @@ -61,10 +65,24 @@ class FederationRemoteSendQueue(object): self.failures = sorteddict() + self.device_messages = sorteddict() + self.pos = 1 self.pos_time = sorteddict() - self.device_messages = sorteddict() + # EVERYTHING IS SAD. In particular, python only makes new scopes when + # we make a new function, so we need to make a new function so the inner + def register(name, queue): + metrics.register_callback( + queue_name + "_size", + lambda: len(queue), + ) + + for queue_name in [ + "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", + "edus", "failures", "device_messages", "pos_time", + ]: + register(queue_name, getattr(self, queue_name)) self.clock.looping_call(self._clear_queue, 30 * 1000) @@ -76,7 +94,6 @@ class FederationRemoteSendQueue(object): def _clear_queue(self): """Clear the queues for anything older than N minutes""" - # TODO measure this function time. FIVE_MINUTES_AGO = 5 * 60 * 1000 now = self.clock.time_msec() @@ -94,51 +111,54 @@ class FederationRemoteSendQueue(object): def _clear_queue_before_pos(self, position_to_delete): """Clear all the queues from before a given position""" - # Delete things out of presence maps - keys = self.presence_changed.keys() - i = keys.bisect_left(position_to_delete) - for key in keys[:i]: - del self.presence_changed[key] - - user_ids = set( - user_id for uids in self.presence_changed.values() for _, user_id in uids - ) - - to_del = [user_id for user_id in self.presence_map if user_id not in user_ids] - for user_id in to_del: - del self.presence_map[user_id] - - # Delete things out of keyed edus - keys = self.keyed_edu_changed.keys() - i = keys.bisect_left(position_to_delete) - for key in keys[:i]: - del self.keyed_edu_changed[key] - - live_keys = set() - for edu_key in self.keyed_edu_changed.values(): - live_keys.add(edu_key) - - to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys] - for edu_key in to_del: - del self.keyed_edu[edu_key] - - # Delete things out of edu map - keys = self.edus.keys() - i = keys.bisect_left(position_to_delete) - for key in keys[:i]: - del self.edus[key] - - # Delete things out of failure map - keys = self.failures.keys() - i = keys.bisect_left(position_to_delete) - for key in keys[:i]: - del self.failures[key] + with Measure(self.clock, "send_queue._clear"): + # Delete things out of presence maps + keys = self.presence_changed.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.presence_changed[key] + + user_ids = set( + user_id for uids in self.presence_changed.values() for _, user_id in uids + ) - # Delete things out of device map - keys = self.device_messages.keys() - i = keys.bisect_left(position_to_delete) - for key in keys[:i]: - del self.device_messages[key] + to_del = [ + user_id for user_id in self.presence_map if user_id not in user_ids + ] + for user_id in to_del: + del self.presence_map[user_id] + + # Delete things out of keyed edus + keys = self.keyed_edu_changed.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.keyed_edu_changed[key] + + live_keys = set() + for edu_key in self.keyed_edu_changed.values(): + live_keys.add(edu_key) + + to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys] + for edu_key in to_del: + del self.keyed_edu[edu_key] + + # Delete things out of edu map + keys = self.edus.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.edus[key] + + # Delete things out of failure map + keys = self.failures.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.failures[key] + + # Delete things out of device map + keys = self.device_messages.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.device_messages[key] def notify_new_events(self, current_id): """As per TransactionQueue""" |