Conversation

@savingoyal (Collaborator)

No description provided.

"METAFLOW_ARGO_WORKFLOWS_ENV_VARS_TO_SKIP",
ARGO_WORKFLOWS_ENV_VARS_TO_SKIP,
)
# TODO: Set this for AWS Batch too.
savingoyal (author) commented:
We need to set the right vars in Argo Workflows and Airflow too.
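For context, here is a minimal sketch of how a comma-separated skip list like this might be applied when forwarding environment variables to a scheduled task. The helper name, the list format, and the sample values are assumptions for illustration, not Metaflow's actual implementation:

# Hypothetical value; the real list is read via
# from_conf("METAFLOW_ARGO_WORKFLOWS_ENV_VARS_TO_SKIP", ...).
ENV_VARS_TO_SKIP = "HOSTNAME,KUBERNETES_SERVICE_HOST"

def filter_env(env):
    # Drop any variable named in the comma-separated skip list before
    # the environment is passed on to the scheduled pod.
    skip = {name.strip() for name in ENV_VARS_TO_SKIP.split(",") if name.strip()}
    return {k: v for k, v in env.items() if k not in skip}

print(filter_env({"HOSTNAME": "pod-1", "METAFLOW_DEFAULT_DATASTORE": "s3"}))
# {'METAFLOW_DEFAULT_DATASTORE': 's3'}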

"METAFLOW_DATATOOLS_S3ROOT": DATATOOLS_S3ROOT,
"METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
"METAFLOW_DEFAULT_METADATA": "service",
"METAFLOW_S3_WORKER_COUNT": max(1, int(k8s_deco.attributes["cpu"]) - 2),
savingoyal (author) commented:
Would we need a similar change in Argo Workflows for pods and jobsets?
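To make the behavior of the worker-count formula above concrete (a sketch of the expression used in the diff, not the surrounding Metaflow code):

def s3_worker_count(cpu):
    # Reserve two cores for the task itself; never drop below one worker.
    return max(1, int(cpu) - 2)

assert s3_worker_count(1) == 1    # small pods still get one S3 worker
assert s3_worker_count(4) == 2    # 4 cpus -> 2 workers
assert s3_worker_count(16) == 14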

.environment_variable("METAFLOW_DEFAULT_DATASTORE", "s3")
.environment_variable("METAFLOW_DEFAULT_METADATA", DEFAULT_METADATA)
.environment_variable("METAFLOW_CARD_S3ROOT", CARD_S3ROOT)
.environment_variable("METAFLOW_S3_WORKER_COUNT", max(1, int(cpu) - 2))
savingoyal (author) commented:
Would we need a change in Step Functions as well?

A collaborator replied:
step-functions calls the Batch implementation, so this should apply there as well. Will verify.

A collaborator followed up:
Confirmed: step-functions sets the env correctly through the changes in Batch.

saikonen commented Feb 18, 2025

Noting something extra that needs to be handled here:

from metaflow import step, FlowSpec, resources
import os

class S3WorkerFlow(FlowSpec):
    # With the formula max(1, cpu - 2), cpu=1 should yield a worker count of 1.
    @resources(cpu=1)
    @step
    def start(self):
        val = int(os.environ.get("METAFLOW_S3_WORKER_COUNT"))
        assert val == 1, "Worker count should be 1!"
        self.next(self.bigger)

    # cpu=4 should yield max(1, 4 - 2) == 2.
    @resources(cpu=4)
    @step
    def bigger(self):
        val = int(os.environ.get("METAFLOW_S3_WORKER_COUNT"))
        assert val == 2, "Worker count should be 2!"
        self.next(self.end)

    @step
    def end(self):
        print("Done! 🏁")


if __name__ == "__main__":
    S3WorkerFlow()

Running this --with kubernetes, or on Argo Workflows, fails: resource decorator values are cast to a stringified float, which then fails to convert back to an int.

saikonen commented Mar 4, 2025

Running this --with kubernetes, or on Argo Workflows, fails: resource decorator values are cast to a stringified float, which then fails to convert back to an int.

The cause is the differing defaults for the cpu value in @batch compared to @kubernetes: @resources combined with @kubernetes yields a stringified float cpu value, while @resources combined with @batch yields a stringified int.
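A minimal demonstration of the failure mode, plus one possible normalization. The int(float(...)) round-trip is just an illustrative fix, not necessarily the patch that was applied:

# @kubernetes defaults yield a stringified float, @batch a stringified int.
cpu_from_kubernetes = "4.0"
cpu_from_batch = "4"

assert int(cpu_from_batch) == 4  # fine
try:
    int(cpu_from_kubernetes)     # ValueError: invalid literal for int()
except ValueError as exc:
    print(exc)

# Normalizing through float handles both shapes before deriving the worker count:
assert max(1, int(float(cpu_from_kubernetes)) - 2) == 2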
