1. 编码和解码的基本介绍
编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码。codec(编解码器) 的组成部分有两个:decoder(解码器)和 encoder(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据。
1. Netty 本身的编码解码的机制
Netty 自身提供了一些 codec(编解码器)
Netty 提供的编码器
StringEncoder,对字符串数据进行编码
ObjectEncoder,对 Java 对象进行编码
…
Netty 提供的解码器
StringDecoder, 对字符串数据进行解码
ObjectDecoder,对 Java 对象进行解码
Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术 , 而Java 序列化技术本身效率就不高,存在如下问题:
(1)无法跨语言
(2)序列化后的体积太大,是二进制编码的 5 倍多。
(3)序列化性能比较低
2. Protobuf
Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC[远程过程调用 remote procedure call ] 数据交换格式 。
git地址:
https://github.com/protocolbuffers/protobuf
2. Protobuf 简单实用
1. 到git 上下载Protobuf 客户端工具,用于将proto 文件反向生成java类文件
2. 编写.proto 文件: protobuf 以Message 进行管理
syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO";//生成的外部类名,同时也是文件名
//protobuf 使用message 管理数据
message Student { //会在 StudentPOJO 外部类生成一个内部类 Student, 他是真正发送的POJO对象
int32 id = 1; // Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
string name = 2;
3. 用上面第一步下载的客户端反向生成java 类, cmd 命令行执行如下命令
protoc.exe --java_out=. StudentPOJO.proto
4. 生成的类信息如下:
1 package netty.codec;// Generated by the protocol buffer compiler. DO NOT EDIT!
2 // source: StudentPOJO.proto
4 public final class StudentPOJO {
5 private StudentPOJO() {}
6 public static void registerAllExtensions(
7 com.google.protobuf.ExtensionRegistryLite registry) {
10 public static void registerAllExtensions(
11 com.google.protobuf.ExtensionRegistry registry) {
12 registerAllExtensions(
13 (com.google.protobuf.ExtensionRegistryLite) registry);
14 }
15 public interface StudentOrBuilder extends
16 // @@protoc_insertion_point(interface_extends:Student)
17 com.google.protobuf.MessageOrBuilder {
19 /**
20 * <pre>
21 * Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
22 * </pre>
23 *
24 * <code>int32 id = 1;</code>
25 * @return The id.
26 */
27 int getId();
29 /**
30 * <code>string name = 2;</code>
31 * @return The name.
32 */
33 String getName();
34 /**
35 * <code>string name = 2;</code>
36 * @return The bytes for name.
37 */
38 com.google.protobuf.ByteString
39 getNameBytes();
40 }
41 /**
42 * <pre>
43 *protobuf 使用message 管理数据
44 * </pre>
45 *
46 * Protobuf type {@code Student}
47 */
48 public static final class Student extends
49 com.google.protobuf.GeneratedMessageV3 implements
50 // @@protoc_insertion_point(message_implements:Student)
51 StudentOrBuilder {
52 private static final long serialVersionUID = 0L;
53 // Use Student.newBuilder() to construct.
54 private Student(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
55 super(builder);
56 }
57 private Student() {
58 name_ = "";
59 }
61 @Override
62 @SuppressWarnings({"unused"})
63 protected Object newInstance(
64 UnusedPrivateParameter unused) {
65 return new Student();
66 }
68 @Override
69 public final com.google.protobuf.UnknownFieldSet
70 getUnknownFields() {
71 return this.unknownFields;
72 }
73 private Student(
74 com.google.protobuf.CodedInputStream input,
75 com.google.protobuf.ExtensionRegistryLite extensionRegistry)
76 throws com.google.protobuf.InvalidProtocolBufferException {
77 this();
78 if (extensionRegistry == null) {
79 throw new NullPointerException();
80 }
81 com.google.protobuf.UnknownFieldSet.Builder unknownFields =
82 com.google.protobuf.UnknownFieldSet.newBuilder();
83 try {
84 boolean done = false;
85 while (!done) {
86 int tag = input.readTag();
87 switch (tag) {
88 case 0:
89 done = true;
90 break;
91 case 8: {
93 id_ = input.readInt32();
94 break;
95 }
96 case 18: {
97 String s = input.readStringRequireUtf8();
99 name_ = s;
100 break;
101 }
102 default: {
103 if (!parseUnknownField(
104 input, unknownFields, extensionRegistry, tag)) {
105 done = true;
106 }
107 break;
108 }
109 }
110 }
111 } catch (com.google.protobuf.InvalidProtocolBufferException e) {
112 throw e.setUnfinishedMessage(this);
113 } catch (java.io.IOException e) {
114 throw new com.google.protobuf.InvalidProtocolBufferException(
115 e).setUnfinishedMessage(this);
116 } finally {
117 this.unknownFields = unknownFields.build();
118 makeExtensionsImmutable();
119 }
120 }
121 public static final com.google.protobuf.Descriptors.Descriptor
122 getDescriptor() {
123 return StudentPOJO.internal_static_Student_descriptor;
124 }
126 @Override
127 protected FieldAccessorTable
128 internalGetFieldAccessorTable() {
129 return StudentPOJO.internal_static_Student_fieldAccessorTable
130 .ensureFieldAccessorsInitialized(
131 Student.class, Builder.class);
132 }
134 public static final int ID_FIELD_NUMBER = 1;
135 private int id_;
136 /**
137 * <pre>
138 * Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
139 * </pre>
140 *
141 * <code>int32 id = 1;</code>
142 * @return The id.
143 */
144 @Override
145 public int getId() {
146 return id_;
147 }
149 public static final int NAME_FIELD_NUMBER = 2;
150 private volatile Object name_;
151 /**
152 * <code>string name = 2;</code>
153 * @return The name.
154 */
155 @Override
156 public String getName() {
157 Object ref = name_;
158 if (ref instanceof String) {
159 return (String) ref;
160 } else {
161 com.google.protobuf.ByteString bs =
162 (com.google.protobuf.ByteString) ref;
163 String s = bs.toStringUtf8();
164 name_ = s;
165 return s;
166 }
167 }
168 /**
169 * <code>string name = 2;</code>
170 * @return The bytes for name.
171 */
172 @Override
173 public com.google.protobuf.ByteString
174 getNameBytes() {
175 Object ref = name_;
176 if (ref instanceof String) {
177 com.google.protobuf.ByteString b =
178 com.google.protobuf.ByteString.copyFromUtf8(
179 (String) ref);
180 name_ = b;
181 return b;
182 } else {
183 return (com.google.protobuf.ByteString) ref;
184 }
185 }
187 private byte memoizedIsInitialized = -1;
188 @Override
189 public final boolean isInitialized() {
190 byte isInitialized = memoizedIsInitialized;
191 if (isInitialized == 1) return true;
192 if (isInitialized == 0) return false;
194 memoizedIsInitialized = 1;
195 return true;
196 }
198 @Override
199 public void writeTo(com.google.protobuf.CodedOutputStream output)
200 throws java.io.IOException {
201 if (id_ != 0) {
202 output.writeInt32(1, id_);
203 }
204 if (!getNameBytes().isEmpty()) {
205 com.google.protobuf.GeneratedMessageV3.writeString(output, 2, name_);
206 }
207 unknownFields.writeTo(output);
208 }
210 @Override
211 public int getSerializedSize() {
212 int size = memoizedSize;
213 if (size != -1) return size;
215 size = 0;
216 if (id_ != 0) {
217 size += com.google.protobuf.CodedOutputStream
218 .computeInt32Size(1, id_);
219 }
220 if (!getNameBytes().isEmpty()) {
221 size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, name_);
222 }
223 size += unknownFields.getSerializedSize();
224 memoizedSize = size;
225 return size;
226 }
228 @Override
229 public boolean equals(final Object obj) {
230 if (obj == this) {
231 return true;
232 }
233 if (!(obj instanceof Student)) {
234 return super.equals(obj);
235 }
236 Student other = (Student) obj;
238 if (getId()
239 != other.getId()) return false;
240 if (!getName()
241 .equals(other.getName())) return false;
242 if (!unknownFields.equals(other.unknownFields)) return false;
243 return true;
244 }
246 @Override
247 public int hashCode() {
248 if (memoizedHashCode != 0) {
249 return memoizedHashCode;
250 }
251 int hash = 41;
252 hash = (19 * hash) + getDescriptor().hashCode();
253 hash = (37 * hash) + ID_FIELD_NUMBER;
254 hash = (53 * hash) + getId();
255 hash = (37 * hash) + NAME_FIELD_NUMBER;
256 hash = (53 * hash) + getName().hashCode();
257 hash = (29 * hash) + unknownFields.hashCode();
258 memoizedHashCode = hash;
259 return hash;
260 }
262 public static Student parseFrom(
263 java.nio.ByteBuffer data)
264 throws com.google.protobuf.InvalidProtocolBufferException {
265 return PARSER.parseFrom(data);
266 }
267 public static Student parseFrom(
268 java.nio.ByteBuffer data,
269 com.google.protobuf.ExtensionRegistryLite extensionRegistry)
270 throws com.google.protobuf.InvalidProtocolBufferException {
271 return PARSER.parseFrom(data, extensionRegistry);
272 }
273 public static Student parseFrom(
274 com.google.protobuf.ByteString data)
275 throws com.google.protobuf.InvalidProtocolBufferException {
276 return PARSER.parseFrom(data);
277 }
278 public static Student parseFrom(
279 com.google.protobuf.ByteString data,
280 com.google.protobuf.ExtensionRegistryLite extensionRegistry)
281 throws com.google.protobuf.InvalidProtocolBufferException {
282 return PARSER.parseFrom(data, extensionRegistry);
283 }
284 public static Student parseFrom(byte[] data)
285 throws com.google.protobuf.InvalidProtocolBufferException {
286 return PARSER.parseFrom(data);
287 }
288 public static Student parseFrom(
289 byte[] data,
290 com.google.protobuf.ExtensionRegistryLite extensionRegistry)
291 throws com.google.protobuf.InvalidProtocolBufferException {
292 return PARSER.parseFrom(data, extensionRegistry);
293 }
294 public static Student parseFrom(java.io.InputStream input)
295 throws java.io.IOException {
296 return com.google.protobuf.GeneratedMessageV3
297 .parseWithIOException(PARSER, input);
298 }
299 public static Student parseFrom(
300 java.io.InputStream input,
301 com.google.protobuf.ExtensionRegistryLite extensionRegistry)
302 throws java.io.IOException {
303 return com.google.protobuf.GeneratedMessageV3
304 .parseWithIOException(PARSER, input, extensionRegistry);
305 }
306 public static Student parseDelimitedFrom(java.io.InputStream input)
307 throws java.io.IOException {
308 return com.google.protobuf.GeneratedMessageV3
309 .parseDelimitedWithIOException(PARSER, input);
310 }
311 public static Student parseDelimitedFrom(
312 java.io.InputStream input,
313 com.google.protobuf.ExtensionRegistryLite extensionRegistry)
314 throws java.io.IOException {
315 return com.google.protobuf.GeneratedMessageV3
316 .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
317 }
318 public static Student parseFrom(
319 com.google.protobuf.CodedInputStream input)
320 throws java.io.IOException {
321 return com.google.protobuf.GeneratedMessageV3
322 .parseWithIOException(PARSER, input);
323 }
324 public static Student parseFrom(
325 com.google.protobuf.CodedInputStream input,
326 com.google.protobuf.ExtensionRegistryLite extensionRegistry)
327 throws java.io.IOException {
328 return com.google.protobuf.GeneratedMessageV3
329 .parseWithIOException(PARSER, input, extensionRegistry);
330 }
332 @Override
333 public Builder newBuilderForType() { return newBuilder(); }
334 public static Builder newBuilder() {
335 return DEFAULT_INSTANCE.toBuilder();
336 }
337 public static Builder newBuilder(Student prototype) {
338 return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
339 }
340 @Override
341 public Builder toBuilder() {
342 return this == DEFAULT_INSTANCE
343 ? new Builder() : new Builder().mergeFrom(this);
344 }
346 @Override
347 protected Builder newBuilderForType(
348 BuilderParent parent) {
349 Builder builder = new Builder(parent);
350 return builder;
351 }
352 /**
353 * <pre>
354 *protobuf 使用message 管理数据
355 * </pre>
356 *
357 * Protobuf type {@code Student}
358 */
359 public static final class Builder extends
360 com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
361 // @@protoc_insertion_point(builder_implements:Student)
362 StudentOrBuilder {
363 public static final com.google.protobuf.Descriptors.Descriptor
364 getDescriptor() {
365 return StudentPOJO.internal_static_Student_descriptor;
366 }
368 @Override
369 protected FieldAccessorTable
370 internalGetFieldAccessorTable() {
371 return StudentPOJO.internal_static_Student_fieldAccessorTable
372 .ensureFieldAccessorsInitialized(
373 Student.class, Builder.class);
374 }
376 // Construct using StudentPOJO.Student.newBuilder()
377 private Builder() {
378 maybeForceBuilderInitialization();
379 }
381 private Builder(
382 BuilderParent parent) {
383 super(parent);
384 maybeForceBuilderInitialization();
385 }
386 private void maybeForceBuilderInitialization() {
387 if (com.google.protobuf.GeneratedMessageV3
388 .alwaysUseFieldBuilders) {
389 }
390 }
391 @Override
392 public Builder clear() {
393 super.clear();
394 id_ = 0;
396 name_ = "";
398 return this;
399 }
401 @Override
402 public com.google.protobuf.Descriptors.Descriptor
403 getDescriptorForType() {
404 return StudentPOJO.internal_static_Student_descriptor;
405 }
407 @Override
408 public Student getDefaultInstanceForType() {
409 return Student.getDefaultInstance();
410 }
412 @Override
413 public Student build() {
414 Student result = buildPartial();
415 if (!result.isInitialized()) {
416 throw newUninitializedMessageException(result);
417 }
418 return result;
419 }
421 @Override
422 public Student buildPartial() {
423 Student result = new Student(this);
424 result.id_ = id_;
425 result.name_ = name_;
426 onBuilt();
427 return result;
428 }
430 @Override
431 public Builder clone() {
432 return super.clone();
433 }
434 @Override
435 public Builder setField(
436 com.google.protobuf.Descriptors.FieldDescriptor field,
437 Object value) {
438 return super.setField(field, value);
439 }
440 @Override
441 public Builder clearField(
442 com.google.protobuf.Descriptors.FieldDescriptor field) {
443 return super.clearField(field);
444 }
445 @Override
446 public Builder clearOneof(
447 com.google.protobuf.Descriptors.OneofDescriptor oneof) {
448 return super.clearOneof(oneof);
449 }
450 @Override
451 public Builder setRepeatedField(
452 com.google.protobuf.Descriptors.FieldDescriptor field,
453 int index, Object value) {
454 return super.setRepeatedField(field, index, value);
455 }
456 @Override
457 public Builder addRepeatedField(
458 com.google.protobuf.Descriptors.FieldDescriptor field,
459 Object value) {
460 return super.addRepeatedField(field, value);
461 }
462 @Override
463 public Builder mergeFrom(com.google.protobuf.Message other) {
464 if (other instanceof Student) {
465 return mergeFrom((Student)other);
466 } else {
467 super.mergeFrom(other);
468 return this;
469 }
470 }
472 public Builder mergeFrom(Student other) {
473 if (other == Student.getDefaultInstance()) return this;
474 if (other.getId() != 0) {
475 setId(other.getId());
476 }
477 if (!other.getName().isEmpty()) {
478 name_ = other.name_;
479 onChanged();
480 }
481 this.mergeUnknownFields(other.unknownFields);
482 onChanged();
483 return this;
484 }
486 @Override
487 public final boolean isInitialized() {
488 return true;
489 }
491 @Override
492 public Builder mergeFrom(
493 com.google.protobuf.CodedInputStream input,
494 com.google.protobuf.ExtensionRegistryLite extensionRegistry)
495 throws java.io.IOException {
496 Student parsedMessage = null;
497 try {
498 parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
499 } catch (com.google.protobuf.InvalidProtocolBufferException e) {
500 parsedMessage = (Student) e.getUnfinishedMessage();
501 throw e.unwrapIOException();
502 } finally {
503 if (parsedMessage != null) {
504 mergeFrom(parsedMessage);
505 }
506 }
507 return this;
508 }
510 private int id_ ;
511 /**
512 * <pre>
513 * Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
514 * </pre>
515 *
516 * <code>int32 id = 1;</code>
517 * @return The id.
518 */
519 @Override
520 public int getId() {
521 return id_;
522 }
523 /**
524 * <pre>
525 * Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
526 * </pre>
527 *
528 * <code>int32 id = 1;</code>
529 * @param value The id to set.
530 * @return This builder for chaining.
531 */
532 public Builder setId(int value) {
534 id_ = value;
535 onChanged();
536 return this;
537 }
538 /**
539 * <pre>
540 * Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
541 * </pre>
542 *
543 * <code>int32 id = 1;</code>
544 * @return This builder for chaining.
545 */
546 public Builder clearId() {
548 id_ = 0;
549 onChanged();
550 return this;
551
}
553 private Object name_ = "";
554 /**
555 * <code>string name = 2;</code>
556 * @return The name.
557 */
558 public String getName() {
559 Object ref = name_;
560 if (!(ref instanceof String)) {
561 com.google.protobuf.ByteString bs =
562 (com.google.protobuf.ByteString) ref;
563 String s = bs.toStringUtf8();
564 name_ = s;
565 return s;
566 } else {
567 return (String) ref;
568 }
569 }
570 /**
571 * <code>string name = 2;</code>
572 * @return The bytes for name.
573 */
574 public com.google.protobuf.ByteString
575 getNameBytes() {
576 Object ref = name_;
577 if (ref instanceof String) {
578 com.google.protobuf.ByteString b =
579 com.google.protobuf.ByteString.copyFromUtf8(
580 (String) ref);
581 name_ = b;
582 return b;
583 } else {
584 return (com.google.protobuf.ByteString) ref;
585 }
586 }
587 /**
588 * <code>string name = 2;</code>
589 * @param value The name to set.
590 * @return This builder for chaining.
591 */
592 public Builder setName(
593 String value) {
594 if (value == null) {
595 throw new NullPointerException();
596 }
598 name_ = value;
599 onChanged();
600 return this;
601 }
602 /**
603 * <code>string name = 2;</code>
604 * @return This builder for chaining.
605 */
606 public Builder clearName() {
608 name_ = getDefaultInstance().getName();
609 onChanged();
610 return this;
611 }
612 /**
613 * <code>string name = 2;</code>
614 * @param value The bytes for name to set.
615 * @return This builder for chaining.
616 */
617 public Builder setNameBytes(
618 com.google.protobuf.ByteString value) {
619 if (value == null) {
620 throw new NullPointerException();
621 }
622 checkByteStringIsUtf8(value);
624 name_ = value;
625 onChanged();
626 return this;
627 }
628 @Override
629 public final Builder setUnknownFields(
630 final com.google.protobuf.UnknownFieldSet unknownFields) {
631 return super.setUnknownFields(unknownFields);
632 }
634 @Override
635 public final Builder mergeUnknownFields(
636 final com.google.protobuf.UnknownFieldSet unknownFields) {
637 return super.mergeUnknownFields(unknownFields);
638 }
641 // @@protoc_insertion_point(builder_scope:Student)
642 }
644 // @@protoc_insertion_point(class_scope:Student)
645 private static final Student DEFAULT_INSTANCE;
646 static {
647 DEFAULT_INSTANCE = new Student();
648 }
650 public static Student getDefaultInstance() {
651 return DEFAULT_INSTANCE;
652 }
654 private static final com.google.protobuf.Parser<Student>
655 PARSER = new com.google.protobuf.AbstractParser<Student>() {
656 @Override
657 public Student parsePartialFrom(
658 com.google.protobuf.CodedInputStream input,
659 com.google.protobuf.ExtensionRegistryLite extensionRegistry)
660 throws com.google.protobuf.InvalidProtocolBufferException {
661 return new Student(input, extensionRegistry);
662 }
663 };
665 public static com.google.protobuf.Parser<Student> parser() {
666 return PARSER;
667 }
669 @Override
670 public com.google.protobuf.Parser<Student> getParserForType() {
671 return PARSER;
672 }
674 @Override
675 public Student getDefaultInstanceForType() {
676 return DEFAULT_INSTANCE;
677 }
679 }
681 private static final com.google.protobuf.Descriptors.Descriptor
682 internal_static_Student_descriptor;
683 private static final
684 com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
685 internal_static_Student_fieldAccessorTable;
687 public static com.google.protobuf.Descriptors.FileDescriptor
688 getDescriptor() {
689 return descriptor;
690 }
691 private static com.google.protobuf.Descriptors.FileDescriptor
692 descriptor;
693 static {
694 String[] descriptorData = {
695 "\n\021StudentPOJO.proto\"#\n\007Student\022\n\n\002id\030\001 \001" +
696 "(\005\022\014\n\004name\030\002 \001(\tB\rB\013StudentPOJOb\006proto3"
697 };
698 descriptor = com.google.protobuf.Descriptors.FileDescriptor
699 .internalBuildGeneratedFileFrom(descriptorData,
700 new com.google.protobuf.Descriptors.FileDescriptor[] {
701 });
702 internal_static_Student_descriptor =
703 getDescriptor().getMessageTypes().get(0);
704 internal_static_Student_fieldAccessorTable = new
705 com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
706 internal_static_Student_descriptor,
707 new String[] { "Id", "Name", });
708 }
710 // @@protoc_insertion_point(outer_class_scope)
711 }
View Code
可以看到生成一堆信息。
5. 修改之前的类信息,客户端用Prodobuf 编码,服务器端用Protobuf 解码
Serverl
package netty.codec;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 1. 创建bossGrou好eworkerGroup
// bossGroup只负责连接请求,其他交给workerGroup, 两个都是无线循环
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 创建服务器端启动对象用于设置参数
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程设置参数
bootstrap.group(bossGroup, workerGroup)// 设置两个组
.channel(NioServerSocketChannel.class) // 设置服务器的通道
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() { // 设置通道测试对象(匿名对象)
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 加入ProtobufEncoder
socketChannel.pipeline().addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
socketChannel.pipeline().addLast(new ServerHandler());
System.out.println("服务端is ok。。。");
// 启动服务器并绑定端口。绑定一个端口并且同步,生成一个ChannelFuture对象
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
// 对关闭通道进行监控
channelFuture.channel().closeFuture().sync();
ServerHandler
package netty.codec;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
* 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类
public class ServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {
* 读取事件
* @param ctx
* @param msg
* @throws Exception
@Override
protected void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
System.out.println("客户端发送的数据 id=" + msg.getId() + " 名字=" + msg.getName());
* 数据读取完毕事件
* @param ctx
* @throws Exception
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将数据写到客户端(write + flush)
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!", CharsetUtil.UTF_8));
* 发生异常事件
* @param ctx
* @param cause
* @throws Exception
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
Client
package netty.codec;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 创建一个事件循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
// 创建一个启动Bootstrap(注意是Netty包下的)
Bootstrap bootstrap = new Bootstrap();
// 链式设置参数
bootstrap.group(eventExecutors) // 设置线程组
.channel(NioSocketChannel.class) // 设置通道class
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 加入ProtobufEncoder
socketChannel.pipeline().addLast(new ProtobufEncoder());
socketChannel.pipeline().addLast(new ClientHandler());
System.out.println("客户端is ok...");
// 启动客户端连接服务器(ChannelFuture 是netty的异步模型)
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
// 监听关闭通道
channelFuture.channel().closeFuture().sync();
} finally {
// 关闭
eventExecutors.shutdownGracefully();
ClientHandler
package netty.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
* 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类
public class ClientHandler extends ChannelInboundHandlerAdapter {
* 通道就绪事件
* @param ctx
* @throws Exception
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ClientHandler ctx: " + ctx);
ctx.writeAndFlush(StudentPOJO.Student.newBuilder().setId(1).setName("张三").build());
* 读取事件
* @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息
* @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer)
* @throws Exception
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务器会送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("服务器地址:" + ctx.channel().remoteAddress());
* 发生异常事件
* @param ctx
* @param cause
* @throws Exception
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
3. 案例二
实现一个对象包含两个对象。 根据类型传递不同的对象,服务器端根据对应的对象解开。
(1) proto 文件如下
MyData.proto
syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package="netty.codec"; //指定生成到哪个包下
option java_outer_classname="MyDataInfo"; // 外部类名, 文件名
//protobuf 可以使用message 管理其他的message
message MyMessage {
//定义一个枚举类型
enum DataType {
StudentType = 0; //在proto3 要求enum的编号从0开始
WorkerType = 1;
//用data_type 来标识传的是哪一个枚举类型
DataType data_type = 1;
//表示每次枚举类型最多只能出现其中的一个, 节省空间
oneof dataBody {
Student student = 2;
Worker worker = 3;
message Student {
int32 id = 1;//Student类的属性
string name = 2; //
message Worker {
string name=1;
int32 age=2;
(2) cmd 用如下命令生成java 类
protoc.exe --java_out=. MyData.proto
(3) ClientHandler 改造:
* 通道就绪事件
* @param ctx
* @throws Exception
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ClientHandler ctx: " + ctx);
MyDataInfo.MyMessage myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType).setStudent(MyDataInfo.Student.newBuilder().setId(1).setName("李四").build()).build();
ctx.writeAndFlush(myMessage);
(4) ServerHandler 改造
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
if (msg.getDataType().equals(MyDataInfo.MyMessage.DataType.StudentType)) {
System.out.println("student name: " + msg.getStudent().getName());
hive查询json字段中的每一个key hive jsonserde
首先Hive提供了三种方案(应用于不同的场景):建表JsonSerDe内置函数get_json_object内置函数json_tuple1.使用JsonSerDe创建表加载JSON文件注意此种方式需要满足的条件:json文件必须是形如如下格式:下面 xxx可以是对象,也可以是数组{
key1:xxxx,
key2:xx,
key3,xxx
}另外这个文件必须存的是一个jso