# -*- coding: utf-8 -*- # Copyright 2015 - 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import sys import json import os import pprint testcases = json.load(sys.stdin) w = sys.stdout.write w(""" # -*- coding: utf-8 -*- # Generated by """ + os.path.basename(__file__) + """ # Copyright 2015 - 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from synapse.api.auth import Auth from synapse.api.errors import SynapseError from tests import unittest from tests.utils import setup_test_homeserver from twisted.internet import defer from synapse.events import FrozenEvent class EventAuthTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): self.hs = yield setup_test_homeserver(handlers=None) self.auth = Auth(self.hs) """) indent = " " * 4 for name, case in sorted(testcases.items()): w(""" def test_""" + name[4:] + """(self): """) auth_data = case.get("auth_events", {}) auth_events = {} if "create" in auth_data: auth_events[("m.room.create", "")] = auth_data["create"] for user_id, member in auth_data.get("member", {}).items(): auth_events[("m.room.member", user_id)] = member w(indent * 2 + "auth_events = (\n" + indent * 3) data = pprint.pformat(auth_events, width=80 - len(indent * 3)) w(data.replace("\n", "\n" + indent * 3)) w("\n" + indent * 2 + ")\n") w(""" auth_events = {k: FrozenEvent(v) for k, v in auth_events.items()} """[1:]) w("\n" + indent * 2 + "# Allowed events\n") for allowed in case.get("allowed", ()): reason = allowed.pop("unsigned", {}).pop("allowed", None) if reason is not None: w(indent * 2 + "#Allowed: " + reason) w(indent * 2 + "self.auth.check(FrozenEvent(\n" + indent * 3) data = pprint.pformat(allowed, width=80 - len(indent * 3)) w(data.replace("\n", "\n" + indent * 3)) w("\n" + indent * 2 + "), auth_events=auth_events, do_sig_check=False)\n") w("\n" + indent * 2 + "# Disallowed events\n") for not_allowed in case.get("not_allowed", ()): reason = not_allowed.pop("unsigned", {}).pop("not_allowed", None) if reason is not None: w(indent * 2 + "# " + reason + "\n") w(indent * 2 + "self.assertRaises(SynapseError,") w(" self.auth.check, FrozenEvent(\n" + indent * 3) data = pprint.pformat(not_allowed, width=80 - len(indent * 3)) w(data.replace("\n", "\n" + indent * 3)) w("\n" + indent * 2 + "), auth_events=auth_events, do_sig_check=False)\n")