Lambda 不要なAMIを定期削除する

開発中、テスト用の環境にデプロイするたびにAMIを自動取得していたら、不要なAMIが山のようになってしまったので掃除をする。

以前作成したこちらの記事のLambdaを元に、不要なAMIを定期的に削除するようにしてみます。
Lambda 不要なAMIとスナップショットをまとめて削除する

従来の方法(手動実行用)

元のLambda関数では下記のように、削除対象のAMIを手打ちして、eventから取得していました。

{
    "delete_amis": [
        "ami-0123456789abcdef1",
        "ami-0123456789abcdef2",
        "ami-0123456789abcdef3"
    ]
}
def lambda_handler(event, context):
    delete_amis = event['delete_amis']
    print(delete_amis)

    for delete_ami in delete_amis:
        # Delete AMI.
        unregister_ami(delete_ami)
    
        # Delete Snapshots.
        delete_related_snapshots(delete_ami)
    
    return

今回の方法(自動実行用)

  1. 現在所有しているAMIの一覧から、適当なフィルターをかけつつ一部を取得します。
  2. AMIの作成日部分を取得して、一週間以上古いものというざっくり基準で削除対象のリストに放り込みます。
    (リスト内包表記となぜか実行時間が変わらなかったので、可読性がヤバいことになるのを避けるためにappendで書いています)
  3. 削除対象となったAMI-IDのリストを元に、以前作成したAMI+Snapshotの削除処理を実行します。
def get_old_ami():
    # Describe target AMIs.
    try:
        response = ec2.describe_images(
            Filters=[{'Name': 'name', 'Values': ['delete-sample*']}]
        )
        if response.get('ResponseMetadata', {}).get('HTTPStatusCode', -1) != 200:
            raise('# Cannot describe images!')
        images = response.get('Images')
        print('# All delete-sample images:', len(images))
    except ClientError as e:
        print(e.response['Error']['Code'])
        print(e.response['Error']['Message'])
        logging.error("# Describe images error: %s", e)
        return
    
    seven_days = 604800
    old_images = []
    epoc_now = int(time.time())
    print('# epoc now:', epoc_now)
    
    # Those older than 7 days are subject to deletion.
    for image in images:
        creation_date = image.get('CreationDate')
        datetime = dt.strptime(creation_date, "%Y-%m-%dT%H:%M:%S.000Z")
        epoc_datetime = int(dt.timestamp(datetime))

        if epoc_now - epoc_datetime > seven_days:
            old_images.append(image.get('ImageId'))

    return old_images
def lambda_handler(event, context):
    # Get the AMI to be deleted.
    delete_amis = get_old_ami()
    print('# Target AMIs:', len(delete_amis))
    print(delete_amis)

    for delete_ami in delete_amis:
        # Delete AMI.
        unregister_ami(delete_ami)
    
        # Delete Snapshots.
        delete_related_snapshots(delete_ami)
    
    return

定期実行

トリガーにEventBridgeを指定し、cron式でスケジュールを設定しました。
JSTで「火~土 AM00:00」に実行したい場合、下記のように指定します。

cron(0 15 ? * MON-FRI *)

Amazon Aurora 読み込み整合性とかエンドポイントとか取り留めのないメモを

AWSの優れたDBエンジンの一つである、あの「Amazon Aurora」について、読み取り時のデータ整合性のことが少し気になったらしく、ちょっと調べてみたりしたんですって。

内容には誤りがあるかもしれません。お気付きの場合にはコメント等でご指摘いただけますと幸いです。

読み込み整合性

いきなりAuroraではないのですが、読み込み整合性に関して「Amazon DynamoDB」というDBのお話を少し。

DynamoDBはAWSにて提供されている、NoSQLのDBサービスです。DynamoDBでは、データの読み込みオペレーションとして、2種類が用意されています。

結果整合性のある読み込み(Eventually Consistent Reads)

DynamoDB テーブルからの読み込みオペレーションの応答には、最近の書き込みオペレーションの結果が反映されていないことがあります。応答には古いデータが含まれる場合があります。少し時間がたってから読み込みリクエストを繰り返すと、応答で最新のデータが返されます。


読み込み整合性 – Amazon DynamoDB

