AWS SESのバウンス発生をSlackに通知する

こいついつもLambdaで何かやってるな。

AWSのSES(Simple Email Service)でバウンスや苦情が発生した際、それをSlackに通知することで、迅速な発生の検知と社内共有ができるようにしてみました。

「SESバウンス発生→SNSトピック呼び出し→SNSトピックからLambda関数を呼び出し」という流れでLambda関数に処理を渡しています。

Lambda関数

大まかな処理の流れは下記のとおりで、処理内容自体は単純です。
苦情の場合は配信抑制リストからの削除処理がありません。

  1. SNSトピックからLambdaが呼び出される。
  2. SNSトピックからのメッセージ内容を見て、バウンスか苦情かで処理を分岐する。
    1. バウンスの場合
      1. バウンス通知メッセージを作成する。
      2. 配信抑制リストから削除するアドレスの場合、削除する。
      3. 削除した場合、削除通知メッセージを通知に追加する。
      4. Slack通知を行うアドレスの場合、SlackへPOSTする。
    2. 苦情の場合
      1. 苦情通知メッセージを作成する。
      2. Slack通知を行うアドレスの場合、SlackへPOSTする。

SlackにPOSTするコンテンツをblock構造で記述しているせいで、非常にコードが長くなってしまっていまいました。見づらくて申し訳ありません。

import json
from datetime import date, datetime
import urllib.request

import boto3


def json_serial(obj):
    # If it is a date type, convert it to a character string.
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    # Other than the above, it is not supported.
    raise TypeError ("Type %s not serializable" % type(obj))


def delete_suppressed_destination(mail):
    client = boto3.client('sesv2')

    suppressed = client.get_suppressed_destination(EmailAddress=mail)
    del_json = json.dumps(suppressed, default=json_serial)
    message = '\n'.join([
        '*Delete suppressed destination json*: `' + mail + '`',
        '```' + del_json + '```'
        ])

    try:
        client.delete_suppressed_destination(EmailAddress=mail)
    except client.exceptions.NotFoundException:
        post_slack('*NotFoundException*: `' + mail + '`\n' + message)
        return
    except client.exceptions.BadRequestException:
        post_slack('*BadRequestException*: `' + mail + '`\n' + message)
        return
    except client.exceptions.TooManyRequestsException:
        post_slack('*TooManyRequestsException*: `' + mail + '`\n' + message)
        return

    try:
        suppressed = client.get_suppressed_destination(EmailAddress=mail)
    except client.exceptions.NotFoundException:
        del_response = '*Deleted!*'
    except client.exceptions.BadRequestException:
        del_response = '*BadRequestException*'
    except client.exceptions.TooManyRequestsException:
        del_response = '*TooManyRequestsException*'

    return (del_response, del_json)


def is_no_suppress_mail(mail):
    if 'no-suppress' in mail or mail.endswith('@no.suppress.domain.jp'):
        return True
    return False
    
    
def is_ignore_mail(mail):
    ignore_list = (
        'ignore@example.com',
        )
    if mail in ignore_list or mail.endswith('@ignore.domain.jp'):
        return True
    return False


# Bounce.
def build_blocks_bounce(bounced_recipient, message_json):

    mail = bounced_recipient['emailAddress']

    diagnosticCode = bounced_recipient['diagnosticCode']
    bounce_status = '\n'.join((
        'action: ' + bounced_recipient['action'],
        'status: ' + bounced_recipient['status'],
        'diagnostic_code: ' + diagnosticCode,
        ))

    bounce_json = json.dumps(message_json)

    blocks = [
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Mail: *"
                },
                {
                    "type": "mrkdwn",
                    "text": "`" + mail + "`"
                }
            ]
        },
		{
			"type": "section",
			"text": {
				"type": "mrkdwn",
				"text": "*Bounced information: *\n```" + bounce_status + "```"
			}
		},
		{
			"type": "section",
			"text": {
				"type": "mrkdwn",
				"text": "*Bounced message json: *\n```" + bounce_json + "```"
			}
		}
    ]
    return blocks

def build_blocks_delete(mail, del_response, del_json):
    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "Delete notification"
            }
        },
        {
            "type": "divider"
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Mail:*"
                },
                {
                    "type": "mrkdwn",
                    "text": "`" + mail + "`"
                }
            ]
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Response:*"
                },
                {
                    "type": "mrkdwn",
                    "text": del_response
                }
            ]
        },
		{
			"type": "section",
			"text": {
				"type": "mrkdwn",
				"text": "*Suppressed destination json: *\n```" + del_json + "```"
			}
		}
    ]
    return blocks

def prepare_bounced_message(message_json):
    bounced_message = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "Bounce notification"
            }
        },
        {
            "type": "divider"
        },
		{
			"type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Timestamp (UTC): *"
                },
                {
                    "type": "mrkdwn",
                    "text": "`" + message_json['bounce']['timestamp'] + "`"
                }
            ]
		}
    ]
    return bounced_message


# Complaint.
def build_blocks_complaint(complained_recipient, message_json):

    mail = complained_recipient['emailAddress']

    complaint_json = json.dumps(message_json)

    blocks = [
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Mail: *"
                },
                {
                    "type": "mrkdwn",
                    "text": "`" + mail + "`"
                }
            ]
        },
		{
			"type": "section",
			"text": {
				"type": "mrkdwn",
				"text": "*Complaintd message json: *\n```" + complaint_json + "```"
			}
		}
    ]
    return blocks

