下面列出了java.nio.MappedByteBuffer#putInt ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void damageCommitlog(long offset) throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000");
FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 10);
int bodyLen = mappedByteBuffer.getInt((int) offset + 84);
int topicLenIndex = (int) offset + 84 + bodyLen + 4;
mappedByteBuffer.position(topicLenIndex);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.force();
fileChannel.force(true);
fileChannel.close();
}
private void damageCommitlog(long offset) throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000");
FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 10);
int bodyLen = mappedByteBuffer.getInt((int) offset + 84);
int topicLenIndex = (int) offset + 84 + bodyLen + 4;
mappedByteBuffer.position(topicLenIndex);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.force();
fileChannel.force(true);
fileChannel.close();
}
private void damageCommitlog(long offset) throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000");
FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 10);
int bodyLen = mappedByteBuffer.getInt((int) offset + 84);
int topicLenIndex = (int) offset + 84 + bodyLen + 4;
mappedByteBuffer.position(topicLenIndex);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.force();
fileChannel.force(true);
fileChannel.close();
}
private void damageCommitlog(long offset) throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000");
FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 10);
int bodyLen = mappedByteBuffer.getInt((int) offset + 84);
int topicLenIndex = (int) offset + 84 + bodyLen + 4;
mappedByteBuffer.position(topicLenIndex);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.force();
fileChannel.force(true);
fileChannel.close();
}
private boolean create(String path) throws IOException {
File file = new File(path);
if (file.exists() == false) {
if (file.createNewFile() == false) {
return false;
}
RandomAccessFile raFile = new RandomAccessFile(file, "rwd");
FileChannel fc = raFile.getChannel();
MappedByteBuffer mappedByteBuffer = fc.map(MapMode.READ_WRITE, 0, this.fileLimitLength);
mappedByteBuffer.put(LogEntity.MAGIC.getBytes());
mappedByteBuffer.putInt(1);// 8 version
mappedByteBuffer.putInt(-1);// 12next fileindex
mappedByteBuffer.putInt(-2);// 16
mappedByteBuffer.force();
MappedByteBufferUtil.clean(mappedByteBuffer);
fc.close();
raFile.close();
return true;
} else {
return false;
}
}
private boolean create(String path) throws IOException {
File file = new File(path);
if (file.exists() == false) {
if (file.createNewFile() == false) {
return false;
}
RandomAccessFile raFile = new RandomAccessFile(file, "rwd");
FileChannel fc = raFile.getChannel();
MappedByteBuffer mappedByteBuffer = fc.map(MapMode.READ_WRITE, 0, this.fileLimitLength);
mappedByteBuffer.put(LogEntity.MAGIC.getBytes());
mappedByteBuffer.putInt(1);// 8 version
mappedByteBuffer.putInt(-1);// 12next fileindex
mappedByteBuffer.putInt(-2);// 16
mappedByteBuffer.force();
MappedByteBufferUtil.clean(mappedByteBuffer);
fc.close();
raFile.close();
return true;
} else {
return false;
}
}
private void flushBuffer() throws BufferOverflowException {
if (pagedIn) {
MappedByteBuffer buffer;
// Only flush if page was changed
if (dirtyPage) {
Collection<byte[]> values = pageMap.values();
buffer = spillFile.getPage(pageIndex);
buffer.clear();
// number of elements
buffer.putInt(values.size());
for (byte[] value : values) {
// element length
buffer.putInt(value.length);
// element
buffer.put(value, 0, value.length);
}
}
buffer = null;
// Reset page stats
pageMap.clear();
totalResultSize = 0;
}
pagedIn = false;
dirtyPage = false;
}
private void save()
{
Assert.assertTrue(max_data_size >= getMinAlloc());
Assert.assertNotEquals(0, data_loc);
Assert.assertNotNull(key);
Assert.assertNotNull(value);
int file = (int) (data_loc / SEGMENT_FILE_SIZE);
MappedByteBuffer mbb = getBufferMap(file);
int offset = (int) (data_loc % SEGMENT_FILE_SIZE);
synchronized(mbb)
{
mbb.position(offset);
mbb.putInt(max_data_size);
mbb.putShort((short)key.size());
mbb.put(key.toByteArray());
mbb.putInt(value.size());
mbb.put(value.toByteArray());
}
}
private void flushBuffer() throws BufferOverflowException {
if (pagedIn) {
MappedByteBuffer buffer;
// Only flush if page was changed
if (dirtyPage) {
Collection<byte[]> values = pageMap.values();
buffer = spillFile.getPage(pageIndex);
buffer.clear();
// number of elements
buffer.putInt(values.size());
for (byte[] value : values) {
// element length
buffer.putInt(value.length);
// element
buffer.put(value, 0, value.length);
}
}
buffer = null;
// Reset page stats
pageMap.clear();
totalResultSize = 0;
}
pagedIn = false;
dirtyPage = false;
}
/**
* Write header.
*
* @param buffer the buffer
*/
private void writeHeader(MappedByteBuffer buffer) {
buffer.putInt(newHeaderSize);
buffer.putInt((int) newArchiveSize);
buffer.putShort((short) newFormatVersion);
buffer.putShort((short) newSectorSizeShift);
buffer.putInt((int) newHashPos);
buffer.putInt((int) newBlockPos);
buffer.putInt(newHashSize);
buffer.putInt(newBlockSize);
// TODO add full write support for versions above 1
}
private void flush(T entry) throws IOException {
Queue<T> inMemQueue = getInMemoryQueue();
int resultSize = sizeOf(entry);
maxResultSize = Math.max(maxResultSize, resultSize);
totalResultSize = hasMaxQueueSize ? maxResultSize * inMemQueue.size() : (totalResultSize + resultSize);
if (totalResultSize >= thresholdBytes) {
this.file = File.createTempFile(UUID.randomUUID().toString(), null);
RandomAccessFile af = new RandomAccessFile(file, "rw");
FileChannel fc = af.getChannel();
int writeIndex = 0;
mappingSize = Math.min(Math.max(maxResultSize, DEFAULT_MAPPING_SIZE), totalResultSize);
MappedByteBuffer writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
int resSize = inMemQueue.size();
for (int i = 0; i < resSize; i++) {
T e = inMemQueue.poll();
writeToBuffer(writeBuffer, e);
// buffer close to exhausted, re-map.
if (mappingSize - writeBuffer.position() < maxResultSize) {
writeIndex += writeBuffer.position();
writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
}
}
writeBuffer.putInt(EOF); // end
fc.force(true);
fc.close();
af.close();
flushedCount = resSize;
inMemQueue.clear();
flushBuffer = true;
}
}
@SuppressWarnings("deprecation")
@Override
protected void writeToBuffer(MappedByteBuffer buffer, Tuple e) {
KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0));
buffer.putInt(kv.getLength() + Bytes.SIZEOF_INT);
buffer.putInt(kv.getLength());
buffer.put(kv.getBuffer(), kv.getOffset(), kv.getLength());
}
protected void writeInt(long position, int value)
{
int file = (int) (position / SEGMENT_FILE_SIZE);
int offset_in_file = (int) (position % SEGMENT_FILE_SIZE);
MappedByteBuffer mbb = getBufferMap(file);
synchronized(mbb)
{
mbb.position(offset_in_file);
mbb.putInt(value);
}
}
@Override
public void writeLittleEndianInt(int value) throws IOException {
MappedByteBuffer curChunk = getCurChunk();
if (curChunk.remaining() >= 4) {
curChunk.putInt(value);
return;
}
// Value is on the chunk boundary - edge case so it is ok if it's a bit slower.
writeUnsignedByte((value) & 0xFF);
writeUnsignedByte((value >>> 8) & 0xFF);
writeUnsignedByte((value >>> 16) & 0xFF);
writeUnsignedByte((value >>> 24) & 0xFF);
}
protected void putKeyValueTable(long table_pos, RecordEntry put_re)
{
long t1=System.nanoTime();
int hash_file = (int) (table_pos / SEGMENT_FILE_SIZE);
MappedByteBuffer hash_mbb = getBufferMap(hash_file);
int file_offset = (int) (table_pos % SEGMENT_FILE_SIZE);
int max;
int items;
long next_ptr;
synchronized(hash_mbb)
{
hash_mbb.position(file_offset + (int) LOCATION_HASH_MAX);
max = hash_mbb.getInt();
items = hash_mbb.getInt();
next_ptr = hash_mbb.getLong();
}
Assert.assertTrue("Max " + max + " items " + items + " table at " + table_pos + " file " + hash_file + " file offset " + file_offset ,max > items);
Assert.assertTrue(max > 4);
Assert.assertTrue(items >= 0);
//DeterministicStream det_stream = new DeterministicStream(key);
//int loc = det_stream.nextInt(max);
int hash = put_re.getKey().hashCode();
int loc = Math.abs(hash % max);
if (loc < 0) loc = 0;
double full = (double) items / (double) max;
while(true)
{
Assert.assertTrue(loc >= 0);
Assert.assertTrue(loc < max);
synchronized(hash_mbb)
{
long t1_check = System.nanoTime();
hash_mbb.position(file_offset + LOCATION_HASH_START + loc * 8);
long ptr = hash_mbb.getLong();
TimeRecord.record(t1_check, "slop_get_ptr");
if ((ptr == 0) && (full >= HASH_FULL))
{
// It isn't here and the hash is full, move on to next table
if (next_ptr == 0)
{
next_ptr = makeNewHashTable(max * HASH_MULTIPLCATION);
hash_mbb.position(file_offset + (int) LOCATION_HASH_NEXT);
hash_mbb.putLong(next_ptr);
}
TimeRecord.record(t1, "slop_put_key_value_table_rec");
putKeyValueTable(next_ptr, put_re);
return;
}
RecordEntry re = null;
if (ptr != 0)
{
re = new RecordEntry(ptr);
if (!re.getKey().equals(put_re.getKey()))
{
re = null;
}
}
if ((ptr == 0) || (re!=null))
{
//If we have an empty or a key match
long data_loc = put_re.storeItem(ptr);
hash_mbb.position(file_offset + LOCATION_HASH_START + loc * 8);
hash_mbb.putLong(data_loc);
if (ptr == 0)
{
hash_mbb.position(file_offset + LOCATION_HASH_ITEMS);
items++;
hash_mbb.putInt(items);
}
TimeRecord.record(t1, "slop_put_key_value_table_add");
return;
}
}
//loc = det_stream.nextInt(max);
loc = (loc + 131 ) % max;
}
}