読み込みリクエストの直前に書き込みが行われていた場合などに、直前の書き込みが反映されていないデータをレスポンスとして返す可能性があります。

複数ストレージでレプリケーションが行われるような状況では、実際に全てのストレージへの書き込み完了を待つことなく、特定の基準を満たせば書き込みオペレーションが完了したと見なして処理を進めることがあります。
DynamoDBでも同様の仕様ということでしょうか。レスポンス速そう。

強力な整合性のある読み込み(Strongly Consistent Reads)

強力な整合性のある読み込みをリクエストすると、DynamoDB は成功した以前のすべての書き込みオペレーションからの更新が反映された最新データの応答を返します 。ただし、この整合性には以下のような欠点があります。

・強力な整合性のある読み込みは、ネットワークの遅延または停止があった場合には利用できなくなる可能性があります。この場合、DynamoDB はサーバーエラー (HTTP 500) を返す場合があります。

・強力な整合性のある読み込みでは、結果整合性のある読み込みよりもレイテンシーが高くなる場合があります。

・グローバルセカンダリインデックス (GSI) では、強力な整合性のある読み込みはサポートされていません。

・強力な整合性のある読み込みでは、結果整合性のある読み込みよりも多くのスループット容量が使用されます。詳細については、読み込み/書き込みキャパシティーモード を参照してください。

読み込み整合性 – Amazon DynamoDB

「結果整合性のある読み込み」とは異なり、直前に行われた書き込みの内容が反映された、時間軸で見て整合性のあるレスポンスを返す方式です。堅い感じ。

「結果整合性のある読み込み」と「強力な整合性のある読み込み」はユースケースに応じて使い分けるのが良さそう、という定型文を置いておきます。

AuroraDBクラスターのエンドポイント

やっとAuroraのエンドポイントの話になります。
そも、Auroraは下記のようなエンドポイントを持ちます。[4]

クラスターエンドポイント

AuroraDBクラスターの「書き込みエンドポイント」を指します。

例)mydbcluster.cluster-123456789012.us-east-1.rds.amazonaws.com:3306

リーダーエンドポイント

AuroraDBクラスターの「読み取りエンドポイント」を指します。
クラスターに1つ以上のAuroraレプリカが含まれる場合、ラウンドロビンDNSによって自動で負荷分散が行われます。

例)mydbcluster.cluster-ro-123456789012.us-east-1.rds.amazonaws.com:3306

カスタムエンドポイント

ユーザーが作成可能なエンドポイントです。AuroraDBクラスター内のいくつかのDBインスタンスのみを選択して、負荷分散を行うといったことができるようです。

例)myendpoint.cluster-custom-123456789012.us-east-1.rds.amazonaws.com:3306

インスタンスエンドポイント

AuroraDBクラスター内のそれぞれのインスタンスが個別に持つエンドポイントです。通常の負荷分散以上にきめ細かいロードバランシングを行いたいような場合や、特に個別のインスタンスに接続したいような場合の利用が想定されます。

例)mydbinstance.123456789012.us-east-1.rds.amazonaws.com:3306

Q. Auroraの読み込み整合性ってどうなっているの?

  • クラスターエンドポイントを利用する場合、書き込みと読み込みを1つのエンドポイントで行う事になります。
    • この場合、レプリカラグを考慮する必要がなく、Auroraの書き込み/読み取り成功の判定条件(Quorum原理)によれば、強い整合性が保たれると考えられます。
    • 読み書きの成否判定について、書き込み成功の基準は「過半数」、読み取り成功の基準は「半数以上」が設定されているため、理論上は古いデータが返ることはなさそうに見えます。
    • また、Auroraは分散したストレージを単一の論理ボリュームとして認識する構成であるため、やはり構築上古いデータが返ることはない(ようにできている)と考えて良さそうです。[6] [7]
  • リーダーエンドポイント含むその他のエンドポイントを利用する場合には、レプリカラグが影響してきます。
    • 書き込みから読み取りの間隔を超えるラグが生じていた場合、書き込み完了前のデータを読み取ってしまう可能性があると考えられます。
    • DBの変更頻度にもよりますが、レプリカラグは通常100ミリ秒未満です。[6]
  • マルチマスタークラスター構成の場合には、設定で強い整合性を要求することができるようですが、パフォーマンスとトレードオフになるようです。[5]
    • 詳細は割愛。

