From 4b965c862dc66c0da5d3240add70e9b5f0aa720b Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Wed, 14 Apr 2021 16:34:27 +0200 Subject: Remove redundant "coding: utf-8" lines (#9786) Part of #9744 Removes all redundant `# -*- coding: utf-8 -*-` lines from files, as python 3 automatically reads source code as utf-8 now. `Signed-off-by: Jonathan de Jong ` --- tests/replication/__init__.py | 1 - tests/replication/_base.py | 1 - tests/replication/slave/__init__.py | 1 - tests/replication/slave/storage/__init__.py | 1 - tests/replication/tcp/__init__.py | 1 - tests/replication/tcp/streams/__init__.py | 1 - tests/replication/tcp/streams/test_account_data.py | 1 - tests/replication/tcp/streams/test_events.py | 1 - tests/replication/tcp/streams/test_federation.py | 1 - tests/replication/tcp/streams/test_receipts.py | 1 - tests/replication/tcp/streams/test_typing.py | 1 - tests/replication/tcp/test_commands.py | 1 - tests/replication/tcp/test_remote_server_up.py | 1 - tests/replication/test_auth.py | 1 - tests/replication/test_client_reader_shard.py | 1 - tests/replication/test_federation_ack.py | 1 - tests/replication/test_federation_sender_shard.py | 1 - tests/replication/test_multi_media_repo.py | 1 - tests/replication/test_pusher_shard.py | 1 - tests/replication/test_sharded_event_persister.py | 1 - 20 files changed, 20 deletions(-) (limited to 'tests/replication') diff --git a/tests/replication/__init__.py b/tests/replication/__init__.py index b7df13c9ee..f43a360a80 100644 --- a/tests/replication/__init__.py +++ b/tests/replication/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/_base.py b/tests/replication/_base.py index aff19d9fb3..36138d69aa 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/slave/__init__.py b/tests/replication/slave/__init__.py index b7df13c9ee..f43a360a80 100644 --- a/tests/replication/slave/__init__.py +++ b/tests/replication/slave/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/slave/storage/__init__.py b/tests/replication/slave/storage/__init__.py index b7df13c9ee..f43a360a80 100644 --- a/tests/replication/slave/storage/__init__.py +++ b/tests/replication/slave/storage/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/__init__.py b/tests/replication/tcp/__init__.py index 1453d04571..743fb9904a 100644 --- a/tests/replication/tcp/__init__.py +++ b/tests/replication/tcp/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/streams/__init__.py b/tests/replication/tcp/streams/__init__.py index 1453d04571..743fb9904a 100644 --- a/tests/replication/tcp/streams/__init__.py +++ b/tests/replication/tcp/streams/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/streams/test_account_data.py 
b/tests/replication/tcp/streams/test_account_data.py index 153634d4ee..cdd052001b 100644 --- a/tests/replication/tcp/streams/test_account_data.py +++ b/tests/replication/tcp/streams/test_account_data.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py index 77856fc304..323237c1bb 100644 --- a/tests/replication/tcp/streams/test_events.py +++ b/tests/replication/tcp/streams/test_events.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py index aa4bf1c7e3..ffec06a0d6 100644 --- a/tests/replication/tcp/streams/test_federation.py +++ b/tests/replication/tcp/streams/test_federation.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py index 7d848e41ff..7f5d932f0b 100644 --- a/tests/replication/tcp/streams/test_receipts.py +++ b/tests/replication/tcp/streams/test_receipts.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/streams/test_typing.py b/tests/replication/tcp/streams/test_typing.py index 4a0b342264..ecd360c2d0 100644 --- a/tests/replication/tcp/streams/test_typing.py +++ b/tests/replication/tcp/streams/test_typing.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/test_commands.py b/tests/replication/tcp/test_commands.py index 60c10a441a..cca7ebb719 100644 --- a/tests/replication/tcp/test_commands.py +++ b/tests/replication/tcp/test_commands.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/test_remote_server_up.py b/tests/replication/tcp/test_remote_server_up.py index 1fe9d5b4d0..262c35cef3 100644 --- a/tests/replication/tcp/test_remote_server_up.py +++ b/tests/replication/tcp/test_remote_server_up.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_auth.py b/tests/replication/test_auth.py index f8fd8a843c..1346e0e160 100644 --- a/tests/replication/test_auth.py +++ b/tests/replication/test_auth.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_client_reader_shard.py b/tests/replication/test_client_reader_shard.py index 5da1d5dc4d..b9751efdc5 100644 --- a/tests/replication/test_client_reader_shard.py +++ b/tests/replication/test_client_reader_shard.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. 
# # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py index 44ad5eec57..04a869e295 100644 --- a/tests/replication/test_federation_ack.py +++ b/tests/replication/test_federation_ack.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index 8ca595c3ee..48ab3aa4e3 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_multi_media_repo.py b/tests/replication/test_multi_media_repo.py index b0800f9840..76e6644353 100644 --- a/tests/replication/test_multi_media_repo.py +++ b/tests/replication/test_multi_media_repo.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py index 1f12bde1aa..1e4e3821b9 100644 --- a/tests/replication/test_pusher_shard.py +++ b/tests/replication/test_pusher_shard.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py index 6c2e1674cb..d739eb6b17 100644 --- a/tests/replication/test_sharded_event_persister.py +++ b/tests/replication/test_sharded_event_persister.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); -- cgit 1.5.1 From 00a6db967655daf1d6db290b7e0d2bb53827ade9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Apr 2021 17:06:06 +0100 Subject: Move some replication processing out of generic_worker (#9796) Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- changelog.d/9796.misc | 1 + synapse/app/generic_worker.py | 470 +------------------------------------- synapse/handlers/presence.py | 246 ++++++++++++++++++++ synapse/replication/tcp/client.py | 231 ++++++++++++++++++- synapse/server.py | 13 +- tests/replication/_base.py | 8 +- 6 files changed, 486 insertions(+), 483 deletions(-) create mode 100644 changelog.d/9796.misc (limited to 'tests/replication') diff --git a/changelog.d/9796.misc b/changelog.d/9796.misc new file mode 100644 index 0000000000..59bb1813c3 --- /dev/null +++ b/changelog.d/9796.misc @@ -0,0 +1 @@ +Move some replication processing out of `generic_worker`. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index e35e17492c..28e3b1aa3c 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -13,12 +13,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-import contextlib import logging import sys -from typing import Dict, Iterable, Optional, Set - -from typing_extensions import ContextManager +from typing import Dict, Iterable, Optional from twisted.internet import address from twisted.web.resource import IResource @@ -40,24 +37,13 @@ from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.config.server import ListenerConfig -from synapse.federation import send_queue from synapse.federation.transport.server import TransportLayerServer -from synapse.handlers.presence import ( - BasePresenceHandler, - PresenceState, - get_interested_parties, -) from synapse.http.server import JsonResource, OptionsResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource -from synapse.replication.http.presence import ( - ReplicationBumpPresenceActiveTime, - ReplicationPresenceSetState, -) from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -77,19 +63,6 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore -from synapse.replication.tcp.client import ReplicationDataHandler -from synapse.replication.tcp.commands import ClearUserSyncsCommand -from synapse.replication.tcp.streams import ( - AccountDataStream, - DeviceListsStream, - GroupServerStream, - PresenceStream, - PushersStream, - PushRulesStream, - ReceiptsStream, - TagAccountDataStream, - ToDeviceStream, -) from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.client.v1 import events, login, room from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet @@ -128,7 +101,7 @@ from synapse.rest.client.versions import VersionsRestServlet from synapse.rest.health import HealthResource from synapse.rest.key.v2 import KeyApiV2Resource from synapse.rest.synapse.client import build_synapse_client_resource_tree -from synapse.server import HomeServer, cache_in_self +from synapse.server import HomeServer from synapse.storage.databases.main.censor_events import CensorEventsStore from synapse.storage.databases.main.client_ips import ClientIpWorkerStore from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore @@ -137,14 +110,11 @@ from synapse.storage.databases.main.metrics import ServerMetricsStore from synapse.storage.databases.main.monthly_active_users import ( MonthlyActiveUsersWorkerStore, ) -from synapse.storage.databases.main.presence import UserPresenceState from synapse.storage.databases.main.search import SearchWorkerStore from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.transactions import TransactionWorkerStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore from 
synapse.storage.databases.main.user_directory import UserDirectoryStore -from synapse.types import ReadReceipt -from synapse.util.async_helpers import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.versionstring import get_version_string @@ -264,214 +234,6 @@ class KeyUploadServlet(RestServlet): return 200, {"one_time_key_counts": result} -class _NullContextManager(ContextManager[None]): - """A context manager which does nothing.""" - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - -UPDATE_SYNCING_USERS_MS = 10 * 1000 - - -class GenericWorkerPresence(BasePresenceHandler): - def __init__(self, hs): - super().__init__(hs) - self.hs = hs - self.is_mine_id = hs.is_mine_id - - self.presence_router = hs.get_presence_router() - self._presence_enabled = hs.config.use_presence - - # The number of ongoing syncs on this process, by user id. - # Empty if _presence_enabled is false. - self._user_to_num_current_syncs = {} # type: Dict[str, int] - - self.notifier = hs.get_notifier() - self.instance_id = hs.get_instance_id() - - # user_id -> last_sync_ms. Lists the users that have stopped syncing - # but we haven't notified the master of that yet - self.users_going_offline = {} - - self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) - self._set_state_client = ReplicationPresenceSetState.make_client(hs) - - self._send_stop_syncing_loop = self.clock.looping_call( - self.send_stop_syncing, UPDATE_SYNCING_USERS_MS - ) - - self._busy_presence_enabled = hs.config.experimental.msc3026_enabled - - hs.get_reactor().addSystemEventTrigger( - "before", - "shutdown", - run_as_background_process, - "generic_presence.on_shutdown", - self._on_shutdown, - ) - - def _on_shutdown(self): - if self._presence_enabled: - self.hs.get_tcp_replication().send_command( - ClearUserSyncsCommand(self.instance_id) - ) - - def send_user_sync(self, user_id, is_syncing, last_sync_ms): - if self._presence_enabled: - self.hs.get_tcp_replication().send_user_sync( - self.instance_id, user_id, is_syncing, last_sync_ms - ) - - def mark_as_coming_online(self, user_id): - """A user has started syncing. Send a UserSync to the master, unless they - had recently stopped syncing. - - Args: - user_id (str) - """ - going_offline = self.users_going_offline.pop(user_id, None) - if not going_offline: - # Safe to skip because we haven't yet told the master they were offline - self.send_user_sync(user_id, True, self.clock.time_msec()) - - def mark_as_going_offline(self, user_id): - """A user has stopped syncing. We wait before notifying the master as - its likely they'll come back soon. This allows us to avoid sending - a stopped syncing immediately followed by a started syncing notification - to the master - - Args: - user_id (str) - """ - self.users_going_offline[user_id] = self.clock.time_msec() - - def send_stop_syncing(self): - """Check if there are any users who have stopped syncing a while ago - and haven't come back yet. If there are poke the master about them. - """ - now = self.clock.time_msec() - for user_id, last_sync_ms in list(self.users_going_offline.items()): - if now - last_sync_ms > UPDATE_SYNCING_USERS_MS: - self.users_going_offline.pop(user_id, None) - self.send_user_sync(user_id, False, last_sync_ms) - - async def user_syncing( - self, user_id: str, affect_presence: bool - ) -> ContextManager[None]: - """Record that a user is syncing. 
- - Called by the sync and events servlets to record that a user has connected to - this worker and is waiting for some events. - """ - if not affect_presence or not self._presence_enabled: - return _NullContextManager() - - curr_sync = self._user_to_num_current_syncs.get(user_id, 0) - self._user_to_num_current_syncs[user_id] = curr_sync + 1 - - # If we went from no in flight sync to some, notify replication - if self._user_to_num_current_syncs[user_id] == 1: - self.mark_as_coming_online(user_id) - - def _end(): - # We check that the user_id is in user_to_num_current_syncs because - # user_to_num_current_syncs may have been cleared if we are - # shutting down. - if user_id in self._user_to_num_current_syncs: - self._user_to_num_current_syncs[user_id] -= 1 - - # If we went from one in flight sync to non, notify replication - if self._user_to_num_current_syncs[user_id] == 0: - self.mark_as_going_offline(user_id) - - @contextlib.contextmanager - def _user_syncing(): - try: - yield - finally: - _end() - - return _user_syncing() - - async def notify_from_replication(self, states, stream_id): - parties = await get_interested_parties(self.store, self.presence_router, states) - room_ids_to_states, users_to_states = parties - - self.notifier.on_new_event( - "presence_key", - stream_id, - rooms=room_ids_to_states.keys(), - users=users_to_states.keys(), - ) - - async def process_replication_rows(self, token, rows): - states = [ - UserPresenceState( - row.user_id, - row.state, - row.last_active_ts, - row.last_federation_update_ts, - row.last_user_sync_ts, - row.status_msg, - row.currently_active, - ) - for row in rows - ] - - for state in states: - self.user_to_current_state[state.user_id] = state - - stream_id = token - await self.notify_from_replication(states, stream_id) - - def get_currently_syncing_users_for_replication(self) -> Iterable[str]: - return [ - user_id - for user_id, count in self._user_to_num_current_syncs.items() - if count > 0 - ] - - async def set_state(self, target_user, state, ignore_status_msg=False): - """Set the presence state of the user.""" - presence = state["presence"] - - valid_presence = ( - PresenceState.ONLINE, - PresenceState.UNAVAILABLE, - PresenceState.OFFLINE, - PresenceState.BUSY, - ) - - if presence not in valid_presence or ( - presence == PresenceState.BUSY and not self._busy_presence_enabled - ): - raise SynapseError(400, "Invalid presence state") - - user_id = target_user.to_string() - - # If presence is disabled, no-op - if not self.hs.config.use_presence: - return - - # Proxy request to master - await self._set_state_client( - user_id=user_id, state=state, ignore_status_msg=ignore_status_msg - ) - - async def bump_presence_active_time(self, user): - """We've seen the user do something that indicates they're interacting - with the app. - """ - # If presence is disabled, no-op - if not self.hs.config.use_presence: - return - - # Proxy request to master - user_id = user.to_string() - await self._bump_active_client(user_id=user_id) - - class GenericWorkerSlavedStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. 
@@ -657,234 +419,6 @@ class GenericWorkerServer(HomeServer): self.get_tcp_replication().start_replication(self) - @cache_in_self - def get_replication_data_handler(self): - return GenericWorkerReplicationHandler(self) - - @cache_in_self - def get_presence_handler(self): - return GenericWorkerPresence(self) - - -class GenericWorkerReplicationHandler(ReplicationDataHandler): - def __init__(self, hs): - super().__init__(hs) - - self.store = hs.get_datastore() - self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence - self.notifier = hs.get_notifier() - - self.notify_pushers = hs.config.start_pushers - self.pusher_pool = hs.get_pusherpool() - - self.send_handler = None # type: Optional[FederationSenderHandler] - if hs.config.send_federation: - self.send_handler = FederationSenderHandler(hs) - - async def on_rdata(self, stream_name, instance_name, token, rows): - await super().on_rdata(stream_name, instance_name, token, rows) - await self._process_and_notify(stream_name, instance_name, token, rows) - - async def _process_and_notify(self, stream_name, instance_name, token, rows): - try: - if self.send_handler: - await self.send_handler.process_replication_rows( - stream_name, token, rows - ) - - if stream_name == PushRulesStream.NAME: - self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows] - ) - elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME): - self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows] - ) - elif stream_name == ReceiptsStream.NAME: - self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows] - ) - await self.pusher_pool.on_new_receipts( - token, token, {row.room_id for row in rows} - ) - elif stream_name == ToDeviceStream.NAME: - entities = [row.entity for row in rows if row.entity.startswith("@")] - if entities: - self.notifier.on_new_event("to_device_key", token, users=entities) - elif stream_name == DeviceListsStream.NAME: - all_room_ids = set() # type: Set[str] - for row in rows: - if row.entity.startswith("@"): - room_ids = await self.store.get_rooms_for_user(row.entity) - all_room_ids.update(room_ids) - self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) - elif stream_name == PresenceStream.NAME: - await self.presence_handler.process_replication_rows(token, rows) - elif stream_name == GroupServerStream.NAME: - self.notifier.on_new_event( - "groups_key", token, users=[row.user_id for row in rows] - ) - elif stream_name == PushersStream.NAME: - for row in rows: - if row.deleted: - self.stop_pusher(row.user_id, row.app_id, row.pushkey) - else: - await self.start_pusher(row.user_id, row.app_id, row.pushkey) - except Exception: - logger.exception("Error processing replication") - - async def on_position(self, stream_name: str, instance_name: str, token: int): - await super().on_position(stream_name, instance_name, token) - # Also call on_rdata to ensure that stream positions are properly reset. 
- await self.on_rdata(stream_name, instance_name, token, []) - - def stop_pusher(self, user_id, app_id, pushkey): - if not self.notify_pushers: - return - - key = "%s:%s" % (app_id, pushkey) - pushers_for_user = self.pusher_pool.pushers.get(user_id, {}) - pusher = pushers_for_user.pop(key, None) - if pusher is None: - return - logger.info("Stopping pusher %r / %r", user_id, key) - pusher.on_stop() - - async def start_pusher(self, user_id, app_id, pushkey): - if not self.notify_pushers: - return - - key = "%s:%s" % (app_id, pushkey) - logger.info("Starting pusher %r / %r", user_id, key) - return await self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id) - - def on_remote_server_up(self, server: str): - """Called when get a new REMOTE_SERVER_UP command.""" - - # Let's wake up the transaction queue for the server in case we have - # pending stuff to send to it. - if self.send_handler: - self.send_handler.wake_destination(server) - - -class FederationSenderHandler: - """Processes the fedration replication stream - - This class is only instantiate on the worker responsible for sending outbound - federation transactions. It receives rows from the replication stream and forwards - the appropriate entries to the FederationSender class. - """ - - def __init__(self, hs: GenericWorkerServer): - self.store = hs.get_datastore() - self._is_mine_id = hs.is_mine_id - self.federation_sender = hs.get_federation_sender() - self._hs = hs - - # Stores the latest position in the federation stream we've gotten up - # to. This is always set before we use it. - self.federation_position = None - - self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") - - def wake_destination(self, server: str): - self.federation_sender.wake_destination(server) - - async def process_replication_rows(self, stream_name, token, rows): - # The federation stream contains things that we want to send out, e.g. - # presence, typing, etc. - if stream_name == "federation": - send_queue.process_rows_for_federation(self.federation_sender, rows) - await self.update_token(token) - - # ... and when new receipts happen - elif stream_name == ReceiptsStream.NAME: - await self._on_new_receipts(rows) - - # ... as well as device updates and messages - elif stream_name == DeviceListsStream.NAME: - # The entities are either user IDs (starting with '@') whose devices - # have changed, or remote servers that we need to tell about - # changes. - hosts = {row.entity for row in rows if not row.entity.startswith("@")} - for host in hosts: - self.federation_sender.send_device_messages(host) - - elif stream_name == ToDeviceStream.NAME: - # The to_device stream includes stuff to be pushed to both local - # clients and remote servers, so we ignore entities that start with - # '@' (since they'll be local users rather than destinations). 
- hosts = {row.entity for row in rows if not row.entity.startswith("@")} - for host in hosts: - self.federation_sender.send_device_messages(host) - - async def _on_new_receipts(self, rows): - """ - Args: - rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]): - new receipts to be processed - """ - for receipt in rows: - # we only want to send on receipts for our own users - if not self._is_mine_id(receipt.user_id): - continue - receipt_info = ReadReceipt( - receipt.room_id, - receipt.receipt_type, - receipt.user_id, - [receipt.event_id], - receipt.data, - ) - await self.federation_sender.send_read_receipt(receipt_info) - - async def update_token(self, token): - """Update the record of where we have processed to in the federation stream. - - Called after we have processed a an update received over replication. Sends - a FEDERATION_ACK back to the master, and stores the token that we have processed - in `federation_stream_position` so that we can restart where we left off. - """ - self.federation_position = token - - # We save and send the ACK to master asynchronously, so we don't block - # processing on persistence. We don't need to do this operation for - # every single RDATA we receive, we just need to do it periodically. - - if self._fed_position_linearizer.is_queued(None): - # There is already a task queued up to save and send the token, so - # no need to queue up another task. - return - - run_as_background_process("_save_and_send_ack", self._save_and_send_ack) - - async def _save_and_send_ack(self): - """Save the current federation position in the database and send an ACK - to master with where we're up to. - """ - try: - # We linearize here to ensure we don't have races updating the token - # - # XXX this appears to be redundant, since the ReplicationCommandHandler - # has a linearizer which ensures that we only process one line of - # replication data at a time. Should we remove it, or is it doing useful - # service for robustness? Or could we replace it with an assertion that - # we're not being re-entered? - - with (await self._fed_position_linearizer.queue(None)): - # We persist and ack the same position, so we take a copy of it - # here as otherwise it can get modified from underneath us. 
- current_position = self.federation_position - - await self.store.update_federation_out_pos( - "federation", current_position - ) - - # We ACK this token over replication so that the master can drop - # its in memory queues - self._hs.get_tcp_replication().send_federation_ack(current_position) - except Exception: - logger.exception("Error updating federation stream position") - def start(config_options): try: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 251b48148d..e120dd1f48 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -22,6 +22,7 @@ The methods that define policy are: - should_notify """ import abc +import contextlib import logging from contextlib import contextmanager from typing import ( @@ -48,6 +49,11 @@ from synapse.logging.context import run_in_background from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.replication.http.presence import ( + ReplicationBumpPresenceActiveTime, + ReplicationPresenceSetState, +) +from synapse.replication.tcp.commands import ClearUserSyncsCommand from synapse.state import StateHandler from synapse.storage.databases.main import DataStore from synapse.types import Collection, JsonDict, UserID, get_domain_from_id @@ -104,6 +110,10 @@ FEDERATION_PING_INTERVAL = 25 * 60 * 1000 # are dead. EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000 +# Delay before a worker tells the presence handler that a user has stopped +# syncing. +UPDATE_SYNCING_USERS_MS = 10 * 1000 + assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER @@ -208,6 +218,242 @@ class BasePresenceHandler(abc.ABC): with the app. """ + async def update_external_syncs_row( + self, process_id, user_id, is_syncing, sync_time_msec + ): + """Update the syncing users for an external process as a delta. + + This is a no-op when presence is handled by a different worker. + + Args: + process_id (str): An identifier for the process the users are + syncing against. This allows synapse to process updates + as user start and stop syncing against a given process. + user_id (str): The user who has started or stopped syncing + is_syncing (bool): Whether or not the user is now syncing + sync_time_msec(int): Time in ms when the user was last syncing + """ + pass + + async def update_external_syncs_clear(self, process_id): + """Marks all users that had been marked as syncing by a given process + as offline. + + Used when the process has stopped/disappeared. + + This is a no-op when presence is handled by a different worker. + """ + pass + + async def process_replication_rows(self, token, rows): + """Process presence stream rows received over replication.""" + pass + + +class _NullContextManager(ContextManager[None]): + """A context manager which does nothing.""" + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + +class WorkerPresenceHandler(BasePresenceHandler): + def __init__(self, hs): + super().__init__(hs) + self.hs = hs + self.is_mine_id = hs.is_mine_id + + self.presence_router = hs.get_presence_router() + self._presence_enabled = hs.config.use_presence + + # The number of ongoing syncs on this process, by user id. + # Empty if _presence_enabled is false. + self._user_to_num_current_syncs = {} # type: Dict[str, int] + + self.notifier = hs.get_notifier() + self.instance_id = hs.get_instance_id() + + # user_id -> last_sync_ms. 
Lists the users that have stopped syncing + # but we haven't notified the master of that yet + self.users_going_offline = {} + + self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) + self._set_state_client = ReplicationPresenceSetState.make_client(hs) + + self._send_stop_syncing_loop = self.clock.looping_call( + self.send_stop_syncing, UPDATE_SYNCING_USERS_MS + ) + + self._busy_presence_enabled = hs.config.experimental.msc3026_enabled + + hs.get_reactor().addSystemEventTrigger( + "before", + "shutdown", + run_as_background_process, + "generic_presence.on_shutdown", + self._on_shutdown, + ) + + def _on_shutdown(self): + if self._presence_enabled: + self.hs.get_tcp_replication().send_command( + ClearUserSyncsCommand(self.instance_id) + ) + + def send_user_sync(self, user_id, is_syncing, last_sync_ms): + if self._presence_enabled: + self.hs.get_tcp_replication().send_user_sync( + self.instance_id, user_id, is_syncing, last_sync_ms + ) + + def mark_as_coming_online(self, user_id): + """A user has started syncing. Send a UserSync to the master, unless they + had recently stopped syncing. + + Args: + user_id (str) + """ + going_offline = self.users_going_offline.pop(user_id, None) + if not going_offline: + # Safe to skip because we haven't yet told the master they were offline + self.send_user_sync(user_id, True, self.clock.time_msec()) + + def mark_as_going_offline(self, user_id): + """A user has stopped syncing. We wait before notifying the master as + its likely they'll come back soon. This allows us to avoid sending + a stopped syncing immediately followed by a started syncing notification + to the master + + Args: + user_id (str) + """ + self.users_going_offline[user_id] = self.clock.time_msec() + + def send_stop_syncing(self): + """Check if there are any users who have stopped syncing a while ago + and haven't come back yet. If there are poke the master about them. + """ + now = self.clock.time_msec() + for user_id, last_sync_ms in list(self.users_going_offline.items()): + if now - last_sync_ms > UPDATE_SYNCING_USERS_MS: + self.users_going_offline.pop(user_id, None) + self.send_user_sync(user_id, False, last_sync_ms) + + async def user_syncing( + self, user_id: str, affect_presence: bool + ) -> ContextManager[None]: + """Record that a user is syncing. + + Called by the sync and events servlets to record that a user has connected to + this worker and is waiting for some events. + """ + if not affect_presence or not self._presence_enabled: + return _NullContextManager() + + curr_sync = self._user_to_num_current_syncs.get(user_id, 0) + self._user_to_num_current_syncs[user_id] = curr_sync + 1 + + # If we went from no in flight sync to some, notify replication + if self._user_to_num_current_syncs[user_id] == 1: + self.mark_as_coming_online(user_id) + + def _end(): + # We check that the user_id is in user_to_num_current_syncs because + # user_to_num_current_syncs may have been cleared if we are + # shutting down. 
+ if user_id in self._user_to_num_current_syncs: + self._user_to_num_current_syncs[user_id] -= 1 + + # If we went from one in flight sync to non, notify replication + if self._user_to_num_current_syncs[user_id] == 0: + self.mark_as_going_offline(user_id) + + @contextlib.contextmanager + def _user_syncing(): + try: + yield + finally: + _end() + + return _user_syncing() + + async def notify_from_replication(self, states, stream_id): + parties = await get_interested_parties(self.store, self.presence_router, states) + room_ids_to_states, users_to_states = parties + + self.notifier.on_new_event( + "presence_key", + stream_id, + rooms=room_ids_to_states.keys(), + users=users_to_states.keys(), + ) + + async def process_replication_rows(self, token, rows): + states = [ + UserPresenceState( + row.user_id, + row.state, + row.last_active_ts, + row.last_federation_update_ts, + row.last_user_sync_ts, + row.status_msg, + row.currently_active, + ) + for row in rows + ] + + for state in states: + self.user_to_current_state[state.user_id] = state + + stream_id = token + await self.notify_from_replication(states, stream_id) + + def get_currently_syncing_users_for_replication(self) -> Iterable[str]: + return [ + user_id + for user_id, count in self._user_to_num_current_syncs.items() + if count > 0 + ] + + async def set_state(self, target_user, state, ignore_status_msg=False): + """Set the presence state of the user.""" + presence = state["presence"] + + valid_presence = ( + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, + PresenceState.BUSY, + ) + + if presence not in valid_presence or ( + presence == PresenceState.BUSY and not self._busy_presence_enabled + ): + raise SynapseError(400, "Invalid presence state") + + user_id = target_user.to_string() + + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + + # Proxy request to master + await self._set_state_client( + user_id=user_id, state=state, ignore_status_msg=ignore_status_msg + ) + + async def bump_presence_active_time(self, user): + """We've seen the user do something that indicates they're interacting + with the app. + """ + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + + # Proxy request to master + user_id = user.to_string() + await self._bump_active_client(user_id=user_id) + class PresenceHandler(BasePresenceHandler): def __init__(self, hs: "HomeServer"): diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index ced69ee904..ce5d651cb8 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -14,22 +14,36 @@ """A replication client for use by synapse workers. 
""" import logging -from typing import TYPE_CHECKING, Dict, List, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple from twisted.internet.defer import Deferred from twisted.internet.protocol import ReconnectingClientFactory from synapse.api.constants import EventTypes +from synapse.federation import send_queue +from synapse.federation.sender import FederationSender from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol -from synapse.replication.tcp.streams import TypingStream +from synapse.replication.tcp.streams import ( + AccountDataStream, + DeviceListsStream, + GroupServerStream, + PresenceStream, + PushersStream, + PushRulesStream, + ReceiptsStream, + TagAccountDataStream, + ToDeviceStream, + TypingStream, +) from synapse.replication.tcp.streams.events import ( EventsStream, EventsStreamEventRow, EventsStreamRow, ) -from synapse.types import PersistedEventPosition, UserID -from synapse.util.async_helpers import timeout_deferred +from synapse.types import PersistedEventPosition, ReadReceipt, UserID +from synapse.util.async_helpers import Linearizer, timeout_deferred from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -105,6 +119,14 @@ class ReplicationDataHandler: self._instance_name = hs.get_instance_name() self._typing_handler = hs.get_typing_handler() + self._notify_pushers = hs.config.start_pushers + self._pusher_pool = hs.get_pusherpool() + self._presence_handler = hs.get_presence_handler() + + self.send_handler = None # type: Optional[FederationSenderHandler] + if hs.should_send_federation(): + self.send_handler = FederationSenderHandler(hs) + # Map from stream to list of deferreds waiting for the stream to # arrive at a particular position. The lists are sorted by stream position. 
self._streams_to_waiters = {} # type: Dict[str, List[Tuple[int, Deferred]]] @@ -125,13 +147,53 @@ class ReplicationDataHandler: """ self.store.process_replication_rows(stream_name, instance_name, token, rows) + if self.send_handler: + await self.send_handler.process_replication_rows(stream_name, token, rows) + if stream_name == TypingStream.NAME: self._typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( "typing_key", token, rooms=[row.room_id for row in rows] ) - - if stream_name == EventsStream.NAME: + elif stream_name == PushRulesStream.NAME: + self.notifier.on_new_event( + "push_rules_key", token, users=[row.user_id for row in rows] + ) + elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME): + self.notifier.on_new_event( + "account_data_key", token, users=[row.user_id for row in rows] + ) + elif stream_name == ReceiptsStream.NAME: + self.notifier.on_new_event( + "receipt_key", token, rooms=[row.room_id for row in rows] + ) + await self._pusher_pool.on_new_receipts( + token, token, {row.room_id for row in rows} + ) + elif stream_name == ToDeviceStream.NAME: + entities = [row.entity for row in rows if row.entity.startswith("@")] + if entities: + self.notifier.on_new_event("to_device_key", token, users=entities) + elif stream_name == DeviceListsStream.NAME: + all_room_ids = set() # type: Set[str] + for row in rows: + if row.entity.startswith("@"): + room_ids = await self.store.get_rooms_for_user(row.entity) + all_room_ids.update(room_ids) + self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) + elif stream_name == GroupServerStream.NAME: + self.notifier.on_new_event( + "groups_key", token, users=[row.user_id for row in rows] + ) + elif stream_name == PushersStream.NAME: + for row in rows: + if row.deleted: + self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + await self.start_pusher(row.user_id, row.app_id, row.pushkey) + elif stream_name == PresenceStream.NAME: + await self._presence_handler.process_replication_rows(token, rows) + elif stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. for row in rows: @@ -190,7 +252,7 @@ class ReplicationDataHandler: waiting_list[:] = waiting_list[index_of_first_deferred_not_called:] async def on_position(self, stream_name: str, instance_name: str, token: int): - self.store.process_replication_rows(stream_name, instance_name, token, []) + await self.on_rdata(stream_name, instance_name, token, []) # We poke the generic "replication" notifier to wake anything up that # may be streaming. @@ -199,6 +261,11 @@ class ReplicationDataHandler: def on_remote_server_up(self, server: str): """Called when get a new REMOTE_SERVER_UP command.""" + # Let's wake up the transaction queue for the server in case we have + # pending stuff to send to it. 
+ if self.send_handler: + self.send_handler.wake_destination(server) + async def wait_for_stream_position( self, instance_name: str, stream_name: str, position: int ): @@ -235,3 +302,153 @@ class ReplicationDataHandler: logger.info( "Finished waiting for repl stream %r to reach %s", stream_name, position ) + + def stop_pusher(self, user_id, app_id, pushkey): + if not self._notify_pushers: + return + + key = "%s:%s" % (app_id, pushkey) + pushers_for_user = self._pusher_pool.pushers.get(user_id, {}) + pusher = pushers_for_user.pop(key, None) + if pusher is None: + return + logger.info("Stopping pusher %r / %r", user_id, key) + pusher.on_stop() + + async def start_pusher(self, user_id, app_id, pushkey): + if not self._notify_pushers: + return + + key = "%s:%s" % (app_id, pushkey) + logger.info("Starting pusher %r / %r", user_id, key) + return await self._pusher_pool.start_pusher_by_id(app_id, pushkey, user_id) + + +class FederationSenderHandler: + """Processes the fedration replication stream + + This class is only instantiate on the worker responsible for sending outbound + federation transactions. It receives rows from the replication stream and forwards + the appropriate entries to the FederationSender class. + """ + + def __init__(self, hs: "HomeServer"): + assert hs.should_send_federation() + + self.store = hs.get_datastore() + self._is_mine_id = hs.is_mine_id + self._hs = hs + + # We need to make a temporary value to ensure that mypy picks up the + # right type. We know we should have a federation sender instance since + # `should_send_federation` is True. + sender = hs.get_federation_sender() + assert isinstance(sender, FederationSender) + self.federation_sender = sender + + # Stores the latest position in the federation stream we've gotten up + # to. This is always set before we use it. + self.federation_position = None # type: Optional[int] + + self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") + + def wake_destination(self, server: str): + self.federation_sender.wake_destination(server) + + async def process_replication_rows(self, stream_name, token, rows): + # The federation stream contains things that we want to send out, e.g. + # presence, typing, etc. + if stream_name == "federation": + send_queue.process_rows_for_federation(self.federation_sender, rows) + await self.update_token(token) + + # ... and when new receipts happen + elif stream_name == ReceiptsStream.NAME: + await self._on_new_receipts(rows) + + # ... as well as device updates and messages + elif stream_name == DeviceListsStream.NAME: + # The entities are either user IDs (starting with '@') whose devices + # have changed, or remote servers that we need to tell about + # changes. + hosts = {row.entity for row in rows if not row.entity.startswith("@")} + for host in hosts: + self.federation_sender.send_device_messages(host) + + elif stream_name == ToDeviceStream.NAME: + # The to_device stream includes stuff to be pushed to both local + # clients and remote servers, so we ignore entities that start with + # '@' (since they'll be local users rather than destinations). 
+ hosts = {row.entity for row in rows if not row.entity.startswith("@")} + for host in hosts: + self.federation_sender.send_device_messages(host) + + async def _on_new_receipts(self, rows): + """ + Args: + rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]): + new receipts to be processed + """ + for receipt in rows: + # we only want to send on receipts for our own users + if not self._is_mine_id(receipt.user_id): + continue + receipt_info = ReadReceipt( + receipt.room_id, + receipt.receipt_type, + receipt.user_id, + [receipt.event_id], + receipt.data, + ) + await self.federation_sender.send_read_receipt(receipt_info) + + async def update_token(self, token): + """Update the record of where we have processed to in the federation stream. + + Called after we have processed a an update received over replication. Sends + a FEDERATION_ACK back to the master, and stores the token that we have processed + in `federation_stream_position` so that we can restart where we left off. + """ + self.federation_position = token + + # We save and send the ACK to master asynchronously, so we don't block + # processing on persistence. We don't need to do this operation for + # every single RDATA we receive, we just need to do it periodically. + + if self._fed_position_linearizer.is_queued(None): + # There is already a task queued up to save and send the token, so + # no need to queue up another task. + return + + run_as_background_process("_save_and_send_ack", self._save_and_send_ack) + + async def _save_and_send_ack(self): + """Save the current federation position in the database and send an ACK + to master with where we're up to. + """ + # We should only be calling this once we've got a token. + assert self.federation_position is not None + + try: + # We linearize here to ensure we don't have races updating the token + # + # XXX this appears to be redundant, since the ReplicationCommandHandler + # has a linearizer which ensures that we only process one line of + # replication data at a time. Should we remove it, or is it doing useful + # service for robustness? Or could we replace it with an assertion that + # we're not being re-entered? + + with (await self._fed_position_linearizer.queue(None)): + # We persist and ack the same position, so we take a copy of it + # here as otherwise it can get modified from underneath us. 
+ current_position = self.federation_position + + await self.store.update_federation_out_pos( + "federation", current_position + ) + + # We ACK this token over replication so that the master can drop + # its in memory queues + self._hs.get_tcp_replication().send_federation_ack(current_position) + except Exception: + logger.exception("Error updating federation stream position") diff --git a/synapse/server.py b/synapse/server.py index 6c35ae6e50..95a2cd2e5d 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -85,7 +85,11 @@ from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.message import EventCreationHandler, MessageHandler from synapse.handlers.pagination import PaginationHandler from synapse.handlers.password_policy import PasswordPolicyHandler -from synapse.handlers.presence import PresenceHandler +from synapse.handlers.presence import ( + BasePresenceHandler, + PresenceHandler, + WorkerPresenceHandler, +) from synapse.handlers.profile import ProfileHandler from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.receipts import ReceiptsHandler @@ -415,8 +419,11 @@ class HomeServer(metaclass=abc.ABCMeta): return StateResolutionHandler(self) @cache_in_self - def get_presence_handler(self) -> PresenceHandler: - return PresenceHandler(self) + def get_presence_handler(self) -> BasePresenceHandler: + if self.config.worker_app: + return WorkerPresenceHandler(self) + else: + return PresenceHandler(self) @cache_in_self def get_typing_writer_handler(self) -> TypingWriterHandler: diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 36138d69aa..c9d04aef29 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -21,13 +21,11 @@ from twisted.web.http import HTTPChannel from twisted.web.resource import Resource from twisted.web.server import Request, Site -from synapse.app.generic_worker import ( - GenericWorkerReplicationHandler, - GenericWorkerServer, -) +from synapse.app.generic_worker import GenericWorkerServer from synapse.http.server import JsonResource from synapse.http.site import SynapseRequest, SynapseSite from synapse.replication.http import ReplicationRestResource +from synapse.replication.tcp.client import ReplicationDataHandler from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol from synapse.replication.tcp.resource import ( @@ -431,7 +429,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): server_protocol.makeConnection(server_to_client_transport) -class TestReplicationDataHandler(GenericWorkerReplicationHandler): +class TestReplicationDataHandler(ReplicationDataHandler): """Drop-in for ReplicationDataHandler which just collects RDATA rows""" def __init__(self, hs: HomeServer): -- cgit 1.5.1 From 495b214f4f8f45d16ffee851c8ab7a380dd0e2b2 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 20 Apr 2021 12:50:49 +0200 Subject: Fix (final) Bugbear violations (#9838) --- changelog.d/9838.misc | 1 + scripts-dev/definitions.py | 2 +- scripts-dev/list_url_patterns.py | 2 +- setup.cfg | 3 +-- synapse/event_auth.py | 2 +- synapse/federation/send_queue.py | 4 ++-- synapse/handlers/auth.py | 2 +- synapse/handlers/device.py | 13 +++++-------- synapse/handlers/federation.py | 2 +- synapse/logging/_remote.py | 4 ++-- synapse/rest/key/v2/remote_key_resource.py | 4 ++-- synapse/storage/databases/main/events.py | 10 +++++----- tests/handlers/test_federation.py | 2 +- 
tests/replication/tcp/streams/test_events.py | 4 ++-- tests/rest/admin/test_device.py | 4 ++-- tests/rest/admin/test_event_reports.py | 8 ++++---- tests/rest/admin/test_room.py | 8 ++++---- tests/rest/admin/test_statistics.py | 2 +- tests/rest/admin/test_user.py | 4 ++-- tests/rest/client/v1/test_rooms.py | 6 +++--- tests/storage/test_event_metrics.py | 4 ++-- tests/unittest.py | 2 +- tests/utils.py | 2 +- 23 files changed, 46 insertions(+), 49 deletions(-) create mode 100644 changelog.d/9838.misc (limited to 'tests/replication') diff --git a/changelog.d/9838.misc b/changelog.d/9838.misc new file mode 100644 index 0000000000..b98ce56309 --- /dev/null +++ b/changelog.d/9838.misc @@ -0,0 +1 @@ +Introduce flake8-bugbear to the test suite and fix some of its lint violations. \ No newline at end of file diff --git a/scripts-dev/definitions.py b/scripts-dev/definitions.py index 313860df13..c82ddd9677 100755 --- a/scripts-dev/definitions.py +++ b/scripts-dev/definitions.py @@ -140,7 +140,7 @@ if __name__ == "__main__": definitions = {} for directory in args.directories: - for root, dirs, files in os.walk(directory): + for root, _, files in os.walk(directory): for filename in files: if filename.endswith(".py"): filepath = os.path.join(root, filename) diff --git a/scripts-dev/list_url_patterns.py b/scripts-dev/list_url_patterns.py index 26ad7c67f4..e85420dea8 100755 --- a/scripts-dev/list_url_patterns.py +++ b/scripts-dev/list_url_patterns.py @@ -48,7 +48,7 @@ args = parser.parse_args() for directory in args.directories: - for root, dirs, files in os.walk(directory): + for root, _, files in os.walk(directory): for filename in files: if filename.endswith(".py"): filepath = os.path.join(root, filename) diff --git a/setup.cfg b/setup.cfg index 33601b71d5..e5ceb7ed19 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,8 +18,7 @@ ignore = # E203: whitespace before ':' (which is contrary to pep8?) 
# E731: do not assign a lambda expression, use a def # E501: Line too long (black enforces this for us) -# B007: Subsection of the bugbear suite (TODO: add in remaining fixes) -ignore=W503,W504,E203,E731,E501,B007 +ignore=W503,W504,E203,E731,E501 [isort] line_length = 88 diff --git a/synapse/event_auth.py b/synapse/event_auth.py index 5234e3f81e..c831d9f73c 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -670,7 +670,7 @@ def _verify_third_party_invite(event: EventBase, auth_events: StateMap[EventBase public_key = public_key_object["public_key"] try: for server, signature_block in signed["signatures"].items(): - for key_name, encoded_signature in signature_block.items(): + for key_name in signature_block.keys(): if not key_name.startswith("ed25519:"): continue verify_key = decode_verify_key_bytes( diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index d71f04e43e..65d76ea974 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -501,10 +501,10 @@ def process_rows_for_federation( states=[state], destinations=destinations ) - for destination, edu_map in buff.keyed_edus.items(): + for edu_map in buff.keyed_edus.values(): for key, edu in edu_map.items(): transaction_queue.send_edu(edu, key) - for destination, edu_list in buff.edus.items(): + for edu_list in buff.edus.values(): for edu in edu_list: transaction_queue.send_edu(edu, None) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index b8a37b6477..36f2450e2e 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1248,7 +1248,7 @@ class AuthHandler(BaseHandler): # see if any of our auth providers want to know about this for provider in self.password_providers: - for token, token_id, device_id in tokens_and_devices: + for token, _, device_id in tokens_and_devices: await provider.on_logged_out( user_id=user_id, device_id=device_id, access_token=token ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index d75edb184b..c1d7800981 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -156,8 +156,7 @@ class DeviceWorkerHandler(BaseHandler): # The user may have left the room # TODO: Check if they actually did or if we were just invited. 
if room_id not in room_ids: - for key, event_id in current_state_ids.items(): - etype, state_key = key + for etype, state_key in current_state_ids.keys(): if etype != EventTypes.Member: continue possibly_left.add(state_key) @@ -179,8 +178,7 @@ class DeviceWorkerHandler(BaseHandler): log_kv( {"event": "encountered empty previous state", "room_id": room_id} ) - for key, event_id in current_state_ids.items(): - etype, state_key = key + for etype, state_key in current_state_ids.keys(): if etype != EventTypes.Member: continue possibly_changed.add(state_key) @@ -198,8 +196,7 @@ class DeviceWorkerHandler(BaseHandler): for state_dict in prev_state_ids.values(): member_event = state_dict.get((EventTypes.Member, user_id), None) if not member_event or member_event != current_member_id: - for key, event_id in current_state_ids.items(): - etype, state_key = key + for etype, state_key in current_state_ids.keys(): if etype != EventTypes.Member: continue possibly_changed.add(state_key) @@ -714,7 +711,7 @@ class DeviceListUpdater: # This can happen since we batch updates return - for device_id, stream_id, prev_ids, content in pending_updates: + for device_id, stream_id, prev_ids, _ in pending_updates: logger.debug( "Handling update %r/%r, ID: %r, prev: %r ", user_id, @@ -740,7 +737,7 @@ class DeviceListUpdater: else: # Simply update the single device, since we know that is the only # change (because of the single prev_id matching the current cache) - for device_id, stream_id, prev_ids, content in pending_updates: + for device_id, stream_id, _, content in pending_updates: await self.store.update_remote_device_list_cache_entry( user_id, device_id, content, stream_id ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4b3730aa3b..dbdd7d2db3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2956,7 +2956,7 @@ class FederationHandler(BaseHandler): try: # for each sig on the third_party_invite block of the actual invite for server, signature_block in signed["signatures"].items(): - for key_name, encoded_signature in signature_block.items(): + for key_name in signature_block.keys(): if not key_name.startswith("ed25519:"): continue diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py index 4e8b0f8d10..c515690b38 100644 --- a/synapse/logging/_remote.py +++ b/synapse/logging/_remote.py @@ -226,11 +226,11 @@ class RemoteHandler(logging.Handler): old_buffer = self._buffer self._buffer = deque() - for i in range(buffer_split): + for _ in range(buffer_split): self._buffer.append(old_buffer.popleft()) end_buffer = [] - for i in range(buffer_split): + for _ in range(buffer_split): end_buffer.append(old_buffer.pop()) self._buffer.extend(reversed(end_buffer)) diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index c57ac22e58..f648678b09 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -144,7 +144,7 @@ class RemoteKey(DirectServeJsonResource): # Note that the value is unused. cache_misses = {} # type: Dict[str, Dict[str, int]] - for (server_name, key_id, from_server), results in cached.items(): + for (server_name, key_id, _), results in cached.items(): results = [(result["ts_added_ms"], result) for result in results] if not results and key_id is not None: @@ -206,7 +206,7 @@ class RemoteKey(DirectServeJsonResource): # Cast to bytes since postgresql returns a memoryview. 
json_results.add(bytes(most_recent_result["key_json"])) else: - for ts_added, result in results: + for _, result in results: # Cast to bytes since postgresql returns a memoryview. json_results.add(bytes(result["key_json"])) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a362521e20..fd25c8112d 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -170,7 +170,7 @@ class PersistEventsStore: ) async with stream_ordering_manager as stream_orderings: - for (event, context), stream in zip(events_and_contexts, stream_orderings): + for (event, _), stream in zip(events_and_contexts, stream_orderings): event.internal_metadata.stream_ordering = stream await self.db_pool.runInteraction( @@ -297,7 +297,7 @@ class PersistEventsStore: txn.execute(sql + clause, args) to_recursively_check = [] - for event_id, prev_event_id, metadata, rejected in txn: + for _, prev_event_id, metadata, rejected in txn: if prev_event_id in existing_prevs: continue @@ -1127,7 +1127,7 @@ class PersistEventsStore: def _update_forward_extremities_txn( self, txn, new_forward_extremities, max_stream_order ): - for room_id, new_extrem in new_forward_extremities.items(): + for room_id in new_forward_extremities.keys(): self.db_pool.simple_delete_txn( txn, table="event_forward_extremities", keyvalues={"room_id": room_id} ) @@ -1399,7 +1399,7 @@ class PersistEventsStore: ] state_values = [] - for event, context in state_events_and_contexts: + for event, _ in state_events_and_contexts: vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -1468,7 +1468,7 @@ class PersistEventsStore: # nothing to do here return - for event, context in events_and_contexts: + for event, _ in events_and_contexts: if event.type == EventTypes.Redaction and event.redacts is not None: # Remove the entries in the event_push_actions table for the # redacted event. 
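The renames in this commit follow flake8-bugbear's B007 check ("loop control variable not used within the loop body"), which the flake8 config hunk at the top of the commit stops ignoring. A minimal sketch of what B007 flags and of the two fixes applied throughout (iterating over just .keys()/.values(), or renaming the unused variable to "_"); the dict and names below are made up for illustration and are not taken from the patch:

edus_by_destination = {"example.org": ["edu-1", "edu-2"]}

# B007: "destination" is bound but never used inside the loop body.
for destination, edus in edus_by_destination.items():
    for edu in edus:
        print(edu)

# Fix 1: only the values are needed, so iterate over them directly.
for edus in edus_by_destination.values():
    for edu in edus:
        print(edu)

# Fix 2: keep the unpacking, but rename the unused element to "_".
for _, edus in edus_by_destination.items():
    for edu in edus:
        print(edu)
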
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index c7b0975a19..8796af45ed 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -222,7 +222,7 @@ class FederationTestCase(unittest.HomeserverTestCase): room_version, ) - for i in range(3): + for _ in range(3): event = create_invite() self.get_success( self.handler.on_invite_request( diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py index 323237c1bb..f51fa0a79e 100644 --- a/tests/replication/tcp/streams/test_events.py +++ b/tests/replication/tcp/streams/test_events.py @@ -239,7 +239,7 @@ class EventsStreamTestCase(BaseStreamTestCase): # the state rows are unsorted state_rows = [] # type: List[EventsStreamCurrentStateRow] - for stream_name, token, row in received_rows: + for stream_name, _, row in received_rows: self.assertEqual("events", stream_name) self.assertIsInstance(row, EventsStreamRow) self.assertEqual(row.type, "state") @@ -356,7 +356,7 @@ class EventsStreamTestCase(BaseStreamTestCase): # the state rows are unsorted state_rows = [] # type: List[EventsStreamCurrentStateRow] - for j in range(STATES_PER_USER + 1): + for _ in range(STATES_PER_USER + 1): stream_name, token, row = received_rows.pop(0) self.assertEqual("events", stream_name) self.assertIsInstance(row, EventsStreamRow) diff --git a/tests/rest/admin/test_device.py b/tests/rest/admin/test_device.py index ecbee30bb5..120730b764 100644 --- a/tests/rest/admin/test_device.py +++ b/tests/rest/admin/test_device.py @@ -430,7 +430,7 @@ class DevicesRestTestCase(unittest.HomeserverTestCase): """ # Create devices number_devices = 5 - for n in range(number_devices): + for _ in range(number_devices): self.login("user", "pass") # Get devices @@ -547,7 +547,7 @@ class DeleteDevicesRestTestCase(unittest.HomeserverTestCase): # Create devices number_devices = 5 - for n in range(number_devices): + for _ in range(number_devices): self.login("user", "pass") # Get devices diff --git a/tests/rest/admin/test_event_reports.py b/tests/rest/admin/test_event_reports.py index 8c66da3af4..29341bc6e9 100644 --- a/tests/rest/admin/test_event_reports.py +++ b/tests/rest/admin/test_event_reports.py @@ -48,22 +48,22 @@ class EventReportsTestCase(unittest.HomeserverTestCase): self.helper.join(self.room_id2, user=self.admin_user, tok=self.admin_user_tok) # Two rooms and two users. 
Every user sends and reports every room event - for i in range(5): + for _ in range(5): self._create_event_and_report( room_id=self.room_id1, user_tok=self.other_user_tok, ) - for i in range(5): + for _ in range(5): self._create_event_and_report( room_id=self.room_id2, user_tok=self.other_user_tok, ) - for i in range(5): + for _ in range(5): self._create_event_and_report( room_id=self.room_id1, user_tok=self.admin_user_tok, ) - for i in range(5): + for _ in range(5): self._create_event_and_report( room_id=self.room_id2, user_tok=self.admin_user_tok, diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 6bcd997085..6b84188120 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -615,7 +615,7 @@ class RoomTestCase(unittest.HomeserverTestCase): # Create 3 test rooms total_rooms = 3 room_ids = [] - for x in range(total_rooms): + for _ in range(total_rooms): room_id = self.helper.create_room_as( self.admin_user, tok=self.admin_user_tok ) @@ -679,7 +679,7 @@ class RoomTestCase(unittest.HomeserverTestCase): # Create 5 test rooms total_rooms = 5 room_ids = [] - for x in range(total_rooms): + for _ in range(total_rooms): room_id = self.helper.create_room_as( self.admin_user, tok=self.admin_user_tok ) @@ -1577,7 +1577,7 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase): channel.json_body["event"]["event_id"], events[midway]["event_id"] ) - for i, found_event in enumerate(channel.json_body["events_before"]): + for found_event in channel.json_body["events_before"]: for j, posted_event in enumerate(events): if found_event["event_id"] == posted_event["event_id"]: self.assertTrue(j < midway) @@ -1585,7 +1585,7 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase): else: self.fail("Event %s from events_before not found" % j) - for i, found_event in enumerate(channel.json_body["events_after"]): + for found_event in channel.json_body["events_after"]: for j, posted_event in enumerate(events): if found_event["event_id"] == posted_event["event_id"]: self.assertTrue(j > midway) diff --git a/tests/rest/admin/test_statistics.py b/tests/rest/admin/test_statistics.py index 363bdeeb2d..79cac4266b 100644 --- a/tests/rest/admin/test_statistics.py +++ b/tests/rest/admin/test_statistics.py @@ -467,7 +467,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase): number_media: Number of media to be created for the user """ upload_resource = self.media_repo.children[b"upload"] - for i in range(number_media): + for _ in range(number_media): # file size is 67 Byte image_data = unhexlify( b"89504e470d0a1a0a0000000d4948445200000001000000010806" diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 2844c493fc..b3afd51522 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -1937,7 +1937,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): # Create rooms and join other_user_tok = self.login("user", "pass") number_rooms = 5 - for n in range(number_rooms): + for _ in range(number_rooms): self.helper.create_room_as(self.other_user, tok=other_user_tok) # Get rooms @@ -2517,7 +2517,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): user_token: Access token of the user number_media: Number of media to be created for the user """ - for i in range(number_media): + for _ in range(number_media): # file size is 67 Byte image_data = unhexlify( b"89504e470d0a1a0a0000000d4948445200000001000000010806" diff --git a/tests/rest/client/v1/test_rooms.py 
b/tests/rest/client/v1/test_rooms.py index 92babf65e0..a3694f3d02 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -646,7 +646,7 @@ class RoomInviteRatelimitTestCase(RoomBase): def test_invites_by_users_ratelimit(self): """Tests that invites to a specific user are actually rate-limited.""" - for i in range(3): + for _ in range(3): room_id = self.helper.create_room_as(self.user_id) self.helper.invite(room_id, self.user_id, "@other-users:red") @@ -668,7 +668,7 @@ class RoomJoinRatelimitTestCase(RoomBase): ) def test_join_local_ratelimit(self): """Tests that local joins are actually rate-limited.""" - for i in range(3): + for _ in range(3): self.helper.create_room_as(self.user_id) self.helper.create_room_as(self.user_id, expect_code=429) @@ -733,7 +733,7 @@ class RoomJoinRatelimitTestCase(RoomBase): for path in paths_to_test: # Make sure we send more requests than the rate-limiting config would allow # if all of these requests ended up joining the user to a room. - for i in range(4): + for _ in range(4): channel = self.make_request("POST", path % room_id, {}) self.assertEquals(channel.code, 200) diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py index 397e68fe0a..088fbb247b 100644 --- a/tests/storage/test_event_metrics.py +++ b/tests/storage/test_event_metrics.py @@ -38,12 +38,12 @@ class ExtremStatisticsTestCase(HomeserverTestCase): last_event = None # Make a real event chain - for i in range(event_count): + for _ in range(event_count): ev = self.create_and_send_event(room_id, user, False, last_event) last_event = [ev] # Sprinkle in some extremities - for i in range(extrems): + for _ in range(extrems): ev = self.create_and_send_event(room_id, user, False, last_event) # Let it run for a while, then pull out the statistics from the diff --git a/tests/unittest.py b/tests/unittest.py index d890ad981f..ee22a53849 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -133,7 +133,7 @@ class TestCase(unittest.TestCase): def assertObjectHasAttributes(self, attrs, obj): """Asserts that the given object has each of the attributes given, and that the value of each matches according to assertEquals.""" - for (key, value) in attrs.items(): + for key in attrs.keys(): if not hasattr(obj, key): raise AssertionError("Expected obj to have a '.%s'" % key) try: diff --git a/tests/utils.py b/tests/utils.py index af6b32fc66..63d52b9140 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -303,7 +303,7 @@ def setup_test_homeserver( # database for a few more seconds due to flakiness, preventing # us from dropping it when the test is over. If we can't drop # it, warn and move on. 
- for x in range(5): + for _ in range(5): try: cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,)) db_conn.commit() -- cgit 1.5.1 From 59d24c5bef4e05fa7be0cad1f7e63f0a0097374b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 23 Apr 2021 17:06:47 +0100 Subject: pass a reactor into SynapseSite (#9874) --- changelog.d/9874.misc | 1 + synapse/app/generic_worker.py | 1 + synapse/app/homeserver.py | 25 ++++++++++--------------- synapse/http/site.py | 37 ++++++++++++++++++++++++++++--------- tests/replication/_base.py | 1 + tests/test_server.py | 1 + tests/unittest.py | 1 + 7 files changed, 43 insertions(+), 24 deletions(-) create mode 100644 changelog.d/9874.misc (limited to 'tests/replication') diff --git a/changelog.d/9874.misc b/changelog.d/9874.misc new file mode 100644 index 0000000000..ba1097e65e --- /dev/null +++ b/changelog.d/9874.misc @@ -0,0 +1 @@ +Pass a reactor into `SynapseSite` to make testing easier. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 7b2ac3ca64..70e07d0574 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -367,6 +367,7 @@ class GenericWorkerServer(HomeServer): listener_config, root_resource, self.version_string, + reactor=self.get_reactor(), ), reactor=self.get_reactor(), ) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 8be8b520eb..140f6bcdee 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -126,19 +126,20 @@ class SynapseHomeServer(HomeServer): else: root_resource = OptionsResource() - root_resource = create_resource_tree(resources, root_resource) + site = SynapseSite( + "synapse.access.%s.%s" % ("https" if tls else "http", site_tag), + site_tag, + listener_config, + create_resource_tree(resources, root_resource), + self.version_string, + reactor=self.get_reactor(), + ) if tls: ports = listen_ssl( bind_addresses, port, - SynapseSite( - "synapse.access.https.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), + site, self.tls_server_context_factory, reactor=self.get_reactor(), ) @@ -148,13 +149,7 @@ class SynapseHomeServer(HomeServer): ports = listen_tcp( bind_addresses, port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), + site, reactor=self.get_reactor(), ) logger.info("Synapse now listening on TCP port %d", port) diff --git a/synapse/http/site.py b/synapse/http/site.py index 32b5e19c09..e911ee4809 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -19,8 +19,9 @@ from typing import Optional, Tuple, Type, Union import attr from zope.interface import implementer -from twisted.internet.interfaces import IAddress +from twisted.internet.interfaces import IAddress, IReactorTime from twisted.python.failure import Failure +from twisted.web.resource import IResource from twisted.web.server import Request, Site from synapse.config.server import ListenerConfig @@ -485,21 +486,39 @@ class _XForwardedForAddress: class SynapseSite(Site): """ - Subclass of a twisted http Site that does access logging with python's - standard logging + Synapse-specific twisted http Site + + This does two main things. + + First, it replaces the requestFactory in use so that we build SynapseRequests + instead of regular t.w.server.Requests. All of the constructor params are really + just parameters for SynapseRequest. 
+ + Second, it inhibits the log() method called by Request.finish, since SynapseRequest + does its own logging. """ def __init__( self, - logger_name, - site_tag, + logger_name: str, + site_tag: str, config: ListenerConfig, - resource, + resource: IResource, server_version_string, - *args, - **kwargs, + reactor: IReactorTime, ): - Site.__init__(self, resource, *args, **kwargs) + """ + + Args: + logger_name: The name of the logger to use for access logs. + site_tag: A tag to use for this site - mostly in access logs. + config: Configuration for the HTTP listener corresponding to this site + resource: The base of the resource tree to be used for serving requests on + this site + server_version_string: A string to present for the Server header + reactor: reactor to be used to manage connection timeouts + """ + Site.__init__(self, resource, reactor=reactor) self.site_tag = site_tag diff --git a/tests/replication/_base.py b/tests/replication/_base.py index c9d04aef29..5cf58d8b60 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -349,6 +349,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): config=worker_hs.config.server.listeners[0], resource=resource, server_version_string="1", + reactor=self.reactor, ) if worker_hs.config.redis.redis_enabled: diff --git a/tests/test_server.py b/tests/test_server.py index 55cde7f62f..45400be367 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -202,6 +202,7 @@ class OptionsResourceTests(unittest.TestCase): parse_listener_def({"type": "http", "port": 0}), self.resource, "1.0", + reactor=self.reactor, ) # render the request and return the channel diff --git a/tests/unittest.py b/tests/unittest.py index ee22a53849..5353e75c7c 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -247,6 +247,7 @@ class HomeserverTestCase(TestCase): config=self.hs.config.server.listeners[0], resource=self.resource, server_version_string="1", + reactor=self.reactor, ) from tests.rest.client.v1.utils import RestHelper -- cgit 1.5.1 From 84936e22648d3c9f6b76028b08c33f0267f5e3a0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 23 Apr 2021 18:40:57 +0100 Subject: Kill off `_PushHTTPChannel`. (#9878) First of all, a fixup to `FakeChannel` which is needed to make it work with the default HTTP channel implementation. Secondly, it looks like we no longer need `_PushHTTPChannel`, because as of #8013, the producer that gets attached to the `HTTPChannel` is now an `IPushProducer`. This is good, because it means we can remove a whole load of test-specific boilerplate which causes variation between tests and production. --- changelog.d/9878.misc | 1 + tests/replication/_base.py | 134 +++++++-------------------------------------- tests/server.py | 6 -- 3 files changed, 20 insertions(+), 121 deletions(-) create mode 100644 changelog.d/9878.misc (limited to 'tests/replication') diff --git a/changelog.d/9878.misc b/changelog.d/9878.misc new file mode 100644 index 0000000000..927876852d --- /dev/null +++ b/changelog.d/9878.misc @@ -0,0 +1 @@ +Remove redundant `_PushHTTPChannel` test class. diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 5cf58d8b60..dc3519ea13 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -12,14 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import logging -from typing import Any, Callable, Dict, List, Optional, Tuple, Type +from typing import Any, Callable, Dict, List, Optional, Tuple -from twisted.internet.interfaces import IConsumer, IPullProducer, IReactorTime from twisted.internet.protocol import Protocol -from twisted.internet.task import LoopingCall -from twisted.web.http import HTTPChannel from twisted.web.resource import Resource -from twisted.web.server import Request, Site from synapse.app.generic_worker import GenericWorkerServer from synapse.http.server import JsonResource @@ -33,7 +29,6 @@ from synapse.replication.tcp.resource import ( ServerReplicationStreamProtocol, ) from synapse.server import HomeServer -from synapse.util import Clock from tests import unittest from tests.server import FakeTransport @@ -154,7 +149,19 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): client_protocol = client_factory.buildProtocol(None) # Set up the server side protocol - channel = _PushHTTPChannel(self.reactor, SynapseRequest, self.site) + channel = self.site.buildProtocol(None) + + # hook into the channel's request factory so that we can keep a record + # of the requests + requests: List[SynapseRequest] = [] + real_request_factory = channel.requestFactory + + def request_factory(*args, **kwargs): + request = real_request_factory(*args, **kwargs) + requests.append(request) + return request + + channel.requestFactory = request_factory # Connect client to server and vice versa. client_to_server_transport = FakeTransport( @@ -176,7 +183,10 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): server_to_client_transport.loseConnection() client_to_server_transport.loseConnection() - return channel.request + # there should have been exactly one request + self.assertEqual(len(requests), 1) + + return requests[0] def assert_request_is_get_repl_stream_updates( self, request: SynapseRequest, stream_name: str @@ -387,7 +397,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): client_protocol = client_factory.buildProtocol(None) # Set up the server side protocol - channel = _PushHTTPChannel(self.reactor, SynapseRequest, self._hs_to_site[hs]) + channel = self._hs_to_site[hs].buildProtocol(None) # Connect client to server and vice versa. client_to_server_transport = FakeTransport( @@ -445,112 +455,6 @@ class TestReplicationDataHandler(ReplicationDataHandler): self.received_rdata_rows.append((stream_name, token, r)) -class _PushHTTPChannel(HTTPChannel): - """A HTTPChannel that wraps pull producers to push producers. - - This is a hack to get around the fact that HTTPChannel transparently wraps a - pull producer (which is what Synapse uses to reply to requests) with - `_PullToPush` to convert it to a push producer. Unfortunately `_PullToPush` - uses the standard reactor rather than letting us use our test reactor, which - makes it very hard to test. - """ - - def __init__( - self, reactor: IReactorTime, request_factory: Type[Request], site: Site - ): - super().__init__() - self.reactor = reactor - self.requestFactory = request_factory - self.site = site - - self._pull_to_push_producer = None # type: Optional[_PullToPushProducer] - - def registerProducer(self, producer, streaming): - # Convert pull producers to push producer. 
- if not streaming: - self._pull_to_push_producer = _PullToPushProducer( - self.reactor, producer, self - ) - producer = self._pull_to_push_producer - - super().registerProducer(producer, True) - - def unregisterProducer(self): - if self._pull_to_push_producer: - # We need to manually stop the _PullToPushProducer. - self._pull_to_push_producer.stop() - - def checkPersistence(self, request, version): - """Check whether the connection can be re-used""" - # We hijack this to always say no for ease of wiring stuff up in - # `handle_http_replication_attempt`. - request.responseHeaders.setRawHeaders(b"connection", [b"close"]) - return False - - def requestDone(self, request): - # Store the request for inspection. - self.request = request - super().requestDone(request) - - -class _PullToPushProducer: - """A push producer that wraps a pull producer.""" - - def __init__( - self, reactor: IReactorTime, producer: IPullProducer, consumer: IConsumer - ): - self._clock = Clock(reactor) - self._producer = producer - self._consumer = consumer - - # While running we use a looping call with a zero delay to call - # resumeProducing on given producer. - self._looping_call = None # type: Optional[LoopingCall] - - # We start writing next reactor tick. - self._start_loop() - - def _start_loop(self): - """Start the looping call to""" - - if not self._looping_call: - # Start a looping call which runs every tick. - self._looping_call = self._clock.looping_call(self._run_once, 0) - - def stop(self): - """Stops calling resumeProducing.""" - if self._looping_call: - self._looping_call.stop() - self._looping_call = None - - def pauseProducing(self): - """Implements IPushProducer""" - self.stop() - - def resumeProducing(self): - """Implements IPushProducer""" - self._start_loop() - - def stopProducing(self): - """Implements IPushProducer""" - self.stop() - self._producer.stopProducing() - - def _run_once(self): - """Calls resumeProducing on producer once.""" - - try: - self._producer.resumeProducing() - except Exception: - logger.exception("Failed to call resumeProducing") - try: - self._consumer.unregisterProducer() - except Exception: - pass - - self.stopProducing() - - class FakeRedisPubSubServer: """A fake Redis server for pub/sub.""" diff --git a/tests/server.py b/tests/server.py index b535a5d886..9df8cda24f 100644 --- a/tests/server.py +++ b/tests/server.py @@ -603,12 +603,6 @@ class FakeTransport: if self.disconnected: return - if not hasattr(self.other, "transport"): - # the other has no transport yet; reschedule - if self.autoflush: - self._reactor.callLater(0.0, self.flush) - return - if maxbytes is not None: to_write = self.buffer[:maxbytes] else: -- cgit 1.5.1 From 3ff225175462dde8376aa584e3a47c43b1f0e790 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 23 Apr 2021 19:20:44 +0100 Subject: Improved validation for received requests (#9817) * Simplify `start_listening` callpath * Correctly check the size of uploaded files --- changelog.d/9817.misc | 1 + synapse/api/constants.py | 3 ++ synapse/app/_base.py | 30 ++++++++++-- synapse/app/admin_cmd.py | 8 +-- synapse/app/generic_worker.py | 11 +++-- synapse/app/homeserver.py | 17 +++++-- synapse/config/logger.py | 3 +- synapse/event_auth.py | 4 +- synapse/http/site.py | 32 ++++++++++-- synapse/rest/media/v1/upload_resource.py | 2 - synapse/server.py | 8 +++ tests/http/test_site.py | 83 ++++++++++++++++++++++++++++++++ tests/replication/_base.py | 1 + tests/test_server.py | 1 + tests/unittest.py | 1 + 15 files 
changed, 174 insertions(+), 31 deletions(-) create mode 100644 changelog.d/9817.misc create mode 100644 tests/http/test_site.py (limited to 'tests/replication') diff --git a/changelog.d/9817.misc b/changelog.d/9817.misc new file mode 100644 index 0000000000..8aa8895f05 --- /dev/null +++ b/changelog.d/9817.misc @@ -0,0 +1 @@ +Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 31a59bceec..936b6534b4 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -17,6 +17,9 @@ """Contains constants from the specification.""" +# the max size of a (canonical-json-encoded) event +MAX_PDU_SIZE = 65536 + # the "depth" field on events is limited to 2**63 - 1 MAX_DEPTH = 2 ** 63 - 1 diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 2113c4f370..638e01c1b2 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -30,9 +30,10 @@ from twisted.internet import defer, error, reactor from twisted.protocols.tls import TLSMemoryBIOFactory import synapse +from synapse.api.constants import MAX_PDU_SIZE from synapse.app import check_bind_error from synapse.app.phone_stats_home import start_phone_stats_home -from synapse.config.server import ListenerConfig +from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.logging.context import PreserveLoggingContext from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -288,7 +289,7 @@ def refresh_certificate(hs): logger.info("Context factories updated.") -async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): +async def start(hs: "synapse.server.HomeServer"): """ Start a Synapse server or worker. @@ -300,7 +301,6 @@ async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerCon Args: hs: homeserver instance - listeners: Listener configuration ('listeners' in homeserver.yaml) """ # Set up the SIGHUP machinery. if hasattr(signal, "SIGHUP"): @@ -336,7 +336,7 @@ async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerCon synapse.logging.opentracing.init_tracer(hs) # type: ignore[attr-defined] # noqa # It is now safe to start your Synapse. - hs.start_listening(listeners) + hs.start_listening() hs.get_datastore().db_pool.start_profiling() hs.get_pusherpool().start() @@ -530,3 +530,25 @@ def sdnotify(state): # this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET # unless systemd is expecting us to notify it. logger.warning("Unable to send notification to systemd: %s", e) + + +def max_request_body_size(config: HomeServerConfig) -> int: + """Get a suitable maximum size for incoming HTTP requests""" + + # Other than media uploads, the biggest request we expect to see is a fully-loaded + # /federation/v1/send request. + # + # The main thing in such a request is up to 50 PDUs, and up to 100 EDUs. PDUs are + # limited to 65536 bytes (possibly slightly more if the sender didn't use canonical + # json encoding); there is no specced limit to EDUs (see + # https://github.com/matrix-org/matrix-doc/issues/3121). 
+ # + # in short, we somewhat arbitrarily limit requests to 200 * 64K (about 12.5M) + # + max_request_size = 200 * MAX_PDU_SIZE + + # if we have a media repo enabled, we may need to allow larger uploads than that + if config.media.can_load_media_repo: + max_request_size = max(max_request_size, config.media.max_upload_size) + + return max_request_size diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index eb256db749..68ae19c977 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -70,12 +70,6 @@ class AdminCmdSlavedStore( class AdminCmdServer(HomeServer): DATASTORE_CLASS = AdminCmdSlavedStore - def _listen_http(self, listener_config): - pass - - def start_listening(self, listeners): - pass - async def export_data_command(hs, args): """Export data for a user. @@ -232,7 +226,7 @@ def start(config_options): async def run(): with LoggingContext("command"): - _base.start(ss, []) + _base.start(ss) await args.func(ss, args) _base.start_worker_reactor( diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 70e07d0574..1a15ceee81 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -15,7 +15,7 @@ # limitations under the License. import logging import sys -from typing import Dict, Iterable, Optional +from typing import Dict, Optional from twisted.internet import address from twisted.web.resource import IResource @@ -32,7 +32,7 @@ from synapse.api.urls import ( SERVER_KEY_V2_PREFIX, ) from synapse.app import _base -from synapse.app._base import register_start +from synapse.app._base import max_request_body_size, register_start from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging @@ -367,6 +367,7 @@ class GenericWorkerServer(HomeServer): listener_config, root_resource, self.version_string, + max_request_body_size=max_request_body_size(self.config), reactor=self.get_reactor(), ), reactor=self.get_reactor(), @@ -374,8 +375,8 @@ class GenericWorkerServer(HomeServer): logger.info("Synapse worker now listening on port %d", port) - def start_listening(self, listeners: Iterable[ListenerConfig]): - for listener in listeners: + def start_listening(self): + for listener in self.config.worker_listeners: if listener.type == "http": self._listen_http(listener) elif listener.type == "manhole": @@ -468,7 +469,7 @@ def start(config_options): # streams. Will no-op if no streams can be written to by this worker. 
hs.get_replication_streamer() - register_start(_base.start, hs, config.worker_listeners) + register_start(_base.start, hs) _base.start_worker_reactor("synapse-generic-worker", config) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 140f6bcdee..8e78134bbe 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -17,7 +17,7 @@ import logging import os import sys -from typing import Iterable, Iterator +from typing import Iterator from twisted.internet import reactor from twisted.web.resource import EncodingResourceWrapper, IResource @@ -36,7 +36,13 @@ from synapse.api.urls import ( WEB_CLIENT_PREFIX, ) from synapse.app import _base -from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start +from synapse.app._base import ( + listen_ssl, + listen_tcp, + max_request_body_size, + quit_with_error, + register_start, +) from synapse.config._base import ConfigError from synapse.config.emailconfig import ThreepidBehaviour from synapse.config.homeserver import HomeServerConfig @@ -132,6 +138,7 @@ class SynapseHomeServer(HomeServer): listener_config, create_resource_tree(resources, root_resource), self.version_string, + max_request_body_size=max_request_body_size(self.config), reactor=self.get_reactor(), ) @@ -268,14 +275,14 @@ class SynapseHomeServer(HomeServer): return resources - def start_listening(self, listeners: Iterable[ListenerConfig]): + def start_listening(self): if self.config.redis_enabled: # If redis is enabled we connect via the replication command handler # in the same way as the workers (since we're effectively a client # rather than a server). self.get_tcp_replication().start_replication(self) - for listener in listeners: + for listener in self.config.server.listeners: if listener.type == "http": self._listening_services.extend( self._listener_http(self.config, listener) @@ -407,7 +414,7 @@ def setup(config_options): # Loading the provider metadata also ensures the provider config is valid. await oidc.load_metadata() - await _base.start(hs, config.listeners) + await _base.start(hs) hs.get_datastore().db_pool.updates.start_doing_background_updates() diff --git a/synapse/config/logger.py b/synapse/config/logger.py index b174e0df6d..813076dfe2 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -31,7 +31,6 @@ from twisted.logger import ( ) import synapse -from synapse.app import _base as appbase from synapse.logging._structured import setup_structured_logging from synapse.logging.context import LoggingContextFilter from synapse.logging.filter import MetadataFilter @@ -318,6 +317,8 @@ def setup_logging( # Perform one-time logging configuration. _setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner) # Add a SIGHUP handler to reload the logging configuration, if one is available. + from synapse.app import _base as appbase + appbase.register_sighup(_reload_logging_config, log_config_path) # Log immediately so we can grep backwards. 
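The comment block added to synapse/app/_base.py above pins the non-media request limit to 200 * MAX_PDU_SIZE. A quick sanity check of that arithmetic, using only values that appear in this patch (MAX_PDU_SIZE from synapse/api/constants.py and the 50M default max_upload_size exercised by the new test further down); this is illustrative only, not a reimplementation of the config code:

MAX_PDU_SIZE = 65536                    # cap on a canonical-json-encoded event
DEFAULT_MAX_UPLOAD = 50 * 1024 * 1024   # default max_upload_size ("50M")

federation_cap = 200 * MAX_PDU_SIZE
print(federation_cap)                   # 13107200 bytes
print(federation_cap / (1024 * 1024))   # 12.5, the "about 12.5M" in the comment

# With the media repo enabled, the same listener also has to accept uploads,
# so the effective limit is whichever of the two is larger:
print(max(federation_cap, DEFAULT_MAX_UPLOAD))   # 52428800 bytes, i.e. 50 MiB
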
diff --git a/synapse/event_auth.py b/synapse/event_auth.py index afc2bc8267..70c556566e 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -21,7 +21,7 @@ from signedjson.key import decode_verify_key_bytes from signedjson.sign import SignatureVerifyException, verify_signed_json from unpaddedbase64 import decode_base64 -from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.api.constants import MAX_PDU_SIZE, EventTypes, JoinRules, Membership from synapse.api.errors import AuthError, EventSizeError, SynapseError from synapse.api.room_versions import ( KNOWN_ROOM_VERSIONS, @@ -205,7 +205,7 @@ def _check_size_limits(event: EventBase) -> None: too_big("type") if len(event.event_id) > 255: too_big("event_id") - if len(encode_canonical_json(event.get_pdu_json())) > 65536: + if len(encode_canonical_json(event.get_pdu_json())) > MAX_PDU_SIZE: too_big("event") diff --git a/synapse/http/site.py b/synapse/http/site.py index e911ee4809..671fd3fbcc 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -14,7 +14,7 @@ import contextlib import logging import time -from typing import Optional, Tuple, Type, Union +from typing import Optional, Tuple, Union import attr from zope.interface import implementer @@ -50,6 +50,7 @@ class SynapseRequest(Request): * Redaction of access_token query-params in __repr__ * Logging at start and end * Metrics to record CPU, wallclock and DB time by endpoint. + * A limit to the size of request which will be accepted It also provides a method `processing`, which returns a context manager. If this method is called, the request won't be logged until the context manager is closed; @@ -60,8 +61,9 @@ class SynapseRequest(Request): logcontext: the log context for this request """ - def __init__(self, channel, *args, **kw): + def __init__(self, channel, *args, max_request_body_size=1024, **kw): Request.__init__(self, channel, *args, **kw) + self._max_request_body_size = max_request_body_size self.site = channel.site # type: SynapseSite self._channel = channel # this is used by the tests self.start_time = 0.0 @@ -98,6 +100,18 @@ class SynapseRequest(Request): self.site.site_tag, ) + def handleContentChunk(self, data): + # we should have a `content` by now. 
+ assert self.content, "handleContentChunk() called before gotLength()" + if self.content.tell() + len(data) > self._max_request_body_size: + logger.warning( + "Aborting connection from %s because the request exceeds maximum size", + self.client, + ) + self.transport.abortConnection() + return + super().handleContentChunk(data) + @property def requester(self) -> Optional[Union[Requester, str]]: return self._requester @@ -505,6 +519,7 @@ class SynapseSite(Site): config: ListenerConfig, resource: IResource, server_version_string, + max_request_body_size: int, reactor: IReactorTime, ): """ @@ -516,6 +531,8 @@ class SynapseSite(Site): resource: The base of the resource tree to be used for serving requests on this site server_version_string: A string to present for the Server header + max_request_body_size: Maximum request body length to allow before + dropping the connection reactor: reactor to be used to manage connection timeouts """ Site.__init__(self, resource, reactor=reactor) @@ -524,9 +541,14 @@ class SynapseSite(Site): assert config.http_options is not None proxied = config.http_options.x_forwarded - self.requestFactory = ( - XForwardedForRequest if proxied else SynapseRequest - ) # type: Type[Request] + request_class = XForwardedForRequest if proxied else SynapseRequest + + def request_factory(channel, queued) -> Request: + return request_class( + channel, max_request_body_size=max_request_body_size, queued=queued + ) + + self.requestFactory = request_factory # type: ignore self.access_logger = logging.getLogger(logger_name) self.server_version_string = server_version_string.encode("ascii") diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index 80f017a4dd..024a105bf2 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py @@ -51,8 +51,6 @@ class UploadResource(DirectServeJsonResource): async def _async_render_POST(self, request: SynapseRequest) -> None: requester = await self.auth.get_user_by_req(request) - # TODO: The checks here are a bit late. The content will have - # already been uploaded to a tmp file at this point content_length = request.getHeader("Content-Length") if content_length is None: raise SynapseError(msg="Request must specify a Content-Length", code=400) diff --git a/synapse/server.py b/synapse/server.py index 8c147be2b3..06570bb1ce 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -287,6 +287,14 @@ class HomeServer(metaclass=abc.ABCMeta): if self.config.run_background_tasks: self.setup_background_tasks() + def start_listening(self) -> None: + """Start the HTTP, manhole, metrics, etc listeners + + Does nothing in this base class; overridden in derived classes to start the + appropriate listeners. + """ + pass + def setup_background_tasks(self) -> None: """ Some handlers have side effects on instantiation (like registering diff --git a/tests/http/test_site.py b/tests/http/test_site.py new file mode 100644 index 0000000000..8c13b4f693 --- /dev/null +++ b/tests/http/test_site.py @@ -0,0 +1,83 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet.address import IPv6Address +from twisted.test.proto_helpers import StringTransport + +from synapse.app.homeserver import SynapseHomeServer + +from tests.unittest import HomeserverTestCase + + +class SynapseRequestTestCase(HomeserverTestCase): + def make_homeserver(self, reactor, clock): + return self.setup_test_homeserver(homeserver_to_use=SynapseHomeServer) + + def test_large_request(self): + """overlarge HTTP requests should be rejected""" + self.hs.start_listening() + + # find the HTTP server which is configured to listen on port 0 + (port, factory, _backlog, interface) = self.reactor.tcpServers[0] + self.assertEqual(interface, "::") + self.assertEqual(port, 0) + + # as a control case, first send a regular request. + + # complete the connection and wire it up to a fake transport + client_address = IPv6Address("TCP", "::1", "2345") + protocol = factory.buildProtocol(client_address) + transport = StringTransport() + protocol.makeConnection(transport) + + protocol.dataReceived( + b"POST / HTTP/1.1\r\n" + b"Connection: close\r\n" + b"Transfer-Encoding: chunked\r\n" + b"\r\n" + b"0\r\n" + b"\r\n" + ) + + while not transport.disconnecting: + self.reactor.advance(1) + + # we should get a 404 + self.assertRegex(transport.value().decode(), r"^HTTP/1\.1 404 ") + + # now send an oversized request + protocol = factory.buildProtocol(client_address) + transport = StringTransport() + protocol.makeConnection(transport) + + protocol.dataReceived( + b"POST / HTTP/1.1\r\n" + b"Connection: close\r\n" + b"Transfer-Encoding: chunked\r\n" + b"\r\n" + ) + + # we deliberately send all the data in one big chunk, to ensure that + # twisted isn't buffering the data in the chunked transfer decoder. + # we start with the chunk size, in hex. (We won't actually send this much) + protocol.dataReceived(b"10000000\r\n") + sent = 0 + while not transport.disconnected: + self.assertLess(sent, 0x10000000, "connection did not drop") + protocol.dataReceived(b"\0" * 1024) + sent += 1024 + + # default max upload size is 50M, so it should drop on the next buffer after + # that. 
+ self.assertEqual(sent, 50 * 1024 * 1024 + 1024) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index dc3519ea13..624bd1b927 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -359,6 +359,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): config=worker_hs.config.server.listeners[0], resource=resource, server_version_string="1", + max_request_body_size=4096, reactor=self.reactor, ) diff --git a/tests/test_server.py b/tests/test_server.py index 45400be367..407e172e41 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -202,6 +202,7 @@ class OptionsResourceTests(unittest.TestCase): parse_listener_def({"type": "http", "port": 0}), self.resource, "1.0", + max_request_body_size=1234, reactor=self.reactor, ) diff --git a/tests/unittest.py b/tests/unittest.py index 5353e75c7c..9bd02bd9c4 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -247,6 +247,7 @@ class HomeserverTestCase(TestCase): config=self.hs.config.server.listeners[0], resource=self.resource, server_version_string="1", + max_request_body_size=1234, reactor=self.reactor, ) -- cgit 1.5.1
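The handleContentChunk override added to synapse/http/site.py above is the piece that actually enforces the limit: it watches the body as it streams in and aborts the connection (via self.transport.abortConnection()) as soon as the running total would exceed max_request_body_size, rather than buffering the whole upload first. A standalone sketch of that pattern, with no Twisted machinery and invented class and attribute names, just to show the shape of the check:

import io

class BoundedBody:
    """Accumulates request-body chunks, refusing to grow past max_size."""

    def __init__(self, max_size: int) -> None:
        self._max_size = max_size
        self._buffer = io.BytesIO()
        self.aborted = False

    def handle_chunk(self, data: bytes) -> None:
        if self.aborted:
            return
        if self._buffer.tell() + len(data) > self._max_size:
            # The real code aborts the transport here; this sketch just
            # records that the request was rejected.
            self.aborted = True
            return
        self._buffer.write(data)

body = BoundedBody(max_size=1024)
body.handle_chunk(b"\0" * 1000)
body.handle_chunk(b"\0" * 1000)   # the second chunk pushes past the limit
print(body.aborted)               # True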