summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/send_queue.py14
-rw-r--r--synapse/python_dependencies.py3
-rw-r--r--synapse/util/caches/stream_change_cache.py57
3 files changed, 40 insertions, 34 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 9f1142b5a9..0f4aea95a3 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState
 from synapse.util.metrics import Measure
 from synapse.metrics import LaterGauge
 
-from blist import sorteddict
+from sortedcontainers import SortedDict
 from collections import namedtuple
 
 import logging
@@ -55,19 +55,19 @@ class FederationRemoteSendQueue(object):
         self.is_mine_id = hs.is_mine_id
 
         self.presence_map = {}  # Pending presence map user_id -> UserPresenceState
-        self.presence_changed = sorteddict()  # Stream position -> user_id
+        self.presence_changed = SortedDict()  # Stream position -> user_id
 
         self.keyed_edu = {}  # (destination, key) -> EDU
-        self.keyed_edu_changed = sorteddict()  # stream position -> (destination, key)
+        self.keyed_edu_changed = SortedDict()  # stream position -> (destination, key)
 
-        self.edus = sorteddict()  # stream position -> Edu
+        self.edus = SortedDict()  # stream position -> Edu
 
-        self.failures = sorteddict()  # stream position -> (destination, Failure)
+        self.failures = SortedDict()  # stream position -> (destination, Failure)
 
-        self.device_messages = sorteddict()  # stream position -> destination
+        self.device_messages = SortedDict()  # stream position -> destination
 
         self.pos = 1
-        self.pos_time = sorteddict()
+        self.pos_time = SortedDict()
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
         # we make a new function, so we need to make a new function so the inner
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 478c497722..001c798fe3 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -50,7 +50,7 @@ REQUIREMENTS = {
     "bcrypt": ["bcrypt>=3.1.0"],
     "pillow": ["PIL"],
     "pydenticon": ["pydenticon"],
-    "blist": ["blist"],
+    "sortedcontainers": ["sortedcontainers"],
     "pysaml2>=3.0.0": ["saml2>=3.0.0"],
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
@@ -58,6 +58,7 @@ REQUIREMENTS = {
     "six": ["six"],
     "prometheus_client": ["prometheus_client"],
 }
+
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
         "matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"],
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index a7fe0397fa..817118e30f 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,10 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
+from synapse.util import caches
 
 
-from blist import sorteddict
+from sortedcontainers import SortedDict
 import logging
 
 
@@ -32,16 +32,18 @@ class StreamChangeCache(object):
     entities that may have changed since that position. If position key is too
     old then the cache will simply return all given entities.
     """
-    def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
-        self._max_size = int(max_size * CACHE_SIZE_FACTOR)
+
+    def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
+        self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
         self._entity_to_key = {}
-        self._cache = sorteddict()
+        self._cache = SortedDict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
-        self.metrics = register_cache("cache", self.name, self._cache)
+        self.metrics = caches.register_cache("cache", self.name, self._cache)
 
-        for entity, stream_pos in prefilled_cache.items():
-            self.entity_has_changed(entity, stream_pos)
+        if prefilled_cache:
+            for entity, stream_pos in prefilled_cache.items():
+                self.entity_has_changed(entity, stream_pos)
 
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
@@ -65,22 +67,25 @@ class StreamChangeCache(object):
         return False
 
     def get_entities_changed(self, entities, stream_pos):
-        """Returns subset of entities that have had new things since the
-        given position. If the position is too old it will just return the given list.
+        """
+        Returns subset of entities that have had new things since the given
+        position.  Entities unknown to the cache will be returned.  If the
+        position is too old it will just return the given list.
         """
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
+            not_known_entities = set(entities) - set(self._entity_to_key)
 
-            result = set(
-                self._cache[k] for k in keys[i:]
-            ).intersection(entities)
+            result = (
+                set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
+                .intersection(entities)
+                .union(not_known_entities)
+            )
 
             self.metrics.inc_hits()
         else:
-            result = entities
+            result = set(entities)
             self.metrics.inc_misses()
 
         return result
@@ -90,12 +95,13 @@ class StreamChangeCache(object):
         """
         assert type(stream_pos) is int
 
+        if not self._cache:
+            # If we have no cache, nothing can have changed.
+            return False
+
         if stream_pos >= self._earliest_known_stream_pos:
             self.metrics.inc_hits()
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
-
-            return i < len(keys)
+            return self._cache.bisect_right(stream_pos) < len(self._cache)
         else:
             self.metrics.inc_misses()
             return True
@@ -107,10 +113,7 @@ class StreamChangeCache(object):
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
-
-            return [self._cache[k] for k in keys[i:]]
+            return self._cache.values()[self._cache.bisect_right(stream_pos) :]
         else:
             return None
 
@@ -129,8 +132,10 @@ class StreamChangeCache(object):
             self._entity_to_key[entity] = stream_pos
 
             while len(self._cache) > self._max_size:
-                k, r = self._cache.popitem()
-                self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
+                k, r = self._cache.popitem(0)
+                self._earliest_known_stream_pos = max(
+                    k, self._earliest_known_stream_pos,
+                )
                 self._entity_to_key.pop(r, None)
 
     def get_max_pos_of_last_change(self, entity):