サイトアイコン 協栄情報ブログ

Amazon Connect と AWS Lambdaを組み合わせて自動架電システムを構築してみた

はじめに

こんにちは、クラ本部の稲村です。
お客様に自動的に架電を実施して音声を再生できれば、商品の宣伝や営業、料金の督促が効率的に実施できそうです。
そのような機能をAmazon ConnectとAWS Lambdaを組み合わせることで構築が可能ですので、その方法をご紹介したいと思います。

想定

今回は、CSVファイル上の電話番号へ一斉に架電をして、応答しなかった人の情報をチャットツールへ通知するシステムを構築してみます。
また、架電先ごとに再生音声を切替える機能も実装してみましょう。

目標

システムの構築にあたり、下記の達成を目標とします。

  1. CSVファイルの電話番号へ一斉架電
  2. 再生音声の切り替え
  3. 不在情報をチャットツールへ通知
  4. 上記システムの定期実行

システム構成

CSVファイル

今回は架電先の情報をCSVファイルに記載します。
下記の形式で作成します。

No,電話番号,名前,フラグ
1,+8180XXXXXXXX,協栄太郎,1
2,+8170XXXXXXXX,協栄次郎,2
3,+8190XXXXXXXX,協栄三郎,3

ヘッダーの意味

Slackへの不在通知メッセージ

Slackへ送信するメッセージは下記の形式で通知します。

構成図

サービス構成

Incoming Webhook

今回は通知先チャットツールにSlackを利用し、Incoming Webhookを介して通知を行います。
Incoming Webhook は、外部サービスやアプリから Slack にメッセージを送るための仕組みです。
Slack 側で Webhook URL を発行し、その URL に対して HTTP POST リクエストを送ることで、任意のチャンネルに通知を投稿できます。
本稿ではWebhook URLの発行方法については割愛します。

Amazon S3

目的

S3上のCSVファイルを読み込んで架電を実施する想定のため、CSVファイルの格納用バケット、および格納用フォルダを構築します。

タスク

構築

1. CSV格納用バケットの作成

CSVを格納するバケットを作成していきます。
今回は以下の例で作成します。

パラメータ
バケットタイプ 汎用
バケット名 customerlist-bucket
ACL 無効
ブロックパブリックアクセス設定 すべてブロック
バージョニング 無効
暗号化 S3マネージドキー(SSE-S3)
バケットキー 有効

2. フォルダの作成

作成したバケットにアクセス → 「フォルダの作成」からCSV格納用フォルダを作成していきます。

項目
フォルダ名 customer_csv
暗号化 暗号化キーを指定する
暗号化タイプ S3マネージドキー(SSE-S3)

Amazon Connect

目的

本システムの中核となるサービスです。
ConnectのAPIを利用して架電を実現しますが、その際に必要なコンタクトフローと、コンタクトフロー内で利用する音声ファイルのインポートの作業が必要になります。

タスク

前提条件

項目
ファイル形式 .wav
ファイルサイズ 50MB未満
再生時間 5分未満
サンプリング周波数 8kHz
量子化ビット数 8bit
コーデック μ-law形式(推奨)

構築

1. 再生する音声をConnectインスタンスへインポートする

コンタクトフロー上で自前の音声を利用するためには、Connectインスタンス内へ音声ファイルをインポートする必要があります。
音声ファイルは、Connectコンソールへログイン後、サイドバーから ルーティング → プロンプト の順で押下し画面右上の「プロンプトを追加」を押した先でインポート可能です。

下記のような画面が表示される事を確認し、各設定項目を埋めてインポート作業を完了しましょう。

今回は、CSVファイルのフラグ値に応じて再生する音声を切替えるので、3つの音声ファイルを用意してインポートしていきます。

