summary refs log tree commit diff
path: root/synapse/federation/send_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r--synapse/federation/send_queue.py98
1 files changed, 45 insertions, 53 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 93e5acebc1..5157c3860d 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -29,23 +29,22 @@ dead worker doesn't cause the queues to grow limitlessly.
 Events are replicated via a separate events stream.
 """
 
-from .units import Edu
+import logging
+from collections import namedtuple
 
+from six import iteritems, itervalues
+
+from sortedcontainers import SortedDict
+
+from synapse.metrics import LaterGauge
 from synapse.storage.presence import UserPresenceState
 from synapse.util.metrics import Measure
-import synapse.metrics
-
-from blist import sorteddict
-from collections import namedtuple
 
-import logging
+from .units import Edu
 
 logger = logging.getLogger(__name__)
 
 
-metrics = synapse.metrics.get_metrics_for(__name__)
-
-
 class FederationRemoteSendQueue(object):
     """A drop in replacement for TransactionQueue"""
 
@@ -56,29 +55,27 @@ class FederationRemoteSendQueue(object):
         self.is_mine_id = hs.is_mine_id
 
         self.presence_map = {}  # Pending presence map user_id -> UserPresenceState
-        self.presence_changed = sorteddict()  # Stream position -> user_id
+        self.presence_changed = SortedDict()  # Stream position -> user_id
 
         self.keyed_edu = {}  # (destination, key) -> EDU
-        self.keyed_edu_changed = sorteddict()  # stream position -> (destination, key)
+        self.keyed_edu_changed = SortedDict()  # stream position -> (destination, key)
 
-        self.edus = sorteddict()  # stream position -> Edu
+        self.edus = SortedDict()  # stream position -> Edu
 
-        self.failures = sorteddict()  # stream position -> (destination, Failure)
+        self.failures = SortedDict()  # stream position -> (destination, Failure)
 
-        self.device_messages = sorteddict()  # stream position -> destination
+        self.device_messages = SortedDict()  # stream position -> destination
 
         self.pos = 1
-        self.pos_time = sorteddict()
+        self.pos_time = SortedDict()
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
         # we make a new function, so we need to make a new function so the inner
         # lambda binds to the queue rather than to the name of the queue which
         # changes. ARGH.
         def register(name, queue):
-            metrics.register_callback(
-                queue_name + "_size",
-                lambda: len(queue),
-            )
+            LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,),
+                       "", [], lambda: len(queue))
 
         for queue_name in [
             "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
@@ -101,7 +98,7 @@ class FederationRemoteSendQueue(object):
         now = self.clock.time_msec()
 
         keys = self.pos_time.keys()
-        time = keys.bisect_left(now - FIVE_MINUTES_AGO)
+        time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
         if not keys[:time]:
             return
 
@@ -116,13 +113,13 @@ class FederationRemoteSendQueue(object):
         with Measure(self.clock, "send_queue._clear"):
             # Delete things out of presence maps
             keys = self.presence_changed.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.presence_changed.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.presence_changed[key]
 
             user_ids = set(
                 user_id
-                for uids in self.presence_changed.itervalues()
+                for uids in itervalues(self.presence_changed)
                 for user_id in uids
             )
 
@@ -134,7 +131,7 @@ class FederationRemoteSendQueue(object):
 
             # Delete things out of keyed edus
             keys = self.keyed_edu_changed.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.keyed_edu_changed.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.keyed_edu_changed[key]
 
@@ -148,19 +145,19 @@ class FederationRemoteSendQueue(object):
 
             # Delete things out of edu map
             keys = self.edus.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.edus.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.edus[key]
 
             # Delete things out of failure map
             keys = self.failures.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.failures.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.failures[key]
 
             # Delete things out of device map
             keys = self.device_messages.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.device_messages.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.device_messages[key]
 
@@ -200,7 +197,7 @@ class FederationRemoteSendQueue(object):
 
         # We only want to send presence for our own users, so lets always just
         # filter here just in case.
-        local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
+        local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states))
 
         self.presence_map.update({state.user_id: state for state in local_states})
         self.presence_changed[pos] = [state.user_id for state in local_states]
@@ -253,13 +250,12 @@ class FederationRemoteSendQueue(object):
             self._clear_queue_before_pos(federation_ack)
 
         # Fetch changed presence
-        keys = self.presence_changed.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
+        i = self.presence_changed.bisect_right(from_token)
+        j = self.presence_changed.bisect_right(to_token) + 1
         dest_user_ids = [
             (pos, user_id)
-            for pos in keys[i:j]
-            for user_id in self.presence_changed[pos]
+            for pos, user_id_list in self.presence_changed.items()[i:j]
+            for user_id in user_id_list
         ]
 
         for (key, user_id) in dest_user_ids:
@@ -268,34 +264,31 @@ class FederationRemoteSendQueue(object):
             )))
 
         # Fetch changes keyed edus
-        keys = self.keyed_edu_changed.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
+        i = self.keyed_edu_changed.bisect_right(from_token)
+        j = self.keyed_edu_changed.bisect_right(to_token) + 1
         # We purposefully clobber based on the key here, python dict comprehensions
         # always use the last value, so this will correctly point to the last
         # stream position.
-        keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
+        keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
 
-        for ((destination, edu_key), pos) in keyed_edus.iteritems():
+        for ((destination, edu_key), pos) in iteritems(keyed_edus):
             rows.append((pos, KeyedEduRow(
                 key=edu_key,
                 edu=self.keyed_edu[(destination, edu_key)],
             )))
 
         # Fetch changed edus
-        keys = self.edus.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
-        edus = ((k, self.edus[k]) for k in keys[i:j])
+        i = self.edus.bisect_right(from_token)
+        j = self.edus.bisect_right(to_token) + 1
+        edus = self.edus.items()[i:j]
 
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
 
         # Fetch changed failures
-        keys = self.failures.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
-        failures = ((k, self.failures[k]) for k in keys[i:j])
+        i = self.failures.bisect_right(from_token)
+        j = self.failures.bisect_right(to_token) + 1
+        failures = self.failures.items()[i:j]
 
         for (pos, (destination, failure)) in failures:
             rows.append((pos, FailureRow(
@@ -304,12 +297,11 @@ class FederationRemoteSendQueue(object):
             )))
 
         # Fetch changed device messages
-        keys = self.device_messages.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
-        device_messages = {self.device_messages[k]: k for k in keys[i:j]}
+        i = self.device_messages.bisect_right(from_token)
+        j = self.device_messages.bisect_right(to_token) + 1
+        device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
 
-        for (destination, pos) in device_messages.iteritems():
+        for (destination, pos) in iteritems(device_messages):
             rows.append((pos, DeviceRow(
                 destination=destination,
             )))
@@ -528,19 +520,19 @@ def process_rows_for_federation(transaction_queue, rows):
     if buff.presence:
         transaction_queue.send_presence(buff.presence)
 
-    for destination, edu_map in buff.keyed_edus.iteritems():
+    for destination, edu_map in iteritems(buff.keyed_edus):
         for key, edu in edu_map.items():
             transaction_queue.send_edu(
                 edu.destination, edu.edu_type, edu.content, key=key,
             )
 
-    for destination, edu_list in buff.edus.iteritems():
+    for destination, edu_list in iteritems(buff.edus):
         for edu in edu_list:
             transaction_queue.send_edu(
                 edu.destination, edu.edu_type, edu.content, key=None,
             )
 
-    for destination, failure_list in buff.failures.iteritems():
+    for destination, failure_list in iteritems(buff.failures):
         for failure in failure_list:
             transaction_queue.send_failure(destination, failure)