SNS(Simple Notification Service),SQS(Simple Queue Service)都是AWS非常重要的部分,它允许软件的不通部分通过消息传递彼此进行交流。在SNS中可以创建主题并为其订阅资源。订阅的资源可以是HTTP端点,Lambda函数,移动应用,发送邮件,向SQS发送队列消息,发送SMS短信等。在SQS中可以创建队列并发送消息到队列中,然后消费者可以轮询这些消息进行消费,类似Rabbit MQ。
创建SNS主题 Lambda发布SNS通知 Lambda订阅SNS消息 SQS订阅SNS消息 代码下载地址: https://pan.baidu.com/s/1rjK5Pm-UQM8OnPRUUBjuMg
提取码:j9ak1. 创建SNS主题
创建SNS主题比较简单,可以通过控制台创建 https://console.aws.amazon.com/sns/v3/home 或通过CloudFormation模版创建。
当前通过cloudformation模版创建:"UserRegistrationSnsTopic": { "Type": "AWS::SNS::Topic", "Properties": { "Subscription": [ //此主题的 SNS 订阅(终端节点) //下面会在这边添加Lambda和SQS订阅
发布工程
./gradlew deploy
成功后,我们可以在控制台https://console.aws.amazon.com/sns/v3/home中看到发布的主题。
2. Lambda发布SNS通知
在Lambda里面发布SNS消息,需要应用SNS SDK。发布消息很简单,只需要调用AmazonSNSClient.publish(topicArn,message)即可发布信息。参数topicArn可以通过环境变量获取。
在cloudformation.template中添加Arn的环境变量 在Lambda中添发布消息代码 在cloudformation.template中添加Arn的环境变量 "UserRegistrationLambda": { "Type": "AWS::Lambda::Function", "Properties": { "Handler": "com.serverlessbook.lambda.userregistration.Handler", "Description": "User registration Lambda", "Role": { "Fn::GetAtt": [ "Environment": { //添加环境变量,后续UserRegistrationLambda的Handle中使用到。 "UserRegistrationSnsTopic": { "Ref": "UserRegistrationSnsTopic"
在Lambda中添发布消息代码
代码中省略内部类和注册成功跳转地址的代码片段,重点展示SNS的信息发布。//用户注册Lambda public class Handler extends LambdaHandler<Handler.RegistrationInput, Handler.RegistrationOutput> { //SNS Client private AmazonSNSClient amazonSNSClient; //在构造器中我们使用 INJECTOR.injectMembers(this); //这样我们可以通过New方法,将AmazonSNSClient注入进来。不需要在DependencyInjectionModule的config()中配置。 @Inject public Handler setAmazonSNSClient(AmazonSNSClient amazonSNSClient) { this.amazonSNSClient = amazonSNSClient; return this; //获取用户信息发布消息 private void notifySnsSubscribers(User user) { try { amazonSNSClient.publish(System.getenv("UserRegistrationSnsTopic"), user.getEmail()); LOGGER.info("SNS notification sent for "+user.getEmail()); } catch (Exception anyException) { LOGGER.info("SNS notification failed for "+user.getEmail(), anyException); public Handler() { INJECTOR.injectMembers(this); Objects.requireNonNull(userService); @Override public RegistrationOutput handleRequest(RegistrationInput input, Context context) { User createdUser = userService.registerNewUser(input.username, input.email); //发布该主题的消息 notifySnsSubscribers(createdUser); //返回生成user的原始URL return new RegistrationOutput(createdUser);
3. Lambda订阅SNS消息
接下来需要订阅消息,当订阅到消息后可以触发一些动作,例如接受到用户注册功能信息,给用户发送电子邮件。我们创建一个lambda-userregistration-welcomemail工程,在这个工程中订阅“UserRegistrationSnsTopic”主题发布的信息。
创建Lambda工程,获取订阅消息
在cloudformation中配置lambda
在cloudformation中添加消息订阅
创建Lambda工程,获取订阅消息
创建lambda-userregistration-welcomemail工程,将该工程加入settings.gradle,并在build.gradle文件中添加SNS Event的SDK,添加Lambda获取订阅消息的逻辑代码。
如图创建工程和目录结构。在当前工程的build.gradle中添加依赖:
dependencies { compile group: 'com.amazonaws', name: 'aws-lambda-java-events', version: '1.3.0' compile group: 'com.google.inject', name: 'guice', version: guiceVersion
添加Lambda代码,代码中SNSEvent对象的getRecords()方法的值,大多数情况下,返回的是一个元素的列表,但也有可能是多个消息,所以这边需要通过循环遍历来获得每条信息,并进行处理。
public class Handler implements RequestHandler<SNSEvent, Void> { private static final Injector INJECTOR = Guice.createInjector(); private AmazonSimpleEmailServiceClient simpleEmailServiceClient; //直接注入new的AmazonSimpleEmailServiceClient对象 @Inject public Handler setSimpleEmailServiceClient( AmazonSimpleEmailServiceClient simpleEmailServiceClient) { this.simpleEmailServiceClient = simpleEmailServiceClient; return this; public Handler() { INJECTOR.injectMembers(this); Objects.nonNull(simpleEmailServiceClient); private void sendEmail(final String emailAddress) { //可以发送SES邮件 LOGGER.debug("Sending welcome mail to " + emailAddress + " succeeded"); @Override public Void handleRequest(SNSEvent input, Context context) { //收到的是标准的SNSEvent事件 //getRecords()返回的是一个列表,表示Lambda可能一次收多条SNS消息。 input.getRecords().forEach(snsMessage -> sendEmail(snsMessage.getSNS().getMessage())); return null;
在cloudformation中配置lambda "UserRegistrationWelcomeMailLambda": { "Type": "AWS::Lambda::Function", "Properties": { "Handler": "com.serverlessbook.lambda.userregistration.welcomemail.Handler", "Runtime": "java8", "Timeout": "300", "MemorySize": "1024", "Description": "User registration welcome mail Lambda", "Role": { "Fn::GetAtt": [ "LambdaExecutionRole", "Arn" "Code": { "S3Bucket": { "Ref": "DeploymentBucket" "S3Key": { "Fn::Sub": "artifacts/lambda-userregistration-welcomemail/${ProjectVersion}/${DeploymentTime}.jar" //授权器 "UserRegistrationWelcomeMailLambdaPermission": { "Type": "AWS::Lambda::Permission", "Properties": { "Action": "lambda:InvokeFunction", "FunctionName": { "Ref": "UserRegistrationWelcomeMailLambda" "Principal": "sns.amazonaws.com", "SourceArn": { "Fn::Sub": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:*"
在cloudformation中添加消息订阅
这边需要在上面的"UserRegistrationSnsTopic"中添加消息订阅。注意,后续还会添加SQS的订阅。"UserRegistrationSnsTopic": { "Type": "AWS::SNS::Topic", "Properties": { "Subscription": [ //Lambda订阅 "Endpoint": { "Fn::GetAtt": [ "UserRegistrationWelcomeMailLambda", "Arn" "Protocol": "lambda"
第二阶段
./gradlew deploy
在用户注册成功后,可以在CloudWatch控制台看到一行日志"Sending welcome mail to xxx succeeded"的日志。到此整个信息的发布和订阅就完成了。后续我们在这个SNS消息中添加SQS的订阅。4. SQS订阅SNS消息
在SNS的Topic中添加SQS的订阅和Lambda的订阅是一样的。
创建一个消息队列 添加消息订阅 创建一个消息队列
详细内容标注在注释中。//创建标准SQS队列 "UserRegistrationQueue": { "Type": "AWS::SQS::Queue" SQS队列的策略,应用与上面的SQS "UserRegistrationQueuePolicy": { "Type": "AWS::SQS::QueuePolicy", "Properties": { "PolicyDocument": { "Version": "2012-10-17", //权限设置,所有人允许在队列UserRegistrationQueue上发送消息 //(只是发送,接受消息可以添加SQS:ReceiveMessage) "Statement": [ "Effect": "Allow", "Principal": "*", "Action": "SQS:SendMessage", "Resource": { "Fn::GetAtt": [ "UserRegistrationQueue", "Arn" "Condition": { "ArnEquals": { "aws:SourceArn": { "Ref": "UserRegistrationSnsTopic" //策略作用于UserRegistrationQueue队列 "Queues": [ "Ref": "UserRegistrationQueue"
添加消息订阅
需要继续在“UserRegistrationSnsTopic”继续添加订阅消息。完整的Topic和订阅如下:"UserRegistrationSnsTopic": { "Type": "AWS::SNS::Topic", "Properties": { "Subscription": [ //Lambda订阅 "Endpoint": { "Fn::GetAtt": [ "UserRegistrationWelcomeMailLambda", "Arn" "Protocol": "lambda" //SQS订阅 "Endpoint": { "Fn::GetAtt": [ "UserRegistrationQueue", "Arn" "Protocol": "sqs"
第三阶段
./gradlew deploy
在用户注册成功后,可以登陆SQS控制台https://console.aws.amazon.com/sqs/v2/home
已经有一条信息。