diff options
author | reivilibre <oliverw@matrix.org> | 2022-05-19 16:29:08 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-19 16:29:08 +0100 |
commit | 177b884ad7cc1ecdd92ff74188732734df203150 (patch) | |
tree | c1134bbcafbe07e7b428cb0c024a1afdcfab954a /tests/replication/tcp/test_handler.py | |
parent | Add detail to `cache_autotuning` config option documentation (#12776) (diff) | |
download | synapse-177b884ad7cc1ecdd92ff74188732734df203150.tar.xz |
Lay some foundation work to allow workers to only subscribe to some kinds of messages, reducing replication traffic. (#12672)
Diffstat (limited to 'tests/replication/tcp/test_handler.py')
-rw-r--r-- | tests/replication/tcp/test_handler.py | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py new file mode 100644 index 0000000000..e6a19eafd5 --- /dev/null +++ b/tests/replication/tcp/test_handler.py @@ -0,0 +1,73 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from tests.replication._base import RedisMultiWorkerStreamTestCase + + +class ChannelsTestCase(RedisMultiWorkerStreamTestCase): + def test_subscribed_to_enough_redis_channels(self) -> None: + # The default main process is subscribed to the USER_IP channel. + self.assertCountEqual( + self.hs.get_replication_command_handler()._channels_to_subscribe_to, + ["USER_IP"], + ) + + def test_background_worker_subscribed_to_user_ip(self) -> None: + # The default main process is subscribed to the USER_IP channel. + worker1 = self.make_worker_hs( + "synapse.app.generic_worker", + extra_config={ + "worker_name": "worker1", + "run_background_tasks_on": "worker1", + "redis": {"enabled": True}, + }, + ) + self.assertIn( + "USER_IP", + worker1.get_replication_command_handler()._channels_to_subscribe_to, + ) + + # Advance so the Redis subscription gets processed + self.pump(0.1) + + # The counts are 2 because both the main process and the worker are subscribed. + self.assertEqual(len(self._redis_server._subscribers_by_channel[b"test"]), 2) + self.assertEqual( + len(self._redis_server._subscribers_by_channel[b"test/USER_IP"]), 2 + ) + + def test_non_background_worker_not_subscribed_to_user_ip(self) -> None: + # The default main process is subscribed to the USER_IP channel. + worker2 = self.make_worker_hs( + "synapse.app.generic_worker", + extra_config={ + "worker_name": "worker2", + "run_background_tasks_on": "worker1", + "redis": {"enabled": True}, + }, + ) + self.assertNotIn( + "USER_IP", + worker2.get_replication_command_handler()._channels_to_subscribe_to, + ) + + # Advance so the Redis subscription gets processed + self.pump(0.1) + + # The count is 2 because both the main process and the worker are subscribed. + self.assertEqual(len(self._redis_server._subscribers_by_channel[b"test"]), 2) + # For USER_IP, the count is 1 because only the main process is subscribed. + self.assertEqual( + len(self._redis_server._subscribers_by_channel[b"test/USER_IP"]), 1 + ) |