def prepare_complained_message(message_json):
    complained_message = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "Complaint notification"
            }
        },
        {
            "type": "divider"
        },
		{
			"type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Timestamp (UTC): *"
                },
                {
                    "type": "mrkdwn",
                    "text": "`" + message_json['complaint']['timestamp'] + "`"
                }
            ]
		}
    ]
    return complained_message


def post_slack(blocks, message=None):
    send_data = {
        "channel": "XXXXXXXXXXX",   # SESの通知用Channel-ID
        "username": "SES Notice",
        "icon_emoji": ":mailbox_with_no_mail:",
        "text": message,
        "blocks": blocks
    }

    send_text = "payload=" + json.dumps(send_data)

    print('post_slack() send_text: ', send_text)

    request = urllib.request.Request(
        "(SlackのWebhook URL)",
        data=send_text.encode('utf-8'),
        method="POST"
    )

    with urllib.request.urlopen(request) as response:
        response_body = response.read().decode('utf-8')


def ses_process(event):
    message_json = json.loads(event['Records'][0]['Sns']['Message'])

    if message_json['notificationType'] == 'Bounce':
        bounced_recipients = message_json['bounce']['bouncedRecipients']
        bounced_mails = []
        delete_result = []
        notice_message = ""

        for bounced_recipient in bounced_recipients:
            bounced_mails += build_blocks_bounce(bounced_recipient, message_json)
            mail = bounced_recipient['emailAddress']
            if is_no_suppress_mail(mail) and not is_ignore_mail(mail):
                del_result, del_json = delete_suppressed_destination(mail)
                delete_result += build_blocks_delete(mail, del_result, del_json)
                notice_message = "DELETE: " + mail

        bounced_message = prepare_bounced_message(message_json)
        bounced_message = bounced_message + bounced_mails + delete_result

        if not notice_message:
            notice_message = "BOUNCE: " + mail
            
        if not is_ignore_mail(mail):
            post_slack(bounced_message, notice_message)

    elif message_json['notificationType'] == 'Complaint':
        complained_recipients = message_json['complaint']['complainedRecipients']
        complained_mails = []

        for complained_recipient in complained_recipients:
            complained_mails += build_blocks_complaint(complained_recipient, message_json)
            mail = complained_recipient['emailAddress']

        complained_message = prepare_complained_message(message_json)
        complained_message = complained_message + complained_mails
        notice_message = "COMPLAINT: " + mail
        
        if not is_ignore_mail(mail):
            post_slack(complained_message, notice_message)

    return


def lambda_handler(event, context):
    if event['Records'][0]['EventSource'] == 'aws:sns' :
        ses_process(event)

    return

例外処理が甘いですね。

権限については実装当時に精査していなかったようで、「AmazonSESFullAccess」を付与されていましたが、get_suppressed_destination()delete_suppressed_destination()をする権限くらいがあれば動きそうです。甘々ですね。

細かい処理についての補足……?

is_no_suppress_mail(mail)

SESではバウンスの発生時に、配信抑制リスト(suppression-list)というものにメールアドレスを登録し、メールのバウンスが再発することを防止する仕組みがあります。

この仕組みは便利なもので活用したいのですが、一時的な問題でバウンスが発生することが度々ある等、特定のアドレスだけは抑制リストに入らないでほしいようなケースがあるかもしれません。

こういったケースでは、抑制リストに登録された直後にリストから自動で登録解除することで、擬似的にリストに登録されないような状態にすることができます。(バウンスレートが急上昇するリスクもあるので、運用時は要注意)

この「自動で配信抑制リストから削除するアドレス」かを判定する関数が is_no_suppress_mail(mail) です。

is_ignore_mail(mail)

Slackへ通知したくないメールアドレス(既知のダミーアドレス等)もあるため、通知を送らないアドレスのリストを用意します。

Slack通知の有無を判定する際に用いる判定関数が is_ignore_mail(mail) です。

build_blocks_bounce(), build_blocks_delete(), prepare_bounced_message() 等

SNSトピックから受け取ったメッセージを元に、見やすく整形してSlackにPOSTするコンテンツを作成します。

実際の投稿の様子

コードの短さを犠牲に情報が見やすい投稿を実現しています。

バウンス通知(Bounce)
バウンス通知(配信抑制リストにあるもの)
バウンス通知+削除通知(Bounce+Delete)
苦情通知(Complaint)

Lambda 不要なAMIとスナップショットをまとめて削除する

起動テンプレート自動更新起動設定自動更新の記事を投稿しましたが、こういう自動化をしていると、気づけばAMIが山のように溜まってしまいます。

AMIに紐付いたスナップショットは、(大抵の場合少額とはいえ)ストレージ料金が発生します。

AWSコンソールのAMIのページから、不要なAMIを登録解除することがあると思いますが、紐付いたスナップショットまでは自動で消えてくれません。

紐付いたスナップショットをスナップショットの画面から検索して削除するのが地味に面倒だったので、一括削除できるようなLambda関数を作ってしまいます。

