Back
Featured image of post AWS Step Functionsを利用するでの個人的ヒント

AWS Step Functionsを利用するでの個人的ヒント

AWSのサーバーレス開発の強い味方Step Functionsのまとめ

Step Functionsとは?

分散アプリケーション、マイクロサービスのコンポーネント疎結合化を可能にするAWSマネージドサービスのひとつです。各コンポーネントを独立させることが可能のため、アプリケーションのスケールや変更が比較的容易になります。

もっとも簡単なイメージは複数のLambda関数を連続して呼び出していく機構を手軽に作れるものですが、Lambda以外にもSNSやDynamoDB、SNSなど複数のAWSサービスと連携することが可能です。(Step Functions でサポートされる AWS サービス統合

ワークフローやステップに関して

Step FunctionsではワークフローをStatesで定義します。個別のStateでは、入力に基づいて決定を行い、アクションを実行して、出力を他の状態に渡すことが可能です。

AWS Step Functionsでは、ワークフローをAmazon States Language(JSON形式)で定義し、Step Functionsのコンソールにてグラフィカルに表示されます。

States内では以下に示すようなさまざまな関数を実行できます。

  • ステートマシンで何らかの作業をする (Task 状態)。
  • 実行の選択肢間で選択する (Choice 状態)
  • 失敗または成功で実行を停止する (Fail または Succeed 状態)
  • 入力を単純に出力に渡す、または一部の固定データを出力する (Pass 状態)
  • 一定時間または指定された時刻/日付まで遅延を提供する (Wait 状態)
  • 実行の並列ブランチを開始する (Parallel 状態)
  • ステップを動的に反復する (Map 状態)

ステートマシンの構造

要素 内容
Comment ASL(Amazon States Language)についてのコメント
StartAt 最初に実行するStateを定義
TimeoutSeconds タイムアウト時間の指定
Version ASLのバージョン
States ステートマシンを構成するStateを定義する。ワークフローを定義

Stateタイプ

要素 内容
Task ひとつの処理単位
Wait 処理をストップする時間
Pass 入力値をそのまま出力
Parallel 並列処理
Choice 条件分岐
Map 配列の要素ごとに動的並列処理を実行する
Fail 実行結果を失敗とする
Succeed 実行結果を成功とする

Task内で定義可能な要素

フィールド 内容
Name Tags
Resouce URI/呼び出すサービスのARN
InputPath 値をStateに渡す
Parameters JSON形式でStateに値を渡す
ResultPath Stateの実行結果をどのようなフィールド名で受け取るかを指定する
OutputPath 次のStateに渡す値を指定する(未指定の場合、ResultPathの結果が採用される)
Retry Stateでランタイムエラーが発生した場合の再試行ポリシー
Catch 再試行ポリシーが使い果たされたか定義されていない場合に実行される処理
TimeoutSeconds 処理をタイムアウトさせる時間
HeartbeatSeconds ハートビートの間隔が指定した時間を超えた場合に失敗させる

State MachineのサンプルテンプレートがAWSから多数提供されているため、実現したい処理に近いものを参考にできます。
テンプレートからもわかるように、かなり多くのAWSサービスと連携でき、柔軟なワークフローが作成できます。

sample_template
sample_template

Serverless FrameworkによるStep Functionsアプリのデプロイ

本記事ではStep FunctionsのStates作成をServerless Frameworkにて実施します。
ここではServerless Frameworkの詳細に関しては触れません。詳細に関してはServerless Framework公式をご覧ください。

Serverless Frameworkの導入

$ sls create --template aws-python3 --path sls_sample_project
Serverless: Generating boilerplate...
Serverless: Generating boilerplate in "/mnt/c/Users/Yuta/git/sls_sample_project"
 _______                             __
|   _   .-----.----.--.--.-----.----|  .-----.-----.-----.
|   |___|  -__|   _|  |  |  -__|   _|  |  -__|__ --|__ --|
|____   |_____|__|  \___/|_____|__| |__|_____|_____|_____|
|   |   |             The Serverless Application Framework
|       |                           serverless.com, v2.41.2
 -------'

Serverless: Successfully generated boilerplate for template: "aws-python3"

今回はStep Functionsを使用したいため、Step Functions Pluginをインストールします。

$ npm install --save-dev serverless-step-functions

Step Functionsの定義

Step Functionsの定義を行う場合、Serverless FrameworkではYAMLで記述する必要があります。はじめて作成したい場合などで、すべてをコードのみで書いていくのは非常に難しいと思います。
そこで、AWS StepFunctionsコンソールにてGUI操作でステップを作成し、CloudFormationのデザイナーを使用してJSONをYAML形式に変換するのがオススメです。

今回は以下の構成にしました。 CreateListにて処理を実施したいUser IDリストを作成し、MAP処理に渡します。MAPではインプットのリストを自動認識し、配列に合わせた並列を動的に行います。 MAP処理ではUser IDをもとにDynamoDBからGetItemにてデータを取得します。そしてLambda Processにて取得したデータを処理し、単純に出力します。その際、DynamoDBに指定したName IDがなかった場合、Lambdaの返り値をNullにし、次のJudgeSucceedInMapで判定を行います。

.
├── serverless.yml
└── src
    ├── create_list.py
    └── process.py
  • create_list.py
def lambda_handler(event, context):
    
    process_list = [
        {
            'name_id': '1'
        },
        {
            'name_id': '2'        
        },
        {
            'name_id': '3'
            
        }
    ]
    
    return process_list
  • process.py
def lambda_handler(event, context):

    if 'Item' in event['DynamoDB']:
        item = event['DynamoDB']['Item']

    else:
        item = None

    return item
  • serverless.yml
service: sls-sample-project

frameworkVersion: '2'

provider:
  name: aws
  runtime: python3.8
  stage: dev
  region: us-east-1
  lambdaHashingVersion: 20201221
  memorySize: 128
  timeout: 10
  logRetentionInDays: 7

plugins:
  - serverless-step-functions


package:
  patterns:
    - '!./**/**'
    - '!./**'
  individually: true


functions:
  CreateList:
    handler: src/create_list.lambda_handler
    package:
      patterns:
        - src/**
  MainProcess:
    handler: src/process.lambda_handler
    package:
      patterns:
        - src/**


stepFunctions:
  stateMachines:
    SampleState:
      role:
        Fn::GetAtt: ["StateMachineRole", "Arn"]
      definition:
        Comment: >-
                    Sample StepFunctions
        StartAt: StartProcess?
        States:
          StartProcess?:
            Type: Choice
            Choices:
              - Variable: $
                StringEquals: No messages
                Next: Finish
            Default: Create List

          Create List:
            Type: Task
            Resource: 
              Fn::GetAtt: [CreateList, Arn]
            InputPath: $
            ResultPath: $
            Next: MAP Process
          
          MAP Process:
            Type: Map
            Next: Finish
            Iterator:
              StartAt: Get Item from DynamoDB
              States:
                Get Item from DynamoDB:
                  Type: Task
                  Resource: arn:aws:states:::dynamodb:getItem
                  ResultPath: $.DynamoDB
                  Parameters:
                    TableName: sample-dynamodb
                    Key:
                      NameID:
                        S.$: $.name_id
                  Next: Lambda Process
                Lambda Process:
                  Type: Task
                  Resource: 
                    Fn::GetAtt: [MainProcess, Arn]
                  InputPath: $
                  ResultPath: $.result
                  Next: JudgeSucceedInMap
                JudgeSucceedInMap:
                  Type: Choice
                  Choices:
                    - Variable: $.result
                      IsNull: true
                      Next: NoItems
                  Default: SuccessInMap
                SuccessInMap:
                  Type: Pass
                  Result:
                    is_succeed: true
                  ResultPath: $.is_succeed
                  End: true
                NoItems:
                  Type: Pass
                  Result:
                    is_succeed: False
                  ResultPath: $.is_succeed
                  End: true

          Finish:
            Type: Succeed

resources:
  Resources:
    # DynamoDBの構築
    DynamoDbTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          -
            AttributeName: NameID
            AttributeType: S

        # プロビジョニングするキャパシティーユニットの設定
        KeySchema:
          -
            AttributeName: NameID
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        # テーブル名の指定
        TableName: sample-dynamodb
    StateMachineRole:
      Type: AWS::IAM::Role
      Properties:
        RoleName: RoleName
        Path: /path_of_state_machine_roles/
        AssumeRolePolicyDocument:
          Statement:
          - Effect: Allow
            Principal:
              Service:
                - states.amazonaws.com
            Action:
              - sts:AssumeRole
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/PowerUserAccess # テストなので権限をかなり緩めにする

なお、コードはGitHubにもアップしました。

実行

DynamoDBに適当な項目を追加しておきます。

Stateマシーンを開き、実行を行います。

しっかり結果が得られていることが確認できました。

MAP利用時の注意

MAP内でのエラー発生

MAP内でエラーが発生したしまった場合、Stateマシーン全体が停止してしまいます。
そこで、MAP内にエラーをキャッチした場合に受けるType: PassのStateを用意することで回避できます。

本記事の例でもこの方法を採用しました。

並列数の注意

MAPはインプットから動的に並列数を決定し、実行します。
MAP内でAthenaなどを呼び出す際、API実行の上限があるため何かしらの対策が必要です。

一番良いのはSQSを利用する手だと思いますが、MAPに用意されているMaxConcurrencyRetryを使う方法もあります。

  • MaxConcurrency: 反復子の呼び出しを並列実行できる上限数を指定
  • Retry: 状態でランタイムエラーが発生した場合の再試行ポリシーを定義

まとめ

Step Functionsの基本的な利用方法と参考になるサイトをまとめました。
今回、Step Functionsに関して詳しくドキュメントを見ましたが、AWSのさまざまなサービスを呼び出すことができ、 これらを簡単にワークフロー化できるため、Serverlessなワークフローを作成する際にとても有用なサービスだと再認識しました。
Step Functionsを利用する機会は今後も増えると思いますので、新たな知見を今後もまとめていきます。

ここでの開発はServerless Frameworkを利用しましたが、AWSのサポートツールも充実してきているようですので、これらも触れてみたいと思います。

参考

comments powered by Disqus
yuu999
Built with Hugo
Theme Stack designed by Jimmy