読み込み整合性について明記した資料が見当たらないなと思ったのですが、読み書きのアーキテクチャ面で理論的に強い整合性が保たれているため、整合性という文脈での説明が不要なのかもしれません。すごい。

Q. AuroraDBクラスターにプライマリしか置いてない場合、エンドポイントはどうなるの?

  • Auroraクラスターにプライマリインスタンスしか含まれていない場合でも、リーダーエンドポイントは作成されます。
    • AuroraDBクラスターに含まれるインスタンスがプライマリインスタンス1台のみである(=レプリカが含まれていない)場合、リーダーエンドポイントに接続した場合でも、プライマリインスタンスに接続されます。
      • この場合、リーダーエンドポイントを介して書き込みオペレーションが実行できます。[4]

書き込み権限は接続したエンドポイントではなく、接続先のDBインスタンスによって判断されるという仕様のようです。
まあでも権限の割り当てはインスタンスごとだし、プライマリに接続される以上はそりゃそうか?なるほど。

参考

WordPress Salient パンくずリストの導入

パンくずリスト、好きですか?

ウェブサイトによくある
「TOP > すごいカテゴリー > イケてる記事」
みたいな階層構造を示しているあれです。便利ですね。

僕は特に好きでも嫌いでもないです。

人が見て記事のカテゴリーなどがわかりやすいだけでなく、検索エンジンがサイトをクローリングするときにも都合がよくてSEO的にも良いらしいです。

WordPressサイトにパンくずリストを入れたときの業務メモみたいなものを記事にしました。

プラグイン選定

パンくずリストを実現するプラグインは、「breadcrumb」で検索すると無数に出てきます。

今回の扱うWordPressサイトは、テーマに「Salient」を使っていたのですが、導入してもテーマとの相性が悪く動かない(表示されない)プラグインなどもありました。

プラグインの設定画面ぽちぽちするだけで使えそうな良さげなやつとして、「Yoast SEO」というものを見つけました。

懸念があったので恐る恐るインストール→有効化したのですが、案の定「All In One SEO」と競合している旨の通知がぴょこぴょこと出てきました。
ファイルの編集を行うことなく自動的にパンくずリストを出してくれるため、大変便利なプラグインでしたが、SEO関連で意図しない誤動作や面倒な副作用が発生すると困るため、やむなく不採用となりました。

仕方がないので、人気の高い別のプラグインである「Breadcrumb NavXT」を使うことにしました。

後述する理由から、ショートコードやスニペットを埋め込む形式はメンテナンス面でかなり面倒な気配がするので避けたかったのですが……仕方ないということにします。

プラグインの設定

「Breadcrumb NavXT」は、WordPressのheader.phpにスニペットを追記して使うものでしたが、記載しても画面内に良い感じにパンくずリストは出ませんでした。

Yoastの設定解説を見ると、single.phpやpage.phpにスニペットを書き込むことでパンくずリストを表示できそうなことが書いてあります。
(不採用のプラグインでもリファレンスは活用していく)

パンくずの区切り文字を変更

デフォルトは「 > 」(全角で書いていますが実際は半角です)でしたが、間隔を自由に調整したいので下記のように変更しました。

 > 
<span style="margin: 0px 7px 0px 7px">></span>

テンプレートの編集

デフォルトではサイトタイトルなどが出たり、マウスオーバーでツールチップが表示されたりするので調整。

  • ホームページテンプレート
    • %htitle%TOP に表示を変更
    • ツールチップの「Go to ~~」が不要だったのでtitle要素を削除
  • 他のテンプレート
    • ツールチップの「Go to ~~」が不要だったのでtitle要素を削除
  • カテゴリー・タグのテンプレート
    • ツールチップの文言を適当な日本語に調整

テンプレートの変更内容詳細は記事の最後に記載しました。長いので。

テーマのファイルを編集

Salient: single.php

「投稿」用です。

nectar_hook_before_content(); の前にスニペットを追記しました。

(略)
<div class="row">

	<?php

	nectar_hook_before_content();
(略)
(略)
<div class="row">

	<p class="breadcrumbs" typeof="BreadcrumbList" vocab="https://schema.org/">
	    <?php if(function_exists('bcn_display')) { bcn_display(); }?>
	</p>

	<?php

	nectar_hook_before_content();
