Use an S3 bucket event to trigger SQS Queue to insert image info into DynamoDB Table

Use an S3 bucket event to trigger SQS Queue to insert image info into DynamoDB Table

avatar

Phong Nguyen

2024.03.28

Bài lab này sẽ hướng dẫn tạo DynamoDB table, Sử dụng S3 Event Notification trigger SQS queue, Sử dụng Lambda để poll message về và xử lý, thông tin image sẽ được lưu vào DynamoDB. Đồng thời mỗi khi 1 item được INSERT vào DynamoDB, chúng ta sẽ nhận được email thông báo.

Lab Details

  1. Duration: 40 minutes
  2. AWS Region: US East (N. Virginia) us-east-1

Introduction

  • Amazon DynamoDB là fully managed NoSQL database service, trong đó AWS quản lý việc maintenance, administrative, operations and scaling
  • Việc scaling nhanh chóng và chúng ta không cần phải quan tâm đến workload
  • DynamoDB cung cấp single-digit millisecond latency, ngay cả đối với hàng terabyte dữ liệu. Đây là lý do tại sao nó được sử dụng cho các ứng dụng yêu cầu tốc độ đọc rất nhanh.
  • Nó được sử dụng trong các ứng dụng như chơi game, nơi dữ liệu cần được ghi lại và các thay đổi diễn ra rất nhanh chóng.

Architecture Diagram

Task Details

  1. Create SQS Standard Queue
  2. Create source S3 buckets
  3. Create S3 Event Notification
  4. Create a Lambda function
  5. Create DynamoDB table
  6. Test hoạt động 1
  7. Create SNS Topic
  8. Create Lambda xử lý DynamoDB Stream
  9. Create DynamoDB Stream
  10. Test hoạt động 2

1. Create SQS Standard Queue

  • Type: Standard
  • Name: d-sqs-dva-generate-thumbnai-images-queue
  • Access policy: Advanced -> Edit policy chổ Principal cho phép user / service nào cũng có thể gửi message đến SQS
      "Principal": {
        "AWS": "*"
      },
  • Tags - optional:
    • Key: Name
    • Value: d-sqs-dva-generate-thumbnai-images-queue
  • Các thuộc tính khác để như default

2. Create source S3 buckets

Lưu ý bucket không được trùng tên trong Region, nên hãy đặt tên phù hợp.

  • AWS Region: US East (N. Virginia) us-east-1
  • Bucket name: d-s3-[your-name]-dva-image-source-bucket (Ex: d-s3-cmp-dva-image-source-bucket)

3. Create S3 Event Notification

Vào S3 Console -> Chọn source bucket -> Properties -> Event notifications

  • Event name: CreateThumbnail
  • Event type:
    • Check All object create events
  • Destination
    • SQS queue
    • Choose from your SQS queues: d-sqs-dva-generate-thumbnai-images-queue

4. Create a Lambda function

4.1 Create a permissions policy

  • Policy name: DVALambdaGenerateThumbnailImagePolicy
  • Policy JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::*/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams",
                "dynamodb:BatchGetItem",
                "dynamodb:BatchWriteItem",
                "dynamodb:ConditionCheckItem",
                "dynamodb:PutItem",
                "dynamodb:DescribeTable",
                "dynamodb:DeleteItem",
                "dynamodb:GetItem",
                "dynamodb:Scan",
                "dynamodb:Query",
                "dynamodb:UpdateItem"
            ],
            "Resource": "arn:aws:dynamodb:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sns:Publish"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

4.2 Create an execution role for Lambda

Create Role cho Lambda

  • Trusted entity type: AWS service
  • Use case: Lambda
  • Permissions policies: DVALambdaGenerateThumbnailImagePolicy
  • Role name: DVALambdaGenerateThumbnailImageRole

4.3 Create lambda function

Tại Lambda Console -> Menu Function -> Create function

  • Author from scratch
  • Basic information
    • Function name: d-lam-nonvpc-dva-generate-thumbnai-images
    • Runtime: Python 3.9
    • Architecture: x86_64
    • Execution role: Use an existing role -> DVALambdaGenerateThumbnailImageRole
  • Create function

