Repository for dip
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
tenant-catalog/kubeflow/experimental/seaweedfs/base/pipeline-profile-controller/sync.py

243 lines
10 KiB

# Copyright 2020-2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import os
import base64
# From awscli installed in alpine/k8s image
import botocore.session
# Bucket name used in the artifact paths and the IAM policy built further down in this module.
S3_BUCKET_NAME = 'mlpipeline'
# A single botocore session is created at import time and shared by the whole module.
session = botocore.session.get_session()
# To interact with seaweedfs user management. Region does not matter.
# NOTE(review): no endpoint or credentials are passed here, so botocore presumably
# resolves them from its default configuration (environment/config files) — confirm
# against the controller's deployment manifest.
iam = session.create_client('iam', region_name='foobar')
def main():
    """Build the webhook server from environment settings and serve until interrupted."""
    # The settings dict keys match server_factory's keyword parameters one-to-one.
    server_factory(**get_settings_from_env()).serve_forever()
def get_settings_from_env(controller_port=None,
                          visualization_server_image=None, frontend_image=None,
                          visualization_server_tag=None, frontend_tag=None,
                          disable_istio_sidecar=None):
    """
    Collect the controller settings, preferring arguments over the environment.

    Each setting is looked up in the environment variable named after the
    all-caps version of the setting whenever the corresponding argument is
    falsy (for disable_istio_sidecar: whenever it is None). Fallbacks used
    when the environment variable is not set either:

        controller_port:            "8080"
        visualization_server_image: ghcr.io/kubeflow/kfp-visualization-server
        visualization_server_tag:   value of KFP_VERSION environment variable
        frontend_image:             ghcr.io/kubeflow/kfp-frontend
        frontend_tag:               value of KFP_VERSION environment variable
        disable_istio_sidecar:      False (True only if the variable is "true")

    Returns:
        dict keyed by the setting names listed above.

    Raises:
        KeyError: if a tag is given neither as argument nor through its own
            environment variable and KFP_VERSION is not set.
    """
    env = os.environ

    if not controller_port:
        controller_port = env.get("CONTROLLER_PORT", "8080")

    if not visualization_server_image:
        visualization_server_image = env.get(
            "VISUALIZATION_SERVER_IMAGE",
            "ghcr.io/kubeflow/kfp-visualization-server")

    if not frontend_image:
        frontend_image = env.get(
            "FRONTEND_IMAGE", "ghcr.io/kubeflow/kfp-frontend")

    # Image-specific tag variables win; KFP_VERSION is only consulted as a
    # backwards-compatible fallback (and is mandatory in that case).
    if not visualization_server_tag:
        visualization_server_tag = \
            env.get("VISUALIZATION_SERVER_TAG") or env["KFP_VERSION"]

    if not frontend_tag:
        frontend_tag = env.get("FRONTEND_TAG") or env["KFP_VERSION"]

    # An explicit False must be honoured, hence the identity check on None.
    if disable_istio_sidecar is None:
        disable_istio_sidecar = env.get("DISABLE_ISTIO_SIDECAR") == "true"

    return {
        "controller_port": controller_port,
        "visualization_server_image": visualization_server_image,
        "frontend_image": frontend_image,
        "visualization_server_tag": visualization_server_tag,
        "frontend_tag": frontend_tag,
        "disable_istio_sidecar": disable_istio_sidecar,
    }
# Attachment kinds and the number of objects of each kind that must be
# observed before a namespace is reported as ready. Checked in this order.
# NOTE(review): sync() itself only emits one Secret and three ConfigMaps; the
# Deployment/Service/DestinationRule/AuthorizationPolicy counts are presumably
# satisfied by something else or left over from the upstream controller —
# confirm that the ready flag can actually become "True".
_EXPECTED_ATTACHMENT_COUNTS = (
    ("Secret.v1", 1),
    ("ConfigMap.v1", 3),
    ("Deployment.apps/v1", 2),
    ("Service.v1", 2),
    ("DestinationRule.networking.istio.io/v1alpha3", 1),
    ("AuthorizationPolicy.security.istio.io/v1beta1", 1),
)


def _profile_configmaps(namespace):
    """Return the three ConfigMap manifests attached to a pipeline-enabled namespace."""
    return [
        {
            "apiVersion": "v1",
            "kind": "ConfigMap",
            "metadata": {
                "name": "kfp-launcher",
                "namespace": namespace,
            },
            "data": {
                "defaultPipelineRoot": f"minio://{S3_BUCKET_NAME}/private-artifacts/{namespace}/v2/artifacts",
            },
        },
        {
            "apiVersion": "v1",
            "kind": "ConfigMap",
            "metadata": {
                "name": "metadata-grpc-configmap",
                "namespace": namespace,
            },
            "data": {
                "METADATA_GRPC_SERVICE_HOST":
                    "metadata-grpc-service.kubeflow",
                "METADATA_GRPC_SERVICE_PORT": "8080",
            },
        },
        {
            "apiVersion": "v1",
            "kind": "ConfigMap",
            "metadata": {
                "name": "artifact-repositories",
                "namespace": namespace,
                "annotations": {
                    "workflows.argoproj.io/default-artifact-repository": "default-namespaced"
                }
            },
            "data": {
                # The quadruple braces render as literal "{{...}}" placeholders
                # in the key format; only {namespace} is substituted here.
                "default-namespaced": json.dumps({
                    "archiveLogs": True,
                    "s3": {
                        "endpoint": "minio-service.kubeflow:9000",
                        "bucket": S3_BUCKET_NAME,
                        "keyFormat": f"private-artifacts/{namespace}/{{{{workflow.name}}}}/{{{{workflow.creationTimestamp.Y}}}}/{{{{workflow.creationTimestamp.m}}}}/{{{{workflow.creationTimestamp.d}}}}/{{{{pod.name}}}}",
                        "insecure": True,
                        "accessKeySecret": {
                            "name": "mlpipeline-minio-artifact",
                            "key": "accesskey",
                        },
                        "secretKeySecret": {
                            "name": "mlpipeline-minio-artifact",
                            "key": "secretkey",
                        }
                    }
                })
            }
        },
    ]


