Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,6 @@
*/
package org.apache.hbase.reporter;

import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.datasketches.quantiles.DoublesUnion;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
Expand All @@ -58,14 +37,44 @@
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.datasketches.quantiles.DoublesUnion;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;

/**
* Run a scan against a table reporting on row size, column size and count.
*
* So can run against cdh5, uses loads of deprecated API and copies some Cell sizing methods local.
*/
public class TableReporter {
public final class TableReporter {
private static String GNUPLOT_DATA_SUFFIX = ".gnuplotdata";

private TableReporter(){
}

/**
* Quantile sketches. Has a print that dumps out sketches on stdout.
* To accumlate Sketches instances, see {@link AccumlatingSketch}
Expand All @@ -91,7 +100,8 @@ public double getAsDouble() {
/**
* Bins that sort of make sense for the data we're seeing here. After some trial and error.
*/
static double [] BINS = new double [] {1, 5, 10, 15, 20, 25, 100, 1024, 5120, 10240, 20480, 51200, 102400, 1048576};
static double [] BINS = new double [] {1, 5, 10, 15, 20, 25, 100, 1024, 5120,
10240, 20480, 51200, 102400, 1048576};

/**
* Size of row.
Expand Down Expand Up @@ -123,14 +133,15 @@ void print() {
}

private static void print(String label, final DoublesSketch sketch) {
System.out.println(label + " quantiles " + Arrays.toString(sketch.getQuantiles(NORMALIZED_RANKS)));
System.out.println(label + " quantiles " +
Arrays.toString(sketch.getQuantiles(NORMALIZED_RANKS)));
double [] pmfs = sketch.getPMF(BINS);
// System.out.println(label + " pmfs " + Arrays.toString(pmfs));
System.out.println(label + " histo " +
(pmfs == null || pmfs.length == 0?
"null": Arrays.toString(Arrays.stream(pmfs).map(d -> d * sketch.getN()).toArray())));
System.out.println(label + "stats N=" + sketch.getN() + ", min=" + sketch.getMinValue() + ", max=" +
sketch.getMaxValue());
System.out.println(label + "stats N=" + sketch.getN() + ", min=" + sketch.getMinValue() +
", max=" + sketch.getMaxValue());
}
}

Expand Down Expand Up @@ -171,7 +182,8 @@ static void processRowResult(Result result, Sketches sketches) {
* @return First <code>fraction</code> of Table's regions.
*/
private static List<RegionInfo> getRegions(Connection connection, TableName tableName,
double fraction, String encodedRegionName) throws IOException {
double fraction, String encodedRegionName)
throws IOException {
try (Admin admin = connection.getAdmin()) {
// Use deprecated API because running against old hbase.
List<RegionInfo> regions = admin.getRegions(tableName);
Expand Down Expand Up @@ -244,7 +256,7 @@ long getDuration() {

private static void sketch(Configuration configuration, String tableNameAsStr, int limit,
double fraction, int threads, String isoNow, String encodedRegionName)
throws IOException, InterruptedException, ExecutionException {
throws IOException, InterruptedException, ExecutionException {
TableName tableName = TableName.valueOf(tableNameAsStr);
AccumlatingSketch totalSketches = new AccumlatingSketch();
long startTime = System.currentTimeMillis();
Expand Down Expand Up @@ -272,12 +284,13 @@ public Thread newThread(Runnable r) {
}
});
try {
List<SketchRegion> srs = regions.stream().map(ri -> new SketchRegion(connection, tableName, ri, limit)).
collect(Collectors.toList());
List<SketchRegion> srs = regions.stream().
map(ri -> new SketchRegion(connection, tableName, ri, limit)).
collect(Collectors.toList());
List<Future<SketchRegion>> futures = new ArrayList<>(srs.size());
for (SketchRegion sr: srs) {
// Do submit rather than inokeall; invokeall blocks until all done. This way I get control back
// after all submitted.
// Do submit rather than inokeall; invokeall blocks until all done.
// This way I get control back after all submitted.
futures.add(es.submit(sr));
}
// Avoid java.util.ConcurrentModificationException
Expand Down Expand Up @@ -305,8 +318,9 @@ public Thread newThread(Runnable r) {
}
Sketches sketches = totalSketches.get();
String isoDuration = Duration.ofMillis(System.currentTimeMillis() - startTime).toString();
sketches.print(Instant.now().toString() + " Totals for " + tableNameAsStr + " regions=" + count +
", limit=" + limit + ", fraction=" + fraction + ", took=" + isoDuration);
sketches.print(Instant.now().toString() + " Totals for " + tableNameAsStr +
" regions=" + count + ", limit=" + limit + ", fraction=" + fraction +
", took=" + isoDuration);
// Dump out the gnuplot files. Saves time generating graphs.
dumpGnuplotDataFiles(isoNow, sketches, tableNameAsStr, count, isoDuration);
}
Expand Down Expand Up @@ -367,13 +381,14 @@ private static String getFileNamePrefix(String isoNow, String tableName, String
return "reporter." + isoNow + "." + tableName + "." + sketchName;
}

private static String getFileFirstLine(String tableName, int regions, String isoDuration, UpdateDoublesSketch sketch) {
return "# " + tableName + " regions=" + regions + ", duration=" + isoDuration + ", N=" + sketch.getN() +
", min=" + sketch.getMinValue() + ", max=" + sketch.getMaxValue();
private static String getFileFirstLine(String tableName, int regions,
String isoDuration, UpdateDoublesSketch sketch) {
return "# " + tableName + " regions=" + regions + ", duration=" + isoDuration + ", N=" +
sketch.getN() + ", min=" + sketch.getMinValue() + ", max=" + sketch.getMaxValue();
}

private static void dumpPercentilesFile(String prefix, String firstLine, UpdateDoublesSketch sketch)
throws IOException {
private static void dumpPercentilesFile(String prefix, String firstLine,
UpdateDoublesSketch sketch) throws IOException {
dumpFile(File.createTempFile(prefix + ".percentiles.", GNUPLOT_DATA_SUFFIX),
firstLine, sketch.getQuantiles(Sketches.NORMALIZED_RANKS));
}
Expand All @@ -398,17 +413,19 @@ private static void dumpFile(File file, String firstLine, double [] ds) throws I
System.out.println(Instant.now().toString() + " wrote " + file.toString());
}

private static void dumpFiles(String prefix, String firstLine, UpdateDoublesSketch sketch) throws IOException {
private static void dumpFiles(String prefix, String firstLine, UpdateDoublesSketch sketch)
throws IOException {
dumpPercentilesFile(prefix, firstLine, sketch);
dumpHistogramFile(prefix, firstLine, sketch);
}

/**
* Write four files, a histogram and percentiles, one each for each of the row size and column count sketches.
* Write four files, a histogram and percentiles,
* one each for each of the row size and column count sketches.
* Tie the four files with isoNow time.
*/
private static void dumpGnuplotDataFiles(String isoNow, Sketches sketches, String tableName, int regions,
String isoDuration) throws IOException {
private static void dumpGnuplotDataFiles(String isoNow, Sketches sketches, String tableName,
int regions, String isoDuration) throws IOException {
UpdateDoublesSketch sketch = sketches.columnCountSketch;
dumpFiles(getFileNamePrefix(isoNow, tableName, "columnCount"),
getFileFirstLine(tableName, regions, isoDuration, sketch), sketch);
Expand All @@ -434,15 +451,17 @@ static void usage(Options options, String error) {
System.out.println("OPTIONS:");
System.out.println(" -h,--help Output this help message");
System.out.println(" -l,--limit Scan row limit (per thread): default none");
System.out.println(" -f,--fraction Fraction of table Regions to read; between 0 and 1: default 1.0 (all)");
System.out.println(" -r,--region Scan this Region only; pass encoded name; 'fraction' is ignored.");
System.out.println(" -f,--fraction " +
"Fraction of table Regions to read; between 0 and 1: default 1.0 (all)");
System.out.println(" -r,--region " +
"Scan this Region only; pass encoded name; 'fraction' is ignored.");
System.out.println(" -t,--threads Concurrent thread count (thread per Region); default 1");
System.out.println(" -Dproperty=value Properties such as the zookeeper to connect to; e.g:");
System.out.println(" -Dhbase.zookeeper.quorum=ZK0.remote.cluster.example.org");
}

public static void main(String [] args)
throws ParseException, IOException, ExecutionException, InterruptedException {
throws ParseException, IOException, ExecutionException, InterruptedException {
Options options = new Options();
Option help = Option.builder("h").longOpt("help").
desc("output this help message").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
*/
package org.apache.hbase.reporter;

import static org.apache.hadoop.hbase.shaded.junit.framework.TestCase.assertEquals;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.hbase.shaded.junit.framework.TestCase.assertEquals;

public class TestTableReporter {
private static final byte [] CF = Bytes.toBytes("cf");
private static final byte [] Q = Bytes.toBytes("q");
Expand Down Expand Up @@ -56,7 +56,8 @@ public void testSimpleSketching() {
final int columns = 3;
final int versions = 2;
for (int i = 0; i < rows; i++) {
TableReporter.processRowResult(Result.create(makeCells(Bytes.toBytes(i), columns, versions)), sketches);
TableReporter.processRowResult(Result.create(makeCells(Bytes.toBytes(i),
columns, versions)), sketches);
}
sketches.print();
// Just check the column counts. Should be 2.
Expand All @@ -72,12 +73,14 @@ public void testAddSketches() {
final int columns = 3;
final int versions = 2;
for (int i = 0; i < rows; i++) {
TableReporter.processRowResult(Result.create(makeCells(Bytes.toBytes(i), columns, versions)), sketches);
TableReporter.processRowResult(Result.create(makeCells(Bytes.toBytes(i),
columns, versions)), sketches);
}
sketches.print();
TableReporter.Sketches sketches2 = new TableReporter.Sketches();
for (int i = 0; i < rows; i++) {
TableReporter.processRowResult(Result.create(makeCells(Bytes.toBytes(i), columns, versions)), sketches2);
TableReporter.processRowResult(Result.create(makeCells(Bytes.toBytes(i),
columns, versions)), sketches2);
}
sketches2.print();
TableReporter.AccumlatingSketch accumlator = new TableReporter.AccumlatingSketch();
Expand Down