Fork me on GitHub

Spark SequenceFile 多路径输出

使用Spark读写SequenceFile时,有时需要根据一定的规则动态生成文件的输出路径,且需要将某一类的数据都输出到同一个Path下,这时就需要使用MultipleSequenceFileOutputFormat

SoftWare Version
Scala 2.11.11
Spark 2.4.0

MultipleSequenceFileOutputFormat 源码预览

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 继承自MultipleOutputFormat
public class MultipleSequenceFileOutputFormat <K,V> extends MultipleOutputFormat<K, V> {
private SequenceFileOutputFormat<K,V> theSequenceFileOutputFormat = null;

// 重写了 getBaseRecordWriter 方法
@Override
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
if (theSequenceFileOutputFormat == null) {
theSequenceFileOutputFormat = new SequenceFileOutputFormat<K,V>();
}
// 返回 SequenceFileOutputFormat 的getRecordWriter,提供writer句柄
return theSequenceFileOutputFormat.getRecordWriter(fs, job, name, arg3);
}
}

SequenceFileOutputFormat 源码预览

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
// 获取输出临时目录
Path file = FileOutputFormat.getTaskOutputPath(job, name);

// 获取FileSystem操作句柄
FileSystem fs = file.getFileSystem(job);
CompressionCodec codec = null;
// 设置压缩编码
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(job)) {
// find the kind of compression to do
compressionType = getOutputCompressionType(job);
// find the right codec
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
}
// 正式创建SequenceFile writer,其实这里和我们自己写sequenceFile writer过程是一样的
final SequenceFile.Writer out = SequenceFile.createWriter(fs, job, file, job.getOutputKeyClass(), job.getOutputValueClass(), compressionType, codec, progress);

// 返回被RecordWriter 包装后的 sequenceFile writer
return new RecordWriter<K, V>() {
public void write(K key, V value)throws IOException {
out.append(key, value);
}

public void close(Reporter reporter) throws IOException {
out.close();
}
};
}

....其他方法略过,例如设置Compression的方法等

MultipleOutputFormat 源码预览

主要是对这个类中方法的重写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 内容略,就是上面的getRecordWriter,获取writer
public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
..........
}

/**
* Generate the leaf name for the output file name. The default behavior does
* not change the leaf file name (such as part-00000)
*
* @param name
* the leaf file name for the output file
* @return the given leaf file name
*/
// 定义输出文件名
protected String generateLeafFileName(String name) {
return name;
}

/**
* Generate the file output file name based on the given key and the leaf file
* name. The default behavior is that the file name does not depend on the
* key.
*
* @param key
* the key of the output data
* @param name
* the leaf file name
* @return generated file name
*/
// 可根据key,value值自定义输出文件路径
protected String generateFileNameForKeyValue(K key, V value, String name) {
return name;
}

/**
* Generate the actual key from the given key/value. The default behavior is that
* the actual key is equal to the given key
*
* @param key
* the key of the output data
* @param value
* the value of the output data
* @return the actual key derived from the given key/value
*/
// 自定义输出key
protected K generateActualKey(K key, V value) {
return key;
}

/**
* Generate the actual value from the given key and value. The default behavior is that
* the actual value is equal to the given value
*
* @param key
* the key of the output data
* @param value
* the value of the output data
* @return the actual value derived from the given key/value
*/
// 自定义输出value
protected V generateActualValue(K key, V value) {
return value;
}

自定义OutputFormat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 自定义一个输出类,继承自MultipleSequenceFileOutputFormat
// 然后重写里面的各种方法
// 其中重写generateFileNameForKeyValue以达到自定义路径的目的
// 可以根据key,value生成目录,以达到将相同key或其他形式的内容输出到同一个路径下

class SplitMultipleSequenceFileOutputFormat[K, V] extends MultipleSequenceFileOutputFormat[K, V]() {

// 可以自定义输出文件名,such as part-00000
override def generateLeafFileName(name: String): String = super.generateLeafFileName(name)

// 可以自定义输出value
override def generateActualValue(key: K, value: V): V = super.generateActualValue(key, value)

// 可以自定义输出key
override def generateActualKey(key: K, value: V): K = super.generateActualKey(key, value)

override def generateFileNameForKeyValue(key: K, value: V, name: String): String = {
// 具体拆分逻辑
// 例如文件中key的分布为:a、b、c,我希望a、b、c各自有独立的目录,且独立目录中只有包含各自key 的数据
// 这样输出的目录就是:
// basePath/a/part-00000 且key为a的全在这个文件中
// basePath/b/part-00000 且key为b的全在这个文件中
// basePath/c/part-00000 且key为c的全在这个文件中
key.toString + "/" + name
}
}

自定义OutputFormat 使用

1
2
3
4
5
6
7
8
9
10
// 第一步是读取数据源,可以是其他,我这里是读取一个txt文件然后根据一定规则转换成sequenceFile
// 第二步是数据中间的处理逻辑
// 第三步是输出,使用saveAsHadoopFile(输出base路径,key的类型,value的类型,输出文件类型),最后设置自定义的SplitMultipleSequenceFileOutputFormat
// 注意:其中sequenceFile的key,value都必须是可序列化的,例如hadoop text

sparkContext.textFile(inputPath)
.map(line => parserData(line))
.filter(newLine => filterData(newLine))
......数据处理逻辑
.saveAsHadoopFile(outPath, classOf[Text], classOf[Text], classOf[SplitMultipleSequenceFileOutputFormat[_, _]])

spark-file-split案例

【此Project的作用是】:

  1. 数据以竖线分隔的文本,且与avro schema字段对应
  2. 通过文本内容和avro schema,判断数据时间event_time是否符合入参标准
  3. 从文本内容中解析出表名、数据时间,生成分区日期、自定义输出路径
  4. 使得同一张表、同一日期分区的数据,落入同一个path
  5. 之后可以将输出目录的数据直接load到hive sequence表中

转载请注明出处:https://github.com/imperio-wxm


Thank you for your support.