diff --git a/synapse/config/cache.py b/synapse/config/cache.py
index eb4194a5a9..015b2a138e 100644
--- a/synapse/config/cache.py
+++ b/synapse/config/cache.py
@@ -16,7 +16,7 @@ import logging
import os
import re
import threading
-from typing import Any, Callable, Dict, Optional
+from typing import Any, Callable, Dict, Mapping, Optional
import attr
@@ -94,7 +94,7 @@ def add_resizable_cache(
class CacheConfig(Config):
section = "caches"
- _environ = os.environ
+ _environ: Mapping[str, str] = os.environ
event_cache_size: int
cache_factors: Dict[str, float]
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 8b9ef25d29..30f2d46c3c 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -2031,7 +2031,7 @@ class PasswordAuthProvider:
self.is_3pid_allowed_callbacks: List[IS_3PID_ALLOWED_CALLBACK] = []
# Mapping from login type to login parameters
- self._supported_login_types: Dict[str, Iterable[str]] = {}
+ self._supported_login_types: Dict[str, Tuple[str, ...]] = {}
# Mapping from login type to auth checker callbacks
self.auth_checker_callbacks: Dict[str, List[CHECK_AUTH_CALLBACK]] = {}
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b2784d7333..eca75f1108 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1343,32 +1343,53 @@ class FederationHandler:
)
EventValidator().validate_builder(builder)
- event, context = await self.event_creation_handler.create_new_client_event(
- builder=builder
- )
- event, context = await self.add_display_name_to_third_party_invite(
- room_version_obj, event_dict, event, context
- )
+ # Try several times, it could fail with PartialStateConflictError
+ # in send_membership_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ (
+ event,
+ context,
+ ) = await self.event_creation_handler.create_new_client_event(
+ builder=builder
+ )
- EventValidator().validate_new(event, self.config)
+ event, context = await self.add_display_name_to_third_party_invite(
+ room_version_obj, event_dict, event, context
+ )
- # We need to tell the transaction queue to send this out, even
- # though the sender isn't a local user.
- event.internal_metadata.send_on_behalf_of = self.hs.hostname
+ EventValidator().validate_new(event, self.config)
- try:
- validate_event_for_room_version(event)
- await self._event_auth_handler.check_auth_rules_from_context(event)
- except AuthError as e:
- logger.warning("Denying new third party invite %r because %s", event, e)
- raise e
+ # We need to tell the transaction queue to send this out, even
+ # though the sender isn't a local user.
+ event.internal_metadata.send_on_behalf_of = self.hs.hostname
- await self._check_signature(event, context)
+ try:
+ validate_event_for_room_version(event)
+ await self._event_auth_handler.check_auth_rules_from_context(
+ event
+ )
+ except AuthError as e:
+ logger.warning(
+ "Denying new third party invite %r because %s", event, e
+ )
+ raise e
- # We retrieve the room member handler here as to not cause a cyclic dependency
- member_handler = self.hs.get_room_member_handler()
- await member_handler.send_membership_event(None, event, context)
+ await self._check_signature(event, context)
+
+ # We retrieve the room member handler here as to not cause a cyclic dependency
+ member_handler = self.hs.get_room_member_handler()
+ await member_handler.send_membership_event(None, event, context)
+
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
else:
destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)}
@@ -1400,28 +1421,46 @@ class FederationHandler:
room_version_obj, event_dict
)
- event, context = await self.event_creation_handler.create_new_client_event(
- builder=builder
- )
- event, context = await self.add_display_name_to_third_party_invite(
- room_version_obj, event_dict, event, context
- )
+ # Try several times, it could fail with PartialStateConflictError
+ # in send_membership_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ (
+ event,
+ context,
+ ) = await self.event_creation_handler.create_new_client_event(
+ builder=builder
+ )
+ event, context = await self.add_display_name_to_third_party_invite(
+ room_version_obj, event_dict, event, context
+ )
- try:
- validate_event_for_room_version(event)
- await self._event_auth_handler.check_auth_rules_from_context(event)
- except AuthError as e:
- logger.warning("Denying third party invite %r because %s", event, e)
- raise e
- await self._check_signature(event, context)
+ try:
+ validate_event_for_room_version(event)
+ await self._event_auth_handler.check_auth_rules_from_context(event)
+ except AuthError as e:
+ logger.warning("Denying third party invite %r because %s", event, e)
+ raise e
+ await self._check_signature(event, context)
+
+ # We need to tell the transaction queue to send this out, even
+ # though the sender isn't a local user.
+ event.internal_metadata.send_on_behalf_of = get_domain_from_id(
+ event.sender
+ )
- # We need to tell the transaction queue to send this out, even
- # though the sender isn't a local user.
- event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
+ # We retrieve the room member handler here as to not cause a cyclic dependency
+ member_handler = self.hs.get_room_member_handler()
+ await member_handler.send_membership_event(None, event, context)
- # We retrieve the room member handler here as to not cause a cyclic dependency
- member_handler = self.hs.get_room_member_handler()
- await member_handler.send_membership_event(None, event, context)
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
async def add_display_name_to_third_party_invite(
self,
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 66aca2f864..31df7f55cc 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -610,6 +610,8 @@ class FederationEventHandler:
self._state_storage_controller.notify_event_un_partial_stated(
event.event_id
)
+ # Notify that there's a new row in the un_partial_stated_events stream.
+ self._notifier.notify_replication()
@trace
async def backfill(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 845f683358..88fc51a4c9 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -37,7 +37,6 @@ from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
- LimitExceededError,
NotFoundError,
ShadowBanError,
SynapseError,
@@ -999,60 +998,73 @@ class EventCreationHandler:
event.internal_metadata.stream_ordering,
)
- event, context = await self.create_event(
- requester,
- event_dict,
- txn_id=txn_id,
- allow_no_prev_events=allow_no_prev_events,
- prev_event_ids=prev_event_ids,
- state_event_ids=state_event_ids,
- outlier=outlier,
- historical=historical,
- depth=depth,
- )
+ # Try several times, it could fail with PartialStateConflictError
+ # in handle_new_client_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ event, context = await self.create_event(
+ requester,
+ event_dict,
+ txn_id=txn_id,
+ allow_no_prev_events=allow_no_prev_events,
+ prev_event_ids=prev_event_ids,
+ state_event_ids=state_event_ids,
+ outlier=outlier,
+ historical=historical,
+ depth=depth,
+ )
- assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
- event.sender,
- )
+ assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
+ event.sender,
+ )
- spam_check_result = await self.spam_checker.check_event_for_spam(event)
- if spam_check_result != self.spam_checker.NOT_SPAM:
- if isinstance(spam_check_result, tuple):
- try:
- [code, dict] = spam_check_result
- raise SynapseError(
- 403,
- "This message had been rejected as probable spam",
- code,
- dict,
- )
- except ValueError:
- logger.error(
- "Spam-check module returned invalid error value. Expecting [code, dict], got %s",
- spam_check_result,
- )
+ spam_check_result = await self.spam_checker.check_event_for_spam(event)
+ if spam_check_result != self.spam_checker.NOT_SPAM:
+ if isinstance(spam_check_result, tuple):
+ try:
+ [code, dict] = spam_check_result
+ raise SynapseError(
+ 403,
+ "This message had been rejected as probable spam",
+ code,
+ dict,
+ )
+ except ValueError:
+ logger.error(
+ "Spam-check module returned invalid error value. Expecting [code, dict], got %s",
+ spam_check_result,
+ )
- raise SynapseError(
- 403,
- "This message has been rejected as probable spam",
- Codes.FORBIDDEN,
- )
+ raise SynapseError(
+ 403,
+ "This message has been rejected as probable spam",
+ Codes.FORBIDDEN,
+ )
- # Backwards compatibility: if the return value is not an error code, it
- # means the module returned an error message to be included in the
- # SynapseError (which is now deprecated).
- raise SynapseError(
- 403,
- spam_check_result,
- Codes.FORBIDDEN,
+ # Backwards compatibility: if the return value is not an error code, it
+ # means the module returned an error message to be included in the
+ # SynapseError (which is now deprecated).
+ raise SynapseError(
+ 403,
+ spam_check_result,
+ Codes.FORBIDDEN,
+ )
+
+ ev = await self.handle_new_client_event(
+ requester=requester,
+ events_and_context=[(event, context)],
+ ratelimit=ratelimit,
+ ignore_shadow_ban=ignore_shadow_ban,
)
- ev = await self.handle_new_client_event(
- requester=requester,
- events_and_context=[(event, context)],
- ratelimit=ratelimit,
- ignore_shadow_ban=ignore_shadow_ban,
- )
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
# we know it was persisted, so must have a stream ordering
assert ev.internal_metadata.stream_ordering
@@ -1356,7 +1368,7 @@ class EventCreationHandler:
Raises:
ShadowBanError if the requester has been shadow-banned.
- SynapseError(503) if attempting to persist a partial state event in
+ PartialStateConflictError if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
extra_users = extra_users or []
@@ -1418,34 +1430,23 @@ class EventCreationHandler:
# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
event, context = events_and_context[0]
- try:
- result, _ = await make_deferred_yieldable(
- gather_results(
- (
- run_in_background(
- self._persist_events,
- requester=requester,
- events_and_context=events_and_context,
- ratelimit=ratelimit,
- extra_users=extra_users,
- ),
- run_in_background(
- self.cache_joined_hosts_for_events, events_and_context
- ).addErrback(
- log_failure, "cache_joined_hosts_for_event failed"
- ),
+ result, _ = await make_deferred_yieldable(
+ gather_results(
+ (
+ run_in_background(
+ self._persist_events,
+ requester=requester,
+ events_and_context=events_and_context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
),
- consumeErrors=True,
- )
- ).addErrback(unwrapFirstError)
- except PartialStateConflictError as e:
- # The event context needs to be recomputed.
- # Turn the error into a 429, as a hint to the client to try again.
- logger.info(
- "Room %s was un-partial stated while persisting client event.",
- event.room_id,
+ run_in_background(
+ self.cache_joined_hosts_for_events, events_and_context
+ ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
+ ),
+ consumeErrors=True,
)
- raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
+ ).addErrback(unwrapFirstError)
return result
@@ -2012,26 +2013,39 @@ class EventCreationHandler:
for user_id in members:
requester = create_requester(user_id, authenticated_entity=self.server_name)
try:
- event, context = await self.create_event(
- requester,
- {
- "type": EventTypes.Dummy,
- "content": {},
- "room_id": room_id,
- "sender": user_id,
- },
- )
+ # Try several times, it could fail with PartialStateConflictError
+ # in handle_new_client_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ event, context = await self.create_event(
+ requester,
+ {
+ "type": EventTypes.Dummy,
+ "content": {},
+ "room_id": room_id,
+ "sender": user_id,
+ },
+ )
- event.internal_metadata.proactively_send = False
+ event.internal_metadata.proactively_send = False
- # Since this is a dummy-event it is OK if it is sent by a
- # shadow-banned user.
- await self.handle_new_client_event(
- requester,
- events_and_context=[(event, context)],
- ratelimit=False,
- ignore_shadow_ban=True,
- )
+ # Since this is a dummy-event it is OK if it is sent by a
+ # shadow-banned user.
+ await self.handle_new_client_event(
+ requester,
+ events_and_context=[(event, context)],
+ ratelimit=False,
+ ignore_shadow_ban=True,
+ )
+
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
return True
except AuthError:
logger.info(
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f81241c2b3..572c7b4db3 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -62,6 +62,7 @@ from synapse.events.utils import copy_and_fixup_power_levels_contents
from synapse.handlers.relations import BundledAggregations
from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin
+from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
@@ -207,46 +208,64 @@ class RoomCreationHandler:
new_room_id = self._generate_room_id()
- # Check whether the user has the power level to carry out the upgrade.
- # `check_auth_rules_from_context` will check that they are in the room and have
- # the required power level to send the tombstone event.
- (
- tombstone_event,
- tombstone_context,
- ) = await self.event_creation_handler.create_event(
- requester,
- {
- "type": EventTypes.Tombstone,
- "state_key": "",
- "room_id": old_room_id,
- "sender": user_id,
- "content": {
- "body": "This room has been replaced",
- "replacement_room": new_room_id,
- },
- },
- )
- validate_event_for_room_version(tombstone_event)
- await self._event_auth_handler.check_auth_rules_from_context(tombstone_event)
+ # Try several times, it could fail with PartialStateConflictError
+ # in _upgrade_room, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ # Check whether the user has the power level to carry out the upgrade.
+ # `check_auth_rules_from_context` will check that they are in the room and have
+ # the required power level to send the tombstone event.
+ (
+ tombstone_event,
+ tombstone_context,
+ ) = await self.event_creation_handler.create_event(
+ requester,
+ {
+ "type": EventTypes.Tombstone,
+ "state_key": "",
+ "room_id": old_room_id,
+ "sender": user_id,
+ "content": {
+ "body": "This room has been replaced",
+ "replacement_room": new_room_id,
+ },
+ },
+ )
+ validate_event_for_room_version(tombstone_event)
+ await self._event_auth_handler.check_auth_rules_from_context(
+ tombstone_event
+ )
- # Upgrade the room
- #
- # If this user has sent multiple upgrade requests for the same room
- # and one of them is not complete yet, cache the response and
- # return it to all subsequent requests
- ret = await self._upgrade_response_cache.wrap(
- (old_room_id, user_id),
- self._upgrade_room,
- requester,
- old_room_id,
- old_room, # args for _upgrade_room
- new_room_id,
- new_version,
- tombstone_event,
- tombstone_context,
- )
+ # Upgrade the room
+ #
+ # If this user has sent multiple upgrade requests for the same room
+ # and one of them is not complete yet, cache the response and
+ # return it to all subsequent requests
+ ret = await self._upgrade_response_cache.wrap(
+ (old_room_id, user_id),
+ self._upgrade_room,
+ requester,
+ old_room_id,
+ old_room, # args for _upgrade_room
+ new_room_id,
+ new_version,
+ tombstone_event,
+ tombstone_context,
+ )
- return ret
+ return ret
+ except PartialStateConflictError as e:
+ # Clean up the cache so we can retry properly
+ self._upgrade_response_cache.unset((old_room_id, user_id))
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
+
+ # This is to satisfy mypy and should never happen
+ raise PartialStateConflictError()
async def _upgrade_room(
self,
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index 411a6fb22f..c73d2adaad 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -375,6 +375,8 @@ class RoomBatchHandler:
# Events are sorted by (topological_ordering, stream_ordering)
# where topological_ordering is just depth.
for (event, context) in reversed(events_to_persist):
+ # This call can't raise `PartialStateConflictError` since we forbid
+ # use of the historical batch API during partial state
await self.event_creation_handler.handle_new_client_event(
await self.create_requester_for_user_id_from_app_service(
event.sender, app_service_requester.app_service
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0c39e852a1..d236cc09b5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -34,6 +34,7 @@ from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.logging import opentracing
from synapse.module_api import NOT_SPAM
+from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.types import (
JsonDict,
Requester,
@@ -392,60 +393,81 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
event_pos = await self.store.get_position_for_event(existing_event_id)
return existing_event_id, event_pos.stream
- event, context = await self.event_creation_handler.create_event(
- requester,
- {
- "type": EventTypes.Member,
- "content": content,
- "room_id": room_id,
- "sender": requester.user.to_string(),
- "state_key": user_id,
- # For backwards compatibility:
- "membership": membership,
- "origin_server_ts": origin_server_ts,
- },
- txn_id=txn_id,
- allow_no_prev_events=allow_no_prev_events,
- prev_event_ids=prev_event_ids,
- state_event_ids=state_event_ids,
- depth=depth,
- require_consent=require_consent,
- outlier=outlier,
- historical=historical,
- )
-
- prev_state_ids = await context.get_prev_state_ids(
- StateFilter.from_types([(EventTypes.Member, None)])
- )
+ # Try several times, it could fail with PartialStateConflictError,
+ # in handle_new_client_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ event, context = await self.event_creation_handler.create_event(
+ requester,
+ {
+ "type": EventTypes.Member,
+ "content": content,
+ "room_id": room_id,
+ "sender": requester.user.to_string(),
+ "state_key": user_id,
+ # For backwards compatibility:
+ "membership": membership,
+ "origin_server_ts": origin_server_ts,
+ },
+ txn_id=txn_id,
+ allow_no_prev_events=allow_no_prev_events,
+ prev_event_ids=prev_event_ids,
+ state_event_ids=state_event_ids,
+ depth=depth,
+ require_consent=require_consent,
+ outlier=outlier,
+ historical=historical,
+ )
- prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
+ prev_state_ids = await context.get_prev_state_ids(
+ StateFilter.from_types([(EventTypes.Member, None)])
+ )
- if event.membership == Membership.JOIN:
- newly_joined = True
- if prev_member_event_id:
- prev_member_event = await self.store.get_event(prev_member_event_id)
- newly_joined = prev_member_event.membership != Membership.JOIN
-
- # Only rate-limit if the user actually joined the room, otherwise we'll end
- # up blocking profile updates.
- if newly_joined and ratelimit:
- await self._join_rate_limiter_local.ratelimit(requester)
- await self._join_rate_per_room_limiter.ratelimit(
- requester, key=room_id, update=False
+ prev_member_event_id = prev_state_ids.get(
+ (EventTypes.Member, user_id), None
)
- with opentracing.start_active_span("handle_new_client_event"):
- result_event = await self.event_creation_handler.handle_new_client_event(
- requester,
- events_and_context=[(event, context)],
- extra_users=[target],
- ratelimit=ratelimit,
- )
- if event.membership == Membership.LEAVE:
- if prev_member_event_id:
- prev_member_event = await self.store.get_event(prev_member_event_id)
- if prev_member_event.membership == Membership.JOIN:
- await self._user_left_room(target, room_id)
+ if event.membership == Membership.JOIN:
+ newly_joined = True
+ if prev_member_event_id:
+ prev_member_event = await self.store.get_event(
+ prev_member_event_id
+ )
+ newly_joined = prev_member_event.membership != Membership.JOIN
+
+ # Only rate-limit if the user actually joined the room, otherwise we'll end
+ # up blocking profile updates.
+ if newly_joined and ratelimit:
+ await self._join_rate_limiter_local.ratelimit(requester)
+ await self._join_rate_per_room_limiter.ratelimit(
+ requester, key=room_id, update=False
+ )
+ with opentracing.start_active_span("handle_new_client_event"):
+ result_event = (
+ await self.event_creation_handler.handle_new_client_event(
+ requester,
+ events_and_context=[(event, context)],
+ extra_users=[target],
+ ratelimit=ratelimit,
+ )
+ )
+
+ if event.membership == Membership.LEAVE:
+ if prev_member_event_id:
+ prev_member_event = await self.store.get_event(
+ prev_member_event_id
+ )
+ if prev_member_event.membership == Membership.JOIN:
+ await self._user_left_room(target, room_id)
+
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
# we know it was persisted, so should have a stream ordering
assert result_event.internal_metadata.stream_ordering
@@ -1234,6 +1256,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit: Whether to rate limit this request.
Raises:
SynapseError if there was a problem changing the membership.
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
target_user = UserID.from_string(event.state_key)
room_id = event.room_id
@@ -1863,21 +1887,37 @@ class RoomMemberMasterHandler(RoomMemberHandler):
list(previous_membership_event.auth_event_ids()) + prev_event_ids
)
- event, context = await self.event_creation_handler.create_event(
- requester,
- event_dict,
- txn_id=txn_id,
- prev_event_ids=prev_event_ids,
- auth_event_ids=auth_event_ids,
- outlier=True,
- )
- event.internal_metadata.out_of_band_membership = True
+ # Try several times, it could fail with PartialStateConflictError
+ # in handle_new_client_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ event, context = await self.event_creation_handler.create_event(
+ requester,
+ event_dict,
+ txn_id=txn_id,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
+ outlier=True,
+ )
+ event.internal_metadata.out_of_band_membership = True
+
+ result_event = (
+ await self.event_creation_handler.handle_new_client_event(
+ requester,
+ events_and_context=[(event, context)],
+ extra_users=[UserID.from_string(target_user)],
+ )
+ )
+
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
- result_event = await self.event_creation_handler.handle_new_client_event(
- requester,
- events_and_context=[(event, context)],
- extra_users=[UserID.from_string(target_user)],
- )
# we know it was persisted, so must have a stream ordering
assert result_event.internal_metadata.stream_ordering
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 8d08625237..c6b869c6f4 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -20,7 +20,6 @@ from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Set,
import attr
from synapse.api.constants import (
- EventContentFields,
EventTypes,
HistoryVisibility,
JoinRules,
@@ -701,13 +700,6 @@ class RoomSummaryHandler:
# there should always be an entry
assert stats is not None, "unable to retrieve stats for %s" % (room_id,)
- current_state_ids = await self._storage_controllers.state.get_current_state_ids(
- room_id
- )
- create_event = await self._store.get_event(
- current_state_ids[(EventTypes.Create, "")]
- )
-
entry = {
"room_id": stats["room_id"],
"name": stats["name"],
@@ -720,7 +712,7 @@ class RoomSummaryHandler:
stats["history_visibility"] == HistoryVisibility.WORLD_READABLE
),
"guest_can_join": stats["guest_access"] == "can_join",
- "room_type": create_event.content.get(EventContentFields.ROOM_TYPE),
+ "room_type": stats["room_type"],
}
if self._msc3266_enabled:
@@ -730,7 +722,11 @@ class RoomSummaryHandler:
# Federation requests need to provide additional information so the
# requested server is able to filter the response appropriately.
if for_federation:
+ current_state_ids = (
+ await self._storage_controllers.state.get_current_state_ids(room_id)
+ )
room_version = await self._store.get_room_version(room_id)
+
if await self._event_auth_handler.has_restricted_join_rules(
current_state_ids, room_version
):
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index b4dad47b45..658d89210d 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -36,6 +36,7 @@ from synapse.replication.tcp.streams import (
TagAccountDataStream,
ToDeviceStream,
TypingStream,
+ UnPartialStatedEventStream,
UnPartialStatedRoomStream,
)
from synapse.replication.tcp.streams.events import (
@@ -43,7 +44,10 @@ from synapse.replication.tcp.streams.events import (
EventsStreamEventRow,
EventsStreamRow,
)
-from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow
+from synapse.replication.tcp.streams.partial_state import (
+ UnPartialStatedEventStreamRow,
+ UnPartialStatedRoomStreamRow,
+)
from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
from synapse.util.async_helpers import Linearizer, timeout_deferred
from synapse.util.metrics import Measure
@@ -247,6 +251,14 @@ class ReplicationDataHandler:
self._state_storage_controller.notify_room_un_partial_stated(
row.room_id
)
+ elif stream_name == UnPartialStatedEventStream.NAME:
+ for row in rows:
+ assert isinstance(row, UnPartialStatedEventStreamRow)
+
+ # Wake up any tasks waiting for the event to be un-partial-stated.
+ self._state_storage_controller.notify_event_un_partial_stated(
+ row.event_id
+ )
await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 8575666d9c..110f10aab9 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -42,7 +42,10 @@ from synapse.replication.tcp.streams._base import (
)
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.federation import FederationStream
-from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
+from synapse.replication.tcp.streams.partial_state import (
+ UnPartialStatedEventStream,
+ UnPartialStatedRoomStream,
+)
STREAMS_MAP = {
stream.NAME: stream
@@ -63,6 +66,7 @@ STREAMS_MAP = {
AccountDataStream,
UserSignatureStream,
UnPartialStatedRoomStream,
+ UnPartialStatedEventStream,
)
}
@@ -83,4 +87,5 @@ __all__ = [
"AccountDataStream",
"UserSignatureStream",
"UnPartialStatedRoomStream",
+ "UnPartialStatedEventStream",
]
diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py
index 18f087ffa2..b5a2ae74b6 100644
--- a/synapse/replication/tcp/streams/partial_state.py
+++ b/synapse/replication/tcp/streams/partial_state.py
@@ -46,3 +46,31 @@ class UnPartialStatedRoomStream(Stream):
current_token_without_instance(store.get_un_partial_stated_rooms_token),
store.get_un_partial_stated_rooms_from_stream,
)
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class UnPartialStatedEventStreamRow:
+ # ID of the event that has been un-partial-stated.
+ event_id: str
+
+ # True iff the rejection status of the event changed as a result of being
+ # un-partial-stated.
+ rejection_status_changed: bool
+
+
+class UnPartialStatedEventStream(Stream):
+ """
+ Stream to notify about events becoming un-partial-stated.
+ """
+
+ NAME = "un_partial_stated_event"
+ ROW_TYPE = UnPartialStatedEventStreamRow
+
+ def __init__(self, hs: "HomeServer"):
+ store = hs.get_datastores().main
+ super().__init__(
+ hs.get_instance_name(),
+ # TODO(faster_joins, multiple writers): we need to account for instance names
+ current_token_without_instance(store.get_un_partial_stated_events_token),
+ store.get_un_partial_stated_events_from_stream,
+ )
diff --git a/synapse/res/templates/recaptcha.html b/synapse/res/templates/recaptcha.html
index 8204928cdf..f00992a24b 100644
--- a/synapse/res/templates/recaptcha.html
+++ b/synapse/res/templates/recaptcha.html
@@ -3,11 +3,10 @@
{% block header %}
<script src="https://www.recaptcha.net/recaptcha/api.js" async defer></script>
-<script src="//code.jquery.com/jquery-1.11.2.min.js"></script>
<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
<script>
function captchaDone() {
- $('#registrationForm').submit();
+ document.getElementById('registrationForm').submit();
}
</script>
{% endblock %}
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index ee5469d5a8..fdfb46ab82 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -202,14 +202,20 @@ class StateHandler:
room_id: the room_id containing the given events.
event_ids: the events whose state should be fetched and resolved.
await_full_state: if `True`, will block if we do not yet have complete state
- at the given `event_id`s, regardless of whether `state_filter` is
- satisfied by partial state.
+ at these events and `state_filter` is not satisfied by partial state.
+ Defaults to `True`.
Returns:
the state dict (a mapping from (event_type, state_key) -> event_id) which
holds the resolution of the states after the given event IDs.
"""
logger.debug("calling resolve_state_groups from compute_state_after_events")
+ if (
+ await_full_state
+ and state_filter
+ and not state_filter.must_await_full_state(self.hs.is_mine_id)
+ ):
+ await_full_state = False
ret = await self.resolve_state_groups_for_events(
room_id, event_ids, await_full_state
)
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 7ebe34f773..3a0c370fde 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -275,15 +275,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
)
self.db_pool.updates.register_background_index_update(
- "event_push_summary_unique_index",
- index_name="event_push_summary_unique_index",
- table="event_push_summary",
- columns=["user_id", "room_id"],
- unique=True,
- replaces_index="event_push_summary_user_rm",
- )
-
- self.db_pool.updates.register_background_index_update(
"event_push_summary_unique_index2",
index_name="event_push_summary_unique_index2",
table="event_push_summary",
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 318fd7dc71..761b15a815 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -59,8 +59,9 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
-from synapse.replication.tcp.streams import BackfillStream
+from synapse.replication.tcp.streams import BackfillStream, UnPartialStatedEventStream
from synapse.replication.tcp.streams.events import EventsStream
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -70,6 +71,7 @@ from synapse.storage.database import (
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
+ AbstractStreamIdGenerator,
AbstractStreamIdTracker,
MultiWriterIdGenerator,
StreamIdGenerator,
@@ -292,6 +294,93 @@ class EventsWorkerStore(SQLBaseStore):
id_column="chain_id",
)
+ self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator
+
+ if isinstance(database.engine, PostgresEngine):
+ self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
+ db_conn=db_conn,
+ db=database,
+ stream_name="un_partial_stated_event_stream",
+ instance_name=hs.get_instance_name(),
+ tables=[
+ ("un_partial_stated_event_stream", "instance_name", "stream_id")
+ ],
+ sequence_name="un_partial_stated_event_stream_sequence",
+ # TODO(faster_joins, multiple writers) Support multiple writers.
+ writers=["master"],
+ )
+ else:
+ self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
+ db_conn, "un_partial_stated_event_stream", "stream_id"
+ )
+
+ def get_un_partial_stated_events_token(self) -> int:
+ # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
+ # writers because workers that don't write often will hold all
+ # readers up.
+ return self._un_partial_stated_events_stream_id_gen.get_current_token()
+
+ async def get_un_partial_stated_events_from_stream(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
+ """Get updates for the un-partial-stated events replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
+
+ if last_id == current_id:
+ return [], current_id, False
+
+ def get_un_partial_stated_events_from_stream_txn(
+ txn: LoggingTransaction,
+ ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
+ sql = """
+ SELECT stream_id, event_id, rejection_status_changed
+ FROM un_partial_stated_event_stream
+ WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_id, current_id, instance_name, limit))
+ updates = [
+ (
+ row[0],
+ (
+ row[1],
+ bool(row[2]),
+ ),
+ )
+ for row in txn
+ ]
+ limited = False
+ upto_token = current_id
+ if len(updates) >= limit:
+ upto_token = updates[-1][0]
+ limited = True
+
+ return updates, upto_token, limited
+
+ return await self.db_pool.runInteraction(
+ "get_un_partial_stated_events_from_stream",
+ get_un_partial_stated_events_from_stream_txn,
+ )
+
def process_replication_rows(
self,
stream_name: str,
@@ -303,6 +392,16 @@ class EventsWorkerStore(SQLBaseStore):
self._stream_id_gen.advance(instance_name, token)
elif stream_name == BackfillStream.NAME:
self._backfill_id_gen.advance(instance_name, -token)
+ elif stream_name == UnPartialStatedEventStream.NAME:
+ for row in rows:
+ assert isinstance(row, UnPartialStatedEventStreamRow)
+
+ self.is_partial_state_event.invalidate((row.event_id,))
+
+ if row.rejection_status_changed:
+ # If the partial-stated event became rejected or unrejected
+ # when it wasn't before, we need to invalidate this cache.
+ self._invalidate_local_get_event_cache(row.event_id)
super().process_replication_rows(stream_name, instance_name, token, rows)
@@ -2292,6 +2391,9 @@ class EventsWorkerStore(SQLBaseStore):
This can happen, for example, when resyncing state during a faster join.
+ It is the caller's responsibility to ensure that other workers are
+ sent a notification so that they call `_invalidate_local_get_event_cache()`.
+
Args:
txn:
event_id: ID of event to update
@@ -2330,14 +2432,3 @@ class EventsWorkerStore(SQLBaseStore):
)
self.invalidate_get_event_cache_after_txn(txn, event_id)
-
- # TODO(faster_joins): invalidate the cache on workers. Ideally we'd just
- # call '_send_invalidation_to_replication', but we actually need the other
- # end to call _invalidate_local_get_event_cache() rather than (just)
- # _get_event_cache.invalidate().
- #
- # One solution might be to (somehow) get the workers to call
- # _invalidate_caches_for_event() (though that will invalidate more than
- # strictly necessary).
- #
- # https://github.com/matrix-org/synapse/issues/12994
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index c801a93b5b..f32cbb2dec 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -14,7 +14,7 @@
# limitations under the License.
import collections.abc
import logging
-from typing import TYPE_CHECKING, Collection, Dict, Iterable, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, Optional, Set, Tuple
import attr
@@ -24,6 +24,8 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.opentracing import trace
+from synapse.replication.tcp.streams import UnPartialStatedEventStream
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -80,6 +82,21 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
+ self._instance_name: str = hs.get_instance_name()
+
+ def process_replication_rows(
+ self,
+ stream_name: str,
+ instance_name: str,
+ token: int,
+ rows: Iterable[Any],
+ ) -> None:
+ if stream_name == UnPartialStatedEventStream.NAME:
+ for row in rows:
+ assert isinstance(row, UnPartialStatedEventStreamRow)
+ self._get_state_group_for_event.invalidate((row.event_id,))
+
+ super().process_replication_rows(stream_name, instance_name, token, rows)
async def get_room_version(self, room_id: str) -> RoomVersion:
"""Get the room_version of a given room
@@ -404,18 +421,21 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
context: EventContext,
) -> None:
"""Update the state group for a partial state event"""
- await self.db_pool.runInteraction(
- "update_state_for_partial_state_event",
- self._update_state_for_partial_state_event_txn,
- event,
- context,
- )
+ async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id:
+ await self.db_pool.runInteraction(
+ "update_state_for_partial_state_event",
+ self._update_state_for_partial_state_event_txn,
+ event,
+ context,
+ un_partial_state_event_stream_id,
+ )
def _update_state_for_partial_state_event_txn(
self,
txn: LoggingTransaction,
event: EventBase,
context: EventContext,
+ un_partial_state_event_stream_id: int,
) -> None:
# we shouldn't have any outliers here
assert not event.internal_metadata.is_outlier()
@@ -436,7 +456,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# the event may now be rejected where it was not before, or vice versa,
# in which case we need to update the rejected flags.
- if bool(context.rejected) != (event.rejected_reason is not None):
+ rejection_status_changed = bool(context.rejected) != (
+ event.rejected_reason is not None
+ )
+ if rejection_status_changed:
self.mark_event_rejected_txn(txn, event.event_id, context.rejected)
self.db_pool.simple_delete_one_txn(
@@ -445,8 +468,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
keyvalues={"event_id": event.event_id},
)
- # TODO(faster_joins): need to do something about workers here
- # https://github.com/matrix-org/synapse/issues/12994
txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,))
txn.call_after(
self._get_state_group_for_event.prefill,
@@ -454,6 +475,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
state_group,
)
+ self.db_pool.simple_insert_txn(
+ txn,
+ "un_partial_stated_event_stream",
+ {
+ "stream_id": un_partial_state_event_stream_id,
+ "instance_name": self._instance_name,
+ "event_id": event.event_id,
+ "rejection_status_changed": rejection_status_changed,
+ },
+ )
+
class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
diff --git a/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql b/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql
new file mode 100644
index 0000000000..0e571f78c3
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql
@@ -0,0 +1,34 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Stream for notifying that an event has become un-partial-stated.
+CREATE TABLE un_partial_stated_event_stream(
+ -- Position in the stream
+ stream_id BIGINT PRIMARY KEY NOT NULL,
+
+ -- Which instance wrote this entry.
+ instance_name TEXT NOT NULL,
+
+ -- Which event has been un-partial-stated.
+ event_id TEXT NOT NULL REFERENCES events(event_id) ON DELETE CASCADE,
+
+ -- true iff the `rejected` status of the event changed when it became
+ -- un-partial-stated.
+ rejection_status_changed BOOLEAN NOT NULL
+);
+
+-- We want an index here because of the foreign key constraint:
+-- upon deleting an event, the database needs to be able to check here.
+CREATE UNIQUE INDEX un_partial_stated_event_stream_room_id ON un_partial_stated_event_stream (event_id);
diff --git a/synapse/storage/schema/main/delta/73/23_fix_thread_index.sql b/synapse/storage/schema/main/delta/73/23_fix_thread_index.sql
new file mode 100644
index 0000000000..ec519ceebf
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/23_fix_thread_index.sql
@@ -0,0 +1,33 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- If a Synapse deployment made a large jump in versions (from < 1.62.0 to >= 1.70.0)
+-- in a single upgrade then it might be possible for the event_push_summary_unique_index
+-- to be created in the background from delta 71/02event_push_summary_unique.sql after
+-- delta 73/06thread_notifications_thread_id_idx.sql is executed, causing it to
+-- not drop the event_push_summary_unique_index index.
+--
+-- See https://github.com/matrix-org/synapse/issues/14641
+
+-- Stop the index from being scheduled for creation in the background.
+DELETE FROM background_updates WHERE update_name = 'event_push_summary_unique_index';
+
+-- The above background job also replaces another index, so ensure that side-effect
+-- is applied.
+DROP INDEX IF EXISTS event_push_summary_user_rm;
+
+-- Fix deployments which ran the 73/06thread_notifications_thread_id_idx.sql delta
+-- before the event_push_summary_unique_index background job was run.
+DROP INDEX IF EXISTS event_push_summary_unique_index;
diff --git a/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres b/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres
new file mode 100644
index 0000000000..1ec24702f3
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres
@@ -0,0 +1,20 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS un_partial_stated_event_stream_sequence;
+
+SELECT setval('un_partial_stated_event_stream_sequence', (
+ SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_event_stream
+));
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 72227359b9..81df71a0c5 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -53,9 +53,9 @@ F = TypeVar("F", bound=Callable[..., Any])
class CachedFunction(Generic[F]):
- invalidate: Any = None
- invalidate_all: Any = None
- prefill: Any = None
+ invalidate: Callable[[Tuple[Any, ...]], None]
+ invalidate_all: Callable[[], None]
+ prefill: Callable[[Tuple[Any, ...], Any], None]
cache: Any = None
num_args: Any = None
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index dcf0eac3bf..452d5d04c1 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -788,26 +788,21 @@ class LruCache(Generic[KT, VT]):
def __contains__(self, key: KT) -> bool:
return self.contains(key)
- def set_cache_factor(self, factor: float) -> bool:
+ def set_cache_factor(self, factor: float) -> None:
"""
Set the cache factor for this individual cache.
This will trigger a resize if it changes, which may require evicting
items from the cache.
-
- Returns:
- Whether the cache changed size or not.
"""
if not self.apply_cache_factor_from_config:
- return False
+ return
new_size = int(self._original_max_size * factor)
if new_size != self.max_size:
self.max_size = new_size
if self._on_resize:
self._on_resize()
- return True
- return False
def __del__(self) -> None:
# We're about to be deleted, so we make sure to clear up all the nodes
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index a3eb5f741b..340e5e9145 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -167,12 +167,10 @@ class ResponseCache(Generic[KV]):
# the should_cache bit, we leave it in the cache for now and schedule
# its removal later.
if self.timeout_sec and context.should_cache:
- self.clock.call_later(
- self.timeout_sec, self._result_cache.pop, key, None
- )
+ self.clock.call_later(self.timeout_sec, self.unset, key)
else:
# otherwise, remove the result immediately.
- self._result_cache.pop(key, None)
+ self.unset(key)
return r
# make sure we do this *after* adding the entry to result_cache,
@@ -181,6 +179,14 @@ class ResponseCache(Generic[KV]):
result.addBoth(on_complete)
return entry
+ def unset(self, key: KV) -> None:
+ """Remove the cached value for this key from the cache, if any.
+
+ Args:
+ key: key used to remove the cached value
+ """
+ self._result_cache.pop(key, None)
+
async def wrap(
self,
key: KV,
|