

源代码1 项目: kylin-on-parquet-v2   文件: CubeStatsReader.java
/**
 * Populates this result from the Hadoop SequenceFile at {@code path}.
 *
 * <p>Records are read as (LongWritable, BytesWritable) pairs. Keys 0, -1, -2 and -3 set the
 * scalar fields {@code percentage}, {@code mapperOverlapRatio}, {@code mapperNumber} and
 * {@code sourceRecordCount}; every positive key is registered in {@code counterMap} together
 * with a freshly created {@code HLLCounter}.
 *
 * <p>NOTE(review): this excerpt is truncated (closing braces and possibly other statements
 * are missing), so the comments describe only the visible lines.
 *
 * @param path location of the statistics SequenceFile
 * @param precision value handed to each {@code HLLCounter} constructor
 * @throws IOException propagated from the SequenceFile reader
 */
public CubeStatsResult(Path path, int precision) throws IOException {
    Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
    Option seqInput = SequenceFile.Reader.file(path);
    // try-with-resources closes the reader when the block exits.
    try (Reader reader = new SequenceFile.Reader(hadoopConf, seqInput)) {
        // Key/value holders are instantiated from the classes recorded in the file header.
        LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
        BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf);
        // NOTE(review): BytesWritable.getBytes() exposes the backing array, which may be longer
        // than getLength(); presumably the Bytes.toXxx helpers decode from offset 0 -- confirm.
        while (reader.next(key, value)) {
            if (key.get() == 0L) {
                percentage = Bytes.toInt(value.getBytes());
            } else if (key.get() == -1) {
                mapperOverlapRatio = Bytes.toDouble(value.getBytes());
            } else if (key.get() == -2) {
                mapperNumber = Bytes.toInt(value.getBytes());
            } else if (key.get() == -3) {
                sourceRecordCount = Bytes.toLong(value.getBytes());
            } else if (key.get() > 0) {
                HLLCounter hll = new HLLCounter(precision);
                // NOTE(review): byteArray is never used in the visible lines; presumably the
                // call that loads it into hll was dropped from this excerpt -- confirm.
                ByteArray byteArray = new ByteArray(value.getBytes());
                counterMap.put(key.get(), hll);
/**
 * Developer trial tool (disabled via {@code @Ignore}): replays every record of the
 * SequenceFile at {@code srcPath} through a {@code HiveToBaseCuboidMapper} using a mocked
 * map context.
 *
 * <p>NOTE(review): the excerpt is truncated; no reader close or mapper cleanup is visible.
 */
@Ignore("convenient trial tool for dev")
public void test() throws IOException, InterruptedException {
    Configuration hconf = HadoopUtil.getCurrentConfiguration();
    HiveToBaseCuboidMapper mapper = new HiveToBaseCuboidMapper();
    Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);


    Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
    // The key type is taken from the file header; values are read as Text.
    Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
    Text value = new Text();

    // Feed each record to the mapper exactly as read.
    while (reader.next(key, value)) {
        mapper.map(key, value, context);

源代码3 项目: hadoop   文件: DistCpV1.java
/**
 * Walks the destination directory list and calls {@code updateDestStatus} for each entry,
 * passing the preserved attribute set.
 *
 * <p>NOTE(review): the excerpt is truncated -- the bodies of the two guard {@code if}
 * statements and all closing braces are missing; presumably both guards return early.
 *
 * @param conf configuration used to resolve the destination file system
 * @param jobconf job configuration; supplies the list path under {@code DST_DIR_LIST_LABEL}
 * @param destPath root that each {@code pair.output} is resolved against
 * @param presevedAttributes attribute spec parsed by {@code FileAttribute.parse}
 * @throws IOException propagated from file-system or SequenceFile access
 */
static private void finalize(Configuration conf, JobConf jobconf,
    final Path destPath, String presevedAttributes) throws IOException {
  if (presevedAttributes == null) {
  EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes);
  // Guard: none of USER, GROUP or PERMISSION is requested.
  if (!preseved.contains(FileAttribute.USER)
      && !preseved.contains(FileAttribute.GROUP)
      && !preseved.contains(FileAttribute.PERMISSION)) {

  FileSystem dstfs = destPath.getFileSystem(conf);
  Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
  // The reader is closed automatically by try-with-resources.
  try (SequenceFile.Reader in =
      new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) {
    Text dsttext = new Text();
    FilePair pair = new FilePair(); 
    // Each record is a (Text, FilePair); only the FilePair is used below.
    for(; in.next(dsttext, pair); ) {
      Path absdst = new Path(destPath, pair.output);
      updateDestStatus(pair.input, dstfs.getFileStatus(absdst),
          preseved, dstfs);
源代码4 项目: hadoop   文件: TestSequenceFileSerialization.java
/**
 * Writes the records (1L, "one") and (2L, "two") to a SequenceFile and checks that they are
 * read back in order through the Object-based {@code next}/{@code getCurrentValue} calls,
 * with {@code next} returning null once the file is exhausted.
 *
 * <p>NOTE(review): the excerpt is truncated -- the tails of the {@code Path} and
 * {@code createWriter} expressions and the writer/reader close calls are missing.
 */
public void testJavaSerialization() throws Exception {
  Path file = new Path(System.getProperty("test.build.data",".") +
  fs.delete(file, true);
  Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
  writer.append(1L, "one");
  writer.append(2L, "two");
  Reader reader = new Reader(fs, file, conf);
  assertEquals(1L, reader.next((Object) null));
  assertEquals("one", reader.getCurrentValue((Object) null));
  assertEquals(2L, reader.next((Object) null));
  assertEquals("two", reader.getCurrentValue((Object) null));
  assertNull(reader.next((Object) null));
源代码5 项目: big-c   文件: DistCpV1.java
/**
 * Walks the destination directory list and calls {@code updateDestStatus} for each entry,
 * passing the preserved attribute set.
 *
 * <p>NOTE(review): the excerpt is truncated -- the bodies of the two guard {@code if}
 * statements and all closing braces are missing; presumably both guards return early.
 *
 * @param conf configuration used to resolve the destination file system
 * @param jobconf job configuration; supplies the list path under {@code DST_DIR_LIST_LABEL}
 * @param destPath root that each {@code pair.output} is resolved against
 * @param presevedAttributes attribute spec parsed by {@code FileAttribute.parse}
 * @throws IOException propagated from file-system or SequenceFile access
 */
static private void finalize(Configuration conf, JobConf jobconf,
    final Path destPath, String presevedAttributes) throws IOException {
  if (presevedAttributes == null) {
  EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes);
  // Guard: none of USER, GROUP or PERMISSION is requested.
  if (!preseved.contains(FileAttribute.USER)
      && !preseved.contains(FileAttribute.GROUP)
      && !preseved.contains(FileAttribute.PERMISSION)) {

  FileSystem dstfs = destPath.getFileSystem(conf);
  Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
  // The reader is closed automatically by try-with-resources.
  try (SequenceFile.Reader in =
      new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) {
    Text dsttext = new Text();
    FilePair pair = new FilePair(); 
    // Each record is a (Text, FilePair); only the FilePair is used below.
    for(; in.next(dsttext, pair); ) {
      Path absdst = new Path(destPath, pair.output);
      updateDestStatus(pair.input, dstfs.getFileStatus(absdst),
          preseved, dstfs);
源代码6 项目: big-c   文件: TestSequenceFileSerialization.java
/**
 * Writes the records (1L, "one") and (2L, "two") to a SequenceFile and checks that they are
 * read back in order through the Object-based {@code next}/{@code getCurrentValue} calls,
 * with {@code next} returning null once the file is exhausted.
 *
 * <p>NOTE(review): the excerpt is truncated -- the tails of the {@code Path} and
 * {@code createWriter} expressions and the writer/reader close calls are missing.
 */
public void testJavaSerialization() throws Exception {
  Path file = new Path(System.getProperty("test.build.data",".") +
  fs.delete(file, true);
  Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
  writer.append(1L, "one");
  writer.append(2L, "two");
  Reader reader = new Reader(fs, file, conf);
  assertEquals(1L, reader.next((Object) null));
  assertEquals("one", reader.getCurrentValue((Object) null));
  assertEquals(2L, reader.next((Object) null));
  assertEquals("two", reader.getCurrentValue((Object) null));
  assertNull(reader.next((Object) null));
源代码7 项目: big-data-lite   文件: SequenceFileRead.java
/**
 * Dumps every image stored in a SequenceFile to a local JPEG file.
 *
 * <p>{@code args[0]} is the SequenceFile URI and {@code args[1]} is a label inserted into
 * each output name, which has the form {@code image<label>_<n>.jpg} with {@code n} counting
 * up from 0. For each record the key and the value's byte length are also printed.
 *
 * <p>NOTE(review): the excerpt is truncated -- the loop's closing brace and the body of the
 * {@code finally} block (presumably closing the reader) are missing.
 *
 * @param args URI and output label, in that order; neither is validated here
 * @throws IOException propagated from reading the SequenceFile or writing an image
 */
public static void main(String[] args) throws IOException {
String uri = args[0];
       String split = args[1];
Configuration conf = new Configuration();
Path path = new Path(uri);
SequenceFile.Reader reader = null;
try {		
reader = new SequenceFile.Reader(conf, Reader.file(path));
       Text key = new Text();
       OrdImageWritable value = new OrdImageWritable();

       // Running index that makes each output file name unique.
       int num = 0;

while (reader.next(key, value)) {
           System.out.println(key.toString() + " " + value.getByteLength());
           ImageIO.write(value.getImage(), "jpg", new File("image" +split+"_" + num++ + ".jpg"));
} finally {
源代码8 项目: kylin   文件: CubeStatsReader.java
/**
 * Populates this result from the Hadoop SequenceFile at {@code path}.
 *
 * <p>Records are read as (LongWritable, BytesWritable) pairs. Keys 0, -1, -2 and -3 set the
 * scalar fields {@code percentage}, {@code mapperOverlapRatio}, {@code mapperNumber} and
 * {@code sourceRecordCount}; every positive key is registered in {@code counterMap} together
 * with a freshly created {@code HLLCounter}.
 *
 * <p>NOTE(review): this excerpt is truncated (closing braces and possibly other statements
 * are missing), so the comments describe only the visible lines.
 *
 * @param path location of the statistics SequenceFile
 * @param precision value handed to each {@code HLLCounter} constructor
 * @throws IOException propagated from the SequenceFile reader
 */
public CubeStatsResult(Path path, int precision) throws IOException {
    Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
    Option seqInput = SequenceFile.Reader.file(path);
    // try-with-resources closes the reader when the block exits.
    try (Reader reader = new SequenceFile.Reader(hadoopConf, seqInput)) {
        // Key/value holders are instantiated from the classes recorded in the file header.
        LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
        BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf);
        // NOTE(review): BytesWritable.getBytes() exposes the backing array, which may be longer
        // than getLength(); presumably the Bytes.toXxx helpers decode from offset 0 -- confirm.
        while (reader.next(key, value)) {
            if (key.get() == 0L) {
                percentage = Bytes.toInt(value.getBytes());
            } else if (key.get() == -1) {
                mapperOverlapRatio = Bytes.toDouble(value.getBytes());
            } else if (key.get() == -2) {
                mapperNumber = Bytes.toInt(value.getBytes());
            } else if (key.get() == -3) {
                sourceRecordCount = Bytes.toLong(value.getBytes());
            } else if (key.get() > 0) {
                HLLCounter hll = new HLLCounter(precision);
                // NOTE(review): byteArray is never used in the visible lines; presumably the
                // call that loads it into hll was dropped from this excerpt -- confirm.
                ByteArray byteArray = new ByteArray(value.getBytes());
                counterMap.put(key.get(), hll);
/**
 * Developer trial tool (disabled via {@code @Ignore}): replays every record of the
 * SequenceFile at {@code srcPath} through a {@code HiveToBaseCuboidMapper} using a mocked
 * map context.
 *
 * <p>NOTE(review): the excerpt is truncated; no reader close or mapper cleanup is visible.
 */
@Ignore("convenient trial tool for dev")
public void test() throws IOException, InterruptedException {
    Configuration hconf = HadoopUtil.getCurrentConfiguration();
    HiveToBaseCuboidMapper mapper = new HiveToBaseCuboidMapper();
    Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);


    Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
    // The key type is taken from the file header; values are read as Text.
    Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
    Text value = new Text();

    // Feed each record to the mapper exactly as read.
    while (reader.next(key, value)) {
        mapper.map(key, value, context);

源代码10 项目: Kylin   文件: BaseCuboidMapperPerformanceTest.java
/**
 * Developer trial tool (disabled via {@code @Ignore}): replays every record of the
 * SequenceFile at {@code srcPath} through a {@code BaseCuboidMapper} using a mocked map
 * context and a default {@code Configuration}.
 *
 * <p>NOTE(review): the excerpt is truncated; no reader close or mapper cleanup is visible.
 */
@Ignore("convenient trial tool for dev")
public void test() throws IOException, InterruptedException {
    Configuration hconf = new Configuration();
    BaseCuboidMapper mapper = new BaseCuboidMapper();
    Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);


    Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
    // The key type is taken from the file header; values are read as Text.
    Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
    Text value = new Text();

    // Feed each record to the mapper exactly as read.
    while (reader.next(key, value)) {
        mapper.map(key, value, context);

/**
 * Rebuilds the name/generation maps from the last file (in sorted {@code FileStatus} order)
 * found under {@code _path}, then calls {@code cleanupOldFiles} with the full listing.
 *
 * <p>Each record is a (Text name, LongWritable generation) pair; it is stored in
 * {@code _namesToGenerations}, and a concurrent name set is created in
 * {@code _generationsToNames} the first time a generation is seen.
 *
 * <p>NOTE(review): the excerpt is truncated -- the body of the {@code isEmpty()} guard
 * (presumably an early return), the statement adding {@code name} to {@code names}, and the
 * reader close are not visible.
 *
 * @throws IOException propagated from file-system or SequenceFile access
 */
private void loadGenerations() throws IOException {
  FileSystem fileSystem = _path.getFileSystem(_configuration);
  FileStatus[] listStatus = fileSystem.listStatus(_path);
  SortedSet<FileStatus> existing = new TreeSet<FileStatus>(Arrays.asList(listStatus));
  if (existing.isEmpty()) {
  FileStatus last = existing.last();
  Reader reader = new SequenceFile.Reader(fileSystem, last.getPath(), _configuration);
  Text key = new Text();
  LongWritable value = new LongWritable();
  while (reader.next(key, value)) {
    String name = key.toString();
    long gen = value.get();
    _namesToGenerations.put(name, gen);
    Set<String> names = _generationsToNames.get(gen);
    // Lazily create the per-generation name set.
    if (names == null) {
      names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
      _generationsToNames.put(gen, names);
  cleanupOldFiles(fileSystem, existing);
/**
 * Recursively visits {@code output} and hands every (Text, TableBlurRecord) record found in
 * its SequenceFiles to {@code resultReader}.
 *
 * <p>Directories are descended into, skipping children whose name starts with "_"; any other
 * path is opened as a SequenceFile.
 *
 * <p>NOTE(review): the excerpt is truncated -- closing braces and any reader close call are
 * not visible.
 *
 * @param output file or directory to walk
 * @param conf configuration used to resolve the file system and open readers
 * @param resultReader callback invoked once per record
 * @throws IOException propagated from file-system or SequenceFile access
 */
private void walkOutput(Path output, Configuration conf, ResultReader resultReader) throws IOException {
  FileSystem fileSystem = output.getFileSystem(conf);
  FileStatus fileStatus = fileSystem.getFileStatus(output);
  if (fileStatus.isDir()) {
    FileStatus[] listStatus = fileSystem.listStatus(output, new PathFilter() {
      public boolean accept(Path path) {
        // Exclude entries whose name begins with an underscore.
        return !path.getName().startsWith("_");
    for (FileStatus fs : listStatus) {
      walkOutput(fs.getPath(), conf, resultReader);
  } else {
    Reader reader = new SequenceFile.Reader(fileSystem, output, conf);
    Text rowId = new Text();
    TableBlurRecord tableBlurRecord = new TableBlurRecord();
    while (reader.next(rowId, tableBlurRecord)) {
      resultReader.read(rowId, tableBlurRecord);
源代码13 项目: mrgeo   文件: SequenceFileReaderBuilder.java
/**
 * Returns {@code sequenceFileReader} after stubbing its two-argument {@code next} call so
 * that each invocation is delegated to {@code keyValueHelper.next(key, value)}.
 *
 * <p>NOTE(review): the excerpt is truncated -- braces and any further stubbing are missing.
 *
 * @return the stubbed reader
 * @throws IOException declared by the signature
 */
public Reader build() throws IOException

  when(sequenceFileReader.next(any(Writable.class), any(Writable.class))).thenAnswer(new Answer<Boolean>()
    public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable
      // Get the key and value
      Object[] args = invocationOnMock.getArguments();
      Writable key = (Writable) args[0];
      Writable value = (Writable) args[1];
      return keyValueHelper.next(key, value);

  return sequenceFileReader;
源代码14 项目: RDFS   文件: TestSequenceFileSerialization.java
/**
 * Writes the records (1L, "one") and (2L, "two") to a SequenceFile and checks that they are
 * read back in order through the Object-based {@code next}/{@code getCurrentValue} calls,
 * with {@code next} returning null once the file is exhausted.
 *
 * <p>NOTE(review): the excerpt is truncated -- the tails of the {@code Path} and
 * {@code createWriter} expressions and the writer/reader close calls are missing.
 */
public void testJavaSerialization() throws Exception {
  Path file = new Path(System.getProperty("test.build.data",".") +
  fs.delete(file, true);
  Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
  writer.append(1L, "one");
  writer.append(2L, "two");
  Reader reader = new Reader(fs, file, conf);
  assertEquals(1L, reader.next((Object) null));
  assertEquals("one", reader.getCurrentValue((Object) null));
  assertEquals(2L, reader.next((Object) null));
  assertEquals("two", reader.getCurrentValue((Object) null));
  assertNull(reader.next((Object) null));
源代码15 项目: hadoop-gpu   文件: TestSequenceFileSerialization.java
/**
 * Writes the records (1L, "one") and (2L, "two") to a SequenceFile and checks that they are
 * read back in order through the Object-based {@code next}/{@code getCurrentValue} calls,
 * with {@code next} returning null once the file is exhausted.
 *
 * <p>NOTE(review): the excerpt is truncated -- the tails of the {@code Path} and
 * {@code createWriter} expressions and the writer/reader close calls are missing.
 */
public void testJavaSerialization() throws Exception {
  Path file = new Path(System.getProperty("test.build.data",".") +
  fs.delete(file, true);
  Writer writer = SequenceFile.createWriter(fs, conf, file, Long.class,
  writer.append(1L, "one");
  writer.append(2L, "two");
  Reader reader = new Reader(fs, file, conf);
  assertEquals(1L, reader.next((Object) null));
  assertEquals("one", reader.getCurrentValue((Object) null));
  assertEquals(2L, reader.next((Object) null));
  assertEquals("two", reader.getCurrentValue((Object) null));
  assertNull(reader.next((Object) null));
源代码16 项目: alchemy   文件: SequenceFileUtil.java
/**
 * Opens the SequenceFile at {@code path} and iterates over its records, with key and value
 * holders instantiated reflectively from the classes recorded in the file.
 *
 * <p>NOTE(review): the excerpt is truncated -- the loop body and the reader close are not
 * visible.
 *
 * @param path location of the SequenceFile
 * @throws Exception declared by the signature
 */
public static void readSequenceFile(String path) throws Exception{
	Reader.Option filePath = Reader.file(new Path(path));// specify the file path
	Reader sqReader = new Reader(configuration,filePath);// construct the reader object
	Writable key = (Writable)ReflectionUtils.newInstance(sqReader.getKeyClass(), configuration);
	Writable value = (Writable)ReflectionUtils.newInstance(sqReader.getValueClass(), configuration);
	while(sqReader.next(key, value)){
源代码17 项目: localization_nifi   文件: ValueReader.java
/**
 * Reads the SequenceFile at {@code file} and writes each record's value into a newly created
 * FlowFile through an {@code OutputStreamWritableCallback}.
 *
 * <p>The FlowFile's filename attribute is derived from the record key: when the key matches
 * {@code LOOKS_LIKE_FILENAME}, the part after the last {@code File.separator} is kept and a
 * nanoTime suffix is appended; otherwise the name is the input file name plus a nanoTime
 * stamp and a running counter.
 *
 * <p>NOTE(review): the excerpt is truncated -- closing braces, the statement that adds each
 * FlowFile to the result set, and the body of the {@code finally} block (presumably closing
 * the reader) are not visible.
 *
 * @param file SequenceFile to read; qualified against {@code fileSystem} before opening
 * @param configuration Hadoop configuration used to open the reader
 * @param fileSystem file system used to qualify {@code file}
 * @return the {@code flowFiles} set
 * @throws IOException propagated from the SequenceFile reader
 */
public Set<FlowFile> readSequenceFile(final Path file, Configuration configuration, FileSystem fileSystem) throws IOException {

    Set<FlowFile> flowFiles = new HashSet<>();
    final SequenceFile.Reader reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
    // Fallback name prefix for records whose key does not look like a file name.
    final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
    int counter = 0;
    LOG.debug("Reading from sequence file {}", new Object[]{file});
    final OutputStreamWritableCallback writer = new OutputStreamWritableCallback(reader);
    Text key = new Text();
    try {
        // Only the key is read here; the callback holds the reader for the value.
        while (reader.next(key)) {
            String fileName = key.toString();
            // the key may be a file name, and may not
            if (LOOKS_LIKE_FILENAME.matcher(fileName).matches()) {
                if (fileName.contains(File.separator)) {
                    fileName = StringUtils.substringAfterLast(fileName, File.separator);
                fileName = fileName + "." + System.nanoTime();
            } else {
                fileName = inputfileName + ++counter;
            FlowFile flowFile = session.create();
            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
            try {
                flowFile = session.write(flowFile, writer);
            } catch (ProcessException e) {
                LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
    } finally {

    return flowFiles;
源代码18 项目: circus-train   文件: CircusTrainCopyListingTest.java
/**
 * Builds a copy listing for {@code input/sub1/sub2} with the root path set to {@code input},
 * then reads the listing file back and asserts that it contains exactly two entries:
 * "/sub1/sub2" and "/sub1/sub2/data", in that order.
 *
 * <p>NOTE(review): the excerpt is truncated -- creation of the {@code sub1/sub2} directory
 * and the closing braces are not visible.
 */
public void typical() throws IOException {
  File input = temp.newFolder("input");
  File inputSub2 = new File(input, "sub1/sub2");
  Files.asCharSink(new File(inputSub2, "data"), UTF_8).write("test1");

  File listFile = temp.newFile("listFile");
  Path pathToListFile = new Path(listFile.toURI());

  List<Path> sourceDataLocations = new ArrayList<>();
  sourceDataLocations.add(new Path(inputSub2.toURI()));
  DistCpOptions options = new DistCpOptions(sourceDataLocations, new Path("dummy"));

  CircusTrainCopyListing.setRootPath(conf, new Path(input.toURI()));
  CircusTrainCopyListing copyListing = new CircusTrainCopyListing(conf, null);
  copyListing.doBuildListing(pathToListFile, options);

  try (Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(pathToListFile))) {
    Text key = new Text();
    CopyListingFileStatus value = new CopyListingFileStatus();

    // First entry: the source directory itself, keyed relative to the root path.
    assertTrue(reader.next(key, value));
    assertThat(key.toString(), is("/sub1/sub2"));
    assertThat(value.getPath().toUri().toString(), endsWith("/input/sub1/sub2"));

    // Second entry: the data file inside it.
    assertTrue(reader.next(key, value));
    assertThat(key.toString(), is("/sub1/sub2/data"));
    assertThat(value.getPath().toUri().toString(), endsWith("/input/sub1/sub2/data"));

    // No further entries.
    assertFalse(reader.next(key, value));

/**
 * Opens a SequenceFile reader for {@code filename}. Unless the tests run on a physical
 * cluster, the default file system is set to {@code CommonArgs.LOCAL_FS} first.
 *
 * <p>The reader is returned open; nothing in the visible lines closes it.
 *
 * <p>NOTE(review): the excerpt is truncated -- closing braces are missing.
 *
 * @param filename path of the SequenceFile to open
 * @return a new reader positioned at the start of the file
 * @throws IOException propagated from file-system or reader construction
 */
public static Reader getSeqFileReader(String filename) throws IOException {
  // read from local filesystem
  Configuration conf = new Configuration();
  if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
    conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
  FileSystem fs = FileSystem.get(conf);
  LOG.info("Opening SequenceFile " + filename);
  return new SequenceFile.Reader(fs, new Path(filename), conf);
/**
 * Opens the SequenceFile {@code filename}, instantiates key and value objects from the
 * classes recorded in the file, logs the value, and returns it.
 *
 * <p>NOTE(review): the excerpt is truncated -- no call that reads a record into
 * {@code key}/{@code val} is visible (presumably dropped), and the inner {@code try} in the
 * {@code finally} block is empty where a reader close call would be expected.
 *
 * @param filename path of the SequenceFile to read
 * @return the value object
 * @throws IOException propagated from file-system or reader access
 */
public static Object getFirstValue(String filename) throws IOException {
  Reader r = null;
  try {
    // read from local filesystem
    Configuration conf = new Configuration();
    if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
      conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
    FileSystem fs = FileSystem.get(conf);
    r = new SequenceFile.Reader(fs, new Path(filename), conf);
    Object key = ReflectionUtils.newInstance(r.getKeyClass(), conf);
    Object val = ReflectionUtils.newInstance(r.getValueClass(), conf);
    LOG.info("Reading value of type " + r.getValueClassName()
        + " from SequenceFile " + filename);
    LOG.info("Value as string: " + val.toString());
    return val;
  } finally {
    // r stays null if the reader was never constructed.
    if (null != r) {
      try {
      } catch (IOException ioe) {
        LOG.warn("IOException during close: " + ioe.toString());
源代码21 项目: Kylin   文件: CopySeq.java
/**
 * Copies the SequenceFile {@code src} to {@code dst}, appending every record
 * {@code copyTimes} times, where {@code copyTimes} is 64 MB divided by the source length.
 * The output is written block-compressed with the codec from {@code getLZOCodec}.
 *
 * <p>NOTE(review): the excerpt is truncated -- closing braces, any increment of
 * {@code count}, and the reader/writer close calls are not visible.
 *
 * @param src path of the source SequenceFile
 * @param dst path of the SequenceFile to create
 * @throws IOException propagated from file-system, reader or writer access
 */
public static void copyTo64MB(String src, String dst) throws IOException {
    Configuration hconf = new Configuration();
    Path srcPath = new Path(src);
    Path dstPath = new Path(dst);

    FileSystem fs = FileSystem.get(hconf);
    long srcSize = fs.getFileStatus(srcPath).getLen();
    // Integer division: a zero-length source throws ArithmeticException here, and a source
    // larger than 64 MB yields copyTimes == 0.
    int copyTimes = (int) (67108864 / srcSize); // 64 MB
    System.out.println("Copy " + copyTimes + " times");

    Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
    Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
    Text value = new Text();

    // The writer reuses the source key class; values are always written as Text.
    Writer writer = SequenceFile.createWriter(hconf, Writer.file(dstPath), Writer.keyClass(key.getClass()), Writer.valueClass(Text.class), Writer.compression(CompressionType.BLOCK, getLZOCodec(hconf)));

    int count = 0;
    while (reader.next(key, value)) {
        for (int i = 0; i < copyTimes; i++) {
            writer.append(key, value);

    System.out.println("Len: " + writer.getLength());
    System.out.println("Rows: " + count);

/**
 * Returns a {@code MyReader} over the cache file for {@code segmentKey}, calling
 * {@code createCacheFile} first when the file does not exist yet.
 *
 * <p>NOTE(review): the excerpt is truncated -- the closing brace of the existence check is
 * missing.
 *
 * @param segmentKey identifies the cache file via {@code getCacheFilePath}
 * @return a wrapper around a newly opened SequenceFile reader
 * @throws IOException propagated from file-system or reader access
 */
private MyReader openReader(SegmentKey segmentKey) throws IOException {
  Path file = getCacheFilePath(segmentKey);
  FileSystem fileSystem = _cachePath.getFileSystem(_configuration);
  if (!fileSystem.exists(file)) {
    createCacheFile(file, segmentKey);
  Reader reader = new SequenceFile.Reader(_configuration, SequenceFile.Reader.file(file));
  return new MyReader(reader);
源代码23 项目: recsys-offline   文件: UpdateClusterJob.java
/**
 * Looks for a cluster whose vector matches two string values taken from {@code result}:
 * each listed file is read as (IntWritable, WeightedPropertyVectorWritable) records, and
 * when vector element 0 equals {@code yrstr} and a second element equals {@code rltstr},
 * the record's cluster id is remembered.
 *
 * <p>NOTE(review): the excerpt is heavily truncated -- the arguments of several calls, the
 * body of the {@code "_"} filter, closing braces, and the definitions of the {@code key} and
 * {@code value} passed to {@code context.write} are not visible. No reader close is visible
 * either.
 *
 * @param row row key supplied by the framework
 * @param result row contents the two comparison strings are read from
 * @param context map context that receives the output
 * @throws IOException propagated from SequenceFile access or the context
 * @throws InterruptedException declared by the signature
 */
public void map(ImmutableBytesWritable row, Result result,
		Context context) throws IOException, InterruptedException {
	String yrstr = Bytes.toString(result.getValue(
	String rltstr = Bytes.toString(result.getValue(

	List<String> list = HdfsHelper
	String clusterid = null;
	for (String file : list) {
		if (file.contains("_")) {
		SequenceFile.Reader reader = new SequenceFile.Reader(
				HBaseContext.config, Reader.file(new Path(file)));
		IntWritable clusterId = new IntWritable();
		WeightedPropertyVectorWritable value = new WeightedPropertyVectorWritable();
		while (reader.next(clusterId, value)) {
			// Values are compared by their String.valueOf text form, not numerically.
			String yearrate = String.valueOf(value.getVector().get(0));
			String repaylimittime = String.valueOf(value.getVector()
			if (yrstr.equals(yearrate) && rltstr.equals(repaylimittime)) {
				clusterid = clusterId.toString();


	clusterid = null;
	context.write(key, value);
源代码24 项目: geowave   文件: GeoWaveNNIT.java
/**
 * Iterates over the (Text, Text) records of every regular file whose path matches
 * {@code .*part-r-0000[0-9]} under the "/t1/pairs" directory and returns {@code count}.
 *
 * <p>NOTE(review): the excerpt is truncated -- the call that produces {@code fss}, the first
 * reader-constructor argument, the loop body (presumably incrementing {@code count}) and
 * closing braces are not visible.
 *
 * @return the value of {@code count}
 * @throws IllegalArgumentException declared by the signature
 * @throws IOException propagated from file-system or SequenceFile access
 */
private int readFile() throws IllegalArgumentException, IOException {
  int count = 0;
  final FileSystem fs = FileSystem.get(MapReduceTestUtils.getConfiguration());
  final FileStatus[] fss =
          new Path(
                  + File.separator
                  + MapReduceTestEnvironment.HDFS_BASE_DIRECTORY
                  + "/t1/pairs"));
  for (final FileStatus ifs : fss) {
    // Only files named like part-r-00000 .. part-r-00009 are read.
    if (ifs.isFile() && ifs.getPath().toString().matches(".*part-r-0000[0-9]")) {
      try (SequenceFile.Reader reader =
          new SequenceFile.Reader(
              Reader.file(ifs.getPath()))) {

        final Text key = new Text();
        final Text val = new Text();

        while (reader.next(key, val)) {
  return count;
源代码25 项目: kangaroo   文件: WritableValueInputFormatTest.java
/**
 * Verifies {@code WritableValueInputFormat.getSplits} against a mocked reader: with a
 * 100-byte file and two inputs per split, it expects two splits covering [0, 30) and
 * [30, 100) of {@code path}.
 *
 * <p>NOTE(review): the excerpt is truncated -- the end of the {@code reader.next} stubbing
 * chain, the {@code getPosition} stubbing referred to by the comment, and the JobContext
 * setup are not visible.
 */
public void testGetSplits() throws Exception {
    // Spy so that getReader can be redirected to the mock while the rest stays real.
    final WritableValueInputFormat<Text> inputFormat = spy(new WritableValueInputFormat<Text>());

    final Reader reader = mock(Reader.class);
    doReturn(reader).when(inputFormat).getReader(conf, path);
    when(fs.getFileStatus(path)).thenReturn(new FileStatus(100, false, 1, 10, 1, path));

    conf.set(WritableValueInputFormat.INPUT_FILE_LOCATION_CONF, "/tmp/input_file");
    conf.setInt(WritableValueInputFormat.INPUTS_PER_SPLIT_CONF, 2);
    conf.setClass(WritableValueInputFormat.VALUE_TYPE_CONF, Text.class, Writable.class);

    final JobContext jobCtx = mock(JobContext.class);

    // 3 inputs
    when(reader.next(any(NullWritable.class), any(Text.class))).thenReturn(true).thenReturn(true).thenReturn(true)

    // getPosition gets called after each for loop invocation, so only need two of these

    final List<InputSplit> result = inputFormat.getSplits(jobCtx);

    assertEquals(2, result.size());
    final FileSplit fileSplit1 = (FileSplit) result.get(0);
    assertEquals(0, fileSplit1.getStart());
    assertEquals(30, fileSplit1.getLength());
    assertEquals(path, fileSplit1.getPath());

    // The second split runs from offset 30 to the end of the 100-byte file.
    final FileSplit fileSplit2 = (FileSplit) result.get(1);
    assertEquals(30, fileSplit2.getStart());
    assertEquals(70, fileSplit2.getLength());
    assertEquals(path, fileSplit2.getPath());
源代码26 项目: nifi   文件: ValueReader.java
/**
 * Reads the SequenceFile at {@code file} and writes each record's value into a newly created
 * FlowFile through an {@code OutputStreamWritableCallback}.
 *
 * <p>The FlowFile's filename attribute is derived from the record key: when the key matches
 * {@code LOOKS_LIKE_FILENAME}, the part after the last {@code File.separator} is kept and a
 * nanoTime suffix is appended; otherwise the name is the input file name plus a nanoTime
 * stamp and a running counter.
 *
 * <p>NOTE(review): the excerpt is truncated -- closing braces, the statement that adds each
 * FlowFile to the result set, and the body of the {@code finally} block (presumably closing
 * the reader) are not visible.
 *
 * @param file SequenceFile to read; qualified against {@code fileSystem} before opening
 * @param configuration Hadoop configuration used to open the reader
 * @param fileSystem file system used to qualify {@code file}
 * @return the {@code flowFiles} set
 * @throws IOException propagated from the SequenceFile reader
 */
public Set<FlowFile> readSequenceFile(final Path file, Configuration configuration, FileSystem fileSystem) throws IOException {

    Set<FlowFile> flowFiles = new HashSet<>();
    final SequenceFile.Reader reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
    // Fallback name prefix for records whose key does not look like a file name.
    final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
    int counter = 0;
    LOG.debug("Reading from sequence file {}", new Object[]{file});
    final OutputStreamWritableCallback writer = new OutputStreamWritableCallback(reader);
    Text key = new Text();
    try {
        // Only the key is read here; the callback holds the reader for the value.
        while (reader.next(key)) {
            String fileName = key.toString();
            // the key may be a file name, and may not
            if (LOOKS_LIKE_FILENAME.matcher(fileName).matches()) {
                if (fileName.contains(File.separator)) {
                    fileName = StringUtils.substringAfterLast(fileName, File.separator);
                fileName = fileName + "." + System.nanoTime();
            } else {
                fileName = inputfileName + ++counter;
            FlowFile flowFile = session.create();
            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
            try {
                flowFile = session.write(flowFile, writer);
            } catch (ProcessException e) {
                LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
    } finally {

    return flowFiles;
源代码27 项目: tez   文件: ProtoMessageReader.java
/**
 * Opens a SequenceFile reader on {@code filePath} with the length option set to
 * {@code Long.MAX_VALUE}, and prepares a {@code ProtoMessageWritable} backed by
 * {@code parser} to receive records.
 *
 * <p>NOTE(review): the excerpt is truncated -- the closing brace is missing.
 *
 * @param conf Hadoop configuration used to open the reader
 * @param filePath file to read; also stored on this instance
 * @param parser parser handed to the {@code ProtoMessageWritable}
 * @throws IOException propagated from reader construction
 */
public ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
  this.filePath = filePath;
  // The writer does not flush the length during hflush. Using length options lets us read
  // past length in the FileStatus but it will throw EOFException during a read instead
  // of returning null.
  this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE));
  this.writable = new ProtoMessageWritable<>(parser);
源代码28 项目: kylin-on-parquet-v2   文件: DFSFileTableReader.java
/**
 * Opens a SequenceFile reader on {@code path}, instantiates the key holder from the key
 * class recorded in the file, and uses a {@code Text} as the value holder.
 *
 * <p>NOTE(review): the excerpt is truncated -- the closing brace is missing.
 *
 * @param hconf Hadoop configuration used to open the reader and create the key
 * @param path location of the SequenceFile
 * @throws IOException propagated from reader construction
 */
SeqRowReader(Configuration hconf, String path) throws IOException {
    reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
    key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
    value = new Text();
源代码29 项目: localization_nifi   文件: ValueReader.java
/**
 * Stores the given reader on this callback.
 *
 * <p>NOTE(review): the excerpt is truncated -- the closing brace is missing.
 *
 * @param reader SequenceFile reader kept in the {@code reader} field
 */
private OutputStreamWritableCallback(Reader reader) {
    this.reader = reader;
源代码30 项目: localization_nifi   文件: KeyValueReader.java
/**
 * Reads the SequenceFile at {@code file} and writes each record into a newly created
 * FlowFile through a {@code KeyValueWriterCallback}, which is given the current key before
 * every write.
 *
 * <p>The FlowFile's filename attribute is derived from the record key: when the key matches
 * {@code LOOKS_LIKE_FILENAME}, the part after the last {@code File.separator} is kept and a
 * nanoTime suffix is appended; otherwise the name is the input file name plus a nanoTime
 * stamp and a running counter.
 *
 * <p>NOTE(review): the excerpt is truncated -- closing braces, the statement that adds each
 * FlowFile to the result set, and the body of the {@code finally} block (presumably closing
 * the reader) are not visible.
 *
 * @param file SequenceFile to read; qualified against {@code fileSystem} before opening
 * @param configuration Hadoop configuration used to open the reader
 * @param fileSystem file system used to qualify {@code file}
 * @return the {@code flowFiles} set
 * @throws IOException propagated from the SequenceFile reader
 */
public Set<FlowFile> readSequenceFile(Path file, Configuration configuration, FileSystem fileSystem) throws IOException {

    final SequenceFile.Reader reader;

    Set<FlowFile> flowFiles = new HashSet<>();
    reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
    final Text key = new Text();
    final KeyValueWriterCallback callback = new KeyValueWriterCallback(reader);
    // Fallback name prefix for records whose key does not look like a file name.
    final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
    int counter = 0;
    LOG.debug("Read from SequenceFile: {} ", new Object[]{file});
    try {
        while (reader.next(key)) {
            String fileName = key.toString();
            // the key may be a file name, and may not
            if (LOOKS_LIKE_FILENAME.matcher(fileName).matches()) {
                if (fileName.contains(File.separator)) {
                    fileName = StringUtils.substringAfterLast(fileName, File.separator);
                fileName = fileName + "." + System.nanoTime();
            } else {
                fileName = inputfileName + ++counter;

            FlowFile flowFile = session.create();
            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
            // Hand the current key to the callback before it writes the record.
            callback.key = key;
            try {
                flowFile = session.write(flowFile, callback);
            } catch (ProcessException e) {
                LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
    } finally {

    return flowFiles;