介绍 Amazon MWAA 对 Airflow REST API 和 Web 服务器自动扩展的支持
  • 8

亚马逊 MWAA 支持 Airflow REST API 和 Web 服务器自动扩展

作者:Mansi Bhutada、Kamen Sharlandjiev 和 Kartikay Khator,发表于 2024 年 5 月 16 日 亚马逊托管工作流 for Apache Airflow (Amazon MWAA) 公告 常用链接 评论区

关键要点

新特性介绍: 亚马逊 MWAA 现在支持 Airflow REST API 和 Web 服务器自动扩展,简化资源管理和数据管道操作。REST API 支持: 允许用户通过编程方式管理工作流,支持对 DAG、DAGRuns 等的操作。Web 服务器自动扩展: 根据 CPU 使用率和活跃连接数自动调整 Web 服务器数量,有效应对高流量负载。

在本篇文章中,我们很高兴地推出两项新功能,旨在解决客户的常见挑战,并为利用亚马逊 MWAA 构建强大、可扩展和灵活的数据编排解决方案打开新机遇。首先,Airflow REST API 支持使用户能够通过编程方式与 Airflow 资源进行交互,如连接、DAG、DAGRuns 和任务实例。其次,水平扩展 Web 服务器容量的选项帮助您应对增加的需求,无论是来自 REST API 请求、命令行接口CLI使用,还是更多的并发 Airflow UI 用户。这两个功能适用于所有当前支持的亚马逊 MWAA 版本,包括 243 及更新版本。

Airflow REST API 支持

亚马逊 MWAA 用户经常请求的功能之一,是能够通过 Airflow APIs 编程方式与他们的工作流进行交互。亚马逊 MWAA 引入的 REST API 支持满足了这个需求,提供了一种标准化的方法来访问和管理您的 Airflow 环境。通过新的 REST API,您现在可以调用 DAG 运行,管理数据集,或获取 Airflow 元数据数据库、触发器和调度程序的状态这一切都无需依赖 Airflow Web UI 或 CLI。

另一个典型应用是构建监控仪表板,以聚合多个亚马逊 MWAA 环境中 DAG 的状态,或响应来自外部系统的事件,例如已完成的数据库作业或新用户注册。

此功能为将您的亚马逊 MWAA 环境与其他系统集成以及构建利用数据编排管道的自定义解决方案打开了无限可能。

接下来,我们将展示如何使用 REST API 来调用新的 DAG 运行。

使用 Airflow REST API 进行身份验证

用户要与 REST API 进行身份验证,需要拥有创建 Web 登录令牌的必要权限,方式与 Airflow UI 相似。请参考 创建 Apache Airflow Web 登录令牌 获取更多详细信息。用户的 AWS 身份和访问管理 (IAM) 角色或策略中必须包括 CreateWebLoginToken 权限,以生成用于身份验证的令牌。此外,用户对 REST API 的交互权限由在亚马逊 MWAA 中分配给他们的 Airflow 角色决定。Airflow 角色控制用户通过 REST API 端点执行各种操作的权限,如调用 DAG 运行、检查状态或修改配置。

以下是身份验证过程的示例:

pythondef getsessioninfo(region envname) 获取 MWAA 环境的 Web 服务器主机名和会话 cookie。

参数:    region (str) MWAA 环境所在的 AWS 区域。    envname (str) MWAA 环境名称。返回:    tuple 包含 Web 服务器主机名和会话 cookie 的元组,失败时返回 (None None)。loggingbasicConfig(level=loggingINFO)try    # 初始化 MWAA 客户端并请求 Web 登录令牌    mwaa = boto3client(mwaa regionname=region)    response = mwaacreateweblogintoken(Name=envname)    # 提取 Web 服务器主机名和登录令牌    webserverhostname = response[WebServerHostname]    webtoken = response[WebToken]    # 构造用于身份验证的 URL     loginurl = fhttps//{webserverhostname}/awsmwaa/login    loginpayload = {token webtoken}    # 使用登录负载向 MWAA 登录 URL 发送 POST 请求    response = requestspost(        loginurl        data=loginpayload        timeout=10    )    # 检查登录是否成功     if responsestatuscode == 200        # 返回主机名和会话 cookie         return (            webserverhostname            responsecookies[session]        )    else        # 记录错误        loggingerror(登录失败:HTTP d responsestatuscode)        return Noneexcept requestsRequestException as e    # 记录请求期间引发的任何异常    loggingerror(请求失败:s str(e))    return Noneexcept Exception as e    # 记录任何其他意外异常    loggingerror(发生意外错误:s str(e))    return None

