/** * A variation on FieldGrouping. This grouping operates on a partitioning of the incoming tuples (like a FieldGrouping), but it can send * Tuples from a given partition to multiple downstream tasks. * * Given a total pool of target tasks, this grouping will always send Tuples with a given key to one member of a subset of those tasks. Each * key is assigned a subset of tasks. Each tuple is then sent to one task from that subset. * * Notes: - the default TaskSelector ensures each task gets as close to a balanced number of Tuples as possible - the default * AssignmentCreator hashes the key and produces an assignment of two tasks */public class PartialKeyGrouping implements CustomStreamGrouping, Serializable { private static final long serialVersionUID = -1672360572274911808L; private List targetTasks; private Fields fields = null; private Fields outFields = null; private AssignmentCreator assignmentCreator; private TargetSelector targetSelector; public PartialKeyGrouping() { this(null); } public PartialKeyGrouping(Fields fields) { this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector()); } public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) { this(fields, assignmentCreator, new BalancedTargetSelector()); } public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) { this.fields = fields; this.assignmentCreator = assignmentCreator; this.targetSelector = targetSelector; } @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks) { this.targetTasks = targetTasks; if (this.fields != null) { this.outFields = context.getComponentOutputFields(stream); } } @Override public List chooseTasks(int taskId, List values) { List boltIds = new ArrayList<>(1); if (values.size() > 0) { final byte[] rawKeyBytes = getKeyBytes(values); final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes); final int 
selectedTask = targetSelector.chooseTask(taskAssignmentForKey); boltIds.add(selectedTask); } return boltIds; } /** * Extract the key from the input Tuple. */ private byte[] getKeyBytes(List values) { byte[] raw; if (fields != null) { List selectedFields = outFields.select(fields, values); ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4); for (Object o : selectedFields) { if (o instanceof List) { out.putInt(Arrays.deepHashCode(((List) o).toArray())); } else if (o instanceof Object[]) { out.putInt(Arrays.deepHashCode((Object[]) o)); } else if (o instanceof byte[]) { out.putInt(Arrays.hashCode((byte[]) o)); } else if (o instanceof short[]) { out.putInt(Arrays.hashCode((short[]) o)); } else if (o instanceof int[]) { out.putInt(Arrays.hashCode((int[]) o)); } else if (o instanceof long[]) { out.putInt(Arrays.hashCode((long[]) o)); } else if (o instanceof char[]) { out.putInt(Arrays.hashCode((char[]) o)); } else if (o instanceof float[]) { out.putInt(Arrays.hashCode((float[]) o)); } else if (o instanceof double[]) { out.putInt(Arrays.hashCode((double[]) o)); } else if (o instanceof boolean[]) { out.putInt(Arrays.hashCode((boolean[]) o)); } else if (o != null) { out.putInt(o.hashCode()); } else { out.putInt(0); } } raw = out.array(); } else { raw = values.get(0).toString().getBytes(); // assume key is the first field } return raw; } //......}
/** * This interface is responsible for choosing a subset of the target tasks to use for a given key. * * NOTE: whatever scheme you use to create the assignment should be deterministic. This may be executed on multiple Storm Workers, thus * each of them needs to come up with the same assignment for a given key. */ public interface AssignmentCreator extends Serializable { int[] createAssignment(List targetTasks, byte[] key); } /*========== Implementations ==========*/ /** * This implementation of AssignmentCreator chooses two arbitrary tasks. */ public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator { /** * Creates a two task assignment by selecting random tasks. */ public int[] createAssignment(List tasks, byte[] key) { // It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key final long seedForRandom = Arrays.hashCode(key); final Random random = new Random(seedForRandom); final int choice1 = random.nextInt(tasks.size()); int choice2 = random.nextInt(tasks.size()); // ensure that choice1 and choice2 are not the same task choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2; return new int[]{ tasks.get(choice1), tasks.get(choice2) }; } }
/**
 * This interface chooses one element from a task assignment to send a specific Tuple to.
 */
public interface TargetSelector extends Serializable {
    Integer chooseTask(int[] assignedTasks);
}

/**
 * A basic implementation of target selection. This strategy chooses the task within the assignment that has received the fewest Tuples
 * overall from this instance of the grouping.
 */
public static class BalancedTargetSelector implements TargetSelector {
    // Number of times this selector instance has chosen each task id.
    private final Map<Integer, Long> targetTaskStats = Maps.newHashMap();

    /**
     * Selects the assigned task that has been chosen the fewest times so far, and counts the choice.
     *
     * <p>Ties go to the task that appears first in {@code assignedTasks}.
     *
     * @param assignedTasks candidate task ids for the current key
     * @return the selected task id
     * @throws IllegalArgumentException if {@code assignedTasks} is empty
     */
    @Override
    public Integer chooseTask(int[] assignedTasks) {
        if (assignedTasks.length == 0) {
            throw new IllegalArgumentException("Cannot choose a task from an empty assignment");
        }
        int taskIdWithMinLoad = assignedTasks[0];
        long minTaskLoad = Long.MAX_VALUE;
        for (int currentTaskId : assignedTasks) {
            final long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
            // Strict comparison keeps the earliest task on ties.
            if (currentTaskLoad < minTaskLoad) {
                minTaskLoad = currentTaskLoad;
                taskIdWithMinLoad = currentTaskId;
            }
        }
        targetTaskStats.merge(taskIdWithMinLoad, 1L, Long::sum);
        return taskIdWithMinLoad;
    }
}