You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
243 lines
10 KiB
243 lines
10 KiB
# Copyright 2020-2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
|
|
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
import json
|
|
import os
|
|
import base64
|
|
|
|
# From awscli installed in alpine/k8s image
|
|
import botocore.session
|
|
|
|
# Name of the bucket used for artifact paths and for the per-namespace
# access policy built in the sync handler.
S3_BUCKET_NAME = 'mlpipeline'

# Shared botocore session; created once when the module is loaded.
session = botocore.session.get_session()

# IAM client used to interact with seaweedfs user management.
# The region is a required argument but its value does not matter here.
iam = session.create_client(service_name='iam', region_name='foobar')
|
|
|
|
|
|
def main():
    """Build the controller server from environment settings and serve forever."""
    # The settings dict keys match server_factory's parameter names, so it
    # can be passed straight through as keyword arguments.
    server_factory(**get_settings_from_env()).serve_forever()
|
|
|
|
|
|
def get_settings_from_env(controller_port=None,
                          visualization_server_image=None, frontend_image=None,
                          visualization_server_tag=None, frontend_tag=None, disable_istio_sidecar=None):
    """
    Returns a dict of settings from environment variables relevant to the controller

    Each setting is read from the environment variable named after the
    all-caps version of the setting name. A truthy argument passed here
    takes precedence over the environment (for disable_istio_sidecar any
    value other than None takes precedence, including False).

    Fallbacks used when neither an argument nor the environment variable
    provides a value:
        controller_port: "8080"
        visualization_server_image: ghcr.io/kubeflow/kfp-visualization-server
        visualization_server_tag: value of KFP_VERSION environment variable
        frontend_image: ghcr.io/kubeflow/kfp-frontend
        frontend_tag: value of KFP_VERSION environment variable
        disable_istio_sidecar: False (True only when DISABLE_ISTIO_SIDECAR
            is exactly "true")

    Raises:
        KeyError: if a tag has to fall back to KFP_VERSION and that
            environment variable is not set.
    """
    env = os.environ

    port = controller_port or env.get("CONTROLLER_PORT", "8080")

    viz_image = visualization_server_image or env.get(
        "VISUALIZATION_SERVER_IMAGE", "ghcr.io/kubeflow/kfp-visualization-server")
    ui_image = frontend_image or env.get(
        "FRONTEND_IMAGE", "ghcr.io/kubeflow/kfp-frontend")

    # Image-specific tag variables win; KFP_VERSION is only consulted (and
    # only required) when no specific tag is available, which keeps older
    # deployments that set just KFP_VERSION working.
    viz_tag = visualization_server_tag or env.get("VISUALIZATION_SERVER_TAG") or env["KFP_VERSION"]
    ui_tag = frontend_tag or env.get("FRONTEND_TAG") or env["KFP_VERSION"]

    # None means "not specified"; an explicit False must not be overridden
    # by the environment, hence the identity check instead of `or`.
    if disable_istio_sidecar is None:
        istio_sidecar_disabled = env.get("DISABLE_ISTIO_SIDECAR") == "true"
    else:
        istio_sidecar_disabled = disable_istio_sidecar

    return {
        "controller_port": port,
        "visualization_server_image": viz_image,
        "frontend_image": ui_image,
        "visualization_server_tag": viz_tag,
        "frontend_tag": ui_tag,
        "disable_istio_sidecar": istio_sidecar_disabled,
    }