Lambda関数

処理の流れはシンプルで、「AMI削除→紐付いたスナップショットを検索して削除」という処理を、与えられたAMIのリストを回して繰り返し行うだけです。

import boto3
import logging
from botocore.exceptions import ClientError


ec2 = boto3.client('ec2')


def unregister_ami(delete_ami_id):
    # Unregister AMI.
    try:
        response = ec2.deregister_image(ImageId=str(delete_ami_id))
    except ClientError as e:
        if e.response['Error']['Code'] == 'InvalidAMIID.Unavailable':
            print('InvalidAMIID.Unavailable: ' + delete_ami_id)
            return True
        elif e.response['Error']['Code'] == 'InvalidAMIID.NotFound':
            print('InvalidAMIID.NotFound: ' + delete_ami_id)
            return True
        else:
            print(e.response['Error']['Code'])
            print(e.response['Error']['Message'])
            raise e
    print('Unregister AMI: ' + delete_ami_id)
    return response.get('ResponseMetadata', {}).get('HTTPStatusCode', -1) == 200


def delete_related_snapshots(delete_ami_id):
    # Get target snapshots.
    try:
        response = ec2.describe_snapshots(
            Filters=[
                {
                    'Name': 'description',
                    'Values': ['Created by CreateImage(*) for ' + delete_ami_id + '*',]
                }
            ]
        )
    except ClientError as e:
        print(e.response['Error']['Code'])
        print(e.response['Error']['Message'])
        raise e

    # Delete target snapshots.
    for snapshot in response['Snapshots']:
        try:
            response = ec2.delete_snapshot(SnapshotId=snapshot['SnapshotId'])
            if response.get('ResponseMetadata', {}).get('HTTPStatusCode', -1) != 200:
                raise('# Cannot delete snapshot: %s', snapshot)
            print('Delete Snapshot: ' + snapshot['SnapshotId'])
        except ClientError as e:
            print(e.response['Error']['Code'])
            print(e.response['Error']['Message'])
            logging.error("# Delete snapshot error: %s", e)
        
    return


def lambda_handler(event, context):
    delete_amis = event['delete_amis']
    print(delete_amis)

    for delete_ami in delete_amis:
        # Delete AMI.
        unregister_ami(delete_ami)
    
        # Delete Snapshots.
        delete_related_snapshots(delete_ami)
    
    return

Lambda関数には、以下の権限を付与します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "ec2:DeregisterImage",
                "ec2:DeleteSnapshot",
                "ec2:DescribeSnapshots"
            ],
            "Resource": "*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": (略)
        },
        {
            "Sid": "VisualEditor2",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": (略)
        }
    ]
}

使い方

  1. AWSコンソールのAMIの画面で、不要なAMIをチェックしていきます。
  2. 画面下部にチェックしたAMIのidがずらずらと表示されるので、これをコピーします。
  3. 適当なエディタで整形し、Lambda関数のテストイベントに下記のようなデータを投入して実行すれば、AMIとスナップショットが削除されます。
{
    "delete_amis": [
        "ami-0123456789abcdef1",
        "ami-0123456789abcdef2",
        "ami-0123456789abcdef3"
    ]
}

余談

スナップショットの削除部分のみをLambda関数で作っておき、AMIの削除操作を行った際に、自動で関連スナップショットを削除するようなことも可能(CloudTrail利用)で、そのようにした方が効率的で良いかと思います。

一方であまり削除処理系を自動化しすぎると、大事なデータの消失などが発生する懸念もあるため、「起動テンプレートの更新処理に連動して古い起動テンプレートや古いAMIを削除する」ような処理の実装は控えています。

参考

EC2 Auto Scalingグループ 起動テンプレートの自動更新

起動テンプレート、便利ですよね。Auto Scalingグループで参照するものを「Latest」にしておけば、デプロイのたびにAuto Scalingグループの設定を変更する必要がありません。

起動テンプレートの更新作業をデプロイ後とかに毎回やっていたのですが、
 AMI取得
  ↓
 起動テンプレートを更新(新しいバージョンを作成)
たったこれだけの作業でも、手作業でやると地味に面倒だったり、作業ミスや作業漏れが発生しがちで残念な感じです。
起動設定の更新同様、自動化してしまいます。

※レガシーですが、レガシーゆえに最近の情報が少なかったので、起動設定の自動更新についても記事を書きました

Lambda関数による実装

起動テンプレートを使っている場合、Auto Scalingグループの更新は不要なため、AMI取得元のインスタンスのNameを指定して、そこから必要な情報を取得していきます。
(Auto Scalingグループに属するインスタンスには、共通のNameタグを付けています)

import boto3
from botocore.exceptions import ClientError
import datetime as dt


timestamp = (dt.datetime.now() + dt.timedelta(hours=9)).strftime('%Y%m%d-%H%M')

ec2 = boto3.client('ec2')


def get_instance(name_tag_value):
    try:
        filter_key = 'tag:aws:autoscaling:groupName'
        reservations = ec2.describe_instances(
                Filters=[{'Name':filter_key,'Values':[name_tag_value]}])
        return reservations['Reservations'][0]['Instances'][0]
    except ClientError as e:
        print('#ClientError!! at get_instance()')
        raise e


