[core][experimental] Support multiple readers for IntraProcessChannel #46431
Signed-off-by: kaihsun <[email protected]>
# Worker 0: FFFBBB
assert ray.get(worker_0.get_logs.remote()) == [
    "FWD rank-0, batch-0",
    "FWD rank-0, batch-1",
    "FWD rank-0, batch-2",
    "BWD rank-0, batch-0",
    "BWD rank-0, batch-1",
    "BWD rank-0, batch-2",
]
# Worker 1: BBB
assert ray.get(worker_1.get_logs.remote()) == [
    "BWD rank-1, batch-0",
    "BWD rank-1, batch-1",
    "BWD rank-1, batch-2",
]
Nice! This is exactly what we need!
 def set_use_external_transport(self, use_external_transport: bool) -> None:
     self.use_external_transport = use_external_transport

-def set_data(self, channel_id: str, value: Any) -> None:
+def set_data(self, channel_id: str, value: Any, num_readers: int) -> None:
+    assert num_readers > 0, "num_readers must be greater than 0."
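To illustrate what the new `num_readers` argument makes possible, here is a minimal sketch of a reader-counted cache. It is not Ray's implementation: the class name `ReaderCountedCache` and the methods `get_data` and `has_data` are hypothetical, and only the `set_data` signature and its assertion come from the diff above. The idea is that a cached value survives until the declared number of readers have each read it once.

```python
from typing import Any, Dict, Tuple


class ReaderCountedCache:
    """Hypothetical stand-in for a per-actor cache; not Ray's actual class."""

    def __init__(self) -> None:
        # channel_id -> (cached value, number of reads still outstanding)
        self._entries: Dict[str, Tuple[Any, int]] = {}

    def set_data(self, channel_id: str, value: Any, num_readers: int) -> None:
        assert num_readers > 0, "num_readers must be greater than 0."
        # Assumption: a new write is only allowed once the old value is consumed.
        assert channel_id not in self._entries, "previous value not fully read"
        self._entries[channel_id] = (value, num_readers)

    def get_data(self, channel_id: str) -> Any:
        value, remaining = self._entries[channel_id]
        if remaining == 1:
            # Last expected reader: drop the entry so the cache does not grow.
            del self._entries[channel_id]
        else:
            self._entries[channel_id] = (value, remaining - 1)
        return value

    def has_data(self, channel_id: str) -> bool:
        return channel_id in self._entries


cache = ReaderCountedCache()
cache.set_data("ch-0", {"batch": 0}, num_readers=2)
first = cache.get_data("ch-0")   # entry is still cached after this read
second = cache.get_data("ch-0")  # entry is removed after this read
```

With `num_readers=1` this sketch degenerates to the pre-PR behavior described in the PR summary, where the value is removed after a single read.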
A newbie question here: it seems that there will be one _SerializationContext per DAG actor. If nobody reads the returned value of this node (i.e., it is a leaf node), will it raise an error?
> Seems that there will be one _SerializationContext per DAG actor.

You are correct.

> If there's nobody reading the returned value of this node (this is a leaf node), will it raise an error?

For IntraProcessChannel, if there is no reader, the channel will not be created. You can see the logic in shared_memory_channel.py.
if num_local_readers > 0:
    local_channel = IntraProcessChannel(num_local_readers)
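The guard quoted above can be sketched in isolation as follows. This is an illustration under stated assumptions, not the code in shared_memory_channel.py: `LocalChannelStub` and `plan_local_channel` are hypothetical names, and the sketch assumes that a "local" reader is one running on the same actor as the writer.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class LocalChannelStub:
    """Hypothetical placeholder for an in-process channel."""
    num_readers: int


def plan_local_channel(writer: str, readers: List[str]) -> Optional[LocalChannelStub]:
    # Assumption: a reader is "local" when it runs on the writer's own actor.
    num_local_readers = sum(1 for reader in readers if reader == writer)
    if num_local_readers > 0:
        return LocalChannelStub(num_local_readers)
    # No local reader: no in-process channel is created at all.
    return None


two_local = plan_local_channel("actor-0", ["actor-0", "actor-0", "actor-1"])
no_local = plan_local_channel("actor-0", ["actor-1"])
no_reader = plan_local_channel("actor-0", [])
```

In this sketch a node with no readers simply gets no channel, which matches the reply above: nothing is raised, the channel is just never constructed.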
For leaf nodes, I think we currently don't do anything. Maybe we should raise a ValueError or emit a warning.
I see. From my experiments, these leaf nodes will not be executed.
It's not a blocker for now, since there will be no leaf nodes in PP. But for FSDP, there might be some collective calls that don't have return values and thus become leaf nodes.
Overall LGTM
Thanks for fixing this!
I thought you planned to raise an error for unused leaf nodes in this PR?
I will handle leaf nodes in a separate PR. I think that is not relevant to this pull request.
Approving the change to unblock the train workload.
I think we should follow up to better understand edge cases such as leaf nodes (e.g., by adding more tests) and handle them properly (e.g., raise an error or support them).
Example leaf node scenario:
driver --> a --> b --> driver
|
----> c
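One way such a follow-up check could look is sketched below. It assumes the branch to `c` in the diagram hangs off `a` (the extracted layout does not make this certain), and `find_leaf_nodes` is a hypothetical helper, not a Ray API. A leaf is taken to be any node whose output has no downstream reader.

```python
from typing import Dict, List


def find_leaf_nodes(downstream: Dict[str, List[str]]) -> List[str]:
    """Return nodes whose output nobody reads (hypothetical helper)."""
    return sorted(node for node, readers in downstream.items() if not readers)


# Assumed edges: driver -> a, a -> b, a -> c, b -> driver.
# Only the DAG's own nodes are listed as keys; the driver is a reader of b.
graph = {
    "a": ["b", "c"],
    "b": ["driver"],
    "c": [],
}

leaves = find_leaf_nodes(graph)
```

A compile-time pass like this could be the place to raise an error or emit a warning, which are the two options discussed in this thread.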
@woshiyyya @ruisearch42 Opened issue #46528 to track the progress.
Why are these changes needed?
This PR enables IntraProcessChannel to be read more than once. Before this PR, the data cached in serialization_context was removed after a single read. This PR also adds a test that simulates the pattern of pipeline parallelism.
Related issue number