
Support @task.spark_submit or @task.pyspark_submit #40566

Open · 2 tasks done
softyoungha opened this issue Jul 2, 2024 · 0 comments · May be fixed by #40633

Comments

softyoungha (Contributor) commented Jul 2, 2024

Description

Hello,
I'm a user of Airflow for managing Spark jobs.
I was wondering if there's any plan to provide a @task.spark_submit decorator in the future.

Actually, I've been using a custom provider that I made myself.
In my situation, I write and execute PySpark code locally, and then finally transform the code into a form that can be submitted to YARN.
Transforming the code into a form suitable for YARN (just creating a separate .py file) is not particularly difficult.
However, when the task code and the YARN submission code are separated, the folder structure within the DAG can become complex, and the task code and the PySpark job files cannot reference each other in the IDE.

A @task.spark_submit (or @task.pyspark_submit?) decorator would improve development productivity significantly and be very convenient: we could replace the existing @task.pyspark decorators by simply changing the decorator name and injecting the YARN options.
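For illustration, a minimal sketch of what migrating one task might look like. The decorator name, its accepted options, and the spark_yarn connection id are assumptions about the proposed API, not existing features:

from airflow.decorators import task

# Hypothetical decorator; "spark_yarn" is an assumed connection pointing at YARN.
@task.spark_submit(
    conn_id="spark_yarn",
    conf={"spark.executor.memory": "4g", "spark.executor.instances": "4"},
)
def transform(input_path: str) -> int:
    # With spark-submit, the session would be created inside the submitted
    # script instead of being injected as `spark`/`sc` arguments.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    return spark.read.parquet(input_path).count()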

Use case/motivation

The idea is to borrow the approach from the existing @task.pyspark and @task.docker decorators.
It would work by writing a temporary .py file to /tmp and then submitting that file:

import os
from tempfile import TemporaryDirectory

from airflow.decorators.base import DecoratedOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.context import Context
from airflow.utils.python_virtualenv import write_python_script


class _SparkSubmitDecoratedOperator(DecoratedOperator, SparkSubmitOperator):
    custom_operator_name = "@task.spark_submit"

    # In the constructor, some code removes the `sc` and `spark` arguments.

    ...

    def execute(self, context: Context):
        with TemporaryDirectory() as tmp_dir:
            input_filename = os.path.join(tmp_dir, "script.in")
            script_filename = os.path.join(tmp_dir, "script.py")
            output_filename = os.path.join(tmp_dir, "script.out")
            error_filename = os.path.join(tmp_dir, "script.err")

            # Pickle op_args/op_kwargs so the generated script can load them
            # (binary mode, since pickling libraries write bytes).
            with open(input_filename, "wb") as file:
                if self.op_args or self.op_kwargs:
                    self.pickling_library.dump(
                        {"args": self.op_args, "kwargs": self.op_kwargs}, file
                    )

            # Render the decorated callable into a standalone script, reusing
            # the same template machinery as the virtualenv-based decorators.
            py_source = self.get_python_source()
            write_python_script(
                jinja_context={
                    "op_args": self.op_args,
                    "op_kwargs": self.op_kwargs,
                    "pickling_library": self.pickling_library.__name__,
                    "python_callable": self.python_callable.__name__,
                    "python_callable_source": py_source,
                    "expect_airflow": True,
                    "string_args_global": False,
                },
                filename=script_filename,
            )
            # Point spark-submit at the generated script.
            self.application = script_filename
            self.application_args = [
                input_filename,
                output_filename,
                "--no-string-args",
                error_filename,
            ]
            return super().execute(context)
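To expose this as @task.spark_submit, the decorator could be registered the same way @task.pyspark is wired up, through task_decorator_factory. A hedged sketch, assuming the operator class above (the function name spark_submit_task is illustrative):

from airflow.decorators.base import task_decorator_factory


def spark_submit_task(python_callable=None, multiple_outputs=None, **kwargs):
    # Sketch only: wraps the callable into the decorated operator above,
    # following the same pattern the existing @task.pyspark decorator uses.
    return task_decorator_factory(
        python_callable=python_callable,
        multiple_outputs=multiple_outputs,
        decorated_operator_class=_SparkSubmitDecoratedOperator,
        **kwargs,
    )

The provider would then list this function under task-decorators in its provider.yaml, the same way the pyspark decorator is registered, so it becomes available as @task.spark_submit.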

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

softyoungha added the kind:feature (Feature Requests) and needs-triage (label for new issues that we didn't triage yet) labels on Jul 2, 2024
eladkal removed the needs-triage (label for new issues that we didn't triage yet) label on Jul 6, 2024
softyoungha linked a pull request on Jul 6, 2024 that will close this issue