下面列出了 org.apache.hadoop.io.SequenceFile.Reader 类的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Loads cube statistics from the sequence file at {@code path}.
 *
 * <p>The record key selects what the value holds: key 0 is decoded as the
 * percentage, -1 as the mapper overlap ratio, -2 as the mapper number and -3 as
 * the source record count. Every positive key is stored in {@code counterMap}
 * together with an {@link HLLCounter} whose registers are read from the value.
 * Any other key is ignored.
 *
 * @param path location of the statistics sequence file
 * @param precision precision handed to each {@code HLLCounter} that is created
 * @throws IOException if the sequence file cannot be opened or read
 */
public CubeStatsResult(Path path, int precision) throws IOException {
    final Configuration conf = HadoopUtil.getCurrentConfiguration();
    try (Reader statsReader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
        final LongWritable recordKey =
                (LongWritable) ReflectionUtils.newInstance(statsReader.getKeyClass(), conf);
        final BytesWritable recordValue =
                (BytesWritable) ReflectionUtils.newInstance(statsReader.getValueClass(), conf);
        while (statsReader.next(recordKey, recordValue)) {
            final long id = recordKey.get();
            final byte[] payload = recordValue.getBytes();
            if (id > 0) {
                // Positive keys carry the serialized registers of one counter.
                final HLLCounter counter = new HLLCounter(precision);
                counter.readRegisters(new ByteArray(payload).asBuffer());
                counterMap.put(id, counter);
            } else if (id == 0L) {
                percentage = Bytes.toInt(payload);
            } else if (id == -1) {
                mapperOverlapRatio = Bytes.toDouble(payload);
            } else if (id == -2) {
                mapperNumber = Bytes.toInt(payload);
            } else if (id == -3) {
                sourceRecordCount = Bytes.toLong(payload);
            }
        }
    }
}
/**
 * Developer-only trial (ignored by default): feeds every record of the sequence
 * file at {@code srcPath} through a {@link HiveToBaseCuboidMapper}.
 *
 * @throws IOException if the sequence file cannot be opened or read
 * @throws InterruptedException if the mapper is interrupted
 */
@Ignore("convenient trial tool for dev")
@Test
public void test() throws IOException, InterruptedException {
    Configuration hconf = HadoopUtil.getCurrentConfiguration();
    HiveToBaseCuboidMapper mapper = new HiveToBaseCuboidMapper();
    Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);
    mapper.doSetup(context);
    // try-with-resources: the reader is released even when next() or map() throws.
    try (Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath))) {
        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
        Text value = new Text();
        while (reader.next(key, value)) {
            mapper.map(key, value, context);
        }
    }
}
/**
 * Applies preserved attributes to the destination files listed for a copy job.
 *
 * <p>Returns immediately when {@code presevedAttributes} is {@code null} or when
 * the parsed set contains none of USER, GROUP or PERMISSION. Otherwise each record
 * of the sequence file named by the {@code DST_DIR_LIST_LABEL} job property is
 * read and {@code updateDestStatus} is called for it.
 *
 * @param conf configuration used to resolve the destination file system
 * @param jobconf job configuration holding the destination listing path
 * @param destPath root directory the listed outputs are resolved against
 * @param presevedAttributes attribute string to parse, may be {@code null}
 * @throws IOException if the listing or a destination file status cannot be read
 */
private static void finalize(Configuration conf, JobConf jobconf,
        final Path destPath, String presevedAttributes) throws IOException {
    if (presevedAttributes == null) {
        return;
    }
    final EnumSet<FileAttribute> attributes = FileAttribute.parse(presevedAttributes);
    final boolean statusUpdateNeeded = attributes.contains(FileAttribute.USER)
            || attributes.contains(FileAttribute.GROUP)
            || attributes.contains(FileAttribute.PERMISSION);
    if (!statusUpdateNeeded) {
        return;
    }
    final FileSystem destFs = destPath.getFileSystem(conf);
    final Path listing = new Path(jobconf.get(DST_DIR_LIST_LABEL));
    try (SequenceFile.Reader listingReader =
            new SequenceFile.Reader(jobconf, Reader.file(listing))) {
        final Text listedKey = new Text();
        final FilePair entry = new FilePair();
        while (listingReader.next(listedKey, entry)) {
            final Path destination = new Path(destPath, entry.output);
            final FileStatus destinationStatus = destFs.getFileStatus(destination);
            updateDestStatus(entry.input, destinationStatus, attributes, destFs);
        }
    }
}
/**
 * Writes two {@code Long}/{@code String} records to a sequence file and asserts
 * that they are read back in order, followed by {@code null} at end of file.
 *
 * @throws Exception if the file cannot be written or read
 */
