diff options
Diffstat (limited to 'synapse')
51 files changed, 86 insertions, 83 deletions
diff --git a/synapse/_scripts/update_synapse_database.py b/synapse/_scripts/update_synapse_database.py index 0adf94bba6..8bf5ad2b36 100644 --- a/synapse/_scripts/update_synapse_database.py +++ b/synapse/_scripts/update_synapse_database.py @@ -59,7 +59,7 @@ def run_background_updates(hs: HomeServer) -> None: def run() -> None: # Apply all background updates on the database. - defer.ensureDeferred( + defer.ensureDeferred( # type: ignore[unused-awaitable] run_as_background_process("background_updates", run_background_updates) ) diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index 897dd3edac..722ff81dbf 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -189,7 +189,7 @@ def start_phone_stats_home(hs: "HomeServer") -> None: clock.looping_call( hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60 ) - hs.get_datastores().main.reap_monthly_active_users() + hs.get_datastores().main.reap_monthly_active_users() # type: ignore[unused-awaitable] @wrap_as_background_process("generate_monthly_active_users") async def generate_monthly_active_users() -> None: @@ -212,7 +212,7 @@ def start_phone_stats_home(hs: "HomeServer") -> None: max_mau_gauge.set(float(hs.config.server.max_mau_value)) if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only: - generate_monthly_active_users() + generate_monthly_active_users() # type: ignore[unused-awaitable] clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000) # End of monthly active user settings diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3a319b0d42..72f90ceb46 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -200,7 +200,7 @@ class _ServiceQueuer: if service.id in self.requests_in_flight: return - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "as-sender-%s" % (service.id,), self._send_request, service ) @@ -407,10 +407,10 @@ class _TransactionController: if sent: await txn.complete(self.store) else: - run_in_background(self._on_txn_fail, service) + run_in_background(self._on_txn_fail, service) # type: ignore[unused-awaitable] except Exception: logger.exception("Error creating appservice transaction") - run_in_background(self._on_txn_fail, service) + run_in_background(self._on_txn_fail, service) # type: ignore[unused-awaitable] async def on_recovered(self, recoverer: "_Recoverer") -> None: logger.info( @@ -479,7 +479,7 @@ class _Recoverer: def recover(self) -> None: def _retry() -> None: - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "as-recoverer-%s" % (self.service.id,), self.retry ) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 6d99845de5..86cd2c87f1 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -200,7 +200,7 @@ class FederationServer(FederationBase): ) if lock: logger.info("Handling old staged inbound events in %s", room_id) - self._process_incoming_pdus_in_room_inner( + self._process_incoming_pdus_in_room_inner( # type: ignore[unused-awaitable] room_id, room_version, lock, @@ -277,7 +277,7 @@ class FederationServer(FederationBase): # any old events in the staging area. if not self._started_handling_of_staged_events: self._started_handling_of_staged_events = True - self._handle_old_staged_events() + self._handle_old_staged_events() # type: ignore[unused-awaitable] # Start a periodic check for old staged events. This is to handle # the case where locks time out, e.g. if another process gets killed @@ -1144,7 +1144,7 @@ class FederationServer(FederationBase): _INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id ) if lock: - self._process_incoming_pdus_in_room_inner( + self._process_incoming_pdus_in_room_inner( # type: ignore[unused-awaitable] pdu.room_id, room_version, lock, origin, pdu ) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 43421a9c72..3e500e3643 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -187,7 +187,7 @@ class _DestinationWakeupQueue: self.queue[destination] = None if not self.processing: - self._handle() + self._handle() # type: ignore[unused-awaitable] @wrap_as_background_process("_DestinationWakeupQueue.handle") async def _handle(self) -> None: @@ -342,7 +342,7 @@ class FederationSender(AbstractFederationSender): return # fire off a processing loop in the background - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "process_event_queue_for_federation", self._process_event_queue_loop ) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index ffc9d95ee7..a2a907d6aa 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -301,7 +301,7 @@ class PerDestinationQueue: logger.debug("TX [%s] Starting transaction loop", self._destination) - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "federation_transaction_transmission_loop", self._transaction_transmission_loop, ) diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index cdaf0d5de7..2e0c6085af 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -132,7 +132,7 @@ class Authenticator: # alive retry_timings = await self.store.get_destination_retry_timings(origin) if retry_timings and retry_timings.retry_last_ts: - run_in_background(self.reset_retry_timings, origin) + run_in_background(self.reset_retry_timings, origin) # type: ignore[unused-awaitable] return origin diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ec3ab968e9..455d4005df 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -97,7 +97,7 @@ class ApplicationServicesHandler: # We only start a new background process if necessary rather than # optimistically (to cut down on overhead). - self._notify_interested_services(max_token) + self._notify_interested_services(max_token) # type: ignore[unused-awaitable] @wrap_as_background_process("notify_interested_services") async def _notify_interested_services(self, max_token: RoomStreamToken) -> None: @@ -144,7 +144,7 @@ class ApplicationServicesHandler: except Exception: logger.error("Application Services Failure") - run_as_background_process("as_scheduler", start_scheduler) + run_as_background_process("as_scheduler", start_scheduler) # type: ignore[unused-awaitable] self.started_scheduler = True # Fork off pushes to these services @@ -307,7 +307,7 @@ class ApplicationServicesHandler: # We only start a new background process if necessary rather than # optimistically (to cut down on overhead). - self._notify_interested_services_ephemeral( + self._notify_interested_services_ephemeral( # type: ignore[unused-awaitable] services, stream_key, new_token, users ) diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index d31263c717..c1850521e4 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -223,7 +223,7 @@ class DeactivateAccountHandler: pending deactivation, if it isn't already running. """ if not self._user_parter_running: - run_as_background_process("user_parter_loop", self._user_parter_loop) + run_as_background_process("user_parter_loop", self._user_parter_loop) # type: ignore[unused-awaitable] async def _user_parter_loop(self) -> None: """Loop that parts deactivated users from rooms""" diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 6f7963df43..08afcbeefa 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -589,7 +589,7 @@ class DeviceHandler(DeviceWorkerHandler): # We may need to do some processing asynchronously for local user IDs. if self.hs.is_mine_id(user_id): - self._handle_new_device_update_async() + self._handle_new_device_update_async() # type: ignore[unused-awaitable] async def notify_user_signature_update( self, from_user_id: str, user_ids: List[str] diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 00c403db49..22f298d445 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -198,7 +198,7 @@ class DeviceMessageHandler: await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,)) # Immediately attempt a resync in the background - run_in_background(self._user_device_resync, user_id=sender_user_id) + run_in_background(self._user_device_resync, user_id=sender_user_id) # type: ignore[unused-awaitable] async def send_device_message( self, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 5f2057269d..2944deaa00 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -192,7 +192,7 @@ class FederationHandler: # any partial-state-resync operations which were in flight when we # were shut down. if not hs.config.worker.worker_app: - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "resume_sync_partial_state_room", self._resume_partial_state_room_sync ) @@ -778,7 +778,7 @@ class FederationHandler: # lots of requests for missing prev_events which we do actually # have. Hence we fire off the background task, but don't wait for it. - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "handle_queued_pdus", self._handle_queued_pdus, room_queue ) @@ -1838,7 +1838,7 @@ class FederationHandler: room_id=room_id, ) - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] desc="sync_partial_state_room", func=_sync_partial_state_room_wrapper ) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index b7136f8d1c..3a65ccbb55 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1461,7 +1461,7 @@ class FederationEventHandler: resync = True if resync: - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "resync_device_due_to_pdu", self._resync_device, event.sender, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index da129ec16a..29ec7e3544 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -101,7 +101,7 @@ class MessageHandler: self._scheduled_expiry: Optional[IDelayedCall] = None if not hs.config.worker.worker_app: - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "_schedule_next_expiry", self._schedule_next_expiry ) @@ -1954,7 +1954,7 @@ class EventCreationHandler: if event.type == EventTypes.Message: # We don't want to block sending messages on any presence code. This # matters as sometimes presence code can take a while. - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "bump_presence_active_time", self._bump_active_time, requester.user ) @@ -1966,7 +1966,7 @@ class EventCreationHandler: except Exception: logger.exception("Error notifying about new room events") - run_in_background(_notify) + run_in_background(_notify) # type: ignore[unused-awaitable] return persisted_events[-1] diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 63b35c8d62..d7085c001d 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -293,7 +293,7 @@ class PaginationHandler: # We want to purge everything, including local events, and to run the purge in # the background so that it's not blocking any other operation apart from # other purges in the same room. - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "_purge_history", self._purge_history, purge_id, @@ -328,7 +328,7 @@ class PaginationHandler: logger.info("[purge] starting purge_id %s", purge_id) self._purges_by_id[purge_id] = PurgeStatus() - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "purge_history", self._purge_history, purge_id, @@ -777,7 +777,7 @@ class PaginationHandler: self._delete_by_id[delete_id] = DeleteStatus() self._delete_by_room.setdefault(room_id, []).append(delete_id) - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "shutdown_and_purge_room", self._shutdown_and_purge_room, delete_id, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 4ad2233573..b289b6cb23 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1064,7 +1064,7 @@ class PresenceHandler(BasePresenceHandler): yield finally: if affect_presence: - run_in_background(_end) + run_in_background(_end) # type: ignore[unused-awaitable] return _user_syncing() @@ -1337,7 +1337,7 @@ class PresenceHandler(BasePresenceHandler): finally: self._event_processing = False - run_as_background_process("presence.notify_new_event", _process_presence) + run_as_background_process("presence.notify_new_event", _process_presence) # type: ignore[unused-awaitable] async def _unsafe_process(self) -> None: # Loop round handling deltas until we're up to date diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 5c01482acf..9a6faf6270 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -75,7 +75,7 @@ class StatsHandler: finally: self._is_processing = False - run_as_background_process("stats.notify_new_event", process) + run_as_background_process("stats.notify_new_event", process) # type: ignore[unused-awaitable] async def _unsafe_process(self) -> None: # If self.pos is None then means we haven't fetched it from DB diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3f656ea4f5..b38b5ff495 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -116,7 +116,7 @@ class FollowerTypingHandler: if self.federation and self.is_mine_id(member.user_id): last_fed_poke = self._member_last_federation_poke.get(member, None) if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "typing._push_remote", self._push_remote, member=member, typing=True ) @@ -180,7 +180,7 @@ class FollowerTypingHandler: self._room_typing[row.room_id] = now_typing if self.federation: - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "_send_changes_in_typing_to_remotes", self._send_changes_in_typing_to_remotes, row.room_id, @@ -327,7 +327,7 @@ class TypingWriterHandler(FollowerTypingHandler): def _push_update(self, member: RoomMember, typing: bool) -> None: if self.hs.is_mine_id(member.user_id): # Only send updates for changes to our own users. - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "typing._push_remote", self._push_remote, member, typing ) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 3610b6bf78..96b692dca9 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -122,7 +122,7 @@ class UserDirectoryHandler(StateDeltasHandler): self._is_processing = False self._is_processing = True - run_as_background_process("user_directory.notify_new_event", process) + run_as_background_process("user_directory.notify_new_event", process) # type: ignore[unused-awaitable] async def handle_local_profile_change( self, user_id: str, profile: ProfileInfo diff --git a/synapse/http/client.py b/synapse/http/client.py index d777d59ccf..39439cb558 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -458,7 +458,7 @@ class SimpleHttpClient: ) # turn timeouts into RequestTimedOutErrors - request_deferred.addErrback(_timeout_to_request_timed_out_error) + request_deferred.addErrback(_timeout_to_request_timed_out_error) # type: ignore[unused-awaitable] response = await make_deferred_yieldable(request_deferred) diff --git a/synapse/http/connectproxyclient.py b/synapse/http/connectproxyclient.py index 23a60af171..466e2f57ea 100644 --- a/synapse/http/connectproxyclient.py +++ b/synapse/http/connectproxyclient.py @@ -102,7 +102,7 @@ class HTTPConnectProxyEndpoint: d = self._proxy_endpoint.connect(f) # once the tcp socket connects successfully, we need to wait for the # CONNECT to complete. - d.addCallback(lambda conn: f.on_connection) + d.addCallback(lambda conn: f.on_connection) # type: ignore[unused-awaitable] return d @@ -196,7 +196,7 @@ class HTTPConnectProtocol(protocol.Protocol): self.http_setup_client = HTTPConnectSetupClient( self.host, self.port, self.proxy_creds ) - self.http_setup_client.on_connected.addCallback(self.proxyConnected) + self.http_setup_client.on_connected.addCallback(self.proxyConnected) # type: ignore[unused-awaitable] def connectionMade(self) -> None: self.http_setup_client.makeConnection(self.transport) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 3302d4e48a..915224da70 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -1216,7 +1216,7 @@ class MatrixFederationHttpClient: try: d = read_body_with_max_size(response, output_stream, max_size) - d.addTimeout(self.default_timeout, self.reactor) + d.addTimeout(self.default_timeout, self.reactor) # type: ignore[unused-awaitable] length = await make_deferred_yieldable(d) except BodyExceededMaxSize: msg = "Requested file is too large > %r bytes" % (max_size,) diff --git a/synapse/http/server.py b/synapse/http/server.py index 9314454af1..ba6d52623b 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -764,7 +764,7 @@ def respond_with_json( if send_cors: set_cors_headers(request) - run_in_background( + run_in_background( # type: ignore[unused-awaitable] _async_write_json_to_request_in_thread, request, encoder, json_object ) return NOT_DONE_YET diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py index 5a61b21eaf..8edf2083fc 100644 --- a/synapse/logging/_remote.py +++ b/synapse/logging/_remote.py @@ -193,7 +193,7 @@ class RemoteHandler(logging.Handler): self._connection_waiter = None deferred: Deferred = self._service.whenConnected(failAfterFailures=1) - deferred.addCallbacks(writer, fail) + deferred.addCallbacks(writer, fail) # type: ignore[unused-awaitable] self._connection_waiter = deferred def _handle_pressure(self) -> None: diff --git a/synapse/logging/context.py b/synapse/logging/context.py index f62bea968f..27041dd870 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -843,7 +843,7 @@ def run_in_background( # type: ignore[misc] # which is supposed to have a single entry and exit point. But # by spawning off another deferred, we are effectively # adding a new exit point.) - res.addBoth(_set_context_cb, ctx) + res.addBoth(_set_context_cb, ctx) # type: ignore[unused-awaitable] return res @@ -870,7 +870,7 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T] # ok, we can't be sure that a yield won't block, so let's reset the # logcontext, and add a callback to the deferred to restore it. prev_context = set_current_context(SENTINEL_CONTEXT) - deferred.addBoth(_set_context_cb, prev_context) + deferred.addBoth(_set_context_cb, prev_context) # type: ignore[unused-awaitable] return deferred diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index c70eee649c..9379d43ecb 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -933,7 +933,7 @@ def _custom_sync_async_decorator( scope.__exit__(None, None, None) return result - result.addCallbacks(call_back, err_back) + result.addCallbacks(call_back, err_back) # type: ignore[unused-awaitable] else: if inspect.isawaitable(result): diff --git a/synapse/metrics/_gc.py b/synapse/metrics/_gc.py index a22c4e5bbd..405a7858b7 100644 --- a/synapse/metrics/_gc.py +++ b/synapse/metrics/_gc.py @@ -129,7 +129,7 @@ def install_gc_manager() -> None: gc_unreachable.labels(i).set(unreachable) gc_task = task.LoopingCall(_maybe_gc) - gc_task.start(0.1) + gc_task.start(0.1) # type: ignore[unused-awaitable] # diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 9ea4e23b31..ec6b95fdf3 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -207,6 +207,9 @@ def run_as_background_process( clock.looping_call and friends (or for firing-and-forgetting in the middle of a normal synapse async function). + Mypy will flag up this Deferred as unawaited. This is safe to ignore: the background + process runs automatically, even if we don't await the returned deferred. + Args: desc: a description for this background process type func: a function, which may return a Deferred or a coroutine diff --git a/synapse/metrics/common_usage_metrics.py b/synapse/metrics/common_usage_metrics.py index 6e05b043d3..bb7c3122b8 100644 --- a/synapse/metrics/common_usage_metrics.py +++ b/synapse/metrics/common_usage_metrics.py @@ -54,7 +54,7 @@ class CommonUsageMetricsManager: async def setup(self) -> None: """Keep the gauges for common usage metrics up to date.""" - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] desc="common_usage_metrics_update_gauges", func=self._update_gauges ) self._clock.looping_call( diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 1710dd51b9..8b8615673f 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -113,7 +113,7 @@ class EmailPusher(Pusher): if self._is_processing: return - run_as_background_process("emailpush.process", self._process) + run_as_background_process("emailpush.process", self._process) # type: ignore[unused-awaitable] def _pause_processing(self) -> None: """Used by tests to temporarily pause processing of events. diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index b048b03a74..5d65194c42 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -160,7 +160,7 @@ class HttpPusher(Pusher): # We could check the receipts are actually m.read receipts here, # but currently that's the only type of receipt anyway... - run_as_background_process("http_pusher.on_new_receipts", self._update_badge) + run_as_background_process("http_pusher.on_new_receipts", self._update_badge) # type: ignore[unused-awaitable] async def _update_badge(self) -> None: # XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems @@ -189,7 +189,7 @@ class HttpPusher(Pusher): if self._is_processing: return - run_as_background_process("httppush.process", self._process) + run_as_background_process("httppush.process", self._process) # type: ignore[unused-awaitable] async def _process(self) -> None: # we should never get here if we are already processing diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index e2648cbc93..4e1bec1110 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -92,7 +92,7 @@ class PusherPool: if not self._should_start_pushers: logger.info("Not starting pushers because they are disabled in the config") return - run_as_background_process("start_pushers", self._start_pushers) + run_as_background_process("start_pushers", self._start_pushers) # type: ignore[unused-awaitable] async def add_or_update_pusher( self, @@ -236,7 +236,7 @@ class PusherPool: # We only start a new background process if necessary rather than # optimistically (to cut down on overhead). - self._on_new_notifications(max_token) + self._on_new_notifications(max_token) # type: ignore[unused-awaitable] @wrap_as_background_process("on_new_notifications") async def _on_new_notifications(self, max_token: RoomStreamToken) -> None: diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 424854efbe..2accffe18d 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -526,7 +526,7 @@ class FederationSenderHandler: # no need to queue up another task. return - run_as_background_process("_save_and_send_ack", self._save_and_send_ack) + run_as_background_process("_save_and_send_ack", self._save_and_send_ack) # type: ignore[unused-awaitable] async def _save_and_send_ack(self) -> None: """Save the current federation position in the database and send an ACK diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index d03a53d764..ffe882bc99 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -291,7 +291,7 @@ class ReplicationCommandHandler: return # fire off a background process to start processing the queue. - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "process-replication-data", self._unsafe_process_queue, stream_name ) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 56a5c21910..4e39e1c43f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -300,7 +300,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): # if so. if isawaitable(res): - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "replication-" + cmd.get_logcontext_id(), lambda: res ) diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index dfc061eb5e..2a6ad95986 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -112,7 +112,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): def connectionMade(self) -> None: logger.info("Connected to redis") super().connectionMade() - run_as_background_process("subscribe-replication", self._send_subscribe) + run_as_background_process("subscribe-replication", self._send_subscribe) # type: ignore[unused-awaitable] async def _send_subscribe(self) -> None: # it's important to make sure that we only send the REPLICATE command once we @@ -183,7 +183,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): # if so. if isawaitable(res): - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "replication-" + cmd.get_logcontext_id(), lambda: res ) @@ -205,7 +205,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): Args: cmd: The command to send """ - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "send-cmd", self._async_send_command, cmd, bg_start_span=False ) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 347467d863..f1c3e7595a 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -137,7 +137,7 @@ class ReplicationStreamer: logger.debug("Notifier poke loop already running") return - run_as_background_process("replication_notifier", self._run_notifier_loop) + run_as_background_process("replication_notifier", self._run_notifier_loop) # type: ignore[unused-awaitable] async def _run_notifier_loop(self) -> None: self.is_looping = True diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 129b6fe6b0..3a3abed56a 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1124,7 +1124,7 @@ class RoomRedactEventRestServlet(TransactionRestServlet): ) if with_relations: - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "redact_related_events", self._relation_handler.redact_events_related_to, requester=requester, diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index f2aaab6227..df194f8439 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -116,7 +116,7 @@ class HttpTransactionCache: # we deliberately do not propagate the error any further, as we # expect the observers to have reported it. - deferred.addErrback(remove_from_map) + deferred.addErrback(remove_from_map) # type: ignore[unused-awaitable] return make_deferred_yieldable(observable.observe()) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index a99aea8926..1666c4616a 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -272,7 +272,7 @@ class BackgroundUpdater: # if we start a new background update, not all updates are done. self._all_done = False sleep = self.sleep_enabled - run_as_background_process( + run_as_background_process( # type: ignore[unused-awaitable] "background_updates", self.run_background_updates, sleep ) diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index f1d2c71c91..7e13f691e3 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -293,7 +293,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]): self._currently_persisting_rooms.discard(room_id) # set handle_queue_loop off in the background - run_as_background_process("persist_events", handle_queue_loop) + run_as_background_process("persist_events", handle_queue_loop) # type: ignore[unused-awaitable] def _get_drainining_queue( self, room_id: str diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 20b7a68362..0d90407ebd 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1065,7 +1065,7 @@ class EventsWorkerStore(SQLBaseStore): should_start = False if should_start: - run_as_background_process("fetch_events", self._fetch_thread) + run_as_background_process("fetch_events", self._fetch_thread) # type: ignore[unused-awaitable] async def _fetch_thread(self) -> None: """Services requests for events from `_event_fetch_list`.""" diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 7be9d5f113..714be27d86 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -132,7 +132,7 @@ class Clock: call = task.LoopingCall(f, *args, **kwargs) call.clock = self._reactor d = call.start(msec / 1000.0, now=False) - d.addErrback(log_failure, "Looping call died", consumeErrors=False) + d.addErrback(log_failure, "Looping call died", consumeErrors=False) # type: ignore[unused-awaitable] return call def call_later( diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 01e3cd46f6..d612fca03d 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -154,7 +154,7 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]): else: return f - deferred.addCallbacks(callback, errback) + deferred.addCallbacks(callback, errback) # type: ignore[unused-awaitable] def observe(self) -> "defer.Deferred[_T]": """Observe the underlying deferred. @@ -635,7 +635,7 @@ class ReadWriteLock: # writer waiting for us and it completed entirely within the # `new_defer.callback()` call above. if self.key_to_current_writer.get(key) == new_defer: - self.key_to_current_writer.pop(key) + self.key_to_current_writer.pop(key) # type: ignore[unused-awaitable] return _ctx_manager() @@ -693,7 +693,7 @@ def timeout_deferred( raise defer.TimeoutError("Timed out after %gs" % (timeout,)) return value - deferred.addErrback(convert_cancelled) + deferred.addErrback(convert_cancelled) # type: ignore[unused-awaitable] def cancel_timeout(result: _T) -> _T: # stop the pending call to cancel the deferred if it's been fired @@ -701,7 +701,7 @@ def timeout_deferred( delayed_call.cancel() return result - deferred.addBoth(cancel_timeout) + deferred.addBoth(cancel_timeout) # type: ignore[unused-awaitable] def success_cb(val: _T) -> None: if not new_d.called: @@ -711,7 +711,7 @@ def timeout_deferred( if not new_d.called: new_d.errback(val) - deferred.addCallbacks(success_cb, failure_cb) + deferred.addCallbacks(success_cb, failure_cb) # type: ignore[unused-awaitable] return new_d @@ -759,7 +759,7 @@ def stop_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]": wrapped with `make_deferred_yieldable`. """ new_deferred: "defer.Deferred[T]" = defer.Deferred() - deferred.chainDeferred(new_deferred) + deferred.chainDeferred(new_deferred) # type: ignore[unused-awaitable] return new_deferred @@ -821,10 +821,10 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]: new_deferred.pause() new_deferred.errback(Failure(CancelledError())) - deferred.addBoth(lambda _: new_deferred.unpause()) + deferred.addBoth(lambda _: new_deferred.unpause()) # type: ignore[unused-awaitable] new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel) - deferred.chainDeferred(new_deferred) + deferred.chainDeferred(new_deferred) # type: ignore[unused-awaitable] return new_deferred diff --git a/synapse/util/batching_queue.py b/synapse/util/batching_queue.py index 2a903004a9..72a13cd1a4 100644 --- a/synapse/util/batching_queue.py +++ b/synapse/util/batching_queue.py @@ -128,7 +128,7 @@ class BatchingQueue(Generic[V, R]): # If we're not currently processing the key fire off a background # process to start processing. if key not in self._processing_keys: - run_as_background_process(self._name, self._process_queue, key) + run_as_background_process(self._name, self._process_queue, key) # type: ignore[unused-awaitable] with self._number_in_flight_metric.track_inprogress(): return await make_deferred_yieldable(d) diff --git a/synapse/util/caches/cached_call.py b/synapse/util/caches/cached_call.py index e325f44da3..4061db56a8 100644 --- a/synapse/util/caches/cached_call.py +++ b/synapse/util/caches/cached_call.py @@ -89,7 +89,7 @@ class CachedCall(Generic[TV]): def got_result(r: Union[TV, Failure]) -> None: self._result = r - self._deferred.addBoth(got_result) + self._deferred.addBoth(got_result) # type: ignore[unused-awaitable] # TODO: consider cancellation semantics. Currently, if the call to get() # is cancelled, the underlying call will continue (and any future calls diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 81df71a0c5..740d9585cf 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -377,7 +377,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): for k, v in r.items(): results[cache_key_to_arg(k)] = v - pending_deferred.addCallback(update_results) + pending_deferred.addCallback(update_results) # type: ignore[unused-awaitable] cached_defers.append(pending_deferred) if missing: diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index b580bdd0de..c5019a1074 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -84,7 +84,7 @@ class Distributor: if name not in self.signals: raise KeyError("%r does not have a signal named %s" % (self, name)) - run_as_background_process(name, self.signals[name].fire, *args, **kwargs) + run_as_background_process(name, self.signals[name].fire, *args, **kwargs) # type: ignore[unused-awaitable] P = ParamSpec("P") diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index d00d34e652..8078ab0bef 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -108,7 +108,7 @@ def do_patch() -> None: raise Exception(err) return r - res.addBoth(check_ctx) + res.addBoth(check_ctx) # type: ignore[unused-awaitable] return res return wrapped diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index f262bf95a0..e01645f1ab 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -334,7 +334,7 @@ class _PerHostRatelimiter: queue_defer = queue_request() return queue_defer - ret_defer.addBoth(on_wait_finished) + ret_defer.addBoth(on_wait_finished) # type: ignore[unused-awaitable] else: ret_defer = queue_request() @@ -358,8 +358,8 @@ class _PerHostRatelimiter: self.ready_request_queue.pop(request_id, None) return r - ret_defer.addCallbacks(on_start, on_err) - ret_defer.addBoth(on_both) + ret_defer.addCallbacks(on_start, on_err) # type: ignore[unused-awaitable] + ret_defer.addBoth(on_both) # type: ignore[unused-awaitable] return make_deferred_yieldable(ret_defer) def _on_exit(self, request_id: object) -> None: diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index dcc037b982..9ddc00fddd 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -265,4 +265,4 @@ class RetryDestinationLimiter: logger.exception("Failed to store destination_retry_timings") # we deliberately do this in the background. - run_as_background_process("store_retry_timings", store_retry_timings) + run_as_background_process("store_retry_timings", store_retry_timings) # type: ignore[unused-awaitable] |