diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d4dd967c60..4096093527 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -549,7 +549,9 @@ class FederationServer(FederationBase):
affected=pdu.event_id,
)
- yield self.handler.on_receive_pdu(origin, pdu, get_missing=True)
+ yield self.handler.on_receive_pdu(
+ origin, pdu, get_missing=True, sent_to_us_directly=True,
+ )
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b6f8d4cf82..250a5509d8 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -44,6 +44,7 @@ from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
)
+from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.events.utils import prune_event
@@ -89,7 +90,9 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def on_receive_pdu(self, origin, pdu, get_missing=True):
+ def on_receive_pdu(
+ self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+ ):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -163,7 +166,7 @@ class FederationHandler(BaseHandler):
"Ignoring PDU %s for room %s from %s as we've left the room!",
pdu.event_id, pdu.room_id, origin,
)
- return
+ defer.returnValue(None)
state = None
@@ -225,26 +228,54 @@ class FederationHandler(BaseHandler):
list(prevs - seen)[:5],
)
- if prevs - seen:
- logger.info(
- "Still missing %d events for room %r: %r...",
- len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ if sent_to_us_directly and prevs - seen:
+ # If they have sent it to us directly, and the server
+ # isn't telling us about the auth events that it's
+ # made a message referencing, we explode
+ raise FederationError(
+ "ERROR",
+ 403,
+ ("Your server isn't divulging details about prev_events "
+ "referenced in this event."),
+ affected=pdu.event_id,
)
- fetch_state = True
+ elif prevs - seen:
+ # If we're walking back up the chain to fetch it, then
+ # try and find the states. If we can't get the states,
+ # discard it.
+ state_groups = []
+ auth_chains = set()
+ try:
+ # Get the ones we know about
+ ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+ state_groups.append(ours)
- if fetch_state:
- # We need to get the state at this event, since we haven't
- # processed all the prev events.
- logger.debug(
- "_handle_new_pdu getting state for %s",
- pdu.room_id
- )
- try:
- state, auth_chain = yield self.replication_layer.get_state_for_room(
- origin, pdu.room_id, pdu.event_id,
- )
- except Exception:
- logger.exception("Failed to get state for event: %s", pdu.event_id)
+ for p in prevs - seen:
+ state, auth_chain = yield self.replication_layer.get_state_for_room(
+ origin, pdu.room_id, p
+ )
+ auth_chains.update(auth_chain)
+ state_group = {
+ (x.type, x.state_key): x.event_id for x in state
+ }
+ state_groups.append(state_group)
+
+ def fetch(ev_ids):
+ return self.store.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False,
+ )
+
+ state = yield resolve_events_with_factory(state_groups, {pdu.event_id: pdu}, fetch)
+
+ state = yield self.store.get_events(state.values())
+ state = state.values()
+ except Exception:
+ raise FederationError(
+ "ERROR",
+ 403,
+ "We can't get valid state history.",
+ affected=pdu.event_id,
+ )
yield self._process_received_pdu(
origin,
@@ -322,11 +353,17 @@ class FederationHandler(BaseHandler):
for e in missing_events:
logger.info("Handling found event %s", e.event_id)
- yield self.on_receive_pdu(
- origin,
- e,
- get_missing=False
- )
+ try:
+ yield self.on_receive_pdu(
+ origin,
+ e,
+ get_missing=False
+ )
+ except FederationError as e:
+ if e.code == 403:
+ logger.warn("Event %s failed history check.")
+ else:
+ raise
@log_function
@defer.inlineCallbacks
diff --git a/tests/test_federation.py b/tests/test_federation.py
new file mode 100644
index 0000000000..12f4633cd5
--- /dev/null
+++ b/tests/test_federation.py
@@ -0,0 +1,235 @@
+
+from twisted.internet.defer import Deferred, succeed, maybeDeferred
+
+from synapse.util import Clock
+from synapse.events import FrozenEvent
+from synapse.types import Requester, UserID
+from synapse.replication.slave.storage.events import SlavedEventStore
+
+from tests import unittest
+from tests.server import make_request, setup_test_homeserver, ThreadedMemoryReactorClock
+
+from mock import Mock
+
+from synapse.api.errors import CodeMessageException, HttpResponseException
+
+
+class MessageAcceptTests(unittest.TestCase):
+ def setUp(self):
+
+ self.http_client = Mock()
+ self.reactor = ThreadedMemoryReactorClock()
+ self.hs_clock = Clock(self.reactor)
+ self.homeserver = setup_test_homeserver(
+ http_client=self.http_client, clock=self.hs_clock, reactor=self.reactor
+ )
+
+ user_id = UserID("us", "test")
+ our_user = Requester(user_id, None, False, None, None)
+ room_creator = self.homeserver.get_room_creation_handler()
+ room = room_creator.create_room(
+ our_user, room_creator.PRESETS_DICT["public_chat"], ratelimit=False
+ )
+ self.reactor.advance(0.1)
+ self.room_id = self.successResultOf(room)["room_id"]
+
+ # Figure out what the most recent event is
+ most_recent = self.successResultOf(
+ self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id)
+ )[0]
+
+ join_event = FrozenEvent(
+ {
+ "room_id": self.room_id,
+ "sender": "@baduser:test.serv",
+ "state_key": "@baduser:test.serv",
+ "event_id": "$join:test.serv",
+ "depth": 1000,
+ "origin_server_ts": 1,
+ "type": "m.room.member",
+ "origin": "test.servx",
+ "content": {"membership": "join"},
+ "auth_events": [],
+ "prev_state": [(most_recent, {})],
+ "prev_events": [(most_recent, {})],
+ }
+ )
+
+ self.handler = self.homeserver.get_handlers().federation_handler
+ self.handler.do_auth = lambda *a, **b: succeed(True)
+ self.client = self.homeserver.get_federation_client()
+ self.client._check_sigs_and_hash_and_fetch = lambda dest, pdus, **k: succeed(
+ pdus
+ )
+
+ # Send the join, it should return None (which is not an error)
+ d = self.handler.on_receive_pdu(
+ "test.serv", join_event, sent_to_us_directly=True
+ )
+ self.reactor.advance(1)
+ self.assertEqual(self.successResultOf(d), None)
+
+ # Make sure we actually joined the room
+ self.assertEqual(
+ self.successResultOf(
+ self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id)
+ )[0],
+ "$join:test.serv",
+ )
+
+ def test_cant_hide_direct_ancestors(self):
+ """
+ If you send a message, you must be able to provide the direct
+ prev_events that said event references.
+ """
+
+ def post_json(destination, path, data, headers=None, timeout=0):
+ # If it asks us for new missing events, give them NOTHING
+ if path.startswith("/_matrix/federation/v1/get_missing_events/"):
+ return {"events": []}
+
+ self.http_client.post_json = post_json
+
+ # Figure out what the most recent event is
+ most_recent = self.successResultOf(
+ self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id)
+ )[0]
+
+ # Now lie about an event
+ lying_event = FrozenEvent(
+ {
+ "room_id": self.room_id,
+ "sender": "@baduser:test.serv",
+ "event_id": "one:test.serv",
+ "depth": 1000,
+ "origin_server_ts": 1,
+ "type": "m.room.message",
+ "origin": "test.serv",
+ "content": "hewwo?",
+ "auth_events": [],
+ "prev_events": [("two:test.serv", {}), (most_recent, {})],
+ }
+ )
+
+ d = self.handler.on_receive_pdu(
+ "test.serv", lying_event, sent_to_us_directly=True
+ )
+
+ # Step the reactor, so the database fetches come back
+ self.reactor.advance(1)
+
+ # on_receive_pdu should throw an error
+ failure = self.failureResultOf(d)
+ self.assertEqual(
+ failure.value.args[0],
+ (
+ "ERROR 403: Your server isn't divulging details about prev_events "
+ "referenced in this event."
+ ),
+ )
+
+ # Make sure the invalid event isn't there
+ extrem = self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id)
+ self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv")
+
+ @unittest.DEBUG
+ def test_cant_hide_past_history(self):
+ """
+ If you send a message, you must be able to provide the direct
+ prev_events that said event references.
+ """
+
+ def post_json(destination, path, data, headers=None, timeout=0):
+ if path.startswith("/_matrix/federation/v1/get_missing_events/"):
+ return {
+ "events": [
+ {
+ "room_id": self.room_id,
+ "sender": "@baduser:test.serv",
+ "event_id": "three:test.serv",
+ "depth": 1000,
+ "origin_server_ts": 1,
+ "type": "m.room.message",
+ "origin": "test.serv",
+ "content": "hewwo?",
+ "auth_events": [],
+ "prev_events": [("four:test.serv", {})],
+ }
+ ]
+ }
+
+ self.http_client.post_json = post_json
+
+ def get_json(destination, path, args, headers=None):
+ print(destination, path, args)
+ if path.startswith("/_matrix/federation/v1/state_ids/"):
+ d = self.successResultOf(
+ self.homeserver.datastore.get_state_ids_for_event("one:test.serv")
+ )
+
+ return succeed(
+ {
+ "pdu_ids": [
+ y
+ for x, y in d.items()
+ if x == ("m.room.member", "@us:test")
+ ],
+ "auth_chain_ids": d.values(),
+ }
+ )
+
+ self.http_client.get_json = get_json
+
+ # Figure out what the most recent event is
+ most_recent = self.successResultOf(
+ self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id)
+ )[0]
+
+ # Make a good event
+ good_event = FrozenEvent(
+ {
+ "room_id": self.room_id,
+ "sender": "@baduser:test.serv",
+ "event_id": "one:test.serv",
+ "depth": 1000,
+ "origin_server_ts": 1,
+ "type": "m.room.message",
+ "origin": "test.serv",
+ "content": "hewwo?",
+ "auth_events": [],
+ "prev_events": [(most_recent, {})],
+ }
+ )
+
+ d = self.handler.on_receive_pdu(
+ "test.serv", good_event, sent_to_us_directly=True
+ )
+ self.reactor.advance(1)
+ self.assertEqual(self.successResultOf(d), None)
+
+ bad_event = FrozenEvent(
+ {
+ "room_id": self.room_id,
+ "sender": "@baduser:test.serv",
+ "event_id": "two:test.serv",
+ "depth": 1000,
+ "origin_server_ts": 1,
+ "type": "m.room.message",
+ "origin": "test.serv",
+ "content": "hewwo?",
+ "auth_events": [],
+ "prev_events": [("one:test.serv", {}), ("three:test.serv", {})],
+ }
+ )
+
+ d = self.handler.on_receive_pdu(
+ "test.serv", bad_event, sent_to_us_directly=True
+ )
+ self.reactor.advance(1)
+
+ extrem = self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id)
+ self.assertEqual(self.successResultOf(extrem)[0], "two:test.serv")
+
+ state = self.homeserver.get_state_handler().get_current_state_ids(self.room_id)
+ self.reactor.advance(1)
+ self.assertIn(("m.room.member", "@us:test"), self.successResultOf(state).keys())
diff --git a/tests/unittest.py b/tests/unittest.py
index 184fe880f3..de24b1d2d4 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -35,7 +35,7 @@ class ToTwistedHandler(logging.Handler):
def emit(self, record):
log_entry = self.format(record)
log_level = record.levelname.lower().replace('warning', 'warn')
- self.tx_log.emit(twisted.logger.LogLevel.levelWithName(log_level), log_entry)
+ self.tx_log.emit(twisted.logger.LogLevel.levelWithName(log_level), log_entry.replace("{", r"(").replace("}", r")"))
handler = ToTwistedHandler()
|