def make_image_name(instance_name):
    # Make image name like: test-ec2-instance-20210614-1300
    try:
        return instance_name + '-' + timestamp
    except ClientError as e:
        print('#ClientError!! at make_image_name()')
        raise e


def create_ami(image_name, instance_id, description):
    # Create image NO-reboot.
    try:
        image = ec2.create_image(
                InstanceId=instance_id,
                # DryRun=True,  # For test.
                Name=image_name,
                Description=description,
                NoReboot=True,
            )
        return image['ImageId']
    except ClientError as e:
        print('#ClientError!! at create_ami()')
        raise e


def update_launch_template(target_launch_template_id, ami_id, description):
    # Update target launch template.
    try:
        response = ec2.create_launch_template_version(
                LaunchTemplateId=target_launch_template_id,
                VersionDescription=description,
                SourceVersion="$Latest",
                LaunchTemplateData={
                    "ImageId": ami_id
                }
            )
        print(f"New launch template created with AMI {ami_id}")
    except ClientError as e:
        print('#ClientError!! at make_image_name()')
        raise e


def set_launch_template_default_version(target_launch_template_id):
    try:
        response = ec2.modify_launch_template(
                LaunchTemplateId=target_launch_template_id,
                DefaultVersion="$Latest"
            )
        print("Default launch template set to $Latest.")
    except ClientError as e:
        print('#ClientError!! at set_launch_template_default_version()')
        raise e


def main_process(name_tag_value, description=''):
    try:
        # Get target instance id.
        target_instance = get_instance(name_tag_value)
        instance_id = target_instance['InstanceId']
        print(instance_id)
    
        # Get target launch template id.
        for tag in target_instance['Tags']:
            if tag['Key'] == 'aws:ec2launchtemplate:id':
                target_launch_template_id = tag['Value']
                break
        print(target_launch_template_id)
    
        # Make AMI name.
        image_name = make_image_name(name_tag_value)
        print(image_name)
    
        # Create AMI from target instance.
        if not description:
            description = f'Lambda create. id:{instance_id}'
        ami_id = create_ami(image_name, instance_id, description)
        print(ami_id)
    
        # Update Launch Template
        update_launch_template(target_launch_template_id, ami_id, description)
    
        # Update Launch Template default version.
        set_launch_template_default_version(target_launch_template_id)
    except ClientError as e:
        print(e)


def lambda_handler(event, context):
    target_instances = event['target_instances']
    
    for target_instance in target_instances:
        name_tag_value = target_instance['tag_name']
        description = target_instance['description']
        
        print(name_tag_value)

        main_process(name_tag_value, description)
        
    return

大まかな処理の流れは下記のとおりです。

  1. 指定のインスタンスNameを持つインスタンスを適当に1つ取得
  2. 対象のインスタンスに紐付いた起動テンプレートのIDを取得
  3. 対象のインスタンスからAMIを作成
  4. 取得したAMIを元に起動テンプレートを更新(新しいバージョンを作成)
    1. ソースとして$Latestを指定
    2. ImageIdとして今取得したAMIを指定
  5. 作成した起動テンプレートのバージョンを、デフォルトに指定

また、今回のコードには含めていないものの、古い起動テンプレートを削除する処理も可能です。

例えば、下記のようなdelete_previous_version()を作成し、set_launch_template_default_version()から呼び出すようにしてみると、起動テンプレートの更新後に、3つ前のバージョンが自動で削除されるようになります。

def delete_previous_version(target_launch_template_id, previous_version):
    try:
        response = ec2.delete_launch_template_versions(
                LaunchTemplateId=target_launch_template_id,
                Versions=[
                    previous_version,
                ]
            )
        print(f"Old launch template {previous_version} deleted.")
    except ClientError as e:
        print('#ClientError!! at delete_previous_version()')
        print(e)

def set_launch_template_default_version(target_launch_template_id):
    try:
        response = ec2.modify_launch_template(
                LaunchTemplateId=target_launch_template_id,
                DefaultVersion="$Latest"
            )
        print("Default launch template set to $Latest.")

        previous_version = str(
                int(response["LaunchTemplate"]["LatestVersionNumber"]) - 3)
        print(previous_version)
        delete_previous_version(target_launch_template_id, previous_version)
    except ClientError as e:
        print('#ClientError!! at set_launch_template_default_version()')
        raise e

Lambda関数には、通常のログ作成関連の権限の他に、下記のような権限を与えておきます。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:CreateImage",
                "ec2:DescribeInstances"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "ec2:DescribeLaunchTemplateVersions",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:ModifyLaunchTemplate",
                "ec2:DeleteLaunchTemplateVersions",
                "ec2:CreateLaunchTemplateVersion"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

起動設定のときと同様、手動で実行する際には、テストイベントに下記のようなデータを入れてテストを走らせます。(いつもの)

※descriptionを空にした場合、「Lambda create. id:{instance_id}」という定型文が入るようにしています。

{
  "target_instances": [
    {
      "tag_name": "test-ec2-instance",
      "description": "hogehoge"
    }
  ]
}

これにより、起動テンプレートの更新作業が
 対象とするEC2インスタンスのNamedescriptionを書き換える
  ↓
 テストを走らせる
