1.はじめに
会社用のコミュニケーションツール(DingTalk)に毎朝、昨日のAWS What’s Newで更新された内容を通知する仕組みを導入・構築しました。
かなり昔に作ってましたがブログにしたためていなかったので、思い出しながらハンズオンブログに修正しました。
2.ハンズオン
2.1.前提
2.1.1.実行環境
- PoCの観点よりLambdaで利用するIAMは同じものを利用します
- 本番の際は、各関数毎に適切な権限を割り振りください
- 今回利用するBedrockのモデルを有効化していること
- コミュニケーションツール(DingTalk)の Webhook URL取得済み
環境 | 設定 |
---|---|
環境 | AWS CloudShell |
コミュニケーションツール | DingTalk |
2.1.2.事前準備
ファイル構成
aws-rss-notification/
├── rss-fetch-lambda/ # RSS取得Lambda用
├── summary-send-lambda/ # 要約・送信Lambda用
└── common/ # IAM設定、StepFunctions定義など
フォルダ作成
フォルダ作成コマンド
“`shell
# プロジェクトルート作成
mkdir aws-rss-notification
cd aws-rss-notification
# 各Lambda用フォルダ作成
mkdir rss-fetch-lambda # RSS取得Lambda用
mkdir summary-send-lambda # 要約・送信Lambda用
# 共通ファイル用
mkdir common # IAM設定、StepFunctions定義など
“`
2.1.3.アーキテクチャ
2.1.3.1.構成要素
順番 | リソース | 詳細 |
---|---|---|
1 | EventBridge | 毎朝定時にStepFunctionsを実行 |
2 | StepFunctions | ワークフローを制御(配下のLambdaが実行される) |
3 | RSS取得Lambda | RSSを取得・DynamoDBに保存 |
4 | 要約送信Lambda | Bedrockで要約・DingTalkに通知 |
2.1.3.2.構成図
2.2.1.DynamoDB 作成
2.2.1.1.テーブル内容
項番 | カラム | 詳細 |
---|---|---|
1 | id (文字列) | UUIDで一意とする |
2 | link | ブログのURL |
3 | published | RSSで公開された際のタグ |
4 | timestamp | DynamoDBに保存された時刻 |
5 | title | ブログのタイトル |
6 | ttl (TTL) | 項目が自動で削除されるまでの時間(Unixタイムスタンプ) |
7 | updated | RSSで更新された際のタグ |
2.2.1.2.テーブル作成
DynamoDB作成コマンド
“`shell
# 変数設定
TABLE_NAME="list-tables"
REGION="us-east-1"
# テーブル作成
aws dynamodb create-table \
–table-name $TABLE_NAME \
–attribute-definitions \
AttributeName=id,AttributeType=S \
–key-schema \
AttributeName=id,KeyType=HASH \
–billing-mode PAY_PER_REQUEST \
–region $REGION
# TTL有効化(テーブル作成直後だと、まだテーブルが作成されておらず失敗する場合があります)
aws dynamodb update-time-to-live \
–table-name $TABLE_NAME \
–time-to-live-specification \
Enabled=true,AttributeName=ttl
“`
2.2.2.IAMポリシー 作成
2.2.2.1.利用ポリシー一覧
- 本番環境では各Lambda関数に最小権限の原則でIAMポリシーを個別設定する
項番 | ポリシー | アタッチ対象 | 詳細 |
---|---|---|---|
1 | LambdaBasicExecutionRole | Lambda | ログ出力用(デフォルト) |
2 | AmazonDynamoDBFullAccess | Lambda | DynamoDB読み書き用 |
3 | AmazonBedrockFullAccess | Lambda | Bedrock要約用 |
4 | AWSLambdaRole | StepFunctions | Lambda呼出し権限 |
5 | AWSStepFunctionsFullAccess | EventBridge | StepFunctions呼出し権限 |
2.2.1.2.IAMポリシー作成
IAMポリシー作成コマンド
“`shell
# ディレクトリ移動
cd common
# 変数設定
ROLE_NAME="aws-lambda-role"
REGION="us-east-1"
# 信頼関係ポリシー作成
cat > trust-policy.json << EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com",
"states.amazonaws.com",
"events.amazonaws.com"
]
},
"Action": "sts:AssumeRole"
}
]
}
EOF
# IAMロール作成
aws iam create-role \
–role-name $ROLE_NAME \
–assume-role-policy-document file://trust-policy.json
# 必要なポリシーをアタッチ
aws iam attach-role-policy \
–role-name $ROLE_NAME \
–policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
aws iam attach-role-policy \
–role-name $ROLE_NAME \
–policy-arn arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
aws iam attach-role-policy \
–role-name $ROLE_NAME \
–policy-arn arn:aws:iam::aws:policy/AmazonBedrockFullAccess
aws iam attach-role-policy \
–role-name $ROLE_NAME \
–policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaRole
aws iam attach-role-policy \
–role-name $ROLE_NAME \
–policy-arn arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess
“`
2.2.3.RSS取得Lambda
2.2.3.1.挙動
項番 | ポリシー | 詳細 |
---|---|---|
1 | RSSフィード取得 | ・AWS What’s NewのRSSフィードURLにアクセス ・feedparserライブラリでRSSデータを解析 |
2 | 最近更新されたエントリの抽出 | ・各エントリのupdated日時をチェック ・1日以内(24時間以内)に更新されたもののみを抽出 |
3 | DynamoDB用データ形式に変換 | ・各エントリに一意のUUID(id)を生成 ・TTL(自動削除時刻)を翌日の5:59:00に設定 ・タイムスタンプ(保存時刻)を追加 |
4 | DynamoDBへの重複チェック付き保存 | ・各エントリのURLがDynamoDBに既に存在するかスキャンでチェック ・存在しない場合のみ新規保存 ・保存したエントリの件数とIDリストを記録 |
5 | Step Functions用レスポンス作成 | ・正常終了時:ステータスコード200、保存されたIDリスト、処理結果メッセージを返却 ・エラー時:ステータスコード500、エラー内容を返却 |
6 | ログ出力 | ・処理件数の記録 ・エラー時の詳細ログ |
2.2.3.2.環境変数
キー | 値 | 本ブログでの利用値 |
---|---|---|
DDB_TABLE_NAME | DynamoDBのテーブル名 | list-tables |
2.2.3.3.RSS取得Lambda作成
RSS取得Lambda作成コマンド
##### 2.2.3.3.1.Lambdaレイヤー作成
“`shell
# 共通Layer用ディレクトリ作成
cd ../common
mkdir python
# 変数設定
LAYER_NAME="aws-rss-common-layer"
PYTHON_VERSION="python3.12"
DESCRIPTION="Common libraries for AWS RSS notification system (feedparser, dateutil, requests)"
# 共通Layer用ディレクトリ作成
cd ../common
mkdir python
# 必要なライブラリを1つのLayerに
pip install feedparser python-dateutil requests -t python/
# 統合Layer作成
zip -r ${LAYER_NAME}.zip python/
# Layer公開
aws lambda publish-layer-version \
–layer-name $LAYER_NAME \
–zip-file fileb://${LAYER_NAME}.zip \
–compatible-runtimes $PYTHON_VERSION \
–description "$DESCRIPTION"
“`
##### 2.2.3.3.2.RSS取得Lambda作成
“`python
# 関数作成
cat > lambda-functions.py << EOF
import boto3
import datetime
import feedparser
import dateutil.parser
import os
import uuid
import json
import logging
import time
# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 指定された日付文字列が1日以内に更新されたかどうかを判定
def recently_updated(date_str):
elapsed_time = datetime.datetime.now() – str2datetime(date_str)
return elapsed_time.days <= 1
# 文字列をdatetimeオブジェクトに変換
def str2datetime(time_str):
return dateutil.parser.parse(time_str, ignoretz=True)
# RSSエントリから最近更新された1日以内のものを抽出し、DynamoDB用の形式に変換
def get_recent_entries(entries):
recent_entries = []
for entry in entries:
if recently_updated(entry['updated']):
recent_entries.append({
'id': str(uuid.uuid4()), # UUIDを使用して一意のIDを生成
'title': entry['title'],
'link': entry['link'],
'published': entry['published'],
'updated': entry['updated'],
'ttl': int((datetime.datetime.now() + datetime.timedelta(days=1)).replace(hour=5, minute=59, second=0).timestamp())
})
return recent_entries
# エントリをDynamoDBに保存
# 重複防止のため同じURLは保存しない
def save_to_dynamodb(entries):
table_name = os.environ['DDB_TABLE_NAME']
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_name)
added_count = 0
added_ids = []
for entry in entries:
# DynamoDBにエントリが存在するかURLで確認
response = table.scan(
FilterExpression=boto3.dynamodb.conditions.Attr('link').eq(entry['link'])
)
# 重複していない場合のみ保存
if len(response['Items']) == 0:
# timestampフィールド(保存時刻を記録)
entry['timestamp'] = int(time.time())
# エントリが存在しない場合のみ保存
table.put_item(Item=entry)
added_count += 1
added_ids.append(entry['id']) # 登録されたIDをリストに追加
return added_count, added_ids
# Lambda関数のメインハンドラー
def lambda_handler(event, context):
try:
# RSSのURL
feed_url = 'https://aws.amazon.com/about-aws/whats-new/recent/feed/'
# RSSフィードを取得・解析
feed = feedparser.parse(feed_url)
# 1日以内に更新された内容を抽出
recent_entries = get_recent_entries(feed.entries)
# DynamoDBに保存
added_count, added_ids = save_to_dynamodb(recent_entries)
logger.info(f"RSS feed processed successfully. {added_count} entries added.")
# StepFunctions用で added_idsを返す
return {
'statusCode': 200,
'added_ids': added_ids,
'message': f'RSS feed processed successfully. {added_count} entries added.'
}
except Exception as e:
logger.error(f"Error processing RSS feed: {str(e)}")
return {
'statusCode': 500,
'error': str(e)
}
EOF
“`
##### 2.2.3.3.3.RSS取得Lambdaデプロイ
“`shell
# ここまでのコマンド実行結果とRSS取得Lambdaの変数を代入
ROLE_ARN=$(aws iam get-role –role-name aws-lambda-role –query 'Role.Arn' –output text)
LAYER_ARN=$(aws lambda list-layer-versions –layer-name aws-rss-common-layer –query 'LayerVersions[0].LayerVersionArn' –output text)
DDB_TABLE_NAME="list-tables"
FUNCTION_NAME="get-awsrss"
# 変数確認
echo "Role ARN: $ROLE_ARN"
echo "Layer ARN: $LAYER_ARN"
echo "DDB Table: $DDB_TABLE_NAME"
echo "Function Name: $FUNCTION_NAME"
# zipファイル作成
zip function.zip lambda-functions.py
# Lambda関数作成
aws lambda create-function \
–function-name $FUNCTION_NAME \
–runtime python3.12 \
–role $ROLE_ARN \
–handler lambda-functions.lambda_handler \
–zip-file fileb://function.zip \
–layers $LAYER_ARN \
–environment Variables="{DDB_TABLE_NAME=$DDB_TABLE_NAME}" \
–timeout 300 \
–memory-size 512
“`
2.2.4.要約送信Lambda
2.2.4.1.挙動
項番 | ポリシー | 詳細 |
---|---|---|
1 | イベント受信・ID取得 | ・Step Functionsからevent.idsで保存されたIDリストを受信 ・IDが空の場合は「更新なし」メッセージを送信 |
2 | DynamoDB一括取得 | ・batch_get_itemで複数のエントリを取得 ・取得できなかったアイテムのログ出力 ・エラー時は空リストを返却 |
3 | Bedrock翻訳処理 | ・各エントリのタイトルをLLMで日本語翻訳 ・翻訳失敗時は元のタイトルをフォールバック |
4 | まとめメッセージ作成 | ・メッセージの整形 |
5 | DingTalk送信 | ・JSON形式でWebhook経由送信 ・HTTP例外処理とエラーログ記録 |
6 | レスポンス | ・正常終了時:ステータスコード200、処理件数を返却 ・エラー時:ステータスコード500、詳細エラー内容を返却 |
7 | ログ出力 | ・送信成功/失敗の詳細ログ |
2.2.4.2.環境変数
キー | 値 | 本ブログでの利用値 |
---|---|---|
DDB_TABLE_NAME | DynamoDBのテーブル名 | list-tables |
DINGTALK_WEBHOOK_URL | Ding TalkのWebhook | ※各自取得 |
MODEL_ID | モデルID | anthropic.claude-3-haiku-20240307-v1:0 |
MODEL_REGION | モデルを利用するリージョン | us-east-1 |
2.2.4.3.要約送信Lambda作成
デプロイコマンドの詳細
##### 2.2.4.3.1.要約送信Lambda作成
“`shell
# ディレクトリ移動
cd ../summary-send-lambda/
# 関数作成
cat > lambda-functions.py << EOF
import boto3
import json
import os
import requests
import logging
from datetime import datetime, timedelta
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 環境変数から設定を取得
DDB_TABLE_NAME = os.environ["DDB_TABLE_NAME"]
MODEL_ID = os.environ["MODEL_ID"]
MODEL_REGION = os.environ["MODEL_REGION"]
DINGTALK_WEBHOOK_URL = os.environ["DINGTALK_WEBHOOK_URL"]
dynamo = boto3.resource("dynamodb")
table = dynamo.Table(DDB_TABLE_NAME)
def get_bedrock_client(region=None):
if region is None:
region = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION"))
session = boto3.Session(region_name=region)
return session.client("bedrock-runtime")
def send_dingtalk_message(message_text):
if not DINGTALK_WEBHOOK_URL:
logger.error("DINGTALK_WEBHOOK_URL is not set")
return False
payload = {
"msgtype": "text",
"text": {
"content": message_text
}
}
headers = {'Content-Type': 'application/json;charset=utf-8'}
try:
response = requests.post(DINGTALK_WEBHOOK_URL, headers=headers, data=json.dumps(payload))
response.raise_for_status()
logger.info("Message sent successfully to DingTalk")
return True
except requests.exceptions.RequestException as e:
logger.error(f"Failed to send message to DingTalk: {e}")
return False
def get_items_from_dynamodb(ids):
"""DynamoDBから複数のアイテムを取得"""
if not ids:
return []
try:
# batch_get_itemで複数件を効率的に取得
request_items = {
DDB_TABLE_NAME: {
'Keys': [{'id': id_val} for id_val in ids]
}
}
response = dynamo.batch_get_item(RequestItems=request_items)
items = response.get('Responses', {}).get(DDB_TABLE_NAME, [])
# 取得できなかったアイテムの処理
unprocessed = response.get('UnprocessedKeys', {})
if unprocessed:
logger.warning(f"Unprocessed keys: {unprocessed}")
return items
except Exception as e:
logger.error(f"Error retrieving items from DynamoDB: {str(e)}")
return []
def translate_title_with_bedrock(title):
"""Bedrockを使ってタイトルを日本語に翻訳"""
bedrock_client = get_bedrock_client(region=MODEL_REGION)
prompt_data = f"""以下の英語のタイトルを、必ず自然な日本語に翻訳してください。
<title>{title}</title>
<instruction>
– AWS技術用語は適切な日本語に翻訳してください
– サービス名(Amazon EC2、AWS Lambda等)はそのまま残してください
– 簡潔で分かりやすい日本語にしてください
– 翻訳結果のみを出力してください
</instruction>"""
body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 200,
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": prompt_data
}
]
}
],
"temperature": 0.1,
"top_p": 0.1,
"top_k": 0,
})
try:
response = bedrock_client.invoke_model(
body=body,
modelId=MODEL_ID,
accept="application/json",
contentType="application/json"
)
response_body = json.loads(response.get("body").read().decode())
translated_title = response_body.get("content")[0]["text"].strip()
return translated_title
except Exception as e:
logger.error(f"Error translating title with Bedrock: {str(e)}")
return title # 翻訳失敗時は元のタイトルを返す
def create_summary_message(items):
"""まとめメッセージを作成"""
if not items:
return "Title: 昨日は何も更新がありませんでした、ゆっくりお休みください。"
# 前日の日付を取得
yesterday = (datetime.now() – timedelta(days=1)).strftime('%Y/%m/%d')
message_parts = []
message_parts.append(f"AWS最新ニュース ({yesterday}) – {len(items)}件の更新")
message_parts.append("") # 空行
for i, item in enumerate(items, 1):
title = item.get('title', 'タイトル不明')
link = item.get('link', '')
# タイトルを日本語に翻訳
japanese_title = translate_title_with_bedrock(title)
# 先輩の形式に合わせる + 番号追加
message_parts.append(f"【{i}】Title: {japanese_title}")
message_parts.append(f"URL: {link}")
message_parts.append("") # 空行
final_message = "\n".join(message_parts)
# デバッグ用ログ
message_bytes = len(final_message.encode('utf-8'))
logger.info(f"Message length: {len(final_message)} characters")
logger.info(f"Message bytes: {message_bytes} bytes")
logger.info(f"Message preview: {final_message[:500]}")
return final_message
def lambda_handler(event, context):
try:
# イベントから ids を取得
ids = event.get('ids', [])
if not ids:
# 更新がない場合
message = "昨日は何も更新がありませんでした、ゆっくりお休みください。"
send_dingtalk_message(message)
return {
'statusCode': 200,
'body': json.dumps("No updates message sent", ensure_ascii=False)
}
# DynamoDBから複数のアイテムを取得
items = get_items_from_dynamodb(ids)
if not items:
return {
'statusCode': 404,
'body': json.dumps('No items found', ensure_ascii=False)
}
# まとめメッセージを作成
summary_message = create_summary_message(items)
# 送信前の最終ログ
logger.info(f"TOTAL MESSAGE LENGTH TO BE SENT: {len(summary_message)}")
# DingTalkに送信
if send_dingtalk_message(summary_message):
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Summary sent successfully',
'items_count': len(items)
}, ensure_ascii=False)
}
else:
return {
'statusCode': 500,
'body': json.dumps('Failed to send message', ensure_ascii=False)
}
except Exception as e:
logger.error(f"Error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps(f"Error: {str(e)}", ensure_ascii=False)
}
EOF
“`
##### 2.2.4.3.2.要約送信Lambdaデプロイ
“`
# ここまでのコマンド実行結果と要約送信Lambdaの変数を代入
ROLE_ARN=$(aws iam get-role –role-name aws-lambda-role –query 'Role.Arn' –output text)
LAYER_ARN=$(aws lambda list-layer-versions –layer-name aws-rss-common-layer –query 'LayerVersions[0].LayerVersionArn' –output text)
DDB_TABLE_NAME="list-tables"
FUNCTION_NAME="send-awsrss"
MODEL_ID="anthropic.claude-3-haiku-20240307-v1:0"
MODEL_REGION="us-east-1"
DINGTALK_WEBHOOK_URL="各自設定"
# 変数確認
echo "Role ARN: $ROLE_ARN"
echo "Layer ARN: $LAYER_ARN"
echo "DDB Table: $DDB_TABLE_NAME"
echo "Function Name: $FUNCTION_NAME"
echo "Model ID: $MODEL_ID"
echo "Model Region: $MODEL_REGION"
echo "DingTalk URL: $DINGTALK_WEBHOOK_URL"
# zipファイル作成
zip function.zip lambda-functions.py
# Lambda関数作成
aws lambda create-function \
–function-name $FUNCTION_NAME \
–runtime python3.12 \
–role $ROLE_ARN \
–handler lambda-functions.lambda_handler \
–zip-file fileb://function.zip \
–layers $LAYER_ARN \
–environment Variables="{DDB_TABLE_NAME=$DDB_TABLE_NAME,MODEL_ID=$MODEL_ID,MODEL_REGION=$MODEL_REGION,DINGTALK_WEBHOOK_URL=$DINGTALK_WEBHOOK_URL}" \
–timeout 300 \
–memory-size 512
“`
2.2.5.StepFunctions作成
2.2.5.1.挙動
項番 | ポリシー | 詳細 |
---|---|---|
1 | RSS取得Lambda実行 | ・get-awsrss関数 を実行し、added_idsを取得 |
2 | 結果判定 | ・added_idsが存在するかChoice文で分岐 |
3 | 要約送信Lambda実行 | ・added_idsをsend-awsrss関数 に渡して実行 |
4 | エラーハンドリング | ・各Lambda失敗時の処理 |
2.2.5.2.StepFunctions作成
コードの詳細
##### 2.2.5.2.1.変数取得
“`shell
# ディレクトリ移動
cd ../common/
# RSS取得Lambda ARN取得
RSS_LAMBDA_ARN=$(aws lambda get-function –function-name get-awsrss –query 'Configuration.FunctionArn' –output text)
# 要約送信Lambda ARN取得
SUMMARY_LAMBDA_ARN=$(aws lambda get-function –function-name send-awsrss –query 'Configuration.FunctionArn' –output text)
# 確認
echo "RSS Lambda ARN: $RSS_LAMBDA_ARN"
echo "Summary Lambda ARN: $SUMMARY_LAMBDA_ARN"
“`
##### 2.2.5.2.2.StepFunctions定義ファイル作成
“`shell
# テンプレート作成
cat > step-functions-template.json << EOF
{
"Comment": "State Machine to process multiple RSS feed entries from DynamoDB and send summary notification",
"StartAt": "GetDynamoDBEntries",
"States": {
"GetDynamoDBEntries": {
"Type": "Task",
"Resource": "RSS_LAMBDA_ARN_PLACEHOLDER",
"ResultPath": "$.feedResult",
"Next": "CheckForUpdates"
},
"CheckForUpdates": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.feedResult.added_ids",
"IsPresent": true,
"Next": "CheckIfEmpty"
}
],
"Default": "NoUpdates"
},
"CheckIfEmpty": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.feedResult.added_ids[0]",
"IsPresent": true,
"Next": "ProcessAllEntries"
}
],
"Default": "NoUpdates"
},
"ProcessAllEntries": {
"Type": "Task",
"Resource": "SUMMARY_LAMBDA_ARN_PLACEHOLDER",
"Parameters": {
"ids.$": "$.feedResult.added_ids"
},
"End": true
},
"NoUpdates": {
"Type": "Task",
"Resource": "SUMMARY_LAMBDA_ARN_PLACEHOLDER",
"Parameters": {
"ids": []
},
"End": true
}
}
}
EOF
“`
##### 2.2.5.2.3.sedによる置換(LambdaのARN部分)
“`shell
sed "s/RSS_LAMBDA_ARN_PLACEHOLDER/$RSS_LAMBDA_ARN/g; s/SUMMARY_LAMBDA_ARN_PLACEHOLDER/$SUMMARY_LAMBDA_ARN/g" step-functions-template.json > step-functions-definition.json
“`
##### 2.2.5.2.4.StepFunctionsデプロイ
“`shell
# State Machine作成
STATE_MACHINE_NAME="aws-rss-notification"
aws stepfunctions create-state-machine \
–name $STATE_MACHINE_NAME \
–definition file://step-functions-definition.json \
–role-arn $ROLE_ARN
# State Machine ARN取得
STATE_MACHINE_ARN=$(aws stepfunctions list-state-machines –query "stateMachines[?name=='$STATE_MACHINE_NAME'].stateMachineArn" –output text)
echo "State Machine ARN: $STATE_MACHINE_ARN"
# StepFunctionsを直接実行
aws stepfunctions start-execution \
–state-machine-arn $STATE_MACHINE_ARN \
–name "test-execution-$(date +%Y%m%d-%H%M%S)"
“`
2.2.6.EventBridge作成
コードの詳細
##### 2.2.6.1.1.EventBridge Rule作成
“`shell
# ルール(日本時間毎朝6時 = UTC毎日21時)実行
RULE_NAME="aws-rss-schedule"
SCHEDULE_EXPRESSION="cron(0 21 * * ? *)" # UTC 21時 = JST 6時
aws events put-rule \
–name $RULE_NAME \
–schedule-expression "$SCHEDULE_EXPRESSION" \
–description "RSS feed check daily at 6AM JST (9PM UTC previous day)"
# Step FunctionsをターゲットとしてルールにアタッチするためのロールARN確認
TARGET_ROLE_ARN=$ROLE_ARN
# Step FunctionsをターゲットとしてEventBridgeルールに設定
aws events put-targets \
–rule $RULE_NAME \
–targets "Id"="1","Arn"="$STATE_MACHINE_ARN","RoleArn"="$TARGET_ROLE_ARN"
# ルール有効化
aws events enable-rule –name $RULE_NAME
“`
3.挙動確認
3.1.DingTalkの画面
3.2.StepFunctionsの画面
3.2.1.取得成功の場合
3.2.2.取得するものがなかった場合
4.おわりに
4.1.得られた知見
- StepFunctionsでのLambda呼び出しや分岐などの設定
- DynamoDBのTTL機能利用:自動でデータクリーンアップすることで運用コストを削減可能
4.2.今後の課題
- 処理の並列化とエラー耐性の向上のため、アーキテクチャの分散を考える
- 呼出しLambda – SQS – 処理Lambda – コールバック処理 – 送信Lambda のような構成
- 直接 StepFunctionsからBedrockを呼び出すことが可能なので、そちらとの違い
- Bedrockの監視方法の導入