public void testJavaSerialization() throws Exception {
    Path file = new Path(System.getProperty("test.build.data", ".")
            + "/testseqser.seq");

    fs.delete(file, true);
    // The writer must be closed before reading; try-with-resources also closes
    // it when an append fails.
    try (Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
            String.class)) {
        writer.append(1L, "one");
        writer.append(2L, "two");
    }

    // Closing via try-with-resources releases the reader even when an
    // assertion fails part-way through.
    try (Reader reader = new Reader(fs, file, conf)) {
        assertEquals(1L, reader.next((Object) null));
        assertEquals("one", reader.getCurrentValue((Object) null));
        assertEquals(2L, reader.next((Object) null));
        assertEquals("two", reader.getCurrentValue((Object) null));
        assertNull(reader.next((Object) null));
    }
}
/**
 * Applies preserved attributes to the destination files listed for a copy job.
 *
 * <p>Returns immediately when {@code presevedAttributes} is {@code null} or when
 * the parsed set contains none of USER, GROUP or PERMISSION. Otherwise each record
 * of the sequence file named by the {@code DST_DIR_LIST_LABEL} job property is
 * read and {@code updateDestStatus} is called for it.
 *
 * @param conf configuration used to resolve the destination file system
 * @param jobconf job configuration holding the destination listing path
 * @param destPath root directory the listed outputs are resolved against
 * @param presevedAttributes attribute string to parse, may be {@code null}
 * @throws IOException if the listing or a destination file status cannot be read
 */
private static void finalize(Configuration conf, JobConf jobconf,
        final Path destPath, String presevedAttributes) throws IOException {
    if (presevedAttributes == null) {
        return;
    }
    final EnumSet<FileAttribute> attributes = FileAttribute.parse(presevedAttributes);
    final boolean statusUpdateNeeded = attributes.contains(FileAttribute.USER)
            || attributes.contains(FileAttribute.GROUP)
            || attributes.contains(FileAttribute.PERMISSION);
    if (!statusUpdateNeeded) {
        return;
    }
    final FileSystem destFs = destPath.getFileSystem(conf);
    final Path listing = new Path(jobconf.get(DST_DIR_LIST_LABEL));
    try (SequenceFile.Reader listingReader =
            new SequenceFile.Reader(jobconf, Reader.file(listing))) {
        final Text listedKey = new Text();
        final FilePair entry = new FilePair();
        while (listingReader.next(listedKey, entry)) {
            final Path destination = new Path(destPath, entry.output);
            final FileStatus destinationStatus = destFs.getFileStatus(destination);
            updateDestStatus(entry.input, destinationStatus, attributes, destFs);
        }
    }
}
/**
 * Writes two {@code Long}/{@code String} records to a sequence file and asserts
 * that they are read back in order, followed by {@code null} at end of file.
 *
 * @throws Exception if the file cannot be written or read
 */
public void testJavaSerialization() throws Exception {
    Path file = new Path(System.getProperty("test.build.data", ".")
            + "/testseqser.seq");

    fs.delete(file, true);
    // The writer must be closed before reading; try-with-resources also closes
    // it when an append fails.
    try (Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
            String.class)) {
        writer.append(1L, "one");
        writer.append(2L, "two");
    }

    // Closing via try-with-resources releases the reader even when an
    // assertion fails part-way through.
    try (Reader reader = new Reader(fs, file, conf)) {
        assertEquals(1L, reader.next((Object) null));
        assertEquals("one", reader.getCurrentValue((Object) null));
        assertEquals(2L, reader.next((Object) null));
        assertEquals("two", reader.getCurrentValue((Object) null));
        assertNull(reader.next((Object) null));
    }
}
/**
 * Dumps every image stored in a sequence file to JPEG files.
 *
 * <p>For each record the key and the value's byte length are printed, and the
 * image is written to {@code image<split>_<n>.jpg} (relative path), where
 * {@code n} counts records starting at 0.
 *
 * @param args {@code args[0]} is the sequence file URI, {@code args[1]} is the
 *     split label embedded in the output file names
 * @throws IllegalArgumentException if fewer than two arguments are supplied
 * @throws IOException if the sequence file cannot be read or an image cannot be written
 */
public static void main(String[] args) throws IOException {
    // Fail with a usage hint instead of an ArrayIndexOutOfBoundsException.
    if (args == null || args.length < 2) {
        throw new IllegalArgumentException("Usage: <sequence-file-uri> <split-label>");
    }
    String uri = args[0];
    String split = args[1];
    Configuration conf = new Configuration();
    Path path = new Path(uri);
    SequenceFile.Reader reader = null;
    try {
        reader = new SequenceFile.Reader(conf, Reader.file(path));
        Text key = new Text();
        OrdImageWritable value = new OrdImageWritable();
        int num = 0;
        while (reader.next(key, value)) {
            System.out.println(key.toString() + " " + value.getByteLength());
            ImageIO.write(value.getImage(), "jpg", new File("image" + split + "_" + num++ + ".jpg"));
        }
    } finally {
        // closeStream tolerates a null reader (constructor failure).
        IOUtils.closeStream(reader);
    }
}
/**
 * Loads cube statistics from the sequence file at {@code path}.
 *
 * <p>The record key selects what the value holds: key 0 is decoded as the
 * percentage, -1 as the mapper overlap ratio, -2 as the mapper number and -3 as
 * the source record count. Every positive key is stored in {@code counterMap}
 * together with an {@link HLLCounter} whose registers are read from the value.
 * Any other key is ignored.
 *
 * @param path location of the statistics sequence file
 * @param precision precision handed to each {@code HLLCounter} that is created
 * @throws IOException if the sequence file cannot be opened or read
 */
