The Implementation of Several Samplers in Hadoop
Published: 2021-11-21 20:52:42  Category: Tutorials  Source: Internet
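In Hadoop, sampling is implemented by the class org.apache.hadoop.mapred.lib.InputSampler. InputSampler provides three samplers: SplitSampler, RandomSampler and IntervalSampler. All three are static nested classes of InputSampler, and each of them implements InputSampler's nested interface Sampler:

```java
public interface Sampler<K,V> {
  K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
}
```

The getSample method draws a sample of keys based on the job's configuration and its input format. Each of the three sampler classes implements it differently.

RandomSampler selects keys from the input at random and is a general-purpose sampler. It has three fields: freq (the probability that any given key is selected), numSamples (the total number of samples to collect across all sampled splits) and maxSplitsSampled (the maximum number of splits to scan).

RandomSampler implements getSample as follows:

```java
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);

  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // shuffle splits (each position is swapped with a random one; unlike
  // Fisher-Yates, this does not give an exactly uniform permutation)
  for (int i = 0; i < splits.length; ++i) {
    InputSplit tmp = splits[i];
    int j = r.nextInt(splits.length);
    splits[i] = splits[j];
    splits[j] = tmp;
  }
  // our target rate is in terms of the maximum number of sample splits,
  // but we accept the possibility of sampling additional splits to hit
  // the target sample keyset
  for (int i = 0; i < splitsToSample ||
                 (i < splits.length && samples.size() < numSamples); ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      if (r.nextDouble() <= freq) {
        if (samples.size() < numSamples) {
          samples.add(key);
        } else {
          // When exceeding the maximum number of samples, replace a
          // random element with this one, then adjust the frequency
          // to reflect the possibility of existing elements being
          // pushed out
          int ind = r.nextInt(numSamples);
          if (ind != numSamples) { // always true: nextInt returns 0..numSamples-1
            samples.set(ind, key);
          }
          freq *= (numSamples - 1) / (double) numSamples;
        }
        // next(key, value) overwrites the key object in place, so a fresh
        // key is allocated once the current one has been stored
        key = reader.createKey();
      }
    }
    reader.close();
  }
  // unchecked cast: ArrayList.toArray() returns Object[]
  return (K[])samples.toArray();
}
```

In short, the method shuffles the splits, then reads records from at least splitsToSample of them, moving on to further splits while fewer than numSamples keys have been collected. Each key is kept with probability freq; once numSamples keys are held, a newly selected key overwrites a random existing one and freq is multiplied by (numSamples - 1) / numSamples, so later keys are progressively less likely to displace earlier ones.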
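To experiment with RandomSampler's selection rule without a Hadoop cluster, its inner loop can be reproduced on an in-memory list. The following is a minimal sketch under that assumption: the class RandomSamplerSketch and its sample method are hypothetical names, the list of keys stands in for the records a RecordReader would return, and split selection and shuffling are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical standalone sketch of RandomSampler's key-selection loop.
// An in-memory List stands in for the records a RecordReader would yield.
public class RandomSamplerSketch {

    // Keeps each key with probability freq, holding at most numSamples keys.
    // Once the buffer is full, a newly selected key overwrites a random slot
    // and freq is scaled by (numSamples - 1) / numSamples, mirroring the Hadoop loop.
    public static <K> List<K> sample(List<K> keys, double freq, int numSamples, Random r) {
        if (numSamples <= 0) {
            throw new IllegalArgumentException("numSamples must be positive");
        }
        List<K> samples = new ArrayList<>(numSamples);
        for (K key : keys) {
            if (r.nextDouble() <= freq) {
                if (samples.size() < numSamples) {
                    samples.add(key);
                } else {
                    // nextInt(numSamples) is always a valid index, so the
                    // "ind != numSamples" check from the Hadoop code is omitted.
                    samples.set(r.nextInt(numSamples), key);
                    freq *= (numSamples - 1) / (double) numSamples;
                }
            }
        }
        return samples;
    }

    public static void main(String[] args) {
        List<Integer> keys = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            keys.add(i);
        }
        // Fixed seed (an arbitrary choice) so that runs are reproducible.
        List<Integer> picked = sample(keys, 0.1, 100, new Random(42));
        System.out.println("sampled " + picked.size() + " keys");
    }
}
```

With freq = 1.0 and no more keys than numSamples, every key is kept in input order, because Random.nextDouble() returns values in [0, 1). Smaller freq values, or inputs much larger than numSamples, exercise the replacement and frequency-decay path.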
(Editor: Jinan Webmaster Network, 济南站长网)