Description
Hello,
I'm a user of Airflow for managing Spark jobs.
I was wondering if there is any plan to provide a @task.spark_submit decorator in the future.
For now, I've been using a custom provider that I wrote myself.
In my situation, I write and run PySpark code locally, and then, as a final step, transform the code into a form that can be submitted to YARN.
Transforming the code into a form suitable for YARN (just creating a separate .py file) is not particularly difficult.
However, when the task code and the YARN submission code are separated, the folder structure within the DAGs folder can become complex, and the task code and the PySpark job files cannot reference each other in the IDE.
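For illustration, the separated workflow today looks roughly like this (the file names, task_id, and conn_id are made up):

```python
# dags/my_dag.py -- the DAG references the PySpark job only by file path,
# so the IDE cannot resolve names between the DAG and the job file.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

submit_job = SparkSubmitOperator(
    task_id="aggregate_on_yarn",
    application="dags/jobs/aggregate_job.py",  # separate PySpark job file
    conn_id="spark_yarn",                      # connection pointing at YARN
)
```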
With @task.spark_submit (or @task.pyspark_submit?), development productivity would improve significantly, and it would be very convenient: we could replace the existing @task.pyspark decorators by simply changing the decorator name and injecting the YARN options.
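To sketch what I have in mind (the decorator name and its YARN-related parameters are assumptions here, not an existing API):

```python
from airflow.decorators import task

# Existing decorator: runs the function in-process with an injected SparkSession.
@task.pyspark(conn_id="spark_local")
def aggregate(spark, sc):
    return spark.read.parquet("/data/events").groupBy("user_id").count().count()

# Proposed decorator: the same function body, submitted to YARN via spark-submit.
# Only the decorator name changes, plus the injected YARN options.
@task.spark_submit(
    conn_id="spark_yarn",              # hypothetical YARN connection
    conf={"spark.yarn.queue": "etl"},  # hypothetical injected YARN options
)
def aggregate_on_yarn(spark, sc):
    return spark.read.parquet("/data/events").groupBy("user_id").count().count()
```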
Use case/motivation
The idea is to borrow the approach from the existing @task.pyspark and @task.docker decorators.
It works by writing a temporary .py file to /tmp and then submitting that file. The core of my custom operator looks like this (abbreviated):
```python
import os
from tempfile import TemporaryDirectory

from airflow.decorators.base import DecoratedOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.context import Context
from airflow.utils.python_virtualenv import write_python_script


class _SparkSubmitDecoratedOperator(DecoratedOperator, SparkSubmitOperator):
    custom_operator_name = "@task.spark_submit"

    # In the constructor, some code is located that, among other things,
    # removes the sc and spark arguments.
    ...

    def execute(self, context: Context):
        with TemporaryDirectory() as tmp_dir:
            input_filename = os.path.join(tmp_dir, "script.in")
            script_filename = os.path.join(tmp_dir, "script.py")
            output_filename = os.path.join(tmp_dir, "script.out")
            error_filename = os.path.join(tmp_dir, "script.err")

            # Serialize op_args/op_kwargs for the generated script to read back.
            # (Opened in binary mode, since pickling libraries write bytes.)
            with open(input_filename, "wb") as file:
                if self.op_args or self.op_kwargs:
                    self.pickling_library.dump(
                        {"args": self.op_args, "kwargs": self.op_kwargs}, file
                    )

            # Render the decorated function into a standalone script.
            py_source = self.get_python_source()
            write_python_script(
                jinja_context={
                    "op_args": self.op_args,
                    "op_kwargs": self.op_kwargs,
                    "pickling_library": self.pickling_library.__name__,
                    "python_callable": self.python_callable.__name__,
                    "python_callable_source": py_source,
                    "expect_airflow": True,
                    "string_args_global": False,
                },
                filename=script_filename,
            )

            # Hand the generated script to SparkSubmitOperator.
            self.application = script_filename
            self.application_args = [
                input_filename,
                output_filename,
                "--no-string-args",
                error_filename,
            ]
            return super().execute(context)
```
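For context, write_python_script renders Airflow's python_virtualenv_script.jinja2 template. A simplified sketch of the kind of script it generates (not the literal template output) would be:

```python
import pickle  # stands in for {{ pickling_library }}
import sys

# argv layout matches application_args above:
# script.py <input_filename> <output_filename> --no-string-args <error_filename>

# Recover the op_args/op_kwargs that the operator serialized into script.in.
try:
    with open(sys.argv[1], "rb") as file:
        arg_dict = pickle.load(file)
except Exception:
    arg_dict = {"args": [], "kwargs": {}}

def python_callable(*args, **kwargs):
    ...  # the decorated function's source is inlined here by write_python_script

# Call the task function and serialize its return value into script.out,
# from which the operator reads it back (e.g. to push to XCom).
res = python_callable(*arg_dict["args"], **arg_dict["kwargs"])
with open(sys.argv[2], "wb") as file:
    if res is not None:
        pickle.dump(res, file)
```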
Related issues
No response
Are you willing to submit a PR?
Code of Conduct