代码中所有以 <YOUR-OWN-…> 占位的信息（即原文截图中红框标出的位置，包括 AK/SK、项目 ID、OBS 桶名和路径等）都需要替换成你自己的配置；接口鉴权使用的是 APIG 的 SDK（apig_sdk）。
代码结构如下
main.py 是程序的入口。我的开发流程是：先将 APIG SDK 的整个工程下载下来，然后直接在 main.py 上进行修改。运行环境是 ModelArts 的 Notebook。
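job_configs 目录下存放创建训练作业、导入模型和部署服务时使用的配置文件（create_job_v1.json、create_job_v2.json、model_import.json、model_deploy.json）。配置文件内容的获取方式请参考下面的链接，其中默认使用的是 resnet50 算法：
https://bbs.huaweicloud.com/blogs/238421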
create_wait_job_finish_v2() 适配的是新版训练作业接口（v2），create_wait_job_finish_v1() 对应旧版接口（v1）；程序入口默认调用的是 v1 版本。
check_train_url() 用于在 OBS 上动态创建训练的输出目录，它依赖 ModelArts 环境中提供的 moxing 库。如果在本地运行，可以把对它的调用注释掉；如果本地运行时仍需要动态创建输出目录，可以使用 obsutil 工具来实现。
obsutil 的用法参考：https://bbs.huaweicloud.com/blogs/230127
# coding=utf-8
import requests
from apig_sdk import signer
import json
import time
import os
def check_train_url(obs_url):
    """Ensure the OBS directory ``obs_url`` exists, creating it when missing.

    Uses the ModelArts ``moxing`` library. It is imported inside the function,
    so it is only required when this function is actually called.

    Args:
        obs_url: OBS directory URL, e.g. ``"obs://bucket/path/"``.
    """
    # presumably moxing is only installed in ModelArts environments -- when
    # running locally, skip calling this function.
    import moxing as mox

    if not mox.file.exists(obs_url):
        mox.file.mk_dir(obs_url)
        print("create obs dir ", obs_url)
    else:
        print("obs dir exist")
# Suffix appended to the job, model and service names; presumably bumped
# between runs so the new resources do not clash with earlier ones.
index = "26"
JOB_NAME = f"train_test{index}"
MODEL_NAME = f"model_test{index}"
SERVICE_NAME = f"service_test{index}"

# OBS locations -- replace the placeholders with your own values.
DATA_URL = "<YOUR-OWN-DATA-PATH>"
TRAIN_URL_BUCKET = "<YOUR-OWN-BUCKET-NAME>"
TRAIN_FILE_URL = "<YOUR-OWN-OUTPUT-OBS-PATH>"
# NOTE(review): the "obs:/" prefix below only forms a valid "obs://..." URL if
# TRAIN_URL starts with "/", i.e. TRAIN_URL_BUCKET looks like "/bucket-name".
TRAIN_URL = f"{TRAIN_URL_BUCKET}{TRAIN_FILE_URL}"

# Create the training output directory on OBS (comment this out when running
# locally without moxing).
check_train_url(f"obs:/{TRAIN_URL}")
# HTTPS endpoint of the OBS bucket holding the training output.
obs_bucket_url = "https://<YOUR-OWN-BUCKET-NAME>.obs.myhwclouds.com"
# The trained model is read from the "model/" sub-directory of the training
# output path, so TRAIN_FILE_URL is expected to start and end with "/".
MODEL_SRC_LOCATION = f"{obs_bucket_url}{TRAIN_FILE_URL}model/"
MODEL_EXEC_CODE_LOCATION = f"{MODEL_SRC_LOCATION}customize_service.py"
# API requests are signed with AK/SK through the APIG SDK signer.
# Credentials and the project ID are taken from the environment when set, so
# they need not be hard-coded in source; otherwise replace the placeholders.
sig = signer.Signer()
own_ak = os.environ.get("HUAWEICLOUD_SDK_AK", "<YOUR-OWN-AK>")
own_sk = os.environ.get("HUAWEICLOUD_SDK_SK", "<YOUR-OWN-SK>")
sig.Key = own_ak
sig.Secret = own_sk
project_id = os.environ.get("HUAWEICLOUD_SDK_PROJECT_ID", "<YOUR-OWN-PROJECTID>")
# base_header is the ModelArts endpoint of the CN North-Beijing4 (cn-north-4) region.
base_header = "https://modelarts.cn-north-4.myhuaweicloud.com/"
url_header = base_header + "v2/"
url_v1_header = base_header + "v1/"
def create_job_v2():
    """Create a training job through the ModelArts v2 API.

    The request body is loaded from ``job_configs/create_job_v2.json`` and its
    ``metadata.name`` is overridden with ``JOB_NAME``.

    Returns:
        The new job ID (``metadata.id`` of the response), or ``""`` when the
        API answers with a status other than 200/201. In that case the status
        and response body are printed so the failure can be diagnosed.
    """
    operation = "/training-jobs"
    train_url = url_header + project_id + operation
    job_config_file_path = "job_configs/create_job_v2.json"
    with open(job_config_file_path, "r", encoding="utf-8") as f:
        load_dict = json.load(f)
    load_dict["metadata"]["name"] = JOB_NAME
    r = signer.HttpRequest("POST",
                           train_url,
                           {"x-stage": "RELEASE", "content-type": "application/json"},
                           json.dumps(load_dict))
    sig.Sign(r)
    resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri,
                            headers=r.headers, data=r.body)
    if resp.status_code not in (200, 201):
        # Show the API's error; the caller only sees the empty job ID.
        print(resp.status_code, resp.reason)
        print(resp.content.decode())
        return ""
    return json.loads(resp.content.decode())["metadata"]["id"]
def create_job_v1():
    """Create a training job through the ModelArts v1 API.

    The request body is loaded from ``job_configs/create_job_v1.json``. The
    job name, the input location (``DATA_URL``) and the output location
    (``TRAIN_URL``) are overridden.

    Returns:
        A ``(job_id, version_id)`` tuple taken from the response.

    Raises:
        SystemExit: with status 1 if the API does not answer 200/201.
    """
    operation = "/training-jobs"
    train_url = url_v1_header + project_id + operation
    job_config_file_path = "job_configs/create_job_v1.json"
    with open(job_config_file_path, "r", encoding="utf-8") as f:
        load_dict = json.load(f)
    load_dict["job_name"] = JOB_NAME
    config = load_dict["config"]
    # The job reads its dataset from DATA_URL and writes its results to
    # TRAIN_URL. TRAIN_URL is the directory check_train_url() creates, and
    # MODEL_SRC_LOCATION later points into it.
    config["inputs"][0]["value"] = DATA_URL
    config["inputs"][0]["data_source"]["obs"]["obs"] = DATA_URL
    config["outputs"][0]["value"] = TRAIN_URL
    config["outputs"][0]["data_source"]["obs"]["obs"] = TRAIN_URL
    r = signer.HttpRequest("POST",
                           train_url,
                           {"x-stage": "RELEASE", "content-type": "application/json"},
                           json.dumps(load_dict))
    sig.Sign(r)
    resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri,
                            headers=r.headers, data=r.body)
    if resp.status_code not in (200, 201):
        print("create job failed")
        print(resp.status_code, resp.reason)
        print(resp.content.decode())
        # `exit()` is an interactive-shell helper; SystemExit works everywhere.
        raise SystemExit(1)
    body = json.loads(resp.content.decode())
    return body["job_id"], body["version_id"]
def get_job_status_v2(jobid):
    """Return the current phase (``status.phase``) of a v2 training job.

    Does not print the phase; the polling loop in
    ``create_wait_job_finish_v2`` already prints every value it receives.

    Args:
        jobid: Job ID string returned by ``create_job_v2``.
    """
    operation = "/training-jobs"
    get_job_url = url_header + project_id + operation + "/" + jobid
    r = signer.HttpRequest("GET",
                           get_job_url,
                           {"x-stage": "RELEASE"})
    sig.Sign(r)
    resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri,
                            headers=r.headers, data=r.body)
    return json.loads(resp.content.decode())["status"]["phase"]
def get_job_status_v1(jobid, versionid):
    """Fetch the ``status`` of one version of a v1 training job.

    Args:
        jobid: Training job ID (passed through ``str``).
        versionid: Version ID of that job (passed through ``str``).

    Returns:
        The ``status`` field of the response; the polling caller compares it
        against integer status codes.
    """
    path = "/".join([project_id, "training-jobs", str(jobid), "versions", str(versionid)])
    request = signer.HttpRequest("GET", url_v1_header + path, {"x-stage": "RELEASE"})
    sig.Sign(request)
    full_url = request.scheme + "://" + request.host + request.uri
    response = requests.request(request.method, full_url,
                                headers=request.headers, data=request.body)
    payload = json.loads(response.content.decode())
    return payload["status"]
def import_model():
    """Import the trained model into ModelArts through the v1 ``/models`` API.

    The request body is loaded from ``job_configs/model_import.json``. The
    model name, the model source location and the inference-code location
    are overridden.

    Returns:
        The ``model_id`` of the imported model.

    Raises:
        SystemExit: with status 1 if the API does not answer 200.
    """
    operation = "/models"
    model_url = url_v1_header + project_id + operation
    model_import_config_file_path = "job_configs/model_import.json"
    with open(model_import_config_file_path, "r", encoding="utf-8") as f:
        load_dict = json.load(f)
    load_dict["model_name"] = MODEL_NAME
    load_dict["source_location"] = MODEL_SRC_LOCATION
    load_dict["execution_code"] = MODEL_EXEC_CODE_LOCATION
    r = signer.HttpRequest("POST",
                           model_url,
                           {"x-stage": "RELEASE", "content-type": "application/json"},
                           json.dumps(load_dict))
    sig.Sign(r)
    resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri,
                            headers=r.headers, data=r.body)
    print(resp.status_code, resp.reason)
    if resp.status_code != 200:
        print("import model failed")
        print(resp.content.decode())
        # `exit()` is an interactive-shell helper; SystemExit works everywhere.
        raise SystemExit(1)
    modelid = json.loads(resp.content.decode())["model_id"]
    print(modelid)
    return modelid
def get_model_status(modelid):
    """Look up a model and return its ``model_status`` string.

    Args:
        modelid: Model ID string returned by the model import call.
    """
    request = signer.HttpRequest(
        "GET",
        "".join((url_v1_header, project_id, "/models/", modelid)),
        {"x-stage": "RELEASE"},
    )
    sig.Sign(request)
    response = requests.request(
        request.method,
        request.scheme + "://" + request.host + request.uri,
        headers=request.headers,
        data=request.body,
    )
    return json.loads(response.content.decode())["model_status"]
def deploy_model(modelid):
    """Deploy a model as a ModelArts service through the v1 ``/services`` API.

    The request body is loaded from ``job_configs/model_deploy.json``.
    ``service_name`` is overridden with ``SERVICE_NAME``, and the ``model_id``
    inside ``config`` is overridden with ``modelid``.

    Args:
        modelid: ID of the model to deploy (e.g. from ``import_model``).

    Returns:
        The ``service_id`` from the response.

    Raises:
        KeyError: if the response has no ``service_id``; the response body is
            printed first so the API error is visible.
    """
    operation = "/services"
    model_deploy_url = url_v1_header + project_id + operation
    model_deploy_config_file_path = "job_configs/model_deploy.json"
    with open(model_deploy_config_file_path, "r", encoding="utf-8") as f:
        load_dict = json.load(f)
    load_dict["service_name"] = SERVICE_NAME
    # Deploy the requested model rather than whatever model_id is hard-coded
    # in the JSON file.
    # NOTE(review): the v1 deploy API takes `config` as a list of model
    # configs; only the first entry is updated -- confirm for multi-model
    # services. A dict-shaped `config` is handled too.
    config = load_dict.get("config")
    if isinstance(config, list) and config:
        config[0]["model_id"] = modelid
    elif isinstance(config, dict):
        config["model_id"] = modelid
    r = signer.HttpRequest("POST",
                           model_deploy_url,
                           {"x-stage": "RELEASE", "content-type": "application/json"},
                           json.dumps(load_dict))
    sig.Sign(r)
    resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri,
                            headers=r.headers, data=r.body)
    body = json.loads(resp.content.decode())
    try:
        service_id = body["service_id"]
    except (KeyError, TypeError):
        print(body)
        raise
    print(service_id)
    return service_id
def get_service_status(serviceid):
    """Return the ``status`` field of a deployed service.

    Args:
        serviceid: Service ID string returned by ``deploy_model``.

    Raises:
        KeyError / TypeError: if the response carries no ``status`` field. The
            whole response body is printed first so the API error is visible.
    """
    operation = "/services/"
    service_url = url_v1_header + project_id + operation + serviceid
    r = signer.HttpRequest("GET",
                           service_url,
                           {"x-stage": "RELEASE"})
    sig.Sign(r)
    resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri,
                            headers=r.headers, data=r.body)
    body = json.loads(resp.content.decode())
    try:
        return body["status"]
    except (KeyError, TypeError):
        # Narrow replacement for the former bare `except:`: show the error
        # payload, then propagate.
        print(body)
        raise
def create_wait_job_finish_v2():
    """Create a v2 training job and poll it every 5 seconds until it ends.

    Returns normally once the job phase is ``"Completed"``.

    Raises:
        SystemExit: with status 1 if the job cannot be created or reaches a
            failure phase.
    """
    # Phases from which the job can never reach "Completed"; without them the
    # loop below would poll forever.
    # NOTE(review): taken from the ModelArts v2 training-job phase list --
    # confirm against the API reference.
    failed_phases = {"Failed", "Terminated", "Abnormal"}
    jobid = create_job_v2()
    if not jobid:
        print("create job failed")
        raise SystemExit(1)
    while True:
        jobstatus = get_job_status_v2(jobid)
        print(jobstatus)
        if jobstatus == "Completed":
            break
        if jobstatus in failed_phases:
            print("job run failed")
            raise SystemExit(1)
        time.sleep(5)
def create_wait_job_finish_v1():
    """Create a v1 training job and poll it every 10 seconds until it ends.

    Returns normally once the job reports the "completed" status code.

    Raises:
        SystemExit: with status 1 if the job cannot be created or reaches a
            failure status.
    """
    # NOTE(review): ModelArts v1 training-job status codes as understood from
    # the API reference:
    #   10 = completed, 11 = failed, 3 = image creation failed,
    #   5 = submit failed, 12 = killed, 13 = canceled, 14 = lost.
    # Treating only 11 as terminal let the loop poll forever on the other
    # failure codes. Confirm these values against the docs.
    completed_code = 10
    failed_codes = {3, 5, 11, 12, 13, 14}
    jobid, versionid = create_job_v1()
    if jobid == "":
        print("create job failed")
        raise SystemExit(1)
    while True:
        jobstatus = get_job_status_v1(jobid, versionid)
        print(jobstatus)
        if jobstatus == completed_code:
            break
        if jobstatus in failed_codes:
            print("job run failed")
            raise SystemExit(1)
        time.sleep(10)
def _wait_model_published(modelid):
    """Poll the model every 5 seconds until it is published; exit(1) on failure."""
    while True:
        modelstatus = get_model_status(modelid)
        print(modelstatus)
        if modelstatus == "published":
            return
        if modelstatus in ("publishing", "building"):
            time.sleep(5)
            continue
        print("import model failed")
        raise SystemExit(1)


def _wait_service_running(serviceid):
    """Poll the service every 10 seconds until it is running; exit(1) on failure."""
    while True:
        time.sleep(10)
        servicestatus = get_service_status(serviceid)
        print(servicestatus)
        if servicestatus == "running":
            return
        # Previously an `else: continue` made the failure branch below
        # unreachable, so a failed service was polled forever.
        if servicestatus == "deploying":
            continue
        print("deploy service failed")
        raise SystemExit(1)


def import_deploy_model():
    """Import the trained model, deploy it, and wait until the service runs.

    Raises:
        SystemExit: with status 1 if the model import or the service
            deployment ends in an unexpected status.
    """
    modelid = import_model()
    _wait_model_published(modelid)
    serviceid = deploy_model(modelid)
    _wait_service_running(serviceid)
if __name__ == "__main__":
    # Default pipeline: train with the v1 API, then import and deploy the model.
    # Call create_wait_job_finish_v2() here instead to use the new-version (v2)
    # training API.
    create_wait_job_finish_v1()
    import_deploy_model()