4. 代码示例
声明您的Access Key、Secret Key、项目ID、VPC资源的HCL语法模板、资源栈名称以及执行计划名称。
# Hard-coding the AK/SK in source is a serious security risk; keep them
# encrypted in a configuration file or in environment variables instead.
# This sample reads them from HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK, which
# must be set in the local environment before running.
ak = os.environ["HUAWEICLOUD_SDK_AK"]
sk = os.environ["HUAWEICLOUD_SDK_SK"]
# Stacks are managed per project_id.
project_id = "<YOUR PROJECT ID>"
# HCL template text; the tf and tf.json file formats are supported.
template_body = "<YOUR TEMPLATE>"
# stack_name is unique and identifies your stack.
stack_name = "<YOUR STACK NAME>"
# Name of the execution plan, passed to the execution-plan calls of this sample.
execution_plan_name = "<YOUR EXECUTION PLAN NAME>"
通过AOS SDK创建一个Client。资源编排服务为region级别服务,目标资源将在您选择的region下生成。
# Credentials bound to the project declared above.
auth = BasicCredentials(project_id=project_id, ak=ak, sk=sk)

# The stack and its resources are created in the region selected here;
# CN_NORTH_4 is used as the example.
# NOTE(review): `config` is not defined anywhere in this snippet — presumably
# an HTTP configuration object built beforehand; confirm against the full sample.
client = (
    AosClient.new_builder()
    .with_credentials(credentials=auth)
    .with_region(region=AosRegion.CN_NORTH_4)
    .with_http_config(config)
    .build()
)
首先创建一个空的资源栈。在该步骤中请不要传入template_body或template_uri,否则将立即触发资源栈的部署。
@staticmethod
def create_stack(client, stack_name):
    """Create a stack and return its id.

    Only the stack name is placed in the request body; no template is sent,
    so the stack is created empty.

    Args:
        client: AOS client used to send the request.
        stack_name: name of the stack to create.

    Returns:
        The ``stack_id`` attribute of the CreateStack response.
    """
    request = CreateStackRequest()
    # A fresh UUID identifies this individual request.
    request.client_request_id = str(uuid.uuid4())

    payload = CreateStackRequestBody()
    payload.stack_name = stack_name
    request.body = payload

    result = client.create_stack(request)
    print("create stack return:" + str(result))
    return result.stack_id
在当前资源栈下调用CreateExecutionPlan,资源编排服务将异步解析模板并生成执行计划,此时不会直接部署资源。
@staticmethod
def create_execution_plan(client, stack_name, stack_id, execution_plan_name, template_body):
    """Submit a CreateExecutionPlan request and return the new plan's id.

    The stack name is set on the request itself; the plan name, stack id and
    template text are carried in the request body.

    Args:
        client: AOS client used to send the request.
        stack_name: name of the stack the plan belongs to.
        stack_id: id of that stack.
        execution_plan_name: name to give the execution plan.
        template_body: template text the plan is generated from.

    Returns:
        The ``execution_plan_id`` attribute of the response.
    """
    request = CreateExecutionPlanRequest()
    # A fresh UUID identifies this individual request.
    request.client_request_id = str(uuid.uuid4())
    request.stack_name = stack_name

    payload = CreateExecutionPlanRequestBody()
    payload.execution_plan_name = execution_plan_name
    payload.stack_id = stack_id
    payload.template_body = template_body
    request.body = payload

    result = client.create_execution_plan(request)
    print("create execution plan return: " + str(result))
    return result.execution_plan_id