getsessioninfo 函数使用 AWS SDK for Python (Boto3) 和 Python 请求库完成身份验证的初始步骤,获取有效期为 12 小时的 Web 令牌和会话 cookie,后者将用于后续的 REST API 请求。

调用 Airflow REST API 端点

身份验证完成后,您将拥有发送请求到 API 端点的凭证。在以下示例中,我们使用端点 /dags/{dagid}/dagRuns 来启动一个 DAG 运行:

pythondef triggerdag(region envname dagname) 在指定的 MWAA 环境中使用 Airflow REST API 触发 DAG。

参数:region (str) 托管 MWAA 环境的 AWS 区域。envname (str) MWAA 环境名称。dagname (str) 要触发的 DAG 名称。logginginfo(f尝试在 {envname} 环境中触发 DAG {dagname},区域:{region})# 获取用于身份验证的 Web 服务器主机名和会话 cookietry    webserverhostname sessioncookie = getsessioninfo(region envname)    if not sessioncookie        loggingerror(身份验证失败,未检索到会话 cookie。)        returnexcept Exception as e    loggingerror(f获取会话信息时出错:{str(e)})    return# 准备请求的头信息和负载cookies = {session sessioncookie}jsonbody = {conf {}}# 构造触发 DAG 的 URLurl = fhttps//{webserverhostname}/api/v1/dags/{dagname}/dagRuns# 发送 POST 请求以触发 DAGtry    response = requestspost(url cookies=cookies json=jsonbody)    # 检查响应状态码以确定 DAG 是否成功触发    if responsestatuscode == 200        logginginfo(DAG 成功触发。)    else        loggingerror(f触发 DAG 失败:HTTP {responsestatuscode}  {responsetext})except requestsRequestException as e    loggingerror(f触发 DAG 的请求失败:{str(e)})

以下是 triggerdagpy 的完整代码示例:

pythonimport sysimport boto3import requestsimport logging

def getsessioninfo(region envname) 获取 MWAA 环境的 Web 服务器主机名和会话 cookie。

def triggerdag(region envname dagname) 在指定的 MWAA 环境中使用 Airflow REST API 触发 DAG。

if name == main loggingbasicConfig(level=loggingINFO)

介绍 Amazon MWAA 对 Airflow REST API 和 Web 服务器自动扩展的支持

# 检查是否提供了正确数量的参数if len(sysargv) != 4    loggingerror(使用不当。正确格式:python scriptnamepy ltregiongt ltenvnamegt ltdagnamegt)    sysexit(1)region = sysargv[1]envname = sysargv[2]dagname = sysargv[3]# 使用提供的参数触发 DAGtriggerdag(region envname dagname)

运行请求脚本

运行请求脚本,示例如下,提供 AWS 区域、亚马逊 MWAA 环境名称和 DAG 名称:

bashpython3 triggerdagpy ltregiongt ltenvnamegt ltdagnamegt

验证 API 结果

以下截图展示了命令行界面的结果。

在 Airflow UI 中检查 DAG 运行

以下截图展示了 Airflow UI 中的 DAG 运行状态。

您可以使用 REST API 中的任何其他端点,以实现对 Airflow 工作流和资源的编程控制、自动化、集成和管理。要了解有关 Airflow REST API 及其各种端点的更多信息,请参考 Airflow 文档。

Web 服务器自动扩展

亚马逊 MWAA 用户的另一个主要请求是能够动态扩展他们的 Web 服务器,以应对波动的工作负载。此前,您受到亚马逊 MWAA 上的 Airflow 环境提供的两个 Web 服务器的限制,并无法水平扩展 Web 服务器容量,这可能导致高负载期间的性能问题。亚马逊 MWAA 中新的 Web 服务器自动扩展功能解决了这个问题。通过根据 CPU 使用率和活跃连接数自动调整 Web 服务器数量,亚马逊 MWAA 确保您的 Airflow 环境可以无缝容纳增加的需求,无论是来自 REST API 请求、CLI 使用,还是更多的并发 Airflow UI 用户。

快橙加速器官方版

设置 Web 服务器自动扩展

