4.开始使用
4.1 导入依赖模块
Copied!
from huaweicloudsdkcfw.v1 import UpdateAclRuleOrderRequest, ListAclRuleHitCountRequest, DeleteAclRuleRequest, \
UpdateAclRuleRequest, ListAclRulesRequest, AddAclRuleRequest, ListEipsRequest
from huaweicloudsdkcfw.v1.cfw_client import CfwClient
from huaweicloudsdkcfw.v1.model.add_rule_acl_dto import AddRuleAclDto
from huaweicloudsdkcfw.v1.model.add_rule_acl_dto_rules import AddRuleAclDtoRules
from huaweicloudsdkcfw.v1.model.list_access_control_logs_request import ListAccessControlLogsRequest
from huaweicloudsdkcfw.v1.model.list_rule_hit_count_dto import ListRuleHitCountDto
from huaweicloudsdkcfw.v1.model.order_rule_acl_dto import OrderRuleAclDto
from huaweicloudsdkcfw.v1.model.rule_address_dto import RuleAddressDto
from huaweicloudsdkcfw.v1.model.rule_service_dto import RuleServiceDto
from huaweicloudsdkcfw.v1.model.update_rule_acl_dto import UpdateRuleAclDto
from huaweicloudsdkcfw.v1.region.cfw_region import CfwRegion
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkcore.exceptions.exceptions import ConnectionException
from huaweicloudsdkcore.exceptions.exceptions import RequestTimeoutException
from huaweicloudsdkcore.exceptions.exceptions import ServiceResponseException
4.2 初始化认证信息
Copied!
auth = BasicCredentials(ak=ak,sk=sk)
4.3 初始化防火墙客户端
Copied!
client = CfwClient.new_builder() \
.with_credentials(credentials=auth) \
.with_region(region=CfwRegion.value_of(region_id="<REGION ID>")) \
.build()
4.4 创建acl规则并使用
此节4.4.1-4.4.8示范了在console界面上如何操作,4.4.9示范了代码如何实现上述操作。
4.4.1 通过查询防护eip列表查询到一条防护eip的地址
![acl-1]()
4.4.2 添加一条acl规则,方向为外到内、名称为ceshi、源地址为0.0.0.0/0、目的地址类型为ip地址、目的地址为防护eip、服务类型为服务、协议类型为TCP、源端口为0-65535、目的端口为0-65535、不支持长连接、动作为阻断、启用状态为打开
![acl-2]()
4.4.3 通过acl列表获取规则id
![acl-3]()
4.4.4 查询访问控制日志,获得阻断的访问控制日志
![acl-3]()
4.4.5 查询规则id访问次数,获得访问规则规则击中次数
![acl-3]()
4.4.6 设置规则为置顶
![acl-4]()
4.4.7 更新acl规则为一个非防护eip的值,其余不变
![acl-5]()
4.4.8 删除acl规则
![acl-6]()
4.4.9 示例代码
Copied!
@staticmethod
def main(args):
# 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全;
# 本示例以ak和sk保存在环境变量中来实现身份验证为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK。
ak = os.environ["HUAWEICLOUD_SDK_AK"]
sk = os.environ["HUAWEICLOUD_SDK_SK"]
auth = BasicCredentials(
ak=ak,
sk=sk
)
client = CfwClient.new_builder() \
.with_credentials(credentials=auth) \
.with_region(region=CfwRegion.value_of(region_id="<REGION ID>")) \
.build()
try:
# 4.4.1 通过查询防护eip列表查询到一条防护eip的地址
public_e_ip = AclRule.__query_eip(client)
# 4.4.2 添加一条acl规则,方向为外到内、名称为ceshi、源地址为0.0.0.0/0、目的地址类型为ip地址、目的地址为防护eip、服务类型为服务、协议类型为TCP、源端口为0-65535、目的端口为0-65535、不支持长连接、动作为阻断、启用状态为打开
_id = AclRule.__add_acl(client, public_e_ip)
# 4.4.3 通过acl列表获取规则id
AclRule.__query_rule_id(client)
# 4.4.4 查询访问控制日志,获得阻断的访问控制日志
AclRule.__query_access_log(client, public_e_ip)
# 4.4.5 查询acl规则的击中次数
AclRule.__query_rule_hit_count(client, _id)
# 4.4.6 将acl规则置顶
AclRule.__order_rule(client, _id)
# 4.4.7 更新acl规则为一个非防护eip的值,其余不变
AclRule.__update_acl(client, _id)
# 4.4.8 删除acl规则
AclRule.__delete_acl(client, _id)
except ConnectionException as e:
print(e.err_message)
except RequestTimeoutException as e:
print(e.err_message)
except ServiceResponseException as e:
print(e.status_code)
print(e.error_code)
print(e.error_msg)
@staticmethod
def __order_rule(client, rule_id):
update_acl_rule_order_request = UpdateAclRuleOrderRequest()
order_rule_acl_dto = OrderRuleAclDto()
order_rule_acl_dto.top = 1
update_acl_rule_order_request.acl_rule_id = rule_id
update_acl_rule_order_request.body = order_rule_acl_dto
client.update_acl_rule_order(update_acl_rule_order_request)
@staticmethod
def __query_rule_hit_count(client, rule_id):
list_acl_rule_hit_count_request = ListAclRuleHitCountRequest()
list_rule_hit_count_dto = ListRuleHitCountDto()
rule_ids = []
rule_ids.append(rule_id)
list_rule_hit_count_dto.rule_ids = rule_ids
list_acl_rule_hit_count_request.body=list_rule_hit_count_dto
list_acl_rule_hit_count_response = client.list_acl_rule_hit_count(list_acl_rule_hit_count_request)
print(list_acl_rule_hit_count_response)
@staticmethod
def __delete_acl(client, _id):
delete_acl_rule_request = DeleteAclRuleRequest()
delete_acl_rule_request.acl_rule_id = _id
delete_acl_rule_response = client.delete_acl_rule(delete_acl_rule_request)
print(delete_acl_rule_response)
@staticmethod
def __update_acl(client, _id):
update_acl_rule_request = UpdateAclRuleRequest()
update_acl_rule_request.acl_rule_id = _id
update_rule_acl_dto = UpdateRuleAclDto()
update_rule_acl_dto.action_type = 1
update_rule_acl_dto.address_type = 0
update_rule_acl_dto.description = ""
new_destination = RuleAddressDto()
new_destination.address = "1.1.1.1"
new_destination.type = 0
update_rule_acl_dto.destination = new_destination
update_rule_acl_dto.direction = 0
update_rule_acl_dto.long_connect_enable = 0
update_rule_acl_dto.name = "ceshiAcl"
rule_service_dto = RuleServiceDto()
rule_service_dto.dest_port = "0-65535"
rule_service_dto.source_port = "0-65535"
rule_service_dto.protocol = 6
rule_service_dto.type = 0
update_rule_acl_dto.service = rule_service_dto
source = RuleAddressDto()
source.address = "0.0.0.0/0"
source.type = 0
update_rule_acl_dto.source = source
update_rule_acl_dto.status = 1
update_rule_acl_dto.type = 0
update_acl_rule_request.body = update_rule_acl_dto
update_acl_rule_response = client.update_acl_rule(update_acl_rule_request)
print(update_acl_rule_response)
@staticmethod
def __query_access_log(client, public_e_ip):
list_access_control_logs_request = ListAccessControlLogsRequest()
list_access_control_logs_request.dst_ip = public_e_ip
list_access_control_logs_request.fw_instance_id = "<YOUR FirewallInstanceId>"
list_access_control_logs_request.start_time = 1670427589817
list_access_control_logs_request.end_time = 1670431189817
list_access_control_logs_request.limit = 10
list_access_control_logs_response = client.list_access_control_logs(list_access_control_logs_request)
print(list_access_control_logs_response)
@staticmethod
def __query_rule_id(client):
list_acl_rule_request = ListAclRulesRequest()
list_acl_rule_request.object_id = "<YOUR ObjectId>"
list_acl_rule_request.limit = 10
list_acl_rule_request.offset = 0
list_acl_rule_response = client.list_acl_rules(list_acl_rule_request)
rule_id = list_acl_rule_response.data.records[0].rule_id
print(rule_id)
return rule_id
@staticmethod
def __add_acl(client, public_e_ip):
add_acl_rule_request = AddAclRuleRequest()
add_rule_acl_dto = AddRuleAclDto()
add_rule_acl_dto.object_id = "<YOUR ObjectId>"
add_rule_acl_dto_rules_list = []
add_rule_acl_dto_rules = AddRuleAclDtoRules()
add_rule_acl_dto_rules.action_type = 1
add_rule_acl_dto_rules.address_type = 0
add_rule_acl_dto_rules.description = ""
destination = RuleAddressDto()
destination.address = public_e_ip
destination.type = 0
add_rule_acl_dto_rules.destination = destination
add_rule_acl_dto_rules.direction = 0
add_rule_acl_dto_rules.long_connect_enable = 0
add_rule_acl_dto_rules.name = "ceshiAcl"
order_rule_acl_dto = OrderRuleAclDto()
order_rule_acl_dto.top = 1
add_rule_acl_dto_rules.sequence = order_rule_acl_dto
rule_service_dto = RuleServiceDto()
rule_service_dto.dest_port = "0-65535"
rule_service_dto.source_port = "0-65535"
rule_service_dto.protocol = 6
rule_service_dto.type = 0
add_rule_acl_dto_rules.service = rule_service_dto
source = RuleAddressDto()
source.address = "0.0.0.0/0"
source.type = 0
add_rule_acl_dto_rules.source = source
add_rule_acl_dto_rules.status = 1
add_rule_acl_dto_rules_list.append(add_rule_acl_dto_rules)
add_rule_acl_dto.rules = add_rule_acl_dto_rules_list
add_rule_acl_dto.type = 0
add_acl_rule_request.body = add_rule_acl_dto
add_rule_acl_using_post_response = client.add_acl_rule(add_acl_rule_request)
_id = add_rule_acl_using_post_response.data.rules[0].id
print(_id)
return _id
@staticmethod
def __query_eip(client):
list_eips_request = ListEipsRequest()
list_eips_request.object_id = "<YOUR ObjectId>"
list_eips_request.limit = 10
list_eips_request.offset = 0
list_eips_request.sync = 1
list_eip_resources_response = client.list_eips(list_eips_request)
eip_resource = list_eip_resources_response.data.records[0]
public_e_ip = eip_resource.public_ip
print(public_e_ip)
return public_e_ip
if __name__ == "__main__":
AclRule().main(any)
5.FAQ
5.1 ObjectId是什么,如何获取
ObjectId是创建云防火墙后用于区分互联网边界防护和VPC边界防护的标志id,可通过调用API Explorer 查询防火墙实例 获取防护对象id(ObjectId),注意type为0的为互联网边界防护,type为1的为VPC边界防护。
![list-firewallinstance-2]()
5.2 FirewallInstanceId是什么,如何获取
FirewallInstanceId是创建云防火墙后用于标志防火墙由系统自动生成的标志id,可通过调用API Explorer 查询防火墙实例 获取防火墙id(FirewallInstanceId)
![list-firewallinstance-1]()
0.版本说明
本示例基于华为云SDK V3.0版本开发。
1.简介
华为云提供了CFW服务端SDK,您可以直接集成服务端SDK来调用CFW的相关API,从而实现对CFW的快速操作。 该示例展示如何通过CFW服务对已防护的eip采用访问控制进行防护,并通过增删改查的方式操作访问控制策略,同时查询因此生成的访问控制日志
2.开发前准备
3.安装sdk
以使用云防火墙 CFW SDK 为例,您需要安装 huaweicloudsdkcfw:
4.开始使用
4.1 导入依赖模块
from huaweicloudsdkcfw.v1 import UpdateAclRuleOrderRequest, ListAclRuleHitCountRequest, DeleteAclRuleRequest, \ UpdateAclRuleRequest, ListAclRulesRequest, AddAclRuleRequest, ListEipsRequest from huaweicloudsdkcfw.v1.cfw_client import CfwClient from huaweicloudsdkcfw.v1.model.add_rule_acl_dto import AddRuleAclDto from huaweicloudsdkcfw.v1.model.add_rule_acl_dto_rules import AddRuleAclDtoRules from huaweicloudsdkcfw.v1.model.list_access_control_logs_request import ListAccessControlLogsRequest from huaweicloudsdkcfw.v1.model.list_rule_hit_count_dto import ListRuleHitCountDto from huaweicloudsdkcfw.v1.model.order_rule_acl_dto import OrderRuleAclDto from huaweicloudsdkcfw.v1.model.rule_address_dto import RuleAddressDto from huaweicloudsdkcfw.v1.model.rule_service_dto import RuleServiceDto from huaweicloudsdkcfw.v1.model.update_rule_acl_dto import UpdateRuleAclDto from huaweicloudsdkcfw.v1.region.cfw_region import CfwRegion from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkcore.exceptions.exceptions import ConnectionException from huaweicloudsdkcore.exceptions.exceptions import RequestTimeoutException from huaweicloudsdkcore.exceptions.exceptions import ServiceResponseException
4.2 初始化认证信息
4.3 初始化防火墙客户端
client = CfwClient.new_builder() \ .with_credentials(credentials=auth) \ .with_region(region=CfwRegion.value_of(region_id="<REGION ID>")) \ .build()
4.4 创建acl规则并使用
此节4.4.1-4.4.8示范了在console界面上如何操作,4.4.9示范了代码如何实现上述操作。
4.4.1 通过查询防护eip列表查询到一条防护eip的地址
4.4.2 添加一条acl规则,方向为外到内、名称为ceshi、源地址为0.0.0.0/0、目的地址类型为ip地址、目的地址为防护eip、服务类型为服务、协议类型为TCP、源端口为0-65535、目的端口为0-65535、不支持长连接、动作为阻断、启用状态为打开
4.4.3 通过acl列表获取规则id
4.4.4 查询访问控制日志,获得阻断的访问控制日志
4.4.5 查询规则id访问次数,获得访问规则规则击中次数
4.4.6 设置规则为置顶
4.4.7 更新acl规则为一个非防护eip的值,其余不变
4.4.8 删除acl规则
4.4.9 示例代码
@staticmethod def main(args): # 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全; # 本示例以ak和sk保存在环境变量中来实现身份验证为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK。 ak = os.environ["HUAWEICLOUD_SDK_AK"] sk = os.environ["HUAWEICLOUD_SDK_SK"] auth = BasicCredentials( ak=ak, sk=sk ) client = CfwClient.new_builder() \ .with_credentials(credentials=auth) \ .with_region(region=CfwRegion.value_of(region_id="<REGION ID>")) \ .build() try: # 4.4.1 通过查询防护eip列表查询到一条防护eip的地址 public_e_ip = AclRule.__query_eip(client) # 4.4.2 添加一条acl规则,方向为外到内、名称为ceshi、源地址为0.0.0.0/0、目的地址类型为ip地址、目的地址为防护eip、服务类型为服务、协议类型为TCP、源端口为0-65535、目的端口为0-65535、不支持长连接、动作为阻断、启用状态为打开 _id = AclRule.__add_acl(client, public_e_ip) # 4.4.3 通过acl列表获取规则id AclRule.__query_rule_id(client) # 4.4.4 查询访问控制日志,获得阻断的访问控制日志 AclRule.__query_access_log(client, public_e_ip) # 4.4.5 查询acl规则的击中次数 AclRule.__query_rule_hit_count(client, _id) # 4.4.6 将acl规则置顶 AclRule.__order_rule(client, _id) # 4.4.7 更新acl规则为一个非防护eip的值,其余不变 AclRule.__update_acl(client, _id) # 4.4.8 删除acl规则 AclRule.__delete_acl(client, _id) except ConnectionException as e: print(e.err_message) except RequestTimeoutException as e: print(e.err_message) except ServiceResponseException as e: print(e.status_code) print(e.error_code) print(e.error_msg) @staticmethod def __order_rule(client, rule_id): update_acl_rule_order_request = UpdateAclRuleOrderRequest() order_rule_acl_dto = OrderRuleAclDto() order_rule_acl_dto.top = 1 update_acl_rule_order_request.acl_rule_id = rule_id update_acl_rule_order_request.body = order_rule_acl_dto client.update_acl_rule_order(update_acl_rule_order_request) @staticmethod def __query_rule_hit_count(client, rule_id): list_acl_rule_hit_count_request = ListAclRuleHitCountRequest() list_rule_hit_count_dto = ListRuleHitCountDto() rule_ids = [] rule_ids.append(rule_id) list_rule_hit_count_dto.rule_ids = rule_ids list_acl_rule_hit_count_request.body=list_rule_hit_count_dto list_acl_rule_hit_count_response = client.list_acl_rule_hit_count(list_acl_rule_hit_count_request) print(list_acl_rule_hit_count_response) @staticmethod def __delete_acl(client, _id): delete_acl_rule_request = DeleteAclRuleRequest() delete_acl_rule_request.acl_rule_id = _id delete_acl_rule_response = client.delete_acl_rule(delete_acl_rule_request) print(delete_acl_rule_response) @staticmethod def __update_acl(client, _id): update_acl_rule_request = UpdateAclRuleRequest() update_acl_rule_request.acl_rule_id = _id update_rule_acl_dto = UpdateRuleAclDto() update_rule_acl_dto.action_type = 1 update_rule_acl_dto.address_type = 0 update_rule_acl_dto.description = "" new_destination = RuleAddressDto() new_destination.address = "1.1.1.1" new_destination.type = 0 update_rule_acl_dto.destination = new_destination update_rule_acl_dto.direction = 0 update_rule_acl_dto.long_connect_enable = 0 update_rule_acl_dto.name = "ceshiAcl" rule_service_dto = RuleServiceDto() rule_service_dto.dest_port = "0-65535" rule_service_dto.source_port = "0-65535" rule_service_dto.protocol = 6 rule_service_dto.type = 0 update_rule_acl_dto.service = rule_service_dto source = RuleAddressDto() source.address = "0.0.0.0/0" source.type = 0 update_rule_acl_dto.source = source update_rule_acl_dto.status = 1 update_rule_acl_dto.type = 0 update_acl_rule_request.body = update_rule_acl_dto update_acl_rule_response = client.update_acl_rule(update_acl_rule_request) print(update_acl_rule_response) @staticmethod def __query_access_log(client, public_e_ip): list_access_control_logs_request = ListAccessControlLogsRequest() list_access_control_logs_request.dst_ip = public_e_ip list_access_control_logs_request.fw_instance_id = "<YOUR FirewallInstanceId>" list_access_control_logs_request.start_time = 1670427589817 list_access_control_logs_request.end_time = 1670431189817 list_access_control_logs_request.limit = 10 list_access_control_logs_response = client.list_access_control_logs(list_access_control_logs_request) print(list_access_control_logs_response) @staticmethod def __query_rule_id(client): list_acl_rule_request = ListAclRulesRequest() list_acl_rule_request.object_id = "<YOUR ObjectId>" list_acl_rule_request.limit = 10 list_acl_rule_request.offset = 0 list_acl_rule_response = client.list_acl_rules(list_acl_rule_request) rule_id = list_acl_rule_response.data.records[0].rule_id print(rule_id) return rule_id @staticmethod def __add_acl(client, public_e_ip): add_acl_rule_request = AddAclRuleRequest() add_rule_acl_dto = AddRuleAclDto() add_rule_acl_dto.object_id = "<YOUR ObjectId>" add_rule_acl_dto_rules_list = [] add_rule_acl_dto_rules = AddRuleAclDtoRules() add_rule_acl_dto_rules.action_type = 1 add_rule_acl_dto_rules.address_type = 0 add_rule_acl_dto_rules.description = "" destination = RuleAddressDto() destination.address = public_e_ip destination.type = 0 add_rule_acl_dto_rules.destination = destination add_rule_acl_dto_rules.direction = 0 add_rule_acl_dto_rules.long_connect_enable = 0 add_rule_acl_dto_rules.name = "ceshiAcl" order_rule_acl_dto = OrderRuleAclDto() order_rule_acl_dto.top = 1 add_rule_acl_dto_rules.sequence = order_rule_acl_dto rule_service_dto = RuleServiceDto() rule_service_dto.dest_port = "0-65535" rule_service_dto.source_port = "0-65535" rule_service_dto.protocol = 6 rule_service_dto.type = 0 add_rule_acl_dto_rules.service = rule_service_dto source = RuleAddressDto() source.address = "0.0.0.0/0" source.type = 0 add_rule_acl_dto_rules.source = source add_rule_acl_dto_rules.status = 1 add_rule_acl_dto_rules_list.append(add_rule_acl_dto_rules) add_rule_acl_dto.rules = add_rule_acl_dto_rules_list add_rule_acl_dto.type = 0 add_acl_rule_request.body = add_rule_acl_dto add_rule_acl_using_post_response = client.add_acl_rule(add_acl_rule_request) _id = add_rule_acl_using_post_response.data.rules[0].id print(_id) return _id @staticmethod def __query_eip(client): list_eips_request = ListEipsRequest() list_eips_request.object_id = "<YOUR ObjectId>" list_eips_request.limit = 10 list_eips_request.offset = 0 list_eips_request.sync = 1 list_eip_resources_response = client.list_eips(list_eips_request) eip_resource = list_eip_resources_response.data.records[0] public_e_ip = eip_resource.public_ip print(public_e_ip) return public_e_ip if __name__ == "__main__": AclRule().main(any)
5.FAQ
5.1 ObjectId是什么,如何获取
ObjectId是创建云防火墙后用于区分互联网边界防护和VPC边界防护的标志id,可通过调用API Explorer 查询防火墙实例 获取防护对象id(ObjectId),注意type为0的为互联网边界防护,type为1的为VPC边界防护。![list-firewallinstance-2]()
5.2 FirewallInstanceId是什么,如何获取
FirewallInstanceId是创建云防火墙后用于标志防火墙由系统自动生成的标志id,可通过调用API Explorer 查询防火墙实例 获取防火墙id(FirewallInstanceId)![list-firewallinstance-1]()
6.参考
更多信息请参考API Explorer
7.修订记录