diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 98cf125cb5..76e4c5cd80 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,10 +31,16 @@ Events are replicated via a separate events stream.
from .units import Edu
+from synapse.util.metrics import Measure
+import synapse.metrics
+
from blist import sorteddict
import ujson
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+
PRESENCE_TYPE = "p"
KEYED_EDU_TYPE = "k"
EDU_TYPE = "e"
@@ -49,8 +55,6 @@ class FederationRemoteSendQueue(object):
self.server_name = hs.hostname
self.clock = hs.get_clock()
- # TODO: Add metrics for size of lists below
-
self.presence_map = {}
self.presence_changed = sorteddict()
@@ -61,10 +65,24 @@ class FederationRemoteSendQueue(object):
self.failures = sorteddict()
+ self.device_messages = sorteddict()
+
self.pos = 1
self.pos_time = sorteddict()
- self.device_messages = sorteddict()
+ # EVERYTHING IS SAD. In particular, python only makes new scopes when
+ # we make a new function, so we need to make a new function so the inner
+ def register(name, queue):
+ metrics.register_callback(
+ queue_name + "_size",
+ lambda: len(queue),
+ )
+
+ for queue_name in [
+ "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
+ "edus", "failures", "device_messages", "pos_time",
+ ]:
+ register(queue_name, getattr(self, queue_name))
self.clock.looping_call(self._clear_queue, 30 * 1000)
@@ -76,7 +94,6 @@ class FederationRemoteSendQueue(object):
def _clear_queue(self):
"""Clear the queues for anything older than N minutes"""
- # TODO measure this function time.
FIVE_MINUTES_AGO = 5 * 60 * 1000
now = self.clock.time_msec()
@@ -94,51 +111,54 @@ class FederationRemoteSendQueue(object):
def _clear_queue_before_pos(self, position_to_delete):
"""Clear all the queues from before a given position"""
- # Delete things out of presence maps
- keys = self.presence_changed.keys()
- i = keys.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.presence_changed[key]
-
- user_ids = set(
- user_id for uids in self.presence_changed.values() for _, user_id in uids
- )
-
- to_del = [user_id for user_id in self.presence_map if user_id not in user_ids]
- for user_id in to_del:
- del self.presence_map[user_id]
-
- # Delete things out of keyed edus
- keys = self.keyed_edu_changed.keys()
- i = keys.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.keyed_edu_changed[key]
-
- live_keys = set()
- for edu_key in self.keyed_edu_changed.values():
- live_keys.add(edu_key)
-
- to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
- for edu_key in to_del:
- del self.keyed_edu[edu_key]
-
- # Delete things out of edu map
- keys = self.edus.keys()
- i = keys.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.edus[key]
-
- # Delete things out of failure map
- keys = self.failures.keys()
- i = keys.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.failures[key]
+ with Measure(self.clock, "send_queue._clear"):
+ # Delete things out of presence maps
+ keys = self.presence_changed.keys()
+ i = keys.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.presence_changed[key]
+
+ user_ids = set(
+ user_id for uids in self.presence_changed.values() for _, user_id in uids
+ )
- # Delete things out of device map
- keys = self.device_messages.keys()
- i = keys.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.device_messages[key]
+ to_del = [
+ user_id for user_id in self.presence_map if user_id not in user_ids
+ ]
+ for user_id in to_del:
+ del self.presence_map[user_id]
+
+ # Delete things out of keyed edus
+ keys = self.keyed_edu_changed.keys()
+ i = keys.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.keyed_edu_changed[key]
+
+ live_keys = set()
+ for edu_key in self.keyed_edu_changed.values():
+ live_keys.add(edu_key)
+
+ to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
+ for edu_key in to_del:
+ del self.keyed_edu[edu_key]
+
+ # Delete things out of edu map
+ keys = self.edus.keys()
+ i = keys.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.edus[key]
+
+ # Delete things out of failure map
+ keys = self.failures.keys()
+ i = keys.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.failures[key]
+
+ # Delete things out of device map
+ keys = self.device_messages.keys()
+ i = keys.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.device_messages[key]
def notify_new_events(self, current_id):
"""As per TransactionQueue"""
|