The following lists example code for org.openjdk.jmh.annotations.Warmup and org.openjdk.jmh.annotations.Threads; follow the GitHub links to view the full source of each benchmark class.
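Before the project-specific snippets, here is a minimal, self-contained sketch showing how @Warmup, @Measurement and @Threads are typically combined and how such a benchmark is launched through the JMH Runner. The class, field and include-pattern names are illustrative and do not come from the snippets below.

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@State(Scope.Thread)
public class WarmupThreadsExample {

  private long counter;

  @Benchmark
  @Threads(4)                                                            // run the method on 4 concurrent threads
  @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)         // 5 x 1s warmup iterations, not measured
  @Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)   // 10 x 1s measured iterations
  public void increment(Blackhole bh) {
    bh.consume(counter++);  // Blackhole prevents dead-code elimination of the result
  }

  public static void main(String[] args) throws RunnerException {
    Options opt = new OptionsBuilder()
        .include(WarmupThreadsExample.class.getSimpleName())
        .forks(1)
        .build();
    new Runner(opt).run();
  }
}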
@Threads(4)
@Benchmark
public void allocateBlockBenchMark(BenchMarkOzoneManager state,
    Blackhole bh) throws IOException {
  int index = (int) (Math.random() * keyNames.size());
  String key = keyNames.get(index);
  OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
      .setVolumeName(volumeName)
      .setBucketName(bucketName)
      .setKeyName(key)
      .setDataSize(50)
      .setFactor(HddsProtos.ReplicationFactor.THREE)
      .setType(HddsProtos.ReplicationType.RATIS)
      .build();
  state.om.allocateBlock(omKeyArgs, clientIDs.get(index), new ExcludeList());
}
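A detail worth noting in the snippet above: Math.random() delegates to a single shared java.util.Random instance, so under @Threads(4) the random index computation itself can contend across threads. A common alternative in multi-threaded JMH code is java.util.concurrent.ThreadLocalRandom; a sketch, assuming the same keyNames list as above:

int index = ThreadLocalRandom.current().nextInt(keyNames.size());  // per-thread PRNG, no shared seed to contend on
String key = keyNames.get(index);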
@Threads(4)
@Benchmark
public void createAndCommitKeyBenchMark(BenchMarkOzoneManager state,
    Blackhole bh) throws IOException {
  String key = UUID.randomUUID().toString();
  OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
      .setVolumeName(volumeName)
      .setBucketName(bucketName)
      .setKeyName(key)
      .setDataSize(50)
      .setFactor(HddsProtos.ReplicationFactor.THREE)
      .setType(HddsProtos.ReplicationType.RATIS)
      .build();
  OpenKeySession openKeySession = state.om.openKey(omKeyArgs);
  state.om.allocateBlock(omKeyArgs, openKeySession.getId(),
      new ExcludeList());
}
@Benchmark
@Threads(1)
public void readUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException {
  try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
      .project(SCHEMA)
      .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
      .build()) {
    Iterable<InternalRow> unsafeRows = Iterables.transform(
        rows,
        APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, SCHEMA))::invoke);
    for (InternalRow row : unsafeRows) {
      blackhole.consume(row);
    }
  }
}
@Benchmark
@Threads(1)
public void readUsingSparkReader(Blackhole blackhole) throws IOException {
  StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
  try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
      .project(SCHEMA)
      .readSupport(new ParquetReadSupport())
      .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
      .set("spark.sql.parquet.binaryAsString", "false")
      .set("spark.sql.parquet.int96AsTimestamp", "false")
      .callInit()
      .build()) {
    for (InternalRow row : rows) {
      blackhole.consume(row);
    }
  }
}
@Benchmark
@Threads(1)
public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException {
  try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
      .project(PROJECTED_SCHEMA)
      .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
      .build()) {
    Iterable<InternalRow> unsafeRows = Iterables.transform(
        rows,
        APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, PROJECTED_SCHEMA))::invoke);
    for (InternalRow row : unsafeRows) {
      blackhole.consume(row);
    }
  }
}
@Benchmark
@Threads(1)
public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOException {
  StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA);
  try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
      .project(PROJECTED_SCHEMA)
      .readSupport(new ParquetReadSupport())
      .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
      .set("spark.sql.parquet.binaryAsString", "false")
      .set("spark.sql.parquet.int96AsTimestamp", "false")
      .callInit()
      .build()) {
    for (InternalRow row : rows) {
      blackhole.consume(row);
    }
  }
}
@Benchmark
@Threads(1)
public void writeUsingSparkWriter() throws IOException {
  StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
  try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
      .writeSupport(new ParquetWriteSupport())
      .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
      .set("spark.sql.parquet.writeLegacyFormat", "false")
      .set("spark.sql.parquet.binaryAsString", "false")
      .set("spark.sql.parquet.int96AsTimestamp", "false")
      .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
      .schema(SCHEMA)
      .build()) {
    writer.addAll(rows);
  }
}
@Threads(4)
@Benchmark
public void allocateBlockBenchMark(BenchMarkSCM state,
    Blackhole bh) throws IOException {
  state.blockManager
      .allocateBlock(50, ReplicationType.RATIS, ReplicationFactor.THREE,
          "Genesis", new ExcludeList());
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MINUTES)
@Warmup(iterations = 10, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 50, time = 3, timeUnit = TimeUnit.SECONDS)
@Threads(1)
public void testBasicStringSplit(Blackhole blackhole, MyState state) throws IOException {
  BufferedReader bf = new BufferedReader(new StringReader(state.eventListStr));
  StringBuilder sb = new StringBuilder();
  String line;
  List<String> msseList = new ArrayList<>();
  int dollarCnt = 0;
  while ((line = bf.readLine()) != null) {
    for (int i = 0; i < line.length(); i++) {
      if (dollarCnt == 3) {
        msseList.add(sb.toString());
        dollarCnt = 0;
        sb = new StringBuilder();
      }
      if (line.charAt(i) != '$') {
        sb.append(line.charAt(i));
      } else {
        dollarCnt++;
      }
    }
  }
  blackhole.consume(msseList);
  //blackhole.consume(state.eventListStr.split("$$"));
  //state.sum = state.a + state.b;
}
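A side note on the commented-out alternative above: String.split takes a regular expression, and $ is an end-of-line anchor, so split("$$") would not split on literal dollar signs at all. Splitting on a literal dollar-sign delimiter needs escaping (or Pattern.quote), roughly:

String[] parts = state.eventListStr.split("\\$\\$\\$");  // literal "$$$" delimiter; equivalently split(Pattern.quote("$$$"))
blackhole.consume(parts);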
@Benchmark
@Threads(1)
public void readWithFilterFileSourceNonVectorized() {
  Map<String, String> conf = Maps.newHashMap();
  conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
  conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
  withSQLConf(conf, () -> {
    Dataset<Row> df = spark().read().parquet(dataLocation()).filter(FILTER_COND);
    materialize(df);
  });
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
@Warmup(iterations = 10, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 50, time = 3, timeUnit = TimeUnit.SECONDS)
@Threads(1)
public void testSnappyCompress(Blackhole blackhole, MyState state) throws IOException {
  blackhole.consume(CompressionUtils.compressAndBase64Encode(state.eventList, true));
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
@Warmup(iterations = 10, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 50, time = 3, timeUnit = TimeUnit.SECONDS)
@Threads(1)
public void testSnappyDeCompress(Blackhole blackhole, MyState state) throws IOException {
  blackhole.consume(CompressionUtils.decompressAndBase64Decode(state.snappyCompressed, true, true));
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
@Warmup(iterations = 10, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 50, time = 3, timeUnit = TimeUnit.SECONDS)
@Threads(1)
public void testGzipCompress(Blackhole blackhole, MyState state) throws IOException {
  blackhole.consume(CompressionUtils.compressAndBase64Encode(state.eventList));
}
@Benchmark
@Threads(1)
public void readFileSourceNonVectorized() {
  Map<String, String> conf = Maps.newHashMap();
  conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
  conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
  withSQLConf(conf, () -> {
    Dataset<Row> df = spark().read().parquet(dataLocation());
    materialize(df);
  });
}
@Benchmark
@Threads(1)
public void readWithProjectionFileSourceNonVectorized() {
  Map<String, String> conf = Maps.newHashMap();
  conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
  conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
  withSQLConf(conf, () -> {
    Dataset<Row> df = spark().read().parquet(dataLocation()).select("longCol");
    materialize(df);
  });
}
@Benchmark
@Threads(50)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void lite_pool_50_thread() {
  TestObject object = pool.acquire();
  if (object != null) pool.release(object);
}
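The @Threads values in these snippets are only defaults; JMH lets you override them at run time, which is convenient when sweeping thread counts without recompiling. A minimal sketch (the include pattern is illustrative, not from the project above); the same override is available on the command line via the -t option:

Options opt = new OptionsBuilder()
    .include(".*lite_pool.*")   // illustrative benchmark selection pattern
    .threads(100)               // overrides @Threads on the matched benchmarks
    .build();
new Runner(opt).run();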
@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 1)
public void aggregate_1Threads() {
  aggregator.recordDouble(100.0056);
}
@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 10)
public void aggregate_10Threads() {
  aggregator.recordLong(100);
}
@Benchmark
@Threads(1)
public void readWithProjectionIceberg() {
  Map<String, String> tableProperties = Maps.newHashMap();
  tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
  withTableProperties(tableProperties, () -> {
    String tableLocation = table().location();
    Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).selectExpr("nested.col3");
    materialize(df);
  });
}
@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 1)
public void aggregate_1Threads() {
  aggregator.recordLong(100);
}
@Benchmark
@Threads(value = 1)
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void addAttributesEventsStatusEnd_01Thread() {
  doSpanWork(span);
}
@Benchmark
@Threads(value = 5)
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void addAttributesEventsStatusEnd_05Threads() {
  doSpanWork(span);
}
@Benchmark
@Threads(value = 10)
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void addAttributesEventsStatusEnd_10Threads() {
  doSpanWork(span);
}
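In the aggregator and span benchmarks above, the 1-, 5- and 10-thread variants all hit a single field (aggregator, span). Whether those threads share one instance or each get their own is decided by the scope of the enclosing @State class, not by @Threads itself. A minimal sketch of the two options, using java.util.concurrent.atomic.DoubleAdder as a stand-in for the real aggregator (class and field names are illustrative):

import java.util.concurrent.atomic.DoubleAdder;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)   // one instance shared by all benchmark threads: higher thread counts measure contention
class SharedAggregatorState {
  DoubleAdder aggregator = new DoubleAdder();
}

@State(Scope.Thread)      // one instance per thread: thread counts mostly measure the raw recording cost
class PerThreadAggregatorState {
  DoubleAdder aggregator = new DoubleAdder();
}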
@Benchmark
@Threads(1)
public void writeFileSource() {
  Map<String, String> conf = Maps.newHashMap();
  conf.put(SQLConf.PARQUET_COMPRESSION().key(), "gzip");
  withSQLConf(conf, () -> benchmarkData().write().mode(SaveMode.Append).parquet(dataLocation()));
}
@Benchmark
@Threads(1)
public void readDoublesSparkVectorized5k() {
  withSQLConf(sparkConfWithVectorizationEnabled(5000), () -> {
    Dataset<Row> df = spark().read().parquet(dataLocation()).select("doubleCol");
    materialize(df);
  });
}
@Benchmark
@Threads(1)
public void readWithProjectionFileSource() {
  Map<String, String> conf = Maps.newHashMap();
  conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
  withSQLConf(conf, () -> {
    Dataset<Row> df = spark().read().format("avro").load(dataLocation()).select("nested.col3");
    materialize(df);
  });
}
@Benchmark
@Threads(1)
public void readWithProjectionFileSourceVectorized() {
  Map<String, String> conf = Maps.newHashMap();
  conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
  conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
  conf.put(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), "true");
  withSQLConf(conf, () -> {
    Dataset<Row> df = spark().read().parquet(dataLocation()).selectExpr("nested.col3");
    materialize(df);
  });
}
@Benchmark
@Threads(1)
public void readFileSource() {
  Map<String, String> conf = Maps.newHashMap();
  conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
  withSQLConf(conf, () -> {
    Dataset<Row> df = spark().read().format("avro").load(dataLocation());
    materialize(df);
  });
}
@Benchmark
@Threads(1)
public void readFileSourceVectorized() {
  Map<String, String> conf = Maps.newHashMap();
  conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
  conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
  withSQLConf(conf, () -> {
    Dataset<Row> df = spark().read().parquet(dataLocation());
    materialize(df);
  });
}
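Many of the Spark snippets above wrap the measured work in withSQLConf(conf, action), a helper provided by the surrounding benchmark code. A minimal sketch of that pattern, assuming a SparkSession field named spark (an illustration of the idea, not the projects' actual implementation):

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;

public class SQLConfHelper {

  private final SparkSession spark;

  public SQLConfHelper(SparkSession spark) {
    this.spark = spark;
  }

  /** Applies the given SQL conf overrides, runs the action, then restores the previous values. */
  public void withSQLConf(Map<String, String> conf, Runnable action) {
    RuntimeConfig sqlConf = spark.conf();
    Map<String, String> previous = new HashMap<>();
    conf.forEach((key, value) -> {
      if (sqlConf.contains(key)) {
        previous.put(key, sqlConf.get(key));  // remember the old value so it can be restored
      }
      sqlConf.set(key, value);
    });
    try {
      action.run();
    } finally {
      conf.keySet().forEach(key -> {
        if (previous.containsKey(key)) {
          sqlConf.set(key, previous.get(key));
        } else {
          sqlConf.unset(key);
        }
      });
    }
  }
}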