要为您的亚马逊 MWAA 环境 Web 服务器设置自动扩展,请按照以下步骤操作:

在亚马逊 MWAA 控制台,导航到您想要配置自动扩展的环境。选择 编辑。选择 下一步。在 配置高级设置 页面中,在 环境类 部分,添加最大和最小 Web 服务器数量。在此示例中,我们将上限设置为 5,下限设置为 2。

这些设置允许亚马逊 MWAA 在需求增加时自动扩展 Airflow Web 服务器,并在需求减少时保守地缩减规模,从而优化资源使用和成本。

以编程方式触发自动扩展

配置了自动扩展后,您可能想测试它在模拟条件下的表现。使用之前讨论的调用 DAG 的 Python 代码结构,您还可以使用 Airflow REST API 模拟负载测试,观察您的自动扩展设置的响应能力。为了进行负载测试,我们将亚马逊 MWAA 环境配置为 mw1small 实例类。以下是使用 loadtestpy 的示例实现:

pythonimport sysimport timeimport boto3import requestsimport loggingimport concurrentfutures

def getsessioninfo(region envname) 获取 MWAA 环境的 Web 服务器主机名和会话 cookie。

def callrestapi(webserverhostname sessioncookie) 调用 Airflow Web 服务器 API 获取所有 DAG 的详细信息,并测量调用耗时。

def runloadtest(webserverhostname sessioncookie qps duration) 通过在指定时间发送并发请求来执行负载测试。

def main(region envname qps duration) 执行负载测试脚本的主函数。

if name == main loggingbasicConfig(level=loggingINFO) if len(sysargv) != 5 loggingerror(使用不当。正确格式:python loadtestpy ) sysexit(1)

region = sysargv[1]envname = sysargv[2]qps = int(sysargv[3])duration = int(sysargv[4])main(region envname qps duration)

这段 Python 代码利用线程池和并发概念,帮助测试 Web 服务器的自动扩展性能,模拟流量。此脚本会自动化发送每秒特定数量的请求到您的 Web 服务器,从而触发自动扩展事件。

您可以使用以下命令运行脚本。您需要提供区域、亚马逊 MWAA 环境名称、每秒要对 Web 服务器运行的查询次数和负载测试持续的时长。

bashpython loadtestpy ltregiongt ltenvnamegt ltqpsgt ltdurationgt

例如:

bashpython loadtestpy uswest2 MyMWAAEnvironment 10 1080

前述命令将每秒运行 10 个查询,持续 18 分钟。

当脚本运行时,您将开始看到显示 Web 服务器处理请求所花费的时间以秒为单位的行数。

随着时间的推移,响应时间会逐步增加。随着活跃连接数或 CPU 使用率的增加,亚马逊 MWAA 会动态调整 Web 服务器的数量以应对负载。

随着新的 Web 服务器上线,您的环境将能够处理增加的负载,并且响应时间将降低。亚马逊 MWAA 在 Amazon CloudWatch 中提供 Web 服务器容器度量,允许您监控 Web 服务器的性能。以下截图显示了自动扩展事件的示例。

推荐

确定适当的最小和最大 Web 服务器数量需要仔细考虑您的典型工作负载模式、性能要求和成本限制。设置这些值时,请考虑高峰时所需的 REST API 吞吐量和您预期的最大并发 UI 用户数。值得注意的是,亚马逊 MWAA 在全规模下任何环境大小都能支持每秒最多 10 个查询QPS,前提是您遵循推荐的 DAG 数量。

亚马逊 MWAA 与 CloudWatch 的集成提供了详细的度量和监视能力,帮助您找到适合特定用例的最佳配置。如果您预见到持续高需求或长时间增加的工作负载,可以配置您的亚马逊 MWAA 环境,以维持更高的最小 Web 服务器数量。将最小 Web 服务器设置为 2 或以上,可以确保您的环境始终有足够的能力处理负载峰值,而不必等待自动扩展来配置额外资源。这会增加运行更多 Web 服务器实例的成本,这是成本优化和响应性之间的权衡。

结论

今天,我们宣布亚马逊 MWAA 中可用的 Airflow REST API 和 Web 服务器自动扩展功能。REST API 提供了一种标准化的方法来以编程方式与您的亚马逊 MWAA 环境中的资源进行交互和管理。这使得企业内 Amazon MWAA 与现