diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index 2873b4d430..b8fee72898 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -7,13 +7,21 @@ from synapse.federation.sender import PerDestinationQueue, TransactionManager
from synapse.federation.units import Edu
from synapse.rest import admin
from synapse.rest.client import login, room
+from synapse.types import JsonDict
from synapse.util.retryutils import NotRetryingDestination
from tests.test_utils import event_injection, make_awaitable
-from tests.unittest import FederatingHomeserverTestCase, override_config
+from tests.unittest import FederatingHomeserverTestCase
class FederationCatchUpTestCases(FederatingHomeserverTestCase):
+ """
+ Tests cases of catching up over federation.
+
+ By default for test cases federation sending is disabled. This Test class has it
+ re-enabled for the main process.
+ """
+
servlets = [
admin.register_servlets,
room.register_servlets,
@@ -42,6 +50,11 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
self.record_transaction
)
+ def default_config(self) -> JsonDict:
+ config = super().default_config()
+ config["federation_sender_instances"] = None
+ return config
+
async def record_transaction(self, txn, json_cb):
if self.is_online:
data = json_cb()
@@ -79,7 +92,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
)[0]
return {"event_id": event_id, "stream_ordering": stream_ordering}
- @override_config({"send_federation": True})
def test_catch_up_destination_rooms_tracking(self):
"""
Tests that we populate the `destination_rooms` table as needed.
@@ -105,7 +117,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
self.assertEqual(row_2["event_id"], event_id_2)
self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)
- @override_config({"send_federation": True})
def test_catch_up_last_successful_stream_ordering_tracking(self):
"""
Tests that we populate the `destination_rooms` table as needed.
@@ -163,7 +174,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
"Send succeeded but not marked as last_successful_stream_ordering",
)
- @override_config({"send_federation": True}) # critical to federate
def test_catch_up_from_blank_state(self):
"""
Runs an overall test of federation catch-up from scratch.
@@ -260,7 +270,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
return per_dest_queue, results_list
- @override_config({"send_federation": True})
def test_catch_up_loop(self):
"""
Tests the behaviour of _catch_up_transmission_loop.
@@ -325,7 +334,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
event_5.internal_metadata.stream_ordering,
)
- @override_config({"send_federation": True})
def test_catch_up_on_synapse_startup(self):
"""
Tests the behaviour of get_catch_up_outstanding_destinations and
@@ -424,7 +432,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
# - all destinations are woken exactly once; they appear once in woken.
self.assertCountEqual(woken, server_names[:-1])
- @override_config({"send_federation": True})
def test_not_latest_event(self):
"""Test that we send the latest event in the room even if its not ours."""
diff --git a/tests/federation/test_federation_client.py b/tests/federation/test_federation_client.py
index a538215931..e67f405826 100644
--- a/tests/federation/test_federation_client.py
+++ b/tests/federation/test_federation_client.py
@@ -12,13 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
from unittest import mock
import twisted.web.client
from twisted.internet import defer
-from twisted.internet.protocol import Protocol
-from twisted.python.failure import Failure
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.room_versions import RoomVersions
@@ -26,10 +23,9 @@ from synapse.events import EventBase
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
-from synapse.types import JsonDict
from synapse.util import Clock
-from tests.test_utils import event_injection
+from tests.test_utils import FakeResponse, event_injection
from tests.unittest import FederatingHomeserverTestCase
@@ -98,8 +94,8 @@ class FederationClientTest(FederatingHomeserverTestCase):
# mock up the response, and have the agent return it
self._mock_agent.request.side_effect = lambda *args, **kwargs: defer.succeed(
- _mock_response(
- {
+ FakeResponse.json(
+ payload={
"pdus": [
create_event_dict,
member_event_dict,
@@ -146,14 +142,14 @@ class FederationClientTest(FederatingHomeserverTestCase):
def test_get_pdu_returns_nothing_when_event_does_not_exist(self):
"""No event should be returned when the event does not exist"""
- remote_pdu = self.get_success(
+ pulled_pdu_info = self.get_success(
self.hs.get_federation_client().get_pdu(
["yet.another.server"],
"event_should_not_exist",
RoomVersions.V9,
)
)
- self.assertEqual(remote_pdu, None)
+ self.assertEqual(pulled_pdu_info, None)
def test_get_pdu(self):
"""Test to make sure an event is returned by `get_pdu()`"""
@@ -173,13 +169,15 @@ class FederationClientTest(FederatingHomeserverTestCase):
remote_pdu.internal_metadata.outlier = True
# Get the event again. This time it should read it from cache.
- remote_pdu2 = self.get_success(
+ pulled_pdu_info2 = self.get_success(
self.hs.get_federation_client().get_pdu(
["yet.another.server"],
remote_pdu.event_id,
RoomVersions.V9,
)
)
+ self.assertIsNotNone(pulled_pdu_info2)
+ remote_pdu2 = pulled_pdu_info2.pdu
# Sanity check that we are working against the same event
self.assertEqual(remote_pdu.event_id, remote_pdu2.event_id)
@@ -208,8 +206,8 @@ class FederationClientTest(FederatingHomeserverTestCase):
# mock up the response, and have the agent return it
self._mock_agent.request.side_effect = lambda *args, **kwargs: defer.succeed(
- _mock_response(
- {
+ FakeResponse.json(
+ payload={
"origin": "yet.another.server",
"origin_server_ts": 900,
"pdus": [
@@ -219,13 +217,15 @@ class FederationClientTest(FederatingHomeserverTestCase):
)
)
- remote_pdu = self.get_success(
+ pulled_pdu_info = self.get_success(
self.hs.get_federation_client().get_pdu(
["yet.another.server"],
"event_id",
RoomVersions.V9,
)
)
+ self.assertIsNotNone(pulled_pdu_info)
+ remote_pdu = pulled_pdu_info.pdu
# check the right call got made to the agent
self._mock_agent.request.assert_called_once_with(
@@ -269,8 +269,8 @@ class FederationClientTest(FederatingHomeserverTestCase):
# We expect an outbound request to /backfill, so stub that out
self._mock_agent.request.side_effect = lambda *args, **kwargs: defer.succeed(
- _mock_response(
- {
+ FakeResponse.json(
+ payload={
"origin": "yet.another.server",
"origin_server_ts": 900,
# Mimic the other server returning our new `pulled_event`
@@ -305,21 +305,3 @@ class FederationClientTest(FederatingHomeserverTestCase):
# This is 2 because it failed once from `self.OTHER_SERVER_NAME` and the
# other from "yet.another.server"
self.assertEqual(backfill_num_attempts, 2)
-
-
-def _mock_response(resp: JsonDict):
- body = json.dumps(resp).encode("utf-8")
-
- def deliver_body(p: Protocol):
- p.dataReceived(body)
- p.connectionLost(Failure(twisted.web.client.ResponseDone()))
-
- response = mock.Mock(
- code=200,
- phrase=b"OK",
- headers=twisted.web.client.Headers({"content-Type": ["application/json"]}),
- length=len(body),
- deliverBody=deliver_body,
- )
- mock.seal(response)
- return response
diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py
index f1e357764f..8692d8190f 100644
--- a/tests/federation/test_federation_sender.py
+++ b/tests/federation/test_federation_sender.py
@@ -25,10 +25,17 @@ from synapse.rest.client import login
from synapse.types import JsonDict, ReadReceipt
from tests.test_utils import make_awaitable
-from tests.unittest import HomeserverTestCase, override_config
+from tests.unittest import HomeserverTestCase
class FederationSenderReceiptsTestCases(HomeserverTestCase):
+ """
+ Test federation sending to update receipts.
+
+ By default for test cases federation sending is disabled. This Test class has it
+ re-enabled for the main process.
+ """
+
def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver(
federation_transport_client=Mock(spec=["send_transaction"]),
@@ -38,9 +45,17 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
return_value=make_awaitable({"test", "host2"})
)
+ hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = (
+ hs.get_storage_controllers().state.get_current_hosts_in_room
+ )
+
return hs
- @override_config({"send_federation": True})
+ def default_config(self) -> JsonDict:
+ config = super().default_config()
+ config["federation_sender_instances"] = None
+ return config
+
def test_send_receipts(self):
mock_send_transaction = (
self.hs.get_federation_transport_client().send_transaction
@@ -83,7 +98,82 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
],
)
- @override_config({"send_federation": True})
+ def test_send_receipts_thread(self):
+ mock_send_transaction = (
+ self.hs.get_federation_transport_client().send_transaction
+ )
+ mock_send_transaction.return_value = make_awaitable({})
+
+ # Create receipts for:
+ #
+ # * The same room / user on multiple threads.
+ # * A different user in the same room.
+ sender = self.hs.get_federation_sender()
+ for user, thread in (
+ ("alice", None),
+ ("alice", "thread"),
+ ("bob", None),
+ ("bob", "diff-thread"),
+ ):
+ receipt = ReadReceipt(
+ "room_id",
+ "m.read",
+ user,
+ ["event_id"],
+ thread_id=thread,
+ data={"ts": 1234},
+ )
+ self.successResultOf(
+ defer.ensureDeferred(sender.send_read_receipt(receipt))
+ )
+
+ self.pump()
+
+ # expect a call to send_transaction with two EDUs to separate threads.
+ mock_send_transaction.assert_called_once()
+ json_cb = mock_send_transaction.call_args[0][1]
+ data = json_cb()
+ # Note that the ordering of the EDUs doesn't matter.
+ self.assertCountEqual(
+ data["edus"],
+ [
+ {
+ "edu_type": EduTypes.RECEIPT,
+ "content": {
+ "room_id": {
+ "m.read": {
+ "alice": {
+ "event_ids": ["event_id"],
+ "data": {"ts": 1234, "thread_id": "thread"},
+ },
+ "bob": {
+ "event_ids": ["event_id"],
+ "data": {"ts": 1234, "thread_id": "diff-thread"},
+ },
+ }
+ }
+ },
+ },
+ {
+ "edu_type": EduTypes.RECEIPT,
+ "content": {
+ "room_id": {
+ "m.read": {
+ "alice": {
+ "event_ids": ["event_id"],
+ "data": {"ts": 1234},
+ },
+ "bob": {
+ "event_ids": ["event_id"],
+ "data": {"ts": 1234},
+ },
+ }
+ }
+ },
+ },
+ ],
+ )
+
def test_send_receipts_with_backoff(self):
"""Send two receipts in quick succession; the second should be flushed, but
only after 20ms"""
@@ -170,6 +260,13 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
class FederationSenderDevicesTestCases(HomeserverTestCase):
+ """
+ Test federation sending to update devices.
+
+ By default for test cases federation sending is disabled. This Test class has it
+ re-enabled for the main process.
+ """
+
servlets = [
admin.register_servlets,
login.register_servlets,
@@ -184,7 +281,8 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
def default_config(self):
c = super().default_config()
- c["send_federation"] = True
+ # Enable federation sending on the main process.
+ c["federation_sender_instances"] = None
return c
def prepare(self, reactor, clock, hs):
diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py
index 3a6ef221ae..177e5b5afc 100644
--- a/tests/federation/test_federation_server.py
+++ b/tests/federation/test_federation_server.py
@@ -212,7 +212,7 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
self.assertEqual(r[("m.room.member", joining_user)].membership, "join")
@override_config({"experimental_features": {"msc3706_enabled": True}})
- def test_send_join_partial_state(self):
+ def test_send_join_partial_state(self) -> None:
"""When MSC3706 support is enabled, /send_join should return partial state"""
joining_user = "@misspiggy:" + self.OTHER_SERVER_NAME
join_result = self._make_join(joining_user)
@@ -240,6 +240,9 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
("m.room.power_levels", ""),
("m.room.join_rules", ""),
("m.room.history_visibility", ""),
+ # Users included here because they're heroes.
+ ("m.room.member", "@kermit:test"),
+ ("m.room.member", "@fozzie:test"),
],
)
@@ -249,9 +252,9 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
]
self.assertCountEqual(
returned_auth_chain_events,
- [
- ("m.room.member", "@kermit:test"),
- ],
+ # TODO: change the test so that we get at least one event in the auth chain
+ # here.
+ [],
)
# the room should show that the new user is a member
diff --git a/tests/federation/transport/test_client.py b/tests/federation/transport/test_client.py
index 0926e0583d..b84c74fc0e 100644
--- a/tests/federation/transport/test_client.py
+++ b/tests/federation/transport/test_client.py
@@ -15,8 +15,11 @@
import json
from unittest.mock import Mock
+import ijson.common
+
from synapse.api.room_versions import RoomVersions
from synapse.federation.transport.client import SendJoinParser
+from synapse.util import ExceptionBundle
from tests.unittest import TestCase
@@ -116,15 +119,22 @@ class SendJoinParserTestCase(TestCase):
coro_3 = Mock()
coro_3.close = Mock(side_effect=RuntimeError("Couldn't close coro 3"))
+ original_coros = parser._coros
parser._coros = [coro_1, coro_2, coro_3]
+ # Close the original coroutines. If we don't, when we garbage collect them
+ # they will throw, failing the test. (Oddly, this only started in CPython 3.11).
+ for coro in original_coros:
+ try:
+ coro.close()
+ except ijson.common.IncompleteJSONError:
+ pass
+
# Send half of the data to the parser
parser.write(serialisation[: len(serialisation) // 2])
- # Close the parser. There should be _some_ kind of exception, but it need not
- # be that RuntimeError directly. E.g. we might want to raise a wrapper
- # encompassing multiple errors from multiple coroutines.
- with self.assertRaises(Exception):
+ # Close the parser. There should be _some_ kind of exception.
+ with self.assertRaises(ExceptionBundle):
parser.finish()
# In any case, we should have tried to close both coros.
|