(略)

Salient: page.php

「固定ページ」用です。

nectar_hook_before_content(); の前にスニペットを追記しました。
TOPではパンくずリスト部分を出したくないため、スニペットに少し手を加えました。

(略)
<div class="row">

	<?php

	nectar_hook_before_content();
(略)
(略)
<div class="row">
	
	<?php

	if(!is_front_page()) {
		echo '<p class="breadcrumbs" typeof="BreadcrumbList" vocab="https://schema.org/" style="padding-top:20px">'; 
	    if(function_exists('bcn_display')) { bcn_display(); }
		echo '</p>'; 
	}

	nectar_hook_before_content();
(略)

こんな感じでとりあえず「投稿」と「固定ページ」にパンくずリストを表示できました。

single.phpやpage.phpの編集ではうまくいかない場合

表示位置が意図した場所にならず困ってしまう場合があります。
今回は固定ページで発生したので、解決法も記載しておきます。

メンテコストとかの話でまた後々面倒くさくなりそうですが、下記の記事を参考に対処しました。
参考: パンくずリストプラグイン【Breadcrumb NavXT】をショートコードで呼び出す方法 | WordPressカスタマイズ事例【100ウェブ】

Salient: functions.php

末尾に下記のように追記しました。

/**
 * For Breadcrumb NavXT. (2021/06/24 niwa)
 */
if (!function_exists('display_breadcrumb')) {
    function display_breadcrumb() {
        $html = '<div class="breadcrumbs">'. bcn_display(true). '</div>';
        return $html;
    }
    add_shortcode('display_breadcrumb', 'display_breadcrumb');
}

各固定ページ

上記の変更により、[display_breadcrumb]というショートコードでパンくずリストの表示ができるようになりました。
固定ページを編集して、テキストブロックを追加してショートコードを記述。任意の位置に設置するだけでパンくずリストが表示されます。

WPBakery Page Builderだとこんな感じ

ゴリ押し感~~

🧍‍♀️今回はここまで🧍‍♀️

付録: Breadcrumb NavXT 設定 – テンプレートの変更内容詳細

HTML特殊記号の類(>とか’とか)をエスケープして記載しているのですが、普通に記号に変換されて表示されている様子……

一般 – ホームページテンプレート

<span property="itemListElement" typeof="ListItem"><a property="item" typeof="WebPage" title="Go to %title%." href="%link%" class="%type%" bcn-aria-current><span property="name">%htitle%</span></a><meta property="position" content="%position%"></span>
<span property="itemListElement" typeof="ListItem"><a property="item" typeof="WebPage" href="%link%" class="%type%" bcn-aria-current><span property="name">TOP</span></a><meta property="position" content="%position%"></span>

投稿タイプ – 投稿テンプレート, 固定ページ ページテンプレート, メディアテンプレート

※3項目とも同じ内容

<span property="itemListElement" typeof="ListItem"><a property="item" typeof="WebPage" title="Go to %title%." href="%link%" class="%type%" bcn-aria-current><span property="name">%htitle%</span></a><meta property="position" content="%position%"></span>
<span property="itemListElement" typeof="ListItem"><a property="item" typeof="WebPage" href="%link%" class="%type%" bcn-aria-current><span property="name">%htitle%</span></a><meta property="position" content="%position%"></span>

タクソノミー – カテゴリーテンプレート

<span property="itemListElement" typeof="ListItem"><a property="item" typeof="WebPage" title="Go to the %title% category archives." href="%link%" class="%type%" bcn-aria-current><span property="name">%htitle%</span></a><meta property="position" content="%position%"></span>
<span property="itemListElement" typeof="ListItem"><a property="item" typeof="WebPage" title="カテゴリー: %title%" href="%link%" class="%type%" bcn-aria-current><span property="name">%htitle%</span></a><meta property="position" content="%position%"></span>

タクソノミー – タグテンプレート

