华为云ModelArts:基于resnet50预置算法,通过REST接口(AK/SK鉴权)创建训练作业、导入模型、部署服务

红框位置的信息(AK/SK、项目ID、OBS桶名、数据路径和训练输出路径等)都需要配置成自己的;鉴权使用了APIG的SDK。

代码结构如下

main.py是程序的入口,我的开发流程是将apig整个工程下载下来以后,直接在main.py上进行更改。运行环境是modelarts的notebook。

job_configs目录下包含创建训练作业、导入模型、部署服务所用的JSON配置文件,里面默认用的是resnet50这个算法。配置文件内容的获取方式参考:

https://bbs.huaweicloud.com/blogs/238421

create_wait_job_finish_v2()适配的是新版本训练(v2接口);create_wait_job_finish_v1()适配的是旧版本训练(v1接口),main.py中默认调用的是v1。

check_train_url()方法用来动态地创建训练的输出目录,它依赖moxing。如果是在本地运行,可以把这里注释掉;如果在本地也需要动态地创建输出目录,可以使用obsutil工具来实现。

obsutil参考 https://bbs.huaweicloud.com/blogs/230127


# coding=utf-8
import requests
from apig_sdk import signer
import json
import time
import os

def check_train_url(obs_url):
    """Make sure the OBS directory used for training output exists.

    Args:
        obs_url: OBS path to check, passed unchanged to ``mox.file``.

    The directory is created with ``mox.file.mk_dir`` when
    ``mox.file.exists`` reports it missing; otherwise only a notice is
    printed.
    """
    # Imported inside the function, so moxing is only required when this
    # check is actually called.
    import moxing as mox

    if not mox.file.exists(obs_url):
        mox.file.mk_dir(obs_url)
        print("create obs dir ", obs_url)
    else:
        print("obs dir exist")

# Shared suffix appended to the job, model and service names below.
index = "26"
JOB_NAME = f"train_test{index}"
MODEL_NAME = f"model_test{index}"
SERVICE_NAME = f"service_test{index}"

# OBS locations; replace every placeholder with your own value.
DATA_URL = "<YOUR-OWN-DATA-PATH>"
TRAIN_URL_BUCKET = "<YOUR-OWN-BUCKET-NAME>"
TRAIN_FILE_URL = "<YOUR-OWN-OUTPUT-OBS-PATH>"
TRAIN_URL = f"{TRAIN_URL_BUCKET}{TRAIN_FILE_URL}"

# Runs at import time: creates the training output directory if it is missing.
check_train_url(f"obs:/{TRAIN_URL}")

# HTTPS address of the bucket, used to build the model locations below.
obs_bucket_url = "https://<YOUR-OWN-BUCKET-NAME>.obs.myhwclouds.com"

MODEL_SRC_LOCATION = f"{obs_bucket_url}{TRAIN_FILE_URL}model/"
MODEL_EXEC_CODE_LOCATION = f"{MODEL_SRC_LOCATION}customize_service.py"


# Request signer from the APIG SDK; every request in this file is signed with it.
sig = signer.Signer()
# Prefer credentials from the environment so real keys do not have to be
# written into this file; the placeholders remain as the fallback and can
# still be edited in place as before.
own_ak = os.environ.get("HUAWEICLOUD_SDK_AK", "<YOUR-OWN-AK>")
own_sk = os.environ.get("HUAWEICLOUD_SDK_SK", "<YOUR-OWN-SK>")
sig.Key = own_ak
sig.Secret = own_sk
project_id = "<YOUR-OWN-PROJECTID>"
# base_header is the ModelArts endpoint of the cn-north-4 (Beijing 4) region.
base_header = "https://modelarts.cn-north-4.myhuaweicloud.com/"
url_header = base_header + "v2/"
url_v1_header = base_header + "v1/"


def create_job_v2():
    """Submit a training job through the v2 API and return its id.

    The request body is loaded from ``job_configs/create_job_v2.json``; only
    ``metadata.name`` is overridden, with ``JOB_NAME``.

    Returns:
        The value of ``metadata.id`` in the response when the server answers
        200 or 201, otherwise an empty string.
    """
    endpoint = url_header + project_id + "/training-jobs"
    with open("job_configs/create_job_v2.json", "r") as config_file:
        payload = json.load(config_file)
    payload["metadata"]["name"] = JOB_NAME

    request = signer.HttpRequest(
        "POST",
        endpoint,
        {"x-stage": "RELEASE", "content-type": "application/json"},
        json.dumps(payload),
    )
    sig.Sign(request)
    response = requests.request(
        request.method,
        request.scheme + "://" + request.host + request.uri,
        headers=request.headers,
        data=request.body,
    )

    # Any status other than 200/201 is reported to the caller as "".
    if response.status_code not in (200, 201):
        return ""
    return json.loads(response.content.decode())["metadata"]["id"]

