お久しぶりです。オンラインコンサルタントの直井です(^^♪
皆様いかがお過ごしでしょうか?最近は暑さも和らぎ、比較的過ごしやすい日々が続いております。気温の変化は体調にも表れやすいので、どうぞお気をつけてお過ごしください。
という堅苦しいはいりは置いておいて、今回はタイトルの通りSQSの最大メッセージサイズが256KBということにキレています。このサイズ、微妙に足りなくないですか…?そういうのには向いてないとか、そもそもそんな大きい内容をキューに入れておくな!とか言われるかもしれませんが、入れたいんですわ!入れないと進めないんですわ!なんて状況は多々あると思います。
1. SQSを利用する構成
そもそも、どんな感じでSQSを利用するか?について説明いたします。今回弊社ではルート計算プログラムをサーバレス環境に移行すべく、API Gateway,Lambda, SQSを利用を検討しました。以下の図の通りです。
これまで動いていた環境では、高性能なサーバで常にリクエストを待ち受けて、リクエストがあったら計算する形だったので、待ち受け時間が多いと不要なリソースとして問題がありました。これを解決するため、イベント駆動型に近いサーバレス環境に移行する計画が実行されました。
2. リクエストが肥大化していた
プログラムで計算するための情報は、計算して欲しい地点の位置情報(緯度経度)情報くらいなのでPOSTデータはそこまで大きくないだろう(^^♪ラッキー!標準機能でごり押し移行ができると考えていました。
ですが、実際にリクエストされる内容を確認してみると….その地点の位置情報だけでなく、備考情報やid等計算に必要ない内容もPOSTされており、戦慄しました。また、その内容をそっくりそのまま返す必要があります。
このため、リクエストが、2MBや5MBなどを超えることが多々あります。このことから、SQS標準のメッセージ容量では足りないことが明らかとなりました。
3. メッセージを拡張
このような困った状況でも、AWSは最適なソリューションを用意してくれていました。S3を使えばメッセージの容量を最大2GBまで拡張できるよ!とのことです。これは心強い!2GBもあれば何でもできます。なんでも。というかS3を使った時点で何でもできるような気がしてきました。
https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
4. リクエストを受け付けてSQSに登録する
できそうなことがわかったので、とりあえず作っていきます。作る部分は、APIGatewayから受け取るLambda関数です。今回の計算プログラムがScalaで掛かれていたので、それそそっくりそのまま受け継ぎます。Lambdaのランタイムは、java on amazon linux2 を利用します。
受付部分はさっきの公式ドキュメントをそのまま利用します。
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model._
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient
import com.amazon.sqs.javamessaging.ExtendedClientConfiguration
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.S3ClientBuilder
def sentSqsMessage(request_json: String): SendMessageResponse = {
val S3_BUCKET_NAME: String = "sqs_message_bucket_name"
val AWS_ACCESS_KEY: String = ""
val AWS_SECRET_KEY: String = ""
val QUEUE_URL: String = "sqs_queue_url"
val awsCredentials = StaticCredentialsProvider.create(AwsBasicCredentials.create(AWS_ACCESS_KEY, AWS_SECRET_KEY))
val s3Client: S3Client = S3Client.builder().region(Region.AP_NORTHEAST_1).credentialsProvider(awsCredentials).httpClient(ApacheHttpClient.create()).build()
val qsClient: SqsClient = SqsClient.builder().region(Region.AP_NORTHEAST_1).credentialsProvider(awsCredentials).httpClient(ApacheHttpClient.create()).build()
val extendedClientConfig = new ExtendedClientConfiguration().withPayloadSupportEnabled(s3Client, S3_BUCKET_NAME)
val sqsExtendClient: AmazonSQSExtendedClient = new AmazonSQSExtendedClient(sqsClient, extendedClientConfig)
val sendMessageRequest: SendMessageRequest = SendMessageRequest.builder().queueUrl(QUEUE_URL).messageBody(request_json).build()
val sendResult: SendMessageResponse = sqsExtendClient.sendMessage(sendMessageRequest)
return sendResult
}
ざっくりと処理の流れを記載すると、AWS クレデンシャルをを生成し、S3とSQSのクライアントを作成します。その後に、SQSをS3を利用して拡張するためのクライアントを作成しリクエストを投げる流れになります。
5. 最後に
あまり日本語で書かれている記事がなく、試し試しで動かしたことを昨日のことのように思い出します….