だけになりました(エラーが無ければ)。

だいぶお手軽ですね。

cronによる定期更新

cronによる定期更新の設定方法は、「EC2 Auto Scalingグループ 起動設定の自動更新」と同様です。「cronによる定期更新」の項を参照してください。

CodePipelineから呼び出す場合

CodePipelineから呼び出す場合の設定方法についても、やることは「EC2 Auto Scalingグループ 起動設定の自動更新」の「CodePipelineから呼び出す場合」と同様です。

Pipelineから与えられる引数を処理するために、下記の関数を追記します。
PipelineのLambda関数を呼び出すステージで、下記のように値を指定します。
UserParameters: {"branch":"master","instance_name":"test-ec2-instance"}

import json


def get_user_params(job_data):
    try:
        user_parameters = job_data['actionConfiguration']['configuration']['UserParameters']
        decoded_parameters = json.loads(user_parameters)
    except Exception as e:
        raise Exception('UserParameters could not be decoded as JSON')
    
    if 'branch' not in decoded_parameters:
        raise Exception('UserParameters JSON must include the "branch"')
    
    if 'instance_name' not in decoded_parameters:
        raise Exception('UserParameters JSON must include the "instance_name"')

    return decoded_parameters

メインの処理では、CodePipelineにレスポンスを返すようにします。

def lambda_handler(event, context):
    codepipeline = boto3.client('codepipeline')
    try:
        # Get CodePipeline user params.
        job_data = event['CodePipeline.job']['data']
        params = get_user_params(job_data)
        name_tag_value = params['instance_name']
        github_branch = params['branch']
        print(name_tag_value)
        print(github_branch)

        # Get target instance id.
        target_instance = get_instance(name_tag_value)
        instance_id = target_instance['InstanceId']
        print(instance_id)

        # Get target launch template id.
        for tag in target_instance['Tags']:
            if tag['Key'] == 'aws:ec2launchtemplate:id':
                target_launch_template_id = tag['Value']
                break
        print(target_launch_template_id)

        # Make AMI name.
        image_name = make_image_name(name_tag_value)
        print(image_name)

        # Create AMI from target instance.
        description = f'Lambda create. branch:{github_branch} id:{instance_id}'
        ami_id = create_ami(image_name, instance_id, description)
        print(ami_id)

        # Update Launch Template
        update_launch_template(target_launch_template_id, ami_id, description)

        # Update Launch Template default version.
        set_launch_template_default_version(target_launch_template_id)
        
        # Return Success to CodePipeline.
        codepipeline.put_job_success_result(
                jobId = event['CodePipeline.job']['id'])
    except ClientError as e:
        print(e)
        # Return Failure to CodePipeline.
        codepipeline.put_job_failure_result(
                jobId = event['CodePipeline.job']['id'],
                failureDetails={
                    'type': 'JobFailed',
                    'message': str(e)
                }
            )

Lambda関数の権限は下記のとおりです。logs関連のResourceは伏せていますが、関数作成時に自動作成される箇所なので問題ないはずです。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": (略)
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                (略)
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:CreateImage",
                "ec2:DescribeInstances"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "codepipeline:PutJobSuccessResult",
                "codepipeline:PutJobFailureResult"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "ec2:DescribeLaunchTemplateVersions",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:ModifyLaunchTemplate",
                "ec2:DeleteLaunchTemplateVersions",
                "ec2:CreateLaunchTemplateVersion"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

これで、Pipelineの進行にあわせて「デプロイ後に自動で”AMI取得→起動テンプレート更新”」ができるようになりました。

参考

boto3のAPIリファレンス

EC2 Auto Scalingグループ 起動設定の自動更新

前提: 起動設定よりも起動テンプレートを使った方が良いです。

一部の古いシステム等、起動設定を使っているまま起動テンプレートに移行ができていないものが、あったりなかったりすると思います。

Auto Scalingグループに紐付いた起動設定の更新作業って、デプロイ後とかに毎回やっていたのですが、手作業でやると地味に面倒です。
 AMI取得
  ↓
 起動設定を作成(既存のものからコピーしてName等変更)
  ↓
 Auto Scalingグループの設定更新
この3ステップめんどくないですか?めんどいですよね?自動化しませんか?します(しました)。

※AWSの既存サービスを組み合わせて、本記事の内容が全て実装できるかもしれません。無知を晒しているな~と思ったらコメント等お願いいたします。救われます。

※同様の流れで起動テンプレートを自動更新する記事も書きました

Lambda関数による実装

今回はAuto Scalingグループについて起動設定の更新を行いたいので、AMIの取得元はAuto ScalingグループのNameで指定することにしています。

import boto3
from botocore.exceptions import ClientError
import datetime as dt


timestamp = (dt.datetime.now() + dt.timedelta(hours=9)).strftime('%Y%m%d-%H%M')

ec2 = boto3.client('ec2')
autoscaling = boto3.client('autoscaling')


def get_instance(name_tag_value):
    try:
        filter_key = 'tag:aws:autoscaling:groupName'
        response = ec2.describe_instances(
                Filters=[{'Name':filter_key,'Values':[name_tag_value]}])
        return response['Reservations'][0]['Instances'][0]
    except ClientError as e:
        print('#ClientError!! at get_instance()')
        raise e


