diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a333acc4aa..39d2bee8da 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1102,6 +1102,9 @@ class FederationHandler(BaseHandler):
user_id,
"leave"
)
+ # Mark as outlier as we don't have any state for this event; we're not
+ # even in the room.
+ event.internal_metadata.outlier = True
event = self._sign_event(event)
# Try the host that we succesfully called /make_leave/ on first for
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 3f33d473cc..7034f48b50 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -108,7 +108,7 @@ class SlavedEventStore(BaseSlavedStore):
get_current_state_ids = (
StateStore.__dict__["get_current_state_ids"]
)
- get_state_group_delta = DataStore.get_state_group_delta.__func__
+ get_state_group_delta = StateStore.__dict__["get_state_group_delta"]
_get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
has_room_changed_since = DataStore.has_room_changed_since.__func__
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 8b2c4c3043..69c46911ec 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -67,6 +67,7 @@ class ReplicationStreamer(object):
self.store = hs.get_datastore()
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
+ self.notifier = hs.get_notifier()
# Current connections.
self.connections = []
@@ -99,7 +100,7 @@ class ReplicationStreamer(object):
if not hs.config.send_federation:
self.federation_sender = hs.get_federation_sender()
- hs.get_notifier().add_replication_callback(self.on_notifier_poke)
+ self.notifier.add_replication_callback(self.on_notifier_poke)
# Keeps track of whether we are currently checking for updates
self.is_looping = False
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 151223219d..d1e679719b 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -20,6 +20,7 @@ from synapse.util.stringutils import to_ascii
from synapse.storage.engines import PostgresEngine
from twisted.internet import defer
+from collections import namedtuple
import logging
@@ -29,6 +30,16 @@ logger = logging.getLogger(__name__)
MAX_STATE_DELTA_HOPS = 100
+class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delta_ids"))):
+ """Return type of get_state_group_delta that implements __len__, which lets
+ us use the itrable flag when caching
+ """
+ __slots__ = []
+
+ def __len__(self):
+ return len(self.delta_ids) if self.delta_ids else 0
+
+
class StateStore(SQLBaseStore):
""" Keeps track of the state at a given event.
@@ -98,6 +109,7 @@ class StateStore(SQLBaseStore):
_get_current_state_ids_txn,
)
+ @cached(max_entries=10000, iterable=True)
def get_state_group_delta(self, state_group):
"""Given a state group try to return a previous group and a delta between
the old and the new.
@@ -117,7 +129,7 @@ class StateStore(SQLBaseStore):
)
if not prev_group:
- return None, None
+ return _GetStateGroupDelta(None, None)
delta_ids = self._simple_select_list_txn(
txn,
@@ -128,10 +140,10 @@ class StateStore(SQLBaseStore):
retcols=("type", "state_key", "event_id",)
)
- return prev_group, {
+ return _GetStateGroupDelta(prev_group, {
(row["type"], row["state_key"]): row["event_id"]
for row in delta_ids
- }
+ })
return self.runInteraction(
"get_state_group_delta",
_get_state_group_delta_txn,
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index c498aee46c..609625b322 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -96,10 +96,10 @@ class StreamChangeCache(object):
if stream_pos >= self._earliest_known_stream_pos:
self.metrics.inc_hits()
- if stream_pos >= max(self._cache):
- return False
- else:
- return True
+ keys = self._cache.keys()
+ i = keys.bisect_right(stream_pos)
+
+ return i < len(keys)
else:
self.metrics.inc_misses()
return True
|