Flink Kafka Connecter会根据用户设置自动处理消息的序列化、反序列化;官方默认已经给出了几个序列化实现TypeInformation、Json、Avro,我们可以根据其实现扩展自己的序列化、反序列化类
Environment
Software | Version | |
---|---|---|
Flink | 1.10.0 | |
Jdk | 1.8 | |
Kafka | 1.0.0 |
Kafka序列化、反序列化
KafkaSerializationSchema replace KeyedSerializationSchema
Flink在1.7版本之前,Kafka < 1.0.0,对于Kafka的Producer序列化和反序列化是不直接支持
ProducerRecord<byte[], byte[]>
的,这也就导致类似Headers
之类的Kafka元数据信息无法被获取和传递Flink在1.7版本之后,Kafka >= 1.0.0,加入了
KafkaSerializationSchema、KafkaDeserializationSchema
,可以直接对Record进行处理
Before Flink 1.7
Producer
Kafka < 1.0.0
- FlinkKafkaProducer011
- FlinkKafkaProducer010
- FlinkKafkaProducer09
Producer序列化器支持:KeyedSerializationSchema
、SerializationSchema 两种
只支持key、value的传输
如果要动态修改路由的topic,可以重写getTargetTopic
方法
1 | // KeyedSerializationSchema<T> |
1 | // SerializationSchema<T> |
Consumer
Kafka < 1.0.0
- FlinkKafkaConsumer011
- FlinkKafkaConsumer010
- FlinkKafkaConsumer09
Consumer反序列化器支持:DeserializationSchema
、KafkaDeserializationSchema 两种
KafkaDeserializationSchema 可以对ConsumerRecord进行自定义,DeserializationSchema只能反序列化value
1 | // KafkaDeserializationSchema<T> |
1 | // DeserializationSchema<T> |
After Flink 1.7
Flink 1.7之后使用
FlinkKafkaConsumer、FlinkKafkaProducer
统一了Kafka Producer、Consumer端
1 | <dependency> |
Producer
- 支持
KafkaSerializationSchema
,可以自定义ProducerRecord
1 | // KafkaSerializationSchema<T> |
- 保留
KeyedSerializationSchema
,key、value序列化器
1 | // KeyedSerializationSchema<T> |
Flink Kafka 分流
思路(不对Kakfa中value做处理,只根据条件做消息转发):
- Source topic中有多重消息,使用Kafka Headers中设置路由键
real_topic
来确定Sink到具体哪个topic - Flink Consumer消费Source Topic,
ConsumerRecord<byte[], byte[]>
并不做处理,直接转发到下游,因为value有可能是Json、Avro等,不做处理 - 可自定义Process对
ConsumerRecord<byte[], byte[]>
进行处理 - Sink端需要从
ConsumerRecord<byte[], byte[]>
中获取Header的real_topic
,确定具体Sink到哪个下游Topic,然后重新封装ProducerRecord<byte[], byte[]>
转发出去
- Consumer 反序列化
1 | public class KafkaRecordDeserializationSchema implements KafkaDeserializationSchema<ConsumerRecord<byte[], byte[]>> { |
- Producer 序列化
1 | public class KafkaRecordSerializationSchema implements KafkaSerializationSchema<ConsumerRecord<byte[], byte[]>> { |
- Main
1 | ...... |
转载请注明出处:https://github.com/imperio-wxm