こいついつもLambdaで何かやってるな。
AWSのSES(Simple Email Service)でバウンスや苦情が発生した際、それをSlackに通知することで、迅速な発生の検知と社内共有ができるようにしてみました。
「SESバウンス発生→SNSトピック呼び出し→SNSトピックからLambda関数を呼び出し」という流れでLambda関数に処理を渡しています。
Lambda関数
大まかな処理の流れは下記のとおりで、処理内容自体は単純です。
苦情の場合は配信抑制リストからの削除処理がありません。
- SNSトピックからLambdaが呼び出される。
- SNSトピックからのメッセージ内容を見て、バウンスか苦情かで処理を分岐する。
- バウンスの場合
- バウンス通知メッセージを作成する。
- 配信抑制リストから削除するアドレスの場合、削除する。
- 削除した場合、削除通知メッセージを通知に追加する。
- Slack通知を行うアドレスの場合、SlackへPOSTする。
- 苦情の場合
- 苦情通知メッセージを作成する。
- Slack通知を行うアドレスの場合、SlackへPOSTする。
- バウンスの場合
SlackにPOSTするコンテンツをblock構造で記述しているせいで、非常にコードが長くなってしまっていまいました。見づらくて申し訳ありません。
import json
from datetime import date, datetime
import urllib.request
import boto3
def json_serial(obj):
# If it is a date type, convert it to a character string.
if isinstance(obj, (datetime, date)):
return obj.isoformat()
# Other than the above, it is not supported.
raise TypeError ("Type %s not serializable" % type(obj))
def delete_suppressed_destination(mail):
client = boto3.client('sesv2')
suppressed = client.get_suppressed_destination(EmailAddress=mail)
del_json = json.dumps(suppressed, default=json_serial)
message = '\n'.join([
'*Delete suppressed destination json*: `' + mail + '`',
'```' + del_json + '```'
])
try:
client.delete_suppressed_destination(EmailAddress=mail)
except client.exceptions.NotFoundException:
post_slack('*NotFoundException*: `' + mail + '`\n' + message)
return
except client.exceptions.BadRequestException:
post_slack('*BadRequestException*: `' + mail + '`\n' + message)
return
except client.exceptions.TooManyRequestsException:
post_slack('*TooManyRequestsException*: `' + mail + '`\n' + message)
return
try:
suppressed = client.get_suppressed_destination(EmailAddress=mail)
except client.exceptions.NotFoundException:
del_response = '*Deleted!*'
except client.exceptions.BadRequestException:
del_response = '*BadRequestException*'
except client.exceptions.TooManyRequestsException:
del_response = '*TooManyRequestsException*'
return (del_response, del_json)
def is_no_suppress_mail(mail):
if 'no-suppress' in mail or mail.endswith('@no.suppress.domain.jp'):
return True
return False
def is_ignore_mail(mail):
ignore_list = (
'ignore@example.com',
)
if mail in ignore_list or mail.endswith('@ignore.domain.jp'):
return True
return False
# Bounce.
def build_blocks_bounce(bounced_recipient, message_json):
mail = bounced_recipient['emailAddress']
diagnosticCode = bounced_recipient['diagnosticCode']
bounce_status = '\n'.join((
'action: ' + bounced_recipient['action'],
'status: ' + bounced_recipient['status'],
'diagnostic_code: ' + diagnosticCode,
))
bounce_json = json.dumps(message_json)
blocks = [
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Mail: *"
},
{
"type": "mrkdwn",
"text": "`" + mail + "`"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Bounced information: *\n```" + bounce_status + "```"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Bounced message json: *\n```" + bounce_json + "```"
}
}
]
return blocks
def build_blocks_delete(mail, del_response, del_json):
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "Delete notification"
}
},
{
"type": "divider"
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Mail:*"
},
{
"type": "mrkdwn",
"text": "`" + mail + "`"
}
]
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Response:*"
},
{
"type": "mrkdwn",
"text": del_response
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Suppressed destination json: *\n```" + del_json + "```"
}
}
]
return blocks
def prepare_bounced_message(message_json):
bounced_message = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "Bounce notification"
}
},
{
"type": "divider"
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Timestamp (UTC): *"
},
{
"type": "mrkdwn",
"text": "`" + message_json['bounce']['timestamp'] + "`"
}
]
}
]
return bounced_message
# Complaint.
def build_blocks_complaint(complained_recipient, message_json):
mail = complained_recipient['emailAddress']
complaint_json = json.dumps(message_json)
blocks = [
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Mail: *"
},
{
"type": "mrkdwn",
"text": "`" + mail + "`"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Complaintd message json: *\n```" + complaint_json + "```"
}
}
]
return blocks
def prepare_complained_message(message_json):
complained_message = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "Complaint notification"
}
},
{
"type": "divider"
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Timestamp (UTC): *"
},
{
"type": "mrkdwn",
"text": "`" + message_json['complaint']['timestamp'] + "`"
}
]
}
]
return complained_message
def post_slack(blocks, message=None):
send_data = {
"channel": "XXXXXXXXXXX", # SESの通知用Channel-ID
"username": "SES Notice",
"icon_emoji": ":mailbox_with_no_mail:",
"text": message,
"blocks": blocks
}
send_text = "payload=" + json.dumps(send_data)
print('post_slack() send_text: ', send_text)
request = urllib.request.Request(
"(SlackのWebhook URL)",
data=send_text.encode('utf-8'),
method="POST"
)
with urllib.request.urlopen(request) as response:
response_body = response.read().decode('utf-8')
def ses_process(event):
message_json = json.loads(event['Records'][0]['Sns']['Message'])
if message_json['notificationType'] == 'Bounce':
bounced_recipients = message_json['bounce']['bouncedRecipients']
bounced_mails = []
delete_result = []
notice_message = ""
for bounced_recipient in bounced_recipients:
bounced_mails += build_blocks_bounce(bounced_recipient, message_json)
mail = bounced_recipient['emailAddress']
if is_no_suppress_mail(mail) and not is_ignore_mail(mail):
del_result, del_json = delete_suppressed_destination(mail)
delete_result += build_blocks_delete(mail, del_result, del_json)
notice_message = "DELETE: " + mail
bounced_message = prepare_bounced_message(message_json)
bounced_message = bounced_message + bounced_mails + delete_result
if not notice_message:
notice_message = "BOUNCE: " + mail
if not is_ignore_mail(mail):
post_slack(bounced_message, notice_message)
elif message_json['notificationType'] == 'Complaint':
complained_recipients = message_json['complaint']['complainedRecipients']
complained_mails = []
for complained_recipient in complained_recipients:
complained_mails += build_blocks_complaint(complained_recipient, message_json)
mail = complained_recipient['emailAddress']
complained_message = prepare_complained_message(message_json)
complained_message = complained_message + complained_mails
notice_message = "COMPLAINT: " + mail
if not is_ignore_mail(mail):
post_slack(complained_message, notice_message)
return
def lambda_handler(event, context):
if event['Records'][0]['EventSource'] == 'aws:sns' :
ses_process(event)
return
例外処理が甘いですね。
権限については実装当時に精査していなかったようで、「AmazonSESFullAccess」を付与されていましたが、get_suppressed_destination()
とdelete_suppressed_destination()
をする権限くらいがあれば動きそうです。甘々ですね。
細かい処理についての補足……?
is_no_suppress_mail(mail)
SESではバウンスの発生時に、配信抑制リスト(suppression-list)というものにメールアドレスを登録し、メールのバウンスが再発することを防止する仕組みがあります。
この仕組みは便利なもので活用したいのですが、一時的な問題でバウンスが発生することが度々ある等、特定のアドレスだけは抑制リストに入らないでほしいようなケースがあるかもしれません。
こういったケースでは、抑制リストに登録された直後にリストから自動で登録解除することで、擬似的にリストに登録されないような状態にすることができます。(バウンスレートが急上昇するリスクもあるので、運用時は要注意)
この「自動で配信抑制リストから削除するアドレス」かを判定する関数が is_no_suppress_mail(mail)
です。
is_ignore_mail(mail)
Slackへ通知したくないメールアドレス(既知のダミーアドレス等)もあるため、通知を送らないアドレスのリストを用意します。
Slack通知の有無を判定する際に用いる判定関数が is_ignore_mail(mail)
です。
build_blocks_bounce(), build_blocks_delete(), prepare_bounced_message() 等
SNSトピックから受け取ったメッセージを元に、見やすく整形してSlackにPOSTするコンテンツを作成します。
実際の投稿の様子
コードの短さを犠牲に情報が見やすい投稿を実現しています。