public CubeStatsResult(Path path, int precision) throws IOException {
    final Configuration conf = HadoopUtil.getCurrentConfiguration();
    try (Reader statsReader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
        final LongWritable recordKey =
                (LongWritable) ReflectionUtils.newInstance(statsReader.getKeyClass(), conf);
        final BytesWritable recordValue =
                (BytesWritable) ReflectionUtils.newInstance(statsReader.getValueClass(), conf);
        while (statsReader.next(recordKey, recordValue)) {
            final long id = recordKey.get();
            final byte[] payload = recordValue.getBytes();
            if (id > 0) {
                // Positive keys carry the serialized registers of one counter.
                final HLLCounter counter = new HLLCounter(precision);
                counter.readRegisters(new ByteArray(payload).asBuffer());
                counterMap.put(id, counter);
            } else if (id == 0L) {
                percentage = Bytes.toInt(payload);
            } else if (id == -1) {
                mapperOverlapRatio = Bytes.toDouble(payload);
            } else if (id == -2) {
                mapperNumber = Bytes.toInt(payload);
            } else if (id == -3) {
                sourceRecordCount = Bytes.toLong(payload);
            }
        }
    }
}
/**
 * Developer-only trial (ignored by default): feeds every record of the sequence
 * file at {@code srcPath} through a {@link HiveToBaseCuboidMapper}.
 *
 * @throws IOException if the sequence file cannot be opened or read
 * @throws InterruptedException if the mapper is interrupted
 */
@Ignore("convenient trial tool for dev")
@Test
public void test() throws IOException, InterruptedException {
    Configuration hconf = HadoopUtil.getCurrentConfiguration();
    HiveToBaseCuboidMapper mapper = new HiveToBaseCuboidMapper();
    Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);
    mapper.doSetup(context);
    // try-with-resources: the reader is released even when next() or map() throws.
    try (Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath))) {
        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
        Text value = new Text();
        while (reader.next(key, value)) {
            mapper.map(key, value, context);
        }
    }
}
/**
 * Developer-only trial (ignored by default): feeds every record of the sequence
 * file at {@code srcPath} through a {@link BaseCuboidMapper}.
 *
 * @throws IOException if the sequence file cannot be opened or read
 * @throws InterruptedException if the mapper is interrupted
 */
@Ignore("convenient trial tool for dev")
@Test
public void test() throws IOException, InterruptedException {
    Configuration hconf = new Configuration();
    BaseCuboidMapper mapper = new BaseCuboidMapper();
    Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);
    mapper.setup(context);
    // try-with-resources: the reader is released even when next() or map() throws.
    try (Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath))) {
        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
        Text value = new Text();
        while (reader.next(key, value)) {
            mapper.map(key, value, context);
        }
    }
}
/**
 * Populates the name/generation maps from the last file found under {@code _path}.
 *
 * <p>The files are ordered by {@link FileStatus}'s natural ordering and only the
 * last one is read; each record maps a name to a generation. All other files are
 * then handed to {@code cleanupOldFiles}. Nothing happens when the directory
 * listing is empty.
 *
 * @throws IOException if the directory cannot be listed or the file cannot be read
 */
private void loadGenerations() throws IOException {
    FileSystem fileSystem = _path.getFileSystem(_configuration);
    FileStatus[] listStatus = fileSystem.listStatus(_path);
    // NOTE(review): some Hadoop versions return null from listStatus for a
    // missing path; treat that the same as an empty directory.
    if (listStatus == null) {
        return;
    }
    SortedSet<FileStatus> existing = new TreeSet<FileStatus>(Arrays.asList(listStatus));
    if (existing.isEmpty()) {
        return;
    }
    FileStatus last = existing.last();
    // try-with-resources: the reader is released even when a record fails to read.
    try (Reader reader = new SequenceFile.Reader(fileSystem, last.getPath(), _configuration)) {
        Text key = new Text();
        LongWritable value = new LongWritable();
        while (reader.next(key, value)) {
            String name = key.toString();
            long gen = value.get();
            _namesToGenerations.put(name, gen);
            Set<String> names = _generationsToNames.get(gen);
            if (names == null) {
                names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
                _generationsToNames.put(gen, names);
            }
            names.add(name);
        }
    }
    // Everything except the file just loaded is considered old.
    existing.remove(last);
    cleanupOldFiles(fileSystem, existing);
}
/**
 * Recursively visits {@code output} and passes every record found in its
 * sequence files to {@code resultReader}.
 *
 * <p>Directories are descended into, skipping children whose name starts with
 * {@code "_"}; any non-directory path is opened as a sequence file of
 * {@code Text} / {@code TableBlurRecord} pairs.
 *
 * @param output file or directory to walk
 * @param conf configuration used to resolve the file system and open readers
 * @param resultReader callback invoked once per record
 * @throws IOException if listing or reading fails
 */
