Bulkload
LoadIncrementalHFiles doBulkLoad()
初始化线程池
遍历HFile文件discoverLoadQueue()
获取每个region的Key范围getStartEndKeys()
groupOrSplitPhase()
将Hfile分配到所在的region里,若Hfile的范围超过了region,进行切分 groupOrSplit()
切分工作交给线程池处理
收集返回切分结果
bulkLoadPhase()
tryAtomicRegionLoad()
bulkLoadHFiles()
getRegion(regionName)
region.bulkLoadHFiles()
获得锁startBulkRegionOperation()
getStore(familyName)
验证HFile的正确性assertBulkLoadHFileOk()
store.bulkLoadHFile(finalPath)
建立新的StoreFile
将新StoreFile加入列表中
sortAndClone()
notifyChangedReadersObservers()
关闭锁closeBulkRegionOperation()
把没有成功的文件移回原位
返回重试列表
load过程交给线程池处理
收集结果,把重试列表加到队列中
关闭线程池
ImportTSV
读取配置并检查
createSubmittableJob()
创建Job
setInputFormatClass(TextInputFormat.class)
setMapperClass
建立HTable连接
对Put结果排序输出 setReducerClass(PutSortReducer.class)
设置字节数组为Key setMapOutputKeyClass(ImmutableBytesWritable.class)
设置Put为Value setMapOutputValueClass(Put.class)
HFileOutputFormat.configureIncrementalLoad(job, table)
setPartitionerClass(getTotalOrderPartitionerClass())
setOutputKeyClass(ImmutableBytesWritable.class)
setOutputValueClass(KeyValue.class)
setOutputFormatClass(HFileOutputFormat.class)
返回RecordWriter<ImmutableBytesWritable, KeyValue>
如果一行写完且HFile达到阈值rollWriters()
wl.writer.append(kv)
setReducerClass 根据配置选择KeyValueSortReducer或PutSortReducer
根据region数量setNumReduceTasks