<span property="itemListElement" typeof="ListItem"><a property="item" typeof="WebPage" title="Go to the %title% tag archives." href="%link%" class="%type%" bcn-aria-current><span property="name">%htitle%</span></a><meta property="position" content="%position%"></span>
<span property="itemListElement" typeof="ListItem"><a property="item" typeof="WebPage" title="タグ: %title%" href="%link%" class="%type%" bcn-aria-current><span property="name">%htitle%</span></a><meta property="position" content="%position%"></span>

タクソノミー – 投稿フォーマットテンプレート

<span property="itemListElement" typeof="ListItem"><a property="item" typeof="WebPage" title="Go to the %title% archives." href="%link%" class="%type%" bcn-aria-current><span property="name">%htitle%</span></a><meta property="position" content="%position%"></span>
<span property="itemListElement" typeof="ListItem"><a property="item" typeof="WebPage" title="投稿: %title%" href="%link%" class="%type%" bcn-aria-current><span property="name">%htitle%</span></a><meta property="position" content="%position%"></span>

その他 – 投稿者テンプレート

<span property="itemListElement" typeof="ListItem"><span property="name"><a title="Go to the first page of posts by %title%" href="%link%" class="%type%" bcn-aria-current>%htitle%</a> の記事</span><meta property="position" content="%position%"></span>
<span property="itemListElement" typeof="ListItem"><span property="name"><a title="%title% の記事" href="%link%" class="%type%" bcn-aria-current>%htitle%</a> の記事</span><meta property="position" content="%position%"></span>

その他 – 日付テンプレート

<span property="itemListElement" typeof="ListItem"><a property="item" typeof="WebPage" title="Go to the %title% archives." href="%link%" class="%type%" bcn-aria-current><span property="name">%htitle%</span></a><meta property="position" content="%position%"></span>
<span property="itemListElement" typeof="ListItem"><a property="item" typeof="WebPage" title="%title%" href="%link%" class="%type%" bcn-aria-current><span property="name">%htitle%</span></a><meta property="position" content="%position%"></span>

その他 – 検索テンプレート

<span property="itemListElement" typeof="ListItem"><span property="name">Search results for '<a property="item" typeof="WebPage" title="Go to the first page of search results for %title%." href="%link%" class="%type%" bcn-aria-current>%htitle%</a>'</span><meta property="position" content="%position%"></span>
<span property="itemListElement" typeof="ListItem"><span property="name">'<a property="item" typeof="WebPage" title="%title% の検索結果" href="%link%" class="%type%" bcn-aria-current>%htitle%</a>' の検索結果</span><meta property="position" content="%position%"></span>

AWS SESの送信クォータを監視する

AWSのSESには、送信クォータや送信レートという、アカウントごとに一定期間に送信できるメール数の上限が定められています。

送信クォータ
24時間のうちに何通のメールを送信できるか。上限を超えた場合には、送信クォータの上限に達した旨のエラーとなりメールが送信されなくなる(再送処理等はない)。

送信レート
SESが1秒間に受け入れ可能なメールの最大数です。この値を瞬間的に超えることは問題ありませんが、制限を超えた状態が長時間続く場合、送信クォータ同様メールの送信ができなくなる恐れがあります。

送信クォータの上限に達しているかどうかは、CloudWatchアラームなどでリアルタイムに監視するメトリクスが用意されておらず、SESのコンソールやAPIを通じて把握する必要があります。若干の面倒臭さが漂ってきますね。

送信レートの監視をする場合には秒間単位での監視になることや、上限を超えても直ちにペナルティが無いなどのはっきりとしない状況から、より明確なラインで確実な配信不能が訪れる送信クォータの監視をするのが良さそうです。

今回は、送信クォータの60%を消費した場合に、Slackに通知が来るようなLambda関数を紹介します。
※このLambda関数は、送信数に問題がない間は実行ログを吐き出す以外沈黙します。

Lambda関数

大まかな処理の流れは下記のとおりです。

  1. 送信クォータのステータスを取得
  2. 直近24時間の送信数 / 24時間での最大送信数 * 100 [%] を算出
  3. 60%以上消費していた場合、Slackへ通知(60%未満なら何もしない)
    1. 投稿メッセージを作成
    2. WebhookでSlackへPOST

Slackへの投稿はなるべく簡素にし、現在はメタデータなども含めないように(コメントアウト箇所)しています。

import boto3
import json
import urllib.request


