From 64ec45fc1b0856dc7daacca7d3ab75d50bd89f84 Mon Sep 17 00:00:00 2001
From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
Date: Tue, 1 Feb 2022 14:13:38 +0000
Subject: Send to-device messages to application services (#11215)

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
---
 tests/appservice/test_scheduler.py | 109 +++++++-------
 tests/handlers/test_appservice.py  | 281 +++++++++++++++++++++++++++++++++++--
 tests/storage/test_appservice.py   |  26 ++--
 3 files changed, 348 insertions(+), 68 deletions(-)

(limited to 'tests')

diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index 55f0899bae..8fb6687f89 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -11,23 +11,29 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import TYPE_CHECKING
 from unittest.mock import Mock
 
 from twisted.internet import defer
 
 from synapse.appservice import ApplicationServiceState
 from synapse.appservice.scheduler import (
+    ApplicationServiceScheduler,
     _Recoverer,
-    _ServiceQueuer,
     _TransactionController,
 )
 from synapse.logging.context import make_deferred_yieldable
+from synapse.server import HomeServer
+from synapse.util import Clock
 
 from tests import unittest
 from tests.test_utils import simple_async_mock
 
 from ..utils import MockClock
 
+if TYPE_CHECKING:
+    from twisted.internet.testing import MemoryReactor
+
 
 class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
     def setUp(self):
@@ -58,7 +64,10 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
         self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
 
         self.store.create_appservice_txn.assert_called_once_with(
-            service=service, events=events, ephemeral=[]  # txn made and saved
+            service=service,
+            events=events,
+            ephemeral=[],
+            to_device_messages=[],  # txn made and saved
         )
         self.assertEquals(0, len(self.txnctrl.recoverers))  # no recoverer made
         txn.complete.assert_called_once_with(self.store)  # txn completed
@@ -79,7 +88,10 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
         self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
 
         self.store.create_appservice_txn.assert_called_once_with(
-            service=service, events=events, ephemeral=[]  # txn made and saved
+            service=service,
+            events=events,
+            ephemeral=[],
+            to_device_messages=[],  # txn made and saved
         )
         self.assertEquals(0, txn.send.call_count)  # txn not sent though
         self.assertEquals(0, txn.complete.call_count)  # or completed
@@ -102,7 +114,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
         self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
 
         self.store.create_appservice_txn.assert_called_once_with(
-            service=service, events=events, ephemeral=[]
+            service=service, events=events, ephemeral=[], to_device_messages=[]
         )
         self.assertEquals(1, self.recoverer_fn.call_count)  # recoverer made
         self.assertEquals(1, self.recoverer.recover.call_count)  # and invoked
@@ -189,38 +201,41 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
         self.callback.assert_called_once_with(self.recoverer)
 
 
-class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
-    def setUp(self):
+class ApplicationServiceSchedulerQueuerTestCase(unittest.HomeserverTestCase):
+    def prepare(self, reactor: "MemoryReactor", clock: Clock, hs: HomeServer):
+        self.scheduler = ApplicationServiceScheduler(hs)
         self.txn_ctrl = Mock()
         self.txn_ctrl.send = simple_async_mock()
-        self.queuer = _ServiceQueuer(self.txn_ctrl, MockClock())
+
+        # Replace instantiated _TransactionController instances with our Mock
+        self.scheduler.txn_ctrl = self.txn_ctrl
+        self.scheduler.queuer.txn_ctrl = self.txn_ctrl
 
     def test_send_single_event_no_queue(self):
         # Expect the event to be sent immediately.
         service = Mock(id=4)
         event = Mock()
-        self.queuer.enqueue_event(service, event)
-        self.txn_ctrl.send.assert_called_once_with(service, [event], [])
+        self.scheduler.enqueue_for_appservice(service, events=[event])
+        self.txn_ctrl.send.assert_called_once_with(service, [event], [], [])
 
     def test_send_single_event_with_queue(self):
         d = defer.Deferred()
-        self.txn_ctrl.send = Mock(
-            side_effect=lambda x, y, z: make_deferred_yieldable(d)
-        )
+        self.txn_ctrl.send = Mock(return_value=make_deferred_yieldable(d))
         service = Mock(id=4)
         event = Mock(event_id="first")
         event2 = Mock(event_id="second")
         event3 = Mock(event_id="third")
         # Send an event and don't resolve it just yet.
-        self.queuer.enqueue_event(service, event)
+        self.scheduler.enqueue_for_appservice(service, events=[event])
         # Send more events: expect send() to NOT be called multiple times.
-        self.queuer.enqueue_event(service, event2)
-        self.queuer.enqueue_event(service, event3)
-        self.txn_ctrl.send.assert_called_with(service, [event], [])
+        # (call enqueue_for_appservice multiple times deliberately)
+        self.scheduler.enqueue_for_appservice(service, events=[event2])
+        self.scheduler.enqueue_for_appservice(service, events=[event3])
+        self.txn_ctrl.send.assert_called_with(service, [event], [], [])
         self.assertEquals(1, self.txn_ctrl.send.call_count)
         # Resolve the send event: expect the queued events to be sent
         d.callback(service)
-        self.txn_ctrl.send.assert_called_with(service, [event2, event3], [])
+        self.txn_ctrl.send.assert_called_with(service, [event2, event3], [], [])
         self.assertEquals(2, self.txn_ctrl.send.call_count)
 
     def test_multiple_service_queues(self):
@@ -238,23 +253,23 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
         send_return_list = [srv_1_defer, srv_2_defer]
 
-        def do_send(x, y, z):
+        def do_send(*args, **kwargs):
             return make_deferred_yieldable(send_return_list.pop(0))
 
         self.txn_ctrl.send = Mock(side_effect=do_send)
 
         # send events for different ASes and make sure they are sent
-        self.queuer.enqueue_event(srv1, srv_1_event)
-        self.queuer.enqueue_event(srv1, srv_1_event2)
-        self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event], [])
-        self.queuer.enqueue_event(srv2, srv_2_event)
-        self.queuer.enqueue_event(srv2, srv_2_event2)
-        self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event], [])
+        self.scheduler.enqueue_for_appservice(srv1, events=[srv_1_event])
+        self.scheduler.enqueue_for_appservice(srv1, events=[srv_1_event2])
+        self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event], [], [])
+        self.scheduler.enqueue_for_appservice(srv2, events=[srv_2_event])
+        self.scheduler.enqueue_for_appservice(srv2, events=[srv_2_event2])
+        self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event], [], [])
 
         # make sure callbacks for a service only send queued events for THAT
         # service
         srv_2_defer.callback(srv2)
-        self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], [])
+        self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], [], [])
         self.assertEquals(3, self.txn_ctrl.send.call_count)
 
     def test_send_large_txns(self):
@@ -262,7 +277,7 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
         srv_2_defer = defer.Deferred()
         send_return_list = [srv_1_defer, srv_2_defer]
 
-        def do_send(x, y, z):
+        def do_send(*args, **kwargs):
             return make_deferred_yieldable(send_return_list.pop(0))
 
         self.txn_ctrl.send = Mock(side_effect=do_send)
@@ -270,67 +285,65 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
         service = Mock(id=4, name="service")
         event_list = [Mock(name="event%i" % (i + 1)) for i in range(200)]
         for event in event_list:
-            self.queuer.enqueue_event(service, event)
+            self.scheduler.enqueue_for_appservice(service, [event], [])
 
         # Expect the first event to be sent immediately.
-        self.txn_ctrl.send.assert_called_with(service, [event_list[0]], [])
+        self.txn_ctrl.send.assert_called_with(service, [event_list[0]], [], [])
         srv_1_defer.callback(service)
         # Then send the next 100 events
-        self.txn_ctrl.send.assert_called_with(service, event_list[1:101], [])
+        self.txn_ctrl.send.assert_called_with(service, event_list[1:101], [], [])
         srv_2_defer.callback(service)
         # Then the final 99 events
-        self.txn_ctrl.send.assert_called_with(service, event_list[101:], [])
+        self.txn_ctrl.send.assert_called_with(service, event_list[101:], [], [])
         self.assertEquals(3, self.txn_ctrl.send.call_count)
 
     def test_send_single_ephemeral_no_queue(self):
         # Expect the event to be sent immediately.
         service = Mock(id=4, name="service")
         event_list = [Mock(name="event")]
-        self.queuer.enqueue_ephemeral(service, event_list)
-        self.txn_ctrl.send.assert_called_once_with(service, [], event_list)
+        self.scheduler.enqueue_for_appservice(service, ephemeral=event_list)
+        self.txn_ctrl.send.assert_called_once_with(service, [], event_list, [])
 
     def test_send_multiple_ephemeral_no_queue(self):
         # Expect the event to be sent immediately.
         service = Mock(id=4, name="service")
         event_list = [Mock(name="event1"), Mock(name="event2"), Mock(name="event3")]
-        self.queuer.enqueue_ephemeral(service, event_list)
-        self.txn_ctrl.send.assert_called_once_with(service, [], event_list)
+        self.scheduler.enqueue_for_appservice(service, ephemeral=event_list)
+        self.txn_ctrl.send.assert_called_once_with(service, [], event_list, [])
 
     def test_send_single_ephemeral_with_queue(self):
         d = defer.Deferred()
-        self.txn_ctrl.send = Mock(
-            side_effect=lambda x, y, z: make_deferred_yieldable(d)
-        )
+        self.txn_ctrl.send = Mock(return_value=make_deferred_yieldable(d))
         service = Mock(id=4)
         event_list_1 = [Mock(event_id="event1"), Mock(event_id="event2")]
         event_list_2 = [Mock(event_id="event3"), Mock(event_id="event4")]
         event_list_3 = [Mock(event_id="event5"), Mock(event_id="event6")]
 
         # Send an event and don't resolve it just yet.
-        self.queuer.enqueue_ephemeral(service, event_list_1)
+        self.scheduler.enqueue_for_appservice(service, ephemeral=event_list_1)
         # Send more events: expect send() to NOT be called multiple times.
-        self.queuer.enqueue_ephemeral(service, event_list_2)
-        self.queuer.enqueue_ephemeral(service, event_list_3)
-        self.txn_ctrl.send.assert_called_with(service, [], event_list_1)
+        self.scheduler.enqueue_for_appservice(service, ephemeral=event_list_2)
+        self.scheduler.enqueue_for_appservice(service, ephemeral=event_list_3)
+        self.txn_ctrl.send.assert_called_with(service, [], event_list_1, [])
         self.assertEquals(1, self.txn_ctrl.send.call_count)
         # Resolve txn_ctrl.send
         d.callback(service)
         # Expect the queued events to be sent
-        self.txn_ctrl.send.assert_called_with(service, [], event_list_2 + event_list_3)
+        self.txn_ctrl.send.assert_called_with(
+            service, [], event_list_2 + event_list_3, []
+        )
         self.assertEquals(2, self.txn_ctrl.send.call_count)
 
     def test_send_large_txns_ephemeral(self):
         d = defer.Deferred()
-        self.txn_ctrl.send = Mock(
-            side_effect=lambda x, y, z: make_deferred_yieldable(d)
-        )
+        self.txn_ctrl.send = Mock(return_value=make_deferred_yieldable(d))
         # Expect the event to be sent immediately.
         service = Mock(id=4, name="service")
         first_chunk = [Mock(name="event%i" % (i + 1)) for i in range(100)]
         second_chunk = [Mock(name="event%i" % (i + 101)) for i in range(50)]
         event_list = first_chunk + second_chunk
-        self.queuer.enqueue_ephemeral(service, event_list)
-        self.txn_ctrl.send.assert_called_once_with(service, [], first_chunk)
+        self.scheduler.enqueue_for_appservice(service, ephemeral=event_list)
+        self.txn_ctrl.send.assert_called_once_with(service, [], first_chunk, [])
         d.callback(service)
-        self.txn_ctrl.send.assert_called_with(service, [], second_chunk)
+        self.txn_ctrl.send.assert_called_with(service, [], second_chunk, [])
         self.assertEquals(2, self.txn_ctrl.send.call_count)
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index d6f14e2dba..fe57ff2671 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -1,4 +1,4 @@
-# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2015-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,18 +12,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import Dict, Iterable, List, Optional
 from unittest.mock import Mock
 
 from twisted.internet import defer
 
+import synapse.rest.admin
+import synapse.storage
+from synapse.appservice import ApplicationService
 from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.rest.client import login, receipts, room, sendtodevice
 from synapse.types import RoomStreamToken
+from synapse.util.stringutils import random_string
 
-from tests.test_utils import make_awaitable
+from tests import unittest
+from tests.test_utils import make_awaitable, simple_async_mock
 from tests.utils import MockClock
 
-from .. import unittest
-
 
 class AppServiceHandlerTestCase(unittest.TestCase):
     """Tests the ApplicationServicesHandler."""
@@ -36,6 +41,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         hs.get_datastore.return_value = self.mock_store
         self.mock_store.get_received_ts.return_value = make_awaitable(0)
         self.mock_store.set_appservice_last_pos.return_value = make_awaitable(None)
+        self.mock_store.set_appservice_stream_type_pos.return_value = make_awaitable(
+            None
+        )
         hs.get_application_service_api.return_value = self.mock_as_api
         hs.get_application_service_scheduler.return_value = self.mock_scheduler
         hs.get_clock.return_value = MockClock()
@@ -63,8 +71,8 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         ]
 
         self.handler.notify_interested_services(RoomStreamToken(None, 1))
 
-        self.mock_scheduler.submit_event_for_as.assert_called_once_with(
-            interested_service, event
+        self.mock_scheduler.enqueue_for_appservice.assert_called_once_with(
+            interested_service, events=[event]
         )
 
     def test_query_user_exists_unknown_user(self):
@@ -261,7 +269,6 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         """
         interested_service = self._mkservice(is_interested=True)
         services = [interested_service]
-
         self.mock_store.get_app_services.return_value = services
         self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
             579
@@ -275,10 +282,10 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         self.handler.notify_interested_services_ephemeral(
             "receipt_key", 580, ["@fakerecipient:example.com"]
         )
-        self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
-            interested_service, [event]
+        self.mock_scheduler.enqueue_for_appservice.assert_called_once_with(
+            interested_service, ephemeral=[event]
        )
-        self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with(
+        self.mock_store.set_appservice_stream_type_pos.assert_called_once_with(
             interested_service,
             "read_receipt",
             580,
@@ -305,7 +312,10 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         self.handler.notify_interested_services_ephemeral(
             "receipt_key", 580, ["@fakerecipient:example.com"]
         )
-        self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called()
+        # This method will be called, but with an empty list of events
+        self.mock_scheduler.enqueue_for_appservice.assert_called_once_with(
+            interested_service, ephemeral=[]
+        )
 
     def _mkservice(self, is_interested, protocols=None):
         service = Mock()
@@ -321,3 +331,252 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         service.token = "mock_service_token"
         service.url = "mock_service_url"
         return service
+
+
+class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
+    """
+    Tests that the ApplicationServicesHandler sends events to application
+    services correctly.
+    """
+
+    servlets = [
+        synapse.rest.admin.register_servlets_for_client_rest_resource,
+        login.register_servlets,
+        room.register_servlets,
+        sendtodevice.register_servlets,
+        receipts.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs):
+        # Mock the ApplicationServiceScheduler's _TransactionController's send method so that
+        # we can track any outgoing ephemeral events
+        self.send_mock = simple_async_mock()
+        hs.get_application_service_handler().scheduler.txn_ctrl.send = self.send_mock
+
+        # Mock out application services, and allow defining our own in tests
+        self._services: List[ApplicationService] = []
+        self.hs.get_datastore().get_app_services = Mock(return_value=self._services)
+
+        # A user on the homeserver.
+        self.local_user_device_id = "local_device"
+        self.local_user = self.register_user("local_user", "password")
+        self.local_user_token = self.login(
+            "local_user", "password", self.local_user_device_id
+        )
+
+        # A user on the homeserver which lies within an appservice's exclusive user namespace.
+        self.exclusive_as_user_device_id = "exclusive_as_device"
+        self.exclusive_as_user = self.register_user("exclusive_as_user", "password")
+        self.exclusive_as_user_token = self.login(
+            "exclusive_as_user", "password", self.exclusive_as_user_device_id
+        )
+
+    @unittest.override_config(
+        {"experimental_features": {"msc2409_to_device_messages_enabled": True}}
+    )
+    def test_application_services_receive_local_to_device(self):
+        """
+        Test that when a user sends a to-device message to another user
+        that is in an application service's user namespace, the
+        application service will receive it.
+        """
+        interested_appservice = self._register_application_service(
+            namespaces={
+                ApplicationService.NS_USERS: [
+                    {
+                        "regex": "@exclusive_as_user:.+",
+                        "exclusive": True,
+                    }
+                ],
+            },
+        )
+
+        # Have local_user send a to-device message to exclusive_as_user
+        message_content = {"some_key": "some really interesting value"}
+        chan = self.make_request(
+            "PUT",
+            "/_matrix/client/r0/sendToDevice/m.room_key_request/3",
+            content={
+                "messages": {
+                    self.exclusive_as_user: {
+                        self.exclusive_as_user_device_id: message_content
+                    }
+                }
+            },
+            access_token=self.local_user_token,
+        )
+        self.assertEqual(chan.code, 200, chan.result)
+
+        # Have exclusive_as_user send a to-device message to local_user
+        chan = self.make_request(
+            "PUT",
+            "/_matrix/client/r0/sendToDevice/m.room_key_request/4",
+            content={
+                "messages": {
+                    self.local_user: {self.local_user_device_id: message_content}
+                }
+            },
+            access_token=self.exclusive_as_user_token,
+        )
+        self.assertEqual(chan.code, 200, chan.result)
+
+        # Check if our application service - that is interested in exclusive_as_user - received
+        # the to-device message as part of an AS transaction.
+        # Only the local_user -> exclusive_as_user to-device message should have been forwarded to the AS.
+        #
+        # The uninterested application service should not have been notified at all.
+        self.send_mock.assert_called_once()
+        service, _events, _ephemeral, to_device_messages = self.send_mock.call_args[0]
+
+        # Assert that this was the same to-device message that local_user sent
+        self.assertEqual(service, interested_appservice)
+        self.assertEqual(to_device_messages[0]["type"], "m.room_key_request")
+        self.assertEqual(to_device_messages[0]["sender"], self.local_user)
+
+        # Additional fields 'to_user_id' and 'to_device_id' specifically for
+        # to-device messages via the AS API
+        self.assertEqual(to_device_messages[0]["to_user_id"], self.exclusive_as_user)
+        self.assertEqual(
+            to_device_messages[0]["to_device_id"], self.exclusive_as_user_device_id
+        )
+        self.assertEqual(to_device_messages[0]["content"], message_content)
+
+    @unittest.override_config(
+        {"experimental_features": {"msc2409_to_device_messages_enabled": True}}
+    )
+    def test_application_services_receive_bursts_of_to_device(self):
+        """
+        Test that when a user sends >100 to-device messages at once, any
+        interested AS's will receive them in separate transactions.
+
+        Also tests that uninterested application services do not receive messages.
+        """
+        # Register two application services with exclusive interest in a user
+        interested_appservices = []
+        for _ in range(2):
+            appservice = self._register_application_service(
+                namespaces={
+                    ApplicationService.NS_USERS: [
+                        {
+                            "regex": "@exclusive_as_user:.+",
+                            "exclusive": True,
+                        }
+                    ],
+                },
+            )
+            interested_appservices.append(appservice)
+
+        # ...and an application service which does not have any user interest.
+        self._register_application_service()
+
+        to_device_message_content = {
+            "some key": "some interesting value",
+        }
+
+        # We need to send a large burst of to-device messages. We also would like to
+        # include them all in the same application service transaction so that we can
+        # test large transactions.
+        #
+        # To do this, we can send a single to-device message to many user devices at
+        # once.
+        #
+        # We insert number_of_messages - 1 messages into the database directly. We'll then
+        # send a final to-device message to the real device, which will also kick off
+        # an AS transaction (as just inserting messages into the DB won't).
+        number_of_messages = 150
+        fake_device_ids = [f"device_{num}" for num in range(number_of_messages - 1)]
+        messages = {
+            self.exclusive_as_user: {
+                device_id: to_device_message_content for device_id in fake_device_ids
+            }
+        }
+
+        # Create a fake device per message. We can't send to-device messages to
+        # a device that doesn't exist.
+        self.get_success(
+            self.hs.get_datastore().db_pool.simple_insert_many(
+                desc="test_application_services_receive_burst_of_to_device",
+                table="devices",
+                keys=("user_id", "device_id"),
+                values=[
+                    (
+                        self.exclusive_as_user,
+                        device_id,
+                    )
+                    for device_id in fake_device_ids
+                ],
+            )
+        )
+
+        # Seed the device_inbox table with our fake messages
+        self.get_success(
+            self.hs.get_datastore().add_messages_to_device_inbox(messages, {})
+        )
+
+        # Now have local_user send a final to-device message to exclusive_as_user. All unsent
+        # to-device messages should be sent to any application services
+        # interested in exclusive_as_user.
+        chan = self.make_request(
+            "PUT",
+            "/_matrix/client/r0/sendToDevice/m.room_key_request/4",
+            content={
+                "messages": {
+                    self.exclusive_as_user: {
+                        self.exclusive_as_user_device_id: to_device_message_content
+                    }
+                }
+            },
+            access_token=self.local_user_token,
+        )
+        self.assertEqual(chan.code, 200, chan.result)
+
+        self.send_mock.assert_called()
+
+        # Count the total number of to-device messages that were sent out per-service.
+        # Ensure that we only sent to-device messages to interested services, and that
+        # each interested service received the full count of to-device messages.
+        service_id_to_message_count: Dict[str, int] = {}
+
+        for call in self.send_mock.call_args_list:
+            service, _events, _ephemeral, to_device_messages = call[0]
+
+            # Check that this was made to an interested service
+            self.assertIn(service, interested_appservices)
+
+            # Add to the count of messages for this application service
+            service_id_to_message_count.setdefault(service.id, 0)
+            service_id_to_message_count[service.id] += len(to_device_messages)
+
+        # Assert that each interested service received the full count of messages
+        for count in service_id_to_message_count.values():
+            self.assertEqual(count, number_of_messages)
+
+    def _register_application_service(
+        self,
+        namespaces: Optional[Dict[str, Iterable[Dict]]] = None,
+    ) -> ApplicationService:
+        """
+        Register a new application service, with the given namespaces of interest.
+
+        Args:
+            namespaces: A dictionary containing any user, room or alias namespaces that
+                the application service is interested in.
+
+        Returns:
+            The registered application service.
+        """
+        # Create an application service
+        appservice = ApplicationService(
+            token=random_string(10),
+            hostname="example.com",
+            id=random_string(10),
+            sender="@as:example.com",
+            rate_limited=False,
+            namespaces=namespaces,
+            supports_ephemeral=True,
+        )
+
+        # Register the application service
+        self._services.append(appservice)
+
+        return appservice
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index 329490caad..ddcb7f5549 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -266,7 +266,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
         service = Mock(id=self.as_list[0]["id"])
         events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")])
         txn = self.get_success(
-            defer.ensureDeferred(self.store.create_appservice_txn(service, events, []))
+            defer.ensureDeferred(
+                self.store.create_appservice_txn(service, events, [], [])
+            )
         )
         self.assertEquals(txn.id, 1)
         self.assertEquals(txn.events, events)
@@ -280,7 +282,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
         self.get_success(self._set_last_txn(service.id, 9643))  # AS is falling behind
         self.get_success(self._insert_txn(service.id, 9644, events))
         self.get_success(self._insert_txn(service.id, 9645, events))
-        txn = self.get_success(self.store.create_appservice_txn(service, events, []))
+        txn = self.get_success(
+            self.store.create_appservice_txn(service, events, [], [])
+        )
         self.assertEquals(txn.id, 9646)
         self.assertEquals(txn.events, events)
         self.assertEquals(txn.service, service)
@@ -291,7 +295,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
         service = Mock(id=self.as_list[0]["id"])
         events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")])
         self.get_success(self._set_last_txn(service.id, 9643))
-        txn = self.get_success(self.store.create_appservice_txn(service, events, []))
+        txn = self.get_success(
+            self.store.create_appservice_txn(service, events, [], [])
+        )
         self.assertEquals(txn.id, 9644)
         self.assertEquals(txn.events, events)
         self.assertEquals(txn.service, service)
@@ -313,7 +319,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
         self.get_success(self._insert_txn(self.as_list[2]["id"], 10, events))
         self.get_success(self._insert_txn(self.as_list[3]["id"], 9643, events))
 
-        txn = self.get_success(self.store.create_appservice_txn(service, events, []))
+        txn = self.get_success(
+            self.store.create_appservice_txn(service, events, [], [])
+        )
         self.assertEquals(txn.id, 9644)
         self.assertEquals(txn.events, events)
         self.assertEquals(txn.service, service)
@@ -481,10 +489,10 @@ class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase):
             ValueError,
         )
 
-    def test_set_type_stream_id_for_appservice(self) -> None:
+    def test_set_appservice_stream_type_pos(self) -> None:
         read_receipt_value = 1024
         self.get_success(
-            self.store.set_type_stream_id_for_appservice(
+            self.store.set_appservice_stream_type_pos(
                 self.service, "read_receipt", read_receipt_value
             )
         )
         result = self.get_success(
             self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
         )
         self.assertEqual(result, read_receipt_value)
 
         self.get_success(
-            self.store.set_type_stream_id_for_appservice(
+            self.store.set_appservice_stream_type_pos(
                 self.service, "presence", read_receipt_value
             )
         )
         result = self.get_success(
             self.store.get_type_stream_id_for_appservice(self.service, "presence")
         )
         self.assertEqual(result, read_receipt_value)
 
-    def test_set_type_stream_id_for_appservice_invalid_type(self) -> None:
+    def test_set_appservice_stream_type_pos_invalid_type(self) -> None:
         self.get_failure(
-            self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024),
+            self.store.set_appservice_stream_type_pos(self.service, "foobar", 1024),
             ValueError,
         )
-- 
cgit 1.5.1


From 3f72c2a322af0d9d45452aa9c78723793bea2432 Mon Sep 17 00:00:00 2001
From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
Date: Tue, 1 Feb 2022 17:45:13 +0000
Subject: Convert `ApplicationServiceTestCase` to use `simple_async_mock` (#11880)

---
 changelog.d/11880.misc              |  1 +
 tests/appservice/test_appservice.py | 19 +++++++++----------
 2 files changed, 10 insertions(+), 10 deletions(-)
 create mode 100644 changelog.d/11880.misc

(limited to 'tests')

diff --git a/changelog.d/11880.misc b/changelog.d/11880.misc
new file mode 100644
index 0000000000..8125947b2a
--- /dev/null
+++ b/changelog.d/11880.misc
@@ -0,0 +1 @@
+Convert `ApplicationServiceTestCase` to use `simple_async_mock`.
\ No newline at end of file
diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py
index ba2a2bfd64..07d8105f41 100644
--- a/tests/appservice/test_appservice.py
+++ b/tests/appservice/test_appservice.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 from synapse.appservice import ApplicationService, Namespace
 
 from tests import unittest
+from tests.test_utils import simple_async_mock
 
 
 def _regex(regex: str, exclusive: bool = True) -> Namespace:
@@ -91,10 +92,10 @@ class ApplicationServiceTestCase(unittest.TestCase):
         self.service.namespaces[ApplicationService.NS_ALIASES].append(
             _regex("#irc_.*:matrix.org")
         )
-        self.store.get_aliases_for_room.return_value = defer.succeed(
+        self.store.get_aliases_for_room = simple_async_mock(
             ["#irc_foobar:matrix.org", "#athing:matrix.org"]
         )
-        self.store.get_users_in_room.return_value = defer.succeed([])
+        self.store.get_users_in_room = simple_async_mock([])
         self.assertTrue(
             (
                 yield defer.ensureDeferred(
@@ -144,10 +145,10 @@ class ApplicationServiceTestCase(unittest.TestCase):
         self.service.namespaces[ApplicationService.NS_ALIASES].append(
             _regex("#irc_.*:matrix.org")
         )
-        self.store.get_aliases_for_room.return_value = defer.succeed(
+        self.store.get_aliases_for_room = simple_async_mock(
             ["#xmpp_foobar:matrix.org", "#athing:matrix.org"]
         )
-        self.store.get_users_in_room.return_value = defer.succeed([])
+        self.store.get_users_in_room = simple_async_mock([])
         self.assertFalse(
             (
                 yield defer.ensureDeferred(
@@ -163,10 +164,8 @@ class ApplicationServiceTestCase(unittest.TestCase):
         )
         self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*"))
         self.event.sender = "@irc_foobar:matrix.org"
-        self.store.get_aliases_for_room.return_value = defer.succeed(
-            ["#irc_barfoo:matrix.org"]
-        )
-        self.store.get_users_in_room.return_value = defer.succeed([])
+        self.store.get_aliases_for_room = simple_async_mock(["#irc_barfoo:matrix.org"])
+        self.store.get_users_in_room = simple_async_mock([])
         self.assertTrue(
             (
                 yield defer.ensureDeferred(
@@ -191,10 +190,10 @@ class ApplicationServiceTestCase(unittest.TestCase):
 
     def test_member_list_match(self):
         self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*"))
         # Note that @irc_fo:here is the AS user.
-        self.store.get_users_in_room.return_value = defer.succeed(
+        self.store.get_users_in_room = simple_async_mock(
             ["@alice:here", "@irc_fo:here", "@bob:here"]
         )
-        self.store.get_aliases_for_room.return_value = defer.succeed([])
+        self.store.get_aliases_for_room = simple_async_mock([])
 
         self.event.sender = "@xmpp_foobar:matrix.org"
         self.assertTrue(
-- 
cgit 1.5.1


From 513913cc6ba77833082178b7d2b2f0565daf3c99 Mon Sep 17 00:00:00 2001
From: reivilibre
Date: Wed, 2 Feb 2022 09:59:55 +0000
Subject: Expose the registered device ID from the `register_appservice_user`
 test helper. (#11615)

---
 changelog.d/11615.misc                |  1 +
 tests/handlers/test_user_directory.py |  6 ++++--
 tests/rest/client/test_room_batch.py  |  2 +-
 tests/storage/test_user_directory.py  |  4 +++-
 tests/unittest.py                     |  9 +++++----
 5 files changed, 14 insertions(+), 8 deletions(-)
 create mode 100644 changelog.d/11615.misc

(limited to 'tests')

diff --git a/changelog.d/11615.misc b/changelog.d/11615.misc
new file mode 100644
index 0000000000..0aa9536504
--- /dev/null
+++ b/changelog.d/11615.misc
@@ -0,0 +1 @@
+Expose the registered device ID from the `register_appservice_user` test helper.
\ No newline at end of file
diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py
index 70c621b825..482c90ef68 100644
--- a/tests/handlers/test_user_directory.py
+++ b/tests/handlers/test_user_directory.py
@@ -169,7 +169,9 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
         # Register an AS user.
         user = self.register_user("user", "pass")
         token = self.login(user, "pass")
-        as_user = self.register_appservice_user("as_user_potato", self.appservice.token)
+        as_user, _ = self.register_appservice_user(
+            "as_user_potato", self.appservice.token
+        )
 
         # Join the AS user to rooms owned by the normal user.
         public, private = self._create_rooms_and_inject_memberships(
             user, token, as_user
         )
@@ -388,7 +390,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
 
     def test_handle_local_profile_change_with_appservice_user(self) -> None:
         # create user
-        as_user_id = self.register_appservice_user(
+        as_user_id, _ = self.register_appservice_user(
             "as_user_alice", self.appservice.token
         )
diff --git a/tests/rest/client/test_room_batch.py b/tests/rest/client/test_room_batch.py
index 721454c187..e9f8704035 100644
--- a/tests/rest/client/test_room_batch.py
+++ b/tests/rest/client/test_room_batch.py
@@ -89,7 +89,7 @@ class RoomBatchTestCase(unittest.HomeserverTestCase):
         self.clock = clock
         self.storage = hs.get_storage()
 
-        self.virtual_user_id = self.register_appservice_user(
+        self.virtual_user_id, _ = self.register_appservice_user(
             "as_user_potato", self.appservice.token
         )
diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py
index 7f5b28aed8..48f1e9d841 100644
--- a/tests/storage/test_user_directory.py
+++ b/tests/storage/test_user_directory.py
@@ -341,7 +341,9 @@ class UserDirectoryInitialPopulationTestcase(HomeserverTestCase):
         # Register an AS user.
         user = self.register_user("user", "pass")
         token = self.login(user, "pass")
-        as_user = self.register_appservice_user("as_user_potato", self.appservice.token)
+        as_user, _ = self.register_appservice_user(
+            "as_user_potato", self.appservice.token
+        )
 
         # Join the AS user to rooms owned by the normal user.
         public, private = self._create_rooms_and_inject_memberships(
diff --git a/tests/unittest.py b/tests/unittest.py
index 1431848367..6fc617601a 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -620,18 +620,19 @@ class HomeserverTestCase(TestCase):
         self,
         username: str,
         appservice_token: str,
-    ) -> str:
+    ) -> Tuple[str, str]:
         """Register an appservice user as an application service.
         Requires the client-facing registration API be registered.
 
         Args:
             username: the user to be registered by an application service.
-                Should be a full username, i.e. ""@localpart:hostname" as opposed to just "localpart"
+                Should NOT be a full username, i.e. just "localpart" as opposed to "@localpart:hostname"
             appservice_token: the access token for that application service.
 
         Raises: if the request to '/register' does not return 200 OK.
 
-        Returns: the MXID of the new user.
+        Returns:
+            The MXID of the new user, the device ID of the new user's first device.
         """
         channel = self.make_request(
             "POST",
@@ -643,7 +644,7 @@ class HomeserverTestCase(TestCase):
             access_token=appservice_token,
         )
         self.assertEqual(channel.code, 200, channel.json_body)
-        return channel.json_body["user_id"]
+        return channel.json_body["user_id"], channel.json_body["device_id"]
 
     def login(
         self,
-- 
cgit 1.5.1


From af795173bec447da44b65b739d05d0083b02229d Mon Sep 17 00:00:00 2001
From: reivilibre
Date: Wed, 2 Feb 2022 11:37:18 +0000
Subject: Add a background database update to purge account data for
 deactivated users. (#11655)

---
 changelog.d/11655.feature                          |   1 +
 scripts/synapse_port_db                            |   4 +
 synapse/storage/databases/main/account_data.py     | 164 ++++++++++++++-------
 ...elete_account_data_for_deactivated_accounts.sql |  20 +++
 tests/handlers/test_deactivate_account.py          | 106 +++++++++++++
 5 files changed, 240 insertions(+), 55 deletions(-)
 create mode 100644 changelog.d/11655.feature
 create mode 100644 synapse/storage/schema/main/delta/68/03_delete_account_data_for_deactivated_accounts.sql

(limited to 'tests')

diff --git a/changelog.d/11655.feature b/changelog.d/11655.feature
new file mode 100644
index 0000000000..dc426fb658
--- /dev/null
+++ b/changelog.d/11655.feature
@@ -0,0 +1 @@
+Remove account data (including client config, push rules and ignored users) upon user deactivation.
\ No newline at end of file
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 640ff15277..70ee4e5c7f 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -36,6 +36,8 @@ from synapse.logging.context import (
     run_in_background,
 )
 from synapse.storage.database import DatabasePool, make_conn
+from synapse.storage.databases.main import PushRuleStore
+from synapse.storage.databases.main.account_data import AccountDataWorkerStore
 from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
 from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
 from synapse.storage.databases.main.devices import DeviceBackgroundUpdateStore
@@ -180,6 +182,8 @@ class Store(
     UserDirectoryBackgroundUpdateStore,
     EndToEndKeyBackgroundStore,
     StatsStore,
+    AccountDataWorkerStore,
+    PushRuleStore,
     PusherWorkerStore,
     PresenceBackgroundUpdateStore,
     GroupServerWorkerStore,
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 5bfa408f74..52146aacc8 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -106,6 +106,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
             "AccountDataAndTagsChangeCache", account_max
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            "delete_account_data_for_deactivated_users",
+            self._delete_account_data_for_deactivated_users,
+        )
+
     def get_max_account_data_stream_id(self) -> int:
         """Get the current max stream ID for account data stream
 
@@ -549,72 +554,121 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
 
     async def purge_account_data_for_user(self, user_id: str) -> None:
         """
-        Removes the account data for a user.
+        Removes ALL the account data for a user.
+        Intended to be used upon user deactivation.
 
-        This is intended to be used upon user deactivation and also removes any
-        derived information from account data (e.g. push rules and ignored users).
+        Also purges the user from the ignored_users cache table
+        and the push_rules cache tables.
+        """
 
-        Args:
-            user_id: The user ID to remove data for.
+        await self.db_pool.runInteraction(
+            "purge_account_data_for_user_txn",
+            self._purge_account_data_for_user_txn,
+            user_id,
+        )
+
+    def _purge_account_data_for_user_txn(
+        self, txn: LoggingTransaction, user_id: str
+    ) -> None:
         """
+        See `purge_account_data_for_user`.
+        """
+        # Purge from the primary account_data tables.
+        self.db_pool.simple_delete_txn(
+            txn, table="account_data", keyvalues={"user_id": user_id}
+        )
 
-        def purge_account_data_for_user_txn(txn: LoggingTransaction) -> None:
-            # Purge from the primary account_data tables.
-            self.db_pool.simple_delete_txn(
-                txn, table="account_data", keyvalues={"user_id": user_id}
-            )
+        self.db_pool.simple_delete_txn(
+            txn, table="room_account_data", keyvalues={"user_id": user_id}
+        )
 
-            self.db_pool.simple_delete_txn(
-                txn, table="room_account_data", keyvalues={"user_id": user_id}
-            )
+        # Purge from ignored_users where this user is the ignorer.
+        # N.B. We don't purge where this user is the ignoree, because that
+        # interferes with other users' account data.
+        # It's also not this user's data to delete!
+        self.db_pool.simple_delete_txn(
+            txn, table="ignored_users", keyvalues={"ignorer_user_id": user_id}
+        )
 
-            # Purge from ignored_users where this user is the ignorer.
-            # N.B. We don't purge where this user is the ignoree, because that
-            # interferes with other users' account data.
-            # It's also not this user's data to delete!
-            self.db_pool.simple_delete_txn(
-                txn, table="ignored_users", keyvalues={"ignorer_user_id": user_id}
-            )
+        # Remove the push rules
+        self.db_pool.simple_delete_txn(
+            txn, table="push_rules", keyvalues={"user_name": user_id}
+        )
+        self.db_pool.simple_delete_txn(
+            txn, table="push_rules_enable", keyvalues={"user_name": user_id}
+        )
+        self.db_pool.simple_delete_txn(
+            txn, table="push_rules_stream", keyvalues={"user_id": user_id}
+        )
 
-            # Remove the push rules
-            self.db_pool.simple_delete_txn(
-                txn, table="push_rules", keyvalues={"user_name": user_id}
-            )
-            self.db_pool.simple_delete_txn(
-                txn, table="push_rules_enable", keyvalues={"user_name": user_id}
-            )
-            self.db_pool.simple_delete_txn(
-                txn, table="push_rules_stream", keyvalues={"user_id": user_id}
-            )
+        # Invalidate caches as appropriate
+        self._invalidate_cache_and_stream(
+            txn, self.get_account_data_for_room_and_type, (user_id,)
+        )
+        self._invalidate_cache_and_stream(
+            txn, self.get_account_data_for_user, (user_id,)
+        )
+        self._invalidate_cache_and_stream(
+            txn, self.get_global_account_data_by_type_for_user, (user_id,)
+        )
+        self._invalidate_cache_and_stream(
+            txn, self.get_account_data_for_room, (user_id,)
+        )
+        self._invalidate_cache_and_stream(txn, self.get_push_rules_for_user, (user_id,))
+        self._invalidate_cache_and_stream(
+            txn, self.get_push_rules_enabled_for_user, (user_id,)
+        )
+        # This user might be contained in the ignored_by cache for other users,
+        # so we have to invalidate it all.
+        self._invalidate_all_cache_and_stream(txn, self.ignored_by)
 
-            # Invalidate caches as appropriate
-            self._invalidate_cache_and_stream(
-                txn, self.get_account_data_for_room_and_type, (user_id,)
-            )
-            self._invalidate_cache_and_stream(
-                txn, self.get_account_data_for_user, (user_id,)
-            )
-            self._invalidate_cache_and_stream(
-                txn, self.get_global_account_data_by_type_for_user, (user_id,)
-            )
-            self._invalidate_cache_and_stream(
-                txn, self.get_account_data_for_room, (user_id,)
-            )
-            self._invalidate_cache_and_stream(
-                txn, self.get_push_rules_for_user, (user_id,)
-            )
-            self._invalidate_cache_and_stream(
-                txn, self.get_push_rules_enabled_for_user, (user_id,)
-            )
-            # This user might be contained in the ignored_by cache for other users,
-            # so we have to invalidate it all.
-            self._invalidate_all_cache_and_stream(txn, self.ignored_by)
+    async def _delete_account_data_for_deactivated_users(
+        self, progress: dict, batch_size: int
+    ) -> int:
+        """
+        Retroactively purges account data for users that have already been deactivated.
+        Gets run as a background update caused by a schema delta.
+        """
 
-        await self.db_pool.runInteraction(
-            "purge_account_data_for_user_txn",
-            purge_account_data_for_user_txn,
+        last_user: str = progress.get("last_user", "")
+
+        def _delete_account_data_for_deactivated_users_txn(
+            txn: LoggingTransaction,
+        ) -> int:
+            sql = """
+                SELECT name FROM users
+                WHERE deactivated = ? and name > ?
+                ORDER BY name ASC
+                LIMIT ?
+            """
+
+            txn.execute(sql, (1, last_user, batch_size))
+            users = [row[0] for row in txn]
+
+            for user in users:
+                self._purge_account_data_for_user_txn(txn, user_id=user)
+
+            if users:
+                self.db_pool.updates._background_update_progress_txn(
+                    txn,
+                    "delete_account_data_for_deactivated_users",
+                    {"last_user": users[-1]},
+                )
+
+            return len(users)
+
+        number_deleted = await self.db_pool.runInteraction(
+            "_delete_account_data_for_deactivated_users",
+            _delete_account_data_for_deactivated_users_txn,
         )
 
+        if number_deleted < batch_size:
+            await self.db_pool.updates._end_background_update(
+                "delete_account_data_for_deactivated_users"
+            )
+
+        return number_deleted
+
 
 class AccountDataStore(AccountDataWorkerStore):
     pass
diff --git a/synapse/storage/schema/main/delta/68/03_delete_account_data_for_deactivated_accounts.sql b/synapse/storage/schema/main/delta/68/03_delete_account_data_for_deactivated_accounts.sql
new file mode 100644
index 0000000000..e124933843
--- /dev/null
+++ b/synapse/storage/schema/main/delta/68/03_delete_account_data_for_deactivated_accounts.sql
@@ -0,0 +1,20 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- We want to retroactively delete account data for users that were already
+-- deactivated.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (6803, 'delete_account_data_for_deactivated_users', '{}');
diff --git a/tests/handlers/test_deactivate_account.py b/tests/handlers/test_deactivate_account.py
index 3da597768c..01096a1581 100644
--- a/tests/handlers/test_deactivate_account.py
+++ b/tests/handlers/test_deactivate_account.py
@@ -217,3 +217,109 @@ class DeactivateAccountTestCase(HomeserverTestCase):
         self.assertEqual(
             self.get_success(self._store.ignored_by("@sheltie:test")), set()
         )
+
+    def _rerun_retroactive_account_data_deletion_update(self) -> None:
+        # Reset the 'all done' flag
+        self._store.db_pool.updates._all_done = False
+
+        self.get_success(
+            self._store.db_pool.simple_insert(
+                "background_updates",
+                {
+                    "update_name": "delete_account_data_for_deactivated_users",
+                    "progress_json": "{}",
+                },
+            )
+        )
+
+        self.wait_for_background_updates()
+
+    def test_account_data_deleted_retroactively_by_background_update_if_deactivated(
+        self,
+    ) -> None:
+        """
+        Tests that a user, who deactivated their account before account data was
+        deleted automatically upon deactivation, has their account data retroactively
+        scrubbed by the background update.
+        """
+
+        # Request the deactivation of our account
+        self._deactivate_my_account()
+
+        # Add some account data
+        # (we do this after the deactivation so that the act of deactivating doesn't
+        # clear it out. This emulates a user that was deactivated before this was cleared
+        # upon deactivation.)
+        self.get_success(
+            self._store.add_account_data_for_user(
+                self.user,
+                AccountDataTypes.DIRECT,
+                {"@someone:remote": ["!somewhere:remote"]},
+            )
+        )
+
+        # Check that the account data is there.
+        self.assertIsNotNone(
+            self.get_success(
+                self._store.get_global_account_data_by_type_for_user(
+                    self.user,
+                    AccountDataTypes.DIRECT,
+                )
+            ),
+        )
+
+        # Re-run the retroactive deletion update
+        self._rerun_retroactive_account_data_deletion_update()
+
+        # Check that the account data was cleared.
+        self.assertIsNone(
+            self.get_success(
+                self._store.get_global_account_data_by_type_for_user(
+                    self.user,
+                    AccountDataTypes.DIRECT,
+                )
+            ),
+        )
+
+    def test_account_data_preserved_by_background_update_if_not_deactivated(
+        self,
+    ) -> None:
+        """
+        Tests that the background update does not scrub account data for users that have
+        not been deactivated.
+        """
+
+        # Add some account data
+        # (the user is never deactivated in this test, so this data should
+        # survive the background update.)
+        self.get_success(
+            self._store.add_account_data_for_user(
+                self.user,
+                AccountDataTypes.DIRECT,
+                {"@someone:remote": ["!somewhere:remote"]},
+            )
+        )
+
+        # Check that the account data is there.
+        self.assertIsNotNone(
+            self.get_success(
+                self._store.get_global_account_data_by_type_for_user(
+                    self.user,
+                    AccountDataTypes.DIRECT,
+                )
+            ),
+        )
+
+        # Re-run the retroactive deletion update
+        self._rerun_retroactive_account_data_deletion_update()
+
+        # Check that the account data was NOT cleared.
+        self.assertIsNotNone(
+            self.get_success(
+                self._store.get_global_account_data_by_type_for_user(
+                    self.user,
+                    AccountDataTypes.DIRECT,
+                )
+            ),
+        )
-- 
cgit 1.5.1


From 31b554c2976612ce5fd983517615906261c39cea Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Wed, 2 Feb 2022 22:41:57 +0000
Subject: Fixes for opentracing scopes (#11869)

`start_active_span` was inconsistent as to whether it would activate the span
immediately, or wait for `scope.__enter__` to happen (it depended on whether
the current logcontext already had an associated scope).

The inconsistency was rather confusing if you were hoping to set up a couple
of separate spans before activating either.

Looking at the other implementations of opentracing `ScopeManager`s, the
intention is that it *should* be activated immediately, as the name implies.
Indeed, the idea is that you don't have to use the scope as a contextmanager
at all - you can just call `.close` on the result. Hence, our cleanup has to
happen in `.close` rather than `.__exit__`.

So, the main change here is to ensure that `start_active_span` does activate
the span, and that `scope.close()` does close the scope.

We also add some tests, which requires a `tracer` param so that we don't have
to rely on the global variable in unit tests.
---
 changelog.d/11869.misc                 |   1 +
 synapse/logging/opentracing.py         |  29 ++++--
 synapse/logging/scopecontextmanager.py |  76 ++++++------
 tests/logging/test_opentracing.py      | 184 +++++++++++++++++++++++++++++
 4 files changed, 255 insertions(+), 35 deletions(-)
 create mode 100644 changelog.d/11869.misc
 create mode 100644 tests/logging/test_opentracing.py

(limited to 'tests')

diff --git a/changelog.d/11869.misc b/changelog.d/11869.misc
new file mode 100644
index 0000000000..054fbf6101
--- /dev/null
+++ b/changelog.d/11869.misc
@@ -0,0 +1 @@
+Ensure that `opentracing` scopes are activated and closed at the right time.
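
To make the new semantics concrete before the diff itself: below is a minimal
usage sketch, not part of the patch. The assertions are illustrative only, and
assume an opentracing tracer has been configured and that the caller is inside
a logcontext:

    import opentracing

    from synapse.logging.context import LoggingContext
    from synapse.logging.opentracing import start_active_span

    with LoggingContext("request"):
        # After this patch, the span is active as soon as start_active_span
        # returns; entering the scope as a context manager is no longer needed
        # for activation.
        scope = start_active_span("do_work")
        assert opentracing.tracer.active_span is scope.span

        # ... do the traced work ...

        # Cleanup now happens in close() (which a `with scope:` block also
        # triggers on exit): the span is deactivated and finished.
        scope.close()
        assert opentracing.tracer.active_span is None
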
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index b240d2d21d..d25f25ecb5 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -443,10 +443,14 @@ def start_active_span(
     start_time=None,
     ignore_active_span=False,
     finish_on_close=True,
+    *,
+    tracer=None,
 ):
-    """Starts an active opentracing span. Note, the scope doesn't become active
-    until it has been entered, however, the span starts from the time this
-    message is called.
+    """Starts an active opentracing span.
+
+    Records the start time for the span, and sets it as the "active span" in the
+    scope manager.
+
     Args:
         See opentracing.tracer
     Returns:
@@ -456,7 +460,11 @@ def start_active_span(
     if opentracing is None:
         return noop_context_manager()  # type: ignore[unreachable]
 
-    return opentracing.tracer.start_active_span(
+    if tracer is None:
+        # use the global tracer by default
+        tracer = opentracing.tracer
+
+    return tracer.start_active_span(
         operation_name,
         child_of=child_of,
         references=references,
@@ -468,7 +476,11 @@ def start_active_span(
 
 
 def start_active_span_follows_from(
-    operation_name: str, contexts: Collection, inherit_force_tracing=False
+    operation_name: str,
+    contexts: Collection,
+    *,
+    inherit_force_tracing=False,
+    tracer=None,
 ):
     """Starts an active opentracing span, with additional references to previous spans
 
@@ -477,12 +489,17 @@ def start_active_span_follows_from(
         contexts: the previous spans to inherit from
         inherit_force_tracing: if set, and any of the previous contexts have had tracing
             forced, the new span will also have tracing forced.
+        tracer: override the opentracing tracer. By default the global tracer is used.
     """
     if opentracing is None:
         return noop_context_manager()  # type: ignore[unreachable]
 
     references = [opentracing.follows_from(context) for context in contexts]
-    scope = start_active_span(operation_name, references=references)
+    scope = start_active_span(
+        operation_name,
+        references=references,
+        tracer=tracer,
+    )
 
     if inherit_force_tracing and any(
         is_context_forced_tracing(ctx) for ctx in contexts
diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py
index db8ca2c049..d57e7c5324 100644
--- a/synapse/logging/scopecontextmanager.py
+++ b/synapse/logging/scopecontextmanager.py
@@ -28,8 +28,9 @@ class LogContextScopeManager(ScopeManager):
     The LogContextScopeManager tracks the active scope in opentracing
     by using the log contexts which are native to synapse. This is so
     that the basic opentracing api can be used across twisted deferreds.
-    (I would love to break logcontexts and this into an OS package. but
-    let's wait for twisted's contexts to be released.)
+
+    It would be nice just to use opentracing's ContextVarsScopeManager,
+    but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
     """
 
     def __init__(self, config):
@@ -65,29 +66,45 @@ class LogContextScopeManager(ScopeManager):
             Scope.close() on the returned instance.
         """
 
-        enter_logcontext = False
         ctx = current_context()
 
         if not ctx:
-            # We don't want this scope to affect.
             logger.error("Tried to activate scope outside of loggingcontext")
             return Scope(None, span)  # type: ignore[arg-type]
-        elif ctx.scope is not None:
-            # We want the logging scope to look exactly the same so we give it
-            # a blank suffix
+
+        if ctx.scope is not None:
+            # start a new logging context as a child of the existing one.
+            # Doing so -- rather than updating the existing logcontext -- means that
+            # creating several concurrent spans under the same logcontext works
+            # correctly.
             ctx = nested_logging_context("")
             enter_logcontext = True
+        else:
+            # if there is no span currently associated with the current logcontext, we
+            # just store the scope in it.
+            #
+            # This feels a bit dubious, but it does hack around a problem where a
+            # span outlasts its parent logcontext (which would otherwise lead to
+            # "Re-starting finished log context" errors).
+            enter_logcontext = False
 
         scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
         ctx.scope = scope
+
+        if enter_logcontext:
+            ctx.__enter__()
+
         return scope
 
 
 class _LogContextScope(Scope):
     """
-    A custom opentracing scope. The only significant difference is that it will
-    close the log context it's related to if the logcontext was created specifically
-    for this scope.
+    A custom opentracing scope, associated with a LogContext
+
+    * filters out _DefGen_Return exceptions which arise from calling
+      `defer.returnValue` in Twisted code
+
+    * When the scope is closed, the logcontext's active scope is reset to None.
+      and - if enter_logcontext was set - the logcontext is finished too.
     """
 
     def __init__(self, manager, span, logcontext, enter_logcontext, finish_on_close):
@@ -101,8 +118,7 @@ class _LogContextScope(Scope):
             logcontext (LogContext):
                 the logcontext to which this scope is attached.
             enter_logcontext (Boolean):
-                if True the logcontext will be entered and exited when the scope
-                is entered and exited respectively
+                if True the logcontext will be exited when the scope is finished
             finish_on_close (Boolean):
                 if True finish the span when the scope is closed
         """
@@ -111,26 +127,28 @@ class _LogContextScope(Scope):
         self._finish_on_close = finish_on_close
         self._enter_logcontext = enter_logcontext
 
-    def __enter__(self):
-        if self._enter_logcontext:
-            self.logcontext.__enter__()
+    def __exit__(self, exc_type, value, traceback):
+        if exc_type == twisted.internet.defer._DefGen_Return:
+            # filter out defer.returnValue() calls
+            exc_type = value = traceback = None
+        super().__exit__(exc_type, value, traceback)
 
-        return self
-
-    def __exit__(self, type, value, traceback):
-        if type == twisted.internet.defer._DefGen_Return:
-            super().__exit__(None, None, None)
-        else:
-            super().__exit__(type, value, traceback)
-            if self._enter_logcontext:
-                self.logcontext.__exit__(type, value, traceback)
-            else:  # the logcontext existed before the creation of the scope
-                self.logcontext.scope = None
+    def __str__(self):
+        return f"Scope<{self.span}>"
 
     def close(self):
-        if self.manager.active is not self:
-            logger.error("Tried to close a non-active scope!")
-            return
+        active_scope = self.manager.active
+        if active_scope is not self:
+            logger.error(
+                "Closing scope %s which is not the currently-active one %s",
+                self,
+                active_scope,
+            )
 
         if self._finish_on_close:
             self.span.finish()
+
+        self.logcontext.scope = None
+
+        if self._enter_logcontext:
+            self.logcontext.__exit__(None, None, None)
diff --git a/tests/logging/test_opentracing.py b/tests/logging/test_opentracing.py
new file mode 100644
index 0000000000..e430941d27
--- /dev/null
+++ b/tests/logging/test_opentracing.py
@@ -0,0 +1,184 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+from twisted.test.proto_helpers import MemoryReactorClock
+
+from synapse.logging.context import (
+    LoggingContext,
+    make_deferred_yieldable,
+    run_in_background,
+)
+from synapse.logging.opentracing import (
+    start_active_span,
+    start_active_span_follows_from,
+)
+from synapse.util import Clock
+
+try:
+    from synapse.logging.scopecontextmanager import LogContextScopeManager
+except ImportError:
+    LogContextScopeManager = None  # type: ignore
+
+try:
+    import jaeger_client
+except ImportError:
+    jaeger_client = None  # type: ignore
+
+from tests.unittest import TestCase
+
+
+class LogContextScopeManagerTestCase(TestCase):
+    if LogContextScopeManager is None:
+        skip = "Requires opentracing"  # type: ignore[unreachable]
+    if jaeger_client is None:
+        skip = "Requires jaeger_client"  # type: ignore[unreachable]
+
+    def setUp(self) -> None:
+        # since this is a unit test, we don't really want to mess around with the
+        # global variables that power opentracing. We create our own tracer instance
+        # and test with it.
+
+        scope_manager = LogContextScopeManager({})
+        config = jaeger_client.config.Config(
+            config={}, service_name="test", scope_manager=scope_manager
+        )
+
+        self._reporter = jaeger_client.reporter.InMemoryReporter()
+
+        self._tracer = config.create_tracer(
+            sampler=jaeger_client.ConstSampler(True),
+            reporter=self._reporter,
+        )
+
+    def test_start_active_span(self) -> None:
+        # the scope manager assumes a logging context of some sort.
+        with LoggingContext("root context"):
+            self.assertIsNone(self._tracer.active_span)
+
+            # start_active_span should start and activate a span.
+            scope = start_active_span("span", tracer=self._tracer)
+            span = scope.span
+            self.assertEqual(self._tracer.active_span, span)
+            self.assertIsNotNone(span.start_time)
+
+            # entering the context doesn't actually do a whole lot.
+            with scope as ctx:
+                self.assertIs(ctx, scope)
+                self.assertEqual(self._tracer.active_span, span)
+
+            # ... but leaving it unsets the active span, and finishes the span.
+ self.assertIsNone(self._tracer.active_span) + self.assertIsNotNone(span.end_time) + + # the span should have been reported + self.assertEqual(self._reporter.get_spans(), [span]) + + def test_nested_spans(self) -> None: + """Starting two spans off inside each other should work""" + + with LoggingContext("root context"): + with start_active_span("root span", tracer=self._tracer) as root_scope: + self.assertEqual(self._tracer.active_span, root_scope.span) + + scope1 = start_active_span( + "child1", + tracer=self._tracer, + ) + self.assertEqual( + self._tracer.active_span, scope1.span, "child1 was not activated" + ) + self.assertEqual( + scope1.span.context.parent_id, root_scope.span.context.span_id + ) + + scope2 = start_active_span_follows_from( + "child2", + contexts=(scope1,), + tracer=self._tracer, + ) + self.assertEqual(self._tracer.active_span, scope2.span) + self.assertEqual( + scope2.span.context.parent_id, scope1.span.context.span_id + ) + + with scope1, scope2: + pass + + # the root scope should be restored + self.assertEqual(self._tracer.active_span, root_scope.span) + self.assertIsNotNone(scope2.span.end_time) + self.assertIsNotNone(scope1.span.end_time) + + self.assertIsNone(self._tracer.active_span) + + # the spans should be reported in order of their finishing. + self.assertEqual( + self._reporter.get_spans(), [scope2.span, scope1.span, root_scope.span] + ) + + def test_overlapping_spans(self) -> None: + """Overlapping spans which are not neatly nested should work""" + reactor = MemoryReactorClock() + clock = Clock(reactor) + + scopes = [] + + async def task(i: int): + scope = start_active_span( + f"task{i}", + tracer=self._tracer, + ) + scopes.append(scope) + + self.assertEqual(self._tracer.active_span, scope.span) + await clock.sleep(4) + self.assertEqual(self._tracer.active_span, scope.span) + scope.close() + + async def root(): + with start_active_span("root span", tracer=self._tracer) as root_scope: + self.assertEqual(self._tracer.active_span, root_scope.span) + scopes.append(root_scope) + + d1 = run_in_background(task, 1) + await clock.sleep(2) + d2 = run_in_background(task, 2) + + # because we did run_in_background, the active span should still be the + # root. + self.assertEqual(self._tracer.active_span, root_scope.span) + + await make_deferred_yieldable( + defer.gatherResults([d1, d2], consumeErrors=True) + ) + + self.assertEqual(self._tracer.active_span, root_scope.span) + + with LoggingContext("root context"): + # start the test off + d1 = defer.ensureDeferred(root()) + + # let the tasks complete + reactor.pump((2,) * 8) + + self.successResultOf(d1) + self.assertIsNone(self._tracer.active_span) + + # the spans should be reported in order of their finishing: task 1, task 2, + # root. + self.assertEqual( + self._reporter.get_spans(), + [scopes[1].span, scopes[2].span, scopes[0].span], + ) -- cgit 1.5.1 From 833247553f40d7725f252b081016f8b40a513047 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Thu, 3 Feb 2022 13:09:22 +0000 Subject: Allow specifying the application service-specific `user_id` parameter in the `join` test helper. 
(#11616) --- changelog.d/11615.misc | 2 +- changelog.d/11616.misc | 1 + tests/rest/client/utils.py | 31 ++++++++++++++++++++++++++----- 3 files changed, 28 insertions(+), 6 deletions(-) create mode 100644 changelog.d/11616.misc (limited to 'tests') diff --git a/changelog.d/11615.misc b/changelog.d/11615.misc index 0aa9536504..bbc551698d 100644 --- a/changelog.d/11615.misc +++ b/changelog.d/11615.misc @@ -1 +1 @@ -Expose the registered device ID from the `register_appservice_user` test helper. \ No newline at end of file +Enhance user registration test helpers to make them more useful for tests involving Application Services and devices. diff --git a/changelog.d/11616.misc b/changelog.d/11616.misc new file mode 100644 index 0000000000..bbc551698d --- /dev/null +++ b/changelog.d/11616.misc @@ -0,0 +1 @@ +Enhance user registration test helpers to make them more useful for tests involving Application Services and devices. diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index 8424383580..1c0cb0cf4f 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -31,6 +31,7 @@ from typing import ( overload, ) from unittest.mock import patch +from urllib.parse import urlencode import attr from typing_extensions import Literal @@ -147,12 +148,20 @@ class RestHelper: expect_code=expect_code, ) - def join(self, room=None, user=None, expect_code=200, tok=None): + def join( + self, + room: str, + user: Optional[str] = None, + expect_code: int = 200, + tok: Optional[str] = None, + appservice_user_id: Optional[str] = None, + ) -> None: self.change_membership( room=room, src=user, targ=user, tok=tok, + appservice_user_id=appservice_user_id, membership=Membership.JOIN, expect_code=expect_code, ) @@ -209,11 +218,12 @@ class RestHelper: def change_membership( self, room: str, - src: str, - targ: str, + src: Optional[str], + targ: Optional[str], membership: str, extra_data: Optional[dict] = None, tok: Optional[str] = None, + appservice_user_id: Optional[str] = None, expect_code: int = 200, expect_errcode: Optional[str] = None, ) -> None: @@ -227,15 +237,26 @@ class RestHelper: membership: The type of membership event extra_data: Extra information to include in the content of the event tok: The user access token to use + appservice_user_id: The `user_id` URL parameter to pass. + This allows driving an application service user + using an application service access token in `tok`. expect_code: The expected HTTP response code expect_errcode: The expected Matrix error code """ temp_id = self.auth_user_id self.auth_user_id = src - path = "/_matrix/client/r0/rooms/%s/state/m.room.member/%s" % (room, targ) + path = f"/_matrix/client/r0/rooms/{room}/state/m.room.member/{targ}" + url_params: Dict[str, str] = {} + if tok: - path = path + "?access_token=%s" % tok + url_params["access_token"] = tok + + if appservice_user_id: + url_params["user_id"] = appservice_user_id + + if url_params: + path += "?" + urlencode(url_params) data = {"membership": membership} data.update(extra_data or {}) -- cgit 1.5.1 From 119edf51eb3e4f5ed5139dc370f5d7aed46edc1c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 3 Feb 2022 13:36:49 -0500 Subject: Remove support for the webclient listener. (#11895) Also remove support for non-HTTP(S) web_client_location. 
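For reference, the only form of this setting that remains valid is an HTTP(S)
redirect URL. A minimal sketch of the corresponding `homeserver.yaml` entry
follows (the URL shown is purely illustrative):

```yaml
# Redirect requests for '/' to a web client hosted elsewhere.
# Anything other than an http:// or https:// URL is now a configuration error.
web_client_location: https://element.example.com/
```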
---
 changelog.d/11895.removal    |   1 +
 docs/upgrade.md              |  13 ++++++
 synapse/api/urls.py          |   1 -
 synapse/app/homeserver.py    |  34 ++------
 synapse/config/server.py     |  48 +++++--------
 tests/http/test_webclient.py | 108 ------------------------------------------
 6 files changed, 29 insertions(+), 176 deletions(-)
 create mode 100644 changelog.d/11895.removal
 delete mode 100644 tests/http/test_webclient.py

(limited to 'tests')

diff --git a/changelog.d/11895.removal b/changelog.d/11895.removal
new file mode 100644
index 0000000000..5973d96a33
--- /dev/null
+++ b/changelog.d/11895.removal
@@ -0,0 +1 @@
+Drop support for `webclient` listeners and configuring `web_client_location` to a non-HTTP(S) URL. Such deprecated configurations now cause a configuration error.
diff --git a/docs/upgrade.md b/docs/upgrade.md
index f455d257ba..7d582af0a7 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -85,6 +85,19 @@ process, for example:
     dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
     ```
 
+# Upgrading to v1.53.0
+
+## Dropping support for `webclient` listeners and non-HTTP(S) `web_client_location`
+
+Per the deprecation notice in Synapse v1.51.0, listeners of type `webclient`
+are no longer supported and configuring them is now a configuration error.
+
+Configuring a non-HTTP(S) `web_client_location` is now also a configuration
+error. Since the `webclient` listener is no longer supported, this setting now
+only applies to the root path `/` of Synapse's web server and no longer to the
+`/_matrix/client/` path.
+
+
 # Upgrading to v1.51.0
 
 ## Deprecation of `webclient` listeners and non-HTTP(S) `web_client_location`
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index f9f9467dc1..bd49fa6a5f 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -28,7 +28,6 @@ FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1"
 FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
 FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
 STATIC_PREFIX = "/_matrix/static"
-WEB_CLIENT_PREFIX = "/_matrix/client"
 SERVER_KEY_V2_PREFIX = "/_matrix/key/v2"
 MEDIA_R0_PREFIX = "/_matrix/media/r0"
 MEDIA_V3_PREFIX = "/_matrix/media/v3"
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index efedcc8889..24d55b0494 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -21,7 +21,6 @@ from typing import Dict, Iterable, Iterator, List
 from twisted.internet.tcp import Port
 from twisted.web.resource import EncodingResourceWrapper, Resource
 from twisted.web.server import GzipEncoderFactory
-from twisted.web.static import File
 
 import synapse
 import synapse.config.logger
@@ -33,7 +32,6 @@ from synapse.api.urls import (
     MEDIA_V3_PREFIX,
     SERVER_KEY_V2_PREFIX,
     STATIC_PREFIX,
-    WEB_CLIENT_PREFIX,
 )
 from synapse.app import _base
 from synapse.app._base import (
@@ -53,7 +51,6 @@ from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import (
     OptionsResource,
     RootOptionsRedirectResource,
-    RootRedirect,
     StaticResource,
 )
 from synapse.http.site import SynapseSite
@@ -134,15 +131,12 @@ class SynapseHomeServer(HomeServer):
         # Try to find something useful to serve at '/':
         #
         # 1. Redirect to the web client if it is an HTTP(S) URL.
-        # 2. Redirect to the web client served via Synapse.
-        # 3. Redirect to the static "Synapse is running" page.
-        # 4. Do not redirect and use a blank resource.
+        # 2. Redirect to the static "Synapse is running" page.
+        # 3. Do not redirect and use a blank resource.
+ if self.config.server.web_client_location: root_resource: Resource = RootOptionsRedirectResource( self.config.server.web_client_location ) - elif WEB_CLIENT_PREFIX in resources: - root_resource = RootOptionsRedirectResource(WEB_CLIENT_PREFIX) elif STATIC_PREFIX in resources: root_resource = RootOptionsRedirectResource(STATIC_PREFIX) else: @@ -270,28 +264,6 @@ class SynapseHomeServer(HomeServer): if name in ["keys", "federation"]: resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self) - if name == "webclient": - # webclient listeners are deprecated as of Synapse v1.51.0, remove it - # in > v1.53.0. - webclient_loc = self.config.server.web_client_location - - if webclient_loc is None: - logger.warning( - "Not enabling webclient resource, as web_client_location is unset." - ) - elif self.config.server.web_client_location_is_redirect: - resources[WEB_CLIENT_PREFIX] = RootRedirect(webclient_loc) - else: - logger.warning( - "Running webclient on the same domain is not recommended: " - "https://github.com/matrix-org/synapse#security-note - " - "after you move webclient to different host you can set " - "web_client_location to its full URL to enable redirection." - ) - # GZip is disabled here due to - # https://twistedmatrix.com/trac/ticket/7678 - resources[WEB_CLIENT_PREFIX] = File(webclient_loc) - if name == "metrics" and self.config.metrics.enable_metrics: resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) diff --git a/synapse/config/server.py b/synapse/config/server.py index a0a00a9798..7bc9624546 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -179,7 +179,6 @@ KNOWN_RESOURCES = { "openid", "replication", "static", - "webclient", } @@ -519,16 +518,12 @@ class ServerConfig(Config): self.listeners = l2 self.web_client_location = config.get("web_client_location", None) - self.web_client_location_is_redirect = self.web_client_location and ( + # Non-HTTP(S) web client location is not supported. + if self.web_client_location and not ( self.web_client_location.startswith("http://") or self.web_client_location.startswith("https://") - ) - # A non-HTTP(S) web client location is deprecated. - if self.web_client_location and not self.web_client_location_is_redirect: - logger.warning(NO_MORE_NONE_HTTP_WEB_CLIENT_LOCATION_WARNING) - - # Warn if webclient is configured for a worker. - _warn_if_webclient_configured(self.listeners) + ): + raise ConfigError("web_client_location must point to a HTTP(S) URL.") self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None)) self.gc_seconds = self.read_gc_intervals(config.get("gc_min_interval", None)) @@ -1351,11 +1346,16 @@ def parse_listener_def(listener: Any) -> ListenerConfig: http_config = None if listener_type == "http": + try: + resources = [ + HttpResourceConfig(**res) for res in listener.get("resources", []) + ] + except ValueError as e: + raise ConfigError("Unknown listener resource") from e + http_config = HttpListenerConfig( x_forwarded=listener.get("x_forwarded", False), - resources=[ - HttpResourceConfig(**res) for res in listener.get("resources", []) - ], + resources=resources, additional_resources=listener.get("additional_resources", {}), tag=listener.get("tag"), ) @@ -1363,30 +1363,6 @@ def parse_listener_def(listener: Any) -> ListenerConfig: return ListenerConfig(port, bind_addresses, listener_type, tls, http_config) -NO_MORE_NONE_HTTP_WEB_CLIENT_LOCATION_WARNING = """ -Synapse no longer supports serving a web client. To remove this warning, -configure 'web_client_location' with an HTTP(S) URL. 
-""" - - -NO_MORE_WEB_CLIENT_WARNING = """ -Synapse no longer includes a web client. To redirect the root resource to a web client, configure -'web_client_location'. To remove this warning, remove 'webclient' from the 'listeners' -configuration. -""" - - -def _warn_if_webclient_configured(listeners: Iterable[ListenerConfig]) -> None: - for listener in listeners: - if not listener.http_options: - continue - for res in listener.http_options.resources: - for name in res.names: - if name == "webclient": - logger.warning(NO_MORE_WEB_CLIENT_WARNING) - return - - _MANHOLE_SETTINGS_SCHEMA = { "type": "object", "properties": { diff --git a/tests/http/test_webclient.py b/tests/http/test_webclient.py deleted file mode 100644 index ee5cf299f6..0000000000 --- a/tests/http/test_webclient.py +++ /dev/null @@ -1,108 +0,0 @@ -# Copyright 2022 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from http import HTTPStatus -from typing import Dict - -from twisted.web.resource import Resource - -from synapse.app.homeserver import SynapseHomeServer -from synapse.config.server import HttpListenerConfig, HttpResourceConfig, ListenerConfig -from synapse.http.site import SynapseSite - -from tests.server import make_request -from tests.unittest import HomeserverTestCase, create_resource_tree, override_config - - -class WebClientTests(HomeserverTestCase): - @override_config( - { - "web_client_location": "https://example.org", - } - ) - def test_webclient_resolves_with_client_resource(self): - """ - Tests that both client and webclient resources can be accessed simultaneously. - - This is a regression test created in response to https://github.com/matrix-org/synapse/issues/11763. - """ - for resource_name_order_list in [ - ["webclient", "client"], - ["client", "webclient"], - ]: - # Create a dictionary from path regex -> resource - resource_dict: Dict[str, Resource] = {} - - for resource_name in resource_name_order_list: - resource_dict.update( - SynapseHomeServer._configure_named_resource(self.hs, resource_name) - ) - - # Create a root resource which ties the above resources together into one - root_resource = Resource() - create_resource_tree(resource_dict, root_resource) - - # Create a site configured with this resource to make HTTP requests against - listener_config = ListenerConfig( - port=8008, - bind_addresses=["127.0.0.1"], - type="http", - http_options=HttpListenerConfig( - resources=[HttpResourceConfig(names=resource_name_order_list)] - ), - ) - test_site = SynapseSite( - logger_name="synapse.access.http.fake", - site_tag=self.hs.config.server.server_name, - config=listener_config, - resource=root_resource, - server_version_string="1", - max_request_body_size=1234, - reactor=self.reactor, - ) - - # Attempt to make requests to endpoints on both the webclient and client resources - # on test_site. 
-        self._request_client_and_webclient_resources(test_site)
-
-    def _request_client_and_webclient_resources(self, test_site: SynapseSite) -> None:
-        """Make a request to an endpoint on both the webclient and client-server resources
-        of the given SynapseSite.
-
-        Args:
-            test_site: The SynapseSite object to make requests against.
-        """
-
-        # Ensure that the *webclient* resource is behaving as expected (we get redirected to
-        # the configured web_client_location)
-        channel = make_request(
-            self.reactor,
-            site=test_site,
-            method="GET",
-            path="/_matrix/client",
-        )
-        # Check that we are being redirected to the webclient location URI.
-        self.assertEqual(channel.code, HTTPStatus.FOUND)
-        self.assertEqual(
-            channel.headers.getRawHeaders("Location"), ["https://example.org"]
-        )
-
-        # Ensure that a request to the *client* resource works.
-        channel = make_request(
-            self.reactor,
-            site=test_site,
-            method="GET",
-            path="/_matrix/client/v3/login",
-        )
-        self.assertEqual(channel.code, HTTPStatus.OK)
-        self.assertIn("flows", channel.json_body)
-- cgit 1.5.1

From 02632b3504ad4512c5f5a4f859b3fe326b19c788 Mon Sep 17 00:00:00 2001
From: Jonathan de Jong
Date: Fri, 4 Feb 2022 13:15:13 +0100
Subject: Stabilise MSC3231 (Token Based Registration) (#11867)

---
 changelog.d/11867.feature                        | 5 +++++
 docs/modules/password_auth_provider_callbacks.md | 2 +-
 docs/upgrade.md                                  | 15 +++++++++++++++
 docs/workers.md                                  | 2 +-
 synapse/api/constants.py                         | 2 +-
 synapse/handlers/ui_auth/__init__.py             | 2 +-
 synapse/rest/client/register.py                  | 7 +++----
 tests/rest/client/test_register.py               | 2 +-
 8 files changed, 28 insertions(+), 9 deletions(-)
 create mode 100644 changelog.d/11867.feature

(limited to 'tests')

diff --git a/changelog.d/11867.feature b/changelog.d/11867.feature
new file mode 100644
index 0000000000..dbd9de0e4c
--- /dev/null
+++ b/changelog.d/11867.feature
@@ -0,0 +1,5 @@
+Stabilize [MSC3231](https://github.com/matrix-org/matrix-doc/pull/3231).
+
+Client implementations using `org.matrix.msc3231.login.registration_token` should switch to the stable identifiers:
+* `org.matrix.msc3231.login.registration_token` in query parameters and request/response bodies becomes `m.login.registration_token`.
+* `/_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity` becomes `/_matrix/client/v1/register/m.login.registration_token/validity`.
\ No newline at end of file
diff --git a/docs/modules/password_auth_provider_callbacks.md b/docs/modules/password_auth_provider_callbacks.md
index ec8324d292..3697e3782e 100644
--- a/docs/modules/password_auth_provider_callbacks.md
+++ b/docs/modules/password_auth_provider_callbacks.md
@@ -148,7 +148,7 @@ Here's an example featuring all currently supported keys:
         "address": "33123456789",
         "validated_at": 1642701357084,
     },
-    "org.matrix.msc3231.login.registration_token": "sometoken",  # User has registered through the flow described in MSC3231
+    "m.login.registration_token": "sometoken",  # User has registered through a registration token
 }
 ```
 
diff --git a/docs/upgrade.md b/docs/upgrade.md
index 75febb4adf..8ce37bcdee 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -84,6 +84,21 @@ process, for example:
     wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
     dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
     ```
+# Upgrading to v1.(next)
+
+## Stabilisation of MSC3231
+
+The unstable validity-check endpoint for the
+[Registration Tokens](https://spec.matrix.org/v1.2/client-server-api/#get_matrixclientv1registermloginregistration_tokenvalidity)
+feature has been stabilised and moved from:
+
+`/_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity`
+
+to:
+
+`/_matrix/client/v1/register/m.login.registration_token/validity`
+
+Please update any relevant reverse proxy or firewall configurations appropriately.
 
 # Upgrading to v1.53.0
 
diff --git a/docs/workers.md b/docs/workers.md
index fd83e2ddeb..dadde4d726 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -241,7 +241,7 @@ expressions:
     # Registration/login requests
     ^/_matrix/client/(api/v1|r0|v3|unstable)/login$
     ^/_matrix/client/(r0|v3|unstable)/register$
-    ^/_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity$
+    ^/_matrix/client/v1/register/m.login.registration_token/validity$
 
     # Event sending requests
     ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 52c083a20b..36ace7c613 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -81,7 +81,7 @@ class LoginType:
     TERMS: Final = "m.login.terms"
     SSO: Final = "m.login.sso"
     DUMMY: Final = "m.login.dummy"
-    REGISTRATION_TOKEN: Final = "org.matrix.msc3231.login.registration_token"
+    REGISTRATION_TOKEN: Final = "m.login.registration_token"
 
 
 # This is used in the `type` parameter for /register when called by
diff --git a/synapse/handlers/ui_auth/__init__.py b/synapse/handlers/ui_auth/__init__.py
index 13b0c61d2e..56eee4057f 100644
--- a/synapse/handlers/ui_auth/__init__.py
+++ b/synapse/handlers/ui_auth/__init__.py
@@ -38,4 +38,4 @@ class UIAuthSessionDataConstants:
     # used during registration to store the registration token used (if required) so that:
     # - we can prevent a token being used twice by one session
     # - we can 'use up' the token after registration has successfully completed
-    REGISTRATION_TOKEN = "org.matrix.msc3231.login.registration_token"
+    REGISTRATION_TOKEN = "m.login.registration_token"
diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py
index e3492f9f93..c283313e8d 100644
--- a/synapse/rest/client/register.py
+++ b/synapse/rest/client/register.py
@@ -368,7 +368,7 @@ class RegistrationTokenValidityRestServlet(RestServlet):
 
     Example:
 
-        GET
/_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity?token=abcd + GET /_matrix/client/v1/register/m.login.registration_token/validity?token=abcd 200 OK @@ -378,9 +378,8 @@ class RegistrationTokenValidityRestServlet(RestServlet): """ PATTERNS = client_patterns( - f"/org.matrix.msc3231/register/{LoginType.REGISTRATION_TOKEN}/validity", - releases=(), - unstable=True, + f"/register/{LoginType.REGISTRATION_TOKEN}/validity", + releases=("v1",), ) def __init__(self, hs: "HomeServer"): diff --git a/tests/rest/client/test_register.py b/tests/rest/client/test_register.py index 407dd32a73..0f1c47dcbb 100644 --- a/tests/rest/client/test_register.py +++ b/tests/rest/client/test_register.py @@ -1154,7 +1154,7 @@ class AccountValidityBackgroundJobTestCase(unittest.HomeserverTestCase): class RegistrationTokenValidityRestServletTestCase(unittest.HomeserverTestCase): servlets = [register.register_servlets] - url = "/_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity" + url = "/_matrix/client/v1/register/m.login.registration_token/validity" def default_config(self): config = super().default_config() -- cgit 1.5.1 From 0c4878caf2ba6d7e67f373b8ef80a445920f0dcd Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Mon, 7 Feb 2022 13:21:19 +0000 Subject: Add a unit test for users receiving their own device list updates (#11909) --- changelog.d/11909.misc | 1 + tests/rest/client/test_sync.py | 57 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 changelog.d/11909.misc (limited to 'tests') diff --git a/changelog.d/11909.misc b/changelog.d/11909.misc new file mode 100644 index 0000000000..ffd3e5c639 --- /dev/null +++ b/changelog.d/11909.misc @@ -0,0 +1 @@ +Add a test that checks users receive their own device list updates down `/sync`. \ No newline at end of file diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index c427686376..cd4af2b1f3 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -23,7 +23,7 @@ from synapse.api.constants import ( ReadReceiptEventFields, RelationTypes, ) -from synapse.rest.client import knock, login, read_marker, receipts, room, sync +from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync from tests import unittest from tests.federation.transport.test_knocking import ( @@ -710,3 +710,58 @@ class SyncCacheTestCase(unittest.HomeserverTestCase): channel.await_result(timeout_ms=9900) channel.await_result(timeout_ms=200) self.assertEqual(channel.code, 200, channel.json_body) + + +class DeviceListSyncTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + sync.register_servlets, + devices.register_servlets, + ] + + def test_user_with_no_rooms_receives_self_device_list_updates(self): + """Tests that a user with no rooms still receives their own device list updates""" + device_id = "TESTDEVICE" + + # Register a user and login, creating a device + self.user_id = self.register_user("kermit", "monkey") + self.tok = self.login("kermit", "monkey", device_id=device_id) + + # Request an initial sync + channel = self.make_request("GET", "/sync", access_token=self.tok) + self.assertEqual(channel.code, 200, channel.json_body) + next_batch = channel.json_body["next_batch"] + + # Now, make an incremental sync request. 
+ # It won't return until something has happened + incremental_sync_channel = self.make_request( + "GET", + f"/sync?since={next_batch}&timeout=30000", + access_token=self.tok, + await_result=False, + ) + + # Change our device's display name + channel = self.make_request( + "PUT", + f"devices/{device_id}", + { + "display_name": "freeze ray", + }, + access_token=self.tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # The sync should now have returned + incremental_sync_channel.await_result(timeout_ms=20000) + self.assertEqual(incremental_sync_channel.code, 200, channel.json_body) + + # We should have received notification that the (user's) device has changed + device_list_changes = incremental_sync_channel.json_body.get( + "device_lists", {} + ).get("changed", []) + + self.assertIn( + self.user_id, device_list_changes, incremental_sync_channel.json_body + ) -- cgit 1.5.1 From cf06783d54b2b9090aef595a9094ddd857c1155b Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Mon, 7 Feb 2022 18:26:42 +0000 Subject: Remove optional state of `ApplicationService.is_interested`'s `store` parameter (#11911) --- changelog.d/11911.misc | 1 + synapse/appservice/__init__.py | 23 +++++----------------- synapse/handlers/appservice.py | 2 +- tests/appservice/test_appservice.py | 38 +++++++++++++++++++++++++++++++------ 4 files changed, 39 insertions(+), 25 deletions(-) create mode 100644 changelog.d/11911.misc (limited to 'tests') diff --git a/changelog.d/11911.misc b/changelog.d/11911.misc new file mode 100644 index 0000000000..805588c2e9 --- /dev/null +++ b/changelog.d/11911.misc @@ -0,0 +1 @@ +Various refactors to the application service notifier code. \ No newline at end of file diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 7dbebd97b5..a340a8c9c7 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -165,23 +165,16 @@ class ApplicationService: return namespace.exclusive return False - async def _matches_user( - self, event: Optional[EventBase], store: Optional["DataStore"] = None - ) -> bool: - if not event: - return False - + async def _matches_user(self, event: EventBase, store: "DataStore") -> bool: if self.is_interested_in_user(event.sender): return True + # also check m.room.member state key if event.type == EventTypes.Member and self.is_interested_in_user( event.state_key ): return True - if not store: - return False - does_match = await self.matches_user_in_member_list(event.room_id, store) return does_match @@ -216,21 +209,15 @@ class ApplicationService: return self.is_interested_in_room(event.room_id) return False - async def _matches_aliases( - self, event: EventBase, store: Optional["DataStore"] = None - ) -> bool: - if not store or not event: - return False - + async def _matches_aliases(self, event: EventBase, store: "DataStore") -> bool: alias_list = await store.get_aliases_for_room(event.room_id) for alias in alias_list: if self.is_interested_in_alias(alias): return True + return False - async def is_interested( - self, event: EventBase, store: Optional["DataStore"] = None - ) -> bool: + async def is_interested(self, event: EventBase, store: "DataStore") -> bool: """Check if this service is interested in this event. 
Args: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 0fb919acf6..a42c3558e4 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -649,7 +649,7 @@ class ApplicationServicesHandler: """Retrieve a list of application services interested in this event. Args: - event: The event to check. Can be None if alias_list is not. + event: The event to check. Returns: A list of services interested in this event based on the service regex. """ diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index 07d8105f41..9bd6275e92 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -40,13 +40,19 @@ class ApplicationServiceTestCase(unittest.TestCase): ) self.store = Mock() + self.store.get_aliases_for_room = simple_async_mock([]) + self.store.get_users_in_room = simple_async_mock([]) @defer.inlineCallbacks def test_regex_user_id_prefix_match(self): self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*")) self.event.sender = "@irc_foobar:matrix.org" self.assertTrue( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -54,7 +60,11 @@ class ApplicationServiceTestCase(unittest.TestCase): self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*")) self.event.sender = "@someone_else:matrix.org" self.assertFalse( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -64,7 +74,11 @@ class ApplicationServiceTestCase(unittest.TestCase): self.event.type = "m.room.member" self.event.state_key = "@irc_foobar:matrix.org" self.assertTrue( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -74,7 +88,11 @@ class ApplicationServiceTestCase(unittest.TestCase): ) self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org" self.assertTrue( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -84,7 +102,11 @@ class ApplicationServiceTestCase(unittest.TestCase): ) self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org" self.assertFalse( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -183,7 +205,11 @@ class ApplicationServiceTestCase(unittest.TestCase): self.event.content = {"membership": "invite"} self.event.state_key = self.service.sender self.assertTrue( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks -- cgit 1.5.1 From 0640f8ebaa34e10a69ad7481b738ae36fda1c103 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 8 Feb 2022 11:20:32 +0100 Subject: Add a callback to allow modules to deny 3PID (#11854) Part of the Tchap Synapse mainlining. This allows modules to implement extra logic to figure out whether a given 3PID can be added to the local homeserver. 
In the Tchap use case, this will allow a Synapse module to interface with the custom endpoint /internal_info. --- changelog.d/11854.feature | 1 + docs/modules/password_auth_provider_callbacks.md | 19 ++++++ synapse/handlers/auth.py | 44 ++++++++++++++ synapse/module_api/__init__.py | 3 + synapse/rest/client/account.py | 4 +- synapse/rest/client/register.py | 8 ++- synapse/util/threepids.py | 13 +++- tests/handlers/test_password_providers.py | 76 +++++++++++++++++++++++- 8 files changed, 161 insertions(+), 7 deletions(-) create mode 100644 changelog.d/11854.feature (limited to 'tests') diff --git a/changelog.d/11854.feature b/changelog.d/11854.feature new file mode 100644 index 0000000000..975e95bc52 --- /dev/null +++ b/changelog.d/11854.feature @@ -0,0 +1 @@ +Add a callback to allow modules to allow or forbid a 3PID (email address, phone number) from being associated to a local account. diff --git a/docs/modules/password_auth_provider_callbacks.md b/docs/modules/password_auth_provider_callbacks.md index 3697e3782e..88b59bb09e 100644 --- a/docs/modules/password_auth_provider_callbacks.md +++ b/docs/modules/password_auth_provider_callbacks.md @@ -166,6 +166,25 @@ any of the subsequent implementations of this callback. If every callback return the username provided by the user is used, if any (otherwise one is automatically generated). +## `is_3pid_allowed` + +_First introduced in Synapse v1.53.0_ + +```python +async def is_3pid_allowed(self, medium: str, address: str, registration: bool) -> bool +``` + +Called when attempting to bind a third-party identifier (i.e. an email address or a phone +number). The module is given the medium of the third-party identifier (which is `email` if +the identifier is an email address, or `msisdn` if the identifier is a phone number) and +its address, as well as a boolean indicating whether the attempt to bind is happening as +part of registering a new user. The module must return a boolean indicating whether the +identifier can be allowed to be bound to an account on the local homeserver. + +If multiple modules implement this callback, they will be considered in order. If a +callback returns `True`, Synapse falls through to the next one. The value of the first +callback that does not return `True` will be used. If this happens, Synapse will not call +any of the subsequent implementations of this callback. 
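+
+For illustration, here is a minimal sketch of a module registering this callback.
+The module class, its config handling and the domain used are hypothetical; only
+the callback signature and the `register_password_auth_provider_callbacks` keyword
+come from the API described above.
+
+```python
+class ExampleThreepidPolicyModule:
+    def __init__(self, config: dict, api):
+        # Only allow email addresses on one (purely illustrative) domain.
+        self._allowed_domain = config.get("allowed_domain", "example.com")
+
+        api.register_password_auth_provider_callbacks(
+            is_3pid_allowed=self.is_3pid_allowed,
+        )
+
+    async def is_3pid_allowed(
+        self, medium: str, address: str, registration: bool
+    ) -> bool:
+        # Forbid anything that is not an email address on the allowed domain.
+        return medium == "email" and address.endswith("@" + self._allowed_domain)
+```
+
+Returning `True` from this callback defers the decision to any remaining callbacks,
+and ultimately to Synapse's default of allowing the 3PID.
+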
## Example diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index e32c93e234..6959d1aa7e 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -2064,6 +2064,7 @@ GET_USERNAME_FOR_REGISTRATION_CALLBACK = Callable[ [JsonDict, JsonDict], Awaitable[Optional[str]], ] +IS_3PID_ALLOWED_CALLBACK = Callable[[str, str, bool], Awaitable[bool]] class PasswordAuthProvider: @@ -2079,6 +2080,7 @@ class PasswordAuthProvider: self.get_username_for_registration_callbacks: List[ GET_USERNAME_FOR_REGISTRATION_CALLBACK ] = [] + self.is_3pid_allowed_callbacks: List[IS_3PID_ALLOWED_CALLBACK] = [] # Mapping from login type to login parameters self._supported_login_types: Dict[str, Iterable[str]] = {} @@ -2090,6 +2092,7 @@ class PasswordAuthProvider: self, check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None, on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None, + is_3pid_allowed: Optional[IS_3PID_ALLOWED_CALLBACK] = None, auth_checkers: Optional[ Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK] ] = None, @@ -2145,6 +2148,9 @@ class PasswordAuthProvider: get_username_for_registration, ) + if is_3pid_allowed is not None: + self.is_3pid_allowed_callbacks.append(is_3pid_allowed) + def get_supported_login_types(self) -> Mapping[str, Iterable[str]]: """Get the login types supported by this password provider @@ -2343,3 +2349,41 @@ class PasswordAuthProvider: raise SynapseError(code=500, msg="Internal Server Error") return None + + async def is_3pid_allowed( + self, + medium: str, + address: str, + registration: bool, + ) -> bool: + """Check if the user can be allowed to bind a 3PID on this homeserver. + + Args: + medium: The medium of the 3PID. + address: The address of the 3PID. + registration: Whether the 3PID is being bound when registering a new user. + + Returns: + Whether the 3PID is allowed to be bound on this homeserver + """ + for callback in self.is_3pid_allowed_callbacks: + try: + res = await callback(medium, address, registration) + + if res is False: + return res + elif not isinstance(res, bool): + # mypy complains that this line is unreachable because it assumes the + # data returned by the module fits the expected type. We just want + # to make sure this is the case. 
+                    logger.warning(  # type: ignore[unreachable]
+                        "Ignoring non-boolean value returned by"
+                        " is_3pid_allowed callback %s: %s",
+                        callback,
+                        res,
+                    )
+            except Exception as e:
+                logger.error("Module raised an exception in is_3pid_allowed: %s", e)
+                raise SynapseError(code=500, msg="Internal Server Error")
+
+        return True
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 29fbc73c97..a91a7fa3ce 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -72,6 +72,7 @@ from synapse.handlers.auth import (
     CHECK_3PID_AUTH_CALLBACK,
     CHECK_AUTH_CALLBACK,
     GET_USERNAME_FOR_REGISTRATION_CALLBACK,
+    IS_3PID_ALLOWED_CALLBACK,
     ON_LOGGED_OUT_CALLBACK,
     AuthHandler,
 )
@@ -312,6 +313,7 @@ class ModuleApi:
         auth_checkers: Optional[
             Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
         ] = None,
+        is_3pid_allowed: Optional[IS_3PID_ALLOWED_CALLBACK] = None,
         get_username_for_registration: Optional[
             GET_USERNAME_FOR_REGISTRATION_CALLBACK
         ] = None,
@@ -323,6 +325,7 @@ class ModuleApi:
         return self._password_auth_provider.register_password_auth_provider_callbacks(
             check_3pid_auth=check_3pid_auth,
             on_logged_out=on_logged_out,
+            is_3pid_allowed=is_3pid_allowed,
             auth_checkers=auth_checkers,
             get_username_for_registration=get_username_for_registration,
         )
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 6b272658fc..cfa2aee76d 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -385,7 +385,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
         send_attempt = body["send_attempt"]
         next_link = body.get("next_link")  # Optional param
 
-        if not check_3pid_allowed(self.hs, "email", email):
+        if not await check_3pid_allowed(self.hs, "email", email):
             raise SynapseError(
                 403,
                 "Your email domain is not authorized on this server",
@@ -468,7 +468,7 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(country, phone_number)
 
-        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+        if not await check_3pid_allowed(self.hs, "msisdn", msisdn):
             raise SynapseError(
                 403,
                 "Account phone numbers are not authorized on this server",
diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py
index c283313e8d..c965e2bda2 100644
--- a/synapse/rest/client/register.py
+++ b/synapse/rest/client/register.py
@@ -112,7 +112,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
         send_attempt = body["send_attempt"]
         next_link = body.get("next_link")  # Optional param
 
-        if not check_3pid_allowed(self.hs, "email", email):
+        if not await check_3pid_allowed(self.hs, "email", email, registration=True):
             raise SynapseError(
                 403,
                 "Your email domain is not authorized to register on this server",
@@ -192,7 +192,7 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(country, phone_number)
 
-        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+        if not await check_3pid_allowed(self.hs, "msisdn", msisdn, registration=True):
             raise SynapseError(
                 403,
                 "Phone numbers are not authorized to register on this server",
@@ -616,7 +616,9 @@ class RegisterRestServlet(RestServlet):
                 medium = auth_result[login_type]["medium"]
                 address = auth_result[login_type]["address"]
 
-                if not check_3pid_allowed(self.hs, medium, address):
+                if not await check_3pid_allowed(
+                    self.hs, medium, address, registration=True
+                ):
                     raise SynapseError(
                         403,
                         "Third party identifiers (email/phone numbers)"
diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py
index 389adf00f6..1e9c2faa64 100644
--- a/synapse/util/threepids.py
+++ b/synapse/util/threepids.py
@@ -32,7 +32,12 @@ logger = logging.getLogger(__name__)
 MAX_EMAIL_ADDRESS_LENGTH = 500
 
 
-def check_3pid_allowed(hs: "HomeServer", medium: str, address: str) -> bool:
+async def check_3pid_allowed(
+    hs: "HomeServer",
+    medium: str,
+    address: str,
+    registration: bool = False,
+) -> bool:
     """Checks whether a given format of 3PID is allowed to be used on this HS
 
     Args:
@@ -40,9 +45,15 @@ def check_3pid_allowed(hs: "HomeServer", medium: str, address: str) -> bool:
         medium: 3pid medium - e.g. email, msisdn
         address: address within that medium (e.g. "wotan@matrix.org")
             msisdns need to first have been canonicalised
+        registration: whether we want to bind the 3PID as part of registering a new user.
+
     Returns:
         bool: whether the 3PID medium/address is allowed to be added to this HS
     """
+    if not await hs.get_password_auth_provider().is_3pid_allowed(
+        medium, address, registration
+    ):
+        return False
 
     if hs.config.registration.allowed_local_3pids:
         for constraint in hs.config.registration.allowed_local_3pids:
diff --git a/tests/handlers/test_password_providers.py b/tests/handlers/test_password_providers.py
index 94809cb8be..4740dd0a65 100644
--- a/tests/handlers/test_password_providers.py
+++ b/tests/handlers/test_password_providers.py
@@ -21,13 +21,15 @@ from twisted.internet import defer
 
 import synapse
 from synapse.api.constants import LoginType
+from synapse.api.errors import Codes
 from synapse.handlers.auth import load_legacy_password_auth_providers
 from synapse.module_api import ModuleApi
-from synapse.rest.client import devices, login, logout, register
+from synapse.rest.client import account, devices, login, logout, register
 from synapse.types import JsonDict, UserID
 
 from tests import unittest
 from tests.server import FakeChannel
+from tests.test_utils import make_awaitable
 from tests.unittest import override_config
 
 # (possibly experimental) login flows we expect to appear in the list after the normal
@@ -158,6 +160,7 @@ class PasswordAuthProviderTests(unittest.HomeserverTestCase):
         devices.register_servlets,
         logout.register_servlets,
         register.register_servlets,
+        account.register_servlets,
     ]
 
     def setUp(self):
@@ -803,6 +806,77 @@ class PasswordAuthProviderTests(unittest.HomeserverTestCase):
         # Check that the callback has been called.
         m.assert_called_once()
 
+    # Set some email configuration so the test doesn't fail because of its absence.
+    @override_config({"email": {"notif_from": "noreply@test"}})
+    def test_3pid_allowed(self):
+        """Tests that an `is_3pid_allowed` callback forbidding a 3PID makes Synapse
+        refuse to bind the new 3PID, and that one allowing a 3PID makes Synapse accept
+        the bind. Also checks that the module is passed a boolean indicating whether
+        the user this 3PID is being bound to is currently registering.
+        """
+        self._test_3pid_allowed("rin", False)
+        self._test_3pid_allowed("kitay", True)
+
+    def _test_3pid_allowed(self, username: str, registration: bool):
+        """Tests that the "is_3pid_allowed" module callback is called correctly, using
+        either /register or /account URLs depending on the arguments.
+
+        Args:
+            username: The username to use for the test.
+            registration: Whether to test with registration URLs.
+ """ + self.hs.get_identity_handler().send_threepid_validation = Mock( + return_value=make_awaitable(0), + ) + + m = Mock(return_value=make_awaitable(False)) + self.hs.get_password_auth_provider().is_3pid_allowed_callbacks = [m] + + self.register_user(username, "password") + tok = self.login(username, "password") + + if registration: + url = "/register/email/requestToken" + else: + url = "/account/3pid/email/requestToken" + + channel = self.make_request( + "POST", + url, + { + "client_secret": "foo", + "email": "foo@test.com", + "send_attempt": 0, + }, + access_token=tok, + ) + self.assertEqual(channel.code, 403, channel.result) + self.assertEqual( + channel.json_body["errcode"], + Codes.THREEPID_DENIED, + channel.json_body, + ) + + m.assert_called_once_with("email", "foo@test.com", registration) + + m = Mock(return_value=make_awaitable(True)) + self.hs.get_password_auth_provider().is_3pid_allowed_callbacks = [m] + + channel = self.make_request( + "POST", + url, + { + "client_secret": "foo", + "email": "bar@test.com", + "send_attempt": 0, + }, + access_token=tok, + ) + self.assertEqual(channel.code, 200, channel.result) + self.assertIn("sid", channel.json_body) + + m.assert_called_once_with("email", "bar@test.com", registration) + def _setup_get_username_for_registration(self) -> Mock: """Registers a get_username_for_registration callback that appends "-foo" to the username the client is trying to register. -- cgit 1.5.1 From 8c94b3abe93fe8c3e2ddd29fa350f54f69714151 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 8 Feb 2022 09:21:20 -0500 Subject: Experimental support to include bundled aggregations in search results (MSC3666) (#11837) --- changelog.d/11837.feature | 1 + synapse/config/experimental.py | 2 ++ synapse/handlers/search.py | 29 +++++++++++++++++---- synapse/storage/databases/main/relations.py | 13 ++++++++-- tests/rest/client/test_relations.py | 39 ++++++++++++++++++++++++++++- 5 files changed, 76 insertions(+), 8 deletions(-) create mode 100644 changelog.d/11837.feature (limited to 'tests') diff --git a/changelog.d/11837.feature b/changelog.d/11837.feature new file mode 100644 index 0000000000..62ef707123 --- /dev/null +++ b/changelog.d/11837.feature @@ -0,0 +1 @@ +Experimental support for [MSC3666](https://github.com/matrix-org/matrix-doc/pull/3666): including bundled aggregations in server side search results. diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index e4719d19b8..f05a803a71 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -26,6 +26,8 @@ class ExperimentalConfig(Config): # MSC3440 (thread relation) self.msc3440_enabled: bool = experimental.get("msc3440_enabled", False) + # MSC3666: including bundled relations in /search. + self.msc3666_enabled: bool = experimental.get("msc3666_enabled", False) # MSC3026 (busy presence state) self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 02bb5ae72f..41cb809078 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -43,6 +43,8 @@ class SearchHandler: self.state_store = self.storage.state self.auth = hs.get_auth() + self._msc3666_enabled = hs.config.experimental.msc3666_enabled + async def get_old_rooms_from_upgraded_room(self, room_id: str) -> Iterable[str]: """Retrieves room IDs of old rooms in the history of an upgraded room. 
@@ -238,8 +240,6 @@ class SearchHandler: results = search_result["results"] - results_map = {r["event"].event_id: r for r in results} - rank_map.update({r["event"].event_id: r["rank"] for r in results}) filtered_events = await search_filter.filter([r["event"] for r in results]) @@ -420,12 +420,29 @@ class SearchHandler: time_now = self.clock.time_msec() + aggregations = None + if self._msc3666_enabled: + aggregations = await self.store.get_bundled_aggregations( + # Generate an iterable of EventBase for all the events that will be + # returned, including contextual events. + itertools.chain( + # The events_before and events_after for each context. + itertools.chain.from_iterable( + itertools.chain(context["events_before"], context["events_after"]) # type: ignore[arg-type] + for context in contexts.values() + ), + # The returned events. + allowed_events, + ), + user.to_string(), + ) + for context in contexts.values(): context["events_before"] = self._event_serializer.serialize_events( - context["events_before"], time_now # type: ignore[arg-type] + context["events_before"], time_now, bundle_aggregations=aggregations # type: ignore[arg-type] ) context["events_after"] = self._event_serializer.serialize_events( - context["events_after"], time_now # type: ignore[arg-type] + context["events_after"], time_now, bundle_aggregations=aggregations # type: ignore[arg-type] ) state_results = {} @@ -442,7 +459,9 @@ class SearchHandler: results.append( { "rank": rank_map[e.event_id], - "result": self._event_serializer.serialize_event(e, time_now), + "result": self._event_serializer.serialize_event( + e, time_now, bundle_aggregations=aggregations + ), "context": contexts.get(e.event_id, {}), } ) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 6180b17296..7718acbf1c 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -715,6 +715,9 @@ class RelationsWorkerStore(SQLBaseStore): A map of event ID to the bundled aggregation for the event. Not all events may have bundled aggregations in the results. """ + # The already processed event IDs. Tracked separately from the result + # since the result omits events which do not have bundled aggregations. + seen_event_ids = set() # State events and redacted events do not get bundled aggregations. events = [ @@ -728,13 +731,19 @@ class RelationsWorkerStore(SQLBaseStore): # Fetch other relations per event. for event in events: + # De-duplicate events by ID to handle the same event requested multiple + # times. The caches that _get_bundled_aggregation_for_event use should + # capture this, but best to reduce work. + if event.event_id in seen_event_ids: + continue + seen_event_ids.add(event.event_id) + event_result = await self._get_bundled_aggregation_for_event(event, user_id) if event_result: results[event.event_id] = event_result # Fetch any edits. 
- event_ids = [event.event_id for event in events] - edits = await self._get_applicable_edits(event_ids) + edits = await self._get_applicable_edits(seen_event_ids) for event_id, edit in edits.items(): results.setdefault(event_id, BundledAggregations()).replace = edit diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 96ae7790bb..06721e67c9 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -453,7 +453,9 @@ class RelationsTestCase(unittest.HomeserverTestCase): ) self.assertEquals(400, channel.code, channel.json_body) - @unittest.override_config({"experimental_features": {"msc3440_enabled": True}}) + @unittest.override_config( + {"experimental_features": {"msc3440_enabled": True, "msc3666_enabled": True}} + ) def test_bundled_aggregations(self): """ Test that annotations, references, and threads get correctly bundled. @@ -579,6 +581,23 @@ class RelationsTestCase(unittest.HomeserverTestCase): self.assertTrue(room_timeline["limited"]) assert_bundle(self._find_event_in_chunk(room_timeline["events"])) + # Request search. + channel = self.make_request( + "POST", + "/search", + # Search term matches the parent message. + content={"search_categories": {"room_events": {"search_term": "Hi"}}}, + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + chunk = [ + result["result"] + for result in channel.json_body["search_categories"]["room_events"][ + "results" + ] + ] + assert_bundle(self._find_event_in_chunk(chunk)) + def test_aggregation_get_event_for_annotation(self): """Test that annotations do not get bundled aggregations included when directly requested. @@ -759,6 +778,7 @@ class RelationsTestCase(unittest.HomeserverTestCase): self.assertEquals(200, channel.code, channel.json_body) self.assertNotIn("m.relations", channel.json_body["unsigned"]) + @unittest.override_config({"experimental_features": {"msc3666_enabled": True}}) def test_edit(self): """Test that a simple edit works.""" @@ -825,6 +845,23 @@ class RelationsTestCase(unittest.HomeserverTestCase): self.assertTrue(room_timeline["limited"]) assert_bundle(self._find_event_in_chunk(room_timeline["events"])) + # Request search. + channel = self.make_request( + "POST", + "/search", + # Search term matches the parent message. + content={"search_categories": {"room_events": {"search_term": "Hi"}}}, + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + chunk = [ + result["result"] + for result in channel.json_body["search_categories"]["room_events"][ + "results" + ] + ] + assert_bundle(self._find_event_in_chunk(chunk)) + def test_multi_edit(self): """Test that multiple edits, including attempts by people who shouldn't be allowed, are correctly handled. -- cgit 1.5.1 From d0e78af35e519ff76bd23e786007f3e7130d90f7 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 8 Feb 2022 11:03:08 -0500 Subject: Add missing type hints to synapse.replication. 
(#11938) --- changelog.d/11938.misc | 1 + mypy.ini | 3 + .../slave/storage/_slaved_id_tracker.py | 2 +- synapse/replication/slave/storage/client_ips.py | 4 +- synapse/replication/slave/storage/devices.py | 10 ++- synapse/replication/slave/storage/groups.py | 8 ++- synapse/replication/slave/storage/push_rule.py | 7 +- synapse/replication/slave/storage/pushers.py | 6 +- synapse/replication/tcp/client.py | 45 +++++++------ synapse/replication/tcp/commands.py | 74 ++++++++++++++-------- synapse/replication/tcp/handler.py | 36 +++++------ synapse/replication/tcp/protocol.py | 68 ++++++++++---------- synapse/replication/tcp/redis.py | 32 +++++----- synapse/replication/tcp/resource.py | 16 +++-- synapse/replication/tcp/streams/_base.py | 6 +- synapse/replication/tcp/streams/events.py | 23 ++++--- synapse/util/stringutils.py | 5 +- tests/replication/_base.py | 7 +- tests/replication/tcp/test_remote_server_up.py | 3 +- 19 files changed, 209 insertions(+), 147 deletions(-) create mode 100644 changelog.d/11938.misc (limited to 'tests') diff --git a/changelog.d/11938.misc b/changelog.d/11938.misc new file mode 100644 index 0000000000..1d3a0030f7 --- /dev/null +++ b/changelog.d/11938.misc @@ -0,0 +1 @@ +Add missing type hints to replication code. diff --git a/mypy.ini b/mypy.ini index 2884078d0a..cd28ac0dd2 100644 --- a/mypy.ini +++ b/mypy.ini @@ -169,6 +169,9 @@ disallow_untyped_defs = True [mypy-synapse.push.*] disallow_untyped_defs = True +[mypy-synapse.replication.*] +disallow_untyped_defs = True + [mypy-synapse.rest.*] disallow_untyped_defs = True diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py index fa132d10b4..8f3f953ed4 100644 --- a/synapse/replication/slave/storage/_slaved_id_tracker.py +++ b/synapse/replication/slave/storage/_slaved_id_tracker.py @@ -40,7 +40,7 @@ class SlavedIdTracker(AbstractStreamIdTracker): for table, column in extra_tables: self.advance(None, _load_current_id(db_conn, table, column)) - def advance(self, instance_name: Optional[str], new_id: int): + def advance(self, instance_name: Optional[str], new_id: int) -> None: self._current = (max if self.step > 0 else min)(self._current, new_id) def get_current_token(self) -> int: diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py index bc888ce1a8..b5b84c09ae 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py @@ -37,7 +37,9 @@ class SlavedClientIpStore(BaseSlavedStore): cache_name="client_ip_last_seen", max_size=50000 ) - async def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id): + async def insert_client_ip( + self, user_id: str, access_token: str, ip: str, user_agent: str, device_id: str + ) -> None: now = int(self._clock.time_msec()) key = (user_id, access_token, ip) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index a2aff75b70..0ffd34f1da 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Iterable from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker @@ -60,7 +60,9 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto def get_device_stream_token(self) -> int: return self._device_list_id_gen.get_current_token() - def process_replication_rows(self, stream_name, instance_name, token, rows): + def process_replication_rows( + self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] + ) -> None: if stream_name == DeviceListsStream.NAME: self._device_list_id_gen.advance(instance_name, token) self._invalidate_caches_for_devices(token, rows) @@ -70,7 +72,9 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto self._user_signature_stream_cache.entity_has_changed(row.user_id, token) return super().process_replication_rows(stream_name, instance_name, token, rows) - def _invalidate_caches_for_devices(self, token, rows): + def _invalidate_caches_for_devices( + self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow] + ) -> None: for row in rows: # The entities are either user IDs (starting with '@') whose devices # have changed, or remote servers that we need to tell about diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py index 9d90e26375..d6f37d7479 100644 --- a/synapse/replication/slave/storage/groups.py +++ b/synapse/replication/slave/storage/groups.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Iterable from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker @@ -44,10 +44,12 @@ class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore): self._group_updates_id_gen.get_current_token(), ) - def get_group_stream_token(self): + def get_group_stream_token(self) -> int: return self._group_updates_id_gen.get_current_token() - def process_replication_rows(self, stream_name, instance_name, token, rows): + def process_replication_rows( + self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] + ) -> None: if stream_name == GroupServerStream.NAME: self._group_updates_id_gen.advance(instance_name, token) for row in rows: diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index 7541e21de9..52ee3f7e58 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+from typing import Any, Iterable from synapse.replication.tcp.streams import PushRulesStream from synapse.storage.databases.main.push_rule import PushRulesWorkerStore @@ -20,10 +21,12 @@ from .events import SlavedEventStore class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): - def get_max_push_rules_stream_id(self): + def get_max_push_rules_stream_id(self) -> int: return self._push_rules_stream_id_gen.get_current_token() - def process_replication_rows(self, stream_name, instance_name, token, rows): + def process_replication_rows( + self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] + ) -> None: if stream_name == PushRulesStream.NAME: self._push_rules_stream_id_gen.advance(instance_name, token) for row in rows: diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py index cea90c0f1b..de642bba71 100644 --- a/synapse/replication/slave/storage/pushers.py +++ b/synapse/replication/slave/storage/pushers.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Iterable from synapse.replication.tcp.streams import PushersStream from synapse.storage.database import DatabasePool, LoggingDatabaseConnection @@ -41,8 +41,8 @@ class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore): return self._pushers_id_gen.get_current_token() def process_replication_rows( - self, stream_name: str, instance_name: str, token, rows + self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] ) -> None: if stream_name == PushersStream.NAME: - self._pushers_id_gen.advance(instance_name, token) # type: ignore + self._pushers_id_gen.advance(instance_name, token) return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index e29ae1e375..d59ce7ccf9 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -14,10 +14,12 @@ """A replication client for use by synapse workers. 
""" import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple from twisted.internet.defer import Deferred +from twisted.internet.interfaces import IAddress, IConnector from twisted.internet.protocol import ReconnectingClientFactory +from twisted.python.failure import Failure from synapse.api.constants import EventTypes from synapse.federation import send_queue @@ -79,10 +81,10 @@ class DirectTcpReplicationClientFactory(ReconnectingClientFactory): hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying) - def startedConnecting(self, connector): + def startedConnecting(self, connector: IConnector) -> None: logger.info("Connecting to replication: %r", connector.getDestination()) - def buildProtocol(self, addr): + def buildProtocol(self, addr: IAddress) -> ClientReplicationStreamProtocol: logger.info("Connected to replication: %r", addr) return ClientReplicationStreamProtocol( self.hs, @@ -92,11 +94,11 @@ class DirectTcpReplicationClientFactory(ReconnectingClientFactory): self.command_handler, ) - def clientConnectionLost(self, connector, reason): + def clientConnectionLost(self, connector: IConnector, reason: Failure) -> None: logger.error("Lost replication conn: %r", reason) ReconnectingClientFactory.clientConnectionLost(self, connector, reason) - def clientConnectionFailed(self, connector, reason): + def clientConnectionFailed(self, connector: IConnector, reason: Failure) -> None: logger.error("Failed to connect to replication: %r", reason) ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) @@ -131,7 +133,7 @@ class ReplicationDataHandler: async def on_rdata( self, stream_name: str, instance_name: str, token: int, rows: list - ): + ) -> None: """Called to handle a batch of replication data with a given stream token. By default this just pokes the slave store. Can be overridden in subclasses to @@ -252,14 +254,16 @@ class ReplicationDataHandler: # loop. (This maintains the order so no need to resort) waiting_list[:] = waiting_list[index_of_first_deferred_not_called:] - async def on_position(self, stream_name: str, instance_name: str, token: int): + async def on_position( + self, stream_name: str, instance_name: str, token: int + ) -> None: await self.on_rdata(stream_name, instance_name, token, []) # We poke the generic "replication" notifier to wake anything up that # may be streaming. self.notifier.notify_replication() - def on_remote_server_up(self, server: str): + def on_remote_server_up(self, server: str) -> None: """Called when get a new REMOTE_SERVER_UP command.""" # Let's wake up the transaction queue for the server in case we have @@ -269,7 +273,7 @@ class ReplicationDataHandler: async def wait_for_stream_position( self, instance_name: str, stream_name: str, position: int - ): + ) -> None: """Wait until this instance has received updates up to and including the given stream position. 
""" @@ -304,7 +308,7 @@ class ReplicationDataHandler: "Finished waiting for repl stream %r to reach %s", stream_name, position ) - def stop_pusher(self, user_id, app_id, pushkey): + def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None: if not self._notify_pushers: return @@ -316,13 +320,13 @@ class ReplicationDataHandler: logger.info("Stopping pusher %r / %r", user_id, key) pusher.on_stop() - async def start_pusher(self, user_id, app_id, pushkey): + async def start_pusher(self, user_id: str, app_id: str, pushkey: str) -> None: if not self._notify_pushers: return key = "%s:%s" % (app_id, pushkey) logger.info("Starting pusher %r / %r", user_id, key) - return await self._pusher_pool.start_pusher_by_id(app_id, pushkey, user_id) + await self._pusher_pool.start_pusher_by_id(app_id, pushkey, user_id) class FederationSenderHandler: @@ -353,10 +357,12 @@ class FederationSenderHandler: self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") - def wake_destination(self, server: str): + def wake_destination(self, server: str) -> None: self.federation_sender.wake_destination(server) - async def process_replication_rows(self, stream_name, token, rows): + async def process_replication_rows( + self, stream_name: str, token: int, rows: list + ) -> None: # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. if stream_name == "federation": @@ -384,11 +390,12 @@ class FederationSenderHandler: for host in hosts: self.federation_sender.send_device_messages(host) - async def _on_new_receipts(self, rows): + async def _on_new_receipts( + self, rows: Iterable[ReceiptsStream.ReceiptsStreamRow] + ) -> None: """ Args: - rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]): - new receipts to be processed + rows: new receipts to be processed """ for receipt in rows: # we only want to send on receipts for our own users @@ -408,7 +415,7 @@ class FederationSenderHandler: ) await self.federation_sender.send_read_receipt(receipt_info) - async def update_token(self, token): + async def update_token(self, token: int) -> None: """Update the record of where we have processed to in the federation stream. Called after we have processed a an update received over replication. Sends @@ -428,7 +435,7 @@ class FederationSenderHandler: run_as_background_process("_save_and_send_ack", self._save_and_send_ack) - async def _save_and_send_ack(self): + async def _save_and_send_ack(self) -> None: """Save the current federation position in the database and send an ACK to master with where we're up to. """ diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 1311b013da..3654f6c03c 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -18,12 +18,15 @@ allowed to be sent by which side. """ import abc import logging -from typing import Tuple, Type +from typing import Optional, Tuple, Type, TypeVar +from synapse.replication.tcp.streams._base import StreamRow from synapse.util import json_decoder, json_encoder logger = logging.getLogger(__name__) +T = TypeVar("T", bound="Command") + class Command(metaclass=abc.ABCMeta): """The base command class. @@ -38,7 +41,7 @@ class Command(metaclass=abc.ABCMeta): @classmethod @abc.abstractmethod - def from_line(cls, line): + def from_line(cls: Type[T], line: str) -> T: """Deserialises a line from the wire into this command. `line` does not include the command. 
""" @@ -49,21 +52,24 @@ class Command(metaclass=abc.ABCMeta): prefix. """ - def get_logcontext_id(self): + def get_logcontext_id(self) -> str: """Get a suitable string for the logcontext when processing this command""" # by default, we just use the command name. return self.NAME +SC = TypeVar("SC", bound="_SimpleCommand") + + class _SimpleCommand(Command): """An implementation of Command whose argument is just a 'data' string.""" - def __init__(self, data): + def __init__(self, data: str): self.data = data @classmethod - def from_line(cls, line): + def from_line(cls: Type[SC], line: str) -> SC: return cls(line) def to_line(self) -> str: @@ -109,14 +115,16 @@ class RdataCommand(Command): NAME = "RDATA" - def __init__(self, stream_name, instance_name, token, row): + def __init__( + self, stream_name: str, instance_name: str, token: Optional[int], row: StreamRow + ): self.stream_name = stream_name self.instance_name = instance_name self.token = token self.row = row @classmethod - def from_line(cls, line): + def from_line(cls: Type["RdataCommand"], line: str) -> "RdataCommand": stream_name, instance_name, token, row_json = line.split(" ", 3) return cls( stream_name, @@ -125,7 +133,7 @@ class RdataCommand(Command): json_decoder.decode(row_json), ) - def to_line(self): + def to_line(self) -> str: return " ".join( ( self.stream_name, @@ -135,7 +143,7 @@ class RdataCommand(Command): ) ) - def get_logcontext_id(self): + def get_logcontext_id(self) -> str: return "RDATA-" + self.stream_name @@ -164,18 +172,20 @@ class PositionCommand(Command): NAME = "POSITION" - def __init__(self, stream_name, instance_name, prev_token, new_token): + def __init__( + self, stream_name: str, instance_name: str, prev_token: int, new_token: int + ): self.stream_name = stream_name self.instance_name = instance_name self.prev_token = prev_token self.new_token = new_token @classmethod - def from_line(cls, line): + def from_line(cls: Type["PositionCommand"], line: str) -> "PositionCommand": stream_name, instance_name, prev_token, new_token = line.split(" ", 3) return cls(stream_name, instance_name, int(prev_token), int(new_token)) - def to_line(self): + def to_line(self) -> str: return " ".join( ( self.stream_name, @@ -218,14 +228,14 @@ class ReplicateCommand(Command): NAME = "REPLICATE" - def __init__(self): + def __init__(self) -> None: pass @classmethod - def from_line(cls, line): + def from_line(cls: Type[T], line: str) -> T: return cls() - def to_line(self): + def to_line(self) -> str: return "" @@ -247,14 +257,16 @@ class UserSyncCommand(Command): NAME = "USER_SYNC" - def __init__(self, instance_id, user_id, is_syncing, last_sync_ms): + def __init__( + self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int + ): self.instance_id = instance_id self.user_id = user_id self.is_syncing = is_syncing self.last_sync_ms = last_sync_ms @classmethod - def from_line(cls, line): + def from_line(cls: Type["UserSyncCommand"], line: str) -> "UserSyncCommand": instance_id, user_id, state, last_sync_ms = line.split(" ", 3) if state not in ("start", "end"): @@ -262,7 +274,7 @@ class UserSyncCommand(Command): return cls(instance_id, user_id, state == "start", int(last_sync_ms)) - def to_line(self): + def to_line(self) -> str: return " ".join( ( self.instance_id, @@ -286,14 +298,16 @@ class ClearUserSyncsCommand(Command): NAME = "CLEAR_USER_SYNC" - def __init__(self, instance_id): + def __init__(self, instance_id: str): self.instance_id = instance_id @classmethod - def from_line(cls, line): + def from_line( + cls: 
Type["ClearUserSyncsCommand"], line: str + ) -> "ClearUserSyncsCommand": return cls(line) - def to_line(self): + def to_line(self) -> str: return self.instance_id @@ -316,7 +330,9 @@ class FederationAckCommand(Command): self.token = token @classmethod - def from_line(cls, line: str) -> "FederationAckCommand": + def from_line( + cls: Type["FederationAckCommand"], line: str + ) -> "FederationAckCommand": instance_name, token = line.split(" ") return cls(instance_name, int(token)) @@ -334,7 +350,15 @@ class UserIpCommand(Command): NAME = "USER_IP" - def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen): + def __init__( + self, + user_id: str, + access_token: str, + ip: str, + user_agent: str, + device_id: str, + last_seen: int, + ): self.user_id = user_id self.access_token = access_token self.ip = ip @@ -343,14 +367,14 @@ class UserIpCommand(Command): self.last_seen = last_seen @classmethod - def from_line(cls, line): + def from_line(cls: Type["UserIpCommand"], line: str) -> "UserIpCommand": user_id, jsn = line.split(" ", 1) access_token, ip, user_agent, device_id, last_seen = json_decoder.decode(jsn) return cls(user_id, access_token, ip, user_agent, device_id, last_seen) - def to_line(self): + def to_line(self) -> str: return ( self.user_id + " " diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index f7e6bc1e62..17e1572393 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -261,7 +261,7 @@ class ReplicationCommandHandler: "process-replication-data", self._unsafe_process_queue, stream_name ) - async def _unsafe_process_queue(self, stream_name: str): + async def _unsafe_process_queue(self, stream_name: str) -> None: """Processes the command queue for the given stream, until it is empty Does not check if there is already a thread processing the queue, hence "unsafe" @@ -294,7 +294,7 @@ class ReplicationCommandHandler: # This shouldn't be possible raise Exception("Unrecognised command %s in stream queue", cmd.NAME) - def start_replication(self, hs: "HomeServer"): + def start_replication(self, hs: "HomeServer") -> None: """Helper method to start a replication connection to the remote server using TCP. """ @@ -345,10 +345,10 @@ class ReplicationCommandHandler: """Get a list of streams that this instances replicates.""" return self._streams_to_replicate - def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand): + def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None: self.send_positions_to_connection(conn) - def send_positions_to_connection(self, conn: IReplicationConnection): + def send_positions_to_connection(self, conn: IReplicationConnection) -> None: """Send current position of all streams this process is source of to the connection. 
""" @@ -392,7 +392,7 @@ class ReplicationCommandHandler: def on_FEDERATION_ACK( self, conn: IReplicationConnection, cmd: FederationAckCommand - ): + ) -> None: federation_ack_counter.inc() if self._federation_sender: @@ -408,7 +408,7 @@ class ReplicationCommandHandler: else: return None - async def _handle_user_ip(self, cmd: UserIpCommand): + async def _handle_user_ip(self, cmd: UserIpCommand) -> None: await self._store.insert_client_ip( cmd.user_id, cmd.access_token, @@ -421,7 +421,7 @@ class ReplicationCommandHandler: assert self._server_notices_sender is not None await self._server_notices_sender.on_user_ip(cmd.user_id) - def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand): + def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None: if cmd.instance_name == self._instance_name: # Ignore RDATA that are just our own echoes return @@ -497,7 +497,7 @@ class ReplicationCommandHandler: async def on_rdata( self, stream_name: str, instance_name: str, token: int, rows: list - ): + ) -> None: """Called to handle a batch of replication data with a given stream token. Args: @@ -512,7 +512,7 @@ class ReplicationCommandHandler: stream_name, instance_name, token, rows ) - def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand): + def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand) -> None: if cmd.instance_name == self._instance_name: # Ignore POSITION that are just our own echoes return @@ -581,7 +581,7 @@ class ReplicationCommandHandler: def on_REMOTE_SERVER_UP( self, conn: IReplicationConnection, cmd: RemoteServerUpCommand - ): + ) -> None: """Called when get a new REMOTE_SERVER_UP command.""" self._replication_data_handler.on_remote_server_up(cmd.data) @@ -604,7 +604,7 @@ class ReplicationCommandHandler: # between two instances, but that is not currently supported). self.send_command(cmd, ignore_conn=conn) - def new_connection(self, connection: IReplicationConnection): + def new_connection(self, connection: IReplicationConnection) -> None: """Called when we have a new connection.""" self._connections.append(connection) @@ -631,7 +631,7 @@ class ReplicationCommandHandler: UserSyncCommand(self._instance_id, user_id, True, now) ) - def lost_connection(self, connection: IReplicationConnection): + def lost_connection(self, connection: IReplicationConnection) -> None: """Called when a connection is closed/lost.""" # we no longer need _streams_by_connection for this connection. streams = self._streams_by_connection.pop(connection, None) @@ -653,7 +653,7 @@ class ReplicationCommandHandler: def send_command( self, cmd: Command, ignore_conn: Optional[IReplicationConnection] = None - ): + ) -> None: """Send a command to all connected connections. Args: @@ -680,7 +680,7 @@ class ReplicationCommandHandler: else: logger.warning("Dropping command as not connected: %r", cmd.NAME) - def send_federation_ack(self, token: int): + def send_federation_ack(self, token: int) -> None: """Ack data for the federation stream. This allows the master to drop data stored purely in memory. 
""" @@ -688,7 +688,7 @@ class ReplicationCommandHandler: def send_user_sync( self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int - ): + ) -> None: """Poke the master that a user has started/stopped syncing.""" self.send_command( UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms) @@ -702,15 +702,15 @@ class ReplicationCommandHandler: user_agent: str, device_id: str, last_seen: int, - ): + ) -> None: """Tell the master that the user made a request.""" cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen) self.send_command(cmd) - def send_remote_server_up(self, server: str): + def send_remote_server_up(self, server: str) -> None: self.send_command(RemoteServerUpCommand(server)) - def stream_update(self, stream_name: str, token: str, data: Any): + def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None: """Called when a new update is available to stream to clients. We need to check if the client is interested in the stream or not diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 7bae36db16..7763ffb2d0 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -49,7 +49,7 @@ import fcntl import logging import struct from inspect import isawaitable -from typing import TYPE_CHECKING, Collection, List, Optional +from typing import TYPE_CHECKING, Any, Collection, List, Optional from prometheus_client import Counter from zope.interface import Interface, implementer @@ -123,7 +123,7 @@ class ConnectionStates: class IReplicationConnection(Interface): """An interface for replication connections.""" - def send_command(cmd: Command): + def send_command(cmd: Command) -> None: """Send the command down the connection""" @@ -190,7 +190,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): "replication-conn", self.conn_id ) - def connectionMade(self): + def connectionMade(self) -> None: logger.info("[%s] Connection established", self.id()) self.state = ConnectionStates.ESTABLISHED @@ -207,11 +207,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): # Always send the initial PING so that the other side knows that they # can time us out. - self.send_command(PingCommand(self.clock.time_msec())) + self.send_command(PingCommand(str(self.clock.time_msec()))) self.command_handler.new_connection(self) - def send_ping(self): + def send_ping(self) -> None: """Periodically sends a ping and checks if we should close the connection due to the other side timing out. 
""" @@ -226,7 +226,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.transport.abortConnection() else: if now - self.last_sent_command >= PING_TIME: - self.send_command(PingCommand(now)) + self.send_command(PingCommand(str(now))) if ( self.received_ping @@ -239,12 +239,12 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): ) self.send_error("ping timeout") - def lineReceived(self, line: bytes): + def lineReceived(self, line: bytes) -> None: """Called when we've received a line""" with PreserveLoggingContext(self._logging_context): self._parse_and_dispatch_line(line) - def _parse_and_dispatch_line(self, line: bytes): + def _parse_and_dispatch_line(self, line: bytes) -> None: if line.strip() == "": # Ignore blank lines return @@ -309,24 +309,24 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): if not handled: logger.warning("Unhandled command: %r", cmd) - def close(self): + def close(self) -> None: logger.warning("[%s] Closing connection", self.id()) self.time_we_closed = self.clock.time_msec() assert self.transport is not None self.transport.loseConnection() self.on_connection_closed() - def send_error(self, error_string, *args): + def send_error(self, error_string: str, *args: Any) -> None: """Send an error to remote and close the connection.""" self.send_command(ErrorCommand(error_string % args)) self.close() - def send_command(self, cmd, do_buffer=True): + def send_command(self, cmd: Command, do_buffer: bool = True) -> None: """Send a command if connection has been established. Args: - cmd (Command) - do_buffer (bool): Whether to buffer the message or always attempt + cmd + do_buffer: Whether to buffer the message or always attempt to send the command. This is mostly used to send an error message if we're about to close the connection due our buffers becoming full. @@ -357,7 +357,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.last_sent_command = self.clock.time_msec() - def _queue_command(self, cmd): + def _queue_command(self, cmd: Command) -> None: """Queue the command until the connection is ready to write to again.""" logger.debug("[%s] Queueing as conn %r, cmd: %r", self.id(), self.state, cmd) self.pending_commands.append(cmd) @@ -370,20 +370,20 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.send_command(ErrorCommand("Failed to keep up"), do_buffer=False) self.close() - def _send_pending_commands(self): + def _send_pending_commands(self) -> None: """Send any queued commandes""" pending = self.pending_commands self.pending_commands = [] for cmd in pending: self.send_command(cmd) - def on_PING(self, line): + def on_PING(self, cmd: PingCommand) -> None: self.received_ping = True - def on_ERROR(self, cmd): + def on_ERROR(self, cmd: ErrorCommand) -> None: logger.error("[%s] Remote reported error: %r", self.id(), cmd.data) - def pauseProducing(self): + def pauseProducing(self) -> None: """This is called when both the kernel send buffer and the twisted tcp connection send buffers have become full. 
@@ -394,26 +394,26 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): logger.info("[%s] Pause producing", self.id()) self.state = ConnectionStates.PAUSED - def resumeProducing(self): + def resumeProducing(self) -> None: """The remote has caught up after we started buffering!""" logger.info("[%s] Resume producing", self.id()) self.state = ConnectionStates.ESTABLISHED self._send_pending_commands() - def stopProducing(self): + def stopProducing(self) -> None: """We're never going to send any more data (normally because either we or the remote has closed the connection) """ logger.info("[%s] Stop producing", self.id()) self.on_connection_closed() - def connectionLost(self, reason): + def connectionLost(self, reason: Failure) -> None: # type: ignore[override] logger.info("[%s] Replication connection closed: %r", self.id(), reason) if isinstance(reason, Failure): assert reason.type is not None connection_close_counter.labels(reason.type.__name__).inc() else: - connection_close_counter.labels(reason.__class__.__name__).inc() + connection_close_counter.labels(reason.__class__.__name__).inc() # type: ignore[unreachable] try: # Remove us from list of connections to be monitored @@ -427,7 +427,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.on_connection_closed() - def on_connection_closed(self): + def on_connection_closed(self) -> None: logger.info("[%s] Connection was closed", self.id()) self.state = ConnectionStates.CLOSED @@ -445,7 +445,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): # the sentinel context is now active, which may not be correct. # PreserveLoggingContext() will restore the correct logging context. - def __str__(self): + def __str__(self) -> str: addr = None if self.transport: addr = str(self.transport.getPeer()) @@ -455,10 +455,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): addr, ) - def id(self): + def id(self) -> str: return "%s-%s" % (self.name, self.conn_id) - def lineLengthExceeded(self, line): + def lineLengthExceeded(self, line: str) -> None: """Called when we receive a line that is above the maximum line length""" self.send_error("Line length exceeded") @@ -474,11 +474,11 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): self.server_name = server_name - def connectionMade(self): + def connectionMade(self) -> None: self.send_command(ServerCommand(self.server_name)) super().connectionMade() - def on_NAME(self, cmd): + def on_NAME(self, cmd: NameCommand) -> None: logger.info("[%s] Renamed to %r", self.id(), cmd.data) self.name = cmd.data @@ -500,19 +500,19 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.client_name = client_name self.server_name = server_name - def connectionMade(self): + def connectionMade(self) -> None: self.send_command(NameCommand(self.client_name)) super().connectionMade() # Once we've connected subscribe to the necessary streams self.replicate() - def on_SERVER(self, cmd): + def on_SERVER(self, cmd: ServerCommand) -> None: if cmd.data != self.server_name: logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data) self.send_error("Wrong remote") - def replicate(self): + def replicate(self) -> None: """Send the subscription request to the server""" logger.info("[%s] Subscribing to replication streams", self.id()) @@ -529,7 +529,7 @@ pending_commands = LaterGauge( ) -def transport_buffer_size(protocol): +def transport_buffer_size(protocol: BaseReplicationStreamProtocol) -> int: if protocol.transport: size = len(protocol.transport.dataBuffer) 
+ protocol.transport._tempDataLen return size @@ -544,7 +544,9 @@ transport_send_buffer = LaterGauge( ) -def transport_kernel_read_buffer_size(protocol, read=True): +def transport_kernel_read_buffer_size( + protocol: BaseReplicationStreamProtocol, read: bool = True +) -> int: SIOCINQ = 0x541B SIOCOUTQ = 0x5411 diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 5b37f379d0..3170f7c59b 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -14,7 +14,7 @@ import logging from inspect import isawaitable -from typing import TYPE_CHECKING, Generic, Optional, Type, TypeVar, cast +from typing import TYPE_CHECKING, Any, Generic, Optional, Type, TypeVar, cast import attr import txredisapi @@ -62,7 +62,7 @@ class ConstantProperty(Generic[T, V]): def __get__(self, obj: Optional[T], objtype: Optional[Type[T]] = None) -> V: return self.constant - def __set__(self, obj: Optional[T], value: V): + def __set__(self, obj: Optional[T], value: V) -> None: pass @@ -95,7 +95,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): synapse_stream_name: str synapse_outbound_redis_connection: txredisapi.RedisProtocol - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) # a logcontext which we use for processing incoming commands. We declare it as a @@ -108,12 +108,12 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): "replication_command_handler" ) - def connectionMade(self): + def connectionMade(self) -> None: logger.info("Connected to redis") super().connectionMade() run_as_background_process("subscribe-replication", self._send_subscribe) - async def _send_subscribe(self): + async def _send_subscribe(self) -> None: # it's important to make sure that we only send the REPLICATE command once we # have successfully subscribed to the stream - otherwise we might miss the # POSITION response sent back by the other end. @@ -131,12 +131,12 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): # otherside won't know we've connected and so won't issue a REPLICATE. self.synapse_handler.send_positions_to_connection(self) - def messageReceived(self, pattern: str, channel: str, message: str): + def messageReceived(self, pattern: str, channel: str, message: str) -> None: """Received a message from redis.""" with PreserveLoggingContext(self._logging_context): self._parse_and_dispatch_message(message) - def _parse_and_dispatch_message(self, message: str): + def _parse_and_dispatch_message(self, message: str) -> None: if message.strip() == "": # Ignore blank lines return @@ -181,7 +181,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): "replication-" + cmd.get_logcontext_id(), lambda: res ) - def connectionLost(self, reason): + def connectionLost(self, reason: Failure) -> None: # type: ignore[override] logger.info("Lost connection to redis") super().connectionLost(reason) self.synapse_handler.lost_connection(self) @@ -193,17 +193,17 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): # the sentinel context is now active, which may not be correct. # PreserveLoggingContext() will restore the correct logging context. - def send_command(self, cmd: Command): + def send_command(self, cmd: Command) -> None: """Send a command if connection has been established. 
Args: - cmd (Command) + cmd: The command to send """ run_as_background_process( "send-cmd", self._async_send_command, cmd, bg_start_span=False ) - async def _async_send_command(self, cmd: Command): + async def _async_send_command(self, cmd: Command) -> None: """Encode a replication command and send it over our outbound connection""" string = "%s %s" % (cmd.NAME, cmd.to_line()) if "\n" in string: @@ -259,7 +259,7 @@ class SynapseRedisFactory(txredisapi.RedisFactory): hs.get_clock().looping_call(self._send_ping, 30 * 1000) @wrap_as_background_process("redis_ping") - async def _send_ping(self): + async def _send_ping(self) -> None: for connection in self.pool: try: await make_deferred_yieldable(connection.ping()) @@ -269,13 +269,13 @@ class SynapseRedisFactory(txredisapi.RedisFactory): # ReconnectingClientFactory has some logging (if you enable `self.noisy`), but # it's rubbish. We add our own here. - def startedConnecting(self, connector: IConnector): + def startedConnecting(self, connector: IConnector) -> None: logger.info( "Connecting to redis server %s", format_address(connector.getDestination()) ) super().startedConnecting(connector) - def clientConnectionFailed(self, connector: IConnector, reason: Failure): + def clientConnectionFailed(self, connector: IConnector, reason: Failure) -> None: logger.info( "Connection to redis server %s failed: %s", format_address(connector.getDestination()), @@ -283,7 +283,7 @@ class SynapseRedisFactory(txredisapi.RedisFactory): ) super().clientConnectionFailed(connector, reason) - def clientConnectionLost(self, connector: IConnector, reason: Failure): + def clientConnectionLost(self, connector: IConnector, reason: Failure) -> None: logger.info( "Connection to redis server %s lost: %s", format_address(connector.getDestination()), @@ -330,7 +330,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): self.synapse_outbound_redis_connection = outbound_redis_connection - def buildProtocol(self, addr): + def buildProtocol(self, addr: IAddress) -> RedisSubscriber: p = super().buildProtocol(addr) p = cast(RedisSubscriber, p) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index a9d85f4f6c..ecd6190f5b 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -16,16 +16,18 @@ import logging import random -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, List, Optional, Tuple from prometheus_client import Counter +from twisted.internet.interfaces import IAddress from twisted.internet.protocol import ServerFactory from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.commands import PositionCommand from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol from synapse.replication.tcp.streams import EventsStream +from synapse.replication.tcp.streams._base import StreamRow, Token from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -56,7 +58,7 @@ class ReplicationStreamProtocolFactory(ServerFactory): # listener config again or always starting a `ReplicationStreamer`.) 
hs.get_replication_streamer() - def buildProtocol(self, addr): + def buildProtocol(self, addr: IAddress) -> ServerReplicationStreamProtocol: return ServerReplicationStreamProtocol( self.server_name, self.clock, self.command_handler ) @@ -105,7 +107,7 @@ class ReplicationStreamer: if any(EventsStream.NAME == s.NAME for s in self.streams): self.clock.looping_call(self.on_notifier_poke, 1000) - def on_notifier_poke(self): + def on_notifier_poke(self) -> None: """Checks if there is actually any new data and sends it to the connections if there are. @@ -137,7 +139,7 @@ class ReplicationStreamer: run_as_background_process("replication_notifier", self._run_notifier_loop) - async def _run_notifier_loop(self): + async def _run_notifier_loop(self) -> None: self.is_looping = True try: @@ -238,7 +240,9 @@ class ReplicationStreamer: self.is_looping = False -def _batch_updates(updates): +def _batch_updates( + updates: List[Tuple[Token, StreamRow]] +) -> List[Tuple[Optional[Token], StreamRow]]: """Takes a list of updates of form [(token, row)] and sets the token to None for all rows where the next row has the same token. This is used to implement batching. @@ -254,7 +258,7 @@ def _batch_updates(updates): if not updates: return [] - new_updates = [] + new_updates: List[Tuple[Optional[Token], StreamRow]] = [] for i, update in enumerate(updates[:-1]): if update[0] == updates[i + 1][0]: new_updates.append((None, update[1])) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 5a2d90c530..914b9eae84 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -90,7 +90,7 @@ class Stream: ROW_TYPE: Any = None @classmethod - def parse_row(cls, row: StreamRow): + def parse_row(cls, row: StreamRow) -> Any: """Parse a row received over replication By default, assumes that the row data is an array object and passes its contents @@ -139,7 +139,7 @@ class Stream: # The token from which we last asked for updates self.last_token = self.current_token(self.local_instance_name) - def discard_updates_and_advance(self): + def discard_updates_and_advance(self) -> None: """Called when the stream should advance but the updates would be discarded, e.g. when there are no currently connected workers. """ @@ -200,7 +200,7 @@ def current_token_without_instance( return lambda instance_name: current_token() -def make_http_update_function(hs, stream_name: str) -> UpdateFunction: +def make_http_update_function(hs: "HomeServer", stream_name: str) -> UpdateFunction: """Makes a suitable function for use as an `update_function` that queries the master process for updates. """ diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 4f4f1ad453..50c4a5ba03 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -13,12 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import heapq -from collections.abc import Iterable -from typing import TYPE_CHECKING, Optional, Tuple, Type +from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Type, TypeVar, cast import attr -from ._base import Stream, StreamUpdateResult, Token +from synapse.replication.tcp.streams._base import ( + Stream, + StreamRow, + StreamUpdateResult, + Token, +) if TYPE_CHECKING: from synapse.server import HomeServer @@ -58,6 +62,9 @@ class EventsStreamRow: data: "BaseEventsStreamRow" +T = TypeVar("T", bound="BaseEventsStreamRow") + + class BaseEventsStreamRow: """Base class for rows to be sent in the events stream. @@ -68,7 +75,7 @@ class BaseEventsStreamRow: TypeId: str @classmethod - def from_data(cls, data): + def from_data(cls: Type[T], data: Iterable[Optional[str]]) -> T: """Parse the data from the replication stream into a row. By default we just call the constructor with the data list as arguments @@ -221,7 +228,7 @@ class EventsStream(Stream): return updates, upper_limit, limited @classmethod - def parse_row(cls, row): - (typ, data) = row - data = TypeToRow[typ].from_data(data) - return EventsStreamRow(typ, data) + def parse_row(cls, row: StreamRow) -> "EventsStreamRow": + (typ, data) = cast(Tuple[str, Iterable[Optional[str]]], row) + event_stream_row_data = TypeToRow[typ].from_data(data) + return EventsStreamRow(typ, event_stream_row_data) diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py index ea1032b4fc..b26546aecd 100644 --- a/synapse/util/stringutils.py +++ b/synapse/util/stringutils.py @@ -16,8 +16,7 @@ import itertools import re import secrets import string -from collections.abc import Iterable -from typing import Optional, Tuple +from typing import Iterable, Optional, Tuple from netaddr import valid_ipv6 @@ -197,7 +196,7 @@ def shortstr(iterable: Iterable, maxitems: int = 5) -> str: """If iterable has maxitems or fewer, return the stringification of a list containing those items. - Otherwise, return the stringification of a a list with the first maxitems items, + Otherwise, return the stringification of a list with the first maxitems items, followed by "...". 
     Args:

diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index cb02eddf07..9fc50f8852 100644
--- a/tests/replication/_base.py
+++ b/tests/replication/_base.py
@@ -14,6 +14,7 @@
 import logging
 from typing import Any, Dict, List, Optional, Tuple
 
+from twisted.internet.address import IPv4Address
 from twisted.internet.protocol import Protocol
 from twisted.web.resource import Resource
 
@@ -53,7 +54,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
         server_factory = ReplicationStreamProtocolFactory(hs)
         self.streamer = hs.get_replication_streamer()
         self.server: ServerReplicationStreamProtocol = server_factory.buildProtocol(
-            None
+            IPv4Address("TCP", "127.0.0.1", 0)
         )
 
         # Make a new HomeServer object for the worker
@@ -345,7 +346,9 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
             self.clock,
             repl_handler,
         )
-        server = self.server_factory.buildProtocol(None)
+        server = self.server_factory.buildProtocol(
+            IPv4Address("TCP", "127.0.0.1", 0)
+        )
 
         client_transport = FakeTransport(server, self.reactor)
         client.makeConnection(client_transport)
diff --git a/tests/replication/tcp/test_remote_server_up.py b/tests/replication/tcp/test_remote_server_up.py
index 262c35cef3..545f11acd1 100644
--- a/tests/replication/tcp/test_remote_server_up.py
+++ b/tests/replication/tcp/test_remote_server_up.py
@@ -14,6 +14,7 @@
 
 from typing import Tuple
 
+from twisted.internet.address import IPv4Address
 from twisted.internet.interfaces import IProtocol
 from twisted.test.proto_helpers import StringTransport
 
@@ -29,7 +30,7 @@ class RemoteServerUpTestCase(HomeserverTestCase):
 
     def _make_client(self) -> Tuple[IProtocol, StringTransport]:
         """Create a new direct TCP replication connection"""
 
-        proto = self.factory.buildProtocol(("127.0.0.1", 0))
+        proto = self.factory.buildProtocol(IPv4Address("TCP", "127.0.0.1", 0))
         transport = StringTransport()
         proto.makeConnection(transport)
-- 
cgit 1.5.1

From 337f38cac38bc57bc6a3cc8b319e3079c60c4549 Mon Sep 17 00:00:00 2001
From: Denis Kasak <dkasak@termina.org.uk>
Date: Thu, 10 Feb 2022 15:43:01 +0000
Subject: Implement a content type allow list for URL previews (#11936)

This implements an allow list for content types for which Synapse will
attempt URL preview. If a URL resolves to a resource with a content type
which isn't in the list, the download will terminate immediately.

This makes sense given that Synapse would never successfully generate a
URL preview for such files in the first place, and helps prevent issues
with streaming media servers, such as #8302.

Signed-off-by: Denis Kasak <dkasak@termina.org.uk>
---
 changelog.d/11936.bugfix                      |  1 +
 synapse/http/client.py                        | 18 +++++++
 synapse/rest/media/v1/preview_url_resource.py |  8 +++
 tests/rest/media/v1/test_url_preview.py       | 72 +++++++++++++++++++++++++++
 4 files changed, 99 insertions(+)
 create mode 100644 changelog.d/11936.bugfix

(limited to 'tests')

diff --git a/changelog.d/11936.bugfix b/changelog.d/11936.bugfix
new file mode 100644
index 0000000000..bc149f2801
--- /dev/null
+++ b/changelog.d/11936.bugfix
@@ -0,0 +1 @@
+Implement an allow list of content types for which we will attempt to preview a URL. This prevents Synapse from making useless longer-lived connections to streaming media servers.
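
[Editor's note: the allow list described in the commit message above boils down to a single content-type predicate that the downloader evaluates as soon as response headers arrive, aborting before the body is streamed. The standalone sketch below mirrors the shape of the `_is_previewable` helper this patch adds (its one-line body appears verbatim in the diff that follows); the `_is_html`/`_is_media` bodies here are simplified stand-ins for illustration, since this patch only shows `_is_json` in full.]

    def _is_html(content_type: str) -> bool:
        # Simplified stand-in for the real helper in preview_url_resource.py.
        return content_type.lower().startswith(("text/html", "application/xhtml"))

    def _is_media(content_type: str) -> bool:
        # Assumption for illustration: treat image/* as previewable media.
        return content_type.lower().startswith("image/")

    def _is_json(content_type: str) -> bool:
        return content_type.lower().startswith("application/json")

    def _is_previewable(content_type: str) -> bool:
        # This body matches the helper added by the patch below.
        return _is_html(content_type) or _is_media(content_type) or _is_json(content_type)

    assert _is_previewable("text/html; charset=utf-8")
    assert not _is_previewable("video/mp4")  # rejected before the download completes
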
diff --git a/synapse/http/client.py b/synapse/http/client.py index 743a7ffcb1..d617055617 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -20,6 +20,7 @@ from typing import ( TYPE_CHECKING, Any, BinaryIO, + Callable, Dict, Iterable, List, @@ -693,12 +694,18 @@ class SimpleHttpClient: output_stream: BinaryIO, max_size: Optional[int] = None, headers: Optional[RawHeaders] = None, + is_allowed_content_type: Optional[Callable[[str], bool]] = None, ) -> Tuple[int, Dict[bytes, List[bytes]], str, int]: """GETs a file from a given URL Args: url: The URL to GET output_stream: File to write the response body to. headers: A map from header name to a list of values for that header + is_allowed_content_type: A predicate to determine whether the + content type of the file we're downloading is allowed. If set and + it evaluates to False when called with the content type, the + request will be terminated before completing the download by + raising SynapseError. Returns: A tuple of the file length, dict of the response headers, absolute URI of the response and HTTP response code. @@ -726,6 +733,17 @@ class SimpleHttpClient: HTTPStatus.BAD_GATEWAY, "Got error %d" % (response.code,), Codes.UNKNOWN ) + if is_allowed_content_type and b"Content-Type" in resp_headers: + content_type = resp_headers[b"Content-Type"][0].decode("ascii") + if not is_allowed_content_type(content_type): + raise SynapseError( + HTTPStatus.BAD_GATEWAY, + ( + "Requested file's content type not allowed for this operation: %s" + % content_type + ), + ) + # TODO: if our Content-Type is HTML or something, just read the first # N bytes into RAM rather than saving it all to disk only to read it # straight back in again diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index efd84ced8f..8d3d1e54dc 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -403,6 +403,7 @@ class PreviewUrlResource(DirectServeJsonResource): output_stream=output_stream, max_size=self.max_spider_size, headers={"Accept-Language": self.url_preview_accept_language}, + is_allowed_content_type=_is_previewable, ) except SynapseError: # Pass SynapseErrors through directly, so that the servlet @@ -761,3 +762,10 @@ def _is_html(content_type: str) -> bool: def _is_json(content_type: str) -> bool: return content_type.lower().startswith("application/json") + + +def _is_previewable(content_type: str) -> bool: + """Returns True for content types for which we will perform URL preview and False + otherwise.""" + + return _is_html(content_type) or _is_media(content_type) or _is_json(content_type) diff --git a/tests/rest/media/v1/test_url_preview.py b/tests/rest/media/v1/test_url_preview.py index 53f6186213..da2c533260 100644 --- a/tests/rest/media/v1/test_url_preview.py +++ b/tests/rest/media/v1/test_url_preview.py @@ -243,6 +243,78 @@ class URLPreviewTests(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200) self.assertEqual(channel.json_body["og:title"], "\u0434\u043a\u0430") + def test_video_rejected(self): + self.lookups["matrix.org"] = [(IPv4Address, "10.1.2.3")] + + end_content = b"anything" + + channel = self.make_request( + "GET", + "preview_url?url=http://matrix.org", + shorthand=False, + await_result=False, + ) + self.pump() + + client = self.reactor.tcpClients[0][2].buildProtocol(None) + server = AccumulatingProtocol() + server.makeConnection(FakeTransport(client, self.reactor)) + client.makeConnection(FakeTransport(server, 
self.reactor)) + client.dataReceived( + ( + b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n" + b"Content-Type: video/mp4\r\n\r\n" + ) + % (len(end_content)) + + end_content + ) + + self.pump() + self.assertEqual(channel.code, 502) + self.assertEqual( + channel.json_body, + { + "errcode": "M_UNKNOWN", + "error": "Requested file's content type not allowed for this operation: video/mp4", + }, + ) + + def test_audio_rejected(self): + self.lookups["matrix.org"] = [(IPv4Address, "10.1.2.3")] + + end_content = b"anything" + + channel = self.make_request( + "GET", + "preview_url?url=http://matrix.org", + shorthand=False, + await_result=False, + ) + self.pump() + + client = self.reactor.tcpClients[0][2].buildProtocol(None) + server = AccumulatingProtocol() + server.makeConnection(FakeTransport(client, self.reactor)) + client.makeConnection(FakeTransport(server, self.reactor)) + client.dataReceived( + ( + b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n" + b"Content-Type: audio/aac\r\n\r\n" + ) + % (len(end_content)) + + end_content + ) + + self.pump() + self.assertEqual(channel.code, 502) + self.assertEqual( + channel.json_body, + { + "errcode": "M_UNKNOWN", + "error": "Requested file's content type not allowed for this operation: audio/aac", + }, + ) + def test_non_ascii_preview_content_type(self): self.lookups["matrix.org"] = [(IPv4Address, "10.1.2.3")] -- cgit 1.5.1 From df36945ff0e4a293a9dac0da07e2c94256835b32 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 10 Feb 2022 10:52:48 -0500 Subject: Support pagination tokens from /sync and /messages in the relations API. (#11952) --- changelog.d/11952.bugfix | 1 + synapse/rest/client/relations.py | 57 +++++++---- synapse/storage/databases/main/relations.py | 46 ++++++--- synapse/storage/relations.py | 15 +-- tests/rest/client/test_relations.py | 151 +++++++++++++++++++++++++--- 5 files changed, 217 insertions(+), 53 deletions(-) create mode 100644 changelog.d/11952.bugfix (limited to 'tests') diff --git a/changelog.d/11952.bugfix b/changelog.d/11952.bugfix new file mode 100644 index 0000000000..e38a08f559 --- /dev/null +++ b/changelog.d/11952.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where pagination tokens from `/sync` and `/messages` could not be provided to the `/relations` API. diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index 8cf5ebaa07..9ec425888a 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -32,14 +32,45 @@ from synapse.storage.relations import ( PaginationChunk, RelationPaginationToken, ) -from synapse.types import JsonDict +from synapse.types import JsonDict, RoomStreamToken, StreamToken if TYPE_CHECKING: from synapse.server import HomeServer + from synapse.storage.databases.main import DataStore logger = logging.getLogger(__name__) +async def _parse_token( + store: "DataStore", token: Optional[str] +) -> Optional[StreamToken]: + """ + For backwards compatibility support RelationPaginationToken, but new pagination + tokens are generated as full StreamTokens, to be compatible with /sync and /messages. + """ + if not token: + return None + # Luckily the format for StreamToken and RelationPaginationToken differ enough + # that they can easily be separated. An "_" appears in the serialization of + # RoomStreamToken (as part of StreamToken), but RelationPaginationToken uses + # "-" only for separators. 
+ if "_" in token: + return await StreamToken.from_string(store, token) + else: + relation_token = RelationPaginationToken.from_string(token) + return StreamToken( + room_key=RoomStreamToken(relation_token.topological, relation_token.stream), + presence_key=0, + typing_key=0, + receipt_key=0, + account_data_key=0, + push_rules_key=0, + to_device_key=0, + device_list_key=0, + groups_key=0, + ) + + class RelationPaginationServlet(RestServlet): """API to paginate relations on an event by topological ordering, optionally filtered by relation type and event type. @@ -88,13 +119,8 @@ class RelationPaginationServlet(RestServlet): pagination_chunk = PaginationChunk(chunk=[]) else: # Return the relations - from_token = None - if from_token_str: - from_token = RelationPaginationToken.from_string(from_token_str) - - to_token = None - if to_token_str: - to_token = RelationPaginationToken.from_string(to_token_str) + from_token = await _parse_token(self.store, from_token_str) + to_token = await _parse_token(self.store, to_token_str) pagination_chunk = await self.store.get_relations_for_event( event_id=parent_id, @@ -125,7 +151,7 @@ class RelationPaginationServlet(RestServlet): events, now, bundle_aggregations=aggregations ) - return_value = pagination_chunk.to_dict() + return_value = await pagination_chunk.to_dict(self.store) return_value["chunk"] = serialized_events return_value["original_event"] = original_event @@ -216,7 +242,7 @@ class RelationAggregationPaginationServlet(RestServlet): to_token=to_token, ) - return 200, pagination_chunk.to_dict() + return 200, await pagination_chunk.to_dict(self.store) class RelationAggregationGroupPaginationServlet(RestServlet): @@ -287,13 +313,8 @@ class RelationAggregationGroupPaginationServlet(RestServlet): from_token_str = parse_string(request, "from") to_token_str = parse_string(request, "to") - from_token = None - if from_token_str: - from_token = RelationPaginationToken.from_string(from_token_str) - - to_token = None - if to_token_str: - to_token = RelationPaginationToken.from_string(to_token_str) + from_token = await _parse_token(self.store, from_token_str) + to_token = await _parse_token(self.store, to_token_str) result = await self.store.get_relations_for_event( event_id=parent_id, @@ -313,7 +334,7 @@ class RelationAggregationGroupPaginationServlet(RestServlet): now = self.clock.time_msec() serialized_events = self._event_serializer.serialize_events(events, now) - return_value = result.to_dict() + return_value = await result.to_dict(self.store) return_value["chunk"] = serialized_events return 200, return_value diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 7718acbf1c..ad79cc5610 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -39,16 +39,13 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.stream import generate_pagination_where_clause from synapse.storage.engines import PostgresEngine -from synapse.storage.relations import ( - AggregationPaginationToken, - PaginationChunk, - RelationPaginationToken, -) -from synapse.types import JsonDict +from synapse.storage.relations import AggregationPaginationToken, PaginationChunk +from synapse.types import JsonDict, RoomStreamToken, StreamToken from synapse.util.caches.descriptors import cached, cachedList if TYPE_CHECKING: from synapse.server import HomeServer + from synapse.storage.databases.main import DataStore logger = logging.getLogger(__name__) @@ -98,8 
+95,8 @@ class RelationsWorkerStore(SQLBaseStore): aggregation_key: Optional[str] = None, limit: int = 5, direction: str = "b", - from_token: Optional[RelationPaginationToken] = None, - to_token: Optional[RelationPaginationToken] = None, + from_token: Optional[StreamToken] = None, + to_token: Optional[StreamToken] = None, ) -> PaginationChunk: """Get a list of relations for an event, ordered by topological ordering. @@ -138,8 +135,10 @@ class RelationsWorkerStore(SQLBaseStore): pagination_clause = generate_pagination_where_clause( direction=direction, column_names=("topological_ordering", "stream_ordering"), - from_token=attr.astuple(from_token) if from_token else None, # type: ignore[arg-type] - to_token=attr.astuple(to_token) if to_token else None, # type: ignore[arg-type] + from_token=from_token.room_key.as_historical_tuple() + if from_token + else None, + to_token=to_token.room_key.as_historical_tuple() if to_token else None, engine=self.database_engine, ) @@ -177,12 +176,27 @@ class RelationsWorkerStore(SQLBaseStore): last_topo_id = row[1] last_stream_id = row[2] - next_batch = None + # If there are more events, generate the next pagination key. + next_token = None if len(events) > limit and last_topo_id and last_stream_id: - next_batch = RelationPaginationToken(last_topo_id, last_stream_id) + next_key = RoomStreamToken(last_topo_id, last_stream_id) + if from_token: + next_token = from_token.copy_and_replace("room_key", next_key) + else: + next_token = StreamToken( + room_key=next_key, + presence_key=0, + typing_key=0, + receipt_key=0, + account_data_key=0, + push_rules_key=0, + to_device_key=0, + device_list_key=0, + groups_key=0, + ) return PaginationChunk( - chunk=list(events[:limit]), next_batch=next_batch, prev_batch=from_token + chunk=list(events[:limit]), next_batch=next_token, prev_batch=from_token ) return await self.db_pool.runInteraction( @@ -676,13 +690,15 @@ class RelationsWorkerStore(SQLBaseStore): annotations = await self.get_aggregation_groups_for_event(event_id, room_id) if annotations.chunk: - aggregations.annotations = annotations.to_dict() + aggregations.annotations = await annotations.to_dict( + cast("DataStore", self) + ) references = await self.get_relations_for_event( event_id, room_id, RelationTypes.REFERENCE, direction="f" ) if references.chunk: - aggregations.references = references.to_dict() + aggregations.references = await references.to_dict(cast("DataStore", self)) # If this event is the start of a thread, include a summary of the replies. if self._msc3440_enabled: diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py index b1536c1ca4..36ca2b8273 100644 --- a/synapse/storage/relations.py +++ b/synapse/storage/relations.py @@ -13,13 +13,16 @@ # limitations under the License. 
import logging -from typing import Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple import attr from synapse.api.errors import SynapseError from synapse.types import JsonDict +if TYPE_CHECKING: + from synapse.storage.databases.main import DataStore + logger = logging.getLogger(__name__) @@ -39,14 +42,14 @@ class PaginationChunk: next_batch: Optional[Any] = None prev_batch: Optional[Any] = None - def to_dict(self) -> Dict[str, Any]: + async def to_dict(self, store: "DataStore") -> Dict[str, Any]: d = {"chunk": self.chunk} if self.next_batch: - d["next_batch"] = self.next_batch.to_string() + d["next_batch"] = await self.next_batch.to_string(store) if self.prev_batch: - d["prev_batch"] = self.prev_batch.to_string() + d["prev_batch"] = await self.prev_batch.to_string(store) return d @@ -75,7 +78,7 @@ class RelationPaginationToken: except ValueError: raise SynapseError(400, "Invalid relation pagination token") - def to_string(self) -> str: + async def to_string(self, store: "DataStore") -> str: return "%d-%d" % (self.topological, self.stream) def as_tuple(self) -> Tuple[Any, ...]: @@ -105,7 +108,7 @@ class AggregationPaginationToken: except ValueError: raise SynapseError(400, "Invalid aggregation pagination token") - def to_string(self) -> str: + async def to_string(self, store: "DataStore") -> str: return "%d-%d" % (self.count, self.stream) def as_tuple(self) -> Tuple[Any, ...]: diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 06721e67c9..9768fb2971 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -21,7 +21,8 @@ from unittest.mock import patch from synapse.api.constants import EventTypes, RelationTypes from synapse.rest import admin from synapse.rest.client import login, register, relations, room, sync -from synapse.types import JsonDict +from synapse.storage.relations import RelationPaginationToken +from synapse.types import JsonDict, StreamToken from tests import unittest from tests.server import FakeChannel @@ -200,6 +201,15 @@ class RelationsTestCase(unittest.HomeserverTestCase): channel.json_body.get("next_batch"), str, channel.json_body ) + def _stream_token_to_relation_token(self, token: str) -> str: + """Convert a StreamToken into a legacy token (RelationPaginationToken).""" + room_key = self.get_success(StreamToken.from_string(self.store, token)).room_key + return self.get_success( + RelationPaginationToken( + topological=room_key.topological, stream=room_key.stream + ).to_string(self.store) + ) + def test_repeated_paginate_relations(self): """Test that if we paginate using a limit and tokens then we get the expected events. 
@@ -213,7 +223,7 @@ class RelationsTestCase(unittest.HomeserverTestCase): self.assertEquals(200, channel.code, channel.json_body) expected_event_ids.append(channel.json_body["event_id"]) - prev_token: Optional[str] = None + prev_token = "" found_event_ids: List[str] = [] for _ in range(20): from_token = "" @@ -222,8 +232,35 @@ class RelationsTestCase(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "/_matrix/client/unstable/rooms/%s/relations/%s?limit=1%s" - % (self.room, self.parent_id, from_token), + f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=1{from_token}", + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + + found_event_ids.extend(e["event_id"] for e in channel.json_body["chunk"]) + next_batch = channel.json_body.get("next_batch") + + self.assertNotEquals(prev_token, next_batch) + prev_token = next_batch + + if not prev_token: + break + + # We paginated backwards, so reverse + found_event_ids.reverse() + self.assertEquals(found_event_ids, expected_event_ids) + + # Reset and try again, but convert the tokens to the legacy format. + prev_token = "" + found_event_ids = [] + for _ in range(20): + from_token = "" + if prev_token: + from_token = "&from=" + self._stream_token_to_relation_token(prev_token) + + channel = self.make_request( + "GET", + f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=1{from_token}", access_token=self.user_token, ) self.assertEquals(200, channel.code, channel.json_body) @@ -241,6 +278,65 @@ class RelationsTestCase(unittest.HomeserverTestCase): found_event_ids.reverse() self.assertEquals(found_event_ids, expected_event_ids) + def test_pagination_from_sync_and_messages(self): + """Pagination tokens from /sync and /messages can be used to paginate /relations.""" + channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "A") + self.assertEquals(200, channel.code, channel.json_body) + annotation_id = channel.json_body["event_id"] + # Send an event after the relation events. + self.helper.send(self.room, body="Latest event", tok=self.user_token) + + # Request /sync, limiting it such that only the latest event is returned + # (and not the relation). + filter = urllib.parse.quote_plus( + '{"room": {"timeline": {"limit": 1}}}'.encode() + ) + channel = self.make_request( + "GET", f"/sync?filter={filter}", access_token=self.user_token + ) + self.assertEquals(200, channel.code, channel.json_body) + room_timeline = channel.json_body["rooms"]["join"][self.room]["timeline"] + sync_prev_batch = room_timeline["prev_batch"] + self.assertIsNotNone(sync_prev_batch) + # Ensure the relation event is not in the batch returned from /sync. + self.assertNotIn( + annotation_id, [ev["event_id"] for ev in room_timeline["events"]] + ) + + # Request /messages, limiting it such that only the latest event is + # returned (and not the relation). + channel = self.make_request( + "GET", + f"/rooms/{self.room}/messages?dir=b&limit=1", + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + messages_end = channel.json_body["end"] + self.assertIsNotNone(messages_end) + # Ensure the relation event is not in the chunk returned from /messages. + self.assertNotIn( + annotation_id, [ev["event_id"] for ev in channel.json_body["chunk"]] + ) + + # Request /relations with the pagination tokens received from both the + # /sync and /messages responses above, in turn. 
+ # + # This is a tiny bit silly since the client wouldn't know the parent ID + # from the requests above; consider the parent ID to be known from a + # previous /sync. + for from_token in (sync_prev_batch, messages_end): + channel = self.make_request( + "GET", + f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?from={from_token}", + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + + # The relation should be in the returned chunk. + self.assertIn( + annotation_id, [ev["event_id"] for ev in channel.json_body["chunk"]] + ) + def test_aggregation_pagination_groups(self): """Test that we can paginate annotation groups correctly.""" @@ -337,7 +433,7 @@ class RelationsTestCase(unittest.HomeserverTestCase): channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="a") self.assertEquals(200, channel.code, channel.json_body) - prev_token: Optional[str] = None + prev_token = "" found_event_ids: List[str] = [] encoded_key = urllib.parse.quote_plus("👍".encode()) for _ in range(20): @@ -347,15 +443,42 @@ class RelationsTestCase(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "/_matrix/client/unstable/rooms/%s" - "/aggregations/%s/%s/m.reaction/%s?limit=1%s" - % ( - self.room, - self.parent_id, - RelationTypes.ANNOTATION, - encoded_key, - from_token, - ), + f"/_matrix/client/unstable/rooms/{self.room}" + f"/aggregations/{self.parent_id}/{RelationTypes.ANNOTATION}" + f"/m.reaction/{encoded_key}?limit=1{from_token}", + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + + self.assertEqual(len(channel.json_body["chunk"]), 1, channel.json_body) + + found_event_ids.extend(e["event_id"] for e in channel.json_body["chunk"]) + + next_batch = channel.json_body.get("next_batch") + + self.assertNotEquals(prev_token, next_batch) + prev_token = next_batch + + if not prev_token: + break + + # We paginated backwards, so reverse + found_event_ids.reverse() + self.assertEquals(found_event_ids, expected_event_ids) + + # Reset and try again, but convert the tokens to the legacy format. + prev_token = "" + found_event_ids = [] + for _ in range(20): + from_token = "" + if prev_token: + from_token = "&from=" + self._stream_token_to_relation_token(prev_token) + + channel = self.make_request( + "GET", + f"/_matrix/client/unstable/rooms/{self.room}" + f"/aggregations/{self.parent_id}/{RelationTypes.ANNOTATION}" + f"/m.reaction/{encoded_key}?limit=1{from_token}", access_token=self.user_token, ) self.assertEquals(200, channel.code, channel.json_body) -- cgit 1.5.1 From d36943c4df841e789ef85c206c7fc7c4665ba4bd Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Fri, 11 Feb 2022 10:32:11 +0100 Subject: Support the stable API endpoint for MSC3283: new settings in `/capabilities` endpoint (#11933) --- changelog.d/11933.feature | 1 + docs/upgrade.md | 19 ++++++++++ synapse/rest/client/capabilities.py | 15 +++++++- tests/rest/client/test_capabilities.py | 69 +++++++++++----------------------- 4 files changed, 55 insertions(+), 49 deletions(-) create mode 100644 changelog.d/11933.feature (limited to 'tests') diff --git a/changelog.d/11933.feature b/changelog.d/11933.feature new file mode 100644 index 0000000000..2b1b0d1786 --- /dev/null +++ b/changelog.d/11933.feature @@ -0,0 +1 @@ +Support the stable API endpoint for [MSC3283](https://github.com/matrix-org/matrix-doc/pull/3283): new settings in `/capabilities` endpoint. 
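For context on how this change is consumed: during the migration window a client may see either the stable `m.*` capability names or the deprecated `org.matrix.msc3283.*` ones, depending on the server version. The helper below is an illustrative sketch only — the function name and the fall-back policy are assumptions, not part of this changeset:

```python
# Hypothetical client-side helper: prefer the stable capability name and fall
# back to the deprecated org.matrix.msc3283.* name on older servers. Treating
# an absent capability as enabled is a simplifying assumption.
from typing import Any, Dict

def capability_enabled(capabilities: Dict[str, Any], stable_name: str) -> bool:
    unstable_name = "org.matrix.msc3283." + stable_name[len("m.") :]
    cap = capabilities.get(stable_name) or capabilities.get(unstable_name) or {}
    return bool(cap.get("enabled", True))

# e.g. capability_enabled(response["capabilities"], "m.set_displayname")
```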
diff --git a/docs/upgrade.md b/docs/upgrade.md
index 581fd7de53..c5e0697333 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -100,6 +100,25 @@ to:
 Please update any relevant reverse proxy or firewall configurations appropriately.
 
+## Deprecation of the `org.matrix.msc3283.*` capabilities
+
+The `capabilities` added by MSC3283 to the REST API `/_matrix/client/r0/capabilities`
+endpoint are now stable.
+
+The old `capabilities`
+- `org.matrix.msc3283.set_displayname`,
+- `org.matrix.msc3283.set_avatar_url` and
+- `org.matrix.msc3283.3pid_changes`
+
+are deprecated and scheduled to be removed in Synapse v1.(next+1).0.
+
+The new `capabilities`
+- `m.set_displayname`,
+- `m.set_avatar_url` and
+- `m.3pid_changes`
+
+are now active by default.
+
 # Upgrading to v1.53.0
 
 ## Dropping support for `webclient` listeners and non-HTTP(S) `web_client_location`
diff --git a/synapse/rest/client/capabilities.py b/synapse/rest/client/capabilities.py
index 5c0e3a5680..6682da077a 100644
--- a/synapse/rest/client/capabilities.py
+++ b/synapse/rest/client/capabilities.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from http import HTTPStatus
 from typing import TYPE_CHECKING, Tuple
 
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, MSC3244_CAPABILITIES
@@ -54,6 +55,15 @@ class CapabilitiesRestServlet(RestServlet):
                 },
             },
             "m.change_password": {"enabled": change_password},
+            "m.set_displayname": {
+                "enabled": self.config.registration.enable_set_displayname
+            },
+            "m.set_avatar_url": {
+                "enabled": self.config.registration.enable_set_avatar_url
+            },
+            "m.3pid_changes": {
+                "enabled": self.config.registration.enable_3pid_changes
+            },
         }
     }
 
@@ -62,6 +72,9 @@ class CapabilitiesRestServlet(RestServlet):
                 "org.matrix.msc3244.room_capabilities"
             ] = MSC3244_CAPABILITIES
 
+        # These unstable capabilities are deprecated and only included to ease
+        # migration; remove them, and the corresponding settings in
+        # `synapse/config/experimental.py`, in a later version.
         if self.config.experimental.msc3283_enabled:
             response["capabilities"]["org.matrix.msc3283.set_displayname"] = {
                 "enabled": self.config.registration.enable_set_displayname
@@ -76,7 +89,7 @@ class CapabilitiesRestServlet(RestServlet):
         if self.config.experimental.msc3440_enabled:
             response["capabilities"]["io.element.thread"] = {"enabled": True}
 
-        return 200, response
+        return HTTPStatus.OK, response
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
diff --git a/tests/rest/client/test_capabilities.py b/tests/rest/client/test_capabilities.py
index 249808b031..989e801768 100644
--- a/tests/rest/client/test_capabilities.py
+++ b/tests/rest/client/test_capabilities.py
@@ -11,6 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from http import HTTPStatus + import synapse.rest.admin from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.rest.client import capabilities, login @@ -28,7 +30,7 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase): ] def make_homeserver(self, reactor, clock): - self.url = b"/_matrix/client/r0/capabilities" + self.url = b"/capabilities" hs = self.setup_test_homeserver() self.config = hs.config self.auth_handler = hs.get_auth_handler() @@ -96,39 +98,20 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200) self.assertFalse(capabilities["m.change_password"]["enabled"]) - def test_get_change_users_attributes_capabilities_when_msc3283_disabled(self): - """Test that per default msc3283 is disabled server returns `m.change_password`.""" + def test_get_change_users_attributes_capabilities(self): + """Test that server returns capabilities by default.""" access_token = self.login(self.localpart, self.password) channel = self.make_request("GET", self.url, access_token=access_token) capabilities = channel.json_body["capabilities"] - self.assertEqual(channel.code, 200) + self.assertEqual(channel.code, HTTPStatus.OK) self.assertTrue(capabilities["m.change_password"]["enabled"]) - self.assertNotIn("org.matrix.msc3283.set_displayname", capabilities) - self.assertNotIn("org.matrix.msc3283.set_avatar_url", capabilities) - self.assertNotIn("org.matrix.msc3283.3pid_changes", capabilities) - - @override_config({"experimental_features": {"msc3283_enabled": True}}) - def test_get_change_users_attributes_capabilities_when_msc3283_enabled(self): - """Test if msc3283 is enabled server returns capabilities.""" - access_token = self.login(self.localpart, self.password) - - channel = self.make_request("GET", self.url, access_token=access_token) - capabilities = channel.json_body["capabilities"] + self.assertTrue(capabilities["m.set_displayname"]["enabled"]) + self.assertTrue(capabilities["m.set_avatar_url"]["enabled"]) + self.assertTrue(capabilities["m.3pid_changes"]["enabled"]) - self.assertEqual(channel.code, 200) - self.assertTrue(capabilities["m.change_password"]["enabled"]) - self.assertTrue(capabilities["org.matrix.msc3283.set_displayname"]["enabled"]) - self.assertTrue(capabilities["org.matrix.msc3283.set_avatar_url"]["enabled"]) - self.assertTrue(capabilities["org.matrix.msc3283.3pid_changes"]["enabled"]) - - @override_config( - { - "enable_set_displayname": False, - "experimental_features": {"msc3283_enabled": True}, - } - ) + @override_config({"enable_set_displayname": False}) def test_get_set_displayname_capabilities_displayname_disabled(self): """Test if set displayname is disabled that the server responds it.""" access_token = self.login(self.localpart, self.password) @@ -136,15 +119,10 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase): channel = self.make_request("GET", self.url, access_token=access_token) capabilities = channel.json_body["capabilities"] - self.assertEqual(channel.code, 200) - self.assertFalse(capabilities["org.matrix.msc3283.set_displayname"]["enabled"]) - - @override_config( - { - "enable_set_avatar_url": False, - "experimental_features": {"msc3283_enabled": True}, - } - ) + self.assertEqual(channel.code, HTTPStatus.OK) + self.assertFalse(capabilities["m.set_displayname"]["enabled"]) + + @override_config({"enable_set_avatar_url": False}) def test_get_set_avatar_url_capabilities_avatar_url_disabled(self): """Test if set avatar_url is disabled that the server responds it.""" access_token = 
self.login(self.localpart, self.password) @@ -152,24 +130,19 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase): channel = self.make_request("GET", self.url, access_token=access_token) capabilities = channel.json_body["capabilities"] - self.assertEqual(channel.code, 200) - self.assertFalse(capabilities["org.matrix.msc3283.set_avatar_url"]["enabled"]) - - @override_config( - { - "enable_3pid_changes": False, - "experimental_features": {"msc3283_enabled": True}, - } - ) - def test_change_3pid_capabilities_3pid_disabled(self): + self.assertEqual(channel.code, HTTPStatus.OK) + self.assertFalse(capabilities["m.set_avatar_url"]["enabled"]) + + @override_config({"enable_3pid_changes": False}) + def test_get_change_3pid_capabilities_3pid_disabled(self): """Test if change 3pid is disabled that the server responds it.""" access_token = self.login(self.localpart, self.password) channel = self.make_request("GET", self.url, access_token=access_token) capabilities = channel.json_body["capabilities"] - self.assertEqual(channel.code, 200) - self.assertFalse(capabilities["org.matrix.msc3283.3pid_changes"]["enabled"]) + self.assertEqual(channel.code, HTTPStatus.OK) + self.assertFalse(capabilities["m.3pid_changes"]["enabled"]) @override_config({"experimental_features": {"msc3244_enabled": False}}) def test_get_does_not_include_msc3244_fields_when_disabled(self): -- cgit 1.5.1 From c3db7a0b59d48b8872bc24096f9a2467ef35f703 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 11 Feb 2022 12:06:02 +0000 Subject: Tests: replace mocked Authenticator with the real thing (#11913) If we prepopulate the test homeserver with a key for a remote homeserver, we can make federation requests to it without having to stub out the authenticator. This has two advantages: * means that what we are testing is closer to reality (ie, we now have complete tests for the incoming-request-authorisation flow) * some tests require that other objects be signed by the remote server (eg, the event in `/send_join`), and doing that would require a whole separate set of mocking out. It's much simpler just to use real keys. --- changelog.d/11913.misc | 1 + tests/federation/test_complexity.py | 4 +- tests/federation/test_federation_server.py | 4 +- tests/federation/transport/test_knocking.py | 4 +- tests/federation/transport/test_server.py | 6 +- tests/rest/client/test_third_party_rules.py | 6 +- tests/unittest.py | 136 ++++++++++++++++++++++------ 7 files changed, 117 insertions(+), 44 deletions(-) create mode 100644 changelog.d/11913.misc (limited to 'tests') diff --git a/changelog.d/11913.misc b/changelog.d/11913.misc new file mode 100644 index 0000000000..cb70560364 --- /dev/null +++ b/changelog.d/11913.misc @@ -0,0 +1 @@ +Tests: replace mocked `Authenticator` with the real thing. 
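The test changes below all follow the same pattern: calls to `make_request(..., federation_auth_origin=b"example.com")` become calls to the new `make_signed_federation_request` helper (defined in `tests/unittest.py` later in this patch), which signs the request with the `other.example.com` key that `prepare` registers. A minimal usage sketch, with a hypothetical test-case name:

```python
# Illustrative only: the test class is hypothetical, but the helper and the
# /publicRooms path are taken from this patch.
from tests import unittest

class ExampleFederationRequestTests(unittest.FederatingHomeserverTestCase):
    def test_signed_request(self) -> None:
        # Signed as if sent by OTHER_SERVER_NAME ("other.example.com"), whose
        # verify key prepare() stored, so the real Authenticator accepts it.
        channel = self.make_signed_federation_request(
            "GET", "/_matrix/federation/v1/publicRooms"
        )
        self.assertEqual(200, channel.code)
```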
diff --git a/tests/federation/test_complexity.py b/tests/federation/test_complexity.py index 7b486aba4a..e40ef95874 100644 --- a/tests/federation/test_complexity.py +++ b/tests/federation/test_complexity.py @@ -47,7 +47,7 @@ class RoomComplexityTests(unittest.FederatingHomeserverTestCase): ) # Get the room complexity - channel = self.make_request( + channel = self.make_signed_federation_request( "GET", "/_matrix/federation/unstable/rooms/%s/complexity" % (room_1,) ) self.assertEquals(200, channel.code) @@ -59,7 +59,7 @@ class RoomComplexityTests(unittest.FederatingHomeserverTestCase): store.get_current_state_event_counts = lambda x: make_awaitable(500 * 1.23) # Get the room complexity again -- make sure it's our artificial value - channel = self.make_request( + channel = self.make_signed_federation_request( "GET", "/_matrix/federation/unstable/rooms/%s/complexity" % (room_1,) ) self.assertEquals(200, channel.code) diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py index 03e1e11f49..1af284bd2f 100644 --- a/tests/federation/test_federation_server.py +++ b/tests/federation/test_federation_server.py @@ -113,7 +113,7 @@ class StateQueryTests(unittest.FederatingHomeserverTestCase): room_1 = self.helper.create_room_as(u1, tok=u1_token) self.inject_room_member(room_1, "@user:other.example.com", "join") - channel = self.make_request( + channel = self.make_signed_federation_request( "GET", "/_matrix/federation/v1/state/%s" % (room_1,) ) self.assertEquals(200, channel.code, channel.result) @@ -145,7 +145,7 @@ class StateQueryTests(unittest.FederatingHomeserverTestCase): room_1 = self.helper.create_room_as(u1, tok=u1_token) - channel = self.make_request( + channel = self.make_signed_federation_request( "GET", "/_matrix/federation/v1/state/%s" % (room_1,) ) self.assertEquals(403, channel.code, channel.result) diff --git a/tests/federation/transport/test_knocking.py b/tests/federation/transport/test_knocking.py index bfa156eebb..686f42ab48 100644 --- a/tests/federation/transport/test_knocking.py +++ b/tests/federation/transport/test_knocking.py @@ -245,7 +245,7 @@ class FederationKnockingTestCase( self.hs, room_id, user_id ) - channel = self.make_request( + channel = self.make_signed_federation_request( "GET", "/_matrix/federation/v1/make_knock/%s/%s?ver=%s" % ( @@ -288,7 +288,7 @@ class FederationKnockingTestCase( ) # Send the signed knock event into the room - channel = self.make_request( + channel = self.make_signed_federation_request( "PUT", "/_matrix/federation/v1/send_knock/%s/%s" % (room_id, signed_knock_event.event_id), diff --git a/tests/federation/transport/test_server.py b/tests/federation/transport/test_server.py index 84fa72b9ff..eb62addda8 100644 --- a/tests/federation/transport/test_server.py +++ b/tests/federation/transport/test_server.py @@ -22,10 +22,9 @@ class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase): """Test that unauthenticated requests to the public rooms directory 403 when allow_public_rooms_over_federation is False. """ - channel = self.make_request( + channel = self.make_signed_federation_request( "GET", "/_matrix/federation/v1/publicRooms", - federation_auth_origin=b"example.com", ) self.assertEquals(403, channel.code) @@ -34,9 +33,8 @@ class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase): """Test that unauthenticated requests to the public rooms directory 200 when allow_public_rooms_over_federation is True. 
""" - channel = self.make_request( + channel = self.make_signed_federation_request( "GET", "/_matrix/federation/v1/publicRooms", - federation_auth_origin=b"example.com", ) self.assertEquals(200, channel.code) diff --git a/tests/rest/client/test_third_party_rules.py b/tests/rest/client/test_third_party_rules.py index 4e71b6ec12..ac6b86ff6b 100644 --- a/tests/rest/client/test_third_party_rules.py +++ b/tests/rest/client/test_third_party_rules.py @@ -107,6 +107,7 @@ class ThirdPartyRulesTestCase(unittest.FederatingHomeserverTestCase): return hs def prepare(self, reactor, clock, homeserver): + super().prepare(reactor, clock, homeserver) # Create some users and a room to play with during the tests self.user_id = self.register_user("kermit", "monkey") self.invitee = self.register_user("invitee", "hackme") @@ -473,8 +474,6 @@ class ThirdPartyRulesTestCase(unittest.FederatingHomeserverTestCase): def _send_event_over_federation(self) -> None: """Send a dummy event over federation and check that the request succeeds.""" body = { - "origin": self.hs.config.server.server_name, - "origin_server_ts": self.clock.time_msec(), "pdus": [ { "sender": self.user_id, @@ -492,11 +491,10 @@ class ThirdPartyRulesTestCase(unittest.FederatingHomeserverTestCase): ], } - channel = self.make_request( + channel = self.make_signed_federation_request( method="PUT", path="/_matrix/federation/v1/send/1", content=body, - federation_auth_origin=self.hs.config.server.server_name.encode("utf8"), ) self.assertEqual(channel.code, 200, channel.result) diff --git a/tests/unittest.py b/tests/unittest.py index 6fc617601a..a71892cb9d 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -17,6 +17,7 @@ import gc import hashlib import hmac import inspect +import json import logging import secrets import time @@ -36,9 +37,11 @@ from typing import ( ) from unittest.mock import Mock, patch -from canonicaljson import json +import canonicaljson +import signedjson.key +import unpaddedbase64 -from twisted.internet.defer import Deferred, ensureDeferred, succeed +from twisted.internet.defer import Deferred, ensureDeferred from twisted.python.failure import Failure from twisted.python.threadpool import ThreadPool from twisted.test.proto_helpers import MemoryReactor @@ -49,8 +52,7 @@ from twisted.web.server import Request from synapse import events from synapse.api.constants import EventTypes, Membership from synapse.config.homeserver import HomeServerConfig -from synapse.config.ratelimiting import FederationRateLimitConfig -from synapse.federation.transport import server as federation_server +from synapse.federation.transport.server import TransportLayerServer from synapse.http.server import JsonResource from synapse.http.site import SynapseRequest, SynapseSite from synapse.logging.context import ( @@ -61,10 +63,10 @@ from synapse.logging.context import ( ) from synapse.rest import RegisterServletsFunc from synapse.server import HomeServer +from synapse.storage.keys import FetchKeyResult from synapse.types import JsonDict, UserID, create_requester from synapse.util import Clock from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.ratelimitutils import FederationRateLimiter from tests.server import FakeChannel, get_clock, make_request, setup_test_homeserver from tests.test_utils import event_injection, setup_awaitable_errors @@ -755,42 +757,116 @@ class HomeserverTestCase(TestCase): class FederatingHomeserverTestCase(HomeserverTestCase): """ - A federating homeserver that authenticates incoming requests as 
`other.example.com`. + A federating homeserver, set up to validate incoming federation requests """ - def create_resource_dict(self) -> Dict[str, Resource]: - d = super().create_resource_dict() - d["/_matrix/federation"] = TestTransportLayerServer(self.hs) - return d + OTHER_SERVER_NAME = "other.example.com" + OTHER_SERVER_SIGNATURE_KEY = signedjson.key.generate_signing_key("test") + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer): + super().prepare(reactor, clock, hs) -class TestTransportLayerServer(JsonResource): - """A test implementation of TransportLayerServer + # poke the other server's signing key into the key store, so that we don't + # make requests for it + verify_key = signedjson.key.get_verify_key(self.OTHER_SERVER_SIGNATURE_KEY) + verify_key_id = "%s:%s" % (verify_key.alg, verify_key.version) - authenticates incoming requests as `other.example.com`. - """ + self.get_success( + hs.get_datastore().store_server_verify_keys( + from_server=self.OTHER_SERVER_NAME, + ts_added_ms=clock.time_msec(), + verify_keys=[ + ( + self.OTHER_SERVER_NAME, + verify_key_id, + FetchKeyResult( + verify_key=verify_key, + valid_until_ts=clock.time_msec() + 1000, + ), + ) + ], + ) + ) + + def create_resource_dict(self) -> Dict[str, Resource]: + d = super().create_resource_dict() + d["/_matrix/federation"] = TransportLayerServer(self.hs) + return d - def __init__(self, hs): - super().__init__(hs) + def make_signed_federation_request( + self, + method: str, + path: str, + content: Optional[JsonDict] = None, + await_result: bool = True, + custom_headers: Optional[Iterable[Tuple[AnyStr, AnyStr]]] = None, + client_ip: str = "127.0.0.1", + ) -> FakeChannel: + """Make an inbound signed federation request to this server - class Authenticator: - def authenticate_request(self, request, content): - return succeed("other.example.com") + The request is signed as if it came from "other.example.com", which our HS + already has the keys for. 
+ """ - authenticator = Authenticator() + if custom_headers is None: + custom_headers = [] + else: + custom_headers = list(custom_headers) + + custom_headers.append( + ( + "Authorization", + _auth_header_for_request( + origin=self.OTHER_SERVER_NAME, + destination=self.hs.hostname, + signing_key=self.OTHER_SERVER_SIGNATURE_KEY, + method=method, + path=path, + content=content, + ), + ) + ) - ratelimiter = FederationRateLimiter( - hs.get_clock(), - FederationRateLimitConfig( - window_size=1, - sleep_limit=1, - sleep_delay=1, - reject_limit=1000, - concurrent=1000, - ), + return make_request( + self.reactor, + self.site, + method=method, + path=path, + content=content, + shorthand=False, + await_result=await_result, + custom_headers=custom_headers, + client_ip=client_ip, ) - federation_server.register_servlets(hs, self, authenticator, ratelimiter) + +def _auth_header_for_request( + origin: str, + destination: str, + signing_key: signedjson.key.SigningKey, + method: str, + path: str, + content: Optional[JsonDict], +) -> str: + """Build a suitable Authorization header for an outgoing federation request""" + request_description: JsonDict = { + "method": method, + "uri": path, + "destination": destination, + "origin": origin, + } + if content is not None: + request_description["content"] = content + signature_base64 = unpaddedbase64.encode_base64( + signing_key.sign( + canonicaljson.encode_canonical_json(request_description) + ).signature + ) + return ( + f"X-Matrix origin={origin}," + f"key={signing_key.alg}:{signing_key.version}," + f"sig={signature_base64}" + ) def override_config(extra_config): -- cgit 1.5.1 From a121507cfec0ffce45a89f5a1019034eda5b0c70 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 11 Feb 2022 07:20:16 -0500 Subject: Adds misc missing type hints (#11953) --- changelog.d/11953.misc | 1 + mypy.ini | 6 +++++ synapse/event_auth.py | 4 +++- synapse/handlers/oidc.py | 4 ++-- synapse/http/client.py | 11 ++++----- synapse/http/matrixfederationclient.py | 3 +-- synapse/notifier.py | 43 +++++++++++++++++++--------------- synapse/server.py | 8 +++---- tests/handlers/test_oidc.py | 9 ++----- 9 files changed, 48 insertions(+), 41 deletions(-) create mode 100644 changelog.d/11953.misc (limited to 'tests') diff --git a/changelog.d/11953.misc b/changelog.d/11953.misc new file mode 100644 index 0000000000..d44571b731 --- /dev/null +++ b/changelog.d/11953.misc @@ -0,0 +1 @@ +Add missing type hints. diff --git a/mypy.ini b/mypy.ini index cd28ac0dd2..63848d664c 100644 --- a/mypy.ini +++ b/mypy.ini @@ -142,6 +142,9 @@ disallow_untyped_defs = True [mypy-synapse.crypto.*] disallow_untyped_defs = True +[mypy-synapse.event_auth] +disallow_untyped_defs = True + [mypy-synapse.events.*] disallow_untyped_defs = True @@ -166,6 +169,9 @@ disallow_untyped_defs = True [mypy-synapse.module_api.*] disallow_untyped_defs = True +[mypy-synapse.notifier] +disallow_untyped_defs = True + [mypy-synapse.push.*] disallow_untyped_defs = True diff --git a/synapse/event_auth.py b/synapse/event_auth.py index e885961698..19b55a9559 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -763,7 +763,9 @@ def get_named_level(auth_events: StateMap[EventBase], name: str, default: int) - return default -def _verify_third_party_invite(event: EventBase, auth_events: StateMap[EventBase]): +def _verify_third_party_invite( + event: EventBase, auth_events: StateMap[EventBase] +) -> bool: """ Validates that the invite event is authorized by a previous third-party invite. 
diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py index deb3539751..8f71d975e9 100644 --- a/synapse/handlers/oidc.py +++ b/synapse/handlers/oidc.py @@ -544,9 +544,9 @@ class OidcProvider: """ metadata = await self.load_metadata() token_endpoint = metadata.get("token_endpoint") - raw_headers = { + raw_headers: Dict[str, str] = { "Content-Type": "application/x-www-form-urlencoded", - "User-Agent": self._http_client.user_agent, + "User-Agent": self._http_client.user_agent.decode("ascii"), "Accept": "application/json", } diff --git a/synapse/http/client.py b/synapse/http/client.py index d617055617..c01d2326cf 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -322,21 +322,20 @@ class SimpleHttpClient: self._ip_whitelist = ip_whitelist self._ip_blacklist = ip_blacklist self._extra_treq_args = treq_args or {} - - self.user_agent = hs.version_string self.clock = hs.get_clock() + + user_agent = hs.version_string if hs.config.server.user_agent_suffix: - self.user_agent = "%s %s" % ( - self.user_agent, + user_agent = "%s %s" % ( + user_agent, hs.config.server.user_agent_suffix, ) + self.user_agent = user_agent.encode("ascii") # We use this for our body producers to ensure that they use the correct # reactor. self._cooperator = Cooperator(scheduler=_make_scheduler(hs.get_reactor())) - self.user_agent = self.user_agent.encode("ascii") - if self._ip_blacklist: # If we have an IP blacklist, we need to use a DNS resolver which # filters out blacklisted IP addresses, to prevent DNS rebinding. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 2e668363b2..c5f8fcbb2a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -334,12 +334,11 @@ class MatrixFederationHttpClient: user_agent = hs.version_string if hs.config.server.user_agent_suffix: user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix) - user_agent = user_agent.encode("ascii") federation_agent = MatrixFederationAgent( self.reactor, tls_client_options_factory, - user_agent, + user_agent.encode("ascii"), hs.config.server.federation_ip_range_whitelist, hs.config.server.federation_ip_range_blacklist, ) diff --git a/synapse/notifier.py b/synapse/notifier.py index 5988c67d90..e0fad2da66 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -14,6 +14,7 @@ import logging from typing import ( + TYPE_CHECKING, Awaitable, Callable, Collection, @@ -32,7 +33,6 @@ from prometheus_client import Counter from twisted.internet import defer -import synapse.server from synapse.api.constants import EventTypes, HistoryVisibility, Membership from synapse.api.errors import AuthError from synapse.events import EventBase @@ -53,6 +53,9 @@ from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) notified_events_counter = Counter("synapse_notifier_notified_events", "") @@ -82,7 +85,7 @@ class _NotificationListener: __slots__ = ["deferred"] - def __init__(self, deferred): + def __init__(self, deferred: "defer.Deferred"): self.deferred = deferred @@ -124,7 +127,7 @@ class _NotifierUserStream: stream_key: str, stream_id: Union[int, RoomStreamToken], time_now_ms: int, - ): + ) -> None: """Notify any listeners for this user of a new event from an event source. 
Args: @@ -152,7 +155,7 @@ class _NotifierUserStream: self.notify_deferred = ObservableDeferred(defer.Deferred()) noify_deferred.callback(self.current_token) - def remove(self, notifier: "Notifier"): + def remove(self, notifier: "Notifier") -> None: """Remove this listener from all the indexes in the Notifier it knows about. """ @@ -188,7 +191,7 @@ class EventStreamResult: start_token: StreamToken end_token: StreamToken - def __bool__(self): + def __bool__(self) -> bool: return bool(self.events) @@ -212,7 +215,7 @@ class Notifier: UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 - def __init__(self, hs: "synapse.server.HomeServer"): + def __init__(self, hs: "HomeServer"): self.user_to_user_stream: Dict[str, _NotifierUserStream] = {} self.room_to_user_streams: Dict[str, Set[_NotifierUserStream]] = {} @@ -248,7 +251,7 @@ class Notifier: # This is not a very cheap test to perform, but it's only executed # when rendering the metrics page, which is likely once per minute at # most when scraping it. - def count_listeners(): + def count_listeners() -> int: all_user_streams: Set[_NotifierUserStream] = set() for streams in list(self.room_to_user_streams.values()): @@ -270,7 +273,7 @@ class Notifier: "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream) ) - def add_replication_callback(self, cb: Callable[[], None]): + def add_replication_callback(self, cb: Callable[[], None]) -> None: """Add a callback that will be called when some new data is available. Callback is not given any arguments. It should *not* return a Deferred - if it needs to do any asynchronous work, a background thread should be started and @@ -284,7 +287,7 @@ class Notifier: event_pos: PersistedEventPosition, max_room_stream_token: RoomStreamToken, extra_users: Optional[Collection[UserID]] = None, - ): + ) -> None: """Unwraps event and calls `on_new_room_event_args`.""" await self.on_new_room_event_args( event_pos=event_pos, @@ -307,7 +310,7 @@ class Notifier: event_pos: PersistedEventPosition, max_room_stream_token: RoomStreamToken, extra_users: Optional[Collection[UserID]] = None, - ): + ) -> None: """Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -338,7 +341,9 @@ class Notifier: self.notify_replication() - def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken): + def _notify_pending_new_room_events( + self, max_room_stream_token: RoomStreamToken + ) -> None: """Notify for the room events that were queued waiting for a previous event to be persisted. Args: @@ -374,7 +379,7 @@ class Notifier: ) self._on_updated_room_token(max_room_stream_token) - def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken): + def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken) -> None: """Poke services that might care that the room position has been updated. 
""" @@ -386,13 +391,13 @@ class Notifier: if self.federation_sender: self.federation_sender.notify_new_events(max_room_stream_token) - def _notify_app_services(self, max_room_stream_token: RoomStreamToken): + def _notify_app_services(self, max_room_stream_token: RoomStreamToken) -> None: try: self.appservice_handler.notify_interested_services(max_room_stream_token) except Exception: logger.exception("Error notifying application services of event") - def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): + def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken) -> None: try: self._pusher_pool.on_new_notifications(max_room_stream_token) except Exception: @@ -475,8 +480,8 @@ class Notifier: user_id: str, timeout: int, callback: Callable[[StreamToken, StreamToken], Awaitable[T]], - room_ids=None, - from_token=StreamToken.START, + room_ids: Optional[Collection[str]] = None, + from_token: StreamToken = StreamToken.START, ) -> T: """Wait until the callback returns a non empty response or the timeout fires. @@ -700,14 +705,14 @@ class Notifier: for expired_stream in expired_streams: expired_stream.remove(self) - def _register_with_keys(self, user_stream: _NotifierUserStream): + def _register_with_keys(self, user_stream: _NotifierUserStream) -> None: self.user_to_user_stream[user_stream.user_id] = user_stream for room in user_stream.rooms: s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) - def _user_joined_room(self, user_id: str, room_id: str): + def _user_joined_room(self, user_id: str, room_id: str) -> None: new_user_stream = self.user_to_user_stream.get(user_id) if new_user_stream is not None: room_streams = self.room_to_user_streams.setdefault(room_id, set()) @@ -719,7 +724,7 @@ class Notifier: for cb in self.replication_callbacks: cb() - def notify_remote_server_up(self, server: str): + def notify_remote_server_up(self, server: str) -> None: """Notify any replication that a remote server has come back up""" # We call federation_sender directly rather than registering as a # callback as a) we already have a reference to it and b) it introduces diff --git a/synapse/server.py b/synapse/server.py index 3032f0b738..564afdcb96 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -233,8 +233,8 @@ class HomeServer(metaclass=abc.ABCMeta): self, hostname: str, config: HomeServerConfig, - reactor=None, - version_string="Synapse", + reactor: Optional[ISynapseReactor] = None, + version_string: str = "Synapse", ): """ Args: @@ -244,7 +244,7 @@ class HomeServer(metaclass=abc.ABCMeta): if not reactor: from twisted.internet import reactor as _reactor - reactor = _reactor + reactor = cast(ISynapseReactor, _reactor) self._reactor = reactor self.hostname = hostname @@ -264,7 +264,7 @@ class HomeServer(metaclass=abc.ABCMeta): self._module_web_resources: Dict[str, Resource] = {} self._module_web_resources_consumed = False - def register_module_web_resource(self, path: str, resource: Resource): + def register_module_web_resource(self, path: str, resource: Resource) -> None: """Allows a module to register a web resource to be served at the given path. 
        If multiple modules register a resource for the same path, the module that
diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py
index cfe3de5266..a552d8182e 100644
--- a/tests/handlers/test_oidc.py
+++ b/tests/handlers/test_oidc.py
@@ -155,7 +155,7 @@ class OidcHandlerTestCase(HomeserverTestCase):
     def make_homeserver(self, reactor, clock):
         self.http_client = Mock(spec=["get_json"])
         self.http_client.get_json.side_effect = get_json
-        self.http_client.user_agent = "Synapse Test"
+        self.http_client.user_agent = b"Synapse Test"
 
         hs = self.setup_test_homeserver(proxied_http_client=self.http_client)
 
@@ -438,12 +438,9 @@ class OidcHandlerTestCase(HomeserverTestCase):
         state = "state"
         nonce = "nonce"
         client_redirect_url = "http://client/redirect"
-        user_agent = "Browser"
         ip_address = "10.0.0.1"
         session = self._generate_oidc_session_token(state, nonce, client_redirect_url)
-        request = _build_callback_request(
-            code, state, session, user_agent=user_agent, ip_address=ip_address
-        )
+        request = _build_callback_request(code, state, session, ip_address=ip_address)
 
         self.get_success(self.handler.handle_oidc_callback(request))
 
@@ -1274,7 +1271,6 @@ def _build_callback_request(
     code: str,
     state: str,
     session: str,
-    user_agent: str = "Browser",
    ip_address: str = "10.0.0.1",
 ):
     """Builds a fake SynapseRequest to mock the browser callback
@@ -1289,7 +1285,6 @@ def _build_callback_request(
             query param. Should be the same as was embedded in the session in
             _build_oidc_session.
         session: the "session" which would have been passed around in the cookie.
-        user_agent: the user-agent to present
         ip_address: the IP address to pretend the request came from
     """
     request = Mock(
--
cgit 1.5.1


From 79fb64e417686c91eeb64016e91dffeac9115a80 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 11 Feb 2022 13:38:05 +0000
Subject: Fix to-device being dropped in limited sync in SQLite. (#11966)

If there are more than 100 to-device messages pending for a device,
`/sync` will only return the first 100; however, the next-batch token was
incorrectly calculated, so all other pending messages would be dropped.

This is due to `txn.rowcount` only returning the number of rows that
*changed*, rather than the number *selected*, in SQLite.
---
 changelog.d/11966.bugfix                      |  1 +
 synapse/storage/databases/main/deviceinbox.py |  5 +++-
 tests/rest/client/test_sendtodevice.py        | 40 +++++++++++++++++++++++++++
 3 files changed, 45 insertions(+), 1 deletion(-)
 create mode 100644 changelog.d/11966.bugfix

(limited to 'tests')

diff --git a/changelog.d/11966.bugfix b/changelog.d/11966.bugfix
new file mode 100644
index 0000000000..af8e096667
--- /dev/null
+++ b/changelog.d/11966.bugfix
@@ -0,0 +1 @@
+Fix to-device messages being dropped during limited sync when using SQLite.
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 8801b7b2dd..1392363de1 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -362,7 +362,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             # intended for each device.
last_processed_stream_pos = to_stream_id recipient_device_to_messages: Dict[Tuple[str, str], List[JsonDict]] = {} + rowcount = 0 for row in txn: + rowcount += 1 + last_processed_stream_pos = row[0] recipient_user_id = row[1] recipient_device_id = row[2] @@ -373,7 +376,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): (recipient_user_id, recipient_device_id), [] ).append(message_dict) - if limit is not None and txn.rowcount == limit: + if limit is not None and rowcount == limit: # We ended up bumping up against the message limit. There may be more messages # to retrieve. Return what we have, as well as the last stream position that # was processed. diff --git a/tests/rest/client/test_sendtodevice.py b/tests/rest/client/test_sendtodevice.py index 6db7062a8e..e2ed14457f 100644 --- a/tests/rest/client/test_sendtodevice.py +++ b/tests/rest/client/test_sendtodevice.py @@ -198,3 +198,43 @@ class SendToDeviceTestCase(HomeserverTestCase): "content": {"idx": 3}, }, ) + + def test_limited_sync(self): + """If a limited sync for to-devices happens the next /sync should respond immediately.""" + + self.register_user("u1", "pass") + user1_tok = self.login("u1", "pass", "d1") + + user2 = self.register_user("u2", "pass") + user2_tok = self.login("u2", "pass", "d2") + + # Do an initial sync + channel = self.make_request("GET", "/sync", access_token=user2_tok) + self.assertEqual(channel.code, 200, channel.result) + sync_token = channel.json_body["next_batch"] + + # Send 150 to-device messages. We limit to 100 in `/sync` + for i in range(150): + test_msg = {"foo": "bar"} + chan = self.make_request( + "PUT", + f"/_matrix/client/r0/sendToDevice/m.test/1234-{i}", + content={"messages": {user2: {"d2": test_msg}}}, + access_token=user1_tok, + ) + self.assertEqual(chan.code, 200, chan.result) + + channel = self.make_request( + "GET", f"/sync?since={sync_token}&timeout=300000", access_token=user2_tok + ) + self.assertEqual(channel.code, 200, channel.result) + messages = channel.json_body.get("to_device", {}).get("events", []) + self.assertEqual(len(messages), 100) + sync_token = channel.json_body["next_batch"] + + channel = self.make_request( + "GET", f"/sync?since={sync_token}&timeout=300000", access_token=user2_tok + ) + self.assertEqual(channel.code, 200, channel.result) + messages = channel.json_body.get("to_device", {}).get("events", []) + self.assertEqual(len(messages), 50) -- cgit 1.5.1 From 0171fa5226a6aa808d9965dab20f22f9794810d9 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 11 Feb 2022 14:58:11 +0100 Subject: Remove deprecated user_may_create_room_with_invites callback (#11950) Co-authored-by: Patrick Cloke --- changelog.d/11950.removal | 1 + docs/upgrade.md | 29 ++++++---- synapse/events/spamcheck.py | 42 -------------- synapse/handlers/room.py | 5 -- synapse/module_api/__init__.py | 5 -- tests/rest/client/test_rooms.py | 119 +--------------------------------------- 6 files changed, 22 insertions(+), 179 deletions(-) create mode 100644 changelog.d/11950.removal (limited to 'tests') diff --git a/changelog.d/11950.removal b/changelog.d/11950.removal new file mode 100644 index 0000000000..f75de40f2f --- /dev/null +++ b/changelog.d/11950.removal @@ -0,0 +1 @@ +Remove deprecated `user_may_create_room_with_invites` spam checker callback. See the [upgrade notes](https://matrix-org.github.io/synapse/latest/upgrade.html#removal-of-user_may_create_room_with_invites) for more information. 
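To make the migration described in the upgrade notes below concrete, here is a hedged sketch of a module replacing the removed callback with `user_may_invite`. Only the callback signature comes from this changeset; the module class, its policy, and its use of `get_room_state` are illustrative:

```python
# Hypothetical module: vets invites one at a time instead of vetting the
# whole invite list at room-creation time.
class ExampleInviteChecker:
    def __init__(self, config: dict, api) -> None:
        self._api = api
        api.register_spam_checker_callbacks(user_may_invite=self.user_may_invite)

    async def user_may_invite(
        self, inviter: str, invitee: str, room_id: str
    ) -> bool:
        # A module that previously inspected invites at creation time can look
        # at the room's current state here (e.g. via the module API's
        # get_room_state) to infer whether the room is still being set up.
        return True
```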
diff --git a/docs/upgrade.md b/docs/upgrade.md
index c5e0697333..6f20000295 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -84,7 +84,18 @@ process, for example:
     wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
     dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
 ```
-# Upgrading to v1.(next)
+
+# Upgrading to v1.53.0
+
+## Dropping support for `webclient` listeners and non-HTTP(S) `web_client_location`
+
+Per the deprecation notice in Synapse v1.51.0, listeners of type `webclient`
+are no longer supported, and configuring them is now a configuration error.
+
+Configuring a non-HTTP(S) `web_client_location` is now a
+configuration error. Since the `webclient` listener is no longer supported, this
+setting only applies to the root path `/` of Synapse's web server and no longer
+the `/_matrix/client/` path.
 
 ## Stabilisation of MSC3231
 
@@ -119,17 +130,15 @@ The new `capabilities`
 
 are now active by default.
 
-# Upgrading to v1.53.0
-
-## Dropping support for `webclient` listeners and non-HTTP(S) `web_client_location`
-
-Per the deprecation notice in Synapse v1.51.0, listeners of type `webclient`
-are no longer supported, and configuring them is now a configuration error.
+## Removal of `user_may_create_room_with_invites`
 
-Configuring a non-HTTP(S) `web_client_location` is now a
-configuration error. Since the `webclient` listener is no longer supported, this
-setting only applies to the root path `/` of Synapse's web server and no longer
-the `/_matrix/client/` path.
+As announced with the release of [Synapse 1.47.0](#deprecation-of-the-user_may_create_room_with_invites-module-callback),
+the deprecated `user_may_create_room_with_invites` module callback has been removed.
 
+Modules relying on it can instead implement [`user_may_invite`](https://matrix-org.github.io/synapse/latest/modules/spam_checker_callbacks.html#user_may_invite)
+and use the [`get_room_state`](https://github.com/matrix-org/synapse/blob/872f23b95fa980a61b0866c1475e84491991fa20/synapse/module_api/__init__.py#L869-L876)
+module API to infer whether the invite is happening while creating a room (see [this function](https://github.com/matrix-org/synapse-domain-rule-checker/blob/e7d092dd9f2a7f844928771dbfd9fd24c2332e48/synapse_domain_rule_checker/__init__.py#L56-L89)
+as an example). Alternatively, modules can also implement [`on_create_room`](https://matrix-org.github.io/synapse/latest/modules/third_party_rules_callbacks.html#on_create_room).
# Upgrading to v1.52.0 diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 3134beb8d3..04afd48274 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -48,9 +48,6 @@ USER_MAY_JOIN_ROOM_CALLBACK = Callable[[str, str, bool], Awaitable[bool]] USER_MAY_INVITE_CALLBACK = Callable[[str, str, str], Awaitable[bool]] USER_MAY_SEND_3PID_INVITE_CALLBACK = Callable[[str, str, str, str], Awaitable[bool]] USER_MAY_CREATE_ROOM_CALLBACK = Callable[[str], Awaitable[bool]] -USER_MAY_CREATE_ROOM_WITH_INVITES_CALLBACK = Callable[ - [str, List[str], List[Dict[str, str]]], Awaitable[bool] -] USER_MAY_CREATE_ROOM_ALIAS_CALLBACK = Callable[[str, RoomAlias], Awaitable[bool]] USER_MAY_PUBLISH_ROOM_CALLBACK = Callable[[str, str], Awaitable[bool]] CHECK_USERNAME_FOR_SPAM_CALLBACK = Callable[[Dict[str, str]], Awaitable[bool]] @@ -174,9 +171,6 @@ class SpamChecker: USER_MAY_SEND_3PID_INVITE_CALLBACK ] = [] self._user_may_create_room_callbacks: List[USER_MAY_CREATE_ROOM_CALLBACK] = [] - self._user_may_create_room_with_invites_callbacks: List[ - USER_MAY_CREATE_ROOM_WITH_INVITES_CALLBACK - ] = [] self._user_may_create_room_alias_callbacks: List[ USER_MAY_CREATE_ROOM_ALIAS_CALLBACK ] = [] @@ -198,9 +192,6 @@ class SpamChecker: user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None, user_may_send_3pid_invite: Optional[USER_MAY_SEND_3PID_INVITE_CALLBACK] = None, user_may_create_room: Optional[USER_MAY_CREATE_ROOM_CALLBACK] = None, - user_may_create_room_with_invites: Optional[ - USER_MAY_CREATE_ROOM_WITH_INVITES_CALLBACK - ] = None, user_may_create_room_alias: Optional[ USER_MAY_CREATE_ROOM_ALIAS_CALLBACK ] = None, @@ -229,11 +220,6 @@ class SpamChecker: if user_may_create_room is not None: self._user_may_create_room_callbacks.append(user_may_create_room) - if user_may_create_room_with_invites is not None: - self._user_may_create_room_with_invites_callbacks.append( - user_may_create_room_with_invites, - ) - if user_may_create_room_alias is not None: self._user_may_create_room_alias_callbacks.append( user_may_create_room_alias, @@ -359,34 +345,6 @@ class SpamChecker: return True - async def user_may_create_room_with_invites( - self, - userid: str, - invites: List[str], - threepid_invites: List[Dict[str, str]], - ) -> bool: - """Checks if a given user may create a room with invites - - If this method returns false, the creation request will be rejected. - - Args: - userid: The ID of the user attempting to create a room - invites: The IDs of the Matrix users to be invited if the room creation is - allowed. - threepid_invites: The threepids to be invited if the room creation is allowed, - as a dict including a "medium" key indicating the threepid's medium (e.g. - "email") and an "address" key indicating the threepid's address (e.g. 
- "alice@example.com") - - Returns: - True if the user may create the room, otherwise False - """ - for callback in self._user_may_create_room_with_invites_callbacks: - if await callback(userid, invites, threepid_invites) is False: - return False - - return True - async def user_may_create_room_alias( self, userid: str, room_alias: RoomAlias ) -> bool: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 1420d67729..a990727fc5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -694,11 +694,6 @@ class RoomCreationHandler: if not is_requester_admin and not ( await self.spam_checker.user_may_create_room(user_id) - and await self.spam_checker.user_may_create_room_with_invites( - user_id, - invite_list, - invite_3pid_list, - ) ): raise SynapseError( 403, "You are not permitted to create rooms", Codes.FORBIDDEN diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index a91a7fa3ce..c636779308 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -48,7 +48,6 @@ from synapse.events.spamcheck import ( CHECK_USERNAME_FOR_SPAM_CALLBACK, USER_MAY_CREATE_ROOM_ALIAS_CALLBACK, USER_MAY_CREATE_ROOM_CALLBACK, - USER_MAY_CREATE_ROOM_WITH_INVITES_CALLBACK, USER_MAY_INVITE_CALLBACK, USER_MAY_JOIN_ROOM_CALLBACK, USER_MAY_PUBLISH_ROOM_CALLBACK, @@ -217,9 +216,6 @@ class ModuleApi: user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None, user_may_send_3pid_invite: Optional[USER_MAY_SEND_3PID_INVITE_CALLBACK] = None, user_may_create_room: Optional[USER_MAY_CREATE_ROOM_CALLBACK] = None, - user_may_create_room_with_invites: Optional[ - USER_MAY_CREATE_ROOM_WITH_INVITES_CALLBACK - ] = None, user_may_create_room_alias: Optional[ USER_MAY_CREATE_ROOM_ALIAS_CALLBACK ] = None, @@ -240,7 +236,6 @@ class ModuleApi: user_may_invite=user_may_invite, user_may_send_3pid_invite=user_may_send_3pid_invite, user_may_create_room=user_may_create_room, - user_may_create_room_with_invites=user_may_create_room_with_invites, user_may_create_room_alias=user_may_create_room_alias, user_may_publish_room=user_may_publish_room, check_username_for_spam=check_username_for_spam, diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 10a4a4dc5e..b7f086927b 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -18,7 +18,7 @@ """Tests REST events for /rooms paths.""" import json -from typing import Dict, Iterable, List, Optional +from typing import Iterable, List from unittest.mock import Mock, call from urllib import parse as urlparse @@ -35,7 +35,7 @@ from synapse.api.errors import Codes, HttpResponseException from synapse.handlers.pagination import PurgeStatus from synapse.rest import admin from synapse.rest.client import account, directory, login, profile, room, sync -from synapse.types import JsonDict, Requester, RoomAlias, UserID, create_requester +from synapse.types import JsonDict, RoomAlias, UserID, create_requester from synapse.util.stringutils import random_string from tests import unittest @@ -674,121 +674,6 @@ class RoomsCreateTestCase(RoomBase): channel = self.make_request("POST", "/createRoom", content) self.assertEqual(200, channel.code) - def test_spamchecker_invites(self): - """Tests the user_may_create_room_with_invites spam checker callback.""" - - # Mock do_3pid_invite, so we don't fail from failing to send a 3PID invite to an - # IS. 
-        async def do_3pid_invite(
-            room_id: str,
-            inviter: UserID,
-            medium: str,
-            address: str,
-            id_server: str,
-            requester: Requester,
-            txn_id: Optional[str],
-            id_access_token: Optional[str] = None,
-        ) -> int:
-            return 0
-
-        do_3pid_invite_mock = Mock(side_effect=do_3pid_invite)
-        self.hs.get_room_member_handler().do_3pid_invite = do_3pid_invite_mock
-
-        # Add a mock callback for user_may_create_room_with_invites. Make it allow any
-        # room creation request for now.
-        return_value = True
-
-        async def user_may_create_room_with_invites(
-            user: str,
-            invites: List[str],
-            threepid_invites: List[Dict[str, str]],
-        ) -> bool:
-            return return_value
-
-        callback_mock = Mock(side_effect=user_may_create_room_with_invites)
-        self.hs.get_spam_checker()._user_may_create_room_with_invites_callbacks.append(
-            callback_mock,
-        )
-
-        # The MXIDs we'll try to invite.
-        invited_mxids = [
-            "@alice1:red",
-            "@alice2:red",
-            "@alice3:red",
-            "@alice4:red",
-        ]
-
-        # The 3PIDs we'll try to invite.
-        invited_3pids = [
-            {
-                "id_server": "example.com",
-                "id_access_token": "sometoken",
-                "medium": "email",
-                "address": "alice1@example.com",
-            },
-            {
-                "id_server": "example.com",
-                "id_access_token": "sometoken",
-                "medium": "email",
-                "address": "alice2@example.com",
-            },
-            {
-                "id_server": "example.com",
-                "id_access_token": "sometoken",
-                "medium": "email",
-                "address": "alice3@example.com",
-            },
-        ]
-
-        # Create a room and invite the Matrix users, and check that it succeeded.
-        channel = self.make_request(
-            "POST",
-            "/createRoom",
-            json.dumps({"invite": invited_mxids}).encode("utf8"),
-        )
-        self.assertEqual(200, channel.code)
-
-        # Check that the callback was called with the right arguments.
-        expected_call_args = ((self.user_id, invited_mxids, []),)
-        self.assertEquals(
-            callback_mock.call_args,
-            expected_call_args,
-            callback_mock.call_args,
-        )
-
-        # Create a room and invite the 3PIDs, and check that it succeeded.
-        channel = self.make_request(
-            "POST",
-            "/createRoom",
-            json.dumps({"invite_3pid": invited_3pids}).encode("utf8"),
-        )
-        self.assertEqual(200, channel.code)
-
-        # Check that do_3pid_invite was called the right amount of time
-        self.assertEquals(do_3pid_invite_mock.call_count, len(invited_3pids))
-
-        # Check that the callback was called with the right arguments.
-        expected_call_args = ((self.user_id, [], invited_3pids),)
-        self.assertEquals(
-            callback_mock.call_args,
-            expected_call_args,
-            callback_mock.call_args,
-        )
-
-        # Now deny any room creation.
-        return_value = False
-
-        # Create a room and invite the 3PIDs, and check that it failed.
-        channel = self.make_request(
-            "POST",
-            "/createRoom",
-            json.dumps({"invite_3pid": invited_3pids}).encode("utf8"),
-        )
-        self.assertEqual(403, channel.code)
-
-        # Check that do_3pid_invite wasn't called this time.
-        self.assertEquals(do_3pid_invite_mock.call_count, len(invited_3pids))
-
     def test_spam_checker_may_join_room(self):
         """Tests that the user_may_join_room spam checker callback is correctly bypassed
         when creating a new room.
--
cgit 1.5.1


From 4d7e74b2e503373da66ec929afd1aa7010676878 Mon Sep 17 00:00:00 2001
From: Patrick Cloke
Date: Fri, 11 Feb 2022 11:20:27 -0500
Subject: Support the MSC3715 for `/relations`. (#11941)

This adds an unstable `org.matrix.msc3715.dir` parameter which acts like
`dir` on `/messages`.
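As a sketch of the resulting request shape — the base path is real, but the room ID, event ID and limit below are placeholders:

```python
# Illustrative URL construction for the new unstable parameter; "f" requests
# forward (oldest-first) ordering, "b" keeps the default backward ordering.
from urllib.parse import quote, urlencode

def relations_path(room_id: str, parent_id: str, forwards: bool) -> str:
    query = urlencode(
        {"limit": 5, "org.matrix.msc3715.dir": "f" if forwards else "b"}
    )
    return (
        f"/_matrix/client/unstable/rooms/{quote(room_id)}"
        f"/relations/{quote(parent_id)}?{query}"
    )
```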
--- changelog.d/11941.feature | 1 + synapse/rest/client/relations.py | 4 ++++ tests/rest/client/test_relations.py | 37 +++++++++++++++++++++++++++++++------ 3 files changed, 36 insertions(+), 6 deletions(-) create mode 100644 changelog.d/11941.feature (limited to 'tests') diff --git a/changelog.d/11941.feature b/changelog.d/11941.feature new file mode 100644 index 0000000000..2b5b11cb9f --- /dev/null +++ b/changelog.d/11941.feature @@ -0,0 +1 @@ +Support the `dir` parameter on the `/relations` endpoint, per [MSC3715](https://github.com/matrix-org/matrix-doc/pull/3715). diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index 9ec425888a..2cab83c4e6 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -111,6 +111,9 @@ class RelationPaginationServlet(RestServlet): raise SynapseError(404, "Unknown parent event.") limit = parse_integer(request, "limit", default=5) + direction = parse_string( + request, "org.matrix.msc3715.dir", default="b", allowed_values=["f", "b"] + ) from_token_str = parse_string(request, "from") to_token_str = parse_string(request, "to") @@ -128,6 +131,7 @@ class RelationPaginationServlet(RestServlet): relation_type=relation_type, event_type=event_type, limit=limit, + direction=direction, from_token=from_token, to_token=to_token, ) diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 9768fb2971..de80aca037 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -169,24 +169,28 @@ class RelationsTestCase(unittest.HomeserverTestCase): """Tests that calling pagination API correctly the latest relations.""" channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "a") self.assertEquals(200, channel.code, channel.json_body) + first_annotation_id = channel.json_body["event_id"] channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "b") self.assertEquals(200, channel.code, channel.json_body) - annotation_id = channel.json_body["event_id"] + second_annotation_id = channel.json_body["event_id"] channel = self.make_request( "GET", - "/_matrix/client/unstable/rooms/%s/relations/%s?limit=1" - % (self.room, self.parent_id), + f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=1", access_token=self.user_token, ) self.assertEquals(200, channel.code, channel.json_body) - # We expect to get back a single pagination result, which is the full - # relation event we sent above. + # We expect to get back a single pagination result, which is the latest + # full relation event we sent above. self.assertEquals(len(channel.json_body["chunk"]), 1, channel.json_body) self.assert_dict( - {"event_id": annotation_id, "sender": self.user_id, "type": "m.reaction"}, + { + "event_id": second_annotation_id, + "sender": self.user_id, + "type": "m.reaction", + }, channel.json_body["chunk"][0], ) @@ -201,6 +205,27 @@ class RelationsTestCase(unittest.HomeserverTestCase): channel.json_body.get("next_batch"), str, channel.json_body ) + # Request the relations again, but with a different direction. + channel = self.make_request( + "GET", + f"/_matrix/client/unstable/rooms/{self.room}/relations" + f"/{self.parent_id}?limit=1&org.matrix.msc3715.dir=f", + access_token=self.user_token, + ) + self.assertEquals(200, channel.code, channel.json_body) + + # We expect to get back a single pagination result, which is the earliest + # full relation event we sent above. 
+ self.assertEquals(len(channel.json_body["chunk"]), 1, channel.json_body) + self.assert_dict( + { + "event_id": first_annotation_id, + "sender": self.user_id, + "type": "m.reaction", + }, + channel.json_body["chunk"][0], + ) + def _stream_token_to_relation_token(self, token: str) -> str: """Convert a StreamToken into a legacy token (RelationPaginationToken).""" room_key = self.get_success(StreamToken.from_string(self.store, token)).room_key -- cgit 1.5.1 From 63c46349c41aa967e64a5a4042ef5177f934be47 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Sat, 12 Feb 2022 10:44:16 +0000 Subject: Implement MSC3706: partial state in `/send_join` response (#11967) * Make `get_auth_chain_ids` return a Set It has a set internally, and a set is often useful where it gets used, so let's avoid converting to an intermediate list. * Minor refactors in `on_send_join_request` A little bit of non-functional groundwork * Implement MSC3706: partial state in /send_join response --- changelog.d/11967.feature | 1 + synapse/config/experimental.py | 3 + synapse/federation/federation_server.py | 91 +++++++++++-- synapse/federation/transport/server/federation.py | 20 ++- synapse/storage/databases/main/event_federation.py | 12 +- tests/federation/test_federation_server.py | 148 +++++++++++++++++++++ tests/storage/test_event_federation.py | 8 +- 7 files changed, 262 insertions(+), 21 deletions(-) create mode 100644 changelog.d/11967.feature (limited to 'tests') diff --git a/changelog.d/11967.feature b/changelog.d/11967.feature new file mode 100644 index 0000000000..d09320a290 --- /dev/null +++ b/changelog.d/11967.feature @@ -0,0 +1 @@ +Experimental implementation of [MSC3706](https://github.com/matrix-org/matrix-doc/pull/3706): extensions to `/send_join` to support reduced response size. 
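Before the implementation diffs, a rough sketch of the consumer side (illustrative only, not Synapse code: the function name and output are assumptions) showing how a joining server might interpret the new response fields once it has opted in via `?org.matrix.msc3706.partial_state=true`; on the responding server the behaviour is gated behind the experimental `msc3706_enabled` config flag added below:

    from typing import Any, Dict, List, Optional

    def interpret_send_join_response(response: Dict[str, Any]) -> None:
        # The responding server echoes whether it actually sent partial state.
        partial: bool = response.get("org.matrix.msc3706.partial_state", False)
        state: List[Dict[str, Any]] = response["state"]
        auth_chain: List[Dict[str, Any]] = response["auth_chain"]

        if partial:
            # With partial state, membership events other than the joining
            # user's own are omitted from `state`, and `auth_chain` excludes
            # anything already present in `state`.
            servers: Optional[List[str]] = response.get(
                "org.matrix.msc3706.servers_in_room"
            )
            print(
                f"partial state: {len(state)} state events, "
                f"{len(auth_chain)} extra auth events, servers: {servers}"
            )
        else:
            print(f"full state: {len(state)} state events")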
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index f05a803a71..09d692d9a1 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -61,3 +61,6 @@ class ExperimentalConfig(Config): self.msc2409_to_device_messages_enabled: bool = experimental.get( "msc2409_to_device_messages_enabled", False ) + + # MSC3706 (server-side support for partial state in /send_join responses) + self.msc3706_enabled: bool = experimental.get("msc3706_enabled", False) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index af9cb98f67..482bbdd867 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -20,6 +20,7 @@ from typing import ( Any, Awaitable, Callable, + Collection, Dict, Iterable, List, @@ -64,7 +65,7 @@ from synapse.replication.http.federation import ( ReplicationGetQueryRestServlet, ) from synapse.storage.databases.main.lock import Lock -from synapse.types import JsonDict, get_domain_from_id +from synapse.types import JsonDict, StateMap, get_domain_from_id from synapse.util import json_decoder, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results from synapse.util.caches.response_cache import ResponseCache @@ -571,7 +572,7 @@ class FederationServer(FederationBase): ) -> JsonDict: state_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id) auth_chain_ids = await self.store.get_auth_chain_ids(room_id, state_ids) - return {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids} + return {"pdu_ids": state_ids, "auth_chain_ids": list(auth_chain_ids)} async def _on_context_state_request_compute( self, room_id: str, event_id: Optional[str] @@ -645,27 +646,61 @@ class FederationServer(FederationBase): return {"event": ret_pdu.get_pdu_json(time_now)} async def on_send_join_request( - self, origin: str, content: JsonDict, room_id: str + self, + origin: str, + content: JsonDict, + room_id: str, + caller_supports_partial_state: bool = False, ) -> Dict[str, Any]: event, context = await self._on_send_membership_event( origin, content, Membership.JOIN, room_id ) prev_state_ids = await context.get_prev_state_ids() - state_ids = list(prev_state_ids.values()) - auth_chain = await self.store.get_auth_chain(room_id, state_ids) - state = await self.store.get_events(state_ids) + state_event_ids: Collection[str] + servers_in_room: Optional[Collection[str]] + if caller_supports_partial_state: + state_event_ids = _get_event_ids_for_partial_state_join( + event, prev_state_ids + ) + servers_in_room = await self.state.get_hosts_in_room_at_events( + room_id, event_ids=event.prev_event_ids() + ) + else: + state_event_ids = prev_state_ids.values() + servers_in_room = None + + auth_chain_event_ids = await self.store.get_auth_chain_ids( + room_id, state_event_ids + ) + + # if the caller has opted in, we can omit any auth_chain events which are + # already in state_event_ids + if caller_supports_partial_state: + auth_chain_event_ids.difference_update(state_event_ids) + + auth_chain_events = await self.store.get_events_as_list(auth_chain_event_ids) + state_events = await self.store.get_events_as_list(state_event_ids) + + # we try to do all the async stuff before this point, so that time_now is as + # accurate as possible. time_now = self._clock.time_msec() - event_json = event.get_pdu_json() - return { + event_json = event.get_pdu_json(time_now) + resp = { # TODO Remove the unstable prefix when servers have updated. 
"org.matrix.msc3083.v2.event": event_json, "event": event_json, - "state": [p.get_pdu_json(time_now) for p in state.values()], - "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain], + "state": [p.get_pdu_json(time_now) for p in state_events], + "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events], + "org.matrix.msc3706.partial_state": caller_supports_partial_state, } + if servers_in_room is not None: + resp["org.matrix.msc3706.servers_in_room"] = list(servers_in_room) + + return resp + async def on_make_leave_request( self, origin: str, room_id: str, user_id: str ) -> Dict[str, Any]: @@ -1339,3 +1374,39 @@ class FederationHandlerRegistry: # error. logger.warning("No handler registered for query type %s", query_type) raise NotFoundError("No handler for Query type '%s'" % (query_type,)) + + +def _get_event_ids_for_partial_state_join( + join_event: EventBase, + prev_state_ids: StateMap[str], +) -> Collection[str]: + """Calculate state to be retuned in a partial_state send_join + + Args: + join_event: the join event being send_joined + prev_state_ids: the event ids of the state before the join + + Returns: + the event ids to be returned + """ + + # return all non-member events + state_event_ids = { + event_id + for (event_type, state_key), event_id in prev_state_ids.items() + if event_type != EventTypes.Member + } + + # we also need the current state of the current user (it's going to + # be an auth event for the new join, so we may as well return it) + current_membership_event_id = prev_state_ids.get( + (EventTypes.Member, join_event.state_key) + ) + if current_membership_event_id is not None: + state_event_ids.add(current_membership_event_id) + + # TODO: return a few more members: + # - those with invites + # - those that are kicked? 
/ banned + + return state_event_ids diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index d86dfede4e..310733097d 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -412,6 +412,16 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet): PREFIX = FEDERATION_V2_PREFIX + def __init__( + self, + hs: "HomeServer", + authenticator: Authenticator, + ratelimiter: FederationRateLimiter, + server_name: str, + ): + super().__init__(hs, authenticator, ratelimiter, server_name) + self._msc3706_enabled = hs.config.experimental.msc3706_enabled + async def on_PUT( self, origin: str, @@ -422,7 +432,15 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet): ) -> Tuple[int, JsonDict]: # TODO(paul): assert that event_id parsed from path actually # match those given in content - result = await self.handler.on_send_join_request(origin, content, room_id) + + partial_state = False + if self._msc3706_enabled: + partial_state = parse_boolean_from_args( + query, "org.matrix.msc3706.partial_state", default=False + ) + result = await self.handler.on_send_join_request( + origin, content, room_id, caller_supports_partial_state=partial_state + ) return 200, result diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 22f6474127..277e6422eb 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -121,7 +121,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas room_id: str, event_ids: Collection[str], include_given: bool = False, - ) -> List[str]: + ) -> Set[str]: """Get auth events for given event_ids. The events *must* be state events. Args: @@ -130,7 +130,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas include_given: include the given events in result Returns: - list of event_ids + set of event_ids """ # Check if we have indexed the room so we can use the chain cover @@ -159,7 +159,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas def _get_auth_chain_ids_using_cover_index_txn( self, txn: Cursor, room_id: str, event_ids: Collection[str], include_given: bool - ) -> List[str]: + ) -> Set[str]: """Calculates the auth chain IDs using the chain index.""" # First we look up the chain ID/sequence numbers for the given events. @@ -272,11 +272,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas txn.execute(sql, (chain_id, max_no)) results.update(r for r, in txn) - return list(results) + return results def _get_auth_chain_ids_txn( self, txn: LoggingTransaction, event_ids: Collection[str], include_given: bool - ) -> List[str]: + ) -> Set[str]: """Calculates the auth chain IDs. This is used when we don't have a cover index for the room. 
@@ -331,7 +331,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             front = new_front
             results.update(front)
 
-        return list(results)
+        return results
 
     async def get_auth_chain_difference(
         self, room_id: str, state_sets: List[Set[str]]
diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py
index 1af284bd2f..d084919ef7 100644
--- a/tests/federation/test_federation_server.py
+++ b/tests/federation/test_federation_server.py
@@ -16,12 +16,21 @@ import logging
 
 from parameterized import parameterized
 
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.config.server import DEFAULT_ROOM_VERSION
+from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events import make_event_from_dict
 from synapse.federation.federation_server import server_matches_acl_event
 from synapse.rest import admin
 from synapse.rest.client import login, room
+from synapse.server import HomeServer
+from synapse.types import JsonDict
+from synapse.util import Clock
 
 from tests import unittest
+from tests.unittest import override_config
 
 
 class FederationServerTests(unittest.FederatingHomeserverTestCase):
@@ -152,6 +161,145 @@ class StateQueryTests(unittest.FederatingHomeserverTestCase):
         self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
 
 
+class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
+        super().prepare(reactor, clock, hs)
+
+        # create the room
+        creator_user_id = self.register_user("kermit", "test")
+        tok = self.login("kermit", "test")
+        self._room_id = self.helper.create_room_as(
+            room_creator=creator_user_id, tok=tok
+        )
+
+        # a second member on the origin HS
+        second_member_user_id = self.register_user("fozzie", "bear")
+        tok2 = self.login("fozzie", "bear")
+        self.helper.join(self._room_id, second_member_user_id, tok=tok2)
+
+    def _make_join(self, user_id) -> JsonDict:
+        channel = self.make_signed_federation_request(
+            "GET",
+            f"/_matrix/federation/v1/make_join/{self._room_id}/{user_id}"
+            f"?ver={DEFAULT_ROOM_VERSION}",
+        )
+        self.assertEquals(channel.code, 200, channel.json_body)
+        return channel.json_body
+
+    def test_send_join(self):
+        """happy-path test of send_join"""
+        joining_user = "@misspiggy:" + self.OTHER_SERVER_NAME
+        join_result = self._make_join(joining_user)
+
+        join_event_dict = join_result["event"]
+        add_hashes_and_signatures(
+            KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
+            join_event_dict,
+            signature_name=self.OTHER_SERVER_NAME,
+            signing_key=self.OTHER_SERVER_SIGNATURE_KEY,
+        )
+        channel = self.make_signed_federation_request(
+            "PUT",
+            f"/_matrix/federation/v2/send_join/{self._room_id}/x",
+            content=join_event_dict,
+        )
+        self.assertEquals(channel.code, 200, channel.json_body)
+
+        # we should get complete room state back
+        returned_state = [
+            (ev["type"], ev["state_key"]) for ev in channel.json_body["state"]
+        ]
+        self.assertCountEqual(
+            returned_state,
+            [
+                ("m.room.create", ""),
+                ("m.room.power_levels", ""),
+                ("m.room.join_rules", ""),
+                ("m.room.history_visibility", ""),
+                ("m.room.member", "@kermit:test"),
+                ("m.room.member", "@fozzie:test"),
+                # nb: *not* the joining user
+            ],
+        )
+
+        # also check the auth chain
+        returned_auth_chain_events = [
+            (ev["type"], ev["state_key"]) for ev in channel.json_body["auth_chain"]
+ ] + self.assertCountEqual( + returned_auth_chain_events, + [ + ("m.room.create", ""), + ("m.room.member", "@kermit:test"), + ("m.room.power_levels", ""), + ("m.room.join_rules", ""), + ], + ) + + # the room should show that the new user is a member + r = self.get_success( + self.hs.get_state_handler().get_current_state(self._room_id) + ) + self.assertEqual(r[("m.room.member", joining_user)].membership, "join") + + @override_config({"experimental_features": {"msc3706_enabled": True}}) + def test_send_join_partial_state(self): + """When MSC3706 support is enabled, /send_join should return partial state""" + joining_user = "@misspiggy:" + self.OTHER_SERVER_NAME + join_result = self._make_join(joining_user) + + join_event_dict = join_result["event"] + add_hashes_and_signatures( + KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION], + join_event_dict, + signature_name=self.OTHER_SERVER_NAME, + signing_key=self.OTHER_SERVER_SIGNATURE_KEY, + ) + channel = self.make_signed_federation_request( + "PUT", + f"/_matrix/federation/v2/send_join/{self._room_id}/x?org.matrix.msc3706.partial_state=true", + content=join_event_dict, + ) + self.assertEquals(channel.code, 200, channel.json_body) + + # expect a reduced room state + returned_state = [ + (ev["type"], ev["state_key"]) for ev in channel.json_body["state"] + ] + self.assertCountEqual( + returned_state, + [ + ("m.room.create", ""), + ("m.room.power_levels", ""), + ("m.room.join_rules", ""), + ("m.room.history_visibility", ""), + ], + ) + + # the auth chain should not include anything already in "state" + returned_auth_chain_events = [ + (ev["type"], ev["state_key"]) for ev in channel.json_body["auth_chain"] + ] + self.assertCountEqual( + returned_auth_chain_events, + [ + ("m.room.member", "@kermit:test"), + ], + ) + + # the room should show that the new user is a member + r = self.get_success( + self.hs.get_state_handler().get_current_state(self._room_id) + ) + self.assertEqual(r[("m.room.member", joining_user)].membership, "join") + + def _create_acl_event(content): return make_event_from_dict( { diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 2bc89512f8..667ca90a4d 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -260,16 +260,16 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): self.assertCountEqual(auth_chain_ids, ["h", "i", "j", "k"]) auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["h"])) - self.assertEqual(auth_chain_ids, ["k"]) + self.assertEqual(auth_chain_ids, {"k"}) auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["i"])) - self.assertEqual(auth_chain_ids, ["j"]) + self.assertEqual(auth_chain_ids, {"j"}) # j and k have no parents. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["j"])) - self.assertEqual(auth_chain_ids, []) + self.assertEqual(auth_chain_ids, set()) auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["k"])) - self.assertEqual(auth_chain_ids, []) + self.assertEqual(auth_chain_ids, set()) # More complex input sequences. auth_chain_ids = self.get_success( -- cgit 1.5.1
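A closing note on the `Set` return type exercised by the storage tests above: returning a set from `get_auth_chain_ids` is what lets `on_send_join_request` cheaply drop auth events the caller will already receive in `state`. A toy illustration with plain Python sets (the event IDs are placeholders, and this is not the real storage layer):

    # Placeholder event IDs standing in for real ones.
    auth_chain_event_ids = {"$create", "$power_levels", "$join_rules", "$member"}
    state_event_ids = {"$create", "$power_levels", "$join_rules"}

    # An in-place set difference; with the old List return type this would
    # have required converting back to a set (or a quadratic scan) first.
    auth_chain_event_ids.difference_update(state_event_ids)
    assert auth_chain_event_ids == {"$member"}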