diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 34cc5ecd11..3c44b4bf86 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
FederationDeniedError,
FederationError,
HttpResponseException,
+ LimitExceededError,
NotFoundError,
RequestSendFailed,
SynapseError,
@@ -64,6 +65,7 @@ from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
ReplicationStoreRoomOnOutlierMembershipRestServlet,
)
+from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import JsonDict, StateMap, get_domain_from_id
@@ -549,15 +551,29 @@ class FederationHandler:
# https://github.com/matrix-org/synapse/issues/12998
await self.store.store_partial_state_room(room_id, ret.servers_in_room)
- max_stream_id = await self._federation_event_handler.process_remote_join(
- origin,
- room_id,
- auth_chain,
- state,
- event,
- room_version_obj,
- partial_state=ret.partial_state,
- )
+ try:
+ max_stream_id = (
+ await self._federation_event_handler.process_remote_join(
+ origin,
+ room_id,
+ auth_chain,
+ state,
+ event,
+ room_version_obj,
+ partial_state=ret.partial_state,
+ )
+ )
+ except PartialStateConflictError as e:
+ # The homeserver was already in the room and it is no longer partial
+ # stated. We ought to be doing a local join instead. Turn the error into
+ # a 429, as a hint to the client to try again.
+ # TODO(faster_joins): `_should_perform_remote_join` suggests that we may
+ # do a remote join for restricted rooms even if we have full state.
+ logger.error(
+ "Room %s was un-partial stated while processing remote join.",
+ room_id,
+ )
+ raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for this
@@ -1567,11 +1583,6 @@ class FederationHandler:
# we raced against more events arriving with partial state. Go round
# the loop again. We've already logged a warning, so no need for more.
- # TODO(faster_joins): there is still a race here, whereby incoming events which raced
- # with us will fail to be persisted after the call to `clear_partial_state_room` due to
- # having partial state.
- # https://github.com/matrix-org/synapse/issues/12988
- #
continue
events = await self.store.get_events_as_list(
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 479d936dc0..c74117c19a 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -64,6 +64,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
from synapse.state import StateResolutionStore
+from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
@@ -275,7 +276,16 @@ class FederationEventHandler:
affected=pdu.event_id,
)
- await self._process_received_pdu(origin, pdu, state_ids=None)
+ try:
+ await self._process_received_pdu(origin, pdu, state_ids=None)
+ except PartialStateConflictError:
+ # The room was un-partial stated while we were processing the PDU.
+ # Try once more, with full state this time.
+ logger.info(
+ "Room %s was un-partial stated while processing the PDU, trying again.",
+ room_id,
+ )
+ await self._process_received_pdu(origin, pdu, state_ids=None)
async def on_send_membership_event(
self, origin: str, event: EventBase
@@ -306,6 +316,9 @@ class FederationEventHandler:
Raises:
SynapseError if the event is not accepted into the room
+ PartialStateConflictError if the room was un-partial stated in between
+ computing the state at the event and persisting it. The caller should
+ retry exactly once in this case.
"""
logger.debug(
"on_send_membership_event: Got event: %s, signatures: %s",
@@ -423,6 +436,8 @@ class FederationEventHandler:
Raises:
SynapseError if the response is in some way invalid.
+ PartialStateConflictError if the homeserver is already in the room and it
+ has been un-partial stated.
"""
create_event = None
for e in state:
@@ -1084,10 +1099,14 @@ class FederationEventHandler:
state_ids: Normally None, but if we are handling a gap in the graph
(ie, we are missing one or more prev_events), the resolved state at the
- event
+ event. Must not be partial state.
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
+
+ Raises PartialStateConflictError if the room was un-partial stated in between
+ computing the state at the event and persisting it. The caller should retry
+ exactly once in this case. Will never be raised if `state_ids` is provided.
"""
logger.debug("Processing event: %s", event)
assert not event.internal_metadata.outlier
@@ -1933,6 +1952,9 @@ class FederationEventHandler:
event: The event itself.
context: The event context.
backfilled: True if the event was backfilled.
+
+ Raises PartialStateConflictError if attempting to persist a partial state
+ event in a room that has been un-partial stated.
"""
# this method should not be called on outliers (those code paths call
# persist_events_and_notify directly.)
@@ -1985,6 +2007,10 @@ class FederationEventHandler:
Returns:
The stream ID after which all events have been persisted.
+
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
if not event_and_contexts:
return self._store.get_room_max_stream_ordering()
@@ -1993,14 +2019,19 @@ class FederationEventHandler:
if instance != self._instance_name:
# Limit the number of events sent over replication. We choose 200
# here as that is what we default to in `max_request_body_size(..)`
- for batch in batch_iter(event_and_contexts, 200):
- result = await self._send_events(
- instance_name=instance,
- store=self._store,
- room_id=room_id,
- event_and_contexts=batch,
- backfilled=backfilled,
- )
+ try:
+ for batch in batch_iter(event_and_contexts, 200):
+ result = await self._send_events(
+ instance_name=instance,
+ store=self._store,
+ room_id=room_id,
+ event_and_contexts=batch,
+ backfilled=backfilled,
+ )
+ except SynapseError as e:
+ if e.code == HTTPStatus.CONFLICT:
+ raise PartialStateConflictError()
+ raise
return result["max_stream_id"]
else:
assert self._storage_controllers.persistence
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c6b40a5b7a..1980e37dae 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -37,6 +37,7 @@ from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
+ LimitExceededError,
NotFoundError,
ShadowBanError,
SynapseError,
@@ -53,6 +54,7 @@ from synapse.handlers.directory import DirectoryHandler
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
@@ -1250,6 +1252,8 @@ class EventCreationHandler:
Raises:
ShadowBanError if the requester has been shadow-banned.
+ LimitExceededError(429) if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
extra_users = extra_users or []
@@ -1300,24 +1304,35 @@ class EventCreationHandler:
# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
- result, _ = await make_deferred_yieldable(
- gather_results(
- (
- run_in_background(
- self._persist_event,
- requester=requester,
- event=event,
- context=context,
- ratelimit=ratelimit,
- extra_users=extra_users,
+ try:
+ result, _ = await make_deferred_yieldable(
+ gather_results(
+ (
+ run_in_background(
+ self._persist_event,
+ requester=requester,
+ event=event,
+ context=context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
+ ),
+ run_in_background(
+ self.cache_joined_hosts_for_event, event, context
+ ).addErrback(
+ log_failure, "cache_joined_hosts_for_event failed"
+ ),
),
- run_in_background(
- self.cache_joined_hosts_for_event, event, context
- ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
- ),
- consumeErrors=True,
+ consumeErrors=True,
+ )
+ ).addErrback(unwrapFirstError)
+ except PartialStateConflictError as e:
+ # The event context needs to be recomputed.
+ # Turn the error into a 429, as a hint to the client to try again.
+ logger.info(
+ "Room %s was un-partial stated while persisting client event.",
+ event.room_id,
)
- ).addErrback(unwrapFirstError)
+ raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
return result
@@ -1332,6 +1347,9 @@ class EventCreationHandler:
"""Actually persists the event. Should only be called by
`handle_new_client_event`, and see its docstring for documentation of
the arguments.
+
+ Raises PartialStateConflictError if attempting to persist a partial state
+ event in a room that has been un-partial stated.
"""
# Skip push notification actions for historical messages
@@ -1348,16 +1366,21 @@ class EventCreationHandler:
# If we're a worker we need to hit out to the master.
writer_instance = self._events_shard_config.get_instance(event.room_id)
if writer_instance != self._instance_name:
- result = await self.send_event(
- instance_name=writer_instance,
- event_id=event.event_id,
- store=self.store,
- requester=requester,
- event=event,
- context=context,
- ratelimit=ratelimit,
- extra_users=extra_users,
- )
+ try:
+ result = await self.send_event(
+ instance_name=writer_instance,
+ event_id=event.event_id,
+ store=self.store,
+ requester=requester,
+ event=event,
+ context=context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
+ )
+ except SynapseError as e:
+ if e.code == HTTPStatus.CONFLICT:
+ raise PartialStateConflictError()
+ raise
stream_id = result["stream_id"]
event_id = result["event_id"]
if event_id != event.event_id:
@@ -1485,6 +1508,10 @@ class EventCreationHandler:
The persisted event. This may be different than the given event if
it was de-duplicated (e.g. because we had already persisted an
event with the same transaction ID.)
+
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
extra_users = extra_users or []
|