下面列出了 java.io.RandomAccessFile#getChannel() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
public Segment(String segmentDirectory, String name, int index, boolean memoryMapped) {
this.segmentDirectory = segmentDirectory;
this.name = name;
this.index = index;
if (memoryMapped) {
log.debug("Using memory mapped files");
//TODO need a good way to guess the initial amount of bytes needed
buffer = IoBuffer.allocate(CHUNK_SIZE * (1024 * 4), false);
//direct buffers cannot be auto-expanded
buffer.setAutoExpand(true);
buffer.setAutoShrink(true);
} else {
log.debug("Using disk based files");
try {
file = new RandomAccessFile(String.format("%s%s_%s.ts", segmentDirectory, name, index), "rwd");
// get the channel
channel = file.getChannel();
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
}
/**
 * Opens (or creates) the backing file {@code path/filename} and memory-maps it.
 * <p>
 * An existing file is reused unless {@code overwrite} is set: its first three ints are read
 * as the dimensions and the remainder is exposed as a double buffer. A missing or
 * overwritten file is handed to {@code create(File)}.
 *
 * @param path directory that holds the file; created if necessary
 * @param filename name of the file inside {@code path}
 * @param overwrite {@code true} to discard an existing file and start from scratch
 * @throws IOException if the file cannot be opened or mapped
 */
public void open(String path, String filename, boolean overwrite)
        throws IOException {
    Os.createDirectory(new File(path));
    String fullpath = path + Os.pathSeparator() + filename;
    _name = filename;
    File file = new File(fullpath);
    if (!file.exists()) {
        create(file);
        return;
    }
    if (overwrite) {
        // NOTE(review): the result of delete() is ignored, as in the original code.
        file.delete();
        create(file);
        return;
    }
    _raf = new RandomAccessFile(file, "rw");
    _filechannel = _raf.getChannel();
    // Pass the size as a long: narrowing it to int silently wrapped around for files
    // larger than 2 GiB and mapped a wrong-sized region. map() itself rejects sizes
    // above Integer.MAX_VALUE with an IllegalArgumentException.
    _bytebuffer = _filechannel.map(FileChannel.MapMode.READ_WRITE,
            0, _filechannel.size());
    // Header: three ints holding the dimensions, followed by the double payload.
    _firstDim = _bytebuffer.getInt();
    _secondDim = _bytebuffer.getInt();
    _thirdDim = _bytebuffer.getInt();
    _mmap = _bytebuffer.asDoubleBuffer();
}
/**
 * Persists the session record for the given address.
 * <p>
 * The session file is rewritten from offset 0 with the current version followed by the
 * serialized record, and then truncated to the number of bytes written. All access is
 * serialized on {@code FILE_LOCK}.
 *
 * @param address address whose session file is written
 * @param record session record to serialize
 * @throws AssertionError if the file cannot be written
 */
@Override
public void storeSession(@NonNull SignalProtocolAddress address, @NonNull SessionRecord record) {
    synchronized (FILE_LOCK) {
        // try-with-resources closes the file even when one of the writes fails;
        // previously a failed write leaked the open file handle.
        try (RandomAccessFile sessionFile = new RandomAccessFile(getSessionFile(address), "rw")) {
            FileChannel out = sessionFile.getChannel();
            out.position(0);
            writeInteger(CURRENT_VERSION, out);
            writeBlob(record.serialize(), out);
            // Drop any stale bytes left over from a longer, previously stored record.
            out.truncate(out.position());
        } catch (IOException e) {
            throw new AssertionError(e);
        }
    }
}
/**
 * Picks a read buffer implementation for the given file.
 * <p>
 * When file mapping is enabled and the file is no larger than {@code Integer.MAX_VALUE}
 * bytes (about 2 GB), the whole file is mapped read-only and the channel is closed.
 * In every other case, including a failed mapping attempt, reads go directly through
 * the file.
 *
 * @param file file to read from
 * @return a mapped buffer when possible, otherwise a file-backed buffer
 * @throws IOException if the file size cannot be determined
 */
static ReadBuffer create(RandomAccessFile file) throws IOException {
    FileChannel channel = file.getChannel();
    long length = channel.size();
    boolean mappable = canUseFileMap() && (length <= Integer.MAX_VALUE);
    if (!mappable) {
        return new FileReadBuffer(file);
    }
    try {
        MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, length);
        channel.close();
        return new MappedReadBuffer(mapped);
    } catch (IOException exp) {
        exp.printStackTrace();
        System.err.println("File mapping failed, will use direct read");
        // mapping is only an optimization: degrade to direct reads
        return new FileReadBuffer(file);
    }
}
/**
 * Creates the read buffer used to access the given file.
 * <p>
 * A file of at most {@code Integer.MAX_VALUE} bytes is mapped read-only in one piece,
 * provided file mapping is enabled; its channel is closed once the mapping exists.
 * Larger files, disabled mapping, and mapping failures all result in a buffer that
 * reads from the file directly.
 *
 * @param file file to read from
 * @return the read buffer for {@code file}
 * @throws IOException if querying the file size fails
 */
static ReadBuffer create(RandomAccessFile file) throws IOException {
    final FileChannel fileChannel = file.getChannel();
    final long fileSize = fileChannel.size();
    if (canUseFileMap() && fileSize <= Integer.MAX_VALUE) {
        try {
            final MappedByteBuffer mapping =
                    fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
            fileChannel.close();
            return new MappedReadBuffer(mapping);
        } catch (IOException exp) {
            exp.printStackTrace();
            System.err.println("File mapping failed, will use direct read");
        }
    }
    // Reached when mapping is disabled, the file is too large, or mapping failed.
    return new FileReadBuffer(file);
}
/**
 * Opens an input stream and locks the underlying file.
 * Locking is done via the {@link java.nio.channels.FileLock} class.
 * On windows, this is just like a normal input stream.
 * On Linux and the likes it uses an "fcntl" type lock.
 * REMEMBER TO CLOSE THE INPUT STREAM.
 * <p>
 * The file is opened in {@code "rw"} mode and the constructor blocks until the lock
 * has been acquired. If locking fails, the file is closed again before the exception
 * is thrown.
 *
 * @param filename to open
 * @throws FileNotFoundException if the file cannot be opened for reading and writing
 * @throws AdeUsageException if the lock cannot be acquired
 */
public LockedInputStream(final String filename) throws FileNotFoundException, AdeUsageException {
    m_lock = null;
    m_filename = filename;
    // Get a file channel for the file
    m_raf = new RandomAccessFile(filename, "rw");
    FileChannel channel = m_raf.getChannel();
    // Use the file channel to create a lock on the file.
    // This method blocks until it can retrieve the lock.
    try {
        m_lock = channel.lock();
    } catch (IOException e) {
        // The caller never receives this instance, so nobody else could close the file.
        try {
            m_raf.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw new AdeUsageException("Failed locking " + filename, e);
    }
}
/**
 * Reads a whole file through NIO, decodes it as UTF-8 and returns its lines
 * concatenated without line terminators.
 *
 * @param pFilePath path of the file to read
 * @return the file content with all line breaks removed
 * @throws FileNotFoundException if the file cannot be opened for reading
 * @throws IOException if reading fails
 */
private static String nio(String pFilePath) throws FileNotFoundException, IOException {
    final ByteBuffer buffer;
    // try-with-resources releases the file handle even when reading fails.
    try (RandomAccessFile file = new RandomAccessFile(pFilePath, "r")) {
        FileChannel fileChannel = file.getChannel();
        buffer = ByteBuffer.allocateDirect((int) fileChannel.size());
        // A single read() may return fewer bytes than requested; keep reading until
        // the buffer is full or end-of-file is reached.
        while (buffer.hasRemaining() && fileChannel.read(buffer) != -1) {
            // nothing to do, read() advances the buffer position
        }
    }
    buffer.flip();
    CharBuffer charBuffer = Charset.forName("utf-8").decode(buffer);
    BufferedReader bufferedReader = new BufferedReader(new StringReader(charBuffer.toString()));
    StringBuilder sb = new StringBuilder();
    String str;
    // readLine() strips the terminators, so the lines are joined back to back.
    while ((str = bufferedReader.readLine()) != null) {
        sb.append(str);
    }
    return sb.toString();
}
/**
* Creates a new channel to the path indicated by the given ID. The channel hands IO requests to
* the given request queue to be processed.
*
* @param channelID The id describing the path of the file that the channel accessed.
* @param requestQueue The queue that this channel hands its IO requests to.
* @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
* than in read-only mode.
* @throws IOException Thrown, if the channel could no be opened.
*/
protected ChannelAccess(Channel.ID channelID, RequestQueue<R> requestQueue, boolean writeEnabled)
throws IOException
{
if (channelID == null || requestQueue == null) {
throw new NullPointerException();
}
this.id = channelID;
this.requestQueue = requestQueue;
try {
RandomAccessFile file = new RandomAccessFile(id.getPath(), writeEnabled ? "rw" : "r");
this.fileChannel = file.getChannel();
}
catch (IOException e) {
throw new IOException("Channel to path '" + channelID.getPath() + "' could not be opened.", e);
}
}
/**
 * Shortens the file behind the given URL to at most {@code size} bytes.
 * A file that is already shorter than {@code size} is left untouched.
 *
 * @param inputURL URL of the file to truncate
 * @param size requested maximum length in bytes
 * @return {@code size} if the file was truncated, otherwise the current file length
 * @throws FileNotFoundException if the file does not exist
 * @throws IOException if the file cannot be opened or truncated
 */
@Override
public long truncateFileAtURL(LocalFilesystemURL inputURL, long size) throws IOException {
    File target = new File(filesystemPathForURL(inputURL));
    if (!target.exists()) {
        throw new FileNotFoundException("File at " + inputURL.uri + " does not exist.");
    }

    RandomAccessFile handle = new RandomAccessFile(filesystemPathForURL(inputURL), "rw");
    try {
        if (handle.length() < size) {
            // Nothing to cut off: report the actual length.
            return handle.length();
        }
        handle.getChannel().truncate(size);
        return size;
    } finally {
        handle.close();
    }
}
/**
 * Creates a file region over the channel of {@code file}.
 * <p>
 * The channel, {@code position} and {@code count} are passed to the superclass; they are
 * also stored locally together with the file descriptor and the read-ahead and shuffle
 * settings.
 *
 * @param file file whose channel and descriptor back this region
 * @param position start offset of the region
 * @param count number of bytes in the region
 * @param manageOsCache stored flag for OS cache management
 * @param readaheadLength stored read-ahead length
 * @param readaheadPool stored read-ahead pool
 * @param identifier stored identifier of this region
 * @param shuffleBufferSize stored shuffle buffer size
 * @param shuffleTransferToAllowed stored flag for shuffle transferTo
 * @throws IOException if the file descriptor cannot be obtained
 */
public FadvisedFileRegion(RandomAccessFile file, long position, long count,
        boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
        String identifier, int shuffleBufferSize,
        boolean shuffleTransferToAllowed) throws IOException {
    super(file.getChannel(), position, count);

    // handles onto the underlying file
    this.fd = file.getFD();
    this.fileChannel = file.getChannel();

    // extent of the region
    this.position = position;
    this.count = count;

    // cache and read-ahead settings
    this.manageOsCache = manageOsCache;
    this.readaheadLength = readaheadLength;
    this.readaheadPool = readaheadPool;
    this.identifier = identifier;

    // shuffle settings
    this.shuffleBufferSize = shuffleBufferSize;
    this.shuffleTransferToAllowed = shuffleTransferToAllowed;
}
/**
 * Allocates the next storage resource by memory-mapping a new data file and
 * registering the mapped buffer with the datanode server endpoint.
 *
 * @return the newly created resource, or {@code null} once the allocated size has
 *         reached {@code STORAGE_RDMA_STORAGE_LIMIT}
 * @throws Exception if the file cannot be created or mapped, or if memory registration fails
 */
@Override
public StorageResource allocateResource() throws Exception {
    if (allocatedSize >= RdmaConstants.STORAGE_RDMA_STORAGE_LIMIT) {
        return null;
    }

    //mmap buffer
    int fileId = fileCount++;
    String dataFilePath = dataDirPath + "/" + fileId;
    ByteBuffer dataBuffer;
    // try-with-resources closes file and channel even if setLength() or map() throws;
    // previously both leaked on failure. The mapping stays valid after the close.
    try (RandomAccessFile dataFile = new RandomAccessFile(dataFilePath, "rw");
            FileChannel dataChannel = dataFile.getChannel()) {
        if (!RdmaConstants.STORAGE_RDMA_PERSISTENT) {
            dataFile.setLength(RdmaConstants.STORAGE_RDMA_ALLOCATION_SIZE);
        }
        dataBuffer = dataChannel.map(MapMode.READ_WRITE, 0, RdmaConstants.STORAGE_RDMA_ALLOCATION_SIZE);
    }

    //register buffer
    allocatedSize += dataBuffer.capacity();
    IbvMr mr = datanodeServerEndpoint.registerMemory(dataBuffer).execute().free().getMr();

    //create resource
    return StorageResource.createResource(mr.getAddr(), mr.getLength(), mr.getLkey());
}
/**
 * Opens a channel for reading the underlying file.
 * <p>
 * With random access allowed, the channel comes from a read-only
 * {@link RandomAccessFile}; otherwise it wraps a {@link FileInputStream}.
 *
 * @return the newly opened read channel
 * @throws ContentIOException if the file does not exist or anything else goes wrong
 *         while opening the channel
 */
@Override
protected ReadableByteChannel getDirectReadableChannel() throws ContentIOException
{
    try
    {
        // refuse to continue when there is nothing to read
        if (!file.exists())
        {
            throw new IOException("File does not exist: " + file);
        }

        final ReadableByteChannel channel;
        if (!allowRandomAccess)
        {
            channel = Channels.newChannel(new FileInputStream(file));
        }
        else
        {
            // mode "r" never creates the file
            channel = new RandomAccessFile(file, "r").getChannel();
        }

        if (logger.isDebugEnabled())
        {
            logger.debug("Opened read channel to file: \n" +
                    " file: " + file + "\n" +
                    " random-access: " + allowRandomAccess);
        }
        return channel;
    }
    catch (Throwable e)
    {
        throw new ContentIOException("Failed to open file channel: " + this, e);
    }
}
/**
 * Opens the given slop file for reading and writing and starts with an empty
 * map of open buffers.
 *
 * @param slop_file file backing this bucket
 * @param log event log stored for later use
 * @throws IOException declared by the signature; opening the file is the only I/O here
 */
public Slopbucket(File slop_file, EventLog log)
        throws IOException
{
    this.slop_file = slop_file;
    this.log = log;

    // Only the channel is retained; the RandomAccessFile itself is not stored.
    file_channel = new RandomAccessFile(slop_file, "rw").getChannel();
    open_buffers = new TreeMap<>();
}
/**
 * Opens the patch log file in the long-life temp directory, positions the channel at
 * the end of the file so that new output is appended, and writes a header with the
 * current date.
 *
 * @throws IOException if the log file cannot be opened or written
 */
private FixBpmPackagesPatchHelper() throws IOException
{
    // the log lives in a long life temp directory
    logFile = new File(TempFileProvider.getLongLifeTempDir("patches"), "FixBpmPackagesPatch.log");

    // open for appending: start writing after the existing content
    channel = new RandomAccessFile(logFile, "rw").getChannel();
    final long endOfFile = channel.size();
    channel.position(endOfFile);

    // two empty lines, then the header of this run
    writeLine("").writeLine("");
    writeLine("FixBpmPackagesPatch executing on " + new Date());
}
/**
 * Read the inflights file and return a
 * {@link com.google.common.collect.SetMultimap}
 * of transactionIDs to events that were inflight.
 * <p>
 * File layout as read here: a 16-byte checksum followed by a sequence of longs made
 * up of records {@code txnID, numEvents, event_1 .. event_numEvents}. The checksum
 * is compared against the digest of everything after it.
 *
 * @return - map of inflight events per txnID; empty if the file is empty.
 * @throws IOException if the file cannot be read
 * @throws BadCheckpointException if the stored checksum does not match the data
 */
public SetMultimap<Long, Long> deserialize()
        throws IOException, BadCheckpointException {
    SetMultimap<Long, Long> inflights = HashMultimap.create();
    if (!fileChannel.isOpen()) {
        file = new RandomAccessFile(inflightEventsFile, "rw");
        fileChannel = file.getChannel();
    }
    if (file.length() == 0) {
        return inflights;
    }
    file.seek(0);
    byte[] checksum = new byte[16];
    // read() may deliver fewer bytes than requested; keep going until the checksum is
    // complete or the file ends (a short file then fails the checksum comparison).
    int checksumBytes = 0;
    while (checksumBytes < checksum.length) {
        int n = file.read(checksum, checksumBytes, checksum.length - checksumBytes);
        if (n < 0) {
            break;
        }
        checksumBytes += n;
    }
    ByteBuffer buffer = ByteBuffer.allocate(
            (int) (file.length() - file.getFilePointer()));
    // Same for the payload: a single channel read is not guaranteed to fill the buffer,
    // and a partially filled buffer would be reported as a checksum mismatch.
    while (buffer.hasRemaining() && fileChannel.read(buffer) >= 0) {
        // read() advances the buffer position
    }
    byte[] fileChecksum = digest.digest(buffer.array());
    if (!Arrays.equals(checksum, fileChecksum)) {
        throw new BadCheckpointException("Checksum of inflights file differs"
                + " from the checksum expected.");
    }
    buffer.position(0);
    LongBuffer longBuffer = buffer.asLongBuffer();
    try {
        // The end of the data is detected by the underflow of the long buffer.
        while (true) {
            long txnID = longBuffer.get();
            int numEvents = (int) (longBuffer.get());
            for (int i = 0; i < numEvents; i++) {
                long val = longBuffer.get();
                inflights.put(txnID, val);
            }
        }
    } catch (BufferUnderflowException ex) {
        LOG.debug("Reached end of inflights buffer. Long buffer position ="
                + String.valueOf(longBuffer.position()));
    }
    return inflights;
}
/**
 * Repeatedly grows a temporary file and maps it READ_WRITE in segments of at most
 * maximumMapSize bytes until the file has reached maximumFileSize, then closes the
 * channel, drops all buffer references and prints "TEST PASSED".
 * Any exception thrown along the way propagates out of main.
 *
 * @param args not used
 */
public static void main(String[] args) throws Exception {
// Sizes in bytes: initial file length (20 MiB), largest single mapping (16 MiB)
// and the file length at which the growth loop stops.
int initialSize = 20480*1024;
int maximumMapSize = 16*1024*1024;
int maximumFileSize = 300000000;
File file = File.createTempFile("exp", "tmp");
file.deleteOnExit();
RandomAccessFile f = new RandomAccessFile(file, "rw");
f.setLength(initialSize);
FileChannel fc = f.getChannel();
// One slot per mapped segment; the array is enlarged further below when it fills up.
ByteBuffer[] buffers = new ByteBuffer[128];
System.out.format("map %d -> %d\n", 0, initialSize);
buffers[0] = fc.map(FileChannel.MapMode.READ_WRITE, 0, initialSize);
// currentBuffer: index of the segment being extended
// currentPosition: file offset at which that segment starts
// currentSize: number of bytes currently covered by that segment
int currentBuffer = 0;
int currentSize = initialSize;
int currentPosition = 0;
ArrayList<String> junk = new ArrayList<String>();
while (currentPosition+currentSize < maximumFileSize) {
// Grow by one eighth of the current extent, but by at least 1000 KiB.
int inc = Math.max(1000*1024, (currentPosition+currentSize)/8);
int size = currentPosition+currentSize+inc;
f.setLength(size);
// The grown region does not fit into a single mapping: complete the current
// segment at maximumMapSize and carry the remainder over to the next segment.
while (currentSize+inc > maximumMapSize) {
if (currentSize < maximumMapSize) {
System.out.format("map %d -> %d\n", currentPosition,
(currentPosition + maximumMapSize));
buffers[currentBuffer] = fc.map(FileChannel.MapMode.READ_WRITE,
currentPosition, maximumMapSize);
// fillBuffer is defined outside this block; NOTE(review): the second argument
// is presumably the offset from which the buffer is filled - confirm.
fillBuffer(buffers[currentBuffer], currentSize);
}
currentPosition += maximumMapSize;
// What is left of the increment once the current segment is full.
inc = currentSize+inc-maximumMapSize;
currentSize = 0;
currentBuffer++;
// Out of slots: enlarge the array by 50% and keep the existing mappings.
if (currentBuffer == buffers.length) {
ByteBuffer[] old = buffers;
buffers = new ByteBuffer[currentBuffer+currentBuffer/2];
System.arraycopy(old, 0, buffers, 0, currentBuffer); }
}
currentSize += inc;
// Remap the (possibly new) current segment at its enlarged size.
if (currentSize > 0) {
System.out.format("map %d -> %d\n", currentPosition,
(currentPosition + currentSize));
buffers[currentBuffer] = fc.map(FileChannel.MapMode.READ_WRITE,
currentPosition, currentSize);
fillBuffer(buffers[currentBuffer], currentSize-inc);
}
// busy loop needed to reproduce issue
// (spins for 500 ms per iteration while allocating strings)
long t = System.currentTimeMillis();
while (System.currentTimeMillis() < t+500) {
junk.add(String.valueOf(t));
if (junk.size() > 100000) junk.clear();
}
}
fc.close();
// cleanup the ref to mapped buffers so they can be GCed
for (int i = 0; i < buffers.length; i++)
buffers[i] = null;
System.gc();
// Take a nap to wait for the Cleaner to cleanup those unrefed maps
Thread.sleep(1000);
System.out.println("TEST PASSED");
}
/**
 * Test for blockIdCK with block corruption.
 * <p>
 * Creates a single-datanode cluster with one file, checks that fsck reports the block
 * as healthy, overwrites a few bytes of the on-disk block file, waits for the corrupt
 * replica to be detected and checks that fsck now reports the block as corrupt.
 */
@Test
public void testBlockIdCKCorruption() throws Exception {
    short NUM_DN = 1;
    final long blockSize = 512;
    Random random = new Random();
    ExtendedBlock block;
    short repFactor = 1;
    String[] racks = {"/rack1"};
    String[] hosts = {"host1"};

    Configuration conf = new Configuration();
    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
    // Set short retry timeouts so this test runs faster
    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);

    MiniDFSCluster cluster = null;
    DistributedFileSystem dfs = null;
    try {
        cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
                .racks(racks).build();

        assertNotNull("Failed Cluster Creation", cluster);
        cluster.waitClusterUp();
        dfs = cluster.getFileSystem();
        assertNotNull("Failed to get FileSystem", dfs);

        DFSTestUtil util = new DFSTestUtil.Builder().
            setName(getClass().getSimpleName()).setNumFiles(1).build();
        //create files
        final String pathString = "/testfile";
        final Path path = new Path(pathString);
        util.createFile(dfs, path, 1024, repFactor, 1000L);
        util.waitReplication(dfs, path, repFactor);
        StringBuilder sb = new StringBuilder();
        for (LocatedBlock lb : util.getAllBlocks(dfs, path)) {
            sb.append(lb.getBlock().getLocalBlock().getBlockName() + " ");
        }
        String[] bIds = sb.toString().split(" ");

        //make sure block is healthy before we corrupt it
        String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
        System.out.println(outStr);
        assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));

        // corrupt replicas
        block = DFSTestUtil.getFirstBlock(dfs, path);
        File blockFile = cluster.getBlockFile(0, block);
        if (blockFile != null && blockFile.exists()) {
            // try-with-resources: the block file is closed even if the write fails
            try (RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw")) {
                FileChannel channel = raFile.getChannel();
                String badString = "BADBAD";
                // overwrite at a random offset within the first half of the block file
                int rand = random.nextInt((int) channel.size() / 2);
                raFile.seek(rand);
                raFile.write(badString.getBytes());
            }
        }

        util.waitCorruptReplicas(dfs, cluster.getNamesystem(), path, block, 1);

        outStr = runFsck(conf, 1, false, "/", "-blockId", block.getBlockName());
        System.out.println(outStr);
        assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
    } finally {
        if (cluster != null) {
            cluster.shutdown();
        }
    }
}
/**
 * Creates a parser from the given file by delegating to the constructor that
 * accepts the result of {@link RandomAccessFile#getChannel()}.
 * <p>
 * NOTE(review): marked deprecated; presumably callers are expected to pass the
 * channel themselves - confirm against the other constructors.
 *
 * @param raf file whose channel is handed to the delegate constructor
 */