您可通过GetExecutionPlan获取执行计划的详细信息。
@staticmethod
def get_execution_plan(client, stack_name, execution_plan_name, stack_id, execution_plan_id):
    """Poll GetExecutionPlan until the execution plan can be read.

    Waits 5 seconds before every attempt. A 404 — whether raised by the
    client as ``ClientRequestException`` or reported on a returned response —
    is treated as "plan not available yet" and polled again. The first other
    response is printed and ends the wait.

    Args:
        client: AOS client used to send the request.
        stack_name: name of the stack the plan belongs to.
        execution_plan_name: name of the execution plan.
        stack_id: id of the stack.
        execution_plan_id: id of the execution plan.

    Raises:
        UnExpectedException: more than ``Example.__timeout_interval`` seconds
            elapsed without the plan becoming readable.
        ClientRequestException: the request failed with a status other than 404.
    """
    request = GetExecutionPlanRequest()
    # A fresh UUID identifies this polling request.
    request.client_request_id = str(uuid.uuid4())
    request.execution_plan_name = execution_plan_name
    request.execution_plan_id = execution_plan_id
    request.stack_name = stack_name
    request.stack_id = stack_id

    # Monotonic clock: the timeout must not be affected by wall-clock changes.
    started_at = time.monotonic()
    while True:
        if time.monotonic() - started_at > Example.__timeout_interval:
            print("create execution plan time out and will be exit")
            raise UnExpectedException("create execution plan time out")

        print("waiting for create execution plan ...")
        time.sleep(5)

        try:
            response = client.get_execution_plan(request)
        except ClientRequestException as e:
            # The client raises for error statuses, so "not found yet" arrives
            # here rather than as a response object; anything else is fatal.
            if e.status_code != 404:
                print("get execution plan throw exception: " + str(e))
                raise
        else:
            if response.status_code != 404:
                print("get execution plan return: " + str(response))
                break
        print("create execution plan in progress ...")
确认执行计划符合您对目标资源的预期后,可继续调用ApplyExecutionPlan,此时才会正式生成资源。
@staticmethod
def apply_execution_plan(client, stack_name, execution_plan_name, stack_id, execution_plan_id):
    """Send an ApplyExecutionPlan request and print the response.

    The plan name and stack name are set on the request; the plan id and
    stack id are carried in the request body. Nothing is returned.

    Args:
        client: AOS client used to send the request.
        stack_name: name of the stack the plan belongs to.
        execution_plan_name: name of the execution plan to apply.
        stack_id: id of the stack.
        execution_plan_id: id of the execution plan.
    """
    request = ApplyExecutionPlanRequest()
    # A fresh UUID identifies this individual request.
    request.client_request_id = str(uuid.uuid4())
    request.execution_plan_name = execution_plan_name
    request.stack_name = stack_name

    payload = ApplyExecutionPlanRequestBody()
    payload.execution_plan_id = execution_plan_id
    payload.stack_id = stack_id
    request.body = payload

    result = client.apply_execution_plan(request)
    print("apply execution plan return : " + str(result))
通过GetExecutionPlanMetadata轮询获取执行计划的状态,等待部署完成。
@staticmethod
def get_execution_plan_metadata(client, stack_name, execution_plan_name, stack_id, execution_plan_id):
    """Poll GetExecutionPlanMetadata until the plan status is "APPLIED".

    Sleeps 5 seconds before each query and returns None once the reported
    status equals "APPLIED".

    Args:
        client: AOS client used to send the request.
        stack_name: name of the stack the plan belongs to.
        execution_plan_name: name of the execution plan.
        stack_id: id of the stack.
        execution_plan_id: id of the execution plan.

    Raises:
        UnExpectedException: more than ``Example.__timeout_interval`` seconds
            had elapsed when an iteration started.
    """
    query = GetExecutionPlanMetadataRequest()
    # A fresh UUID identifies this polling request.
    query.client_request_id = str(uuid.uuid4())
    query.execution_plan_name = execution_plan_name
    query.stack_name = stack_name
    query.execution_plan_id = execution_plan_id
    query.stack_id = stack_id

    began = datetime.now().timestamp()
    while True:
        elapsed = datetime.now().timestamp() - began
        if elapsed > Example.__timeout_interval:
            print("apply execution plan time out and will be exit")
            raise UnExpectedException("apply execution plan time out")

        print("waiting for apply execution plan ...")
        try:
            time.sleep(5)
        except Exception as err:
            print("waiting for apply execution plan throw exception: " + str(err))
            raise err

        metadata = client.get_execution_plan_metadata(query)
        if metadata.status != "APPLIED":
            print("apply execution plan in progress ...")
            continue

        print("execution plan applied!")
        print("get execution plan metadata return: " + str(metadata))
        return
此时可通过ListStackResources来查看生成的资源。
@staticmethod
def list_stack_resource(client, stack_id, stack_name):
    """Send a ListStackResources request for the stack and print the response.

    Args:
        client: AOS client used to send the request.
        stack_id: id of the stack to inspect.
        stack_name: name of the stack to inspect.
    """
    query = ListStackResourcesRequest()
    # A fresh UUID identifies this individual request.
    query.client_request_id = str(uuid.uuid4())
    query.stack_id = stack_id
    query.stack_name = stack_name

    listing = client.list_stack_resources(query)
    print("list stack resources return: " + str(listing))
