CFW访问控制策略操作示例
引导式阅读
Python
CFW访问控制策略操作示例
作者
c***r
上架时间
2023-11-14 03:17:10

0.版本说明

本示例基于华为云SDK V3.0版本开发。

1.简介

华为云提供了CFW服务端SDK,您可以直接集成服务端SDK来调用CFW的相关API,从而实现对CFW的快速操作。 该示例展示如何通过CFW服务对已防护的eip采用访问控制进行防护,并通过增删改查的方式操作访问控制策略,同时查询因此生成的访问控制日志

2.开发前准备

  • 注册 华为云,并完成 实名认证
  • 华为云 Python SDK 支持 python3.3以上 的版本。可执行 python --version 检查当前 python 的版本信息。
  • 已获取华为云账号对应的Access Key(AK)和Secret Access Key(SK)。请在华为云控制台“我的凭证 > 访问密钥”页面上创建和查看您的AK/SK。具体请参见 访问秘钥
  • 已获取对应区域的项目,请在华为云控制台“我的凭证 > API凭证 > 项目列表”页面上查看项目,例如:cn-north-4。具体请参见 API凭证
  • 需要在 云防火墙 购买防火墙,并且在 弹性云服务器 购买弹性云服务器。
  • 通过调用API Explorer 查询防火墙实例 获取防火墙id(FirewallInstanceId)、防护对象id(ObjectId),详见5.FAQ
  • 通过弹性云服务器模拟访问控制流量。

3.安装sdk

以使用云防火墙 CFW SDK 为例,您需要安装 huaweicloudsdkcfw:

pip install huaweicloudsdkcfw

4.开始使用

4.1 导入依赖模块

from huaweicloudsdkcfw.v1 import UpdateAclRuleOrderRequest, ListAclRuleHitCountRequest, DeleteAclRuleRequest, \ UpdateAclRuleRequest, ListAclRulesRequest, AddAclRuleRequest, ListEipsRequest from huaweicloudsdkcfw.v1.cfw_client import CfwClient from huaweicloudsdkcfw.v1.model.add_rule_acl_dto import AddRuleAclDto from huaweicloudsdkcfw.v1.model.add_rule_acl_dto_rules import AddRuleAclDtoRules from huaweicloudsdkcfw.v1.model.list_access_control_logs_request import ListAccessControlLogsRequest from huaweicloudsdkcfw.v1.model.list_rule_hit_count_dto import ListRuleHitCountDto from huaweicloudsdkcfw.v1.model.order_rule_acl_dto import OrderRuleAclDto from huaweicloudsdkcfw.v1.model.rule_address_dto import RuleAddressDto from huaweicloudsdkcfw.v1.model.rule_service_dto import RuleServiceDto from huaweicloudsdkcfw.v1.model.update_rule_acl_dto import UpdateRuleAclDto from huaweicloudsdkcfw.v1.region.cfw_region import CfwRegion from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkcore.exceptions.exceptions import ConnectionException from huaweicloudsdkcore.exceptions.exceptions import RequestTimeoutException from huaweicloudsdkcore.exceptions.exceptions import ServiceResponseException

4.2 初始化认证信息

auth = BasicCredentials(ak=ak,sk=sk)

4.3 初始化防火墙客户端

client = CfwClient.new_builder() \ .with_credentials(credentials=auth) \ .with_region(region=CfwRegion.value_of(region_id="<REGION ID>")) \ .build()

4.4 创建acl规则并使用

此节4.4.1-4.4.8示范了在console界面上如何操作,4.4.9示范了代码如何实现上述操作。

4.4.1 通过查询防护eip列表查询到一条防护eip的地址

acl-1

4.4.2 添加一条acl规则,方向为外到内、名称为ceshi、源地址为0.0.0.0/0、目的地址类型为ip地址、目的地址为防护eip、服务类型为服务、协议类型为TCP、源端口为0-65535、目的端口为0-65535、不支持长连接、动作为阻断、启用状态为打开

acl-2

4.4.3 通过acl列表获取规则id

acl-3

4.4.4 查询访问控制日志,获得阻断的访问控制日志

acl-3

4.4.5 查询规则id访问次数,获得访问规则规则击中次数

acl-3

4.4.6 设置规则为置顶

acl-4

