diff --git a/synapse/_scripts/update_synapse_database.py b/synapse/_scripts/update_synapse_database.py
index 0adf94bba6..8bf5ad2b36 100644
--- a/synapse/_scripts/update_synapse_database.py
+++ b/synapse/_scripts/update_synapse_database.py
@@ -59,7 +59,7 @@ def run_background_updates(hs: HomeServer) -> None:
def run() -> None:
# Apply all background updates on the database.
- defer.ensureDeferred(
+ defer.ensureDeferred( # type: ignore[unused-awaitable]
run_as_background_process("background_updates", run_background_updates)
)
diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py
index 897dd3edac..722ff81dbf 100644
--- a/synapse/app/phone_stats_home.py
+++ b/synapse/app/phone_stats_home.py
@@ -189,7 +189,7 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
clock.looping_call(
hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60
)
- hs.get_datastores().main.reap_monthly_active_users()
+ hs.get_datastores().main.reap_monthly_active_users() # type: ignore[unused-awaitable]
@wrap_as_background_process("generate_monthly_active_users")
async def generate_monthly_active_users() -> None:
@@ -212,7 +212,7 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
max_mau_gauge.set(float(hs.config.server.max_mau_value))
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
- generate_monthly_active_users()
+ generate_monthly_active_users() # type: ignore[unused-awaitable]
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 3a319b0d42..72f90ceb46 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -200,7 +200,7 @@ class _ServiceQueuer:
if service.id in self.requests_in_flight:
return
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"as-sender-%s" % (service.id,), self._send_request, service
)
@@ -407,10 +407,10 @@ class _TransactionController:
if sent:
await txn.complete(self.store)
else:
- run_in_background(self._on_txn_fail, service)
+ run_in_background(self._on_txn_fail, service) # type: ignore[unused-awaitable]
except Exception:
logger.exception("Error creating appservice transaction")
- run_in_background(self._on_txn_fail, service)
+ run_in_background(self._on_txn_fail, service) # type: ignore[unused-awaitable]
async def on_recovered(self, recoverer: "_Recoverer") -> None:
logger.info(
@@ -479,7 +479,7 @@ class _Recoverer:
def recover(self) -> None:
def _retry() -> None:
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"as-recoverer-%s" % (self.service.id,), self.retry
)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 6d99845de5..86cd2c87f1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -200,7 +200,7 @@ class FederationServer(FederationBase):
)
if lock:
logger.info("Handling old staged inbound events in %s", room_id)
- self._process_incoming_pdus_in_room_inner(
+ self._process_incoming_pdus_in_room_inner( # type: ignore[unused-awaitable]
room_id,
room_version,
lock,
@@ -277,7 +277,7 @@ class FederationServer(FederationBase):
# any old events in the staging area.
if not self._started_handling_of_staged_events:
self._started_handling_of_staged_events = True
- self._handle_old_staged_events()
+ self._handle_old_staged_events() # type: ignore[unused-awaitable]
# Start a periodic check for old staged events. This is to handle
# the case where locks time out, e.g. if another process gets killed
@@ -1144,7 +1144,7 @@ class FederationServer(FederationBase):
_INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id
)
if lock:
- self._process_incoming_pdus_in_room_inner(
+ self._process_incoming_pdus_in_room_inner( # type: ignore[unused-awaitable]
pdu.room_id, room_version, lock, origin, pdu
)
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 43421a9c72..3e500e3643 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -187,7 +187,7 @@ class _DestinationWakeupQueue:
self.queue[destination] = None
if not self.processing:
- self._handle()
+ self._handle() # type: ignore[unused-awaitable]
@wrap_as_background_process("_DestinationWakeupQueue.handle")
async def _handle(self) -> None:
@@ -342,7 +342,7 @@ class FederationSender(AbstractFederationSender):
return
# fire off a processing loop in the background
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"process_event_queue_for_federation", self._process_event_queue_loop
)
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index ffc9d95ee7..a2a907d6aa 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -301,7 +301,7 @@ class PerDestinationQueue:
logger.debug("TX [%s] Starting transaction loop", self._destination)
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"federation_transaction_transmission_loop",
self._transaction_transmission_loop,
)
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index cdaf0d5de7..2e0c6085af 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -132,7 +132,7 @@ class Authenticator:
# alive
retry_timings = await self.store.get_destination_retry_timings(origin)
if retry_timings and retry_timings.retry_last_ts:
- run_in_background(self.reset_retry_timings, origin)
+ run_in_background(self.reset_retry_timings, origin) # type: ignore[unused-awaitable]
return origin
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ec3ab968e9..455d4005df 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -97,7 +97,7 @@ class ApplicationServicesHandler:
# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
- self._notify_interested_services(max_token)
+ self._notify_interested_services(max_token) # type: ignore[unused-awaitable]
@wrap_as_background_process("notify_interested_services")
async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
@@ -144,7 +144,7 @@ class ApplicationServicesHandler:
except Exception:
logger.error("Application Services Failure")
- run_as_background_process("as_scheduler", start_scheduler)
+ run_as_background_process("as_scheduler", start_scheduler) # type: ignore[unused-awaitable]
self.started_scheduler = True
# Fork off pushes to these services
@@ -307,7 +307,7 @@ class ApplicationServicesHandler:
# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
- self._notify_interested_services_ephemeral(
+ self._notify_interested_services_ephemeral( # type: ignore[unused-awaitable]
services, stream_key, new_token, users
)
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index d31263c717..c1850521e4 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -223,7 +223,7 @@ class DeactivateAccountHandler:
pending deactivation, if it isn't already running.
"""
if not self._user_parter_running:
- run_as_background_process("user_parter_loop", self._user_parter_loop)
+ run_as_background_process("user_parter_loop", self._user_parter_loop) # type: ignore[unused-awaitable]
async def _user_parter_loop(self) -> None:
"""Loop that parts deactivated users from rooms"""
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 6f7963df43..08afcbeefa 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -589,7 +589,7 @@ class DeviceHandler(DeviceWorkerHandler):
# We may need to do some processing asynchronously for local user IDs.
if self.hs.is_mine_id(user_id):
- self._handle_new_device_update_async()
+ self._handle_new_device_update_async() # type: ignore[unused-awaitable]
async def notify_user_signature_update(
self, from_user_id: str, user_ids: List[str]
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 00c403db49..22f298d445 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -198,7 +198,7 @@ class DeviceMessageHandler:
await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,))
# Immediately attempt a resync in the background
- run_in_background(self._user_device_resync, user_id=sender_user_id)
+ run_in_background(self._user_device_resync, user_id=sender_user_id) # type: ignore[unused-awaitable]
async def send_device_message(
self,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 5f2057269d..2944deaa00 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -192,7 +192,7 @@ class FederationHandler:
# any partial-state-resync operations which were in flight when we
# were shut down.
if not hs.config.worker.worker_app:
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"resume_sync_partial_state_room", self._resume_partial_state_room_sync
)
@@ -778,7 +778,7 @@ class FederationHandler:
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the background task, but don't wait for it.
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"handle_queued_pdus", self._handle_queued_pdus, room_queue
)
@@ -1838,7 +1838,7 @@ class FederationHandler:
room_id=room_id,
)
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
desc="sync_partial_state_room", func=_sync_partial_state_room_wrapper
)
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index b7136f8d1c..3a65ccbb55 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1461,7 +1461,7 @@ class FederationEventHandler:
resync = True
if resync:
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"resync_device_due_to_pdu",
self._resync_device,
event.sender,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index da129ec16a..29ec7e3544 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -101,7 +101,7 @@ class MessageHandler:
self._scheduled_expiry: Optional[IDelayedCall] = None
if not hs.config.worker.worker_app:
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"_schedule_next_expiry", self._schedule_next_expiry
)
@@ -1954,7 +1954,7 @@ class EventCreationHandler:
if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"bump_presence_active_time", self._bump_active_time, requester.user
)
@@ -1966,7 +1966,7 @@ class EventCreationHandler:
except Exception:
logger.exception("Error notifying about new room events")
- run_in_background(_notify)
+ run_in_background(_notify) # type: ignore[unused-awaitable]
return persisted_events[-1]
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 63b35c8d62..d7085c001d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -293,7 +293,7 @@ class PaginationHandler:
# We want to purge everything, including local events, and to run the purge in
# the background so that it's not blocking any other operation apart from
# other purges in the same room.
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"_purge_history",
self._purge_history,
purge_id,
@@ -328,7 +328,7 @@ class PaginationHandler:
logger.info("[purge] starting purge_id %s", purge_id)
self._purges_by_id[purge_id] = PurgeStatus()
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"purge_history",
self._purge_history,
purge_id,
@@ -777,7 +777,7 @@ class PaginationHandler:
self._delete_by_id[delete_id] = DeleteStatus()
self._delete_by_room.setdefault(room_id, []).append(delete_id)
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"shutdown_and_purge_room",
self._shutdown_and_purge_room,
delete_id,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 4ad2233573..b289b6cb23 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1064,7 +1064,7 @@ class PresenceHandler(BasePresenceHandler):
yield
finally:
if affect_presence:
- run_in_background(_end)
+ run_in_background(_end) # type: ignore[unused-awaitable]
return _user_syncing()
@@ -1337,7 +1337,7 @@ class PresenceHandler(BasePresenceHandler):
finally:
self._event_processing = False
- run_as_background_process("presence.notify_new_event", _process_presence)
+ run_as_background_process("presence.notify_new_event", _process_presence) # type: ignore[unused-awaitable]
async def _unsafe_process(self) -> None:
# Loop round handling deltas until we're up to date
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 5c01482acf..9a6faf6270 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -75,7 +75,7 @@ class StatsHandler:
finally:
self._is_processing = False
- run_as_background_process("stats.notify_new_event", process)
+ run_as_background_process("stats.notify_new_event", process) # type: ignore[unused-awaitable]
async def _unsafe_process(self) -> None:
# If self.pos is None then means we haven't fetched it from DB
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 3f656ea4f5..b38b5ff495 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -116,7 +116,7 @@ class FollowerTypingHandler:
if self.federation and self.is_mine_id(member.user_id):
last_fed_poke = self._member_last_federation_poke.get(member, None)
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"typing._push_remote", self._push_remote, member=member, typing=True
)
@@ -180,7 +180,7 @@ class FollowerTypingHandler:
self._room_typing[row.room_id] = now_typing
if self.federation:
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"_send_changes_in_typing_to_remotes",
self._send_changes_in_typing_to_remotes,
row.room_id,
@@ -327,7 +327,7 @@ class TypingWriterHandler(FollowerTypingHandler):
def _push_update(self, member: RoomMember, typing: bool) -> None:
if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users.
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"typing._push_remote", self._push_remote, member, typing
)
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 3610b6bf78..96b692dca9 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -122,7 +122,7 @@ class UserDirectoryHandler(StateDeltasHandler):
self._is_processing = False
self._is_processing = True
- run_as_background_process("user_directory.notify_new_event", process)
+ run_as_background_process("user_directory.notify_new_event", process) # type: ignore[unused-awaitable]
async def handle_local_profile_change(
self, user_id: str, profile: ProfileInfo
diff --git a/synapse/http/client.py b/synapse/http/client.py
index d777d59ccf..39439cb558 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -458,7 +458,7 @@ class SimpleHttpClient:
)
# turn timeouts into RequestTimedOutErrors
- request_deferred.addErrback(_timeout_to_request_timed_out_error)
+ request_deferred.addErrback(_timeout_to_request_timed_out_error) # type: ignore[unused-awaitable]
response = await make_deferred_yieldable(request_deferred)
diff --git a/synapse/http/connectproxyclient.py b/synapse/http/connectproxyclient.py
index 23a60af171..466e2f57ea 100644
--- a/synapse/http/connectproxyclient.py
+++ b/synapse/http/connectproxyclient.py
@@ -102,7 +102,7 @@ class HTTPConnectProxyEndpoint:
d = self._proxy_endpoint.connect(f)
# once the tcp socket connects successfully, we need to wait for the
# CONNECT to complete.
- d.addCallback(lambda conn: f.on_connection)
+ d.addCallback(lambda conn: f.on_connection) # type: ignore[unused-awaitable]
return d
@@ -196,7 +196,7 @@ class HTTPConnectProtocol(protocol.Protocol):
self.http_setup_client = HTTPConnectSetupClient(
self.host, self.port, self.proxy_creds
)
- self.http_setup_client.on_connected.addCallback(self.proxyConnected)
+ self.http_setup_client.on_connected.addCallback(self.proxyConnected) # type: ignore[unused-awaitable]
def connectionMade(self) -> None:
self.http_setup_client.makeConnection(self.transport)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 3302d4e48a..915224da70 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -1216,7 +1216,7 @@ class MatrixFederationHttpClient:
try:
d = read_body_with_max_size(response, output_stream, max_size)
- d.addTimeout(self.default_timeout, self.reactor)
+ d.addTimeout(self.default_timeout, self.reactor) # type: ignore[unused-awaitable]
length = await make_deferred_yieldable(d)
except BodyExceededMaxSize:
msg = "Requested file is too large > %r bytes" % (max_size,)
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 9314454af1..ba6d52623b 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -764,7 +764,7 @@ def respond_with_json(
if send_cors:
set_cors_headers(request)
- run_in_background(
+ run_in_background( # type: ignore[unused-awaitable]
_async_write_json_to_request_in_thread, request, encoder, json_object
)
return NOT_DONE_YET
diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index 5a61b21eaf..8edf2083fc 100644
--- a/synapse/logging/_remote.py
+++ b/synapse/logging/_remote.py
@@ -193,7 +193,7 @@ class RemoteHandler(logging.Handler):
self._connection_waiter = None
deferred: Deferred = self._service.whenConnected(failAfterFailures=1)
- deferred.addCallbacks(writer, fail)
+ deferred.addCallbacks(writer, fail) # type: ignore[unused-awaitable]
self._connection_waiter = deferred
def _handle_pressure(self) -> None:
diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index f62bea968f..27041dd870 100644
--- a/synapse/logging/context.py
+++ b/synapse/logging/context.py
@@ -843,7 +843,7 @@ def run_in_background( # type: ignore[misc]
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
- res.addBoth(_set_context_cb, ctx)
+ res.addBoth(_set_context_cb, ctx) # type: ignore[unused-awaitable]
return res
@@ -870,7 +870,7 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]
# ok, we can't be sure that a yield won't block, so let's reset the
# logcontext, and add a callback to the deferred to restore it.
prev_context = set_current_context(SENTINEL_CONTEXT)
- deferred.addBoth(_set_context_cb, prev_context)
+ deferred.addBoth(_set_context_cb, prev_context) # type: ignore[unused-awaitable]
return deferred
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index c70eee649c..9379d43ecb 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -933,7 +933,7 @@ def _custom_sync_async_decorator(
scope.__exit__(None, None, None)
return result
- result.addCallbacks(call_back, err_back)
+ result.addCallbacks(call_back, err_back) # type: ignore[unused-awaitable]
else:
if inspect.isawaitable(result):
diff --git a/synapse/metrics/_gc.py b/synapse/metrics/_gc.py
index a22c4e5bbd..405a7858b7 100644
--- a/synapse/metrics/_gc.py
+++ b/synapse/metrics/_gc.py
@@ -129,7 +129,7 @@ def install_gc_manager() -> None:
gc_unreachable.labels(i).set(unreachable)
gc_task = task.LoopingCall(_maybe_gc)
- gc_task.start(0.1)
+ gc_task.start(0.1) # type: ignore[unused-awaitable]
#
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 9ea4e23b31..ec6b95fdf3 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -207,6 +207,9 @@ def run_as_background_process(
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
normal synapse async function).
+ Mypy will flag up this Deferred as unawaited. This is safe to ignore: the background
+ process runs automatically, even if we don't await the returned deferred.
+
Args:
desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
diff --git a/synapse/metrics/common_usage_metrics.py b/synapse/metrics/common_usage_metrics.py
index 6e05b043d3..bb7c3122b8 100644
--- a/synapse/metrics/common_usage_metrics.py
+++ b/synapse/metrics/common_usage_metrics.py
@@ -54,7 +54,7 @@ class CommonUsageMetricsManager:
async def setup(self) -> None:
"""Keep the gauges for common usage metrics up to date."""
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
desc="common_usage_metrics_update_gauges", func=self._update_gauges
)
self._clock.looping_call(
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 1710dd51b9..8b8615673f 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -113,7 +113,7 @@ class EmailPusher(Pusher):
if self._is_processing:
return
- run_as_background_process("emailpush.process", self._process)
+ run_as_background_process("emailpush.process", self._process) # type: ignore[unused-awaitable]
def _pause_processing(self) -> None:
"""Used by tests to temporarily pause processing of events.
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index b048b03a74..5d65194c42 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -160,7 +160,7 @@ class HttpPusher(Pusher):
# We could check the receipts are actually m.read receipts here,
# but currently that's the only type of receipt anyway...
- run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
+ run_as_background_process("http_pusher.on_new_receipts", self._update_badge) # type: ignore[unused-awaitable]
async def _update_badge(self) -> None:
# XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems
@@ -189,7 +189,7 @@ class HttpPusher(Pusher):
if self._is_processing:
return
- run_as_background_process("httppush.process", self._process)
+ run_as_background_process("httppush.process", self._process) # type: ignore[unused-awaitable]
async def _process(self) -> None:
# we should never get here if we are already processing
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index e2648cbc93..4e1bec1110 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -92,7 +92,7 @@ class PusherPool:
if not self._should_start_pushers:
logger.info("Not starting pushers because they are disabled in the config")
return
- run_as_background_process("start_pushers", self._start_pushers)
+ run_as_background_process("start_pushers", self._start_pushers) # type: ignore[unused-awaitable]
async def add_or_update_pusher(
self,
@@ -236,7 +236,7 @@ class PusherPool:
# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
- self._on_new_notifications(max_token)
+ self._on_new_notifications(max_token) # type: ignore[unused-awaitable]
@wrap_as_background_process("on_new_notifications")
async def _on_new_notifications(self, max_token: RoomStreamToken) -> None:
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 424854efbe..2accffe18d 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -526,7 +526,7 @@ class FederationSenderHandler:
# no need to queue up another task.
return
- run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
+ run_as_background_process("_save_and_send_ack", self._save_and_send_ack) # type: ignore[unused-awaitable]
async def _save_and_send_ack(self) -> None:
"""Save the current federation position in the database and send an ACK
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index d03a53d764..ffe882bc99 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -291,7 +291,7 @@ class ReplicationCommandHandler:
return
# fire off a background process to start processing the queue.
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"process-replication-data", self._unsafe_process_queue, stream_name
)
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 56a5c21910..4e39e1c43f 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -300,7 +300,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# if so.
if isawaitable(res):
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"replication-" + cmd.get_logcontext_id(), lambda: res
)
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index dfc061eb5e..2a6ad95986 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -112,7 +112,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
def connectionMade(self) -> None:
logger.info("Connected to redis")
super().connectionMade()
- run_as_background_process("subscribe-replication", self._send_subscribe)
+ run_as_background_process("subscribe-replication", self._send_subscribe) # type: ignore[unused-awaitable]
async def _send_subscribe(self) -> None:
# it's important to make sure that we only send the REPLICATE command once we
@@ -183,7 +183,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
# if so.
if isawaitable(res):
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"replication-" + cmd.get_logcontext_id(), lambda: res
)
@@ -205,7 +205,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
Args:
cmd: The command to send
"""
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"send-cmd", self._async_send_command, cmd, bg_start_span=False
)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 347467d863..f1c3e7595a 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -137,7 +137,7 @@ class ReplicationStreamer:
logger.debug("Notifier poke loop already running")
return
- run_as_background_process("replication_notifier", self._run_notifier_loop)
+ run_as_background_process("replication_notifier", self._run_notifier_loop) # type: ignore[unused-awaitable]
async def _run_notifier_loop(self) -> None:
self.is_looping = True
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 129b6fe6b0..3a3abed56a 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -1124,7 +1124,7 @@ class RoomRedactEventRestServlet(TransactionRestServlet):
)
if with_relations:
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"redact_related_events",
self._relation_handler.redact_events_related_to,
requester=requester,
diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py
index f2aaab6227..df194f8439 100644
--- a/synapse/rest/client/transactions.py
+++ b/synapse/rest/client/transactions.py
@@ -116,7 +116,7 @@ class HttpTransactionCache:
# we deliberately do not propagate the error any further, as we
# expect the observers to have reported it.
- deferred.addErrback(remove_from_map)
+ deferred.addErrback(remove_from_map) # type: ignore[unused-awaitable]
return make_deferred_yieldable(observable.observe())
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index a99aea8926..1666c4616a 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -272,7 +272,7 @@ class BackgroundUpdater:
# if we start a new background update, not all updates are done.
self._all_done = False
sleep = self.sleep_enabled
- run_as_background_process(
+ run_as_background_process( # type: ignore[unused-awaitable]
"background_updates", self.run_background_updates, sleep
)
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index f1d2c71c91..7e13f691e3 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -293,7 +293,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
self._currently_persisting_rooms.discard(room_id)
# set handle_queue_loop off in the background
- run_as_background_process("persist_events", handle_queue_loop)
+ run_as_background_process("persist_events", handle_queue_loop) # type: ignore[unused-awaitable]
def _get_drainining_queue(
self, room_id: str
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 20b7a68362..0d90407ebd 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1065,7 +1065,7 @@ class EventsWorkerStore(SQLBaseStore):
should_start = False
if should_start:
- run_as_background_process("fetch_events", self._fetch_thread)
+ run_as_background_process("fetch_events", self._fetch_thread) # type: ignore[unused-awaitable]
async def _fetch_thread(self) -> None:
"""Services requests for events from `_event_fetch_list`."""
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 7be9d5f113..714be27d86 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -132,7 +132,7 @@ class Clock:
call = task.LoopingCall(f, *args, **kwargs)
call.clock = self._reactor
d = call.start(msec / 1000.0, now=False)
- d.addErrback(log_failure, "Looping call died", consumeErrors=False)
+ d.addErrback(log_failure, "Looping call died", consumeErrors=False) # type: ignore[unused-awaitable]
return call
def call_later(
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 01e3cd46f6..d612fca03d 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -154,7 +154,7 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
else:
return f
- deferred.addCallbacks(callback, errback)
+ deferred.addCallbacks(callback, errback) # type: ignore[unused-awaitable]
def observe(self) -> "defer.Deferred[_T]":
"""Observe the underlying deferred.
@@ -635,7 +635,7 @@ class ReadWriteLock:
# writer waiting for us and it completed entirely within the
# `new_defer.callback()` call above.
if self.key_to_current_writer.get(key) == new_defer:
- self.key_to_current_writer.pop(key)
+ self.key_to_current_writer.pop(key) # type: ignore[unused-awaitable]
return _ctx_manager()
@@ -693,7 +693,7 @@ def timeout_deferred(
raise defer.TimeoutError("Timed out after %gs" % (timeout,))
return value
- deferred.addErrback(convert_cancelled)
+ deferred.addErrback(convert_cancelled) # type: ignore[unused-awaitable]
def cancel_timeout(result: _T) -> _T:
# stop the pending call to cancel the deferred if it's been fired
@@ -701,7 +701,7 @@ def timeout_deferred(
delayed_call.cancel()
return result
- deferred.addBoth(cancel_timeout)
+ deferred.addBoth(cancel_timeout) # type: ignore[unused-awaitable]
def success_cb(val: _T) -> None:
if not new_d.called:
@@ -711,7 +711,7 @@ def timeout_deferred(
if not new_d.called:
new_d.errback(val)
- deferred.addCallbacks(success_cb, failure_cb)
+ deferred.addCallbacks(success_cb, failure_cb) # type: ignore[unused-awaitable]
return new_d
@@ -759,7 +759,7 @@ def stop_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
wrapped with `make_deferred_yieldable`.
"""
new_deferred: "defer.Deferred[T]" = defer.Deferred()
- deferred.chainDeferred(new_deferred)
+ deferred.chainDeferred(new_deferred) # type: ignore[unused-awaitable]
return new_deferred
@@ -821,10 +821,10 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]:
new_deferred.pause()
new_deferred.errback(Failure(CancelledError()))
- deferred.addBoth(lambda _: new_deferred.unpause())
+ deferred.addBoth(lambda _: new_deferred.unpause()) # type: ignore[unused-awaitable]
new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel)
- deferred.chainDeferred(new_deferred)
+ deferred.chainDeferred(new_deferred) # type: ignore[unused-awaitable]
return new_deferred
diff --git a/synapse/util/batching_queue.py b/synapse/util/batching_queue.py
index 2a903004a9..72a13cd1a4 100644
--- a/synapse/util/batching_queue.py
+++ b/synapse/util/batching_queue.py
@@ -128,7 +128,7 @@ class BatchingQueue(Generic[V, R]):
# If we're not currently processing the key fire off a background
# process to start processing.
if key not in self._processing_keys:
- run_as_background_process(self._name, self._process_queue, key)
+ run_as_background_process(self._name, self._process_queue, key) # type: ignore[unused-awaitable]
with self._number_in_flight_metric.track_inprogress():
return await make_deferred_yieldable(d)
diff --git a/synapse/util/caches/cached_call.py b/synapse/util/caches/cached_call.py
index e325f44da3..4061db56a8 100644
--- a/synapse/util/caches/cached_call.py
+++ b/synapse/util/caches/cached_call.py
@@ -89,7 +89,7 @@ class CachedCall(Generic[TV]):
def got_result(r: Union[TV, Failure]) -> None:
self._result = r
- self._deferred.addBoth(got_result)
+ self._deferred.addBoth(got_result) # type: ignore[unused-awaitable]
# TODO: consider cancellation semantics. Currently, if the call to get()
# is cancelled, the underlying call will continue (and any future calls
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 81df71a0c5..740d9585cf 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -377,7 +377,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
for k, v in r.items():
results[cache_key_to_arg(k)] = v
- pending_deferred.addCallback(update_results)
+ pending_deferred.addCallback(update_results) # type: ignore[unused-awaitable]
cached_defers.append(pending_deferred)
if missing:
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index b580bdd0de..c5019a1074 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -84,7 +84,7 @@ class Distributor:
if name not in self.signals:
raise KeyError("%r does not have a signal named %s" % (self, name))
- run_as_background_process(name, self.signals[name].fire, *args, **kwargs)
+ run_as_background_process(name, self.signals[name].fire, *args, **kwargs) # type: ignore[unused-awaitable]
P = ParamSpec("P")
diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py
index d00d34e652..8078ab0bef 100644
--- a/synapse/util/patch_inline_callbacks.py
+++ b/synapse/util/patch_inline_callbacks.py
@@ -108,7 +108,7 @@ def do_patch() -> None:
raise Exception(err)
return r
- res.addBoth(check_ctx)
+ res.addBoth(check_ctx) # type: ignore[unused-awaitable]
return res
return wrapped
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index f262bf95a0..e01645f1ab 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -334,7 +334,7 @@ class _PerHostRatelimiter:
queue_defer = queue_request()
return queue_defer
- ret_defer.addBoth(on_wait_finished)
+ ret_defer.addBoth(on_wait_finished) # type: ignore[unused-awaitable]
else:
ret_defer = queue_request()
@@ -358,8 +358,8 @@ class _PerHostRatelimiter:
self.ready_request_queue.pop(request_id, None)
return r
- ret_defer.addCallbacks(on_start, on_err)
- ret_defer.addBoth(on_both)
+ ret_defer.addCallbacks(on_start, on_err) # type: ignore[unused-awaitable]
+ ret_defer.addBoth(on_both) # type: ignore[unused-awaitable]
return make_deferred_yieldable(ret_defer)
def _on_exit(self, request_id: object) -> None:
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index dcc037b982..9ddc00fddd 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -265,4 +265,4 @@ class RetryDestinationLimiter:
logger.exception("Failed to store destination_retry_timings")
# we deliberately do this in the background.
- run_as_background_process("store_retry_timings", store_retry_timings)
+ run_as_background_process("store_retry_timings", store_retry_timings) # type: ignore[unused-awaitable]
|