def build_message_block(response, send_quota_usage):
    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "SES Send quota use " + str(int(send_quota_usage)) + "%"
            }
        },
        {
            "type": "divider"
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*SentLast24Hours (usage):* " + str(f"{int(response['SentLast24Hours']):,}")
                }
            ]
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Max24HourSend (limt):*   " + str(f"{int(response['Max24HourSend']):,}")
                }
            ]
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*MaxSendRate:* " + str(f"{int(response['MaxSendRate']):,}")
                }
            ]
        }
    ]
    
    # if response['ResponseMetadata']:
    #     meta_data_json = json.dumps(response['ResponseMetadata'])
    #     blocks += [
    #         {
    #             "type": "section",
    #             "text": {
    #                 "type": "mrkdwn",
    #                 "text": "*ResponseMetadata:*\n```" + meta_data_json + "```"
    #             }
    #         }
    #     ]
    
    return blocks


def post_slack(blocks, message=None):
    send_data = {
        "channel": "XXXXXXXXXXX",   # SESの通知用channel
        "username": "SES Monitor",
        "icon_emoji": ":chart_with_upwards_trend:",
        "text": message,
        "blocks": blocks
    }
    send_text = "payload=" + json.dumps(send_data)
    
    print('post_slack() send_text: ', send_text)
    
    request = urllib.request.Request(
        "(SlackのWebhook URL)",
        data=send_text.encode('utf-8'),
        method="POST"
    )
    
    with urllib.request.urlopen(request) as response:
        response_body = response.read().decode('utf-8')


def get_ses_send_quota():
    ses = boto3.client('ses')
    response = ses.get_send_quota()
    send_quota_usage = response['SentLast24Hours'] / response['Max24HourSend'] * 100
    
    if send_quota_usage >= 60:
        message_body = build_message_block(response, send_quota_usage)
        notice_message = 'SES send quota usage: ' + str(send_quota_usage) + '%'
        print(message_body)
        print(notice_message)
        post_slack(message_body, notice_message)


def lambda_handler(event, context):
    get_ses_send_quota()
    
    return

Slackへの投稿内容をBlockで作成している都合上、どうしてもその箇所が長大になってしまっています……

Lambda関数に付与する権限

デフォルトのCloudWatchログ関係の他に、「AmazonSESReadOnlyAccess」を雑に付与してありました。もうちょっと精査しても良かったと思う。

実行方法

Lambda関数のトリガーに「EventBridge (CloudWatch Events)」を指定し、cronで定期実行しています。

送信されるメッセージの例

しきい値を下げてテストしたものがこちら。
※緩和申請済みのため送信クォータの上限が高いです。

これで、知らぬ間に送信クォータを使い果たしてメール配信不能に陥るような事故をケアすることができるようになりました。
緩和申請が間に合えば配信不能には陥りませんし、送信クォータの利用率がわからない状態で日々運用するよりも幾分精神衛生に良いと思います。

AWS SESのバウンス発生をSlackに通知する

こいついつもLambdaで何かやってるな。

AWSのSES(Simple Email Service)でバウンスや苦情が発生した際、それをSlackに通知することで、迅速な発生の検知と社内共有ができるようにしてみました。

「SESバウンス発生→SNSトピック呼び出し→SNSトピックからLambda関数を呼び出し」という流れでLambda関数に処理を渡しています。

Lambda関数

大まかな処理の流れは下記のとおりで、処理内容自体は単純です。
苦情の場合は配信抑制リストからの削除処理がありません。

  1. SNSトピックからLambdaが呼び出される。
  2. SNSトピックからのメッセージ内容を見て、バウンスか苦情かで処理を分岐する。
    1. バウンスの場合
      1. バウンス通知メッセージを作成する。
      2. 配信抑制リストから削除するアドレスの場合、削除する。
      3. 削除した場合、削除通知メッセージを通知に追加する。
      4. Slack通知を行うアドレスの場合、SlackへPOSTする。
    2. 苦情の場合
      1. 苦情通知メッセージを作成する。
      2. Slack通知を行うアドレスの場合、SlackへPOSTする。

SlackにPOSTするコンテンツをblock構造で記述しているせいで、非常にコードが長くなってしまっていまいました。見づらくて申し訳ありません。

import json
from datetime import date, datetime
import urllib.request

import boto3