def make_image_name(instance_name):
    # Make image name like: hogehoge-20210611-1530
    try:
        return instance_name + '-' + timestamp
    except ClientError as e:
        print('#ClientError!! at make_image_name()')
        raise e


def create_ami(image_name, instance_id, description):
    # Create image NO-reboot.
    try:
        image = ec2.create_image(
                InstanceId=instance_id,
                # DryRun=True,  # For test.
                Name=image_name,
                Description=description,
                NoReboot=True,
            )
        return image['ImageId']
    except ClientError as e:
        print('#ClientError!! at create_ami()')
        raise e


def get_launch_conf_name(auto_scaling_group_name):
    # Get Launch Configuration Name of Auto Scaling Group.
    try:
        response = autoscaling.describe_auto_scaling_groups(
                AutoScalingGroupNames=[auto_scaling_group_name])
        return response['AutoScalingGroups'][0]['LaunchConfigurationName']
    except ClientError as e:
        print('#ClientError!! at get_launch_conf_name()')
        raise e


def get_launch_conf(launch_conf_name):
    # Get Launch Configuration of Auto Scaling Group.
    try:
        response = autoscaling.describe_launch_configurations(
                LaunchConfigurationNames=[launch_conf_name])
        return response['LaunchConfigurations'][0]
    except ClientError as e:
        print('#ClientError!! at get_launch_conf()')
        raise e


def create_launch_conf(old_launch_conf, launch_conf_name, ami_id):
    # Create NEW Launch Configuration.
    # Get target snapshots.
    try:
        response = ec2.describe_snapshots(
            Filters=[
                {
                    'Name': 'description',
                    'Values': ['Created by CreateImage(*) for ' + ami_id + '*',]
                }
            ]
        )
    except ClientError as e:
        print(e.response['Error']['Code'])
        print(e.response['Error']['Message'])
        raise e
    
    snapshot_id = response['Snapshots'][0]['SnapshotId']
    print(snapshot_id)

    # Create Launch Configuration.
    old_launch_conf['BlockDeviceMappings'][0]['Ebs']['SnapshotId'] = snapshot_id
    try:
        autoscaling.create_launch_configuration(
                LaunchConfigurationName=launch_conf_name,
                ImageId=ami_id,
                KeyName=old_launch_conf['KeyName'],
                SecurityGroups=old_launch_conf['SecurityGroups'],
                UserData="#!/bin/bash\nrm -f /var/tmp/aws-mon/instance-id",
                InstanceType=old_launch_conf['InstanceType'],
                BlockDeviceMappings=old_launch_conf['BlockDeviceMappings'],
                InstanceMonitoring=old_launch_conf['InstanceMonitoring'],
                IamInstanceProfile=old_launch_conf['IamInstanceProfile'],
                EbsOptimized=old_launch_conf['EbsOptimized'])
    except ClientError as e:
        print('#ClientError!! at create_launch_conf()')
        raise e
    return


def update_auto_scaling_group(auto_scaling_group_name, new_launch_conf_name):
    # Update Launch Configuration of Auto Scaling Group.
    autoscaling.update_auto_scaling_group(
            AutoScalingGroupName=auto_scaling_group_name,
            LaunchConfigurationName=new_launch_conf_name)
    return


def main_process(auto_scaling_group_name, description=''):
    try:
        # Get Original(Old) Launcn Configuration.
        old_launch_conf_name = get_launch_conf_name(auto_scaling_group_name)
        print(old_launch_conf_name)
        old_launch_conf = get_launch_conf(old_launch_conf_name)
        # print(old_launch_conf)
        
        # Make AMI name and Launcn Configuration name.
        image_name = make_image_name(auto_scaling_group_name)
        launch_conf_name = image_name
        print(image_name)
    
        # Create AMI from target instance.
        target_instance = get_instance(auto_scaling_group_name)
        instance_id = target_instance['InstanceId']
        print(instance_id)
        if not description:
            description = f'Lambda create. id:{instance_id}'
        ami_id = create_ami(image_name, instance_id, description)
        print(ami_id)
        
        # Create Launch Configuration.
        create_launch_conf(old_launch_conf, launch_conf_name, ami_id)
    
        # Update Auto Scaling Group.
        update_auto_scaling_group(auto_scaling_group_name, launch_conf_name)
    except ClientError as e:
        print(e)


def lambda_handler(event, context):
    targets = event['targets']
    
    for target in targets:
        auto_scaling_group_name = target['auto_scaling_group_name']
        description = target['description']
        
        print(auto_scaling_group_name)

        main_process(auto_scaling_group_name, description)
        
    return

大まかな処理の流れは下記のとおりです。

  1. 現在の(古い)起動設定を取得(現在のものから設定を引き継ぐため)
    1. Auto Scalingグループから現在の(古い)起動設定のNameを取得
    2. 現在の(古い)起動設定を取得
  2. 新しく作成するAMIのNameを決定
  3. 新しい起動設定のNameを決定(便宜上、AMIのNameと同名とする)
  4. 新しい起動設定で使うAMIを作成
    1. Auto Scalingグループから適当なインスタンス1台を取得してインスタンスIDを取得
    2. 対象のインスタンスから、AMIを作成
  5. 新しい起動設定を作成
  6. 新しい起動設定をAuto Scalingグループに割り当て(設定の更新)

