Repository for dip
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
tenant-catalog/kubeflow/tests/gh-actions/test_pipeline_v2.py

96 lines
2.7 KiB

#!/usr/bin/env python3
import kfp
import sys
import time
from kfp import dsl
from kfp_server_api.exceptions import ApiException
@dsl.component
def hello_world_op() -> str:
print("Hello World from Kubeflow Pipelines V2!")
return "Hello World"
@dsl.pipeline(
name="hello-world-v2",
description="A very simple hello world pipeline"
)
def hello_world_pipeline():
hello_world_op()
def run_pipeline(token, namespace):
client = kfp.Client(host="http://localhost:8080/pipeline", existing_token=token)
try:
pipelines = client.list_pipelines()
print(f"Successfully connected to KFP server, found {len(pipelines.pipelines)} pipelines")
experiment = client.create_experiment("v2-pipeline-test", namespace=namespace)
print(f"Created experiment: v2-pipeline-test in namespace {namespace}")
run = client.create_run_from_pipeline_func(
pipeline_func=hello_world_pipeline,
experiment_name="v2-pipeline-test",
run_name="v2-test-run",
arguments={},
namespace=namespace
)
run_id = run.run_id
for _ in range(30):
status = client.get_run(run_id=run_id).state
if status == "SUCCEEDED":
return
elif status not in ["PENDING", "RUNNING"]:
print(f"Pipeline failed with status: {status}")
pods = client._get_k8s_client().list_namespaced_pod(
namespace=namespace,
label_selector=f"pipeline/runid={run_id}"
)
print(f"Found {len(pods.items)} pods for this run")
for pod in pods.items:
print(f"Pod {pod.metadata.name}: {pod.status.phase}")
sys.exit(1)
time.sleep(10)
sys.exit(1)
except Exception as exception:
print(f"Error in pipeline execution: {exception}")
sys.exit(1)
def test_unauthorized_access(token, namespace):
client = kfp.Client(host="http://localhost:8080/pipeline", existing_token=token)
try:
pipeline = client.list_runs(namespace=namespace)
sys.exit(1)
except ApiException as exception:
if exception.status != 403:
sys.exit(1)
if __name__ == "__main__":
if len(sys.argv) < 3:
sys.exit(1)
action = sys.argv[1]
token = sys.argv[2]
namespace = sys.argv[3]
if action == "run_pipeline":
run_pipeline(token, namespace)
elif action == "test_unauthorized_access":
test_unauthorized_access(token, namespace)
else:
sys.exit(1)