Flume TaildirSource可实时监控当前目录下一批文件,并记录每个文件最新消费位置,flume进程重启后不会有重复消费的问题;但是无法监听当前目录的子目录,要实现多目录的监听就需要配置多个group,无法做到动态识别子目录
对TaildirSource的使用不做过多介绍,请参考官方文档:Flume-1.7.0 官方文档
参考源代码为flume 1.7.0版本
为了与官网源码区别,我将源码所有.java文件都加了前缀Multiple
源码结构
MultipleTaildirMatcher.jar
- 用于根据条件获取目录文件等操作
MultipleReliableTaildirEventReader.jar
- 用于文件位置记录维护、flume event封装、事件处理等操作
MultipleTaildirSource.jar
- Source的入口,用于参数初始化、轮询线程池定义、文件位置更新等操作
MultipleTailFile.jar
- 封装了具体文件的操作,利用RandomAccessFile可以获取文件位置指针等信息
MultipleTaildirSourceConfigurationConstants.jar
- 参数配置的常量信息等
目录及其子目录遍历
首先需要找到获取文件目录的源码地方:MultipleTaildirMatcher.jar
相关方法:getMatchingFiles
、getMatchingFilesNoCache
、getFiles
- getMatchingFiles
1 | long currentParentDirMTime = parentDir.lastModified(); |
因为有上一次已经识别的文件缓存lastMatchedFiles
,以上代码的作用是检测是否需要从底层从新获取一次文件列表以更新缓存
需要注意的是currentParentDirMTime
这个变量,表示目录最后一次更新时间parentDir.lastModified()
这里有一个问题是,当我们操作子目录时(例如向子目录cp一个新文件),此时根目录的最后修改时间是不发生改变的,例如:1
2
3-- root (lastModified = 12:00)
-- dir01 (lastModified = 12:05)
-- file.txt
root为根目录,在其中有dir01一个子目录,dir01下有一个file.txt文件;如果此时cp一个file_copy.txt到dir01下面,只会更新dir01目录的lastModified,而root的lastModified不发生改变,此时虽然有新的文件加入了,但依然不满足if 条件,无法刷新缓存
因为源码是只支持一个根目录的,所以源码这样写是没有问题,但是目前我们需要对其子目录也做监听,就要考虑如何进入if中
方案一:每次轮训都将当前根目录下所有dir的最后时间获取,取出最大的时间更新currentParentDirMTime;此方案需要每次都轮序全目录,太消耗资源(不可用)
方案二:定时强制刷新缓存,即不管是否满足lastModified条件,到达一定时间就强制从底层获取文件列表更新缓存(可用)
1 | // 使用方案二,加入强制刷新时间参数 |
- getMatchingFilesNoCache
这个方法为源码方法,主要是看下源码如何获取文件路径,以及我们应该如何修改
1 | private List<File> getMatchingFilesNoCache() { |
该方法使用了Files.newDirectoryStream
将parentDir
下的文件全部获取,这里的fileFilter
是通过正则过滤符合条件的文件
- getFiles
这个方法是我自己实现的,获取目录及其子目录下的所有文件路径
1 | private static List<File> getFiles(File files) { |
这里用了一个非递归的遍历,因为在目录结构复杂层数比较多的情况下,使用递归入栈出栈和消耗的cpu、内存都要高,不是最佳选择;
遍历目录是一个性能瓶颈点,当遍历目录的耗时已经大于你所设置的强制刷新缓存间隔时,就会出现性能问题
关于遍历目录也有2个备选方案:
- Nio2遍历
1 | // 这种遍历是效率最高的,但是会有一个问题,当目录是软链ln -s时,根目录的文件无法被识别出来,导致遗漏文件,这个问题暂时还没深究 |
- Google Guava遍历
1 | // 此方法的遍历效率最低,不推荐使用 |
新增参数
在MultipleTaildirSourceConfigurationConstants.jar
中新增了两个参数
1 | // 强制刷新缓存时间间隔,默认:2 * 60 * 1000 ms |
- 具体使用和测试方式参考README.md
转载请注明出处:https://github.com/imperio-wxm