4.4 Setting lambda trigger from SQS

Click Add

Kết quả

4.5 Update code cho Lambda

"""
Activity Name: d-lam-nonvpc-dva-generate-thumbnai-images
Description: Generate image thumbnail and save image information to DynamoDB
Created by: Cloud Mentor Pro
Created on: 2024/0/28
"""

# **************************************************
# ----- Import Library
# **************************************************
import json
import logging
import traceback
import boto3
import uuid
import sys
import os


# **************************************************
# ----- Set logger
# **************************************************
logger = logging.getLogger()
for h in logger.handlers:
  logger.removeHandler(h)

h = logging.StreamHandler(sys.stdout)
FORMAT = '%(levelname)s %(asctime)s [%(filename)s:%(funcName)s:%(lineno)d] %(message)s'
h.setFormatter(logging.Formatter(FORMAT))
logger.addHandler(h)
logger.setLevel(logging.INFO)

# **************************************************
# ----- Variables
# **************************************************
dynamodb = boto3.client('dynamodb')
dynamodb_table_name = 'd-table-dva-image-info'

# **************************************************
# ----- lambda_handler
# **************************************************
def lambda_handler(event, context):
    log_name = f"{os.path.splitext(os.path.basename(__file__))[0]}"
    logging.info(f"[{log_name}] process start.")
    
    status = ""
    try:
        logger.info('Extract object information from SQS event')
        for record in event['Records']:
            logger.info('Parse the S3 event message')
            s3_event_message = json.loads(record['body'])
            
            logger.info('Extract object information from the S3 event message')
            for s3_record in s3_event_message['Records']:
                bucket_name = s3_record['s3']['bucket']['name']
                object_key = s3_record['s3']['object']['key']
                
                logger.info('Generate UUID')
                item_id = str(uuid.uuid4())
                
                logger.info('Save object information to DynamoDB with UUID as ID')
                save_to_dynamodb(bucket_name, object_key, item_id)
        
        status = "SUCCESS"

    except Exception as err_info:
        status = "ERROR"
        logger.error(
            f"[{log_name}] failed...\n{err_info}\n{traceback.format_exc()}")
        
    finally:
        logger.info(f"[{log_name}] process end. [status:{status}]")
        return {"status": status}
    
# **************************************************
# ----- save image info to dynamodb
# **************************************************
def save_to_dynamodb(bucket_name, object_key, item_id):
    logger.info('Define item to be put into DynamoDB')
    item = {
        'uuid': {'S': item_id},
        'bucket_name': {'S': bucket_name},
        'object_key': {'S': object_key}
    }

    logger.info('Put item into DynamoDB table')
    dynamodb.put_item(TableName=dynamodb_table_name, Item=item)

Click Deploy

5. Create DynamoDB table

Tại Console DynamoDB -> Menu Tables -> Create Table

  • Table name: d-table-dva-image-info
  • Partition key: uuid
  • Các setting khác để default

6. Test hoạt động 1

Upload một vài images lên S3 bucket

Kiểm tra xem thông tin đã được ghi vào DynamoDB chưa

Log lambda từ Cloudwatch log

7. Create SNS Topic

Vào Console của SNS -> Click Topic -> Create topic

  • Type: Standard
  • Name: d-sns-SAA-demo-topic
  • Display name - optional: AWS Notification
  • Tags - optional:
    • Key: Name
    • Value: d-sns-SAA-demo-topic
  • Các thuộc tính khác để như default

Kết quả

Create subscription sử dụng email của bạn. Cách làm tham khảo tại đây

8. Create Lambda xử lý DynamoDB Stream

Tại Lambda Console -> Menu Function -> Create function

  • Author from scratch
  • Basic information
    • Function name: d-lam-nonvpc-dva-stream-notification
    • Runtime: Python 3.11
    • Architecture: arm64
    • Execution role: Use an existing role -> DVALambdaGenerateThumbnailImageRole
  • Create function
  • Code: Lưu ý thay sns_topic_arn bằng ARN Topic bạn vừa tạo