private void walkOutput(Path output, Configuration conf, ResultReader resultReader) throws IOException {
    FileSystem fileSystem = output.getFileSystem(conf);
    FileStatus fileStatus = fileSystem.getFileStatus(output);
    if (fileStatus.isDir()) {
        FileStatus[] listStatus = fileSystem.listStatus(output, new PathFilter() {
            @Override
            public boolean accept(Path path) {
                return !path.getName().startsWith("_");
            }
        });
        for (FileStatus fs : listStatus) {
            walkOutput(fs.getPath(), conf, resultReader);
        }
    } else {
        // try-with-resources: the reader is released even when the callback throws.
        try (Reader reader = new SequenceFile.Reader(fileSystem, output, conf)) {
            Text rowId = new Text();
            TableBlurRecord tableBlurRecord = new TableBlurRecord();
            while (reader.next(rowId, tableBlurRecord)) {
                resultReader.read(rowId, tableBlurRecord);
            }
        }
    }
}
/**
 * Finishes stubbing the mocked sequence file reader and returns it.
 *
 * <p>The key and value classes are answered from {@code keyValueHelper}, and
 * every {@code next(key, value)} call is forwarded to
 * {@code keyValueHelper.next(key, value)}.
 *
 * @return the stubbed {@code sequenceFileReader}
 * @throws IOException declared because the stubbed reader methods declare it
 */
public Reader build() throws IOException
{
    // Forwards the two Writable arguments of next(...) to the helper.
    final Answer<Boolean> forwardToHelper = new Answer<Boolean>()
    {
        @Override
        public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable
        {
            final Object[] callArguments = invocationOnMock.getArguments();
            final Writable keyArgument = (Writable) callArguments[0];
            final Writable valueArgument = (Writable) callArguments[1];
            return keyValueHelper.next(keyArgument, valueArgument);
        }
    };

    when(sequenceFileReader.getKeyClass()).thenReturn(keyValueHelper.getKeyClass());
    when(sequenceFileReader.getValueClass()).thenReturn(keyValueHelper.getValueClass());
    when(sequenceFileReader.next(any(Writable.class), any(Writable.class))).thenAnswer(forwardToHelper);

    return sequenceFileReader;
}
/**
 * Writes two {@code Long}/{@code String} records to a sequence file and asserts
 * that they are read back in order, followed by {@code null} at end of file.
 *
 * @throws Exception if the file cannot be written or read
 */
public void testJavaSerialization() throws Exception {
    Path file = new Path(System.getProperty("test.build.data", ".")
            + "/test.seq");

    fs.delete(file, true);
    // The writer must be closed before reading; try-with-resources also closes
    // it when an append fails.
    try (Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
            String.class)) {
        writer.append(1L, "one");
        writer.append(2L, "two");
    }

    // Closing via try-with-resources releases the reader even when an
    // assertion fails part-way through.
    try (Reader reader = new Reader(fs, file, conf)) {
        assertEquals(1L, reader.next((Object) null));
        assertEquals("one", reader.getCurrentValue((Object) null));
        assertEquals(2L, reader.next((Object) null));
        assertEquals("two", reader.getCurrentValue((Object) null));
        assertNull(reader.next((Object) null));
    }
}
/**
 * Writes two {@code Long}/{@code String} records to a sequence file and asserts
 * that they are read back in order, followed by {@code null} at end of file.
 *
 * @throws Exception if the file cannot be written or read
 */
public void testJavaSerialization() throws Exception {
    Path file = new Path(System.getProperty("test.build.data", ".")
            + "/test.seq");

    fs.delete(file, true);
    // The writer must be closed before reading; try-with-resources also closes
    // it when an append fails.
    try (Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
            String.class)) {
        writer.append(1L, "one");
        writer.append(2L, "two");
    }

    // Closing via try-with-resources releases the reader even when an
    // assertion fails part-way through.
    try (Reader reader = new Reader(fs, file, conf)) {
        assertEquals(1L, reader.next((Object) null));
        assertEquals("one", reader.getCurrentValue((Object) null));
        assertEquals(2L, reader.next((Object) null));
        assertEquals("two", reader.getCurrentValue((Object) null));
        assertNull(reader.next((Object) null));
    }
}
/**
 * Logs every key/value pair of the sequence file at {@code path}.
 *
 * <p>Key and value instances are created reflectively from the classes recorded
 * in the file, so any {@code Writable} key/value types are supported.
 *
 * @param path location of the sequence file
 * @throws Exception if the file cannot be opened or read
 */
public static void readSequenceFile(String path) throws Exception {
    Reader.Option filePath = Reader.file(new Path(path)); // specify the file path
    // Build the reader in try-with-resources; the original never closed it.
    try (Reader sqReader = new Reader(configuration, filePath)) {
        Writable key = (Writable) ReflectionUtils.newInstance(sqReader.getKeyClass(), configuration);
        Writable value = (Writable) ReflectionUtils.newInstance(sqReader.getValueClass(), configuration);

        while (sqReader.next(key, value)) {
            logger.info("key:" + key + ",value:" + value);
        }
    }
}
/**
 * Reads every record of a sequence file into its own FlowFile.
 *
 * <p>When the record key matches {@code LOOKS_LIKE_FILENAME}, the FlowFile name
 * is the key with any path up to the last {@link File#separator} removed and a
 * nano-time suffix appended. Otherwise the name is built from the input file
 * name, a nano-time stamp and a running counter. A FlowFile whose write fails
 * with a {@link ProcessException} is logged and removed from the session.
 *
 * @param file sequence file to read
 * @param configuration Hadoop configuration used to open the reader
 * @param fileSystem file system used to qualify {@code file}
 * @return the FlowFiles that were written successfully
 * @throws IOException if the reader cannot be opened or a record cannot be read
 */
