/ netty

Netty 高性能网络协议服务器开发

本文通过一个实例来讲解如何使用 Netty 框架来开发网络协议服务器,项目使用 Gradle 工具来构建和运行,并且支持 Docker 部署。项目代码已在 GitHub 开源,JW Netty Demo

Netty 简介

Netty 是一个异步、事件驱动的网络应用框架,使用它可以快速开发出可维护良好的、高性能的网络协议服务器。它大幅简化和流程化了网络编程,比如 TCP 和 UDP 套接字服务器开发。难能可贵的是,在保证快速和易用性的同时,使用 Netty 开发的应用并没有丧失可维护性和性能。

components

上图是来自于官网的 Netty 架构图,可以看到整体结构非常清晰,每一层各司其职。

项目介绍

本项目实现了一个用来接收和存储传感器数据的 TCP 网络协议服务器。这些数据由连接在硬件设备上的传感器采集,然后由硬件设备上报到服务器。一个硬件设备可以连接多个传感器,每次上报硬件设备会把所有传感器的数据一并上报。服务端需要计算所有传感器数据的平均值,并保存起来。这是一个典型的物联网数据采集场景。

协议

考虑到物联网环境硬件设备性能不高、网络带宽较小且稳定性不够,我们需要尽可能降低协议编解码开销、减少报文大小。因此最好使用二进制协议,而不是 JSON 这样的文本格式。

本项目采用的二进制协议如下:

| 起始位(2 bytes) | 报文总长(2 bytes) | 协议版本(1 byte) | 设备号(4 bytes) | 时间戳(4 bytes) | 数据项长度(1 byte) | 数据项值 | 更多数据项... | 校验和(2 bytes) |

  • 所有段都是无符号整数,字节顺序为网络字节序
  • 起始位固定为 0x55 0xaa,用来防止网络传输过程中数据错乱
  • 报文总长包含了起始位和最后的校验和
  • 数据项值占用的字节数由其前面一个段的值决定
  • 数据项可以有很多个,只要报文总长不超过 65535 就行

代码解读

目录结构

.
├── Dockerfile # Docker 镜像构建配置文件
├── README.md
├── bin
├── build
├── build.gradle # Gradle 构建配置文件
├── docker-compose.yml # Docker Compose 配置文件
├── gradle # Gradle 目录
├── gradlew # Gradle 包装脚本,通过它来执行构建任务
├── gradlew.bat # Windows 平台下的包装脚本
├── settings.gradle # Gradle 设置
└── src
    ├── main
    │   ├── java
    │   │   └── net
    │   │       └── jaggerwang
    │   │           └── jwnettydemo
    │   │               ├── Main.java # 应用入口
    │   │               ├── config # 配置
    │   │               │   ├── ApplicationConfig.java
    │   │               │   └── MongoDBConfig.java
    │   │               ├── decoder # 报文解码器
    │   │               │   ├── Message.java
    │   │               │   └── MessageDecoder.java
    │   │               ├── handler # 报文处理器
    │   │               │   └── MessageHandler.java
    │   │               └── saver # 报文保存器
    │   │                   ├── Metric.java
    │   │                   └── MetricSaver.java
    │   └── resources
    │       ├── application.properties # 应用配置
    │       └── log4j2.xml # 日志配置
    └── test
        └── java
            └── net
                └── jaggerwang
                    └── jwnettydemo
                        ├── decoder # 报文解码单元测试
                        │   └── MessageDecoderTests.java
                        └── saver # 报文保存单元测试
                            └── MetricSaverTests.java

可以看到,项目结构为典型的 Java 项目结构。

配置文件

通过配置文件,使得同一套代码可以在不同环境中都能运行。为了不为每套环境都维护一套配置文件,本项目将依赖环境的配置抽取了出来,使得运行时可以通过环境变量来覆盖指定配置项。这样各个环境的配置文件就保持了一致,可以共用一份。

配置文件有两个,一个是应用配置,一个是日志配置。日志组件用的是 Log4j 2,配置文件里环境变量的嵌入格式遵循的是 Log4j 风格,包括默认值的指定方式。

应用配置文件内容如下:

# Path
path.app=${PATH_APP:-/Users/jagger/projects/jw/jw-netty-demo}
path.data=${PATH_DATA:-/Users/jagger/data/jw/jw-netty-demo}