"""
Activity Name: d-lam-nonvpc-dva-stream-notification
Description: Triggered from DynamoDB Stream, send sns message when item is created!
Created by: Cloud Mentor Pro
Created on: 2024/0/28
"""

# **************************************************
# ----- Import Library
# **************************************************
import json
import logging
import traceback
import boto3
import uuid
import sys
import os


# **************************************************
# ----- Set logger
# **************************************************
logger = logging.getLogger()
for h in logger.handlers:
  logger.removeHandler(h)

h = logging.StreamHandler(sys.stdout)
FORMAT = '%(levelname)s %(asctime)s [%(filename)s:%(funcName)s:%(lineno)d] %(message)s'
h.setFormatter(logging.Formatter(FORMAT))
logger.addHandler(h)
logger.setLevel(logging.INFO)

# **************************************************
# ----- Variables
# **************************************************
sns_client = boto3.client('sns')
sns_topic_arn = 'arn:aws:sns:us-east-1:{your-account-id}:d-sns-SAA-demo-topic'

# **************************************************
# ----- lambda_handler
# **************************************************

def lambda_handler(event, context):
    print(event)
    log_name = f"{os.path.splitext(os.path.basename(__file__))[0]}"
    logging.info(f"[{log_name}] process start.")
    
    status = ""
    try:
        for record in event['Records']:
            if record['eventName'] == 'INSERT':
                logger.info('Extract the new item from the DynamoDB stream record')
                new_image = record['dynamodb']['NewImage']
                
                logger.info('Publish a message to SNS')
                sns_client.publish(
                    TopicArn=sns_topic_arn,
                    Subject='New item added to DynamoDB table',
                    Message=json.dumps(new_image),
                    MessageStructure='string'
                )
    
        status = "SUCCESS"

    except Exception as err_info:
        status = "ERROR"
        logger.error(
            f"[{log_name}] failed...\n{err_info}\n{traceback.format_exc()}")
        
    finally:
        logger.info(f"[{log_name}] process end. [status:{status}]")
        return {"status": status}

Click Deploy

9. Create DynamoDB Stream

  • Tại DynamoDB Console -> Menu Table -> Chọn table d-table-dva-image-info, Tab Exports and streams
  • Turn on DynamoDB stream details

  • Turn on stream

  • Create trigger

  • Chọn Lambda fucntion và Create trigger

10. Test hoạt động 2

Upload vài image lên s3 bucket và kiểm tra thông tin có được ghi vào DynamoDB không, đồng thời kiểm tra có nhận được email chưa. Nếu chưa nhận được email, kiểm tra log của lambda d-lam-nonvpc-dva-stream-notification có lỗi gì không. Nếu vẫn chưa thấy log thì có thể đợi 1-2p lý do DynamoDB Stream vừa tạo có thể chưa được kích hoạt ngay. Thử lại bằng việc upload image.

Troubleshooting

Hễ lúc nào ứng dụng bạn không chạy được thì việc đầu tiên là đọc log lỗi. Nếu không có lỗi nào xuất hiện hãy kiểm tra lại kiến trúc xem các thành phần đã liên kết đúng chưa.

Ví dụ như:

S3 Event Notification -> SQS queue

Lambda d-lam-nonvpc-dva-generate-thumbnai-images được trigger từ SQS queue

Lambda d-lam-nonvpc-dva-stream-notification được trigger từ DynamoDB Stream

Clean up

  1. Delete DynamoDB table: d-table-dva-image-info
  2. Delete 2 lambda function
    • d-lam-nonvpc-dva-generate-thumbnai-images
    • d-lam-nonvpc-dva-stream-notification
  3. Delete Role: DVALambdaGenerateThumbnailImageRole
  4. Delete policy: DVALambdaGenerateThumbnailImagePolicy
  5. Delete SQS queue: d-sqs-dva-generate-thumbnai-images-queue
  6. Delete SNS Topic d-sns-SAA-demo-topic and subscription
  7. Empty and delete bucket d-s3-cmp-dva-image-source-bucket