You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
96 lines
2.7 KiB
96 lines
2.7 KiB
#!/usr/bin/env python3
|
|
|
|
import kfp
|
|
import sys
|
|
import time
|
|
from kfp import dsl
|
|
from kfp_server_api.exceptions import ApiException
|
|
|
|
|
|
@dsl.component
|
|
def hello_world_op() -> str:
|
|
print("Hello World from Kubeflow Pipelines V2!")
|
|
return "Hello World"
|
|
|
|
|
|
@dsl.pipeline(
|
|
name="hello-world-v2",
|
|
description="A very simple hello world pipeline"
|
|
)
|
|
def hello_world_pipeline():
|
|
hello_world_op()
|
|
|
|
|
|
def run_pipeline(token, namespace):
|
|
client = kfp.Client(host="http://localhost:8080/pipeline", existing_token=token)
|
|
|
|
try:
|
|
pipelines = client.list_pipelines()
|
|
print(f"Successfully connected to KFP server, found {len(pipelines.pipelines)} pipelines")
|
|
|
|
experiment = client.create_experiment("v2-pipeline-test", namespace=namespace)
|
|
print(f"Created experiment: v2-pipeline-test in namespace {namespace}")
|
|
|
|
run = client.create_run_from_pipeline_func(
|
|
pipeline_func=hello_world_pipeline,
|
|
experiment_name="v2-pipeline-test",
|
|
run_name="v2-test-run",
|
|
arguments={},
|
|
namespace=namespace
|
|
)
|
|
|
|
run_id = run.run_id
|
|
|
|
for _ in range(30):
|
|
status = client.get_run(run_id=run_id).state
|
|
|
|
if status == "SUCCEEDED":
|
|
return
|
|
elif status not in ["PENDING", "RUNNING"]:
|
|
print(f"Pipeline failed with status: {status}")
|
|
|
|
pods = client._get_k8s_client().list_namespaced_pod(
|
|
namespace=namespace,
|
|
label_selector=f"pipeline/runid={run_id}"
|
|
)
|
|
|
|
print(f"Found {len(pods.items)} pods for this run")
|
|
for pod in pods.items:
|
|
print(f"Pod {pod.metadata.name}: {pod.status.phase}")
|
|
|
|
sys.exit(1)
|
|
|
|
time.sleep(10)
|
|
|
|
sys.exit(1)
|
|
|
|
except Exception as exception:
|
|
print(f"Error in pipeline execution: {exception}")
|
|
sys.exit(1)
|
|
|
|
|
|
def test_unauthorized_access(token, namespace):
|
|
client = kfp.Client(host="http://localhost:8080/pipeline", existing_token=token)
|
|
|
|
try:
|
|
pipeline = client.list_runs(namespace=namespace)
|
|
sys.exit(1)
|
|
except ApiException as exception:
|
|
if exception.status != 403:
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if len(sys.argv) < 3:
|
|
sys.exit(1)
|
|
|
|
action = sys.argv[1]
|
|
token = sys.argv[2]
|
|
namespace = sys.argv[3]
|
|
|
|
if action == "run_pipeline":
|
|
run_pipeline(token, namespace)
|
|
elif action == "test_unauthorized_access":
|
|
test_unauthorized_access(token, namespace)
|
|
else:
|
|
sys.exit(1) |