def json_serial(obj):
    # If it is a date type, convert it to a character string.
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    # Other than the above, it is not supported.
    raise TypeError ("Type %s not serializable" % type(obj))


def delete_suppressed_destination(mail):
    client = boto3.client('sesv2')

    suppressed = client.get_suppressed_destination(EmailAddress=mail)
    del_json = json.dumps(suppressed, default=json_serial)
    message = '\n'.join([
        '*Delete suppressed destination json*: `' + mail + '`',
        '```' + del_json + '```'
        ])

    try:
        client.delete_suppressed_destination(EmailAddress=mail)
    except client.exceptions.NotFoundException:
        post_slack('*NotFoundException*: `' + mail + '`\n' + message)
        return
    except client.exceptions.BadRequestException:
        post_slack('*BadRequestException*: `' + mail + '`\n' + message)
        return
    except client.exceptions.TooManyRequestsException:
        post_slack('*TooManyRequestsException*: `' + mail + '`\n' + message)
        return

    try:
        suppressed = client.get_suppressed_destination(EmailAddress=mail)
    except client.exceptions.NotFoundException:
        del_response = '*Deleted!*'
    except client.exceptions.BadRequestException:
        del_response = '*BadRequestException*'
    except client.exceptions.TooManyRequestsException:
        del_response = '*TooManyRequestsException*'

    return (del_response, del_json)


def is_no_suppress_mail(mail):
    if 'no-suppress' in mail or mail.endswith('@no.suppress.domain.jp'):
        return True
    return False
    
    
def is_ignore_mail(mail):
    ignore_list = (
        'ignore@example.com',
        )
    if mail in ignore_list or mail.endswith('@ignore.domain.jp'):
        return True
    return False


# Bounce.
def build_blocks_bounce(bounced_recipient, message_json):

    mail = bounced_recipient['emailAddress']

    diagnosticCode = bounced_recipient['diagnosticCode']
    bounce_status = '\n'.join((
        'action: ' + bounced_recipient['action'],
        'status: ' + bounced_recipient['status'],
        'diagnostic_code: ' + diagnosticCode,
        ))

    bounce_json = json.dumps(message_json)

    blocks = [
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Mail: *"
                },
                {
                    "type": "mrkdwn",
                    "text": "`" + mail + "`"
                }
            ]
        },
		{
			"type": "section",
			"text": {
				"type": "mrkdwn",
				"text": "*Bounced information: *\n```" + bounce_status + "```"
			}
		},
		{
			"type": "section",
			"text": {
				"type": "mrkdwn",
				"text": "*Bounced message json: *\n```" + bounce_json + "```"
			}
		}
    ]
    return blocks

def build_blocks_delete(mail, del_response, del_json):
    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "Delete notification"
            }
        },
        {
            "type": "divider"
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Mail:*"
                },
                {
                    "type": "mrkdwn",
                    "text": "`" + mail + "`"
                }
            ]
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Response:*"
                },
                {
                    "type": "mrkdwn",
                    "text": del_response
                }
            ]
        },
		{
			"type": "section",
			"text": {
				"type": "mrkdwn",
				"text": "*Suppressed destination json: *\n```" + del_json + "```"
			}
		}
    ]
    return blocks

def prepare_bounced_message(message_json):
    bounced_message = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "Bounce notification"
            }
        },
        {
            "type": "divider"
        },
		{
			"type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Timestamp (UTC): *"
                },
                {
                    "type": "mrkdwn",
                    "text": "`" + message_json['bounce']['timestamp'] + "`"
                }
            ]
		}
    ]
    return bounced_message


# Complaint.
def build_blocks_complaint(complained_recipient, message_json):

    mail = complained_recipient['emailAddress']

    complaint_json = json.dumps(message_json)

    blocks = [
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Mail: *"
                },
                {
                    "type": "mrkdwn",
                    "text": "`" + mail + "`"
                }
            ]
        },
		{
			"type": "section",
			"text": {
				"type": "mrkdwn",
				"text": "*Complaintd message json: *\n```" + complaint_json + "```"
			}
		}
    ]
    return blocks

