summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/send_queue.py116
1 files changed, 68 insertions, 48 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 98cf125cb5..76e4c5cd80 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,10 +31,16 @@ Events are replicated via a separate events stream.
 
 from .units import Edu
 
+from synapse.util.metrics import Measure
+import synapse.metrics
+
 from blist import sorteddict
 import ujson
 
 
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+
 PRESENCE_TYPE = "p"
 KEYED_EDU_TYPE = "k"
 EDU_TYPE = "e"
@@ -49,8 +55,6 @@ class FederationRemoteSendQueue(object):
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
 
-        # TODO: Add metrics for size of lists below
-
         self.presence_map = {}
         self.presence_changed = sorteddict()
 
@@ -61,10 +65,24 @@ class FederationRemoteSendQueue(object):
 
         self.failures = sorteddict()
 
+        self.device_messages = sorteddict()
+
         self.pos = 1
         self.pos_time = sorteddict()
 
-        self.device_messages = sorteddict()
+        # EVERYTHING IS SAD. In particular, python only makes new scopes when
+        # we make a new function, so we need to make a new function so the inner
+        def register(name, queue):
+            metrics.register_callback(
+                queue_name + "_size",
+                lambda: len(queue),
+            )
+
+        for queue_name in [
+            "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
+            "edus", "failures", "device_messages", "pos_time",
+        ]:
+            register(queue_name, getattr(self, queue_name))
 
         self.clock.looping_call(self._clear_queue, 30 * 1000)
 
@@ -76,7 +94,6 @@ class FederationRemoteSendQueue(object):
 
     def _clear_queue(self):
         """Clear the queues for anything older than N minutes"""
-        # TODO measure this function time.
 
         FIVE_MINUTES_AGO = 5 * 60 * 1000
         now = self.clock.time_msec()
@@ -94,51 +111,54 @@ class FederationRemoteSendQueue(object):
 
     def _clear_queue_before_pos(self, position_to_delete):
         """Clear all the queues from before a given position"""
-        # Delete things out of presence maps
-        keys = self.presence_changed.keys()
-        i = keys.bisect_left(position_to_delete)
-        for key in keys[:i]:
-            del self.presence_changed[key]
-
-        user_ids = set(
-            user_id for uids in self.presence_changed.values() for _, user_id in uids
-        )
-
-        to_del = [user_id for user_id in self.presence_map if user_id not in user_ids]
-        for user_id in to_del:
-            del self.presence_map[user_id]
-
-        # Delete things out of keyed edus
-        keys = self.keyed_edu_changed.keys()
-        i = keys.bisect_left(position_to_delete)
-        for key in keys[:i]:
-            del self.keyed_edu_changed[key]
-
-        live_keys = set()
-        for edu_key in self.keyed_edu_changed.values():
-            live_keys.add(edu_key)
-
-        to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
-        for edu_key in to_del:
-            del self.keyed_edu[edu_key]
-
-        # Delete things out of edu map
-        keys = self.edus.keys()
-        i = keys.bisect_left(position_to_delete)
-        for key in keys[:i]:
-            del self.edus[key]
-
-        # Delete things out of failure map
-        keys = self.failures.keys()
-        i = keys.bisect_left(position_to_delete)
-        for key in keys[:i]:
-            del self.failures[key]
+        with Measure(self.clock, "send_queue._clear"):
+            # Delete things out of presence maps
+            keys = self.presence_changed.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.presence_changed[key]
+
+            user_ids = set(
+                user_id for uids in self.presence_changed.values() for _, user_id in uids
+            )
 
-        # Delete things out of device map
-        keys = self.device_messages.keys()
-        i = keys.bisect_left(position_to_delete)
-        for key in keys[:i]:
-            del self.device_messages[key]
+            to_del = [
+                user_id for user_id in self.presence_map if user_id not in user_ids
+            ]
+            for user_id in to_del:
+                del self.presence_map[user_id]
+
+            # Delete things out of keyed edus
+            keys = self.keyed_edu_changed.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.keyed_edu_changed[key]
+
+            live_keys = set()
+            for edu_key in self.keyed_edu_changed.values():
+                live_keys.add(edu_key)
+
+            to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
+            for edu_key in to_del:
+                del self.keyed_edu[edu_key]
+
+            # Delete things out of edu map
+            keys = self.edus.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.edus[key]
+
+            # Delete things out of failure map
+            keys = self.failures.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.failures[key]
+
+            # Delete things out of device map
+            keys = self.device_messages.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.device_messages[key]
 
     def notify_new_events(self, current_id):
         """As per TransactionQueue"""