Lambda関数には、下記のような権限を付与しておきます。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole",
                "ec2:DescribeSnapshots",
                "ec2:DescribeInstances",
                "ec2:CreateImage",
                "autoscaling:DescribeAutoScalingGroups",
                "autoscaling:UpdateAutoScalingGroup",
                "autoscaling:DescribeLaunchConfigurations",
                "autoscaling:CreateLaunchConfiguration"
            ],
            "Resource": "*"
        }
    ]
}

手動で実行する際には、テストイベントに下記のようなデータを入れてテストを走らせます。(本来の用途とは異なるけどお手軽なのでやりがち)

※descriptionを空にした場合、「Lambda create. id:{instance_id}」という定型文が入るようにしています。

{
  "targets": [
    {
      "auto_scaling_group_name": "example-group-name",
      "description": ""
    },
    {
      "auto_scaling_group_name": "awesome-group-name",
      "description": "The bug was gone with a great fix."
    }
  ]
}

これにより、デプロイ後に行う作業が
 Auto Scalingグループの名前と説明文を書き換える
  ↓
 テストを走らせる
だけになりました(エラーが無ければ)。

手動で3ステップの作業を行う手間が省け、作業ミスが入り込む余地が削減でき、詳細なノウハウが無くても作業が実施可能になってしまいました。すごい。

cronによる定期更新

デプロイ後に手動実行するのではなくて、定期的にバックアップを取りつつ、起動設定の更新をしてAuto Scalingに対応したいというニーズもあるかもしれません。WordPressのサイトをEC2で動かしているとかね。

Lambdaにはcronで定期実行する仕組みが組み込まれています。天才。

Lambda > 関数 > トリガーを追加 > トリガーの設定 でEventBridgeを使います。


cronの時刻がUTCなこと、見落としがち

プログラムの方は現在テストイベントから引数(対象のAuto Scalingグループ名)を受け取っていますが、こちらをコード内で決め打ちにしてしまえばOKです。非常に簡単。

targets = event['targets']

targets = [
    {
        "auto_scaling_group_name": "example-group-name",
        "description": "Daily buckup."
    }
]

CodePipelineから呼び出す場合

デプロイ後にPipelineから自動で呼び出してやって欲しいことの方が多そうですね。こちらも簡単ですが、コードの作りが多少変わります。

だいぶ大雑把ですが……下記のような変更があります。

  • main_process()をtry-exceptでくくり、Pipelineに成功/失敗を返すような処理にします。
    • 下記の書き方の場合、main_process()内のtry-exceptでは例外発生時にエラーをraiseし、lambda_handler()で受け取れるようにする必要があります。
  • Pipelineにレスポンスを返すため、PipelineのJob IDなどの情報もあわせて受け取っています。
  • descriptionには、例えばデプロイしたGitHubのブランチ名などを含めると良いかもしれません。
def lambda_handler(event, context):
    codepipeline = boto3.client('codepipeline')
    try:
        # Get CodePipeline user params.
        job_data = event['CodePipeline.job']['data']
        params = get_user_params(job_data)
        auto_scaling_group_name = params['auto_scaling_group_name']
        github_branch = params['branch']
        print(auto_scaling_group_name)
        print(github_branch)

        description = f'Lambda create. branch:{github_branch}'

        # update Launch Configuration and Auto Scaling Group.
        main_process(auto_scaling_group_name, description)
        
        # Return Success to CodePipeline.
        codepipeline.put_job_success_result(
                jobId=event['CodePipeline.job']['id'])
    except ClientError as e:
        print(e)
        # Return Failure to CodePipeline.
        codepipeline.put_job_failure_result(
                jobId=event['CodePipeline.job']['id'],
                failureDetails={
                    'type': 'JobFailed',
                    'message': str(e)
                }
            )

Pipelineから受け取るパラメータを処理する get_user_params() は下記のような実装です。
PipelineのLambda関数を呼び出すステージで、下記のように値を指定します。
UserParameters: {"branch":"master","auto_scaling_group_name":"example-group-name"}

import json


def get_user_params(job_data):
    try:
        user_parameters = job_data['actionConfiguration']['configuration']['UserParameters']
        decoded_parameters = json.loads(user_parameters)
    except Exception as e:
        raise Exception('UserParameters could not be decoded as JSON')
    
    if 'branch' not in decoded_parameters:
        raise Exception('UserParameters JSON must include the "branch"')
    
    if 'instance_name' not in decoded_parameters:
        raise Exception('UserParameters JSON must include the "instance_name"')

    return decoded_parameters

Lambda関数には、CodePipelineに結果を返す権限も付与する必要があります。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "codepipeline:PutJobSuccessResult",
                "codepipeline:PutJobFailureResult"
            ],
            "Resource": "*"
        }
    ]
}

これで、Pipelineの進行にあわせて「デプロイ後に自動で”AMI取得→起動設定作成→Auto Scalingグループ更新”」ができるようになりました。

参考

boto3のAPIリファレンス

CodePipeline LambdaからPHPUnitを実行する