def create_job_v1():
    """Submit a training job through the v1 API.

    The request body is loaded from ``job_configs/create_job_v1.json``. The
    job name and the first input/output locations are overridden from the
    module-level settings.

    Returns:
        A ``(job_id, version_id)`` tuple taken from the response.

    Raises:
        SystemExit: with code 1 when the server does not answer 200 or 201.
    """
    operation = "/training-jobs"
    train_url = url_v1_header + project_id + operation
    job_config_file_path = "job_configs/create_job_v1.json"
    with open(job_config_file_path, "r") as f:
        load_dict = json.load(f)
    load_dict["job_name"] = JOB_NAME

    job_config = load_dict["config"]
    # The training data goes to the input and the training output directory
    # (TRAIN_URL, the path created by check_train_url and later read by the
    # model import) goes to the output. The two were previously swapped.
    # NOTE(review): assumes inputs[0] is the dataset and outputs[0] the
    # training output in the JSON template - confirm against the template.
    job_config["inputs"][0]["value"] = DATA_URL
    job_config["inputs"][0]["data_source"]["obs"]["obs"] = DATA_URL
    job_config["outputs"][0]["value"] = TRAIN_URL
    job_config["outputs"][0]["data_source"]["obs"]["obs"] = TRAIN_URL

    r = signer.HttpRequest("POST",
                           train_url,
                           {"x-stage": "RELEASE", "content-type": "application/json"},
                           json.dumps(load_dict))
    sig.Sign(r)
    resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)

    if resp.status_code not in (200, 201):
        print("create job failed")
        # Show the server's answer; without it the failure cannot be diagnosed.
        print(resp.status_code, resp.reason, resp.text)
        exit(1)

    # Decode the body once and read both ids from the same object.
    body = json.loads(resp.content.decode())
    jobid = body["job_id"]
    versionid = body["version_id"]
    return jobid, versionid

def get_job_status_v2(jobid):
    """Query a v2 training job, print its phase and return it.

    Args:
        jobid: Job id; concatenated into the request URL as a string.

    Returns:
        The value of ``status.phase`` in the response body.
    """
    endpoint = url_header + project_id + "/training-jobs" + "/" + jobid
    request = signer.HttpRequest("GET", endpoint, {"x-stage": "RELEASE"})
    sig.Sign(request)
    response = requests.request(
        request.method,
        request.scheme + "://" + request.host + request.uri,
        headers=request.headers,
        data=request.body,
    )
    phase = json.loads(response.content.decode())["status"]["phase"]
    print(phase)
    return phase

def get_job_status_v1(jobid, versionid):
    """Query one version of a v1 training job and return its status.

    Args:
        jobid: Job id; converted with ``str`` before it is put in the URL.
        versionid: Version id; converted with ``str`` as well.

    Returns:
        The value of the ``status`` field in the response body.
    """
    endpoint = "".join([
        url_v1_header,
        project_id,
        "/training-jobs",
        "/",
        str(jobid),
        "/versions/",
        str(versionid),
    ])
    request = signer.HttpRequest("GET", endpoint, {"x-stage": "RELEASE"})
    sig.Sign(request)
    response = requests.request(
        request.method,
        request.scheme + "://" + request.host + request.uri,
        headers=request.headers,
        data=request.body,
    )
    body = json.loads(response.content.decode())
    return body["status"]



def import_model():
    """Import the trained model and return the new model id.

    The request body is loaded from ``job_configs/model_import.json``; the
    model name, source location and execution code path are overridden with
    ``MODEL_NAME``, ``MODEL_SRC_LOCATION`` and ``MODEL_EXEC_CODE_LOCATION``.

    Returns:
        The value of ``model_id`` in the response body.

    Exits the process with code 1 when the response status is not 200.
    """
    endpoint = url_v1_header + project_id + "/models"
    with open("job_configs/model_import.json", "r") as config_file:
        payload = json.load(config_file)
    payload.update({
        "model_name": MODEL_NAME,
        "source_location": MODEL_SRC_LOCATION,
        "execution_code": MODEL_EXEC_CODE_LOCATION,
    })

    request = signer.HttpRequest(
        "POST",
        endpoint,
        {"x-stage": "RELEASE", "content-type": "application/json"},
        json.dumps(payload),
    )
    sig.Sign(request)
    response = requests.request(
        request.method,
        request.scheme + "://" + request.host + request.uri,
        headers=request.headers,
        data=request.body,
    )

    print(response.status_code, response.reason)
    if response.status_code != 200:
        print("import model failed")
        exit(1)
    new_model_id = json.loads(response.content.decode())["model_id"]
    print(new_model_id)
    return new_model_id

def get_model_status(modelid):
    """Query a model and return its status.

    Args:
        modelid: Model id; concatenated into the request URL as a string.

    Returns:
        The value of ``model_status`` in the response body.
    """
    endpoint = url_v1_header + project_id + "/models/" + modelid
    request = signer.HttpRequest("GET", endpoint, {"x-stage": "RELEASE"})
    sig.Sign(request)
    response = requests.request(
        request.method,
        request.scheme + "://" + request.host + request.uri,
        headers=request.headers,
        data=request.body,
    )
    body = json.loads(response.content.decode())
    return body["model_status"]


