diff options
Diffstat (limited to 'staging/linux-x86/sample/lambda/BulkDataOperations/src/CSVProcessor.java')
-rw-r--r-- | staging/linux-x86/sample/lambda/BulkDataOperations/src/CSVProcessor.java | 368 |
1 files changed, 368 insertions, 0 deletions
diff --git a/staging/linux-x86/sample/lambda/BulkDataOperations/src/CSVProcessor.java b/staging/linux-x86/sample/lambda/BulkDataOperations/src/CSVProcessor.java new file mode 100644 index 0000000..ded9030 --- /dev/null +++ b/staging/linux-x86/sample/lambda/BulkDataOperations/src/CSVProcessor.java @@ -0,0 +1,368 @@ +/* + * Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * - Neither the name of Oracle nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * This source code is provided to illustrate the usage of a given feature + * or technique and has been deliberately simplified. Additional steps + * required for a production-quality application, such as security checks, + * input validation, and proper error handling, might not be present in + * this sample code. + */ + +import java.io.BufferedReader; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.*; +import java.util.function.*; +import java.util.regex.Pattern; +import java.util.stream.Collector; +import java.util.stream.Collectors; + +import static java.lang.Double.parseDouble; +import static java.util.stream.Collectors.*; + +/** + * CSVProcessor is a tool for processing CSV files. There are several + * command-line options. Consult the {@link #printUsageAndExit} method for + * instructions and command line parameters. This sample shows examples of the + * following features: + * <ul> + * <li>Lambda and bulk operations. Working with streams: map(...), filter(...), + * sorted(...) methods. The collect(...) method with different collectors: + * Collectors.maxBy(...), Collectors.minBy(...), Collectors.toList(), + * Collectors.toCollection(...), Collectors.groupingBy(...), + * Collectors.toDoubleSummaryStatistics(...), and a custom Collector.</li> + * <li>Static method reference for printing values.</li> + * <li>Try-with-resources feature for closing files.</li> + * <li>Switch by String feature.</li> + * <li>Other new APIs: Pattern.asPredicate(), BinaryOperator + * BufferedReader.lines(), Collection.forEach(...), Comparator.comparing(...), + * Comparator.reversed(), Arrays.stream(...).</li> + * </ul> + * + */ +public class CSVProcessor { + + //Number of characters that may be read + private static final int READ_AHEAD_LIMIT = 100_000_000; + + /** + * The main method for the CSVProcessor program. Run the program with an + * empty argument list to see possible arguments. + * + * @param args the argument list for CSVProcessor. + */ + public static void main(String[] args) { + if (args.length < 2) { + printUsageAndExit(); + } + try (BufferedReader br = new BufferedReader( + Files.newBufferedReader(Paths.get(args[args.length - 1])))) { + //Assume that the first line contains column names. + List<String> header = Arrays.stream(br.readLine().split(",")) + .map(String::trim).collect(toList()); + //Calculate an index of the column in question. + int column = getColumnNumber(header, args[1]); + switch (args[0]) { + case "sort": + verifyArgumentNumber(args, 4); + //Define the sort order. + boolean isAsc; + switch (args[2].toUpperCase()) { + case "ASC": + isAsc = true; + break; + case "DESC": + isAsc = false; + break; + default: + printUsageAndExit("Illegal argument" + args[2]); + return;//Should not be reached. + } + /* + * Create a comparator that compares lines by comparing + * values in the specified column. + */ + Comparator<String> cmp + = Comparator.comparing(str -> getCell(str, column), + String.CASE_INSENSITIVE_ORDER); + /* + * sorted(...) is used to sort records. + * forEach(...) is used to output sorted records. + */ + br.lines().sorted(isAsc ? cmp : cmp.reversed()) + .forEach(System.out::println); + break; + case "search": + verifyArgumentNumber(args, 4); + /* + * Records are filtered by a regex. + * forEach(...) is used to output filtered records. + */ + Predicate<String> pattern + = Pattern.compile(args[2]).asPredicate(); + br.lines().filter(str -> pattern.test(getCell(str, column))) + .forEach(System.out::println); + break; + case "groupby": + verifyArgumentNumber(args, 3); + /* + * Group lines by values in the column with collect(...), and + * print with forEach(...) for every distinct value within + * the column. + */ + br.lines().collect( + Collectors.groupingBy(str -> getCell(str, column), + toCollection(TreeSet::new))) + .forEach((str, set) -> { + System.out.println(str + ":"); + set.forEach(System.out::println); + }); + break; + case "stat": + verifyArgumentNumber(args, 3); + + /* + * BufferedReader will be read several times. + * Mark this point to return here after each pass. + * BufferedReader will be read right after the headers line + * because it is already read. + */ + br.mark(READ_AHEAD_LIMIT); + + /* + * Statistics can be collected by a custom collector in one + * pass. One pass is preferable. + */ + System.out.println( + br.lines().collect(new Statistics(column))); + + /* + * Alternatively, statistics can be collected + * by a built-in API in several passes. + * This method demonstrates how separate operations can be + * implemented using a built-in API. + */ + br.reset(); + statInSeveralPasses(br, column); + break; + default: + printUsageAndExit("Illegal argument" + args[0]); + } + } catch (IOException e) { + printUsageAndExit(e.toString()); + } + } + + private static void statInSeveralPasses(BufferedReader br, int column) + throws IOException { + System.out.println("#-----Statistics in several passes-------#"); + //Create a comparator to compare records by the column. + Comparator<String> comparator + = Comparator.comparing( + (String str) -> parseDouble(getCell(str, column))); + //Find max record by using Collectors.maxBy(...) + System.out.println( + "Max: " + br.lines().collect(maxBy(comparator)).get()); + br.reset(); + //Find min record by using Collectors.minBy(...) + System.out.println( + "Min: " + br.lines().collect(minBy(comparator)).get()); + br.reset(); + //Compute the average value and sum with + //Collectors.toDoubleSummaryStatistics(...) + DoubleSummaryStatistics doubleSummaryStatistics + = br.lines().collect(summarizingDouble( + str -> parseDouble(getCell(str, column)))); + System.out.println("Average: " + doubleSummaryStatistics.getAverage()); + System.out.println("Sum: " + doubleSummaryStatistics.getSum()); + } + + private static void verifyArgumentNumber(String[] args, int n) { + if (args.length != n) { + printUsageAndExit("Expected " + n + " arguments but was " + + args.length); + } + } + + private static int getColumnNumber(List<String> header, String name) { + int column = header.indexOf(name); + if (column == -1) { + printUsageAndExit("There is no column with name " + name); + } + return column; + } + + private static String getCell(String record, int column) { + return record.split(",")[column].trim(); + } + + private static void printUsageAndExit(String... str) { + System.out.println("Usages:"); + + System.out.println("CSVProcessor sort COLUMN_NAME ASC|DESC FILE"); + System.out.println("Sort lines by column COLUMN_NAME in CSV FILE\n"); + + System.out.println("CSVProcessor search COLUMN_NAME REGEX FILE"); + System.out.println("Search for REGEX in column COLUMN_NAME in CSV FILE\n"); + + System.out.println("CSVProcessor groupby COLUMN_NAME FILE"); + System.out.println("Split lines into different groups according to column " + + "COLUMN_NAME value\n"); + + System.out.println("CSVProcessor stat COLUMN_NAME FILE"); + System.out.println("Compute max/min/average/sum statistics by column " + + "COLUMN_NAME\n"); + + Arrays.asList(str).forEach(System.err::println); + System.exit(1); + } + + /* + * This is a custom implementation of the Collector interface. + * Statistics are objects gather max,min,sum,average statistics. + */ + private static class Statistics + implements Collector<String, Statistics, Statistics> { + + + /* + * This implementation does not need to be thread safe because + * the parallel implementation of + * {@link java.util.stream.Stream#collect Stream.collect()} + * provides the necessary partitioning and isolation for safe parallel + * execution. + */ + private String maxRecord; + private String minRecord; + + private double sum; + private int lineCount; + private final BinaryOperator<String> maxOperator; + private final BinaryOperator<String> minOperator; + private final int column; + + public Statistics(int column) { + this.column = column; + Comparator<String> cmp = Comparator.comparing( + (String str) -> parseDouble(getCell(str, column))); + maxOperator = BinaryOperator.maxBy(cmp); + minOperator = BinaryOperator.minBy(cmp); + } + + /* + * Process line. + */ + public Statistics accept(String line) { + maxRecord = maxRecord == null + ? line : maxOperator.apply(maxRecord, line); + minRecord = minRecord == null + ? line : minOperator.apply(minRecord, line); + + sum += parseDouble(getCell(line, column)); + lineCount++; + return this; + } + + + /* + * Merge two Statistics. + */ + public Statistics combine(Statistics stat) { + maxRecord = maxOperator.apply(maxRecord, stat.getMaxRecord()); + minRecord = minOperator.apply(minRecord, stat.getMinRecord()); + sum += stat.getSum(); + lineCount += stat.getLineCount(); + return this; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("#------Statistics------#\n"); + sb.append("Max: ").append(getMaxRecord()).append("\n"); + sb.append("Min: ").append(getMinRecord()).append("\n"); + sb.append("Sum = ").append(getSum()).append("\n"); + sb.append("Average = ").append(average()).append("\n"); + sb.append("#------Statistics------#\n"); + return sb.toString(); + } + + @Override + public Supplier<Statistics> supplier() { + return () -> new Statistics(column); + } + + @Override + public BiConsumer<Statistics, String> accumulator() { + return Statistics::accept; + } + + @Override + public BinaryOperator<Statistics> combiner() { + return Statistics::combine; + + } + + @Override + public Function<Statistics, Statistics> finisher() { + return stat -> stat; + } + + @Override + public Set<Characteristics> characteristics() { + return EnumSet.of(Characteristics.IDENTITY_FINISH); + } + + private String getMaxRecord() { + return maxRecord; + } + + private String getMinRecord() { + return minRecord; + } + + private double getSum() { + return sum; + } + + private double average() { + return sum / lineCount; + } + + private int getLineCount() { + return lineCount; + } + + } + +} |