一、Netty应用场景
讲了一些Netty的组件,来聊一聊大家最关心的事情吧,他能够做什么?毕竟,我们学习就是拿来用的嘛。我可以简单的概括一下,凡是牵扯到网络相关的,都可以使用Neety去实现!
-
构建高性能、低时延的各种 Java 中间件,例如 MQ、分布式服务框架、ESB 消息总线等,Netty 主要作为基础通信框架提供高性能、低时延的通信服务;
-
公有或者私有协议栈的基础通信框架,例如可以基于 Netty 构建异步、高性能的 WebSocket 协议栈;
-
各领域应用,例如大数据、游戏等,Netty 作为高性能的通信框架用于内部各模块的数据分发、传输和汇总等,实现模块之间高性能通信。
接下来的几篇,会围绕Netty实现相关功能进行展开。
二、Netty实现文件的上传和下载
1、MultipartRequest
import io.netty.handler.codec.http.multipart.FileUpload;
import org.json.simple.JSONObject;
import java.util.Map;
* <p>请求对象</p>
* @author DarkKing
public class MultipartRequest {
private Map<String, FileUpload> fileUploads;
private JSONObject params;
public Map<String, FileUpload> getFileUploads() {
return fileUploads;
public void setFileUploads(Map<String, FileUpload> fileUploads) {
this.fileUploads = fileUploads;
public JSONObject getParams() {
return params;
public void setParams(JSONObject params) {
this.params = params;
定义了一个http封装的对象。保存对应的传参数。
2、FileServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
* 作者:DarkKIng
* 创建日期:2019/12/17
* 类说明:文件下载服务端
public class FileServer {
private final int port;
public FileServer(int port) {
this.port = port;
public static void main(String[] args) throws InterruptedException {
int port = 9999;
FileServer fileServer = new FileServer(port);
System.out.println("服务器即将启动");
fileServer.start();
System.out.println("服务器关闭");
public void start() throws InterruptedException {
final FileServerHandle serverHandler = new FileServerHandle();
/*线程组*/
EventLoopGroup group = new NioEventLoopGroup();
Pipeline pipeline = new Pipeline();
try {
/*服务端启动必须*/
ServerBootstrap b = new ServerBootstrap();
b.group(group)/*将线程组传入*/
.channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
.localAddress(new InetSocketAddress(port))/*指定服务器监听端口*/
/*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
所以下面这段代码的作用就是为这个子channel增加handle*/
.childHandler(pipeline);
ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
System.out.println("Netty server start,port is " + port);
f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
} finally {
group.shutdownGracefully().sync();/*优雅关闭线程组*/
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
* 作者:DarkKIng
* 创建日期:2019/12/17
* 作用:职责链
public class Pipeline extends ChannelInitializer<SocketChannel> {
private EventExecutorGroup businessEventExecutorGroup = new DefaultEventExecutorGroup(10);
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
* http服务器端对response编码
pipeline.addLast("encoder", new HttpResponseEncoder());
* http服务器端对request解码3.
pipeline.addLast("decoder", new HttpRequestDecoder());
* 合并请求
pipeline.addLast("aggregator", new HttpObjectAggregator(655300000));
* 正常业务逻辑处理
pipeline.addLast(businessEventExecutorGroup, new FileServerHandle());
编写职责链,请求会从入栈以次从上到下经过编解码,请求和秉承HTTPObject,最后执行业务类
FileServerHandle
。
4、FileServerHandle
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.multipart.*;
import io.netty.util.CharsetUtil;
import org.json.simple.JSONObject;
import java.io.*;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
* 作者:DarkKIng
* 创建日期:2019/12/17
* 类说明:文件下载handler
@Sharable
public class FileServerHandle extends SimpleChannelInboundHandler<FullHttpRequest> {
/*客户端读到数据以后,就会执行*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
throws Exception {
//打印请求url
System.out.println(request.uri());
//下载任务处理
if (request.uri().equals("/downFile")) {
responseExportFile(ctx, "D://model.txt", "model.txt");
//上传接口处理
if (request.uri().equals("/upLoadFile")) {
MultipartRequest MultipartBody = getMultipartBody(request);
Map<String, FileUpload> fileUploads = MultipartBody.getFileUploads();
//输出文件信息
for (String key : fileUploads.keySet()) {
//获取文件对象
FileUpload file = fileUploads.get(key);
System.out.println("fileName is" + file.getFile().getPath());
//获取文件流
InputStream in = new FileInputStream(file.getFile());
BufferedReader bf = new BufferedReader(new InputStreamReader(in));
String content = bf.lines().collect(Collectors.joining("\n"));
//打印文件
System.out.println("content is \n" + content);
//输出参数信息
JSONObject params = MultipartBody.getParams();
//输出文件信息
System.out.println(JSONObject.toJSONString(params));
/*连接建立以后*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer(
"Hello Netty", CharsetUtil.UTF_8));
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
* 返回下载内容
* @param ctx
* @author DarkKing 2019-12-17
public static void responseExportFile(ChannelHandlerContext ctx, String path, String name) {
File file = new File(path);
try {
//随机读取文件
final RandomAccessFile raf = new RandomAccessFile(file, "r");
long fileLength = raf.length();
//定义response对象
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
//设置请求头部
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream; charset=UTF-8");
response.headers().add(HttpHeaderNames.CONTENT_DISPOSITION,
"attachment; filename=\"" + URLEncoder.encode(file.getName(), "UTF-8") + "\";");
ctx.write(response);
//设置事件通知对象
ChannelFuture sendFileFuture = ctx
.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
//文件传输完成执行监听器
@Override
public void operationComplete(ChannelProgressiveFuture future)
throws Exception {
System.out.println("file {} transfer complete.");
//文件传输进度监听器
@Override
public void operationProgressed(ChannelProgressiveFuture future,
long progress, long total) throws Exception {
if (total < 0) {
System.out.println("file {} transfer progress: {}");
} else {
System.out.println("file {} transfer progress: {}/{}");
//刷新缓冲区数据,文件结束标志符
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
* 功能描述
* <p>解析文件上传</p>
* @author DarkKing 2019/10/9 15:24
* @params [ctx, httpDecode]
private static MultipartRequest getMultipartBody(FullHttpRequest request) {
try {
//创建HTTP对象工厂
HttpDataFactory factory = new DefaultHttpDataFactory(true);
//使用HTTP POST解码器
HttpPostRequestDecoder httpDecoder = new HttpPostRequestDecoder(factory, request);
httpDecoder.setDiscardThreshold(0);
if (httpDecoder != null) {
//获取HTTP请求对象
final HttpContent chunk = (HttpContent) request;
//加载对象到加吗器。
httpDecoder.offer(chunk);
if (chunk instanceof LastHttpContent) {
//自定义对象bean
MultipartRequest multipartRequest = new MultipartRequest();
//存放文件对象
Map<String, FileUpload> fileUploads = new HashMap<>();
//存放参数对象
JSONObject body = new JSONObject();
//通过迭代器获取HTTP的内容
java.util.List<InterfaceHttpData> InterfaceHttpDataList = httpDecoder.getBodyHttpDatas();
for (InterfaceHttpData data : InterfaceHttpDataList) {
//如果数据类型为文件类型,则保存到fileUploads对象中
if (data != null && InterfaceHttpData.HttpDataType.FileUpload.equals(data.getHttpDataType())) {
FileUpload fileUpload = (FileUpload) data;
fileUploads.put(data.getName(), fileUpload);
//如果数据类型为参数类型,则保存到body对象中
if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
Attribute attribute = (Attribute) data;
body.put(attribute.getName(), attribute.getValue());
//存放文件信息
multipartRequest.setFileUploads(fileUploads);
//存放参数信息
multipartRequest.setParams(body);
return multipartRequest;
} catch (IOException e) {
e.printStackTrace();
return null;
本节我们使用
netty
来
实现
文件
下载
,
netty
的
文件
下载
并不是通过建立长连接来传输
下载
的,而是通过分片chunked模式
下载
。import io.
netty
.bootstrap.ServerBootstrap;
import io.
netty
.channel.ChannelFuture;
import io.
netty
.channel.ChannelInitializer;
import io....
本实例主要参考的是官网的examples:点击这里
使用场景:客户端向
Netty
请求一个
文件
,
Netty
服务端
下载
指定位置
文件
到客户端。
本实例使用的是Http协议,当然,可以通过简单的修改即可换成TCP协议。
需要注意本实例的关键点是,为了更高效的传输大数据,实例中用到了ChunkedWriteHandler编码器,它提供了以zero-memory-copy方式写
文件
。
第一步:先写一个
基于
netty
实现
的
文件上传
下载
(简单的个人云盘)
发布: : : upload? ***
下载
: : : ***
config.
Netty
ServerConfig中可以设置
上传
下载
的端口号,
文件
的目录,访问的密码(passwd)
打成jar包执行的时候,命令行参数可参考Main中main方法中的代码。
寻找解决方案
我在某应用想开发一个http接口时,发现我的应用不是web应用,想用成熟的组件如spring-web、spring-boot、Tomcat等却望梅止渴,然后百度了一下基本没有解答,预热零零散散发现好像有说的
netty
。
我记得sentinel有类似的接口比如说下发规则到客户端,监听的是8720端口,我去翻了sentinel的源码,确实是用
netty
做为接口交互的。我同事说shardingsphere开源软件也用n
public static void main(String[] args) throws InterruptedException {
int port = 9999;
FileServer fileServer = new FileServer(port);
System.out.println(“
服务器
即将启动”);
fileServer.start();
System.out.println(“
服务器
关闭”);
public void start() throws InterruptedEx
所以下面这段代码的作用就是为这个子channel增加handle*/
.childHandler(pipeline);
ChannelFuture f = b.bind().sync();/异步绑定到
服务器
,sync()会阻塞直到完成/
System.out.println("
Netty
server start,port is " + port);
f.channel().closeFuture().sync();/阻塞直到
服务器
的channel关闭/
} finally {
group.shutdow
Netty
服务端启动代码:
public class
Netty
WebSocketServer extends Thread {
protected ServerBootstrap BOOTSTRAP;
protected EventLoopGroup BOSS_GROUP;
protected EventLoopGroup WORKER_GROUP;
prote