diff --git a/changelog.d/9358.misc b/changelog.d/9358.misc
new file mode 100644
index 0000000000..cc7614afc0
--- /dev/null
+++ b/changelog.d/9358.misc
@@ -0,0 +1 @@
+Added a fix that invalidates cache for empty timed-out sync responses.
\ No newline at end of file
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 4e8ed7b33f..ce644e01ad 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -277,8 +277,9 @@ class SyncHandler:
user_id = sync_config.user.to_string()
await self.auth.check_auth_blocking(requester=requester)
- res = await self.response_cache.wrap(
+ res = await self.response_cache.wrap_conditional(
sync_config.request_key,
+ lambda result: since_token != result.next_batch,
self._wait_for_sync_for_user,
sync_config,
since_token,
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 32228f42ee..53f85195a7 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar
+from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, Set, TypeVar
from twisted.internet import defer
@@ -40,6 +40,7 @@ class ResponseCache(Generic[T]):
def __init__(self, hs: "HomeServer", name: str, timeout_ms: float = 0):
# Requests that haven't finished yet.
self.pending_result_cache = {} # type: Dict[T, ObservableDeferred]
+ self.pending_conditionals = {} # type: Dict[T, Set[Callable[[Any], bool]]]
self.clock = hs.get_clock()
self.timeout_sec = timeout_ms / 1000.0
@@ -101,7 +102,11 @@ class ResponseCache(Generic[T]):
self.pending_result_cache[key] = result
def remove(r):
- if self.timeout_sec:
+ should_cache = all(
+ func(r) for func in self.pending_conditionals.pop(key, [])
+ )
+
+ if self.timeout_sec and should_cache:
self.clock.call_later(
self.timeout_sec, self.pending_result_cache.pop, key, None
)
@@ -112,6 +117,31 @@ class ResponseCache(Generic[T]):
result.addBoth(remove)
return result.observe()
+ def add_conditional(self, key: T, conditional: Callable[[Any], bool]):
+ self.pending_conditionals.setdefault(key, set()).add(conditional)
+
+ def wrap_conditional(
+ self,
+ key: T,
+ should_cache: Callable[[Any], bool],
+ callback: "Callable[..., Any]",
+ *args: Any,
+ **kwargs: Any
+ ) -> defer.Deferred:
+ """The same as wrap(), but adds a conditional to the final execution.
+
+ When the final execution completes, *all* conditionals need to return True for it to properly cache,
+ else it'll not be cached in a timed fashion.
+ """
+
+ # See if there's already a result on this key that hasn't yet completed. Due to the single-threaded nature of
+ # python, adding a key immediately in the same execution thread will not cause a race condition.
+ result = self.get(key)
+ if not result or isinstance(result, defer.Deferred) and not result.called:
+ self.add_conditional(key, should_cache)
+
+ return self.wrap(key, callback, *args, **kwargs)
+
def wrap(
self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any
) -> defer.Deferred:
|