設定項目 説明
名前 1回目通知音声 CSVファイル内の「フラグ」値が1だった場合に再生する音声
Audio アップロード → ファイルを選択 用意した.wavファイルを選択する
設定項目 説明
名前 2回目通知音声 CSVファイル内の「フラグ」値が2だった場合に再生する音声
Audio アップロード → ファイルを選択 用意した.wavファイルを選択する
設定項目 説明
名前 3回目通知音声 CSVファイル内の「フラグ」値が3だった場合に再生する音声
Audio アップロード → ファイルを選択 用意した.wavファイルを選択する

2. コンタクトフローの構築

ここでは、架電に応答した時に体験する通話音声をコンタクトフローを作成して定義していきます。
まず、Connectコンソールのサイドバーから ルーティング → フロー を押下します。
表示された画面右上の「フローを作成」を押下すると下記のような画面が表示されることを確認し、構築を進めていきましょう。

完成イメージは下記の通り。

主要ブロック解説

CSVファイル内の「フラグ」値をLambda経由でコンタクトフローに渡し、コンタクトフロー内で「コンタクト属性を確認する」ブロックを利用することで、再生する音声の切り替えを実現します。

設定項目 説明
ブロック名 フラグチェック 任意のブロック名を設定
名前空間 ユーザ定義 コンタクトフローで自由に設定できるカスタム属性領域
キー flag Lambdaから渡される属性名を設定
条件 次と等しい 合致条件を設定
3 フラグ値が3の場合に条件に合致
条件 次と等しい 合致条件を設定
2 フラグ値が2の場合に条件に合致
条件 次と等しい 合致条件を設定
1 フラグ値が1の場合に条件に合致
設定項目 説明
ブロック名 最終通知
音声プロンプト 3回目通知音声 フラグ値が3の場合に再生する音声を設定
設定項目 説明
ブロック名 再度通知
音声プロンプト 2回目通知音声 フラグ値が2の場合に再生する音声を設定
設定項目 説明
ブロック名 初回通知
音声プロンプト 1回目通知音声 フラグ値が1の場合に再生する音声を設定

AWS Lambda

目的

S3の上の電話番号へConnect APIを介して架電を実施する機能と不在通知を行う機能をLambdaを利用して実現します。

タスク

autocall-function

自動架電用Lambda関数です。

処理のフロー

環境変数

システムに必要なパラメータを環境変数として設定し、ハードコーディングを防止します。
CSV_FILE_NAME = CSVファイルのパス
S3_BUCKET_NAME = S3バケットの名前
CONTACT_FLOW_ID = 使用するコンタクトフローのID
INSTANCE_ID = ConnectインスタンスのID
SOURCE_PHONE_NUMBER = 発信元の電話番号

ソースコード

import os
import csv
import json
from io import StringIO
import boto3
from botocore.exceptions import ClientError

# 環境変数
CSV_FILE_NAME = os.environ['CSV_FILE_NAME']
S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME']
CONTACT_FLOW_ID = os.environ['CONTACT_FLOW_ID']
INSTANCE_ID = os.environ['INSTANCE_ID']
SOURCE_PHONE_NUMBER = os.environ['SOURCE_PHONE_NUMBER']

s3 = boto3.resource('s3')
client = boto3.client('connect')

