Netty
version: 4.1.55.Final
傳統(tǒng)的IO模型的web容器,比如老版本的Tomcat,為了增加系統(tǒng)的吞吐量,需要不斷增加系統(tǒng)核心線程數(shù)量,或者通過水平擴展服務(wù)器數(shù)量,來增加系統(tǒng)處理請求的能力。 有了NIO之后,一個線程即可處理多個連接事件,基于多路復(fù)用模型的Netty框架,不僅降低了使用NIO的復(fù)雜度,
優(yōu)點
Netty是一款以java NIO為基礎(chǔ),基于事件驅(qū)動模型支持異步、高并發(fā)的網(wǎng)絡(luò)應(yīng)用框架
- API使用簡單,開發(fā)門檻低,簡化了NIO開發(fā)網(wǎng)絡(luò)程序的復(fù)雜度
- 功能強大,預(yù)置多種編解碼功能,支持多種主流協(xié)議,比如Http、WebSocket。
- 定制能力強,可以通過ChannelHandler對通信框架靈活擴展。
- 性能高,支持異步非阻塞通信模型
- 成熟穩(wěn)定,社區(qū)活躍,已經(jīng)修復(fù)了Java NIO所有的Bug。
- 經(jīng)歷了大規(guī)模商業(yè)應(yīng)用的考驗,質(zhì)量有保證。
IO模型
select、poll和epoll
操作系統(tǒng)內(nèi)核基于這些函數(shù)實現(xiàn)非阻塞IO,以此實現(xiàn)多路復(fù)用模型
- select
select
- select 調(diào)用需要傳入 fd 數(shù)組,需要拷貝一份到內(nèi)核,高并發(fā)場景下這樣的拷貝消耗的資源是驚人的。(可優(yōu)化為不復(fù)制)
- select 在內(nèi)核層仍然是通過遍歷的方式檢查文件描述符的就緒狀態(tài),是個同步過程,只不過無系統(tǒng)調(diào)用切換上下文的開銷。(內(nèi)核層可優(yōu)化為異步事件通知)
- select 僅僅返回可讀文件描述符的個數(shù),具體哪個可讀還是要用戶自己遍歷。(可優(yōu)化為只返回給用戶就緒的文件描述符,無需用戶做無效的遍歷)
- pool
和 select 的主要區(qū)別就是,去掉了 select 只能監(jiān)聽 1024 個文件描述符的限制
- epool
epool
- 內(nèi)核中保存一份文件描述符集合,無需用戶每次都重新傳入,只需告訴內(nèi)核修改的部分即可。
- 內(nèi)核不再通過輪詢的方式找到就緒的文件描述符,而是通過異步 IO 事件喚醒。
- 內(nèi)核僅會將有 IO 事件的文件描述符返回給用戶,用戶也無需遍歷整個文件描述符集合。
Reactor模型
一、單Reactor單線程 1)可以實現(xiàn)通過一個阻塞對象監(jiān)聽多個鏈接請求
2)Reactor對象通過select監(jiān)聽客戶端請求事件,通過dispatch進(jìn)行分發(fā)
3)如果是建立鏈接請求,則由Acceptor通過accept處理鏈接請求,然后創(chuàng)建一個Handler對象處理完成鏈接后的各種事件
4)如果不是鏈接請求,則由Reactor分發(fā)調(diào)用鏈接對應(yīng)的Handler來處理
5)Handler會完成Read->業(yè)務(wù)處理->send的完整業(yè)務(wù)流程
reactor
二、單Reactor多線程 1)Reactor對象通過select監(jiān)聽客戶端請求事件,收到事件后,通過dispatch分發(fā)
2)如果是建立鏈接請求,則由Acceptor通過accept處理鏈接請求,然后創(chuàng)建一個Handler對象處理完成鏈接后的各種事件
3)如果不是鏈接請求,則由Reactor分發(fā)調(diào)用鏈接對應(yīng)的Handler來處理
4)Handler只負(fù)責(zé)事件響應(yīng)不做具體業(yè)務(wù)處理
5)通過read讀取數(shù)據(jù)后,分發(fā)到worker線程池處理,處理完成后返回給Handler,Handler收到后,通過send將結(jié)果返回給client
reactor
三、主從Reactor多線程 1)Reactor主線程MainReactor對象通過select監(jiān)聽鏈接事件,通過Acceptor處理
2)當(dāng)Acceptor處理鏈接事件后,MainReactor將鏈接分配給SubReactor
3)SubReactor將鏈接加入到隊列進(jìn)行監(jiān)聽,并創(chuàng)建Handler進(jìn)行事件處理
4)當(dāng)有新事件發(fā)生時,SubReactor就會調(diào)用對應(yīng)的Handler處理
5)Handler通過read讀取數(shù)據(jù),分發(fā)到worker線程池處理,處理完成后返回給Handler,Handler收到后,通過send將結(jié)果返回給client
6)Reactor主線程可以對應(yīng)多個Reactor子線程
reactor
三種模式用生活案例來理解 1)單Reactor單線程,前臺接待員和服務(wù)員是同一個人,全程為顧客服務(wù)
2)單Reactor多線程,1個前臺接待員,多個服務(wù)員,接待員只負(fù)責(zé)接待
3)主從Reactor多線程,多個前臺接待員,多個服務(wù)員
Reactor模型具有如下優(yōu)點 1)響應(yīng)快,不必為單個同步事件所阻塞,雖然Reactor本身依然是同步的
2)可以最大程度的避免復(fù)雜的多線程及同步問題,并且避免了多線程/進(jìn)程的切換開銷
3)擴展性好,可以方便的通過增加Reactor實例個數(shù)來充分利用CPU資源
4)復(fù)用性好,Reactor模型本身與具體事件處理邏輯無關(guān),具有很高的復(fù)用性
核心組件
1.Bootstrap 一個Netty應(yīng)用通常由一個Bootstrap開始,它主要作用是配置整個Netty程序,串聯(lián)起各個組件。
Handler,為了支持各種協(xié)議和處理數(shù)據(jù)的方式,便誕生了Handler組件。Handler主要用來處理各種事件,這里的事件很廣泛,比如可以是連接、數(shù)據(jù)接收、異常、數(shù)據(jù)轉(zhuǎn)換等。
2.ChannelInboundHandler 一個最常用的Handler。這個Handler的作用就是處理接收到數(shù)據(jù)時的事件,也就是說,我們的業(yè)務(wù)邏輯一般就是寫在這個Handler里面的,ChannelInboundHandler就是用來處理我們的核心業(yè)務(wù)邏輯。
3.ChannelInitializer 當(dāng)一個鏈接建立時,我們需要知道怎么來接收或者發(fā)送數(shù)據(jù),當(dāng)然,我們有各種各樣的Handler實現(xiàn)來處理它,那么ChannelInitializer便是用來配置這些Handler,它會提供一個ChannelPipeline,并把Handler加入到ChannelPipeline。
4.ChannelPipeline 一個Netty應(yīng)用基于ChannelPipeline機制,這種機制需要依賴于EventLoop和EventLoopGroup,因為它們?nèi)齻€都和事件或者事件處理相關(guān)。
EventLoops的目的是為Channel處理IO操作,一個EventLoop可以為多個Channel服務(wù)。
EventLoopGroup會包含多個EventLoop。
5.Channel 代表了一個Socket鏈接,或者其它和IO操作相關(guān)的組件,它和EventLoop一起用來參與IO處理。
6.Future 在Netty中所有的IO操作都是異步的,因此,你不能立刻得知消息是否被正確處理,但是我們可以過一會等它執(zhí)行完成或者直接注冊一個監(jiān)聽,具體的實現(xiàn)就是通過Future和ChannelFutures,他們可以注冊一個監(jiān)聽,當(dāng)操作執(zhí)行成功或失敗時監(jiān)聽會自動觸發(fā)。
示例
通過一個簡單的示例,首先了解怎么基于netty開發(fā)一個通信程序,包括服務(wù)的與客戶端:
Server:
@Slf4j
public class Server {
private EventLoopGroup boosGroup;
private EventLoopGroup workGroup;
public Server(int port){
try {
init(port);
log.info("----- 服務(wù)啟動成功 -----");
} catch (InterruptedException e) {
log.error("啟動服務(wù)出錯:{}", e.getCause());
}
}
private void init(int port) throws InterruptedException {
// 處理連接
this.boosGroup = new NioEventLoopGroup();
// 處理業(yè)務(wù)
this.workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
// 綁定
bootstrap.group(boosGroup, workGroup)
.channel(NioServerSocketChannel.class) //配置服務(wù)端
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_RCVBUF, 1024)
.childOption(ChannelOption.SO_SNDBUF, 1024)
.childHandler(new ChannelInitializer< SocketChannel >() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
}
public void close(){
this.boosGroup.shutdownGracefully();
this.workGroup.shutdownGracefully();
}
}
@Slf4j
class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(" >> >> >> > server active");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1. 讀取客戶端的數(shù)據(jù)(緩存中去取并打印到控制臺)
ByteBuf buf = (ByteBuf) msg;
byte[] request = new byte[buf.readableBytes()];
buf.readBytes(request);
String requestBody = new String(request, "utf-8");
log.info(" >> >> >> >> > receive message: {}", requestBody);
//2. 返回響應(yīng)數(shù)據(jù)
ctx.writeAndFlush(Unpooled.copiedBuffer((requestBody+" too").getBytes()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
Client:
@Slf4j
public class Client {
private EventLoopGroup workGroup;
private ChannelFuture channelFuture;
public Client(int port){
init(port);
}
private void init(int port){
this.workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_RCVBUF, 1024)
.option(ChannelOption.SO_SNDBUF, 1024)
.handler(new ChannelInitializer< SocketChannel >() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClientHandler());
}
});
this.channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly();
}
/**
*
* @param message
*/
public void send(String message){
this.channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(message.getBytes()));
}
/**
*
*/
public void close(){
try {
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
workGroup.shutdownGracefully();
}
}
@Slf4j
class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(" >> >> >> > client active");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
log.info(" >> >> >> >> > receive message: {}", body);
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
測試:
public class StarterTests {
static int port = 9011;
@Test
public void startServer(){
Server server = new Server(9011);
}
@Test
public void startClient(){
Client client = new Client(port);
client.send("Hello Netty!");
while (true){}
}
}
生態(tài)
- Dubbo
- Spring Reactive
類似技術(shù)
Mina、Netty、Grizzly
其他
Proactor非阻塞異步網(wǎng)絡(luò)模型
-
Web
+關(guān)注
關(guān)注
2文章
1253瀏覽量
69057 -
框架
+關(guān)注
關(guān)注
0文章
396瀏覽量
17269 -
容器
+關(guān)注
關(guān)注
0文章
490瀏覽量
21986 -
模型
+關(guān)注
關(guān)注
1文章
3032瀏覽量
48363
發(fā)布評論請先 登錄
相關(guān)推薦
評論