最后,可以通过DeleteStack删除资源栈,资源栈下的执行计划以及生成的资源将一并删除。
@staticmethod
def delete_stack(client, stack_id, stack_name):
    """Request deletion of the stack and wait until it can no longer be found.

    After sending DeleteStack, GetStackMetadata is queried every 5 seconds.
    A ``ClientRequestException`` with status 404 means the stack is gone and
    ends the wait; any other ``ClientRequestException`` is printed and the
    polling continues.

    Args:
        client: AOS client used to send the requests.
        stack_id: id of the stack to delete.
        stack_name: name of the stack to delete.

    Raises:
        UnExpectedException: more than ``Example.__timeout_interval`` seconds
            had elapsed when an iteration started.
        Exception: any non-``ClientRequestException`` error raised while
            sleeping or querying is printed and re-raised.
    """
    removal = DeleteStackRequest()
    removal.client_request_id = str(uuid.uuid4())
    removal.stack_id = stack_id
    removal.stack_name = stack_name
    client.delete_stack(removal)

    # Request reused on every poll to check whether the stack still exists.
    probe = GetStackMetadataRequest()
    probe.client_request_id = str(uuid.uuid4())
    probe.stack_id = stack_id
    probe.stack_name = stack_name

    began = datetime.now().timestamp()
    while True:
        elapsed = datetime.now().timestamp() - began
        if elapsed > Example.__timeout_interval:
            print("delete stack time out and will be exit")
            raise UnExpectedException("delete stack time out")

        print("waiting for delete stack ...")
        try:
            time.sleep(5)
            client.get_stack_metadata(probe)
        except ClientRequestException as err:
            if err.status_code != 404:
                # Other client errors are logged and the poll goes on.
                print("delete stack throw exception: " + str(err))
                continue
            print("delete complete!")
            return
        except Exception as err:
            print("waiting delete stack throw exception: " + str(err))
            raise err
1. 示例简介
华为云提供了AOS服务端SDK,您可以直接集成服务端SDK来调用资源编排的相关API,从而实现对资源的快速操作。
该示例展示了如何通过Python语言版本AOS SDK使用资源栈快速创建您的资源。
2. 开发前准备
3. 安装SDK
使用 pip 安装 SDK 依赖包。
4. 代码示例
声明您的Access Key、Secret Key、项目ID、VPC资源的HCL语法模板、资源栈名称以及执行计划名称。
# Hard-coding the AK/SK in source is a serious security risk; keep them
# encrypted in a configuration file or in environment variables instead.
# This sample reads them from HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK, which
# must be set in the local environment before running.
ak = os.environ["HUAWEICLOUD_SDK_AK"]
sk = os.environ["HUAWEICLOUD_SDK_SK"]
# Stacks are managed per project_id.
project_id = "<YOUR PROJECT ID>"
# HCL template text; the tf and tf.json file formats are supported.
template_body = "<YOUR TEMPLATE>"
# stack_name is unique and identifies your stack.
stack_name = "<YOUR STACK NAME>"
# Name of the execution plan, passed to the execution-plan calls of this sample.
execution_plan_name = "<YOUR EXECUTION PLAN NAME>"
通过AOS SDK创建一个Client。资源编排服务为region级别服务,目标资源将在您选择的region下生成。
# Credentials bound to the project declared above.
auth = BasicCredentials(project_id=project_id, ak=ak, sk=sk)

# The stack and its resources are created in the region selected here;
# CN_NORTH_4 is used as the example.
# NOTE(review): `config` is not defined anywhere in this snippet — presumably
# an HTTP configuration object built beforehand; confirm against the full sample.
client = (
    AosClient.new_builder()
    .with_credentials(credentials=auth)
    .with_region(region=AosRegion.CN_NORTH_4)
    .with_http_config(config)
    .build()
)
首先创建一个空的资源栈。在该步骤中请不要传入template_body或template_uri,否则将立即触发资源栈的部署。
@staticmethod
def create_stack(client, stack_name):
    """Create a stack and return its id.

    Only the stack name is placed in the request body; no template is sent,
    so the stack is created empty.

    Args:
        client: AOS client used to send the request.
        stack_name: name of the stack to create.

    Returns:
        The ``stack_id`` attribute of the CreateStack response.
    """
    request = CreateStackRequest()
    # A fresh UUID identifies this individual request.
    request.client_request_id = str(uuid.uuid4())

    payload = CreateStackRequestBody()
    payload.stack_name = stack_name
    request.body = payload

    result = client.create_stack(request)
    print("create stack return:" + str(result))
    return result.stack_id
