
Flink Kafka Split (custom serializers for stream splitting)

The Flink Kafka connector handles message serialization and deserialization automatically according to the user's configuration. Several serialization implementations ship out of the box (TypeInformation, JSON, Avro), and we can build on them to write our own serialization/deserialization classes.

Environment

Software    Version
Flink       1.10.0
JDK         1.8
Kafka       1.0.0

Kafka Serialization and Deserialization

KafkaSerializationSchema replaces KeyedSerializationSchema

Before Flink 1.7 (Kafka < 1.0.0), the producer-side serializers did not work directly with ProducerRecord<byte[], byte[]>, so Kafka metadata such as Headers could not be read or forwarded.

Since Flink 1.7 (Kafka >= 1.0.0), KafkaSerializationSchema and KafkaDeserializationSchema have been added, so the record itself can be handled directly.

Before Flink 1.7

Producer

Kafka < 1.0.0

  • FlinkKafkaProducer011
  • FlinkKafkaProducer010
  • FlinkKafkaProducer09

Producer serializers: two kinds are supported, KeyedSerializationSchema and SerializationSchema.

Only the key and value can be transmitted.

To route messages to a dynamically chosen topic, override the getTargetTopic method (see the sketch after the interface below).

// KeyedSerializationSchema<T>
public interface KeyedSerializationSchema<T> extends Serializable {

    /**
     * Serializes the key of the incoming element to a byte array.
     * This method might return null if no key is available.
     *
     * @param element The incoming element to be serialized
     * @return the key of the element as a byte array
     */
    byte[] serializeKey(T element);

    /**
     * Serializes the value of the incoming element to a byte array.
     *
     * @param element The incoming element to be serialized
     * @return the value of the element as a byte array
     */
    byte[] serializeValue(T element);

    /**
     * Optional method to determine the target topic for the element.
     *
     * @param element Incoming element to determine the target topic from
     * @return null or the target topic
     */
    String getTargetTopic(T element);
}
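
As an illustration of the getTargetTopic hook mentioned above, here is a minimal, hypothetical sketch of a KeyedSerializationSchema that routes String elements of the assumed form topic|payload to the topic named inside the element; returning null keeps the producer's default topic.

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Hypothetical example: route "topic|payload" strings by overriding getTargetTopic.
public class RoutingStringSerializationSchema implements KeyedSerializationSchema<String> {

    @Override
    public byte[] serializeKey(String element) {
        // This sketch does not use a message key.
        return null;
    }

    @Override
    public byte[] serializeValue(String element) {
        // Everything after the first '|' is treated as the payload.
        return element.substring(element.indexOf('|') + 1).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        // Everything before the first '|' is treated as the target topic;
        // returning null keeps the topic configured on the producer.
        int idx = element.indexOf('|');
        return idx > 0 ? element.substring(0, idx) : null;
    }
}
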

// SerializationSchema<T>
public interface SerializationSchema<T> extends Serializable {

    /**
     * Serializes the incoming element to a specified type.
     *
     * @param element The incoming element to be serialized
     * @return The serialized element.
     */
    byte[] serialize(T element);
}
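
For comparison, a value-only SerializationSchema has just one method. A minimal sketch that writes Strings as UTF-8 bytes, equivalent in spirit to Flink's built-in SimpleStringSchema, could look like this:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema;

// Minimal value-only serializer: the element is written as UTF-8 bytes; no key, no topic routing.
public class Utf8StringSerializationSchema implements SerializationSchema<String> {

    @Override
    public byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }
}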

Consumer

Kafka < 1.0.0

  • FlinkKafkaConsumer011
  • FlinkKafkaConsumer010
  • FlinkKafkaConsumer09

Consumer deserializers: two kinds are supported, DeserializationSchema and KafkaDeserializationSchema.

KafkaDeserializationSchema gives access to the whole ConsumerRecord, while DeserializationSchema can only deserialize the value.

// KafkaDeserializationSchema<T>
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);

    /**
     * Deserializes the Kafka record.
     *
     * @param record Kafka record to be deserialized.
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
}

// DeserializationSchema<T>
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Deserializes the byte message.
     *
     * @param message The message, as a byte array.
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    T deserialize(byte[] message) throws IOException;

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);
}
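
To make the difference between the two consumer-side interfaces concrete: a KafkaDeserializationSchema sees the whole ConsumerRecord (topic, partition, offset, headers), whereas a DeserializationSchema only ever receives the value bytes. A minimal, hypothetical sketch that decodes the value as UTF-8 and tags it with its origin:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical example: decode the value as UTF-8 and prepend the record's origin.
public class OriginTaggingDeserializationSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        String value = new String(record.value(), StandardCharsets.UTF_8);
        // Record metadata such as topic, partition, offset, and headers is only reachable here,
        // not in a plain DeserializationSchema.
        return record.topic() + "-" + record.partition() + "@" + record.offset() + ": " + value;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}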

After Flink 1.7

Since Flink 1.7, the universal FlinkKafkaConsumer and FlinkKafkaProducer unify the Kafka producer and consumer sides.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

Producer

  • KafkaSerializationSchema is supported, which lets you build the full ProducerRecord (topic, key, value, headers); the split example below uses it

// KafkaSerializationSchema<T>
public interface KafkaSerializationSchema<T> extends Serializable {

    /**
     * Serializes given element and returns it as a {@link ProducerRecord}.
     *
     * @param element element to be serialized
     * @param timestamp timestamp (can be null)
     * @return Kafka {@link ProducerRecord}
     */
    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
}
  • KeyedSerializationSchema, the key/value serializer, is retained

// KeyedSerializationSchema<T>
public interface KeyedSerializationSchema<T> extends Serializable {

    /**
     * Serializes the key of the incoming element to a byte array.
     * This method might return null if no key is available.
     *
     * @param element The incoming element to be serialized
     * @return the key of the element as a byte array
     */
    byte[] serializeKey(T element);

    /**
     * Serializes the value of the incoming element to a byte array.
     *
     * @param element The incoming element to be serialized
     * @return the value of the element as a byte array
     */
    byte[] serializeValue(T element);

    /**
     * Optional method to determine the target topic for the element.
     *
     * @param element Incoming element to determine the target topic from
     * @return null or the target topic
     */
    String getTargetTopic(T element);
}

Flink Kafka Stream Splitting

Approach (the value in Kafka is not touched; messages are only forwarded according to a routing condition):

  1. The source topic contains several kinds of messages; a routing key real_topic stored in the Kafka Headers decides which downstream topic each message is sunk to (a sketch of stamping this header on the producer side follows this list)
  2. The Flink consumer reads the source topic and forwards the ConsumerRecord<byte[], byte[]> downstream untouched; the value may be JSON, Avro, etc., so it is deliberately not deserialized
  3. A custom process function can be plugged in to inspect or transform the ConsumerRecord<byte[], byte[]>
  4. On the sink side, the real_topic header is read from the ConsumerRecord<byte[], byte[]> to pick the concrete downstream topic, and the message is rewrapped as a ProducerRecord<byte[], byte[]> and forwarded
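
Step 1 assumes that whoever produces into the source topic stamps each message with the routing header. A minimal sketch using the plain Kafka producer API, with the header key real_topic and placeholder broker and topic names, could look like this:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical upstream producer: stamps each message with the real_topic routing header.
public class HeaderStampingProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                    "source-topic", // placeholder source topic
                    null,
                    "some payload".getBytes(StandardCharsets.UTF_8));
            // The split job reads this header later to pick the downstream topic.
            record.headers().add("real_topic", "order-topic".getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }
    }
}
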
  • Consumer deserialization

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaRecordDeserializationSchema implements KafkaDeserializationSchema<ConsumerRecord<byte[], byte[]>> {

    @Override
    public boolean isEndOfStream(ConsumerRecord<byte[], byte[]> nextElement) {
        return false;
    }

    @Override
    public ConsumerRecord<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) {
        // Pass the ConsumerRecord through untouched so headers and other metadata stay available downstream.
        return record;
    }

    @Override
    public TypeInformation<ConsumerRecord<byte[], byte[]>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<byte[], byte[]>>() {
        });
    }
}
  • Producer serialization

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaRecordSerializationSchema implements KafkaSerializationSchema<ConsumerRecord<byte[], byte[]>> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(ConsumerRecord<byte[], byte[]> element, @Nullable Long timestamp) {
        // Convert the Kafka headers into a map
        Map<String, String> headerMap = CommonUtils.headerToMap(element.headers());
        // Look up the real_topic routing key, falling back to the default topic
        String realTopic = headerMap.containsKey(CommonUtils.REAL_TOPIC_KEY) ? headerMap.get(CommonUtils.REAL_TOPIC_KEY) : CommonUtils.DEFAULT_TOPIC;
        // Wrap a new record with the resolved topic, copy the headers over, and forward it
        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(realTopic, element.key(), element.value());
        CommonUtils.putHeader(element, record);
        return record;
    }
}
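
The CommonUtils helpers used above come from the author's project and are not shown in this post. A minimal sketch of what they might look like, assuming the header key is the literal string real_topic and assuming a fallback topic name:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

// Hypothetical sketch of the helper class used above; key and default topic are assumptions.
public class CommonUtils {

    public static final String REAL_TOPIC_KEY = "real_topic";   // assumed header key
    public static final String DEFAULT_TOPIC = "default-topic"; // assumed fallback topic

    // Convert Kafka headers into a String map (assumes UTF-8 encoded header values).
    public static Map<String, String> headerToMap(Iterable<Header> headers) {
        Map<String, String> map = new HashMap<>();
        for (Header header : headers) {
            map.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
        }
        return map;
    }

    // Copy every header from the consumed record onto the outgoing record.
    public static void putHeader(ConsumerRecord<byte[], byte[]> source, ProducerRecord<byte[], byte[]> target) {
        for (Header header : source.headers()) {
            target.headers().add(header);
        }
    }
}
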
  • Main

......
......
// Add the source
DataStream<ConsumerRecord<byte[], byte[]>> stream = env.addSource(new FlinkKafkaConsumer<>(
        sourceTopic,
        // custom deserializer
        new KafkaRecordDeserializationSchema(),
        properties
));

// Intermediate processing (inspection only; the sink below consumes the original stream)
stream.process(new ProcessFunction<ConsumerRecord<byte[], byte[]>, ConsumerRecord<byte[], byte[]>>() {
    @Override
    public void processElement(ConsumerRecord<byte[], byte[]> consumerRecord, Context context, Collector<ConsumerRecord<byte[], byte[]>> collector) throws Exception {
        Map<String, String> headerMap = CommonUtils.headerToMap(consumerRecord.headers());
        System.out.println(headerMap);
    }
}).name("split-kafka").setParallelism(2);

// Add the sink
stream.addSink(new FlinkKafkaProducer<>(
        CommonUtils.DEFAULT_TOPIC,
        // custom serializer
        new KafkaRecordSerializationSchema(),
        properties,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
));
......
......
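
The elided parts of the snippet above are the environment and the Kafka properties. A rough sketch of that setup, with placeholder broker address, group id, and topic name; note that an EXACTLY_ONCE sink needs checkpointing enabled and a producer transaction timeout no larger than the broker's transaction.max.timeout.ms:

// Sketch of the elided setup; all names and addresses are placeholders.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // EXACTLY_ONCE sinks rely on checkpoints to commit transactions

String sourceTopic = "source-topic";

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "split-kafka-job");
properties.setProperty("transaction.timeout.ms", "900000");

// ... source, process, and sink as in the snippet above ...

env.execute("flink-kafka-split");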

Please credit the source when reposting: https://github.com/imperio-wxm