def lambda_handler(event, context):

    # --- CSV 取得 ---
    try:
        obj = s3.Object(S3_BUCKET_NAME, CSV_FILE_NAME)
        csv_data = obj.get()['Body'].read().decode('utf-8-sig')
        csv_reader = csv.DictReader(StringIO(csv_data))
    except Exception:
        return {
            'statusCode': 500,
            'body': json.dumps({'result': False}, ensure_ascii=False) 
        }

    has_data = False

    for row in csv_reader:
        has_data = True

        # 値を前後 trim(None 対応)
        Number = (row.get('No') or '').strip()
        PhoneNumber = (row.get('電話番号') or '').strip()
        Name = (row.get('名前') or '').strip()
        Flag = (row.get('フラグ') or '').strip()

        # 名前が数値だけの場合はスキップ
        if Name.isdigit():
            continue

        # 必須項目欠落 → スキップ
        if not (Number and PhoneNumber and Name and Flag):
            continue

        # フラグ値が不正ならスキップ
        if Flag not in {"1", "2", "3"}:
            continue

        # --- 架電処理 ---
        try:
            client.start_outbound_voice_contact(
                DestinationPhoneNumber=PhoneNumber,
                ContactFlowId=CONTACT_FLOW_ID,
                InstanceId=INSTANCE_ID,
                SourcePhoneNumber=SOURCE_PHONE_NUMBER,
                TrafficType='CAMPAIGN',
                AnswerMachineDetectionConfig={
                    'EnableAnswerMachineDetection': True,
                    'AwaitAnswerMachinePrompt': True
                },
                Attributes={
                    'number': Number,
                    'phonenumber': PhoneNumber,
                    'name': Name,
                    'flag': Flag
                }
            )
        except ClientError:
            continue
        except Exception:
            continue

    # データ行が 0 件
    if not has_data:
        return {
            'statusCode': 500,
            'body': json.dumps({'result': False}, ensure_ascii=False)
        }

    # 正常終了
    return {
        'statusCode': 200,
        'body': json.dumps({'result': True}, ensure_ascii=False)
    }

API解説

フィールド名 意味
DestinationPhoneNumber 発信先の電話番号(顧客の番号)
ContactFlowId 発信時に実行するコンタクトフローの ID
InstanceId Amazon Connect インスタンスの識別子
SourcePhoneNumber 発信元として表示される電話番号
TrafficType 発信の種類
AnswerMachineDetectionConfig 留守番電話検出に関する設定
Attributes コンタクトフローに渡す任意のカスタム属性

notifier-function

不在通知用Lambda関数です。

処理のフローイメージ

環境変数

システムに必要なパラメータを環境変数として設定し、ハードコーディングを防止します。
INSTANCE_ID : Amazon Connect InstanceId
WEBHOOK_URL : Slack Incoming Webhook URL

ソースコード

# -*- coding: utf-8 -*-
"""
Amazon Connect の通話履歴から“不在”を抽出し、Slack に一覧通知する。
【必要な環境変数】
- INSTANCE_ID : Amazon Connect InstanceId
- WEBHOOK_URL : Slack Incoming Webhook URL
"""

import json
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple

import urllib3
import boto3

# ===== 定数 =====
JST = timezone(timedelta(hours=9))
HTTP_TIMEOUT = 5.0

http = urllib3.PoolManager()
connect = boto3.client("connect", region_name="ap-northeast-1")

# ===== 時間帯 =====
def build_time_range_utc(start_h: int, start_m: int, end_h: int, end_m: int) -> Tuple[datetime, datetime]:
    """当日JSTの指定時刻をUTCに変換。endは分の末秒を含む。"""
    today = datetime.now(JST).date()
    start = datetime(today.year, today.month, today.day, start_h, start_m, 0, tzinfo=JST)
    end = datetime(today.year, today.month, today.day, end_h, end_m, 59, tzinfo=JST)
    return start.astimezone(timezone.utc), end.astimezone(timezone.utc)

# ===== Amazon Connect =====
def search_contact_ids(instance_id: str, start_utc: datetime, end_utc: datetime) -> List[str]:
    """search_contacts で contactId を収集。"""
    ids: List[str] = []
    next_token = None
    while True:
        kwargs: Dict[str, Any] = {
            "InstanceId": instance_id,
            "TimeRange": {"Type": "INITIATION_TIMESTAMP", "StartTime": start_utc, "EndTime": end_utc},
            "SearchCriteria": {"Channels": ["VOICE"], "InitiationMethods": ["API"]},
            "MaxResults": 100,
        }
        if next_token:
            kwargs["NextToken"] = next_token
        resp = connect.search_contacts(**kwargs)
        ids.extend([c["Id"] for c in resp.get("Contacts", []) if c.get("Id")])
        next_token = resp.get("NextToken")
        if not next_token:
            break
    return ids