@Deprecated
public FillingNIOParser(RandomAccessFile raf) {
this(raf.getChannel());
}
/**
 * Stores the file, its channel and the size returned by pageAlign, then stores the
 * result of map() as the address.
 *
 * @param raf already opened file to wrap
 * @param size requested size; the value kept is whatever pageAlign returns for it
 */
private MappedFile(RandomAccessFile raf, long size) {
this.raf = raf;
// NOTE(review): pageAlign presumably rounds the size to a page boundary - confirm.
this.size = pageAlign(size);
this.channel = raf.getChannel();
// map() is invoked last; NOTE(review): it presumably depends on the fields
// assigned above, so keep this order - confirm against map().
this.address = map();
}
/**
 * Writes the first {@code nextOffset} bytes of the buffer to the given file, starting
 * at the current position of the file's channel, and then moves the channel position
 * behind the bytes that were written.
 * <p>
 * Note: This is synchronized so that concurrent writers must block during
 * this operation.
 *
 * @param out file that receives the data
 * @return the number of bytes transferred, i.e. the value of {@code nextOffset}
 * @throws IOException if writing to the channel fails
 */
public synchronized long transferTo(final RandomAccessFile out)
        throws IOException {
    final long nbytes = nextOffset.get();
    final FileChannel sink = out.getChannel();

    // where on the output channel the data is going to start
    final long startOffset = sink.position();

    // expose exactly the bytes [0, nbytes) of the buffer to the write
    buffer.limit((int) nbytes);
    buffer.position(0);

    // write everything at an explicit position, then move the channel position
    // behind the written data
    FileChannelUtility.writeAll(sink, buffer, startOffset);
    sink.position(startOffset + nbytes);

    return nbytes;
}