@Override
public Set<FlowFile> readSequenceFile(final Path file, Configuration configuration, FileSystem fileSystem) throws IOException {
    final Set<FlowFile> created = new HashSet<>();
    final Path qualifiedFile = fileSystem.makeQualified(file);
    final SequenceFile.Reader reader = new SequenceFile.Reader(configuration, Reader.file(qualifiedFile));
    final String generatedPrefix = file.getName() + "." + System.nanoTime() + ".";
    int generatedCount = 0;
    LOG.debug("Reading from sequence file {}", new Object[]{file});
    final OutputStreamWritableCallback valueWriter = new OutputStreamWritableCallback(reader);
    final Text recordKey = new Text();
    try {
        while (reader.next(recordKey)) {
            final String keyText = recordKey.toString();
            final String flowFileName;
            // the key may be a file name, and may not
            if (!LOOKS_LIKE_FILENAME.matcher(keyText).matches()) {
                generatedCount++;
                flowFileName = generatedPrefix + generatedCount;
            } else {
                final String baseName = keyText.contains(File.separator)
                        ? StringUtils.substringAfterLast(keyText, File.separator)
                        : keyText;
                flowFileName = baseName + "." + System.nanoTime();
            }
            FlowFile flowFile = session.putAttribute(session.create(), CoreAttributes.FILENAME.key(), flowFileName);
            try {
                flowFile = session.write(flowFile, valueWriter);
                created.add(flowFile);
            } catch (ProcessException e) {
                LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
                session.remove(flowFile);
            }
            recordKey.clear();
        }
    } finally {
        IOUtils.closeQuietly(reader);
    }

    return created;
}
/**
 * Builds a copy listing for {@code input/sub1/sub2} (containing one file,
 * {@code data}) with the root path set to {@code input}, then asserts that the
 * listing file holds exactly two entries: the directory and the data file, keyed
 * by their path relative to the root.
 *
 * @throws IOException if the temporary files or the listing cannot be created or read
 */
@Test
public void typical() throws IOException {
    File inputRoot = temp.newFolder("input");
    File nestedDir = new File(inputRoot, "sub1/sub2");
    nestedDir.mkdirs();
    File dataFile = new File(nestedDir, "data");
    Files.asCharSink(dataFile, UTF_8).write("test1");

    Path listingPath = new Path(temp.newFile("listFile").toURI());

    List<Path> sources = new ArrayList<>();
    sources.add(new Path(nestedDir.toURI()));
    DistCpOptions distCpOptions = new DistCpOptions(sources, new Path("dummy"));

    CircusTrainCopyListing.setRootPath(conf, new Path(inputRoot.toURI()));
    CircusTrainCopyListing listing = new CircusTrainCopyListing(conf, null);
    listing.doBuildListing(listingPath, distCpOptions);

    try (Reader listingReader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(listingPath))) {
        Text relativePath = new Text();
        CopyListingFileStatus status = new CopyListingFileStatus();

        // First entry: the source directory itself.
        assertTrue(listingReader.next(relativePath, status));
        assertThat(relativePath.toString(), is("/sub1/sub2"));
        assertThat(status.getPath().toUri().toString(), endsWith("/input/sub1/sub2"));

        // Second entry: the single data file inside it.
        assertTrue(listingReader.next(relativePath, status));
        assertThat(relativePath.toString(), is("/sub1/sub2/data"));
        assertThat(status.getPath().toUri().toString(), endsWith("/input/sub1/sub2/data"));

        // No further entries.
        assertFalse(listingReader.next(relativePath, status));
    }
}
/**
 * Opens a sequence file reader for {@code filename}.
 *
 * <p>Unless {@code BaseSqoopTestCase.isOnPhysicalCluster()} reports a physical
 * cluster, the default file system is set to {@code CommonArgs.LOCAL_FS}. The
 * caller is responsible for closing the returned reader.
 *
 * @param filename path of the sequence file
 * @return an open reader positioned at the start of the file
 * @throws IOException if the file system or the file cannot be opened
 */
public static Reader getSeqFileReader(String filename) throws IOException {
    final Configuration readerConf = new Configuration();
    final boolean onCluster = BaseSqoopTestCase.isOnPhysicalCluster();
    if (!onCluster) {
        // read from local filesystem
        readerConf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
    }
    final FileSystem fileSystem = FileSystem.get(readerConf);
    LOG.info("Opening SequenceFile " + filename);
    final Path seqFile = new Path(filename);
    return new SequenceFile.Reader(fileSystem, seqFile, readerConf);
}
/**
 * Returns the value of the first record in the sequence file {@code filename}.
 *
 * <p>Key and value objects are instantiated reflectively from the classes
 * recorded in the file. The result of {@code next(key)} is not checked, so for a
 * file without records the returned object is whatever {@code getCurrentValue}
 * leaves in the freshly created value. An {@link IOException} thrown while
 * closing the reader is logged as a warning and not propagated.
 *
 * @param filename path of the sequence file
 * @return the value object after reading the first record
 * @throws IOException if the file cannot be opened or read
 */
