AWS Lambda笔记-SNS/SQS事件通知-13【含源代码】

SNS(Simple Notification Service),SQS(Simple Queue Service)都是AWS非常重要的部分,它允许软件的不通部分通过消息传递彼此进行交流。在SNS中可以创建主题并为其订阅资源。订阅的资源可以是HTTP端点,Lambda函数,移动应用,发送邮件,向SQS发送队列消息,发送SMS短信等。在SQS中可以创建队列并发送消息到队列中,然后消费者可以轮询这些消息进行消费,类似Rabbit MQ。

  • 创建SNS主题
  • Lambda发布SNS通知
  • Lambda订阅SNS消息
  • SQS订阅SNS消息
  • 代码下载地址: https://pan.baidu.com/s/1rjK5Pm-UQM8OnPRUUBjuMg
    提取码:j9ak

    1. 创建SNS主题

    创建SNS主题比较简单,可以通过控制台创建 https://console.aws.amazon.com/sns/v3/home 或通过CloudFormation模版创建。
    当前通过cloudformation模版创建:

    "UserRegistrationSnsTopic": {
      "Type": "AWS::SNS::Topic",
      "Properties": {
        "Subscription": [
          //此主题的 SNS 订阅(终端节点)
          //下面会在这边添加Lambda和SQS订阅
    

    发布工程./gradlew deploy成功后,我们可以在控制台https://console.aws.amazon.com/sns/v3/home中看到发布的主题。

    2. Lambda发布SNS通知

    在Lambda里面发布SNS消息,需要应用SNS SDK。发布消息很简单,只需要调用AmazonSNSClient.publish(topicArn,message)即可发布信息。参数topicArn可以通过环境变量获取。

  • 在cloudformation.template中添加Arn的环境变量
  • 在Lambda中添发布消息代码
  • 在cloudformation.template中添加Arn的环境变量
  •  "UserRegistrationLambda": {
          "Type": "AWS::Lambda::Function",
          "Properties": {
            "Handler": "com.serverlessbook.lambda.userregistration.Handler",
            "Description": "User registration Lambda",
            "Role": {
              "Fn::GetAtt": [
            "Environment": {
                 //添加环境变量,后续UserRegistrationLambda的Handle中使用到。
                "UserRegistrationSnsTopic": {
                  "Ref": "UserRegistrationSnsTopic"
    
  • 在Lambda中添发布消息代码
    代码中省略内部类和注册成功跳转地址的代码片段,重点展示SNS的信息发布。
  • //用户注册Lambda
    public class Handler extends LambdaHandler<Handler.RegistrationInput, Handler.RegistrationOutput> {
        //SNS Client 
        private AmazonSNSClient amazonSNSClient;
        //在构造器中我们使用 INJECTOR.injectMembers(this);
        //这样我们可以通过New方法,将AmazonSNSClient注入进来。不需要在DependencyInjectionModule的config()中配置。
        @Inject
        public Handler setAmazonSNSClient(AmazonSNSClient amazonSNSClient) {
            this.amazonSNSClient = amazonSNSClient;
            return this;
        //获取用户信息发布消息
        private void notifySnsSubscribers(User user) {
          try {
            amazonSNSClient.publish(System.getenv("UserRegistrationSnsTopic"), user.getEmail());
            LOGGER.info("SNS notification sent for "+user.getEmail());
          } catch (Exception anyException) {
            LOGGER.info("SNS notification failed for "+user.getEmail(), anyException);
        public Handler() {
            INJECTOR.injectMembers(this);
            Objects.requireNonNull(userService);
        @Override
        public RegistrationOutput handleRequest(RegistrationInput input, Context context) {
            User createdUser = userService.registerNewUser(input.username, input.email);
            //发布该主题的消息
            notifySnsSubscribers(createdUser);
            //返回生成user的原始URL
            return new RegistrationOutput(createdUser);
    

    3. Lambda订阅SNS消息

    接下来需要订阅消息,当订阅到消息后可以触发一些动作,例如接受到用户注册功能信息,给用户发送电子邮件。我们创建一个lambda-userregistration-welcomemail工程,在这个工程中订阅“UserRegistrationSnsTopic”主题发布的信息。

  • 创建Lambda工程,获取订阅消息

  • 在cloudformation中配置lambda

  • 在cloudformation中添加消息订阅

  • 创建Lambda工程,获取订阅消息
    创建lambda-userregistration-welcomemail工程,将该工程加入settings.gradle,并在build.gradle文件中添加SNS Event的SDK,添加Lambda获取订阅消息的逻辑代码。
    如图创建工程和目录结构。

    在当前工程的build.gradle中添加依赖:

    dependencies {
        compile group: 'com.amazonaws', name: 'aws-lambda-java-events', version: '1.3.0'
        compile group: 'com.google.inject', name: 'guice', version: guiceVersion
    

    添加Lambda代码,代码中SNSEvent对象的getRecords()方法的值,大多数情况下,返回的是一个元素的列表,但也有可能是多个消息,所以这边需要通过循环遍历来获得每条信息,并进行处理。

    public class Handler implements RequestHandler<SNSEvent, Void> {
      private static final Injector INJECTOR = Guice.createInjector();
      private AmazonSimpleEmailServiceClient simpleEmailServiceClient;
      //直接注入new的AmazonSimpleEmailServiceClient对象
      @Inject
      public Handler setSimpleEmailServiceClient(
          AmazonSimpleEmailServiceClient simpleEmailServiceClient) {
        this.simpleEmailServiceClient = simpleEmailServiceClient;
        return this;
      public Handler() {
        INJECTOR.injectMembers(this);
        Objects.nonNull(simpleEmailServiceClient);
      private void sendEmail(final String emailAddress) {
         //可以发送SES邮件
         LOGGER.debug("Sending welcome mail to " + emailAddress + " succeeded");
      @Override
      public Void handleRequest(SNSEvent input, Context context) {
        //收到的是标准的SNSEvent事件
        //getRecords()返回的是一个列表,表示Lambda可能一次收多条SNS消息。
        input.getRecords().forEach(snsMessage -> sendEmail(snsMessage.getSNS().getMessage()));
        return null;
    
  • 在cloudformation中配置lambda
  •     "UserRegistrationWelcomeMailLambda": {
          "Type": "AWS::Lambda::Function",
          "Properties": {
            "Handler": "com.serverlessbook.lambda.userregistration.welcomemail.Handler",
            "Runtime": "java8",
            "Timeout": "300",
            "MemorySize": "1024",
            "Description": "User registration welcome mail Lambda",
            "Role": {
              "Fn::GetAtt": [
                "LambdaExecutionRole",
                "Arn"
            "Code": {
              "S3Bucket": {
                "Ref": "DeploymentBucket"
              "S3Key": {
                "Fn::Sub": "artifacts/lambda-userregistration-welcomemail/${ProjectVersion}/${DeploymentTime}.jar"
        //授权器
        "UserRegistrationWelcomeMailLambdaPermission": {
          "Type": "AWS::Lambda::Permission",
          "Properties": {
            "Action": "lambda:InvokeFunction",
            "FunctionName": {
              "Ref": "UserRegistrationWelcomeMailLambda"
            "Principal": "sns.amazonaws.com",
            "SourceArn": {
              "Fn::Sub": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:*"
    
  • 在cloudformation中添加消息订阅
    这边需要在上面的"UserRegistrationSnsTopic"中添加消息订阅。注意,后续还会添加SQS的订阅。
  • "UserRegistrationSnsTopic": {
        "Type": "AWS::SNS::Topic",
        "Properties": {
            "Subscription": [
                //Lambda订阅
                    "Endpoint": {
                        "Fn::GetAtt": [
                            "UserRegistrationWelcomeMailLambda",
                            "Arn"
                    "Protocol": "lambda"
    

    第二阶段./gradlew deploy 在用户注册成功后,可以在CloudWatch控制台看到一行日志"Sending welcome mail to xxx succeeded"的日志。到此整个信息的发布和订阅就完成了。后续我们在这个SNS消息中添加SQS的订阅。

    4. SQS订阅SNS消息

    在SNS的Topic中添加SQS的订阅和Lambda的订阅是一样的。

  • 创建一个消息队列
  • 添加消息订阅
  • 创建一个消息队列
    详细内容标注在注释中。
  • //创建标准SQS队列
    "UserRegistrationQueue": {
        "Type": "AWS::SQS::Queue"
    SQS队列的策略,应用与上面的SQS
    "UserRegistrationQueuePolicy": {
        "Type": "AWS::SQS::QueuePolicy",
        "Properties": {
            "PolicyDocument": {
                "Version": "2012-10-17",
                //权限设置,所有人允许在队列UserRegistrationQueue上发送消息
                //(只是发送,接受消息可以添加SQS:ReceiveMessage)
                "Statement": [
                        "Effect": "Allow",
                        "Principal": "*",
                        "Action": "SQS:SendMessage",
                        "Resource": {
                            "Fn::GetAtt": [
                                "UserRegistrationQueue",
                                "Arn"
                        "Condition": {
                            "ArnEquals": {
                                "aws:SourceArn": {
                                    "Ref": "UserRegistrationSnsTopic"
            //策略作用于UserRegistrationQueue队列
            "Queues": [
                    "Ref": "UserRegistrationQueue"
    
  • 添加消息订阅
    需要继续在“UserRegistrationSnsTopic”继续添加订阅消息。完整的Topic和订阅如下:
  • "UserRegistrationSnsTopic": {
        "Type": "AWS::SNS::Topic",
        "Properties": {
            "Subscription": [
                //Lambda订阅
                    "Endpoint": {
                        "Fn::GetAtt": [
                            "UserRegistrationWelcomeMailLambda",
                            "Arn"
                    "Protocol": "lambda"
                //SQS订阅
                    "Endpoint": {
                        "Fn::GetAtt": [
                            "UserRegistrationQueue",
                            "Arn"
                    "Protocol": "sqs"
    

    第三阶段./gradlew deploy 在用户注册成功后,可以登陆SQS控制台https://console.aws.amazon.com/sqs/v2/home
    已经有一条信息。