def is_unanswered(contact: Dict[str, Any]) -> bool:
    """ConnectedToSystemTimestamp が無ければ“不在”。"""
    return not contact.get("ConnectedToSystemTimestamp")

def format_unanswered_line(contact: Dict[str, Any]) -> str:
    """Slack 出力用の1行。"""
    attrs = contact.get("Attributes", {}) or {}
    number = attrs.get("number")
    name = attrs.get("name")
    phone = attrs.get("phonenumber")
    ts = contact.get("InitiationTimestamp")
    ts_jst = ts.astimezone(JST).strftime("%H:%M:%S") if ts else "N/A"
    return f"No.{number} 氏名:{name} 電話番号:{phone} 発信時刻: {ts_jst}"

def describe_unanswered_lines(instance_id: str, contact_ids: List[str]) -> List[str]:
    """describe_contact で“不在”のみ抽出。"""
    lines: List[str] = []
    for cid in contact_ids:
        contact = connect.describe_contact(InstanceId=instance_id, ContactId=cid).get("Contact", {}) or {}
        if is_unanswered(contact):
            lines.append(format_unanswered_line(contact))
    return lines

# ===== Slack =====
def post_to_slack(webhook_url: str, text: str) -> None:
    """Slack Incoming Webhook に送信。"""
    http.request(
        "POST",
        webhook_url,
        body=json.dumps({"text": text}, ensure_ascii=False).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        timeout=urllib3.Timeout(total=HTTP_TIMEOUT),
    )

# ===== Lambda ハンドラ =====
def lambda_handler(event, context):
    """
    フロー:
      1) JST当日の00:00〜23:59をUTCに変換
      2) search_contacts で contactId 収集
      3) describe_contact で“不在”のみ抽出
      4) Slack に通知(0件でも通知)
    """
    instance_id = os.environ["INSTANCE_ID"]
    webhook_url = os.environ["WEBHOOK_URL"]

    # JSTで当日0:00〜23:59 → UTCに変換
    start_utc, end_utc = build_time_range_utc(0, 0, 23, 59)

    contact_ids = search_contact_ids(instance_id, start_utc, end_utc)

    if not contact_ids:
        post_to_slack(webhook_url, "本日の通話はありませんでした")
        return {"statusCode": 200, "body": json.dumps({"message": "OK"})}

    lines = describe_unanswered_lines(instance_id, contact_ids)

    if not lines:
        post_to_slack(webhook_url, "本日の不在着信はありませんでした")
    else:
        post_to_slack(webhook_url, "\n".join(lines))

    return {"statusCode": 200, "body": json.dumps({"message": "OK"})}

API解説

connect.search_contacts(
            "InstanceId": instance_id,
            "TimeRange": {"Type": "INITIATION_TIMESTAMP", "StartTime": start_utc, "EndTime": end_utc},
            "SearchCriteria": {"Channels": ["VOICE"], "InitiationMethods": ["API"]},
            "MaxResults": 100
        )

serch_contactsは、指定した条件に合致する通話履歴の "ContactId" を取得します。
"ContactId" は通話履歴に紐付く識別IDで、次に説明する "describe_contact" で詳細情報を取得するために必須のパラメータです。
今回は00:00~23:59の範囲の"Contactid"を一括で取得しリストに格納しています。
APIの各フィールドの意味は下記のとおりです。

フィールド名 説明
InstanceId 対象となる Amazon Connect インスタンスの ID
TimeRange 基準となるタイムスタンプの種類(ここでは通話開始時刻)
StartTime 検索対象の開始時刻
EndTime 検索対象の終了時刻
SearchCriteria 絞り込み条件
Channels 通話チャネル(例: VOICE, CHAT など)
InitiationMethods 通話の開始方法(例: API, INBOUND, OUTBOUND)
MaxResults 1回の API 呼び出しで取得する最大件数(最大 100)
describe_contact(InstanceId=instance_id, ContactId=cid)