# Server
server.port=${SERVER_PORT:-8080}

# MongoDB
mongodb.uri=${MONGODB_URI:-mongodb://localhost:27017/}
mongodb.db=${MONGODB_DB:-jw_netty_demo}

核心代码

整个报文处理过程分两步,第一步解码,由类 MessageDecoder 负责,第二步保存解码出来的 Message,由类 MessageHandler 负责。下面我们重点来看这两个类的实现。

MessageDecoder 类

package net.jaggerwang.jwnettydemo.decoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

public class MessageDecoder extends ReplayingDecoder {

    private static final Logger logger = LogManager.getLogger();

    public static final int HEADER_LENGTH = 13;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        Message msg = new Message();

        msg.setStartFirst(in.readUnsignedByte());
        msg.setStartSecond(in.readUnsignedByte());
        logger.debug("decode start ok: {} {}", msg.getStartFirst(), msg.getStartSecond());

        msg.setLength(in.readUnsignedShort());
        logger.debug("decode length ok: {}", msg.getLength());

        msg.setVersion(in.readUnsignedByte());
        logger.debug("decode version ok: {}", msg.getVersion());

        msg.setDeviceNo(in.readUnsignedInt());
        logger.debug("decode device no ok: {}", msg.getDeviceNo());

        msg.setTime(in.readUnsignedInt());
        logger.debug("decode time ok: {}", msg.getTime());

        int pos = HEADER_LENGTH;
        while (pos < msg.getLength() - 2) {
            short len = in.readUnsignedByte();
            pos += 1;

            long data;
            switch (len) {
                case 1:
                    data = (long) in.readUnsignedByte();
                    pos += 1;
                    break;
                case 2:
                    data = (long) in.readUnsignedShort();
                    pos += 2;
                    break;
                case 4:
                    data = in.readUnsignedInt();
                    pos += 4;
                    break;
                default:
                    logger.error("unsupported data length: {}", len);
                    continue;
            }

            msg.getDatas().add(data);
            logger.debug("decode one data ok: {}", data);
        }

        msg.setChecksum(in.readUnsignedShort());
        logger.debug("decode checksum ok: {}", msg.getChecksum());

        out.add(msg);
    }
}

MessageDecoder 类继承自 Netty 的 ReplayingDecoder 类,该类解决了网络报文接收不完整的问题。从网络中接收报文时,不能保证一次接收到整个报文,很可能只是其中一部分。如果在应用里来检测报文完整性,在接收到完整报文时才解码,会比较麻烦。通过继承 ReplayingDecoder 类,实现它的 decode 方法,应用就可以假设报文是完整的。实际处理过程中,如果解码时发现报文不完整,decode 会抛出异常,ReplayingDecoder 捕获该异常后将解码位置归零,下次再重头开始。

MessageHandler 类

package net.jaggerwang.jwnettydemo.handler;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import net.jaggerwang.jwnettydemo.decoder.Message;
import net.jaggerwang.jwnettydemo.saver.Metric;
import net.jaggerwang.jwnettydemo.saver.MetricSaver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@Sharable
public class MessageHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LogManager.getLogger();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        Message message = (Message) msg;
        if (!message.isValid()) {
            logger.error("invalid message: {}", message);
            ctx.close();
            return;
        }
        logger.debug("received message: {}", message);

        MetricSaver saver = new MetricSaver();
        Metric metric = new Metric(message);
        saver.save(metric);
        logger.debug("saved metric: {}", metric);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

MessageHandler 在 Netty Channel 处理管道里位于 MessageDecoder 之后,因此它接收到的是已经解码出来的 Message 对象。它不用关心协议细节,只需要简单地把 Message 对象保存到数据库里就可以。

开发

构建工具

本项目使用 Gradle 工具来构建和运行,但是不需要额外去安装。只要使用 ./gradlew <task> 执行任意任务,如果没有检测到 Gradle 工具,就会自动下载并安装到项目下。

本项目的 build.gradle 配置文件如下:

apply plugin : 'application'

mainClassName = 'net.jaggerwang.jwnettydemo.Main'

