Categorie: Tutti - 文件 - 任务

da Haifan Zhu mancano 11 anni

432

Bulkload

这个过程描述了如何使用`Bulkload`和`ImportTSV`等工具来加载大量数据到HBase中。首先,通过创建和配置一个作业,设置输入输出格式以及映射和归约类,将字节数组和Put对象分别设置为Key和Value。接下来,建立与HTable的连接,并配置增量加载。为了确保数据的准确性和有效性,系统会根据region数量设置适当的Reduce任务数量,并进行数据排序和分区。

Bulkload

Bulkload

ImportTSV

createSubmittableJob()
HFileOutputFormat.configureIncrementalLoad(job, table)

根据region数量setNumReduceTasks

setReducerClass 根据配置选择KeyValueSortReducer或PutSortReducer

setOutputFormatClass(HFileOutputFormat.class)

wl.writer.append(kv)

如果一行写完且HFile达到阈值rollWriters()

返回RecordWriter

setOutputValueClass(KeyValue.class)

setOutputKeyClass(ImmutableBytesWritable.class)

setPartitionerClass(getTotalOrderPartitionerClass())

设置Put为Value setMapOutputValueClass(Put.class)
设置字节数组为Key setMapOutputKeyClass(ImmutableBytesWritable.class)
对Put结果排序输出 setReducerClass(PutSortReducer.class)
建立HTable连接
setMapperClass
setInputFormatClass(TextInputFormat.class)
创建Job
读取配置并检查

LoadIncrementalHFiles doBulkLoad()

关闭线程池
bulkLoadPhase()
收集结果,把重试列表加到队列中
load过程交给线程池处理
tryAtomicRegionLoad()

返回重试列表

把没有成功的文件移回原位

bulkLoadHFiles()

region.bulkLoadHFiles()

关闭锁closeBulkRegionOperation()

store.bulkLoadHFile(finalPath)

notifyChangedReadersObservers()

sortAndClone()

将新StoreFile加入列表中

建立新的StoreFile

验证HFile的正确性assertBulkLoadHFileOk()

getStore(familyName)

获得锁startBulkRegionOperation()

getRegion(regionName)

groupOrSplitPhase()
收集返回切分结果
切分工作交给线程池处理
将Hfile分配到所在的region里,若Hfile的范围超过了region,进行切分 groupOrSplit()
获取每个region的Key范围getStartEndKeys()
遍历HFile文件discoverLoadQueue()
初始化线程池