diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py
index 51c8dd6498..e448cb1901 100644
--- a/tests/handlers/test_federation_event.py
+++ b/tests/handlers/test_federation_event.py
@@ -11,14 +11,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import Optional
from unittest import mock
+from synapse.api.errors import AuthError, StoreError
+from synapse.api.room_versions import RoomVersion
+from synapse.event_auth import (
+ check_state_dependent_auth_rules,
+ check_state_independent_auth_rules,
+)
from synapse.events import make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.federation.transport.client import StateRequestResponse
from synapse.logging.context import LoggingContext
from synapse.rest import admin
from synapse.rest.client import login, room
+from synapse.state.v2 import _mainline_sort, _reverse_topological_power_sort
+from synapse.types import JsonDict
from tests import unittest
from tests.test_utils import event_injection, make_awaitable
@@ -34,7 +43,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
def make_homeserver(self, reactor, clock):
# mock out the federation transport client
self.mock_federation_transport_client = mock.Mock(
- spec=["get_room_state_ids", "get_room_state", "get_event"]
+ spec=["get_room_state_ids", "get_room_state", "get_event", "backfill"]
)
return super().setup_test_homeserver(
federation_transport_client=self.mock_federation_transport_client
@@ -227,3 +236,812 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
if prev_exists_as_outlier:
self.mock_federation_transport_client.get_event.assert_not_called()
+
+ def test_process_pulled_event_records_failed_backfill_attempts(
+ self,
+ ) -> None:
+ """
+ Test to make sure that failed backfill attempts for an event are
+ recorded in the `event_failed_pull_attempts` table.
+
+ In this test, we pretend we are processing a "pulled" event via
+ backfill. The pulled event has a fake `prev_event` which our server has
+ never seen before, so it requests the state at that `prev_event`, which
+ fails as expected because the event is fake. Since the server can't fetch
+ the state at the missing `prev_event`, the "pulled" event fails the
+ history check and fails to process.
+
+ We check that we correctly record the number of failed pull attempts for
+ the pulled event and, as a sanity check, that the "pulled" event isn't
+ persisted.
+ """
+ OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+ main_store = self.hs.get_datastores().main
+
+ # Create the room
+ user_id = self.register_user("kermit", "test")
+ tok = self.login("kermit", "test")
+ room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+ room_version = self.get_success(main_store.get_room_version(room_id))
+
+ # We expect an outbound request to /state_ids, so stub that out
+ self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable(
+ {
+ # Mimic the other server not knowing about the state at all.
+ # We want to cause Synapse to throw an error (`Unable to get
+ # missing prev_event $fake_prev_event`) and fail to backfill
+ # the pulled event.
+ "pdu_ids": [],
+ "auth_chain_ids": [],
+ }
+ )
+ # We also expect an outbound request to /state
+ self.mock_federation_transport_client.get_room_state.return_value = make_awaitable(
+ StateRequestResponse(
+ # Mimic the other server not knowing about the state at all.
+ # We want to cause Synapse to throw an error (`Unable to get
+ # missing prev_event $fake_prev_event`) and fail to backfill
+ # the pulled event.
+ auth_events=[],
+ state=[],
+ )
+ )
+
+ pulled_event = make_event_from_dict(
+ self.add_hashes_and_signatures_from_other_server(
+ {
+ "type": "test_regular_type",
+ "room_id": room_id,
+ "sender": OTHER_USER,
+ "prev_events": [
+ # The fake prev event will make the pulled event fail
+ # the history check (`Unable to get missing prev_event
+ # $fake_prev_event`)
+ "$fake_prev_event"
+ ],
+ "auth_events": [],
+ "origin_server_ts": 1,
+ "depth": 12,
+ "content": {"body": "pulled"},
+ }
+ ),
+ room_version,
+ )
+
+ # The function under test: try to process the pulled event
+ with LoggingContext("test"):
+ self.get_success(
+ self.hs.get_federation_event_handler()._process_pulled_event(
+ self.OTHER_SERVER_NAME, pulled_event, backfilled=True
+ )
+ )
+
+ # Make sure our failed pull attempt was recorded
+ backfill_num_attempts = self.get_success(
+ main_store.db_pool.simple_select_one_onecol(
+ table="event_failed_pull_attempts",
+ keyvalues={"event_id": pulled_event.event_id},
+ retcol="num_attempts",
+ )
+ )
+ self.assertEqual(backfill_num_attempts, 1)
+
+ # The function under test: try to process the pulled event again
+ with LoggingContext("test"):
+ self.get_success(
+ self.hs.get_federation_event_handler()._process_pulled_event(
+ self.OTHER_SERVER_NAME, pulled_event, backfilled=True
+ )
+ )
+
+ # Make sure our second failed pull attempt was recorded (`num_attempts` was incremented)
+ backfill_num_attempts = self.get_success(
+ main_store.db_pool.simple_select_one_onecol(
+ table="event_failed_pull_attempts",
+ keyvalues={"event_id": pulled_event.event_id},
+ retcol="num_attempts",
+ )
+ )
+ self.assertEqual(backfill_num_attempts, 2)
+
+ # And as a sanity check, make sure the event was not persisted through all of this.
+ persisted = self.get_success(
+ main_store.get_event(pulled_event.event_id, allow_none=True)
+ )
+ self.assertIsNone(
+ persisted,
+ "pulled event that fails the history check should not be persisted at all",
+ )
+
+ def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_persisted(
+ self,
+ ) -> None:
+ """
+ Test to make sure that failed pull attempts
+ (`event_failed_pull_attempts` table) for an event are cleared after the
+ event is successfully persisted.
+
+ In this test, we pretend we are processing a "pulled" event via
+ backfill. The pulled event successfully processes and the backward
+ extremities are updated, along with clearing out any failed pull
+ attempts for those old extremities.
+
+ We check that we correctly cleared the failed pull attempts for the
+ pulled event.
+ """
+ OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+ main_store = self.hs.get_datastores().main
+
+ # Create the room
+ user_id = self.register_user("kermit", "test")
+ tok = self.login("kermit", "test")
+ room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+ room_version = self.get_success(main_store.get_room_version(room_id))
+
+ # allow the remote user to send state events
+ self.helper.send_state(
+ room_id,
+ "m.room.power_levels",
+ {"events_default": 0, "state_default": 0},
+ tok=tok,
+ )
+
+ # add the remote user to the room
+ member_event = self.get_success(
+ event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
+ )
+
+ initial_state_map = self.get_success(
+ main_store.get_partial_current_state_ids(room_id)
+ )
+
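+ # The pulled event cites the room create event, the current power levels,
+ # and the remote user's own membership as its auth events.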
+ auth_event_ids = [
+ initial_state_map[("m.room.create", "")],
+ initial_state_map[("m.room.power_levels", "")],
+ member_event.event_id,
+ ]
+
+ pulled_event = make_event_from_dict(
+ self.add_hashes_and_signatures_from_other_server(
+ {
+ "type": "test_regular_type",
+ "room_id": room_id,
+ "sender": OTHER_USER,
+ "prev_events": [member_event.event_id],
+ "auth_events": auth_event_ids,
+ "origin_server_ts": 1,
+ "depth": 12,
+ "content": {"body": "pulled"},
+ }
+ ),
+ room_version,
+ )
+
+ # Fake the "pulled" event failing to backfill once so we can test
+ # if it's cleared out later on.
+ self.get_success(
+ main_store.record_event_failed_pull_attempt(
+ pulled_event.room_id, pulled_event.event_id, "fake cause"
+ )
+ )
+ # Make sure we have a failed pull attempt recorded for the pulled event
+ backfill_num_attempts = self.get_success(
+ main_store.db_pool.simple_select_one_onecol(
+ table="event_failed_pull_attempts",
+ keyvalues={"event_id": pulled_event.event_id},
+ retcol="num_attempts",
+ )
+ )
+ self.assertEqual(backfill_num_attempts, 1)
+
+ # The function under test: try to process the pulled event
+ with LoggingContext("test"):
+ self.get_success(
+ self.hs.get_federation_event_handler()._process_pulled_event(
+ self.OTHER_SERVER_NAME, pulled_event, backfilled=True
+ )
+ )
+
+ # Make sure the failed pull attempts for the pulled event are cleared
+ backfill_num_attempts = self.get_success(
+ main_store.db_pool.simple_select_one_onecol(
+ table="event_failed_pull_attempts",
+ keyvalues={"event_id": pulled_event.event_id},
+ retcol="num_attempts",
+ allow_none=True,
+ )
+ )
+ self.assertIsNone(backfill_num_attempts)
+
+ # And as a sanity check, make sure the "pulled" event was persisted.
+ persisted = self.get_success(
+ main_store.get_event(pulled_event.event_id, allow_none=True)
+ )
+ self.assertIsNotNone(persisted, "pulled event was not persisted at all")
+
+ def test_backfill_signature_failure_does_not_fetch_same_prev_event_later(
+ self,
+ ) -> None:
+ """
+ Test to make sure we back off and don't try to fetch a missing prev_event
+ when we already know it has an invalid signature from checking the
+ signatures of all of the events in the backfill response.
+ """
+ OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+ main_store = self.hs.get_datastores().main
+
+ # Create the room
+ user_id = self.register_user("kermit", "test")
+ tok = self.login("kermit", "test")
+ room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+ room_version = self.get_success(main_store.get_room_version(room_id))
+
+ # Allow the remote user to send state events
+ self.helper.send_state(
+ room_id,
+ "m.room.power_levels",
+ {"events_default": 0, "state_default": 0},
+ tok=tok,
+ )
+
+ # Add the remote user to the room
+ member_event = self.get_success(
+ event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
+ )
+
+ initial_state_map = self.get_success(
+ main_store.get_partial_current_state_ids(room_id)
+ )
+
+ auth_event_ids = [
+ initial_state_map[("m.room.create", "")],
+ initial_state_map[("m.room.power_levels", "")],
+ member_event.event_id,
+ ]
+
+ # We purposely don't run `add_hashes_and_signatures_from_other_server`
+ # over this because we want the signature check to fail.
+ pulled_event_without_signatures = make_event_from_dict(
+ {
+ "type": "test_regular_type",
+ "room_id": room_id,
+ "sender": OTHER_USER,
+ "prev_events": [member_event.event_id],
+ "auth_events": auth_event_ids,
+ "origin_server_ts": 1,
+ "depth": 12,
+ "content": {"body": "pulled_event_without_signatures"},
+ },
+ room_version,
+ )
+
+ # Create a regular event that should pass processing except for the
+ # `pulled_event_without_signatures` in its `prev_events`.
+ pulled_event = make_event_from_dict(
+ self.add_hashes_and_signatures_from_other_server(
+ {
+ "type": "test_regular_type",
+ "room_id": room_id,
+ "sender": OTHER_USER,
+ "prev_events": [
+ member_event.event_id,
+ pulled_event_without_signatures.event_id,
+ ],
+ "auth_events": auth_event_ids,
+ "origin_server_ts": 1,
+ "depth": 12,
+ "content": {"body": "pulled_event"},
+ }
+ ),
+ room_version,
+ )
+
+ # We expect an outbound request to /backfill, so stub that out
+ self.mock_federation_transport_client.backfill.return_value = make_awaitable(
+ {
+ "origin": self.OTHER_SERVER_NAME,
+ "origin_server_ts": 123,
+ "pdus": [
+ # This is one of the important aspects of this test: we include
+ # `pulled_event_without_signatures` so it fails the signature check
+ # when we filter the backfill response down to events which
+ # have valid signatures in
+ # `_check_sigs_and_hash_for_pulled_events_and_fetch`
+ pulled_event_without_signatures.get_pdu_json(),
+ # Then later, when we process this validly-signed event and fetch
+ # the missing `prev_event`s, we want to make sure that we back off
+ # and don't try to fetch `pulled_event_without_signatures` again,
+ # since we know it just had an invalid signature.
+ pulled_event.get_pdu_json(),
+ ],
+ }
+ )
+
+ # Keep track of the counts and make sure we don't make any of these requests
+ event_endpoint_requested_count = 0
+ room_state_ids_endpoint_requested_count = 0
+ room_state_endpoint_requested_count = 0
+
+ async def get_event(
+ destination: str, event_id: str, timeout: Optional[int] = None
+ ) -> None:
+ nonlocal event_endpoint_requested_count
+ event_endpoint_requested_count += 1
+
+ async def get_room_state_ids(
+ destination: str, room_id: str, event_id: str
+ ) -> None:
+ nonlocal room_state_ids_endpoint_requested_count
+ room_state_ids_endpoint_requested_count += 1
+
+ async def get_room_state(
+ room_version: RoomVersion, destination: str, room_id: str, event_id: str
+ ) -> None:
+ nonlocal room_state_endpoint_requested_count
+ room_state_endpoint_requested_count += 1
+
+ # We don't expect an outbound request to `/event`, `/state_ids`, or `/state` in
+ # the happy path, but if the logic is sneaking around what we expect, stub these
+ # out so we can detect that failure.
+ self.mock_federation_transport_client.get_event.side_effect = get_event
+ self.mock_federation_transport_client.get_room_state_ids.side_effect = (
+ get_room_state_ids
+ )
+ self.mock_federation_transport_client.get_room_state.side_effect = (
+ get_room_state
+ )
+
+ # The function under test: try to backfill and process the pulled event
+ with LoggingContext("test"):
+ self.get_success(
+ self.hs.get_federation_event_handler().backfill(
+ self.OTHER_SERVER_NAME,
+ room_id,
+ limit=1,
+ extremities=["$some_extremity"],
+ )
+ )
+
+ if event_endpoint_requested_count > 0:
+ self.fail(
+ "We don't expect an outbound request to /event in the happy path but if "
+ "the logic is sneaking around what we expect, make sure to fail the test. "
+ "We don't expect it because the signature failure should cause us to backoff "
+ "and not asking about pulled_event_without_signatures="
+ f"{pulled_event_without_signatures.event_id} again"
+ )
+
+ if room_state_ids_endpoint_requested_count > 0:
+ self.fail(
+ "We don't expect an outbound request to /state_ids in the happy path but if "
+ "the logic is sneaking around what we expect, make sure to fail the test. "
+ "We don't expect it because the signature failure should cause us to backoff "
+ "and not asking about pulled_event_without_signatures="
+ f"{pulled_event_without_signatures.event_id} again"
+ )
+
+ if room_state_endpoint_requested_count > 0:
+ self.fail(
+ "We don't expect an outbound request to /state in the happy path but if "
+ "the logic is sneaking around what we expect, make sure to fail the test. "
+ "We don't expect it because the signature failure should cause us to backoff "
+ "and not asking about pulled_event_without_signatures="
+ f"{pulled_event_without_signatures.event_id} again"
+ )
+
+ # Make sure we only recorded a single failure which corresponds to the signature
+ # failure initially in `_check_sigs_and_hash_for_pulled_events_and_fetch` before
+ # we process all of the pulled events.
+ backfill_num_attempts_for_event_without_signatures = self.get_success(
+ main_store.db_pool.simple_select_one_onecol(
+ table="event_failed_pull_attempts",
+ keyvalues={"event_id": pulled_event_without_signatures.event_id},
+ retcol="num_attempts",
+ )
+ )
+ self.assertEqual(backfill_num_attempts_for_event_without_signatures, 1)
+
+ # And make sure we didn't record a failure for the event that has the missing
+ # prev_event because we don't want to cause a cascade of failures. Not being
+ # able to fetch the `prev_events` just means we won't be able to de-outlier the
+ # pulled event. But we can still use an `outlier` in the state/auth chain for
+ # another event. So we shouldn't stop a downstream event from trying to pull it.
+ self.get_failure(
+ main_store.db_pool.simple_select_one_onecol(
+ table="event_failed_pull_attempts",
+ keyvalues={"event_id": pulled_event.event_id},
+ retcol="num_attempts",
+ ),
+ # StoreError: 404: No row found
+ StoreError,
+ )
+
+ def test_process_pulled_event_with_rejected_missing_state(self) -> None:
+ """Ensure that we correctly handle pulled events with missing state containing a
+ rejected state event
+
+ In this test, we pretend we are processing a "pulled" event (e.g., via backfill
+ or get_missing_events). The pulled event has a prev_event we haven't previously
+ seen, so the server requests the state at that prev_event. We expect the server
+ to make a /state request.
+
+ We simulate a remote server whose /state includes a rejected kick event for a
+ local user. Notably, the kick event is rejected only because it cites a rejected
+ auth event and would otherwise be accepted based on the room state. During state
+ resolution, we re-run auth and can potentially introduce such rejected events
+ into the state if we are not careful.
+
+ We check that the pulled event is correctly persisted, and that the state
+ afterwards does not include the rejected kick.
+ """
+ # The DAG we are testing looks like:
+ #
+ # ...
+ # |
+ # v
+ # remote admin user joins
+ # | |
+ # +-------+ +-------+
+ # | |
+ # | rejected power levels
+ # | from remote server
+ # | |
+ # | v
+ # | rejected kick of local user
+ # v from remote server
+ # new power levels |
+ # | v
+ # | missing event
+ # | from remote server
+ # | |
+ # +-------+ +-------+
+ # | |
+ # v v
+ # pulled event
+ # from remote server
+ #
+ # (arrows are in the opposite direction to prev_events.)
+
+ OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+ main_store = self.hs.get_datastores().main
+
+ # Create the room.
+ kermit_user_id = self.register_user("kermit", "test")
+ kermit_tok = self.login("kermit", "test")
+ room_id = self.helper.create_room_as(
+ room_creator=kermit_user_id, tok=kermit_tok
+ )
+ room_version = self.get_success(main_store.get_room_version(room_id))
+
+ # Add another local user to the room. This user is going to be kicked in a
+ # rejected event.
+ bert_user_id = self.register_user("bert", "test")
+ bert_tok = self.login("bert", "test")
+ self.helper.join(room_id, user=bert_user_id, tok=bert_tok)
+
+ # Allow the remote user to kick bert.
+ # The remote user is going to send a rejected power levels event, and we need
+ # state resolution to order it before another power levels event kermit is
+ # going to send later on. Hence we give both users the same power level, so that
+ # ties are broken by `origin_server_ts`.
+ self.helper.send_state(
+ room_id,
+ "m.room.power_levels",
+ {"users": {kermit_user_id: 100, OTHER_USER: 100}},
+ tok=kermit_tok,
+ )
+
+ # Add the remote user to the room.
+ other_member_event = self.get_success(
+ event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
+ )
+
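+ # Fetch the state events we will need to reference when crafting the
+ # remote events and when checking the final state.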
+ initial_state_map = self.get_success(
+ main_store.get_partial_current_state_ids(room_id)
+ )
+ create_event = self.get_success(
+ main_store.get_event(initial_state_map[("m.room.create", "")])
+ )
+ bert_member_event = self.get_success(
+ main_store.get_event(initial_state_map[("m.room.member", bert_user_id)])
+ )
+ power_levels_event = self.get_success(
+ main_store.get_event(initial_state_map[("m.room.power_levels", "")])
+ )
+
+ # We now need a rejected state event that will fail
+ # `check_state_independent_auth_rules` but pass
+ # `check_state_dependent_auth_rules`.
+
+ # First, we create a power levels event that we pretend the remote server has
+ # accepted, but the local homeserver will reject.
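+ # Track `depth` and `origin_server_ts` by hand so that each hand-crafted
+ # remote event is ordered after the previous one.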
+ next_depth = 100
+ next_timestamp = other_member_event.origin_server_ts + 100
+ rejected_power_levels_event = make_event_from_dict(
+ self.add_hashes_and_signatures_from_other_server(
+ {
+ "type": "m.room.power_levels",
+ "state_key": "",
+ "room_id": room_id,
+ "sender": OTHER_USER,
+ "prev_events": [other_member_event.event_id],
+ "auth_events": [
+ initial_state_map[("m.room.create", "")],
+ initial_state_map[("m.room.power_levels", "")],
+ # The event will be rejected because of the duplicated auth
+ # event.
+ other_member_event.event_id,
+ other_member_event.event_id,
+ ],
+ "origin_server_ts": next_timestamp,
+ "depth": next_depth,
+ "content": power_levels_event.content,
+ }
+ ),
+ room_version,
+ )
+ next_depth += 1
+ next_timestamp += 100
+
+ with LoggingContext("send_rejected_power_levels_event"):
+ self.get_success(
+ self.hs.get_federation_event_handler()._process_pulled_event(
+ self.OTHER_SERVER_NAME,
+ rejected_power_levels_event,
+ backfilled=False,
+ )
+ )
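+ # The event should have been rejected with reason "auth_error", which is
+ # what Synapse records when an event fails the auth checks.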
+ self.assertEqual(
+ self.get_success(
+ main_store.get_rejection_reason(
+ rejected_power_levels_event.event_id
+ )
+ ),
+ "auth_error",
+ )
+
+ # Then we create a kick event for a local user that cites the rejected power
+ # levels event in its auth events. The kick event will be rejected solely
+ # because of the rejected auth event and would otherwise be accepted.
+ rejected_kick_event = make_event_from_dict(
+ self.add_hashes_and_signatures_from_other_server(
+ {
+ "type": "m.room.member",
+ "state_key": bert_user_id,
+ "room_id": room_id,
+ "sender": OTHER_USER,
+ "prev_events": [rejected_power_levels_event.event_id],
+ "auth_events": [
+ initial_state_map[("m.room.create", "")],
+ rejected_power_levels_event.event_id,
+ initial_state_map[("m.room.member", bert_user_id)],
+ initial_state_map[("m.room.member", OTHER_USER)],
+ ],
+ "origin_server_ts": next_timestamp,
+ "depth": next_depth,
+ "content": {"membership": "leave"},
+ }
+ ),
+ room_version,
+ )
+ next_depth += 1
+ next_timestamp += 100
+
+ # The kick event must fail the state-independent auth rules, but pass the
+ # state-dependent auth rules, so that it has a chance of making it through state
+ # resolution.
+ self.get_failure(
+ check_state_independent_auth_rules(main_store, rejected_kick_event),
+ AuthError,
+ )
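+ # `check_state_dependent_auth_rules` raises if the check fails, so a bare
+ # call asserts that the kick passes the state-dependent rules.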
+ check_state_dependent_auth_rules(
+ rejected_kick_event,
+ [create_event, power_levels_event, other_member_event, bert_member_event],
+ )
+
+ # The kick event must also win over the original member event during state
+ # resolution.
+ self.assertEqual(
+ self.get_success(
+ _mainline_sort(
+ self.clock,
+ room_id,
+ event_ids=[
+ bert_member_event.event_id,
+ rejected_kick_event.event_id,
+ ],
+ resolved_power_event_id=power_levels_event.event_id,
+ event_map={
+ bert_member_event.event_id: bert_member_event,
+ rejected_kick_event.event_id: rejected_kick_event,
+ },
+ state_res_store=main_store,
+ )
+ ),
+ [bert_member_event.event_id, rejected_kick_event.event_id],
+ "The rejected kick event will not be applied after bert's join event "
+ "during state resolution. The test setup is incorrect.",
+ )
+
+ with LoggingContext("send_rejected_kick_event"):
+ self.get_success(
+ self.hs.get_federation_event_handler()._process_pulled_event(
+ self.OTHER_SERVER_NAME, rejected_kick_event, backfilled=False
+ )
+ )
+ self.assertEqual(
+ self.get_success(
+ main_store.get_rejection_reason(rejected_kick_event.event_id)
+ ),
+ "auth_error",
+ )
+
+ # We need another power levels event which will win over the rejected one during
+ # state resolution, otherwise we hit other issues where we end up with a
+ # rejected power levels event during state resolution.
+ self.reactor.advance(100) # ensure the `origin_server_ts` is larger
+ new_power_levels_event = self.get_success(
+ main_store.get_event(
+ self.helper.send_state(
+ room_id,
+ "m.room.power_levels",
+ {"users": {kermit_user_id: 100, OTHER_USER: 100, bert_user_id: 1}},
+ tok=kermit_tok,
+ )["event_id"]
+ )
+ )
+ self.assertEqual(
+ self.get_success(
+ _reverse_topological_power_sort(
+ self.clock,
+ room_id,
+ event_ids=[
+ new_power_levels_event.event_id,
+ rejected_power_levels_event.event_id,
+ ],
+ event_map={},
+ state_res_store=main_store,
+ full_conflicted_set=set(),
+ )
+ ),
+ [rejected_power_levels_event.event_id, new_power_levels_event.event_id],
+ "The power levels events will not have the desired ordering during state "
+ "resolution. The test setup is incorrect.",
+ )
+
+ # Create a missing event, so that the local homeserver has to do a `/state` or
+ # `/state_ids` request to pull state from the remote homeserver.
+ missing_event = make_event_from_dict(
+ self.add_hashes_and_signatures_from_other_server(
+ {
+ "type": "m.room.message",
+ "room_id": room_id,
+ "sender": OTHER_USER,
+ "prev_events": [rejected_kick_event.event_id],
+ "auth_events": [
+ initial_state_map[("m.room.create", "")],
+ initial_state_map[("m.room.power_levels", "")],
+ initial_state_map[("m.room.member", OTHER_USER)],
+ ],
+ "origin_server_ts": next_timestamp,
+ "depth": next_depth,
+ "content": {"msgtype": "m.text", "body": "foo"},
+ }
+ ),
+ room_version,
+ )
+ next_depth += 1
+ next_timestamp += 100
+
+ # The pulled event has two prev events, one of which is missing. We will make a
+ # `/state` or `/state_ids` request to the remote homeserver to ask it for the
+ # state before the missing prev event.
+ pulled_event = make_event_from_dict(
+ self.add_hashes_and_signatures_from_other_server(
+ {
+ "type": "m.room.message",
+ "room_id": room_id,
+ "sender": OTHER_USER,
+ "prev_events": [
+ new_power_levels_event.event_id,
+ missing_event.event_id,
+ ],
+ "auth_events": [
+ initial_state_map[("m.room.create", "")],
+ new_power_levels_event.event_id,
+ initial_state_map[("m.room.member", OTHER_USER)],
+ ],
+ "origin_server_ts": next_timestamp,
+ "depth": next_depth,
+ "content": {"msgtype": "m.text", "body": "bar"},
+ }
+ ),
+ room_version,
+ )
+ next_depth += 1
+ next_timestamp += 100
+
+ # Prepare the response for the `/state` or `/state_ids` request.
+ # The remote server believes bert has been kicked, while the local server does
+ # not.
+ state_before_missing_event = self.get_success(
+ main_store.get_events_as_list(initial_state_map.values())
+ )
+ state_before_missing_event = [
+ event
+ for event in state_before_missing_event
+ if event.event_id != bert_member_event.event_id
+ ]
+ state_before_missing_event.append(rejected_kick_event)
+
+ # We have to bump the clock a bit to keep the retry logic in
+ # `FederationClient.get_pdu` happy.
+ self.reactor.advance(60000)
+ with LoggingContext("send_pulled_event"):
+
+ async def get_event(
+ destination: str, event_id: str, timeout: Optional[int] = None
+ ) -> JsonDict:
+ self.assertEqual(destination, self.OTHER_SERVER_NAME)
+ self.assertEqual(event_id, missing_event.event_id)
+ return {"pdus": [missing_event.get_pdu_json()]}
+
+ async def get_room_state_ids(
+ destination: str, room_id: str, event_id: str
+ ) -> JsonDict:
+ self.assertEqual(destination, self.OTHER_SERVER_NAME)
+ self.assertEqual(event_id, missing_event.event_id)
+ return {
+ "pdu_ids": [event.event_id for event in state_before_missing_event],
+ "auth_chain_ids": [],
+ }
+
+ async def get_room_state(
+ room_version: RoomVersion, destination: str, room_id: str, event_id: str
+ ) -> StateRequestResponse:
+ self.assertEqual(destination, self.OTHER_SERVER_NAME)
+ self.assertEqual(event_id, missing_event.event_id)
+ return StateRequestResponse(
+ state=state_before_missing_event,
+ auth_events=[],
+ )
+
+ self.mock_federation_transport_client.get_event.side_effect = get_event
+ self.mock_federation_transport_client.get_room_state_ids.side_effect = (
+ get_room_state_ids
+ )
+ self.mock_federation_transport_client.get_room_state.side_effect = (
+ get_room_state
+ )
+
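+ # The function under test: process the pulled event, which requires
+ # fetching the missing prev event and the remote's claimed state.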
+ self.get_success(
+ self.hs.get_federation_event_handler()._process_pulled_event(
+ self.OTHER_SERVER_NAME, pulled_event, backfilled=False
+ )
+ )
+ self.assertIsNone(
+ self.get_success(
+ main_store.get_rejection_reason(pulled_event.event_id)
+ ),
+ "Pulled event was unexpectedly rejected, likely due to a problem with "
+ "the test setup.",
+ )
+ self.assertEqual(
+ {pulled_event.event_id},
+ self.get_success(
+ main_store.have_events_in_timeline([pulled_event.event_id])
+ ),
+ "Pulled event was not persisted, likely due to a problem with the test "
+ "setup.",
+ )
+
+ # We must not accept rejected events into the room state, so we expect bert
+ # to not be kicked, even if the remote server believes he was.
+ new_state_map = self.get_success(
+ main_store.get_partial_current_state_ids(room_id)
+ )
+ self.assertEqual(
+ new_state_map[("m.room.member", bert_user_id)],
+ bert_member_event.event_id,
+ "Rejected kick event unexpectedly became part of room state.",
+ )
|