下面列出了 org.openjdk.jmh.annotations.OperationsPerInvocation API 的实例代码及用法示例,也可以点击链接到 GitHub 查看完整源代码。
@Benchmark
@OperationsPerInvocation(POSITIONS)
public Object groupByHashPreCompute(BenchmarkData data)
{
    GroupByHash groupByHash = new MultiChannelGroupByHash(data.getTypes(), data.getChannels(), data.getHashChannel(), EXPECTED_SIZE, false, getJoinCompiler(), NOOP);
    // Feed every input page into the hash; process() drives the incremental work until done.
    data.getPages().forEach(p -> groupByHash.getGroupIds(p).process());

    // Drain all group values back into pages and return them so the JIT cannot elide the build.
    ImmutableList.Builder<Page> pages = ImmutableList.builder();
    PageBuilder pageBuilder = new PageBuilder(groupByHash.getTypes());
    for (int groupId = 0; groupId < groupByHash.getGroupCount(); groupId++) {
        pageBuilder.declarePosition();
        groupByHash.appendValuesTo(groupId, pageBuilder, 0);
        if (pageBuilder.isFull()) {
            pages.add(pageBuilder.build());
            pageBuilder.reset();
        }
    }
    pages.add(pageBuilder.build());
    // Fix: return the accumulated page list; previously returned pageBuilder.build(),
    // which is only the final (possibly partial) page and discarded `pages` entirely.
    return pages.build();
}
@Benchmark
@OperationsPerInvocation(POSITIONS)
public Object addPagePreCompute(BenchmarkData data)
{
    GroupByHash groupByHash = new MultiChannelGroupByHash(data.getTypes(), data.getChannels(), data.getHashChannel(), EXPECTED_SIZE, false, getJoinCompiler(), NOOP);
    // Insert every input page via addPage(); process() drives the incremental work until done.
    data.getPages().forEach(p -> groupByHash.addPage(p).process());

    // Drain all group values back into pages and return them so the JIT cannot elide the build.
    ImmutableList.Builder<Page> pages = ImmutableList.builder();
    PageBuilder pageBuilder = new PageBuilder(groupByHash.getTypes());
    for (int groupId = 0; groupId < groupByHash.getGroupCount(); groupId++) {
        pageBuilder.declarePosition();
        groupByHash.appendValuesTo(groupId, pageBuilder, 0);
        if (pageBuilder.isFull()) {
            pages.add(pageBuilder.build());
            pageBuilder.reset();
        }
    }
    pages.add(pageBuilder.build());
    // Fix: return the accumulated page list; previously returned pageBuilder.build(),
    // which is only the final (possibly partial) page and discarded `pages` entirely.
    return pages.build();
}
@Benchmark
@OperationsPerInvocation(POSITIONS)
public Object bigintGroupByHash(SingleChannelBenchmarkData data)
{
    GroupByHash groupByHash = new BigintGroupByHash(0, data.getHashEnabled(), EXPECTED_SIZE, NOOP);
    // Insert every input page into the BIGINT-specialized hash.
    data.getPages().forEach(p -> groupByHash.addPage(p).process());

    // Drain all group values back into pages and return them so the JIT cannot elide the build.
    ImmutableList.Builder<Page> pages = ImmutableList.builder();
    PageBuilder pageBuilder = new PageBuilder(groupByHash.getTypes());
    for (int groupId = 0; groupId < groupByHash.getGroupCount(); groupId++) {
        pageBuilder.declarePosition();
        groupByHash.appendValuesTo(groupId, pageBuilder, 0);
        if (pageBuilder.isFull()) {
            pages.add(pageBuilder.build());
            pageBuilder.reset();
        }
    }
    pages.add(pageBuilder.build());
    // Fix: return the accumulated page list; previously returned pageBuilder.build(),
    // which is only the final (possibly partial) page and discarded `pages` entirely.
    return pages.build();
}
/** Benchmark for measuring HttpTraceContext extract. */
@Benchmark
@BenchmarkMode({Mode.AverageTime})
@Fork(1)
@Measurement(iterations = 15, time = 1)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 1)
@OperationsPerInvocation(COUNT)
@Nullable
public Context measureExtract() {
    // Extract each prepared carrier; keep the last result and return it so the
    // extract() calls cannot be eliminated as dead code.
    Context lastExtracted = null;
    for (int index = 0; index < COUNT; index++) {
        lastExtracted = httpTraceContext.extract(Context.ROOT, carriers.get(index), getter);
    }
    return lastExtracted;
}
@Benchmark
@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
public void serializerKryoThrift(FlinkEnvironmentContext context) throws Exception {
    // Force Kryo serialization and plug in Thrift's TBaseSerializer for the POJO types,
    // then push RECORDS_PER_INVOCATION records through a rebalance (network shuffle).
    StreamExecutionEnvironment environment = context.env;
    environment.setParallelism(4);

    ExecutionConfig config = environment.getConfig();
    config.enableForceKryo();
    config.addDefaultKryoSerializer(org.apache.flink.benchmark.thrift.MyPojo.class, TBaseSerializer.class);
    config.addDefaultKryoSerializer(org.apache.flink.benchmark.thrift.MyOperation.class, TBaseSerializer.class);

    environment
            .addSource(new ThriftPojoSource(RECORDS_PER_INVOCATION, 10))
            .rebalance()
            .addSink(new DiscardingSink<>());
    environment.execute();
}
@Benchmark
@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
public void serializerKryoProtobuf(FlinkEnvironmentContext context) throws Exception {
    // Force Kryo serialization with Protobuf's ProtobufSerializer registered for the POJO
    // types, then push RECORDS_PER_INVOCATION records through a rebalance (network shuffle).
    StreamExecutionEnvironment environment = context.env;
    environment.setParallelism(4);

    ExecutionConfig config = environment.getConfig();
    config.enableForceKryo();
    config.registerTypeWithKryoSerializer(org.apache.flink.benchmark.protobuf.MyPojoOuterClass.MyPojo.class, ProtobufSerializer.class);
    config.registerTypeWithKryoSerializer(org.apache.flink.benchmark.protobuf.MyPojoOuterClass.MyOperation.class, ProtobufSerializer.class);

    environment
            .addSource(new ProtobufPojoSource(RECORDS_PER_INVOCATION, 10))
            .rebalance()
            .addSink(new DiscardingSink<>());
    environment.execute();
}
@Benchmark
@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
public void serializerKryo(FlinkEnvironmentContext context) throws Exception {
    // Baseline Kryo run: force Kryo with plain type registration (no custom serializers),
    // then push RECORDS_PER_INVOCATION records through a rebalance (network shuffle).
    StreamExecutionEnvironment environment = context.env;
    environment.setParallelism(4);

    ExecutionConfig config = environment.getConfig();
    config.enableForceKryo();
    config.registerKryoType(MyPojo.class);
    config.registerKryoType(MyOperation.class);

    environment
            .addSource(new PojoSource(RECORDS_PER_INVOCATION, 10))
            .rebalance()
            .addSink(new DiscardingSink<>());
    environment.execute();
}
@Benchmark
@OperationsPerInvocation(value = TwoInputBenchmark.RECORDS_PER_INVOCATION)
public void twoInputMapSink(FlinkEnvironmentContext context) throws Exception {
    // Two balanced sources feeding one two-input operator, records split evenly.
    StreamExecutionEnvironment environment = context.env;
    environment.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
    environment.setParallelism(1);

    // Setting buffer timeout to 1 is an attempt to improve twoInputMapSink benchmark stability.
    // Without 1ms buffer timeout, some JVM forks are much slower then others, making results
    // unstable and unreliable.
    environment.setBufferTimeout(1);

    long recordsPerInput = RECORDS_PER_INVOCATION / 2;
    DataStreamSource<Long> firstInput = environment.addSource(new LongSource(recordsPerInput));
    DataStreamSource<Long> secondInput = environment.addSource(new LongSource(recordsPerInput));
    firstInput
            .connect(secondInput)
            .transform("custom operator", TypeInformation.of(Long.class), new MultiplyByTwoCoStreamMap())
            .addSink(new DiscardingSink<>());
    environment.execute();
}
@Benchmark
@OperationsPerInvocation(value = TwoInputBenchmark.ONE_IDLE_RECORDS_PER_INVOCATION)
public void twoInputOneIdleMapSink(FlinkEnvironmentContext context) throws Exception {
    // Skewed variant: input 1 carries almost all records, input 2 emits a single record,
    // exercising the two-input operator with one mostly-idle side.
    StreamExecutionEnvironment environment = context.env;
    environment.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
    environment.setParallelism(1);

    // Clear shared queueing state left over from a previous invocation.
    QueuingLongSource.reset();

    DataStreamSource<Long> busyInput = environment.addSource(new QueuingLongSource(1, ONE_IDLE_RECORDS_PER_INVOCATION - 1));
    DataStreamSource<Long> idleInput = environment.addSource(new QueuingLongSource(2, 1));
    busyInput
            .connect(idleInput)
            .transform("custom operator", TypeInformation.of(Long.class), new MultiplyByTwoCoStreamMap())
            .addSink(new DiscardingSink<>());
    environment.execute();
}
@Benchmark
@OperationsPerInvocation(ITERATIONS)
public void nanoTime(Blackhole blackhole)
{
    // Sample the monotonic clock ITERATIONS times; the Blackhole defeats dead-code elimination.
    int remaining = ITERATIONS;
    while (remaining-- > 0) {
        blackhole.consume(System.nanoTime());
    }
}
@Benchmark
@OperationsPerInvocation(ITERATIONS)
public void cpuTime(Blackhole blackhole)
{
    // Sample per-thread CPU time ITERATIONS times; the Blackhole defeats dead-code elimination.
    int remaining = ITERATIONS;
    while (remaining-- > 0) {
        blackhole.consume(currentThreadCpuTime());
    }
}
@Benchmark
@OperationsPerInvocation(ITERATIONS)
public void userTime(Blackhole blackhole)
{
    // Sample per-thread user-mode time ITERATIONS times; the Blackhole defeats dead-code elimination.
    int remaining = ITERATIONS;
    while (remaining-- > 0) {
        blackhole.consume(currentThreadUserTime());
    }
}
@Benchmark
@OperationsPerInvocation(2)
public Object baselineLength2(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack 2 values via the generic (non-specialized) path.
    final int length = 2;
    data.input.setPosition(0);
    unpackGeneric(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(3)
public Object baselineLength3(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack 3 values via the generic (non-specialized) path.
    final int length = 3;
    data.input.setPosition(0);
    unpackGeneric(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(4)
public Object baselineLength4(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack 4 values via the generic (non-specialized) path.
    final int length = 4;
    data.input.setPosition(0);
    unpackGeneric(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(5)
public Object baselineLength5(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack 5 values via the generic (non-specialized) path.
    final int length = 5;
    data.input.setPosition(0);
    unpackGeneric(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(6)
public Object baselineLength6(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack 6 values via the generic (non-specialized) path.
    final int length = 6;
    data.input.setPosition(0);
    unpackGeneric(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(7)
public Object baselineLength7(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack 7 values via the generic (non-specialized) path.
    final int length = 7;
    data.input.setPosition(0);
    unpackGeneric(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(256)
public Object baselineLength256(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack a full 256-value batch via the generic path.
    final int length = 256;
    data.input.setPosition(0);
    unpackGeneric(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(2)
public Object optimizedLength2(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack 2 values via the optimized packer implementation.
    final int length = 2;
    data.input.setPosition(0);
    data.packer.unpack(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(3)
public Object optimizedLength3(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack 3 values via the optimized packer implementation.
    final int length = 3;
    data.input.setPosition(0);
    data.packer.unpack(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(4)
public Object optimizedLength4(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack 4 values via the optimized packer implementation.
    final int length = 4;
    data.input.setPosition(0);
    data.packer.unpack(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(5)
public Object optimizedLength5(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack 5 values via the optimized packer implementation.
    final int length = 5;
    data.input.setPosition(0);
    data.packer.unpack(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(6)
public Object optimizedLength6(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack 6 values via the optimized packer implementation.
    final int length = 6;
    data.input.setPosition(0);
    data.packer.unpack(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(7)
public Object optimizedLength7(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack 7 values via the optimized packer implementation.
    final int length = 7;
    data.input.setPosition(0);
    data.packer.unpack(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(256)
public Object optimizedLength256(BenchmarkData data)
        throws Exception
{
    // Rewind the input, then unpack a full 256-value batch via the optimized packer.
    final int length = 256;
    data.input.setPosition(0);
    data.packer.unpack(data.buffer, 0, length, data.bits, data.input);
    return data.buffer;
}
@Benchmark
@OperationsPerInvocation(POSITIONS)
public long baseline(BaselinePagesData data)
{
    // Hand-rolled open-addressing hash over a plain long[] as the baseline to
    // compare GroupByHash against. Returns the number of distinct groups found.
    int hashSize = arraySize(GROUP_COUNT, 0.9f);
    int mask = hashSize - 1;
    long[] table = new long[hashSize];
    Arrays.fill(table, -1); // -1 marks an empty slot
    long groupIds = 0;
    for (Page page : data.getPages()) {
        Block block = page.getBlock(0);
        int positionCount = block.getPositionCount();
        for (int position = 0; position < positionCount; position++) {
            long value = block.getLong(position, 0);
            int tablePosition = (int) (value & mask);
            while (table[tablePosition] != -1 && table[tablePosition] != value) {
                // Fix: wrap the probe with the mask; the previous bare tablePosition++
                // could walk past the end of the table and throw ArrayIndexOutOfBoundsException
                // when a collision cluster reached the last slot.
                tablePosition = (tablePosition + 1) & mask;
            }
            if (table[tablePosition] == -1) {
                table[tablePosition] = value;
                groupIds++;
            }
        }
    }
    return groupIds;
}
@Benchmark
@OperationsPerInvocation(POSITIONS)
public long baselineBigArray(BaselinePagesData data)
{
    // Same open-addressing baseline as baseline(), but backed by LongBigArray and
    // using XxHash64 for slot selection. Returns the number of distinct groups found.
    int hashSize = arraySize(GROUP_COUNT, 0.9f);
    int mask = hashSize - 1;
    LongBigArray table = new LongBigArray(-1); // -1 marks an empty slot
    table.ensureCapacity(hashSize);
    long groupIds = 0;
    for (Page page : data.getPages()) {
        Block block = page.getBlock(0);
        int positionCount = block.getPositionCount();
        for (int position = 0; position < positionCount; position++) {
            long value = BIGINT.getLong(block, position);
            int tablePosition = (int) XxHash64.hash(value) & mask;
            while (table.get(tablePosition) != -1 && table.get(tablePosition) != value) {
                // Fix: wrap the probe with the mask; the previous bare tablePosition++
                // could probe past the ensured capacity when a collision cluster
                // reached the last slot.
                tablePosition = (tablePosition + 1) & mask;
            }
            if (table.get(tablePosition) == -1) {
                table.set(tablePosition, value);
                groupIds++;
            }
        }
    }
    return groupIds;
}
@Benchmark
@OperationsPerInvocation(POSITIONS * ARRAY_SIZE * NUM_TYPES)
public List<Optional<Page>> benchmark(BenchmarkData data)
{
    // Run the compiled page processor over the benchmark page and materialize
    // every emitted output page into a list.
    PageProcessor processor = data.getPageProcessor();
    DriverYieldSignal yieldSignal = new DriverYieldSignal();
    String contextName = PageProcessor.class.getSimpleName();
    return ImmutableList.copyOf(processor.process(
            SESSION,
            yieldSignal,
            newSimpleAggregatedMemoryContext().newLocalMemoryContext(contextName),
            data.getPage()));
}
@Benchmark
@OperationsPerInvocation(POSITIONS)
public List<Optional<Page>> mapConcat(BenchmarkData data)
{
    // Run the compiled page processor over the benchmark page and materialize
    // every emitted output page into a list.
    PageProcessor processor = data.getPageProcessor();
    DriverYieldSignal yieldSignal = new DriverYieldSignal();
    String contextName = PageProcessor.class.getSimpleName();
    return ImmutableList.copyOf(processor.process(
            SESSION,
            yieldSignal,
            newSimpleAggregatedMemoryContext().newLocalMemoryContext(contextName),
            data.getPage()));
}