|
|
|
|
|
|
def server_factory(visualization_server_image,
                   visualization_server_tag, frontend_image, frontend_tag,
                   disable_istio_sidecar, url="", controller_port=8080):
    """
    Returns an HTTPServer populated with Handler with customized settings

    The handler answers POSTed JSON sync requests: it takes the parent
    namespace from the request's "object" field and the already existing
    children from "attachments", and replies with the desired status and
    the desired attachments for that namespace.

    Args:
        visualization_server_image, visualization_server_tag,
        frontend_image, frontend_tag, disable_istio_sidecar: accepted so
            the dict from get_settings_from_env() can be passed as keyword
            arguments; the handler defined here does not read them.
        url: host/interface handed to HTTPServer as the bind address.
        controller_port: port to listen on; converted with int().

    Returns:
        The HTTPServer instance; the caller is responsible for starting
        the serving loop (e.g. serve_forever()).
    """
    class Controller(BaseHTTPRequestHandler):
        def sync(self, parent, attachments):
            """
            Compute the desired status and attachments for one namespace.

            Args:
                parent: the namespace object (dict) being synced.
                attachments: dict keyed by "<Kind>.<apiVersion>", each value
                    a dict of existing objects keyed by "<namespace>/<name>".

            Returns:
                {"status": ..., "attachments": [...]}. Both are empty when
                the namespace is not labelled
                pipelines.kubeflow.org/enabled=true.

            Raises:
                KeyError: if attachments has no "Secret.v1" entry for a
                    pipeline-enabled namespace.
            """
            # parent is a namespace
            metadata = parent.get("metadata", {})
            namespace = metadata.get("name")
            pipeline_enabled = metadata.get("labels", {}).get(
                "pipelines.kubeflow.org/enabled")

            if pipeline_enabled != "true":
                return {"status": {}, "attachments": []}

            # Compute status based on observed state.
            # NOTE(review): this handler only emits one Secret and three
            # ConfigMaps; the Deployment/Service/DestinationRule/
            # AuthorizationPolicy counts can only be met by objects created
            # elsewhere - confirm these expectations are still intended.
            expected_counts = {
                "Secret.v1": 1,
                "ConfigMap.v1": 3,
                "Deployment.apps/v1": 2,
                "Service.v1": 2,
                "DestinationRule.networking.istio.io/v1alpha3": 1,
                "AuthorizationPolicy.security.istio.io/v1beta1": 1,
            }
            # A kind that is absent from the request counts as zero objects
            # (not ready) instead of aborting the whole sync with KeyError.
            ready = all(
                len(attachments.get(kind, {})) == count
                for kind, count in expected_counts.items()
            )
            desired_status = {
                "kubeflow-pipelines-ready": "True" if ready else "False"
            }

            # Generate the desired attachment object(s).
            desired_resources = [
                {
                    "apiVersion": "v1",
                    "kind": "ConfigMap",
                    "metadata": {
                        "name": "kfp-launcher",
                        "namespace": namespace,
                    },
                    "data": {
                        "defaultPipelineRoot": f"minio://{S3_BUCKET_NAME}/private-artifacts/{namespace}/v2/artifacts",
                    },
                },
                {
                    "apiVersion": "v1",
                    "kind": "ConfigMap",
                    "metadata": {
                        "name": "metadata-grpc-configmap",
                        "namespace": namespace,
                    },
                    "data": {
                        "METADATA_GRPC_SERVICE_HOST":
                            "metadata-grpc-service.kubeflow",
                        "METADATA_GRPC_SERVICE_PORT": "8080",
                    },
                },
                {
                    "apiVersion": "v1",
                    "kind": "ConfigMap",
                    "metadata": {
                        "name": "artifact-repositories",
                        "namespace": namespace,
                        "annotations": {
                            "workflows.argoproj.io/default-artifact-repository": "default-namespaced"
                        }
                    },
                    "data": {
                        # The quadrupled braces render as literal
                        # "{{workflow.name}}"-style placeholders in the
                        # resulting string.
                        "default-namespaced": json.dumps({
                            "archiveLogs": True,
                            "s3": {
                                "endpoint": "minio-service.kubeflow:9000",
                                "bucket": S3_BUCKET_NAME,
                                "keyFormat": f"private-artifacts/{namespace}/{{{{workflow.name}}}}/{{{{workflow.creationTimestamp.Y}}}}/{{{{workflow.creationTimestamp.m}}}}/{{{{workflow.creationTimestamp.d}}}}/{{{{pod.name}}}}",
                                "insecure": True,
                                "accessKeySecret": {
                                    "name": "mlpipeline-minio-artifact",
                                    "key": "accesskey",
                                },
                                "secretKeySecret": {
                                    "name": "mlpipeline-minio-artifact",
                                    "key": "secretkey",
                                }
                            }
                        })
                    }
                },
            ]
            print('Received request:\n', json.dumps(parent, sort_keys=True))
            print('Desired resources except secrets:\n', json.dumps(desired_resources, sort_keys=True))

            # Moved after the print argument because this is sensitive data.

            # Check if secret is already there when the controller made the request. If yes, then
            # use it. Else create a new credentials on seaweedfs for the namespace.
            # The "Secret.v1" lookup stays strict on purpose: without that
            # entry we cannot tell whether credentials already exist, and
            # falling through would create a new access key on every sync.
            if s3_secret := attachments["Secret.v1"].get(f"{namespace}/mlpipeline-minio-artifact"):
                desired_resources.append(s3_secret)
                print('Using existing secret')
            else:
                print('Creating new access key.')
                s3_access_key = iam.create_access_key(UserName=namespace)
                # Use the AWS IAM API of seaweedfs to manage access policies to bucket.
                # This policy ensures that a user can only access artifacts from his own profile.
                iam.put_user_policy(
                    UserName=namespace,
                    PolicyName=f"KubeflowProject{namespace}",
                    PolicyDocument=json.dumps(
                        {
                            "Version": "2012-10-17",
                            "Statement": [{
                                "Effect": "Allow",
                                "Action": [
                                    "s3:Put*",
                                    "s3:Get*",
                                    "s3:List*"
                                ],
                                "Resource": [
                                    f"arn:aws:s3:::{S3_BUCKET_NAME}/artifacts/*",
                                    f"arn:aws:s3:::{S3_BUCKET_NAME}/private-artifacts/{namespace}/*",
                                    f"arn:aws:s3:::{S3_BUCKET_NAME}/private/{namespace}/*",
                                    f"arn:aws:s3:::{S3_BUCKET_NAME}/shared/*",
                                ]
                            }]
                        })
                )
                # Secret "data" values must be base64 text, so each key is
                # encoded to bytes, base64-encoded, and decoded back to str.
                desired_resources.insert(
                    0,
                    {
                        "apiVersion": "v1",
                        "kind": "Secret",
                        "metadata": {
                            "name": "mlpipeline-minio-artifact",
                            "namespace": namespace,
                        },
                        "data": {
                            "accesskey": base64.b64encode(s3_access_key["AccessKey"]["AccessKeyId"].encode('utf-8')).decode("utf-8"),
                            "secretkey": base64.b64encode(s3_access_key["AccessKey"]["SecretAccessKey"].encode('utf-8')).decode("utf-8"),
                        },
                    })

            return {"status": desired_status, "attachments": desired_resources}

        def do_POST(self):
            """
            Serve the sync() function as a JSON webhook.

            Replies 200 with the JSON result of sync(). A request without a
            usable Content-Length, with a body that is not valid JSON, or
            without the "object"/"attachments" fields is answered with 400
            instead of raising inside the handler.
            """
            try:
                length = int(self.headers.get("content-length"))
                if length < 0:
                    # read(-1) would block until the peer closes the socket.
                    raise ValueError("negative Content-Length")
                observed = json.loads(self.rfile.read(length))
                parent = observed["object"]
                children = observed["attachments"]
            except (TypeError, ValueError, KeyError) as err:
                # Only request parsing is guarded here; failures inside
                # sync() still propagate exactly as before.
                self.send_error(400, "Malformed sync request", explain=repr(err))
                return

            desired = self.sync(parent, children)

            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()
            self.wfile.write(bytes(json.dumps(desired), 'utf-8'))

    return HTTPServer((url, int(controller_port)), Controller)
|
|
|
|
|
|
# Start the webhook server only when this file is executed as a script,
# so importing the module does not begin serving.
if __name__ == "__main__":
    main()
|
|
|