A Look at Storm's PartialKeyGrouping
Published: 2019-06-22


This article takes a look at Storm's PartialKeyGrouping.

Example

```java
@Test
public void testPartialKeyGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    String spoutId = "wordGenerator";
    String counterId = "counter";
    String aggId = "aggregator";
    String intermediateRankerId = "intermediateRanker";
    String totalRankerId = "finalRanker";
    int TOP_N = 5;
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(spoutId, new TestWordSpout(), 5);
    // NOTE: use partialKeyGrouping instead of fieldsGrouping to spread the load on the counter bolt more evenly
    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
    builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
    submitRemote(builder);
}
```
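  • Note that because the word-count bolt uses PartialKeyGrouping, the same word is no longer always sent to the same task, so a RollingCountAggBolt connected with fieldsGrouping is still needed to merge the partial counts.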
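To see why the extra aggregation step is needed, here is a minimal Storm-free sketch in plain Java. It assumes a hard-coded task assignment (tasks 0 and 1 both receive the word "storm") to stand in for what PartialKeyGrouping might do; the class and method names are hypothetical, and the rolling-window logic of RollingCountBolt and RollingCountAggBolt is not modeled.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: partial counts per task, then a merge keyed by word
final class PartialCountMerge {
    // Counts words per task; assignedTask[i] is the (assumed) task that received words.get(i)
    static Map<Integer, Map<String, Long>> countOnTasks(List<String> words, int[] assignedTask) {
        Map<Integer, Map<String, Long>> perTask = new HashMap<>();
        for (int i = 0; i < words.size(); i++) {
            perTask.computeIfAbsent(assignedTask[i], t -> new HashMap<>())
                   .merge(words.get(i), 1L, Long::sum);
        }
        return perTask;
    }

    // Downstream step: sum the partial counts of each word across all tasks
    // (the role fieldsGrouping on "obj" plays for RollingCountAggBolt in the topology above)
    static Map<String, Long> merge(Map<Integer, Map<String, Long>> perTask) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : perTask.values()) {
            partial.forEach((word, count) -> total.merge(word, count, Long::sum));
        }
        return total;
    }
}
```

With the words storm, storm, storm, kafka sent to tasks 0, 1, 0, 1, task 0 holds storm=2 and task 1 holds storm=1; only the merge keyed by word yields the full count of 3.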

PartialKeyGrouping (version 1.2.2)

storm-core-1.2.2-sources.jar!/org/apache/storm/grouping/PartialKeyGrouping.java

```java
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -447379837314000353L;
    private List<Integer> targetTasks;
    private long[] targetTaskStats;
    private HashFunction h1 = Hashing.murmur3_128(13);
    private HashFunction h2 = Hashing.murmur3_128(17);
    private Fields fields = null;
    private Fields outFields = null;

    public PartialKeyGrouping() {
        //Empty
    }

    public PartialKeyGrouping(Fields fields) {
        this.fields = fields;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        targetTaskStats = new long[this.targetTasks.size()];
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            byte[] raw;
            if (fields != null) {
                List<Object> selectedFields = outFields.select(fields, values);
                ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
                for (Object o: selectedFields) {
                    if (o instanceof List) {
                        out.putInt(Arrays.deepHashCode(((List)o).toArray()));
                    } else if (o instanceof Object[]) {
                        out.putInt(Arrays.deepHashCode((Object[])o));
                    } else if (o instanceof byte[]) {
                        out.putInt(Arrays.hashCode((byte[]) o));
                    } else if (o instanceof short[]) {
                        out.putInt(Arrays.hashCode((short[]) o));
                    } else if (o instanceof int[]) {
                        out.putInt(Arrays.hashCode((int[]) o));
                    } else if (o instanceof long[]) {
                        out.putInt(Arrays.hashCode((long[]) o));
                    } else if (o instanceof char[]) {
                        out.putInt(Arrays.hashCode((char[]) o));
                    } else if (o instanceof float[]) {
                        out.putInt(Arrays.hashCode((float[]) o));
                    } else if (o instanceof double[]) {
                        out.putInt(Arrays.hashCode((double[]) o));
                    } else if (o instanceof boolean[]) {
                        out.putInt(Arrays.hashCode((boolean[]) o));
                    } else if (o != null) {
                        out.putInt(o.hashCode());
                    } else {
                        out.putInt(0);
                    }
                }
                raw = out.array();
            } else {
                raw = values.get(0).toString().getBytes(); // assume key is the first field
            }
            int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % this.targetTasks.size());
            int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % this.targetTasks.size());
            int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? secondChoice : firstChoice;
            boltIds.add(targetTasks.get(selected));
            targetTaskStats[selected]++;
        }
        return boltIds;
    }
}
```
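  • PartialKeyGrouping is a CustomStreamGrouping. In prepare it allocates long[] targetTaskStats, one counter per target task, to record how many tuples this grouping instance has sent to each task.
  • If no fields are specified, partialKeyGrouping uses the first value of the tuple as the key (values.get(0).toString().getBytes()); otherwise it packs the hash codes of the selected fields into a byte array.
  • It builds two HashFunctions with Guava's Hashing.murmur3_128 (seeds 13 and 17), hashes the key bytes with each, and takes the absolute value of each hash modulo targetTasks.size() to get two candidate task indices.
  • It then compares the two candidates' counts in targetTaskStats, picks the less-used one (the first choice on a tie), and increments that task's count.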
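The 1.2.2 selection rule can be sketched without Storm or Guava. The two hash functions are an explicit assumption: JDK CRC32 and Adler32 checksums stand in for Hashing.murmur3_128(13) and Hashing.murmur3_128(17), so the concrete candidate indices will differ from Storm's; only the "two candidates, pick the less-used one" logic mirrors the code above. TwoChoiceSketch is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.Adler32;
import java.util.zip.CRC32;

// Hypothetical sketch of the 1.2.2 two-choice rule (not Storm's class)
final class TwoChoiceSketch {
    // One counter per target task index, like targetTaskStats in 1.2.2
    private final long[] targetTaskStats;

    TwoChoiceSketch(int numTasks) {
        this.targetTaskStats = new long[numTasks];
    }

    // ASSUMPTION: stand-ins for Hashing.murmur3_128(13) / murmur3_128(17);
    // JDK checksums are used only to keep the sketch dependency-free
    private static long h1(byte[] raw) {
        CRC32 crc = new CRC32();
        crc.update(raw);
        return crc.getValue();
    }

    private static long h2(byte[] raw) {
        Adler32 adler = new Adler32();
        adler.update(raw);
        return adler.getValue();
    }

    int chooseIndex(String key) {
        byte[] raw = key.getBytes(StandardCharsets.UTF_8);
        int n = targetTaskStats.length;
        int firstChoice = (int) (Math.abs(h1(raw)) % n);
        int secondChoice = (int) (Math.abs(h2(raw)) % n);
        // Same rule as 1.2.2: switch to secondChoice only if firstChoice has strictly more tuples
        int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? secondChoice : firstChoice;
        targetTaskStats[selected]++;
        return selected;
    }

    long[] stats() {
        return targetTaskStats.clone();
    }
}
```

Feeding the same key repeatedly, the sketch alternates between its two candidate indices when they differ, so a hot key's tuples are split evenly across two tasks instead of piling onto one; if both hashes happen to land on the same index, the key behaves as it would under fieldsGrouping.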

PartialKeyGrouping (version 2.0.0)

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

```java
/**
 * A variation on FieldGrouping. This grouping operates on a partitioning of the incoming tuples (like a FieldGrouping), but it can send
 * Tuples from a given partition to multiple downstream tasks.
 *
 * Given a total pool of target tasks, this grouping will always send Tuples with a given key to one member of a subset of those tasks. Each
 * key is assigned a subset of tasks. Each tuple is then sent to one task from that subset.
 *
 * Notes:
 * - the default TaskSelector ensures each task gets as close to a balanced number of Tuples as possible
 * - the default AssignmentCreator hashes the key and produces an assignment of two tasks
 */
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -1672360572274911808L;
    private List<Integer> targetTasks;
    private Fields fields = null;
    private Fields outFields = null;
    private AssignmentCreator assignmentCreator;
    private TargetSelector targetSelector;

    public PartialKeyGrouping() {
        this(null);
    }

    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            final byte[] rawKeyBytes = getKeyBytes(values);
            final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
            final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);
            boltIds.add(selectedTask);
        }
        return boltIds;
    }

    /**
     * Extract the key from the input Tuple.
     */
    private byte[] getKeyBytes(List<Object> values) {
        byte[] raw;
        if (fields != null) {
            List<Object> selectedFields = outFields.select(fields, values);
            ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
            for (Object o : selectedFields) {
                if (o instanceof List) {
                    out.putInt(Arrays.deepHashCode(((List) o).toArray()));
                } else if (o instanceof Object[]) {
                    out.putInt(Arrays.deepHashCode((Object[]) o));
                } else if (o instanceof byte[]) {
                    out.putInt(Arrays.hashCode((byte[]) o));
                } else if (o instanceof short[]) {
                    out.putInt(Arrays.hashCode((short[]) o));
                } else if (o instanceof int[]) {
                    out.putInt(Arrays.hashCode((int[]) o));
                } else if (o instanceof long[]) {
                    out.putInt(Arrays.hashCode((long[]) o));
                } else if (o instanceof char[]) {
                    out.putInt(Arrays.hashCode((char[]) o));
                } else if (o instanceof float[]) {
                    out.putInt(Arrays.hashCode((float[]) o));
                } else if (o instanceof double[]) {
                    out.putInt(Arrays.hashCode((double[]) o));
                } else if (o instanceof boolean[]) {
                    out.putInt(Arrays.hashCode((boolean[]) o));
                } else if (o != null) {
                    out.putInt(o.hashCode());
                } else {
                    out.putInt(0);
                }
            }
            raw = out.array();
        } else {
            raw = values.get(0).toString().getBytes(); // assume key is the first field
        }
        return raw;
    }

    //......
}
```
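  • Version 2.0.0 keeps the key extraction (getKeyBytes) but moves the rest of the logic into two pluggable strategies: an AssignmentCreator, which picks the candidate tasks for a key (default RandomTwoTaskAssignmentCreator), and a TargetSelector, which picks one of those candidates for each tuple (default BalancedTargetSelector). Both can be supplied through the constructors.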
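The getKeyBytes method above (the same key-extraction logic as in 1.2.2) special-cases arrays because Java arrays inherit an identity-based hashCode; Arrays.hashCode and Arrays.deepHashCode hash by content instead, so equal field values always yield equal key bytes. A trimmed sketch follows; KeyBytesSketch is hypothetical, takes already-selected field values, and handles only byte[], Object[], other objects and null.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

// Hypothetical, trimmed version of getKeyBytes (only a subset of the array types is handled)
final class KeyBytesSketch {
    static byte[] keyBytes(List<Object> selectedFields) {
        // 4 bytes (one int hash) per selected field
        ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
        for (Object o : selectedFields) {
            if (o instanceof byte[]) {
                out.putInt(Arrays.hashCode((byte[]) o)); // content hash, not identity
            } else if (o instanceof Object[]) {
                out.putInt(Arrays.deepHashCode((Object[]) o));
            } else if (o != null) {
                out.putInt(o.hashCode());
            } else {
                out.putInt(0); // null fields contribute a fixed 0
            }
        }
        return out.array();
    }
}
```

Two tuples carrying distinct but equal byte[] instances therefore produce identical key bytes and are routed to the same candidate tasks.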

RandomTwoTaskAssignmentCreator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

```java
/**
 * This interface is responsible for choosing a subset of the target tasks to use for a given key.
 *
 * NOTE: whatever scheme you use to create the assignment should be deterministic. This may be executed on multiple Storm Workers, thus
 * each of them needs to come up with the same assignment for a given key.
 */
public interface AssignmentCreator extends Serializable {
    int[] createAssignment(List<Integer> targetTasks, byte[] key);
}

/*========== Implementations ==========*/

/**
 * This implementation of AssignmentCreator chooses two arbitrary tasks.
 */
public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator {
    /**
     * Creates a two task assignment by selecting random tasks.
     */
    public int[] createAssignment(List<Integer> tasks, byte[] key) {
        // It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key
        final long seedForRandom = Arrays.hashCode(key);
        final Random random = new Random(seedForRandom);
        final int choice1 = random.nextInt(tasks.size());
        int choice2 = random.nextInt(tasks.size());
        // ensure that choice1 and choice2 are not the same task
        choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
        return new int[]{ tasks.get(choice1), tasks.get(choice2) };
    }
}
```
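  • Version 2.0.0 no longer uses Guava's Hashing.murmur3_128. Instead it seeds a java.util.Random with the key's hash (Arrays.hashCode(key)) and draws two task indices from it; if they collide, the second is moved to the next index. Because the seed depends only on the key, every worker computes the same pair for the same key. It returns the two corresponding task IDs, and the TargetSelector then picks one of them for load balancing.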
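The determinism requirement can be checked with plain JDK code. This sketch copies the seeding logic of RandomTwoTaskAssignmentCreator into a hypothetical SeededTwoTaskAssignment class and calls it twice for the same key, as two workers would. java.util.Random is specified to return identical sequences for identical seeds and call sequences, so both calls yield the same pair.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical copy of the 2.0.0 default assignment logic, for experimentation outside Storm
final class SeededTwoTaskAssignment {
    static int[] createAssignment(List<Integer> tasks, byte[] key) {
        // Seed depends only on the key bytes, so every caller gets the same sequence
        final long seedForRandom = Arrays.hashCode(key);
        final Random random = new Random(seedForRandom);
        final int choice1 = random.nextInt(tasks.size());
        int choice2 = random.nextInt(tasks.size());
        // On a collision, shift the second choice to the next index (wrapping around)
        choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
        // Return task IDs, not indices
        return new int[]{tasks.get(choice1), tasks.get(choice2)};
    }
}
```

With a single target task the two choices necessarily coincide, since (choice2 + 1) % 1 is 0, so the grouping degenerates to sending everything to that one task.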

BalancedTargetSelector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

```java
/**
 * This interface chooses one element from a task assignment to send a specific Tuple to.
 */
public interface TargetSelector extends Serializable {
    Integer chooseTask(int[] assignedTasks);
}

/**
 * A basic implementation of target selection. This strategy chooses the task within the assignment that has received the fewest Tuples
 * overall from this instance of the grouping.
 */
public static class BalancedTargetSelector implements TargetSelector {
    private Map<Integer, Long> targetTaskStats = Maps.newHashMap();

    /**
     * Chooses one of the incoming tasks and selects the one that has been selected the fewest times so far.
     */
    public Integer chooseTask(int[] assignedTasks) {
        Integer taskIdWithMinLoad = null;
        Long minTaskLoad = Long.MAX_VALUE;
        for (Integer currentTaskId : assignedTasks) {
            final Long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
            if (currentTaskLoad < minTaskLoad) {
                minTaskLoad = currentTaskLoad;
                taskIdWithMinLoad = currentTaskId;
            }
        }
        targetTaskStats.put(taskIdWithMinLoad, targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1);
        return taskIdWithMinLoad;
    }
}
```
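  • Given the candidate task IDs, BalancedTargetSelector looks up how many tuples each has received so far in targetTaskStats, returns the least-loaded one (taskIdWithMinLoad; on a tie, the earlier candidate wins), and increments its count.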
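The tie-breaking and per-instance bookkeeping of BalancedTargetSelector can be observed in a small copy of its logic. BalancedSelectorSketch is hypothetical and uses java.util.HashMap in place of Guava's Maps.newHashMap(); as in the original, the counts live in the selector object, so they reflect only the tuples sent through this instance of the grouping.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical copy of the least-loaded selection rule
final class BalancedSelectorSketch {
    private final Map<Integer, Long> targetTaskStats = new HashMap<>();

    Integer chooseTask(int[] assignedTasks) {
        Integer taskIdWithMinLoad = null;
        long minTaskLoad = Long.MAX_VALUE;
        for (int currentTaskId : assignedTasks) {
            long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
            // Strict < means a tie keeps the earlier candidate
            if (currentTaskLoad < minTaskLoad) {
                minTaskLoad = currentTaskLoad;
                taskIdWithMinLoad = currentTaskId;
            }
        }
        targetTaskStats.merge(taskIdWithMinLoad, 1L, Long::sum);
        return taskIdWithMinLoad;
    }
}
```

With candidates {3, 7} the picks alternate 3, 7, 3, 7; a later key assigned {7, 9} then goes to the still-idle task 9, because task 7 already has two tuples.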

FieldsGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

```java
public static class FieldsGrouper implements CustomStreamGrouping {
    private Fields outFields;
    private List<List<Integer>> targetTasks;
    private Fields groupFields;
    private int numTasks;

    public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
        this.outFields = outFields;
        this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = new ArrayList<List<Integer>>();
        for (Integer targetTask : targetTasks) {
            this.targetTasks.add(Collections.singletonList(targetTask));
        }
        this.numTasks = targetTasks.size();
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
        return targetTasks.get(targetTaskIndex);
    }
}
```
  • FieldsGrouper's chooseTasks calls TupleUtils.chooseTaskIndex to compute the target task index, so a given key always maps to exactly one task.

TupleUtils.chooseTaskIndex

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java

```java
public static <T> int chooseTaskIndex(List<T> keys, int numTasks) {
    return Math.floorMod(listHashCode(keys), numTasks);
}

private static <T> int listHashCode(List<T> alist) {
    if (alist == null) {
        return 1;
    } else {
        return Arrays.deepHashCode(alist.toArray());
    }
}
```
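  • The keys are first hashed with listHashCode, and the result is reduced with Math.floorMod(hash, numTasks), i.e. floor modulo, so the index always falls in [0, numTasks) even when the hash is negative.
  • listHashCode computes the hash with Arrays.deepHashCode(alist.toArray()), and returns 1 for a null list.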
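The difference between Math.floorMod and the % operator matters because Arrays.deepHashCode can be negative. The hypothetical FieldsIndexSketch below reproduces the two TupleUtils methods and compares them with a plain % for a key whose hash is negative: for the single-element list [-100], deepHashCode is 31 * 1 + (-100) = -69, so floorMod gives 3 with 4 tasks while % gives -1, which would be an invalid list index.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical mirror of TupleUtils.chooseTaskIndex / listHashCode, plus a naive variant for contrast
final class FieldsIndexSketch {
    // A null list hashes to 1, as in TupleUtils
    static <T> int listHashCode(List<T> alist) {
        return alist == null ? 1 : Arrays.deepHashCode(alist.toArray());
    }

    // floorMod keeps the result in [0, numTasks) for positive numTasks
    static <T> int chooseTaskIndex(List<T> keys, int numTasks) {
        return Math.floorMod(listHashCode(keys), numTasks);
    }

    // Plain remainder takes the sign of the hash and can be negative
    static <T> int naiveIndex(List<T> keys, int numTasks) {
        return listHashCode(keys) % numTasks;
    }
}
```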

Summary

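  • Storm's PartialKeyGrouping addresses the skewed load that fieldsGrouping can put on bolt tasks when some keys are far more frequent than others.
  • fieldsGrouping hashes the selected fields and takes the floor modulo by the number of target tasks to pick the task index.
  • In 1.2.2, PartialKeyGrouping hashes the key with Guava's Hashing.murmur3_128 (two seeds) and takes the absolute value modulo the number of tasks to get two candidate task indices; in 2.0.0 it seeds a Random with the key's hash and draws the two candidates from it. The key difference from fieldsGrouping is that each key gets two candidate tasks rather than one. Given the two candidates, PartialKeyGrouping also keeps a per-task usage count, always picks the less-used candidate, and increments that candidate's count after each choice.
  • Because the word-count bolt uses PartialKeyGrouping, the same word is no longer always sent to the same task, so a RollingCountAggBolt connected with fieldsGrouping is still needed to merge the partial counts.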
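The core contrast in this summary can be reproduced with a small, Storm-free simulation. The hypothetical SkewComparison class below works on task indices rather than task IDs. Under the fieldsGrouping strategy it maps each key with floorMod(Arrays.deepHashCode(...)); under the partial-key strategy it uses a 2.0.0-style seeded Random pair plus a least-loaded pick. It takes the key bytes from String.getBytes(UTF_8), which approximates only the no-fields path of getKeyBytes. This illustrates the two-choices idea and is not Storm's code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical simulation comparing per-task loads of the two strategies
final class SkewComparison {
    // fieldsGrouping-style: one fixed task index per key
    static long[] fieldsLoads(List<String> stream, int numTasks) {
        long[] loads = new long[numTasks];
        for (String key : stream) {
            int idx = Math.floorMod(Arrays.deepHashCode(new Object[]{key}), numTasks);
            loads[idx]++;
        }
        return loads;
    }

    // Partial-key style (simplified 2.0.0 defaults): two seeded candidates, send to the less-loaded one
    static long[] partialKeyLoads(List<String> stream, int numTasks) {
        long[] loads = new long[numTasks];
        for (String key : stream) {
            byte[] raw = key.getBytes(StandardCharsets.UTF_8);
            Random random = new Random(Arrays.hashCode(raw));
            int c1 = random.nextInt(numTasks);
            int c2 = random.nextInt(numTasks);
            c2 = c1 == c2 ? (c2 + 1) % numTasks : c2;
            int selected = loads[c2] < loads[c1] ? c2 : c1; // a tie keeps c1, like BalancedTargetSelector
            loads[selected]++;
        }
        return loads;
    }

    static long max(long[] loads) {
        return Arrays.stream(loads).max().orElse(0L);
    }
}
```

For a stream made only of the key "hot" (600 tuples, 4 tasks), fieldsGrouping puts all 600 tuples on one task while the partial-key strategy splits them 300/300 across two. With a mix of keys the exact loads depend on the hash values, but a single key never uses more than two tasks, which is also why the downstream merge is still required.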


Reposted from: http://rjlia.baihongyu.com/