public static Object getFirstValue(String filename) throws IOException {
    // read from local filesystem
    final Configuration conf = new Configuration();
    if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
        conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
    }
    final FileSystem fs = FileSystem.get(conf);
    // If the constructor throws there is nothing to close, so it sits outside the try.
    final Reader seqReader = new SequenceFile.Reader(fs, new Path(filename), conf);
    try {
        final Object firstKey = ReflectionUtils.newInstance(seqReader.getKeyClass(), conf);
        final Object firstValue = ReflectionUtils.newInstance(seqReader.getValueClass(), conf);
        LOG.info("Reading value of type " + seqReader.getValueClassName()
                + " from SequenceFile " + filename);
        seqReader.next(firstKey);
        seqReader.getCurrentValue(firstValue);
        LOG.info("Value as string: " + firstValue.toString());
        return firstValue;
    } finally {
        try {
            seqReader.close();
        } catch (IOException ioe) {
            LOG.warn("IOException during close: " + ioe.toString());
        }
    }
}
/**
 * Copies the sequence file {@code src} to {@code dst}, repeating every record
 * so that the input is replicated {@code floor(64 MB / size of src)} times.
 *
 * <p>The destination is written with block compression using the codec from
 * {@code getLZOCodec}. Values are read as {@code Text}; the key class is taken
 * from the source file. If the source is larger than 64 MB the repeat count is 0
 * and no records are written.
 *
 * @param src path of the source sequence file
 * @param dst path of the sequence file to create
 * @throws IOException if the source is empty, or if reading or writing fails
 */
public static void copyTo64MB(String src, String dst) throws IOException {
    final long targetBytes = 64L * 1024 * 1024; // 64 MB

    Configuration hconf = new Configuration();
    Path srcPath = new Path(src);
    Path dstPath = new Path(dst);

    FileSystem fs = FileSystem.get(hconf);
    long srcSize = fs.getFileStatus(srcPath).getLen();
    // Guard the division below; a zero-length file would otherwise raise ArithmeticException.
    if (srcSize <= 0) {
        throw new IOException("Source file is empty: " + src);
    }
    int copyTimes = (int) (targetBytes / srcSize);
    System.out.println("Copy " + copyTimes + " times");

    int count = 0;
    // try-with-resources: both streams are released even if a read or append fails.
    try (Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath))) {
        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
        Text value = new Text();

        try (Writer writer = SequenceFile.createWriter(hconf, Writer.file(dstPath), Writer.keyClass(key.getClass()), Writer.valueClass(Text.class), Writer.compression(CompressionType.BLOCK, getLZOCodec(hconf)))) {
            while (reader.next(key, value)) {
                for (int i = 0; i < copyTimes; i++) {
                    writer.append(key, value);
                    count++;
                }
            }

            // Report before the writer is closed, as the original did.
            System.out.println("Len: " + writer.getLength());
            System.out.println("Rows: " + count);
        }
    }
}
/**
 * Opens a reader on the cache file for {@code segmentKey}, creating the cache
 * file first when it does not exist yet.
 *
 * @param segmentKey segment whose cache file should be opened
 * @return a {@code MyReader} wrapping a sequence file reader on the cache file
 * @throws IOException if the existence check, creation or open fails
 */
private MyReader openReader(SegmentKey segmentKey) throws IOException {
    final Path cacheFile = getCacheFilePath(segmentKey);
    final FileSystem cacheFs = _cachePath.getFileSystem(_configuration);
    final boolean cacheMissing = !cacheFs.exists(cacheFile);
    if (cacheMissing) {
        createCacheFile(cacheFile, segmentKey);
    }
    final Reader.Option source = SequenceFile.Reader.file(cacheFile);
    return new MyReader(new SequenceFile.Reader(_configuration, source));
}
/**
 * Emits the id of the cluster whose stored point matches this row.
 *
 * <p>The row's year-rate and repay-limit-time columns are compared, as strings,
 * with components 0 and 1 of every point vector found in the files under
 * {@code Constants.hdfs_kmeans_point_output_path}; files whose path contains
 * {@code "_"} are skipped. Scanning of a file stops at its first match, but the
 * remaining files are still scanned, so a match in a later file replaces an
 * earlier one. The row key and the matched cluster id ({@code null} when nothing
 * matched) are then written to the context.
 *
 * <p>NOTE(review): every call re-reads all point files, which is expensive per
 * record; consider caching the points in setup.
 *
 * @param row HBase row key
 * @param result HBase row content
 * @param context map context that receives the output pair
 * @throws IOException if a point file cannot be read or the output cannot be written
 * @throws InterruptedException if writing to the context is interrupted
 */