def deploy_model(modelid):
    """Deploy a model as a service and return the service id.

    The request body is loaded from ``job_configs/model_deploy.json``; the
    service name is overridden with ``SERVICE_NAME`` and the model id with
    ``modelid``.

    Args:
        modelid: Id of the model to deploy.

    Returns:
        The value of ``service_id`` in the response body (also printed).
    """
    operation = "/services"
    model_deploy_url = url_v1_header + project_id + operation
    model_deploy_config_file_path = "job_configs/model_deploy.json"
    with open(model_deploy_config_file_path, "r") as f:
        load_dict = json.load(f)
    load_dict["service_name"] = SERVICE_NAME

    # Deploy the model that was passed in instead of whatever id happens to
    # be stored in the JSON template. "config" may be a list of per-model
    # entries or a single object depending on the template; in the list case
    # the first entry is the one updated.
    config = load_dict.get("config")
    model_entry = config[0] if isinstance(config, list) and config else config
    if isinstance(model_entry, dict):
        model_entry["model_id"] = modelid

    r = signer.HttpRequest("POST",
                           model_deploy_url,
                           {"x-stage": "RELEASE", "content-type": "application/json"},
                           json.dumps(load_dict))
    sig.Sign(r)
    resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)

    # Decode the body once instead of once per use.
    serviceid = json.loads(resp.content.decode())["service_id"]
    print(serviceid)
    return serviceid

def get_service_status(serviceid):
    """Query a service and return its status.

    Args:
        serviceid: Service id; concatenated into the request URL as a string.

    Returns:
        The value of ``status`` in the response body.

    Raises:
        KeyError: when the response body has no ``status`` field; the body is
            printed first so the server's answer is visible.
    """
    operation = "/services/"
    service_url = url_v1_header + project_id + operation + serviceid
    r = signer.HttpRequest("GET",
                           service_url,
                           {"x-stage": "RELEASE"})
    sig.Sign(r)
    resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)

    # Decode once; the same object is used for the lookup and the diagnostic.
    body = json.loads(resp.content.decode())
    try:
        return body["status"]
    except (KeyError, TypeError):
        # Print the unexpected body, then let the original error propagate.
        print(body)
        raise

def create_wait_job_finish_v2():
    """Create a v2 training job and poll it every 5 seconds until it ends.

    Returns normally once the job phase is "Completed". Exits the process
    with code 1 when the job cannot be created or ends in a failure phase.
    """
    jobid = create_job_v2()
    if jobid == "":
        print("create job failed")
        exit(1)

    # Terminal phases other than "Completed". Without "Terminated" and
    # "Abnormal" a stopped or broken job would be polled forever.
    failed_phases = ("Failed", "Terminated", "Abnormal")
    while True:
        jobstatus = get_job_status_v2(jobid)
        print(jobstatus)
        if jobstatus == "Completed":
            break
        if jobstatus in failed_phases:
            print("job run failed")
            exit(1)
        time.sleep(5)
        
def create_wait_job_finish_v1():
    """Create a v1 training job and poll it every 10 seconds until it ends.

    Returns normally once the job status is 10. Exits the process with code 1
    when the job cannot be created or ends in a failure status.
    """
    jobid, versionid = create_job_v1()
    if jobid == "":
        print("create job failed")
        exit(1)

    # Terminal failure codes. Besides 11 (run failed) these are assumed to be
    # 3 (image build failed), 5 (submit failed), 12 (killed), 13 (canceled)
    # and 14 (lost) per the ModelArts job status table - TODO confirm against
    # the current API reference. Without them such a job is polled forever.
    failed_statuses = (3, 5, 11, 12, 13, 14)
    while True:
        jobstatus = get_job_status_v1(jobid, versionid)
        print(jobstatus)
        if jobstatus == 10:
            break
        if jobstatus in failed_statuses:
            print("job run failed")
            exit(1)
        time.sleep(10)

def import_deploy_model():
    """Import the trained model, wait for it, then deploy and wait again.

    The model is polled every 5 seconds while it is "publishing" or
    "building" and must reach "published". The service is polled every
    10 seconds while it is "deploying" and must reach "running". Any other
    status exits the process with code 1.
    """
    modelid = import_model()
    while True:
        modelstatus = get_model_status(modelid)
        print(modelstatus)
        if modelstatus == "published":
            break
        if modelstatus in ("publishing", "building"):
            time.sleep(5)
            continue
        print("import model failed")
        exit(1)

    serviceid = deploy_model(modelid)

    while True:
        time.sleep(10)
        servicestatus = get_service_status(serviceid)
        print(servicestatus)
        if servicestatus == "running":
            break
        if servicestatus == "deploying":
            continue
        # Previously unreachable behind an unconditional "else: continue", so
        # a failed deployment was polled forever.
        print("deploy service failed")
        exit(1)

if __name__ == "__main__":
    # Runs the v1 training flow; call create_wait_job_finish_v2() here instead
    # to use the new-version (v2) training API.
    create_wait_job_finish_v1()
    import_deploy_model()




(完)