summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/federation.py3
-rw-r--r--synapse/replication/slave/storage/events.py2
-rw-r--r--synapse/replication/tcp/resource.py3
-rw-r--r--synapse/storage/state.py18
-rw-r--r--synapse/util/caches/stream_change_cache.py8
5 files changed, 25 insertions, 9 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a333acc4aa..39d2bee8da 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1102,6 +1102,9 @@ class FederationHandler(BaseHandler):
             user_id,
             "leave"
         )
+        # Mark as outlier as we don't have any state for this event; we're not
+        # even in the room.
+        event.internal_metadata.outlier = True
         event = self._sign_event(event)
 
         # Try the host that we succesfully called /make_leave/ on first for
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 3f33d473cc..7034f48b50 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -108,7 +108,7 @@ class SlavedEventStore(BaseSlavedStore):
     get_current_state_ids = (
         StateStore.__dict__["get_current_state_ids"]
     )
-    get_state_group_delta = DataStore.get_state_group_delta.__func__
+    get_state_group_delta = StateStore.__dict__["get_state_group_delta"]
     _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
     has_room_changed_since = DataStore.has_room_changed_since.__func__
 
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 8b2c4c3043..69c46911ec 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -67,6 +67,7 @@ class ReplicationStreamer(object):
         self.store = hs.get_datastore()
         self.presence_handler = hs.get_presence_handler()
         self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
 
         # Current connections.
         self.connections = []
@@ -99,7 +100,7 @@ class ReplicationStreamer(object):
         if not hs.config.send_federation:
             self.federation_sender = hs.get_federation_sender()
 
-        hs.get_notifier().add_replication_callback(self.on_notifier_poke)
+        self.notifier.add_replication_callback(self.on_notifier_poke)
 
         # Keeps track of whether we are currently checking for updates
         self.is_looping = False
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 151223219d..d1e679719b 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -20,6 +20,7 @@ from synapse.util.stringutils import to_ascii
 from synapse.storage.engines import PostgresEngine
 
 from twisted.internet import defer
+from collections import namedtuple
 
 import logging
 
@@ -29,6 +30,16 @@ logger = logging.getLogger(__name__)
 MAX_STATE_DELTA_HOPS = 100
 
 
+class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delta_ids"))):
+    """Return type of get_state_group_delta that implements __len__, which lets
+    us use the itrable flag when caching
+    """
+    __slots__ = []
+
+    def __len__(self):
+        return len(self.delta_ids) if self.delta_ids else 0
+
+
 class StateStore(SQLBaseStore):
     """ Keeps track of the state at a given event.
 
@@ -98,6 +109,7 @@ class StateStore(SQLBaseStore):
             _get_current_state_ids_txn,
         )
 
+    @cached(max_entries=10000, iterable=True)
     def get_state_group_delta(self, state_group):
         """Given a state group try to return a previous group and a delta between
         the old and the new.
@@ -117,7 +129,7 @@ class StateStore(SQLBaseStore):
             )
 
             if not prev_group:
-                return None, None
+                return _GetStateGroupDelta(None, None)
 
             delta_ids = self._simple_select_list_txn(
                 txn,
@@ -128,10 +140,10 @@ class StateStore(SQLBaseStore):
                 retcols=("type", "state_key", "event_id",)
             )
 
-            return prev_group, {
+            return _GetStateGroupDelta(prev_group, {
                 (row["type"], row["state_key"]): row["event_id"]
                 for row in delta_ids
-            }
+            })
         return self.runInteraction(
             "get_state_group_delta",
             _get_state_group_delta_txn,
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index c498aee46c..609625b322 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -96,10 +96,10 @@ class StreamChangeCache(object):
 
         if stream_pos >= self._earliest_known_stream_pos:
             self.metrics.inc_hits()
-            if stream_pos >= max(self._cache):
-                return False
-            else:
-                return True
+            keys = self._cache.keys()
+            i = keys.bisect_right(stream_pos)
+
+            return i < len(keys)
         else:
             self.metrics.inc_misses()
             return True