dependencies {
    compile 'io.netty:netty-all:4.1.21.Final'
    compile 'org.apache.logging.log4j:log4j-api:2.10.0'
    compile 'org.apache.logging.log4j:log4j-core:2.10.0'
    compile 'org.mongodb:mongodb-driver:3.6.3'
    compile 'com.aliyun:hitsdb-client:0.0.5'
    compile 'org.apache.kafka:kafka-clients:0.10.0.0'
    compile 'org.apache.kafka:connect-json:0.10.0.0'
    compile 'com.aliyun.openservices:ons-sasl-client:0.1'
    compile 'com.fasterxml.jackson.core:jackson-core:2.9.4'
    compile 'com.fasterxml.jackson.core:jackson-databind:2.9.4'
    compile 'com.fasterxml.jackson.module:jackson-module-parameter-names:2.9.4'
    compile 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.9.4'
    compile 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.9.4'
    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    compile 'commons-codec:commons-codec:1.10'

    compileOnly 'org.projectlombok:lombok:1.16.18'

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.1.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.1.0'
}

repositories {
    mavenCentral()
}

test {
    useJUnitPlatform()
}

运行

执行下面的命令来运行项目:

$ ./gradlew run
  Starting a Gradle Daemon (subsequent builds will be faster)
  
  > Task :run
  2018-四月-26 21:21:49 INFO  net.jaggerwang.jwnettydemo.config.ApplicationConfig - load properties ok
  SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  SLF4J: Defaulting to no-operation (NOP) logger implementation
  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  2018-四月-26 21:21:50 INFO  net.jaggerwang.jwnettydemo.Main - server started on port 8080
  <=========----> 75% EXECUTING [17s]
  > :run

运行起来的 Server 会在 8080 端口监听请求,按照协议组装报文后发往 localhost:8080 就可以上报数据。

测试

使用下面的命令来运行单元测试:

$ ./gradlew test
  Starting a Gradle Daemon (subsequent builds will be faster)
  
  Deprecated Gradle features were used in this build, making it incompatible with Gradle 5.0.
  See https://docs.gradle.org/4.6/userguide/command_line_interface.html#sec:command_line_warnings
  
  BUILD SUCCESSFUL in 7s
  4 actionable tasks: 4 up-to-date

本项目使用 JUnit 5 来编写和运行单元测试。目前有两个单元测试,一个是报文解码,另一个是报文保存。

打包

使用下面的命令来打包和运行:

$ ./gradlew installDist
  
  Deprecated Gradle features were used in this build, making it incompatible with Gradle 5.0.
  See https://docs.gradle.org/4.6/userguide/command_line_interface.html#sec:command_line_warnings
  
  BUILD SUCCESSFUL in 2s
  5 actionable tasks: 3 executed, 2 up-to-date

$ ./build/install/jw-netty-demo/bin/jw-netty-demo
  2018-四月-26 22:23:22 INFO  net.jaggerwang.jwnettydemo.config.ApplicationConfig - load properties ok
  SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  SLF4J: Defaulting to no-operation (NOP) logger implementation
  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  2018-四月-26 22:23:22 INFO  net.jaggerwang.jwnettydemo.Main - server started on port 8080

打包好的应用会安装到目录 ./build/install/jw-netty-demo 下, 可以使用其下的 bin/jw-netty-demo 脚本来运行应用。

Docker 部署

构建镜像

执行下面的命令来构建镜像:

$ docker build -t jw-netty-demo .

构建镜像的配置文件如下:

FROM java:8

ENV APP_PATH=/app
ENV DATA_PATH=/data

WORKDIR $APP_PATH

COPY . .
RUN ./gradlew installDist

VOLUME $DATA_PATH

EXPOSE 8080

CMD ./build/install/jw-netty-demo/bin/jw-netty-demo

在容器里运行服务

执行下面的命令来运行项目所有服务:

$ docker-compose up

Docker Compose 配置文件如下:

version: "2"
services:
  app:
    image: jw-netty-demo:latest
    environment:
      PATH_APP: /app
      PATH_DATA: /data
      MONGODB_URI: mongodb:27017
    ports:
    - 19900:8080
    volumes:
    - ~/data/jw-netty-demo/app:/data
  mongodb:
    image: mongo:3
    volumes:
    - ~/data/jw-netty-demo/mongodb:/data/db

Docker Compose 会同时启动 app 服务,及其依赖的 mongodb 服务。

参考资料

  1. JW Netty Demo
Netty 高性能网络协议服务器开发
Share this