
SparkKubernetesOperator fails to fetch the driver pod when SparkApplication is still in pending state #40495

Open
akrava opened this issue Jun 29, 2024 · 0 comments · May be fixed by #40496
Labels
area:providers kind:bug This is a clearly a bug

Comments

akrava (Contributor) commented Jun 29, 2024

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==8.3.1

Apache Airflow version

2.9.2

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Other Docker-based deployment

Deployment details

Airflow is deployed on Kubernetes, and a customized Spark Operator with a custom API group is deployed on the same cluster.

What happened

I have a DAG with a SparkKubernetesOperator task which fails. The task fails because it cannot find the driver pod: the SparkApplication is still in the pending state at that moment, and the pod only appears 1-2 minutes after the task has already failed.

What you think should happen instead

The SparkKubernetesOperator task should wait for the SparkApplication to reach the running state, and the DAG should complete successfully.
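A fix is proposed in #40496. Purely as an illustration of the kind of tolerant polling that would avoid this race (all names below are hypothetical stand-ins, not the provider's actual API), the operator could keep retrying the driver-pod lookup for a while instead of failing on the first 404:

```python
import time


class DriverPodNotFoundError(Exception):
    """Stand-in for the Kubernetes 404 ApiException (hypothetical)."""


def wait_for_driver_pod(read_pod, timeout=300.0, interval=5.0,
                        clock=time.monotonic, sleep=time.sleep):
    """Poll read_pod(), treating 'not found' as 'not created yet' until timeout.

    read_pod: zero-argument callable that returns the pod, or raises
    DriverPodNotFoundError while the Spark Operator has not created it yet.
    """
    deadline = clock() + timeout
    while True:
        try:
            return read_pod()
        except DriverPodNotFoundError:
            if clock() >= deadline:
                raise  # the pod really never showed up
            sleep(interval)
```

With a timeout of a few minutes, this would comfortably cover the 1-2 minute delay observed above.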

How to reproduce

Prepare an environment with Airflow and the Spark Operator deployed on Kubernetes.

Create a DAG consisting of the following two files:

spark_pi.py
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": days_ago(1),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "max_active_runs": 1,
    "retries": 0,
}

dag = DAG(
    "spark_pi",
    default_args=default_args,
    schedule_interval=None,
)

submit = SparkKubernetesOperator(
    task_id="submit",
    application_file="spark_pi.yaml",
    delete_on_termination=False,
    dag=dag,
    enable_impersonation_from_ldap_user=True,
)
spark_pi.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
spec:
  type: Scala
  mode: cluster
  image: "spark:3.5.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar"
  sparkVersion: "3.5.0"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.5.0
    serviceAccount: spark-operator-spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.5.0

Launch this DAG via the Airflow UI, then check the status and log of the failed task.
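While reproducing, it helps to check the SparkApplication's `.status` before expecting a driver pod to exist. A rough sketch of that check (state names follow the `sparkoperator.k8s.io/v1beta2` schema; the list here is a simplification for illustration, not the provider's actual logic):

```python
def driver_pod_expected(spark_app: dict) -> bool:
    """Return True if a SparkApplication object has progressed far enough
    that its driver pod should exist. An empty or SUBMITTED state means
    the Spark Operator has not necessarily created the pod yet."""
    state = (
        spark_app.get("status", {})
        .get("applicationState", {})
        .get("state", "")
    )
    return state not in ("", "SUBMITTED")
```

In the failing run below, the application was still pending when the operator read the driver pod, so the API returned 404.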

Anything else

Logs of the failed SparkKubernetesOperator task:
/home/airflow/.local/lib/python3.11/site-packages/airflow/metrics/statsd_logger.py:184 RemovedInAirflow3Warning: The basic metric validator will be deprecated in the future in favor of pattern-matching.  You can try this now by setting config option metrics_use_pattern_match to True.
[2024-06-22T12:08:45.290+0000] {dagbag.py:545} INFO - Filling up the DagBag from /usr/local/airflow/dags/gitdags/Data-Engineering/Airflow/example_spark_pi.py
[2024-06-22T12:08:46.914+0000] {task_command.py:426} INFO - Running <TaskInstance: spark_pi.submit manual__2024-06-22T15:08:05+03:00 [queued]> on host spark-pi-submit-ydt5e5k7
[2024-06-22T12:08:47.090+0000] {local_task_job_runner.py:120} INFO - ::group::Pre task execution logs
[2024-06-22T12:08:47.156+0000] {taskinstance.py:2077} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: spark_pi.submit manual__2024-06-22T15:08:05+03:00 [queued]>
[2024-06-22T12:08:47.172+0000] {taskinstance.py:2077} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: spark_pi.submit manual__2024-06-22T15:08:05+03:00 [queued]>
[2024-06-22T12:08:47.172+0000] {taskinstance.py:2307} INFO - Starting attempt 1 of 1
[2024-06-22T12:08:47.281+0000] {taskinstance.py:2333} INFO - Executing <Task(SparkKubernetesOperator): submit> on 2024-06-22 12:08:05+00:00
[2024-06-22T12:08:47.293+0000] {standard_task_runner.py:63} INFO - Started process 14 to run task
[2024-06-22T12:08:47.302+0000] {standard_task_runner.py:90} INFO - Running: ['airflow', 'tasks', 'run', 'spark_pi', 'submit', 'manual__2024-06-22T15:08:05+03:00', '--job-id', '15', '--raw', '--subdir', 'DAGS_FOLDER/example_spark_pi.py', '--cfg-path', '/tmp/tmpna4el9de']
[2024-06-22T12:08:47.303+0000] {standard_task_runner.py:91} INFO - Job 15: Subtask submit
[2024-06-22T12:08:47.493+0000] {task_command.py:426} INFO - Running <TaskInstance: spark_pi.submit manual__2024-06-22T15:08:05+03:00 [running]> on host spark-pi-submit-ydt5e5k7
[2024-06-22T12:08:47.664+0000] {pod_generator.py:559} WARNING - Model file /usr/local/airflow/templates/dag_template.yaml does not exist
[2024-06-22T12:08:47.865+0000] {taskinstance.py:2655} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='spark_pi' AIRFLOW_CTX_TASK_ID='submit' AIRFLOW_CTX_EXECUTION_DATE='2024-06-22T12:08:05+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-06-22T15:08:05+03:00'
[2024-06-22T12:08:47.866+0000] {taskinstance.py:431} INFO - ::endgroup::
[2024-06-22T12:08:47.904+0000] {spark_kubernetes.py:289} INFO - Creating sparkApplication.
[2024-06-22T12:08:47.976+0000] {base.py:84} INFO - Using connection ID 'kubernetes_default' for task execution.
[2024-06-22T12:08:48.338+0000] {custom_object_launcher.py:305} WARNING - Spark job submitted but not yet started. job_id: submit-odir414j
[2024-06-22T12:09:01.358+0000] {baseoperator.py:400} WARNING - SparkKubernetesOperator.execute cannot be called outside TaskInstance!
[2024-06-22T12:09:04.605+0000] {taskinstance.py:442} INFO - ::group::Post task execution logs
[2024-06-22T12:09:04.606+0000] {taskinstance.py:2912} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 466, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 315, in execute
    return super().execute(context=context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 596, in execute
    return self.execute_sync(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 621, in execute_sync
    self.await_pod_start(pod=self.pod)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 573, in await_pod_start
    self.pod_manager.await_pod_start(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 375, in await_pod_start
    remote_pod = self.read_pod(pod)
                 ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 330, in wrapped_f
    return self(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 467, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 368, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 410, in exc_check
    raise retry_exc.reraise()
          ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 183, in reraise
    raise self.last_attempt.result()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 470, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 720, in read_pod
    return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 23693, in read_namespaced_pod
    return self.read_namespaced_pod_with_http_info(name, namespace, **kwargs)  # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 23780, in read_namespaced_pod_with_http_info
    return self.api_client.call_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
                    ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 244, in GET
    return self.request("GET", url,
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 238, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': '819cd80f-f9ac-4b9f-b99b-626c78f42019', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '5e46e4d7-a775-46eb-a6d8-002bdf3ab578', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'e04343f6-6c68-4729-bf92-c244e7898392', 'Date': 'Sat, 22 Jun 2024 12:09:04 GMT', 'Content-Length': '212'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"submit-odir414j-driver\" not found","reason":"NotFound","details":{"name":"submit-odir414j-driver","kind":"pods"},"code":404}


[2024-06-22T12:09:04.754+0000] {miscellaneous.py:119} INFO - Successfully saved audit information about 'failed' event: Audit record was successfully entered
[2024-06-22T12:09:04.756+0000] {taskinstance.py:1207} INFO - Marking task as FAILED. dag_id=spark_pi, task_id=submit, run_id=manual__2024-06-22T15:08:05+03:00, execution_date=20240622T120805, start_date=20240622T120847, end_date=20240622T120904
[2024-06-22T12:09:04.882+0000] {standard_task_runner.py:110} ERROR - Failed to execute job 15 for task submit ((404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': '819cd80f-f9ac-4b9f-b99b-626c78f42019', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '5e46e4d7-a775-46eb-a6d8-002bdf3ab578', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'e04343f6-6c68-4729-bf92-c244e7898392', 'Date': 'Sat, 22 Jun 2024 12:09:04 GMT', 'Content-Length': '212'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"submit-odir414j-driver\" not found","reason":"NotFound","details":{"name":"submit-odir414j-driver","kind":"pods"},"code":404}

; 14)
[2024-06-22T12:09:04.907+0000] {local_task_job_runner.py:240} INFO - Task exited with return code 1
[2024-06-22T12:09:04.996+0000] {taskinstance.py:3512} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-06-22T12:09:05.004+0000] {local_task_job_runner.py:222} INFO - ::endgroup::

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@akrava added the area:providers, kind:bug, and needs-triage labels on Jun 29, 2024
@ephraimbuddy removed the needs-triage label on Jul 1, 2024