diff --git a/changelog.d/17127.bugfix b/changelog.d/17127.bugfix
new file mode 100644
index 0000000000..93c7314098
--- /dev/null
+++ b/changelog.d/17127.bugfix
@@ -0,0 +1 @@
+Fix a bug which meant that to-device messages received over federation could be dropped when the server was under load or networking problems caused problems between Synapse processes or the database.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 65d3a661fe..7ffc650aa1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -546,7 +546,25 @@ class FederationServer(FederationBase):
edu_type=edu_dict["edu_type"],
content=edu_dict["content"],
)
- await self.registry.on_edu(edu.edu_type, origin, edu.content)
+ try:
+ await self.registry.on_edu(edu.edu_type, origin, edu.content)
+ except Exception:
+ # If there was an error handling the EDU, we must reject the
+ # transaction.
+ #
+ # Some EDU types (notably, to-device messages) are, despite their name,
+ # expected to be reliable; if we weren't able to do something with it,
+ # we have to tell the sender that, and the only way the protocol gives
+ # us to do so is by sending an HTTP error back on the transaction.
+ #
+ # We log the exception now, and then raise a new SynapseError to cause
+ # the transaction to be failed.
+ logger.exception("Error handling EDU of type %s", edu.edu_type)
+ raise SynapseError(500, f"Error handing EDU of type {edu.edu_type}")
+
+ # TODO: if the first EDU fails, we should probably abort the whole
+ # thing rather than carrying on with the rest of them. That would
+ # probably be best done inside `concurrently_execute`.
await concurrently_execute(
_process_edu,
@@ -1414,12 +1432,7 @@ class FederationHandlerRegistry:
handler = self.edu_handlers.get(edu_type)
if handler:
with start_active_span_from_edu(content, "handle_edu"):
- try:
- await handler(origin, content)
- except SynapseError as e:
- logger.info("Failed to handle edu %r: %r", edu_type, e)
- except Exception:
- logger.exception("Failed to handle edu %r", edu_type)
+ await handler(origin, content)
return
# Check if we can route it somewhere else that isn't us
@@ -1428,17 +1441,12 @@ class FederationHandlerRegistry:
# Pick an instance randomly so that we don't overload one.
route_to = random.choice(instances)
- try:
- await self._send_edu(
- instance_name=route_to,
- edu_type=edu_type,
- origin=origin,
- content=content,
- )
- except SynapseError as e:
- logger.info("Failed to handle edu %r: %r", edu_type, e)
- except Exception:
- logger.exception("Failed to handle edu %r", edu_type)
+ await self._send_edu(
+ instance_name=route_to,
+ edu_type=edu_type,
+ origin=origin,
+ content=content,
+ )
return
# Oh well, let's just log and move on.
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 2b034dcbb7..79be7c97c8 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -104,6 +104,9 @@ class DeviceMessageHandler:
"""
Handle receiving to-device messages from remote homeservers.
+ Note that any errors thrown from this method will cause the federation /send
+ request to receive an error response.
+
Args:
origin: The remote homeserver.
content: The JSON dictionary containing the to-device messages.
diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py
index 36684c2c91..88261450b1 100644
--- a/tests/federation/test_federation_server.py
+++ b/tests/federation/test_federation_server.py
@@ -67,6 +67,23 @@ class FederationServerTests(unittest.FederatingHomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, channel.result)
self.assertEqual(channel.json_body["errcode"], "M_NOT_JSON")
+ def test_failed_edu_causes_500(self) -> None:
+ """If the EDU handler fails, /send should return a 500."""
+
+ async def failing_handler(_origin: str, _content: JsonDict) -> None:
+ raise Exception("bleh")
+
+ self.hs.get_federation_registry().register_edu_handler(
+ "FAIL_EDU_TYPE", failing_handler
+ )
+
+ channel = self.make_signed_federation_request(
+ "PUT",
+ "/_matrix/federation/v1/send/txn",
+ {"edus": [{"edu_type": "FAIL_EDU_TYPE", "content": {}}]},
+ )
+ self.assertEqual(500, channel.code, channel.result)
+
class ServerACLsTestCase(unittest.TestCase):
def test_blocked_server(self) -> None:
diff --git a/tests/federation/transport/test_server.py b/tests/federation/transport/test_server.py
index 190b79bf26..0237369998 100644
--- a/tests/federation/transport/test_server.py
+++ b/tests/federation/transport/test_server.py
@@ -59,7 +59,14 @@ class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
"/_matrix/federation/v1/send/txn_id_1234/",
content={
"edus": [
- {"edu_type": EduTypes.DEVICE_LIST_UPDATE, "content": {"foo": "bar"}}
+ {
+ "edu_type": EduTypes.DEVICE_LIST_UPDATE,
+ "content": {
+ "device_id": "QBUAZIFURK",
+ "stream_id": 0,
+ "user_id": "@user:id",
+ },
+ },
],
"pdus": [],
},
|