在当前资源栈下调用CreateExecutionPlan,资源编排服务将异步解析模板并生成执行计划,此时不会直接部署资源。
@staticmethod
def create_execution_plan(client, stack_name, stack_id, execution_plan_name, template_body):
    """Submit a CreateExecutionPlan request and return the new plan's id.

    The stack name is set on the request itself; the plan name, stack id and
    template text are carried in the request body.

    Args:
        client: AOS client used to send the request.
        stack_name: name of the stack the plan belongs to.
        stack_id: id of that stack.
        execution_plan_name: name to give the execution plan.
        template_body: template text the plan is generated from.

    Returns:
        The ``execution_plan_id`` attribute of the response.
    """
    request = CreateExecutionPlanRequest()
    # A fresh UUID identifies this individual request.
    request.client_request_id = str(uuid.uuid4())
    request.stack_name = stack_name

    payload = CreateExecutionPlanRequestBody()
    payload.execution_plan_name = execution_plan_name
    payload.stack_id = stack_id
    payload.template_body = template_body
    request.body = payload

    result = client.create_execution_plan(request)
    print("create execution plan return: " + str(result))
    return result.execution_plan_id
您可通过GetExecutionPlan获取执行计划的详细信息。
@staticmethod
def get_execution_plan(client, stack_name, execution_plan_name, stack_id, execution_plan_id):
    """Poll GetExecutionPlan until the execution plan can be read.

    Waits 5 seconds before every attempt. A 404 — whether raised by the
    client as ``ClientRequestException`` or reported on a returned response —
    is treated as "plan not available yet" and polled again. The first other
    response is printed and ends the wait.

    Args:
        client: AOS client used to send the request.
        stack_name: name of the stack the plan belongs to.
        execution_plan_name: name of the execution plan.
        stack_id: id of the stack.
        execution_plan_id: id of the execution plan.

    Raises:
        UnExpectedException: more than ``Example.__timeout_interval`` seconds
            elapsed without the plan becoming readable.
        ClientRequestException: the request failed with a status other than 404.
    """
    request = GetExecutionPlanRequest()
    # A fresh UUID identifies this polling request.
    request.client_request_id = str(uuid.uuid4())
    request.execution_plan_name = execution_plan_name
    request.execution_plan_id = execution_plan_id
    request.stack_name = stack_name
    request.stack_id = stack_id

    # Monotonic clock: the timeout must not be affected by wall-clock changes.
    started_at = time.monotonic()
    while True:
        if time.monotonic() - started_at > Example.__timeout_interval:
            print("create execution plan time out and will be exit")
            raise UnExpectedException("create execution plan time out")

        print("waiting for create execution plan ...")
        time.sleep(5)

        try:
            response = client.get_execution_plan(request)
        except ClientRequestException as e:
            # The client raises for error statuses, so "not found yet" arrives
            # here rather than as a response object; anything else is fatal.
            if e.status_code != 404:
                print("get execution plan throw exception: " + str(e))
                raise
        else:
            if response.status_code != 404:
                print("get execution plan return: " + str(response))
                break
        print("create execution plan in progress ...")
确认执行计划与目标资源状态一致后,可继续触发ApplyExecutionPlan,此时才会正式生成资源。
@staticmethod
def apply_execution_plan(client, stack_name, execution_plan_name, stack_id, execution_plan_id):
    """Send an ApplyExecutionPlan request and print the response.

    The plan name and stack name are set on the request; the plan id and
    stack id are carried in the request body. Nothing is returned.

    Args:
        client: AOS client used to send the request.
        stack_name: name of the stack the plan belongs to.
        execution_plan_name: name of the execution plan to apply.
        stack_id: id of the stack.
        execution_plan_id: id of the execution plan.
    """
    request = ApplyExecutionPlanRequest()
    # A fresh UUID identifies this individual request.
    request.client_request_id = str(uuid.uuid4())
    request.execution_plan_name = execution_plan_name
    request.stack_name = stack_name

    payload = ApplyExecutionPlanRequestBody()
    payload.execution_plan_id = execution_plan_id
    payload.stack_id = stack_id
    request.body = payload

    result = client.apply_execution_plan(request)
    print("apply execution plan return : " + str(result))