def prepare_complained_message(message_json):
    complained_message = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "Complaint notification"
            }
        },
        {
            "type": "divider"
        },
		{
			"type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Timestamp (UTC): *"
                },
                {
                    "type": "mrkdwn",
                    "text": "`" + message_json['complaint']['timestamp'] + "`"
                }
            ]
		}
    ]
    return complained_message


def post_slack(blocks, message=None):
    send_data = {
        "channel": "XXXXXXXXXXX",   # SESの通知用Channel-ID
        "username": "SES Notice",
        "icon_emoji": ":mailbox_with_no_mail:",
        "text": message,
        "blocks": blocks
    }

    send_text = "payload=" + json.dumps(send_data)

    print('post_slack() send_text: ', send_text)

    request = urllib.request.Request(
        "(SlackのWebhook URL)",
        data=send_text.encode('utf-8'),
        method="POST"
    )

    with urllib.request.urlopen(request) as response:
        response_body = response.read().decode('utf-8')


def ses_process(event):
    message_json = json.loads(event['Records'][0]['Sns']['Message'])

    if message_json['notificationType'] == 'Bounce':
        bounced_recipients = message_json['bounce']['bouncedRecipients']
        bounced_mails = []
        delete_result = []
        notice_message = ""

        for bounced_recipient in bounced_recipients:
            bounced_mails += build_blocks_bounce(bounced_recipient, message_json)
            mail = bounced_recipient['emailAddress']
            if is_no_suppress_mail(mail) and not is_ignore_mail(mail):
                del_result, del_json = delete_suppressed_destination(mail)
                delete_result += build_blocks_delete(mail, del_result, del_json)
                notice_message = "DELETE: " + mail

        bounced_message = prepare_bounced_message(message_json)
        bounced_message = bounced_message + bounced_mails + delete_result

        if not notice_message:
            notice_message = "BOUNCE: " + mail
            
        if not is_ignore_mail(mail):
            post_slack(bounced_message, notice_message)

    elif message_json['notificationType'] == 'Complaint':
        complained_recipients = message_json['complaint']['complainedRecipients']
        complained_mails = []

        for complained_recipient in complained_recipients:
            complained_mails += build_blocks_complaint(complained_recipient, message_json)
            mail = complained_recipient['emailAddress']

        complained_message = prepare_complained_message(message_json)
        complained_message = complained_message + complained_mails
        notice_message = "COMPLAINT: " + mail
        
        if not is_ignore_mail(mail):
            post_slack(complained_message, notice_message)

    return


def lambda_handler(event, context):
    if event['Records'][0]['EventSource'] == 'aws:sns' :
        ses_process(event)

    return

例外処理が甘いですね。

権限については実装当時に精査していなかったようで、「AmazonSESFullAccess」を付与されていましたが、get_suppressed_destination()delete_suppressed_destination()をする権限くらいがあれば動きそうです。甘々ですね。

細かい処理についての補足……?

is_no_suppress_mail(mail)

SESではバウンスの発生時に、配信抑制リスト(suppression-list)というものにメールアドレスを登録し、メールのバウンスが再発することを防止する仕組みがあります。

この仕組みは便利なもので活用したいのですが、一時的な問題でバウンスが発生することが度々ある等、特定のアドレスだけは抑制リストに入らないでほしいようなケースがあるかもしれません。

こういったケースでは、抑制リストに登録された直後にリストから自動で登録解除することで、擬似的にリストに登録されないような状態にすることができます。(バウンスレートが急上昇するリスクもあるので、運用時は要注意)

この「自動で配信抑制リストから削除するアドレス」かを判定する関数が is_no_suppress_mail(mail) です。

is_ignore_mail(mail)

Slackへ通知したくないメールアドレス(既知のダミーアドレス等)もあるため、通知を送らないアドレスのリストを用意します。

Slack通知の有無を判定する際に用いる判定関数が is_ignore_mail(mail) です。

build_blocks_bounce(), build_blocks_delete(), prepare_bounced_message() 等

SNSトピックから受け取ったメッセージを元に、見やすく整形してSlackにPOSTするコンテンツを作成します。

実際の投稿の様子

コードの短さを犠牲に情報が見やすい投稿を実現しています。

バウンス通知(Bounce)
バウンス通知(配信抑制リストにあるもの)
バウンス通知+削除通知(Bounce+Delete)
苦情通知(Complaint)