public void map(ImmutableBytesWritable row, Result result,
        Context context) throws IOException, InterruptedException {
    String yrstr = Bytes.toString(result.getValue(
            Constants.hbase_column_family.getBytes(),
            Constants.hbase_column_yearrate.getBytes()));
    String rltstr = Bytes.toString(result.getValue(
            Constants.hbase_column_family.getBytes(),
            Constants.hbase_column_repaylimittime.getBytes()));

    List<String> list = HdfsHelper
            .ls(Constants.hdfs_kmeans_point_output_path);
    String clusterid = null;
    for (String file : list) {
        if (file.contains("_")) {
            continue;
        }
        // try-with-resources: the reader is released even when a read fails.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(
                HBaseContext.config, Reader.file(new Path(file)))) {
            IntWritable clusterId = new IntWritable();
            // Named "point" so it no longer shadows the "value" field used below.
            WeightedPropertyVectorWritable point = new WeightedPropertyVectorWritable();
            while (reader.next(clusterId, point)) {
                String yearrate = String.valueOf(point.getVector().get(0));
                String repaylimittime = String.valueOf(point.getVector()
                        .get(1));
                if (yrstr.equals(yearrate) && rltstr.equals(repaylimittime)) {
                    clusterid = clusterId.toString();
                    break;
                }
            }
        }
    }
    key.set(row.get());
    value.set(clusterid);
    context.write(key, value);
}
/**
 * Counts the records in the reducer output files under the {@code /t1/pairs}
 * directory of the test HDFS base directory.
 *
 * <p>Only regular files whose full path matches {@code .*part-r-0000[0-9]} are
 * read; each is opened as a sequence file of {@code Text} / {@code Text} pairs.
 *
 * @return total number of records across the matching files
 * @throws IllegalArgumentException declared by the original signature
 * @throws IOException if listing or reading fails
 */
private int readFile() throws IllegalArgumentException, IOException {
    final FileSystem fileSystem = FileSystem.get(MapReduceTestUtils.getConfiguration());
    final Path pairsDir =
            new Path(
                    TestUtils.TEMP_DIR
                            + File.separator
                            + MapReduceTestEnvironment.HDFS_BASE_DIRECTORY
                            + "/t1/pairs");
    int records = 0;
    for (final FileStatus candidate : fileSystem.listStatus(pairsDir)) {
        final boolean isReducerPart =
                candidate.isFile() && candidate.getPath().toString().matches(".*part-r-0000[0-9]");
        if (!isReducerPart) {
            continue;
        }
        try (SequenceFile.Reader partReader =
                new SequenceFile.Reader(
                        MapReduceTestUtils.getConfiguration(),
                        Reader.file(candidate.getPath()))) {
            final Text pairKey = new Text();
            final Text pairValue = new Text();
            while (partReader.next(pairKey, pairValue)) {
                records++;
            }
        }
    }
    return records;
}
/**
 * Verifies split computation for three inputs at two inputs per split.
 *
 * <p>The reader is mocked to yield three records and then end, reporting
 * positions 30 and 60; the file status reports a length of 100. The test expects
 * two splits: {@code [0, 30)} and one starting at 30 with length 70.
 *
 * @throws Exception if the input format fails to compute splits
 */
@Test
public void testGetSplits() throws Exception {
    final WritableValueInputFormat<Text> format = spy(new WritableValueInputFormat<Text>());
    final Reader seqReader = mock(Reader.class);
    doReturn(seqReader).when(format).getReader(conf, path);
    doReturn(fs).when(format).getFileSystem(conf);
    when(fs.makeQualified(any(Path.class))).thenReturn(path);
    when(fs.getFileStatus(path)).thenReturn(new FileStatus(100, false, 1, 10, 1, path));

    conf.set(WritableValueInputFormat.INPUT_FILE_LOCATION_CONF, "/tmp/input_file");
    conf.setInt(WritableValueInputFormat.INPUTS_PER_SPLIT_CONF, 2);
    conf.setClass(WritableValueInputFormat.VALUE_TYPE_CONF, Text.class, Writable.class);

    final JobContext jobContext = mock(JobContext.class);
    when(jobContext.getConfiguration()).thenReturn(conf);

    // 3 inputs
    when(seqReader.next(any(NullWritable.class), any(Text.class))).thenReturn(true, true, true, false);
    // getPosition gets called after each for loop invocation, so only need two of these
    when(seqReader.getPosition()).thenReturn(30L, 60L);

    final List<InputSplit> splits = format.getSplits(jobContext);
    assertEquals(2, splits.size());

    final FileSplit first = (FileSplit) splits.get(0);
    assertEquals(0, first.getStart());
    assertEquals(30, first.getLength());
    assertEquals(path, first.getPath());

    final FileSplit second = (FileSplit) splits.get(1);
    assertEquals(30, second.getStart());
    assertEquals(70, second.getLength());
    assertEquals(path, second.getPath());
}
/**
 * Reads every record of a sequence file into its own FlowFile.
 *
 * <p>When the record key matches {@code LOOKS_LIKE_FILENAME}, the FlowFile name
 * is the key with any path up to the last {@link File#separator} removed and a
 * nano-time suffix appended. Otherwise the name is built from the input file
 * name, a nano-time stamp and a running counter. A FlowFile whose write fails
 * with a {@link ProcessException} is logged and removed from the session.
 *
 * @param file sequence file to read
 * @param configuration Hadoop configuration used to open the reader
 * @param fileSystem file system used to qualify {@code file}
 * @return the FlowFiles that were written successfully
 * @throws IOException if the reader cannot be opened or a record cannot be read
 */