Amazon Connect の特定の通話履歴(Contact)を指定して、その詳細情報を取得する API です。
たとえば、
通話がいつ始まったか(InitiationTimestamp)
顧客と実際につながったかどうか(ConnectedToSystemTimestamp の有無)
Lambda などから渡した属性情報(Attributes)
といった情報を確認できます。

今回は、取得した履歴上の ConnectedToSystemTimestamp の有無で不在の判定を行っています。
ConnectedToSystemTimestampが存在していない、つまり通話が始まっていない = 不在というロジックです。

EventBridge

目的

自動架電用Lambdaと不在通知用LambdaのEventBridgeスケジュールを作成し、架電と不在通知の実行を自動化します。
今回は、10:00に自動架電用Lambdaの実行をスケジュールし、10分後の10:10に不在通知用Lambdaを実行するスケジュールを設定してみましょう。
注意点として、通話が終了してから通話履歴の作成が完了するまで多少の時間がかかるため、架電終了後しばらくしてから不在通知用Lambdaを実行する事を推奨します。今回はマージンとして10分の間隔を空けて作成しています。

1. 自動架電用Lambdaのスケジュールの作成

下記のパラメータでスケジュールを作成しました。
架電を実施したい時間を設定してみましょう。
今回のスケジュール時間は、毎日10:00に実行することを想定して作成しました。

項目
スケジュール名 autocall-function-schedule
頻度 定期的なスケジュール
タイムゾーン Asia/Tokyo
スケジュールの種類 cron式
cron (00 , 10 , * , * , ? ,*)
フレックスタイムウィンドウ オフ
ターゲット AWS Lambda Invoke
Invoke autocall-function
再試行ポリシー オフ
スケジュールを有効化 有効化
スケジュール完了後のアクション NONE
デッドレターキュー (DLQ) なし 
アクセス許可  このスケジュールの新しいロールを作成 

2. 自動架電用Lambdaのスケジュールの作成

架電が実施されてから10分後に不在通知用Lambdaが起動されるようスケジュールを作成していきます。

項目
スケジュール名 notifier-function-schedule
頻度 定期的なスケジュール
タイムゾーン Asia/Tokyo
スケジュールの種類 cron式
cron (10 , 10 , * , * , ? ,*)
フレックスタイムウィンドウ オフ
ターゲット AWS Lambda Invoke
Invoke notifier-function
再試行ポリシー オフ
スケジュールを有効化 有効化
スケジュール完了後のアクション NONE
デッドレターキュー (DLQ) なし 
アクセス許可  このスケジュールの新しいロールを作成 

テスト

ここまでくれば、自動架電と不在通知のシステムが構築できています。
手動実行して動作を確認してみましょう。

自動架電用Lambdaの実行

まずは、CSVファイルにNo、自分の電話番号、名前、フラグ値を入れて、自動架電用Lambdaを実行してみましょう。
電話がかかってくれば成功です。そのまま電話を切断しましょう。

不在通知用Lambdaの実行

自動架電用Lambdaの実行が成功したら、不在通知用Lambdaを実行して不在通知を送信しましょう。
先ほどの架電で切断していれば、下記のようにSlackへ通知が届くはずです。

まとめ

今回はシンプルな構成で架電と不在通知の自動化システムを構築しました。
これでCSVファイルをS3にアップロードするだけで、指定の時刻に架電から通知までの処理をすべて自動で実行してくれます。
実運用では架電数に応じて、レートやクォータ制限の緩和申請も視野に入れて構築しましょう。
LambdaやS3、EventBridgeは無料利用枠に収まる場合がほとんどかと思いますので、コスト面でもリーズナブルに目標を実現できています。

モバイルバージョンを終了