4.4.7 更新acl规则为一个非防护eip的值,其余不变

acl-5

4.4.8 删除acl规则

acl-6

4.4.9 示例代码

@staticmethod def main(args): # 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全; # 本示例以ak和sk保存在环境变量中来实现身份验证为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK。 ak = os.environ["HUAWEICLOUD_SDK_AK"] sk = os.environ["HUAWEICLOUD_SDK_SK"] auth = BasicCredentials( ak=ak, sk=sk ) client = CfwClient.new_builder() \ .with_credentials(credentials=auth) \ .with_region(region=CfwRegion.value_of(region_id="<REGION ID>")) \ .build() try: # 4.4.1 通过查询防护eip列表查询到一条防护eip的地址 public_e_ip = AclRule.__query_eip(client) # 4.4.2 添加一条acl规则,方向为外到内、名称为ceshi、源地址为0.0.0.0/0、目的地址类型为ip地址、目的地址为防护eip、服务类型为服务、协议类型为TCP、源端口为0-65535、目的端口为0-65535、不支持长连接、动作为阻断、启用状态为打开 _id = AclRule.__add_acl(client, public_e_ip) # 4.4.3 通过acl列表获取规则id AclRule.__query_rule_id(client) # 4.4.4 查询访问控制日志,获得阻断的访问控制日志 AclRule.__query_access_log(client, public_e_ip) # 4.4.5 查询acl规则的击中次数 AclRule.__query_rule_hit_count(client, _id) # 4.4.6 将acl规则置顶 AclRule.__order_rule(client, _id) # 4.4.7 更新acl规则为一个非防护eip的值,其余不变 AclRule.__update_acl(client, _id) # 4.4.8 删除acl规则 AclRule.__delete_acl(client, _id) except ConnectionException as e: print(e.err_message) except RequestTimeoutException as e: print(e.err_message) except ServiceResponseException as e: print(e.status_code) print(e.error_code) print(e.error_msg) @staticmethod def __order_rule(client, rule_id): update_acl_rule_order_request = UpdateAclRuleOrderRequest() order_rule_acl_dto = OrderRuleAclDto() order_rule_acl_dto.top = 1 update_acl_rule_order_request.acl_rule_id = rule_id update_acl_rule_order_request.body = order_rule_acl_dto client.update_acl_rule_order(update_acl_rule_order_request) @staticmethod def __query_rule_hit_count(client, rule_id): list_acl_rule_hit_count_request = ListAclRuleHitCountRequest() list_rule_hit_count_dto = ListRuleHitCountDto() rule_ids = [] rule_ids.append(rule_id) list_rule_hit_count_dto.rule_ids = rule_ids list_acl_rule_hit_count_request.body=list_rule_hit_count_dto list_acl_rule_hit_count_response = client.list_acl_rule_hit_count(list_acl_rule_hit_count_request) print(list_acl_rule_hit_count_response) @staticmethod def __delete_acl(client, _id): delete_acl_rule_request = DeleteAclRuleRequest() delete_acl_rule_request.acl_rule_id = _id delete_acl_rule_response = client.delete_acl_rule(delete_acl_rule_request) print(delete_acl_rule_response) @staticmethod def __update_acl(client, _id): update_acl_rule_request = UpdateAclRuleRequest() update_acl_rule_request.acl_rule_id = _id update_rule_acl_dto = UpdateRuleAclDto() update_rule_acl_dto.action_type = 1 update_rule_acl_dto.address_type = 0 update_rule_acl_dto.description = "" new_destination = RuleAddressDto() new_destination.address = "1.1.1.1" new_destination.type = 0 update_rule_acl_dto.destination = new_destination update_rule_acl_dto.direction = 0 update_rule_acl_dto.long_connect_enable = 0 update_rule_acl_dto.name = "ceshiAcl" rule_service_dto = RuleServiceDto() rule_service_dto.dest_port = "0-65535" rule_service_dto.source_port = "0-65535" rule_service_dto.protocol = 6 rule_service_dto.type = 0 update_rule_acl_dto.service = rule_service_dto source = RuleAddressDto() source.address = "0.0.0.0/0" source.type = 0 update_rule_acl_dto.source = source update_rule_acl_dto.status = 1 update_rule_acl_dto.type = 0 update_acl_rule_request.body = update_rule_acl_dto update_acl_rule_response = client.update_acl_rule(update_acl_rule_request) print(update_acl_rule_response) @staticmethod def __query_access_log(client, public_e_ip): list_access_control_logs_request = ListAccessControlLogsRequest() list_access_control_logs_request.dst_ip = public_e_ip list_access_control_logs_request.fw_instance_id = "<YOUR FirewallInstanceId>" list_access_control_logs_request.start_time = 1670427589817 list_access_control_logs_request.end_time = 1670431189817 list_access_control_logs_request.limit = 10 list_access_control_logs_response = client.list_access_control_logs(list_access_control_logs_request) print(list_access_control_logs_response) @staticmethod def __query_rule_id(client): list_acl_rule_request = ListAclRulesRequest() list_acl_rule_request.object_id = "<YOUR ObjectId>" list_acl_rule_request.limit = 10 list_acl_rule_request.offset = 0 list_acl_rule_response = client.list_acl_rules(list_acl_rule_request) rule_id = list_acl_rule_response.data.records[0].rule_id print(rule_id) return rule_id @staticmethod def __add_acl(client, public_e_ip): add_acl_rule_request = AddAclRuleRequest() add_rule_acl_dto = AddRuleAclDto() add_rule_acl_dto.object_id = "<YOUR ObjectId>" add_rule_acl_dto_rules_list = [] add_rule_acl_dto_rules = AddRuleAclDtoRules() add_rule_acl_dto_rules.action_type = 1 add_rule_acl_dto_rules.address_type = 0 add_rule_acl_dto_rules.description = "" destination = RuleAddressDto() destination.address = public_e_ip destination.type = 0 add_rule_acl_dto_rules.destination = destination add_rule_acl_dto_rules.direction = 0 add_rule_acl_dto_rules.long_connect_enable = 0 add_rule_acl_dto_rules.name = "ceshiAcl" order_rule_acl_dto = OrderRuleAclDto() order_rule_acl_dto.top = 1 add_rule_acl_dto_rules.sequence = order_rule_acl_dto rule_service_dto = RuleServiceDto() rule_service_dto.dest_port = "0-65535" rule_service_dto.source_port = "0-65535" rule_service_dto.protocol = 6 rule_service_dto.type = 0 add_rule_acl_dto_rules.service = rule_service_dto source = RuleAddressDto() source.address = "0.0.0.0/0" source.type = 0 add_rule_acl_dto_rules.source = source add_rule_acl_dto_rules.status = 1 add_rule_acl_dto_rules_list.append(add_rule_acl_dto_rules) add_rule_acl_dto.rules = add_rule_acl_dto_rules_list add_rule_acl_dto.type = 0 add_acl_rule_request.body = add_rule_acl_dto add_rule_acl_using_post_response = client.add_acl_rule(add_acl_rule_request) _id = add_rule_acl_using_post_response.data.rules[0].id print(_id) return _id @staticmethod def __query_eip(client): list_eips_request = ListEipsRequest() list_eips_request.object_id = "<YOUR ObjectId>" list_eips_request.limit = 10 list_eips_request.offset = 0 list_eips_request.sync = 1 list_eip_resources_response = client.list_eips(list_eips_request) eip_resource = list_eip_resources_response.data.records[0] public_e_ip = eip_resource.public_ip print(public_e_ip) return public_e_ip if __name__ == "__main__": AclRule().main(any)

5.FAQ

5.1 ObjectId是什么,如何获取

ObjectId是创建云防火墙后用于区分互联网边界防护和VPC边界防护的标志id,可通过调用API Explorer 查询防火墙实例 获取防护对象id(ObjectId),注意type为0的为互联网边界防护,type为1的为VPC边界防护。 list-firewallinstance-2

5.2 FirewallInstanceId是什么,如何获取

FirewallInstanceId是创建云防火墙后用于标志防火墙由系统自动生成的标志id,可通过调用API Explorer 查询防火墙实例 获取防火墙id(FirewallInstanceId) list-firewallinstance-1

6.参考

更多信息请参考API Explorer

7.修订记录

发布日期 文档版本 修订说明
2023-07-27 1.1 文档首次发布
2023-11-07 1.2 更新sdk版本至3.1.64