[DAG] Incompatible .execute() API for DAG and Compiled DAG #46441

Open
woshiyyya opened this issue Jul 5, 2024 · 2 comments · May be fixed by #46604
Labels
accelerated-dag
bug: Something that is supposed to be working; but isn't
core: Issues that should be addressed in Ray Core
P1: Issue that should be fixed within a few weeks

Comments

@woshiyyya
Member

woshiyyya commented Jul 5, 2024

What happened + What you expected to happen

The .execute() API accepts a list as input when the DAG is not compiled, but raises the following error once the DAG is compiled:

Error Message:

Traceback (most recent call last):
  File "/home/ray/default/check_adag_api.py", line 20, in <module>
    output_dag = output_dag.experimental_compile()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/dag/dag_node.py", line 170, in experimental_compile
    return build_compiled_dag_from_ray_dag(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/dag/compiled_dag_node.py", line 1436, in build_compiled_dag_from_ray_dag
    compiled_dag._get_or_compile()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/dag/compiled_dag_node.py", line 869, in _get_or_compile
    task.output_channel = do_allocate_channel(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/dag/compiled_dag_node.py", line 75, in do_allocate_channel
    output_channel = typ.create_channel(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/experimental/channel/shared_memory_channel.py", line 160, in create_channel
    return CompositeChannel(writer, readers)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/experimental/channel/shared_memory_channel.py", line 580, in __init__
    remote_channel = Channel(self._writer, remote_readers)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/experimental/channel/shared_memory_channel.py", line 265, in __init__
    raise ValueError(
ValueError: All reader actors must be on the same node. Actor Actor(Worker, 44ddb4610f3edeebf51d88f512000000) is on node bf15b8e2d57e78ce12128e7a3c414e0dacf94dab0f5fb87892be8f5c while actor Actor(Worker, e254c21017a6b49754b1c49912000000) is on node 579237461d0bae4d2558398fbe524ca2437e50b7f5c5a23d95a7c221.

If I force all DAG actors onto the same node, compilation succeeds but .execute() raises a different error:

Traceback (most recent call last):
  File "/home/ray/default/check_adag_api.py", line 22, in <module>
    print(ray.get(output_dag.execute([0, 1])))
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/dag/compiled_dag_node.py", line 1320, in execute
    self._check_inputs(args, kwargs)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/dag/compiled_dag_node.py", line 1343, in _check_inputs
    raise ValueError(
ValueError: dag.execute() or dag.execute_async() must be called with 2 positional args, got 1
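
The message suggests that the compiled DAG compares the number of positional arguments against the number of inputs the graph expects, so a single list counts as one argument no matter how many items it holds. A minimal sketch of that kind of check follows. `check_inputs` and `describe_call` are hypothetical helpers written for illustration; they are not Ray's actual `_check_inputs` implementation.

```python
def check_inputs(expected_num_args, args):
    """Hypothetical stand-in for an arity check; not Ray's real code."""
    if len(args) != expected_num_args:
        raise ValueError(
            f"must be called with {expected_num_args} positional args, "
            f"got {len(args)}"
        )


def describe_call(expected_num_args, *args):
    """Return "ok" or the error text for a given call shape."""
    try:
        check_inputs(expected_num_args, args)
        return "ok"
    except ValueError as exc:
        return str(exc)


# A list is a single positional argument, however many items it holds.
print(describe_call(2, [0, 1]))
# Two separate positional arguments satisfy the check.
print(describe_call(2, 0, 1))
```

Under this reading, execute([0, 1]) fails the check while execute(0, 1) passes it.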

Versions / Dependencies

nightly

Reproduction script

import ray
from ray.dag.input_node import InputNode
from ray.dag.output_node import MultiOutputNode

@ray.remote
class Worker:
    def __init__(self) -> None:
        pass

    def func(self, num):
        return num

workers = [Worker.remote() for _ in range(4)]

with InputNode() as dag:
    node_0 = workers[0].func.bind(dag[0])
    node_1 = workers[1].func.bind(dag[1])
    output_dag = MultiOutputNode([node_0, node_1])

# Uncomment the next line to reproduce
# output_dag = output_dag.experimental_compile()
print(ray.get(output_dag.execute([0, 1])))

Issue Severity

High: It blocks me from completing my task.

@woshiyyya woshiyyya added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) accelerated-dag labels Jul 5, 2024
@anyscalesam anyscalesam added the core Issues that should be addressed in Ray Core label Jul 8, 2024
@jjyao jjyao added P1 Issue that should be fixed within a few weeks and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Jul 8, 2024
@Bye-legumes Bye-legumes linked a pull request Jul 12, 2024 that will close this issue
5 tasks
@jackhumphries
Contributor

Note that the first issue about all readers needing to be on the same node will be fixed soon (tracked in #46269).

@jackhumphries
Contributor

The second issue can be resolved by passing the two arguments to execute() directly rather than via a list:

print(ray.get(output_dag.execute(0, 1)))
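
Call sites that already hold their inputs in a list could go through a small adapter that unpacks it. The sketch below is only an illustration: `execute_unpacked` is a hypothetical helper, and `FakeCompiledDag` is a stand-in used so the snippet runs without a cluster; neither is part of Ray.

```python
def execute_unpacked(dag, inputs):
    """Call dag.execute() with list/tuple items as separate positional args.

    Hypothetical helper: assumes `dag` exposes execute(*args), as the
    compiled DAG in this issue does.
    """
    if isinstance(inputs, (list, tuple)):
        return dag.execute(*inputs)
    return dag.execute(inputs)


class FakeCompiledDag:
    """Stand-in, not a Ray class: echoes its positional args."""

    def execute(self, *args):
        return list(args)


print(execute_unpacked(FakeCompiledDag(), [0, 1]))  # [0, 1]
```

With the compiled DAG from the reproduction script, execute_unpacked(output_dag, [0, 1]) makes the same call as output_dag.execute(0, 1). The helper is not appropriate for a DAG whose single input is itself meant to be a list, since it would unpack that list too.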

4 participants