@Override
public Set<FlowFile> readSequenceFile(final Path file, Configuration configuration, FileSystem fileSystem) throws IOException {
    final Set<FlowFile> created = new HashSet<>();
    final Path qualifiedFile = fileSystem.makeQualified(file);
    final SequenceFile.Reader reader = new SequenceFile.Reader(configuration, Reader.file(qualifiedFile));
    final String generatedPrefix = file.getName() + "." + System.nanoTime() + ".";
    int generatedCount = 0;
    LOG.debug("Reading from sequence file {}", new Object[]{file});
    final OutputStreamWritableCallback valueWriter = new OutputStreamWritableCallback(reader);
    final Text recordKey = new Text();
    try {
        while (reader.next(recordKey)) {
            final String keyText = recordKey.toString();
            final String flowFileName;
            // the key may be a file name, and may not
            if (!LOOKS_LIKE_FILENAME.matcher(keyText).matches()) {
                generatedCount++;
                flowFileName = generatedPrefix + generatedCount;
            } else {
                final String baseName = keyText.contains(File.separator)
                        ? StringUtils.substringAfterLast(keyText, File.separator)
                        : keyText;
                flowFileName = baseName + "." + System.nanoTime();
            }
            FlowFile flowFile = session.putAttribute(session.create(), CoreAttributes.FILENAME.key(), flowFileName);
            try {
                flowFile = session.write(flowFile, valueWriter);
                created.add(flowFile);
            } catch (ProcessException e) {
                LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
                session.remove(flowFile);
            }
            recordKey.clear();
        }
    } finally {
        IOUtils.closeQuietly(reader);
    }

    return created;
}
public ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
this.filePath = filePath;
// The writer does not flush the length during hflush. Using length options lets us read
// past length in the FileStatus but it will throw EOFException during a read instead
// of returning null.
this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE));
this.writable = new ProtoMessageWritable<>(parser);
}
SeqRowReader(Configuration hconf, String path) throws IOException {
reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
value = new Text();
}
/**
 * Creates a callback bound to the given sequence file reader.
 *
 * @param reader reader stored in this callback's {@code reader} field
 */
private OutputStreamWritableCallback(Reader reader) {
this.reader = reader;
}
/**
 * Reads every record of a sequence file into its own FlowFile, handing the
 * record key to the write callback before each write.
 *
 * <p>When the record key matches {@code LOOKS_LIKE_FILENAME}, the FlowFile name
 * is the key with any path up to the last {@link File#separator} removed and a
 * nano-time suffix appended. Otherwise the name is built from the input file
 * name, a nano-time stamp and a running counter. A FlowFile whose write fails
 * with a {@link ProcessException} is logged and removed from the session.
 *
 * @param file sequence file to read
 * @param configuration Hadoop configuration used to open the reader
 * @param fileSystem file system used to qualify {@code file}
 * @return the FlowFiles that were written successfully
 * @throws IOException if the reader cannot be opened or a record cannot be read
 */
@Override
public Set<FlowFile> readSequenceFile(Path file, Configuration configuration, FileSystem fileSystem) throws IOException {
    final Set<FlowFile> created = new HashSet<>();
    final Path qualifiedFile = fileSystem.makeQualified(file);
    final SequenceFile.Reader reader = new SequenceFile.Reader(configuration, Reader.file(qualifiedFile));
    final Text recordKey = new Text();
    final KeyValueWriterCallback keyValueWriter = new KeyValueWriterCallback(reader);
    final String generatedPrefix = file.getName() + "." + System.nanoTime() + ".";
    int generatedCount = 0;
    LOG.debug("Read from SequenceFile: {} ", new Object[]{file});
    try {
        while (reader.next(recordKey)) {
            final String keyText = recordKey.toString();
            final String flowFileName;
            // the key may be a file name, and may not
            if (!LOOKS_LIKE_FILENAME.matcher(keyText).matches()) {
                generatedCount++;
                flowFileName = generatedPrefix + generatedCount;
            } else {
                final String baseName = keyText.contains(File.separator)
                        ? StringUtils.substringAfterLast(keyText, File.separator)
                        : keyText;
                flowFileName = baseName + "." + System.nanoTime();
            }
            FlowFile flowFile = session.putAttribute(session.create(), CoreAttributes.FILENAME.key(), flowFileName);
            // The callback writes the key alongside the value, so give it the current key.
            keyValueWriter.key = recordKey;
            try {
                flowFile = session.write(flowFile, keyValueWriter);
                created.add(flowFile);
            } catch (ProcessException e) {
                LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
                session.remove(flowFile);
            }
            recordKey.clear();
        }
    } finally {
        IOUtils.closeQuietly(reader);
    }

    return created;
}