Fork me on GitHub

Flume TaildirSource 源码扩展(支持子目录)

Flume TaildirSource可实时监控当前目录下一批文件,并记录每个文件最新消费位置,flume进程重启后不会有重复消费的问题;但是无法监听当前目录的子目录,要实现多目录的监听就需要配置多个group,无法做到动态识别子目录

对TaildirSource的使用不做过多介绍,请参考官方文档:Flume-1.7.0 官方文档

参考源代码为flume 1.7.0版本

TaildirSource 官方源码

为了与官网源码区别,我将源码所有.java文件都加了前缀Multiple

MultipleTaildirSource 扩展源码

源码结构

MultipleTaildirMatcher.jar

  • 用于根据条件获取目录文件等操作

MultipleReliableTaildirEventReader.jar

  • 用于文件位置记录维护、flume event封装、事件处理等操作

MultipleTaildirSource.jar

  • Source的入口,用于参数初始化、轮询线程池定义、文件位置更新等操作

MultipleTailFile.jar

  • 封装了具体文件的操作,利用RandomAccessFile可以获取文件位置指针等信息

MultipleTaildirSourceConfigurationConstants.jar

  • 参数配置的常量信息等

目录及其子目录遍历

首先需要找到获取文件目录的源码地方:MultipleTaildirMatcher.jar

相关方法:getMatchingFilesgetMatchingFilesNoCachegetFiles

  • getMatchingFiles
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
long currentParentDirMTime = parentDir.lastModified();
// 当前时间 - 最后一次检查时间 大于 强制刷新间隔,则进行目录的强制刷新
boolean needForceRefresh = (now - lastCheckedTime) > forceRefreshIntervalMs;

// lastSeenParentDirMTime < currentParentDirMTime 为根目录修改时间大于上次查看时间
// !(currentParentDirMTime < lastCheckedTime) 为最后一次查看时间小于等于目录修改时间
if (!cachePatternMatching ||
lastSeenParentDirMTime < currentParentDirMTime ||
!(currentParentDirMTime < lastCheckedTime) ||
needForceRefresh) {
logger.info("Fetching files.....");
long start = System.currentTimeMillis();
List<File> files = getFiles(parentDir);
getAllFilesCost = System.currentTimeMillis() - start;
if (getAllFilesCost > forceRefreshIntervalMs) {
logger.error(String.format("Traversing the directory takes too long!!! cost = %s ms", getAllFilesCost));
}
logger.info(String.format("Getting files takes time = %s ms", getAllFilesCost));
lastMatchedFiles = sortByLastModifiedTime(files);
lastSeenParentDirMTime = currentParentDirMTime;
lastCheckedTime = now;
}

因为有上一次已经识别的文件缓存lastMatchedFiles,以上代码的作用是检测是否需要从底层从新获取一次文件列表以更新缓存

需要注意的是currentParentDirMTime这个变量,表示目录最后一次更新时间parentDir.lastModified()

这里有一个问题是,当我们操作子目录时(例如向子目录cp一个新文件),此时根目录的最后修改时间是不发生改变的,例如:

1
2
3
-- root (lastModified = 12:00)
-- dir01 (lastModified = 12:05)
-- file.txt

root为根目录,在其中有dir01一个子目录,dir01下有一个file.txt文件;如果此时cp一个file_copy.txt到dir01下面,只会更新dir01目录的lastModified,而root的lastModified不发生改变,此时虽然有新的文件加入了,但依然不满足if 条件,无法刷新缓存

因为源码是只支持一个根目录的,所以源码这样写是没有问题,但是目前我们需要对其子目录也做监听,就要考虑如何进入if中

  • 方案一:每次轮训都将当前根目录下所有dir的最后时间获取,取出最大的时间更新currentParentDirMTime;此方案需要每次都轮序全目录,太消耗资源(不可用)

  • 方案二:定时强制刷新缓存,即不管是否满足lastModified条件,到达一定时间就强制从底层获取文件列表更新缓存(可用)

1
2
3
// 使用方案二,加入强制刷新时间参数
// (当前轮序时间 - 最近一次轮训时间) > 我们所设置的间隔,就强制刷新
boolean needForceRefresh = (now - lastCheckedTime) > forceRefreshIntervalMs;

  • getMatchingFilesNoCache

这个方法为源码方法,主要是看下源码如何获取文件路径,以及我们应该如何修改

1
2
3
4
5
6
7
8
9
10
11
12
private List<File> getMatchingFilesNoCache() {
List<File> result = Lists.newArrayList();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) {
for (Path entry : stream) {
result.add(entry.toFile());
}
} catch (IOException e) {
logger.error("I/O exception occurred while listing parent directory. " +
"Files already matched will be returned. " + parentDir.toPath(), e);
}
return result;
}

该方法使用了Files.newDirectoryStreamparentDir下的文件全部获取,这里的fileFilter是通过正则过滤符合条件的文件

  • getFiles

这个方法是我自己实现的,获取目录及其子目录下的所有文件路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static List<File> getFiles(File files) {
List<File> result = new ArrayList<>();
result.add(files);
for (int i = 0; i < result.size(); i++) {
if (result.get(i).isDirectory()) {
File[] list = result.get(i).listFiles();
if (null != list && list.length > 0) {
for (File f : list) {
if (null != pattern) {
// 只加入符合正则的文件
if (f.isFile() && pattern.matcher(f.getAbsolutePath()).matches()) {
result.add(f);
} else if (f.isDirectory()) {
result.add(f);
}
} else {
result.add(f);
}

}
}
}
}
result = result.stream().filter(File::isFile).collect(Collectors.toList());
result.forEach(f -> logger.debug("Path = " + f));
return result;
}

这里用了一个非递归的遍历,因为在目录结构复杂层数比较多的情况下,使用递归入栈出栈和消耗的cpu、内存都要高,不是最佳选择;

遍历目录是一个性能瓶颈点,当遍历目录的耗时已经大于你所设置的强制刷新缓存间隔时,就会出现性能问题

关于遍历目录也有2个备选方案:

  • Nio2遍历
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 这种遍历是效率最高的,但是会有一个问题,当目录是软链ln -s时,根目录的文件无法被识别出来,导致遗漏文件,这个问题暂时还没深究

FindJavaVisitor findJavaVisitor = new FindJavaVisitor();
Files.walkFileTree(startingDir, findJavaVisitor);
System.out.println(findJavaVisitor.getFileList().size());

private static class FindJavaVisitor extends SimpleFileVisitor<Path> {
private List<File> fileList = new ArrayList<>();
private List<File> dirs = new ArrayList<>();

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
fileList.add(file.toFile());
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
dirs.add(dir.toFile());
return FileVisitResult.CONTINUE;
}

public List<File> getFileList() {
return fileList;
}

public List<File> getDirs() {
return dirs;
}
}
  • Google Guava遍历
1
2
3
4
5
6
7
8
// 此方法的遍历效率最低,不推荐使用

List<File> list = new ArrayList<>();
for (File file : com.google.common.io.Files.fileTraverser().breadthFirst(new File(basePath))) {
if(file.isFile()) {
list.add(file);
}
}

新增参数

MultipleTaildirSourceConfigurationConstants.jar中新增了两个参数

1
2
3
4
5
// 强制刷新缓存时间间隔,默认:2 * 60 * 1000 ms
forceRefreshIntervalMs

// 对文件过滤的正则表达式,满足正则的文件路径才会监听,默认 null 不进行过滤全部监听
filePathRegularExpression

转载请注明出处:https://github.com/imperio-wxm


Thank you for your support.