CodePipelineによるデプロイの流れでPHPUnitを走らせる際には、CodeBuildの段階でテストを実行するのがセオリーなのだと思います。

しかし諸般の事情でそういうわけにもいかず、デプロイ後にデプロイ先の環境でPHPUnitを走らせたいようなケースがあったりなかったりもすると思うんです。

どうにかしてみましょう。

具体的には、
「CodePipelineのLambda実行ステージでPHPUnitを実行」
 ↓
「PHPunitの実行結果に応じて、Pipelineの処理を進行/停止させる」
というお話が今回の記事となります。

PHP コード内で PHPUnit を呼び出す(実行する)という記事を以前書いたのですが、伏線回収というわけですね。

1.Lambda関数の作成

CodePipelineに設置したLambda実行ステージにて呼び出す関数を作成します。

Lambda実行ステージの設置と、Lambda関数作成自体の手順は割愛。

テスト環境と本番環境で接続先のURLを切り替えるため、CodePipelineのステージで、UserParameterとして「staging」や「production」などの文字を指定するものとします。

import boto3
import urllib.request


def lambda_handler(event, context):
    codepipeline = boto3.client('codepipeline')
    try:
        # Get CodePipeline user param.
        environment = event['CodePipeline.job']['data']['actionConfiguration']['configuration']['UserParameters']
        print('environment:', environment)
        
        # Set url
        if environment == 'staging':
            url = 'テスト環境の接続先URL'
        elif environment == 'production':
            url = '本番環境の接続先URL'
        print('url:', url)

        # Run PHPUnit.
        response = urllib.request.urlopen(url).read().decode("utf-8")
        if 'OK' in response:
            # Return Success to CodePipeline.
            codepipeline = boto3.client('codepipeline')
            codepipeline.put_job_success_result(
                    jobId = event['CodePipeline.job']['id'],
                    executionDetails={
                        'summary': response,
                    }
                )
        else:
            # Return Failure to CodePipeline.
            codepipeline.put_job_failure_result(
                    jobId = event['CodePipeline.job']['id'],
                    failureDetails={
                        'type': 'JobFailed',
                        'message': response
                    }
                )
    except Exception as e:
        print(e)
        # Return Failure to CodePipeline.
        codepipeline.put_job_failure_result(
                jobId = event['CodePipeline.job']['id'],
                failureDetails={
                    'type': 'JobFailed',
                    'message': str(e)
                }
            )
        
    return

デフォルトのLog書き込み権限に加え、下記の権限を与えます。

{
"Effect": "Allow",
"Action": [
"codepipeline:PutJobFailureResult",
"codepipeline:PutJobSuccessResult"
],
"Resource": "*"
}

この関数では、
response = urllib.request.urlopen(url).read().decode("utf-8")
の箇所でPHPUnitの実行スクリプトにアクセスし、実行結果を取得しています。

その後、PHPUnitの実行結果に応じて、Pipelineの処理を進行/停止させています。
▶正常だった場合には、Pipelineに「成功」を返して処理を進めます。
▶問題があった場合には、Pipelineに「失敗」を返して進行を止めます。

2. Lambdaから呼び出すPHPUnit実行スクリプトを用意

Lambda関数へのレスポンスは、
Time: 231 ms, Memory: 14.00MB OK (1 test, 1 assertion)
などの最後の結果部分のみです。
(PHP側もLambda側も、雑に「OK」という文字の有無で成否を判定する手抜き仕様ですがご愛嬌)

実行結果の全文ログをS3に保存するため、実行関数はshell_exec()としました。

<?php

$path_to_phpunit = '/path/to/phpunit';
$path_to_tests = '/path/to/tests';

chdir($path_to_tests);

$result = shell_exec("$path_to_phpunit $path_to_tests 2>&1");
// 適宜必要な出力部分を切り出し
$sub1 = mb_substr($result, 0, mb_strpos($result, 'phpunit.xml') + 13);
$sub2 = $sub1 . mb_substr($result, mb_strpos($result, 'Time: '));
$echo_result = mb_substr($sub2, mb_strpos($sub2, 'Tests: '));

print($echo_result);    // Lambdaへのレスポンス

if (strpos($echo_result, 'OK') === false) {
    // phpunit実行ログの全文はS3に保存
    $s3_upload_results = phpunit_log_upload_to_s3($result);
    
    if ($s3_upload_results['result'] == 'SUCCESS') {
        print($s3_upload_results['upload_name']);
    } else {
        // S3へのアップロードに失敗した場合、実行結果はSlack等にPOSTしてケア
        ...
    }
}


function phpunit_log_upload_to_s3($log_body) {
    // phpunit実行ログの全文をS3に保存
    ...
}

3. 動作確認

これらの関数は、下記のように動作します。

  • 正常時
    • Pipelineのステージは正常に終了
  • 異常時
    • 実行結果の全文ログはS3に保存
      • (S3保存に失敗した場合は実行結果をSlackへPOST)
    • Pipelineのステージはエラーでストップ
    • 実行情報に、簡単な実行結果一文と、S3ログへのURLを記載

以上の仕組みにより、CodePipelineの任意のステージでPHPUnitを呼び出し、実行結果によって進行/停止させることができました。