Step Functionsとは?
分散アプリケーション、マイクロサービスのコンポーネント疎結合化を可能にするAWSマネージドサービスのひとつです。各コンポーネントを独立させることが可能のため、アプリケーションのスケールや変更が比較的容易になります。
もっとも簡単なイメージは複数のLambda関数を連続して呼び出していく機構を手軽に作れるものですが、Lambda以外にもSNSやDynamoDB、SNSなど複数のAWSサービスと連携することが可能です。(Step Functions でサポートされる AWS サービス統合)
ワークフローやステップに関して
Step FunctionsではワークフローをStatesで定義します。個別のStateでは、入力に基づいて決定を行い、アクションを実行して、出力を他の状態に渡すことが可能です。
AWS Step Functionsでは、ワークフローをAmazon States Language(JSON形式)で定義し、Step Functionsのコンソールにてグラフィカルに表示されます。
States内では以下に示すようなさまざまな関数を実行できます。
- ステートマシンで何らかの作業をする (Task 状態)。
- 実行の選択肢間で選択する (Choice 状態)
- 失敗または成功で実行を停止する (Fail または Succeed 状態)
- 入力を単純に出力に渡す、または一部の固定データを出力する (Pass 状態)
- 一定時間または指定された時刻/日付まで遅延を提供する (Wait 状態)
- 実行の並列ブランチを開始する (Parallel 状態)
- ステップを動的に反復する (Map 状態)
ステートマシンの構造
要素 | 内容 |
---|---|
Comment | ASL(Amazon States Language)についてのコメント |
StartAt | 最初に実行するStateを定義 |
TimeoutSeconds | タイムアウト時間の指定 |
Version | ASLのバージョン |
States | ステートマシンを構成するStateを定義する。ワークフローを定義 |
Stateタイプ
要素 | 内容 |
---|---|
Task | ひとつの処理単位 |
Wait | 処理をストップする時間 |
Pass | 入力値をそのまま出力 |
Parallel | 並列処理 |
Choice | 条件分岐 |
Map | 配列の要素ごとに動的並列処理を実行する |
Fail | 実行結果を失敗とする |
Succeed | 実行結果を成功とする |
Task内で定義可能な要素
フィールド | 内容 |
---|---|
Name | Tags |
Resouce | URI/呼び出すサービスのARN |
InputPath | 値をStateに渡す |
Parameters | JSON形式でStateに値を渡す |
ResultPath | Stateの実行結果をどのようなフィールド名で受け取るかを指定する |
OutputPath | 次のStateに渡す値を指定する(未指定の場合、ResultPathの結果が採用される) |
Retry | Stateでランタイムエラーが発生した場合の再試行ポリシー |
Catch | 再試行ポリシーが使い果たされたか定義されていない場合に実行される処理 |
TimeoutSeconds | 処理をタイムアウトさせる時間 |
HeartbeatSeconds | ハートビートの間隔が指定した時間を超えた場合に失敗させる |
State MachineのサンプルテンプレートがAWSから多数提供されているため、実現したい処理に近いものを参考にできます。
テンプレートからもわかるように、かなり多くのAWSサービスと連携でき、柔軟なワークフローが作成できます。
Serverless FrameworkによるStep Functionsアプリのデプロイ
本記事ではStep FunctionsのStates作成をServerless Frameworkにて実施します。
ここではServerless Frameworkの詳細に関しては触れません。詳細に関してはServerless Framework公式をご覧ください。
Serverless Frameworkの導入
$ sls create --template aws-python3 --path sls_sample_project
Serverless: Generating boilerplate...
Serverless: Generating boilerplate in "/mnt/c/Users/Yuta/git/sls_sample_project"
_______ __
| _ .-----.----.--.--.-----.----| .-----.-----.-----.
| |___| -__| _| | | -__| _| | -__|__ --|__ --|
|____ |_____|__| \___/|_____|__| |__|_____|_____|_____|
| | | The Serverless Application Framework
| | serverless.com, v2.41.2
-------'
Serverless: Successfully generated boilerplate for template: "aws-python3"
今回はStep Functionsを使用したいため、Step Functions Pluginをインストールします。
$ npm install --save-dev serverless-step-functions
Step Functionsの定義
Step Functionsの定義を行う場合、Serverless FrameworkではYAMLで記述する必要があります。はじめて作成したい場合などで、すべてをコードのみで書いていくのは非常に難しいと思います。
そこで、AWS StepFunctionsコンソールにてGUI操作でステップを作成し、CloudFormationのデザイナーを使用してJSONをYAML形式に変換するのがオススメです。
今回は以下の構成にしました。
CreateList
にて処理を実施したいUser IDリストを作成し、MAP処理に渡します。MAPではインプットのリストを自動認識し、配列に合わせた並列を動的に行います。
MAP処理ではUser IDをもとにDynamoDBからGetItem
にてデータを取得します。そしてLambda Process
にて取得したデータを処理し、単純に出力します。その際、DynamoDBに指定したName IDがなかった場合、Lambdaの返り値をNullにし、次のJudgeSucceedInMap
で判定を行います。
.
├── serverless.yml
└── src
├── create_list.py
└── process.py
- create_list.py
def lambda_handler(event, context):
process_list = [
{
'name_id': '1'
},
{
'name_id': '2'
},
{
'name_id': '3'
}
]
return process_list
- process.py
def lambda_handler(event, context):
if 'Item' in event['DynamoDB']:
item = event['DynamoDB']['Item']
else:
item = None
return item
- serverless.yml
service: sls-sample-project
frameworkVersion: '2'
provider:
name: aws
runtime: python3.8
stage: dev
region: us-east-1
lambdaHashingVersion: 20201221
memorySize: 128
timeout: 10
logRetentionInDays: 7
plugins:
- serverless-step-functions
package:
patterns:
- '!./**/**'
- '!./**'
individually: true
functions:
CreateList:
handler: src/create_list.lambda_handler
package:
patterns:
- src/**
MainProcess:
handler: src/process.lambda_handler
package:
patterns:
- src/**
stepFunctions:
stateMachines:
SampleState:
role:
Fn::GetAtt: ["StateMachineRole", "Arn"]
definition:
Comment: >-
Sample StepFunctions
StartAt: StartProcess?
States:
StartProcess?:
Type: Choice
Choices:
- Variable: $
StringEquals: No messages
Next: Finish
Default: Create List
Create List:
Type: Task
Resource:
Fn::GetAtt: [CreateList, Arn]
InputPath: $
ResultPath: $
Next: MAP Process
MAP Process:
Type: Map
Next: Finish
Iterator:
StartAt: Get Item from DynamoDB
States:
Get Item from DynamoDB:
Type: Task
Resource: arn:aws:states:::dynamodb:getItem
ResultPath: $.DynamoDB
Parameters:
TableName: sample-dynamodb
Key:
NameID:
S.$: $.name_id
Next: Lambda Process
Lambda Process:
Type: Task
Resource:
Fn::GetAtt: [MainProcess, Arn]
InputPath: $
ResultPath: $.result
Next: JudgeSucceedInMap
JudgeSucceedInMap:
Type: Choice
Choices:
- Variable: $.result
IsNull: true
Next: NoItems
Default: SuccessInMap
SuccessInMap:
Type: Pass
Result:
is_succeed: true
ResultPath: $.is_succeed
End: true
NoItems:
Type: Pass
Result:
is_succeed: False
ResultPath: $.is_succeed
End: true
Finish:
Type: Succeed
resources:
Resources:
# DynamoDBの構築
DynamoDbTable:
Type: 'AWS::DynamoDB::Table'
Properties:
AttributeDefinitions:
-
AttributeName: NameID
AttributeType: S
# プロビジョニングするキャパシティーユニットの設定
KeySchema:
-
AttributeName: NameID
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
# テーブル名の指定
TableName: sample-dynamodb
StateMachineRole:
Type: AWS::IAM::Role
Properties:
RoleName: RoleName
Path: /path_of_state_machine_roles/
AssumeRolePolicyDocument:
Statement:
- Effect: Allow
Principal:
Service:
- states.amazonaws.com
Action:
- sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/PowerUserAccess # テストなので権限をかなり緩めにする
なお、コードはGitHubにもアップしました。
実行
DynamoDBに適当な項目を追加しておきます。
Stateマシーンを開き、実行を行います。
しっかり結果が得られていることが確認できました。
MAP利用時の注意
MAP内でのエラー発生
MAP内でエラーが発生したしまった場合、Stateマシーン全体が停止してしまいます。
そこで、MAP内にエラーをキャッチした場合に受けるType: Pass
のStateを用意することで回避できます。
本記事の例でもこの方法を採用しました。
並列数の注意
MAPはインプットから動的に並列数を決定し、実行します。
MAP内でAthenaなどを呼び出す際、API実行の上限があるため何かしらの対策が必要です。
一番良いのはSQSを利用する手だと思いますが、MAPに用意されているMaxConcurrency
やRetry
を使う方法もあります。
- MaxConcurrency: 反復子の呼び出しを並列実行できる上限数を指定
- Retry: 状態でランタイムエラーが発生した場合の再試行ポリシーを定義
まとめ
Step Functionsの基本的な利用方法と参考になるサイトをまとめました。
今回、Step Functionsに関して詳しくドキュメントを見ましたが、AWSのさまざまなサービスを呼び出すことができ、
これらを簡単にワークフロー化できるため、Serverlessなワークフローを作成する際にとても有用なサービスだと再認識しました。
Step Functionsを利用する機会は今後も増えると思いますので、新たな知見を今後もまとめていきます。
ここでの開発はServerless Frameworkを利用しましたが、AWSのサポートツールも充実してきているようですので、これらも触れてみたいと思います。