通过GetExecutionPlanMetadata轮询获取执行计划的状态,等待部署完成。
@staticmethod
def get_execution_plan_metadata(client, stack_name, execution_plan_name, stack_id, execution_plan_id):
    """Poll GetExecutionPlanMetadata until the plan status is "APPLIED".

    Sleeps 5 seconds before each query and returns None once the reported
    status equals "APPLIED".

    Args:
        client: AOS client used to send the request.
        stack_name: name of the stack the plan belongs to.
        execution_plan_name: name of the execution plan.
        stack_id: id of the stack.
        execution_plan_id: id of the execution plan.

    Raises:
        UnExpectedException: more than ``Example.__timeout_interval`` seconds
            had elapsed when an iteration started.
    """
    query = GetExecutionPlanMetadataRequest()
    # A fresh UUID identifies this polling request.
    query.client_request_id = str(uuid.uuid4())
    query.execution_plan_name = execution_plan_name
    query.stack_name = stack_name
    query.execution_plan_id = execution_plan_id
    query.stack_id = stack_id

    began = datetime.now().timestamp()
    while True:
        elapsed = datetime.now().timestamp() - began
        if elapsed > Example.__timeout_interval:
            print("apply execution plan time out and will be exit")
            raise UnExpectedException("apply execution plan time out")

        print("waiting for apply execution plan ...")
        try:
            time.sleep(5)
        except Exception as err:
            print("waiting for apply execution plan throw exception: " + str(err))
            raise err

        metadata = client.get_execution_plan_metadata(query)
        if metadata.status != "APPLIED":
            print("apply execution plan in progress ...")
            continue

        print("execution plan applied!")
        print("get execution plan metadata return: " + str(metadata))
        return
此时可通过ListStackResources来查看生成的资源。
@staticmethod
def list_stack_resource(client, stack_id, stack_name):
    """Send a ListStackResources request for the stack and print the response.

    Args:
        client: AOS client used to send the request.
        stack_id: id of the stack to inspect.
        stack_name: name of the stack to inspect.
    """
    query = ListStackResourcesRequest()
    # A fresh UUID identifies this individual request.
    query.client_request_id = str(uuid.uuid4())
    query.stack_id = stack_id
    query.stack_name = stack_name

    listing = client.list_stack_resources(query)
    print("list stack resources return: " + str(listing))
最后,可以通过DeleteStack删除资源栈,资源栈下的执行计划以及生成的资源将一并删除。
@staticmethod
def delete_stack(client, stack_id, stack_name):
    """Request deletion of the stack and wait until it can no longer be found.

    After sending DeleteStack, GetStackMetadata is queried every 5 seconds.
    A ``ClientRequestException`` with status 404 means the stack is gone and
    ends the wait; any other ``ClientRequestException`` is printed and the
    polling continues.

    Args:
        client: AOS client used to send the requests.
        stack_id: id of the stack to delete.
        stack_name: name of the stack to delete.

    Raises:
        UnExpectedException: more than ``Example.__timeout_interval`` seconds
            had elapsed when an iteration started.
        Exception: any non-``ClientRequestException`` error raised while
            sleeping or querying is printed and re-raised.
    """
    removal = DeleteStackRequest()
    removal.client_request_id = str(uuid.uuid4())
    removal.stack_id = stack_id
    removal.stack_name = stack_name
    client.delete_stack(removal)

    # Request reused on every poll to check whether the stack still exists.
    probe = GetStackMetadataRequest()
    probe.client_request_id = str(uuid.uuid4())
    probe.stack_id = stack_id
    probe.stack_name = stack_name

    began = datetime.now().timestamp()
    while True:
        elapsed = datetime.now().timestamp() - began
        if elapsed > Example.__timeout_interval:
            print("delete stack time out and will be exit")
            raise UnExpectedException("delete stack time out")

        print("waiting for delete stack ...")
        try:
            time.sleep(5)
            client.get_stack_metadata(probe)
        except ClientRequestException as err:
            if err.status_code != 404:
                # Other client errors are logged and the poll goes on.
                print("delete stack throw exception: " + str(err))
                continue
            print("delete complete!")
            return
        except Exception as err:
            print("waiting delete stack throw exception: " + str(err))
            raise err
5. 参考
更多信息请参考:API Explorer
6. 修订记录