def _profile_policy_document(namespace):
    """Return the JSON policy granting a namespace access to its own and the shared bucket prefixes."""
    return json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Action": [
                    "s3:Put*",
                    "s3:Get*",
                    "s3:List*"
                ],
                "Resource": [
                    f"arn:aws:s3:::{S3_BUCKET_NAME}/artifacts/*",
                    f"arn:aws:s3:::{S3_BUCKET_NAME}/private-artifacts/{namespace}/*",
                    f"arn:aws:s3:::{S3_BUCKET_NAME}/private/{namespace}/*",
                    f"arn:aws:s3:::{S3_BUCKET_NAME}/shared/*",
                ]
            }]
        })


def _profile_secret(namespace, access_key):
    """Return the Secret manifest holding the base64-encoded key pair from *access_key*."""
    def _b64(text):
        return base64.b64encode(text.encode('utf-8')).decode("utf-8")

    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {
            "name": "mlpipeline-minio-artifact",
            "namespace": namespace,
        },
        "data": {
            "accesskey": _b64(access_key["AccessKeyId"]),
            "secretkey": _b64(access_key["SecretAccessKey"]),
        },
    }


def server_factory(visualization_server_image,
                   visualization_server_tag, frontend_image, frontend_tag,
                   disable_istio_sidecar, url="", controller_port=8080):
    """
    Returns an HTTPServer populated with Handler with customized settings

    The image, tag and istio-sidecar parameters are accepted so that the
    settings dict can be passed as keyword arguments, but they are not used
    by the handler defined here.

    Args:
        url: host/interface the server binds to ("" binds all interfaces).
        controller_port: port to listen on; converted with int().

    Returns:
        An HTTPServer (already bound, not yet serving) whose handler answers
        POST requests with the result of Controller.sync().
    """
    class Controller(BaseHTTPRequestHandler):
        def sync(self, parent, attachments):
            """
            Compute the desired status and attachments for one namespace.

            Args:
                parent: the observed namespace object (a dict).
                attachments: dict mapping "Kind.apiVersion" to a dict of the
                    observed objects of that kind; Secrets are looked up by
                    "<namespace>/<name>".

            Returns:
                {"status": ..., "attachments": [...]}; both empty when the
                namespace is not labelled pipelines.kubeflow.org/enabled=true.
            """
            # parent is a namespace
            metadata = parent.get("metadata", {})
            namespace = metadata.get("name")
            pipeline_enabled = metadata.get(
                "labels", {}).get("pipelines.kubeflow.org/enabled")
            if pipeline_enabled != "true":
                return {"status": {}, "attachments": []}

            # Compute status based on observed state. all() stops at the first
            # mismatch, so kinds are consulted in the declared order.
            ready = all(
                len(attachments[kind]) == count
                for kind, count in _EXPECTED_ATTACHMENT_COUNTS)
            desired_status = {
                "kubeflow-pipelines-ready": "True" if ready else "False"
            }

            # Generate the desired attachment object(s).
            desired_resources = _profile_configmaps(namespace)
            print('Received request:\n', json.dumps(parent, sort_keys=True))
            print('Desired resources except secrets:\n', json.dumps(desired_resources, sort_keys=True))

            # Moved after the print argument because this is sensitive data.
            # Check if secret is already there when the controller made the request. If yes, then
            # use it. Else create a new credentials on seaweedfs for the namespace.
            if s3_secret := attachments["Secret.v1"].get(f"{namespace}/mlpipeline-minio-artifact"):
                desired_resources.append(s3_secret)
                print('Using existing secret')
            else:
                print('Creating new access key.')
                s3_access_key = iam.create_access_key(UserName=namespace)
                # Use the AWS IAM API of seaweedfs to manage access policies to bucket.
                # This policy ensures that a user can only access artifacts from his own profile.
                iam.put_user_policy(
                    UserName=namespace,
                    PolicyName=f"KubeflowProject{namespace}",
                    PolicyDocument=_profile_policy_document(namespace),
                )
                desired_resources.insert(
                    0, _profile_secret(namespace, s3_access_key["AccessKey"]))

            return {"status": desired_status, "attachments": desired_resources}

        def do_POST(self):
            """Serve the sync() function as a JSON webhook."""
            try:
                length = int(self.headers.get("content-length"))
                observed = json.loads(self.rfile.read(length))
                parent = observed["object"]
                attachments = observed["attachments"]
            except (TypeError, ValueError, KeyError):
                # Missing/invalid Content-Length, a body that is not JSON, or
                # a payload without the expected keys: reject the request
                # instead of letting the handler die with a traceback.
                self.send_error(400, "Malformed sync request")
                return

            desired = self.sync(parent, attachments)
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()
            self.wfile.write(bytes(json.dumps(desired), 'utf-8'))

    return HTTPServer((url, int(controller_port)), Controller)
# Start the controller only when this file is executed as a script, so that
# importing the module does not launch the server.
if __name__ == "__main__":
    main()