org.apache.hadoop.mapred.RecordReader#createValue() Source Code Examples

Listed below are example usages of org.apache.hadoop.mapred.RecordReader#createValue(), collected from open-source projects; follow each project link to view the full source on GitHub.

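A quick note on the API contract before the examples: in the old org.apache.hadoop.mapred API, createKey() and createValue() allocate reusable container objects, and next(key, value) overwrites them in place until the input is exhausted. The following is a minimal sketch of the canonical read loop, assuming TextInputFormat (LongWritable byte offsets as keys, Text lines as values) and an illustrative input path:

JobConf jobConf = new JobConf();
FileInputFormat.setInputPaths(jobConf, new Path("/tmp/input")); // illustrative path
TextInputFormat inputFormat = new TextInputFormat();
inputFormat.configure(jobConf);
for (InputSplit split : inputFormat.getSplits(jobConf, 1)) {
  RecordReader<LongWritable, Text> reader =
      inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
  LongWritable key = reader.createKey();   // reusable key container
  Text value = reader.createValue();       // reusable value container
  try {
    while (reader.next(key, value)) {
      // key and value are overwritten by each next() call; copy them (or
      // call createKey()/createValue() again) before keeping a reference.
      System.out.println(key.get() + "\t" + value);
    }
  } finally {
    reader.close();
  }
}

Several of the examples below (the InputSampler variants in particular) rely on exactly this reuse behavior when they re-allocate keys inside the read loop.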
Example 1 - Project: hudi - File: TestHoodieParquetInputFormat.java
private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit,
    int totalExpected) throws IOException {
  int actualCount = 0;
  int totalCount = 0;
  InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
  for (InputSplit split : splits) {
    RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(split, jobConf, null);
    NullWritable key = recordReader.createKey();
    ArrayWritable writable = recordReader.createValue();

    while (recordReader.next(key, writable)) {
      // writable returns an array with [field1, field2, _hoodie_commit_time,
      // _hoodie_commit_seqno]
      // Take the commit time and compare with the one we are interested in
      if (commit.equals((writable.get()[2]).toString())) {
        actualCount++;
      }
      totalCount++;
    }
  }
  assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg);
  assertEquals(totalExpected, totalCount, msg);
}
 
Example 2 - Project: hadoop - File: InputSampler.java
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
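
The samplers in examples 2 through 5 are normally not called directly; they are passed to InputSampler.writePartitionFile, which writes the sampled split points consumed by TotalOrderPartitioner. A hedged sketch of that wiring (old mapred API, org.apache.hadoop.mapred.lib; the freq/numSamples/maxSplitsSampled values are illustrative, not recommendations):

JobConf job = new JobConf(conf);
job.setNumReduceTasks(10);
// Sample ~10% of keys, up to 10000 samples, from at most 10 splits.
InputSampler.Sampler<Text, Text> sampler =
    new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
InputSampler.writePartitionFile(job, sampler); // samples keys, writes split points
job.setPartitionerClass(TotalOrderPartitioner.class);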
 
Example 3 - Project: hadoop - File: InputSampler.java
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 4 - Project: big-c - File: InputSampler.java
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 5 - Project: big-c - File: InputSampler.java
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 6 - Project: mnemonic - File: MneMapredPersonDataTest.java
@Test(enabled = true, dependsOnMethods = {"testWritePersonData"})
public void testReadPersonData() throws Exception {
  long sumage = 0L;
  long reccnt = 0L;
  File folder = new File(m_workdir.toString());
  File[] listfiles = folder.listFiles();
  for (int idx = 0; idx < listfiles.length; ++idx) {
    if (listfiles[idx].isFile()
            && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
            && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
      System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
      FileSplit split = new FileSplit(
              new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
      InputFormat<NullWritable, MneDurableInputValue<Person<Long>>> inputFormat =
              new MneInputFormat<MneDurableInputValue<Person<Long>>, Person<Long>>();
      RecordReader<NullWritable, MneDurableInputValue<Person<Long>>> reader =
              inputFormat.getRecordReader(split, m_conf, null);
      MneDurableInputValue<Person<Long>> personval = null;
      NullWritable personkey = reader.createKey();
      while (true) {
        personval = reader.createValue();
        if (reader.next(personkey, personval)) {
          AssertJUnit.assertTrue(personval.getValue().getAge() < 51);
          sumage += personval.getValue().getAge();
          ++reccnt;
        } else {
          break;
        }
      }
      reader.close();
    }
  }
  AssertJUnit.assertEquals(m_reccnt, reccnt);
  AssertJUnit.assertEquals(m_sumage, sumage);
  System.out.println(String.format("The checksum of ages is %d", sumage));
}
 
Example 7 - Project: hadoop - File: DumpTypedBytes.java
/**
 * Dump given list of files to standard output as typed bytes.
 */
@SuppressWarnings("unchecked")
private int dumpTypedBytes(List<FileStatus> files) throws IOException {
  JobConf job = new JobConf(getConf()); 
  DataOutputStream dout = new DataOutputStream(System.out);
  AutoInputFormat autoInputFormat = new AutoInputFormat();
  for (FileStatus fileStatus : files) {
    FileSplit split = new FileSplit(fileStatus.getPath(), 0,
      fileStatus.getLen() * fileStatus.getBlockSize(),
      (String[]) null);
    RecordReader recReader = null;
    try {
      recReader = autoInputFormat.getRecordReader(split, job, Reporter.NULL);
      Object key = recReader.createKey();
      Object value = recReader.createValue();
      while (recReader.next(key, value)) {
        if (key instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) key);
        } else {
          TypedBytesOutput.get(dout).write(key);
        }
        if (value instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) value);
        } else {
          TypedBytesOutput.get(dout).write(value);
        }
      }
    } finally {
      if (recReader != null) {
        recReader.close();
      }
    }
  }
  dout.flush();
  return 0;
}
 
Example 8 - Project: big-c - File: DumpTypedBytes.java
/**
 * Dump given list of files to standard output as typed bytes.
 */
@SuppressWarnings("unchecked")
private int dumpTypedBytes(List<FileStatus> files) throws IOException {
  JobConf job = new JobConf(getConf()); 
  DataOutputStream dout = new DataOutputStream(System.out);
  AutoInputFormat autoInputFormat = new AutoInputFormat();
  for (FileStatus fileStatus : files) {
    FileSplit split = new FileSplit(fileStatus.getPath(), 0,
      fileStatus.getLen() * fileStatus.getBlockSize(),
      (String[]) null);
    RecordReader recReader = null;
    try {
      recReader = autoInputFormat.getRecordReader(split, job, Reporter.NULL);
      Object key = recReader.createKey();
      Object value = recReader.createValue();
      while (recReader.next(key, value)) {
        if (key instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) key);
        } else {
          TypedBytesOutput.get(dout).write(key);
        }
        if (value instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) value);
        } else {
          TypedBytesOutput.get(dout).write(value);
        }
      }
    } finally {
      if (recReader != null) {
        recReader.close();
      }
    }
  }
  dout.flush();
  return 0;
}
 
Example 9 - Project: mnemonic - File: MneMapredLongDataTest.java
@Test(enabled = true, dependsOnMethods = {"testWriteLongData"})
public void testReadLongData() throws Exception {
  long sum = 0L;
  long reccnt = 0L;
  File folder = new File(m_workdir.toString());
  File[] listfiles = folder.listFiles();
  for (int idx = 0; idx < listfiles.length; ++idx) {
    if (listfiles[idx].isFile()
            && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
            && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
      System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
      FileSplit split = new FileSplit(
              new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
      InputFormat<NullWritable, MneDurableInputValue<Long>> inputFormat =
              new MneInputFormat<MneDurableInputValue<Long>, Long>();
      RecordReader<NullWritable, MneDurableInputValue<Long>> reader =
              inputFormat.getRecordReader(split, m_conf, null);
      MneDurableInputValue<Long> mdval = null;
      NullWritable mdkey = reader.createKey();
      while (true) {
        mdval = reader.createValue();
        if (reader.next(mdkey, mdval)) {
          sum += mdval.getValue();
          ++reccnt;
        } else {
          break;
        }
      }
      reader.close();
    }
  }
  AssertJUnit.assertEquals(m_sum, sum);
  AssertJUnit.assertEquals(m_reccnt, reccnt);
  System.out.println(String.format("The checksum of long data is %d", sum));
}
 
Example 10 - Project: hadoop - File: PipesMapRunner.java
/**
 * Run the map task.
 * @param input the set of inputs
 * @param output the object to collect the outputs of the map
 * @param reporter the object to update with status
 */
@SuppressWarnings("unchecked")
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter) throws IOException {
  Application<K1, V1, K2, V2> application = null;
  try {
    RecordReader<FloatWritable, NullWritable> fakeInput =
        (!Submitter.getIsJavaRecordReader(job) &&
         !Submitter.getIsJavaMapper(job))
            ? (RecordReader<FloatWritable, NullWritable>) input : null;
    application = new Application<K1, V1, K2, V2>(job, fakeInput, output, 
                                                  reporter,
        (Class<? extends K2>) job.getOutputKeyClass(), 
        (Class<? extends V2>) job.getOutputValueClass());
  } catch (InterruptedException ie) {
    throw new RuntimeException("interrupted", ie);
  }
  DownwardProtocol<K1, V1> downlink = application.getDownlink();
  boolean isJavaInput = Submitter.getIsJavaRecordReader(job);
  downlink.runMap(reporter.getInputSplit(), 
                  job.getNumReduceTasks(), isJavaInput);
  boolean skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
  try {
    if (isJavaInput) {
      // allocate key & value instances that are re-used for all entries
      K1 key = input.createKey();
      V1 value = input.createValue();
      downlink.setInputTypes(key.getClass().getName(),
                             value.getClass().getName());
      
      while (input.next(key, value)) {
        // map pair to output
        downlink.mapItem(key, value);
        if(skipping) {
          //flush the streams on every record input if running in skip mode
          //so that we don't buffer other records surrounding a bad record.
          downlink.flush();
        }
      }
      downlink.endOfInput();
    }
    application.waitForFinish();
  } catch (Throwable t) {
    application.abort(t);
  } finally {
    application.cleanup();
  }
}
 
Example 11 - Project: hadoop - File: InputSampler.java
/**
 * Randomize the split order, then take the specified number of keys from
 * each split sampled, where each key is selected with the specified
 * probability and possibly replaced by a subsequently selected key when
 * the quota of keys from that split is satisfied.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);

  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // shuffle splits
  for (int i = 0; i < splits.length; ++i) {
    InputSplit tmp = splits[i];
    int j = r.nextInt(splits.length);
    splits[i] = splits[j];
    splits[j] = tmp;
  }
  // our target rate is in terms of the maximum number of sample splits,
  // but we accept the possibility of sampling additional splits to hit
  // the target sample keyset
  for (int i = 0; i < splitsToSample ||
                 (i < splits.length && samples.size() < numSamples); ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      if (r.nextDouble() <= freq) {
        if (samples.size() < numSamples) {
          samples.add(key);
        } else {
          // When exceeding the maximum number of samples, replace a
          // random element with this one, then adjust the frequency
          // to reflect the possibility of existing elements being
          // pushed out
          int ind = r.nextInt(numSamples);
          if (ind != numSamples) {
            samples.set(ind, key);
          }
          freq *= (numSamples - 1) / (double) numSamples;
        }
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 12 - Project: gemfirexd-oss - File: GFXDHiveInputFormatTest.java
public void testHiveInputFormat() throws Exception {
  getConnection();
  Connection conn = startNetserverAndGetLocalNetConnection();
  
  Statement st = conn.createStatement();
  st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "' batchtimeinterval 5000 milliseconds");
  st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");

  PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
  int NUM_ENTRIES = 20;
  for(int i = 0; i < NUM_ENTRIES; i++) {
    ps.setInt(1, i);
    ps.setString(2, "Value-" + System.nanoTime());
    ps.execute();
  }
  //Wait for data to get to HDFS...
  String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
  st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");
  
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
  assertEquals(1, list.length);
  
  conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
  conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
  
  JobConf job = new JobConf(conf);
  job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  GFXDHiveInputFormat ipformat = new GFXDHiveInputFormat();
  InputSplit[] splits = ipformat.getSplits(job, 2);
  assertEquals(1, splits.length);
  GFXDHiveSplit split = (GFXDHiveSplit) splits[0];
  assertEquals(1, split.getPaths().length);
  assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
  assertEquals(0, split.getOffset(0));
  assertEquals(list[0].getLen(), split.getLength(0));

  RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
  assertTrue("Row record reader should be an instance of GFXDHiveRowRecordReader "
      + "but it is an instance of " + rr.getClass(), (rr instanceof GFXDHiveRowRecordReader));
  Key key = rr.createKey();
  Row value = rr.createValue();

  int count = 0;
  while (rr.next(key, value)) {
    assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
  }
  
  assertEquals(20, count);
  
  TestUtil.shutDown();
}
 
Example 13 - Project: gemfirexd-oss - File: EventInputFormatTest.java
public void testEventInputFormat() throws Exception {
  getConnection();
  Connection conn = startNetserverAndGetLocalNetConnection();
  
  Statement st = conn.createStatement();
  st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "' batchtimeinterval 5000 milliseconds");
  st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");

  PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
  int NUM_ENTRIES = 20;
  for(int i = 0; i < NUM_ENTRIES; i++) {
    ps.setInt(1, i);
    ps.setString(2, "Value-" + System.nanoTime());
    ps.execute();
  }
  //Wait for data to get to HDFS...
  String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
  st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");
  
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
  assertEquals(1, list.length);
  
  conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
  conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
  
  JobConf job = new JobConf(conf);
  job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  RowInputFormat ipformat = new RowInputFormat();
  InputSplit[] splits = ipformat.getSplits(job, 2);
  assertEquals(1, splits.length);
  CombineFileSplit split = (CombineFileSplit) splits[0];
  assertEquals(1, split.getPaths().length);
  assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
  assertEquals(0, split.getOffset(0));
  assertEquals(list[0].getLen(), split.getLength(0));

  RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
  Key key = rr.createKey();
  Row value = rr.createValue();

  int count = 0;
  while (rr.next(key, value)) {
    assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
  }
  
  assertEquals(20, count);
  
  TestUtil.shutDown();
}
 
Example 14 - Project: gemfirexd-oss - File: EventInputFormatTest.java
public void testNoSecureHdfsCheck() throws Exception {
  getConnection();
  Connection conn = startNetserverAndGetLocalNetConnection();
  
  
  Statement st = conn.createStatement();
  st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "'  batchtimeinterval 5000 milliseconds");
  st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");
  
  PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
  int NUM_ENTRIES = 20;
  for(int i = 0; i < NUM_ENTRIES; i++) {
    ps.setInt(1, i);
    ps.setString(2, "Value-" + System.nanoTime());
    ps.execute();
  }
  //Wait for data to get to HDFS...
  String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
  st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");
  
  stopNetServer();
  FabricServiceManager.currentFabricServiceInstance().stop(new Properties());
  
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
  assertEquals(1, list.length);
  
  conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
  conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
  conf.set("hadoop.security.authentication", "kerberos");
  
  JobConf job = new JobConf(conf);
  job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  RowInputFormat ipformat = new RowInputFormat();
  InputSplit[] splits = ipformat.getSplits(job, 2);
  assertEquals(1, splits.length);
  CombineFileSplit split = (CombineFileSplit) splits[0];
  assertEquals(1, split.getPaths().length);
  assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
  assertEquals(0, split.getOffset(0));
  assertEquals(list[0].getLen(), split.getLength(0));
  
  RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
  Key key = rr.createKey();
  Row value = rr.createValue();
  
  int count = 0;
  while (rr.next(key, value)) {
    assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
  }
  
  assertEquals(20, count);
  
  TestUtil.shutDown();
}
 
Example 15 - Project: gemfirexd-oss - File: EventInputFormatTest.java
private void doTestRowSerDe(boolean concurrencyChecks) throws Exception {
  getConnection();
  Connection conn = startNetserverAndGetLocalNetConnection();
  final long statTS = System.currentTimeMillis();
  Statement st = conn.createStatement();
  st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "' batchtimeinterval 5000 milliseconds");
  String concurrency = "persistent ENABLE CONCURRENCY CHECKS";
  st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) partition by primary key buckets 1 hdfsstore (myhdfs) "
      +(concurrencyChecks ? concurrency : ""));

  PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
  ps.setInt(1, 1);
  ps.setString(2, "Value-1");
  ps.execute();
  
  //Wait for data to get to HDFS...
  String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
  st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");
  
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
  assertEquals(1, list.length);
  
  conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
  conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
  
  JobConf job = new JobConf(conf);
  job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  RowInputFormat ipformat = new RowInputFormat();
  InputSplit[] splits = ipformat.getSplits(job, 2);
  assertEquals(1, splits.length);
  RecordReader<Key, Row> rr = ipformat.getRecordReader(splits[0], job, null);
  Key key = rr.createKey();
  Row value = rr.createValue();
  assertTrue(rr.next(key, value));
  assertEquals(1, value.getRowAsResultSet().getInt(1));
  assertEquals("Value-1", value.getRowAsResultSet().getString(2));
  assertTrue(value.getTimestamp() > statTS);
  assertFalse(value.getRowAsResultSet().next());
  
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  DataOutputStream dos = new DataOutputStream(baos);
  value.write(dos);
  dos.close();
  
  byte[] buf = baos.toByteArray();
  DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
  Row row = new Row();
  row.readFields(dis);
  dis.close();
  
  assertEquals(1, row.getRowAsResultSet().getInt(1));
  assertEquals("Value-1", row.getRowAsResultSet().getString(2));
  assertFalse(value.getRowAsResultSet().next());
  
  TestUtil.shutDown();
}
 
Example 16 - Project: hudi - File: TestHoodieCombineHiveInputFormat.java
@Test
@Disabled
public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {

  Configuration conf = new Configuration();
  // initial commit
  Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
  HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
  String commitTime = "100";
  final int numRecords = 1000;
  // Create 3 parquet files with 1000 records each
  File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
  InputFormatTestUtil.commit(tempDir, commitTime);

  // insert 1000 update records to log file 0
  String newCommitTime = "101";
  HoodieLogFormat.Writer writer =
      InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", commitTime, newCommitTime,
          numRecords, numRecords, 0);
  writer.close();
  // insert 1000 update records to log file 1
  writer =
      InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid1", commitTime, newCommitTime,
          numRecords, numRecords, 0);
  writer.close();
  // insert 1000 update records to log file 2
  writer =
      InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime,
          numRecords, numRecords, 0);
  writer.close();

  TableDesc tblDesc = Utilities.defaultTd;
  // Set the input format
  tblDesc.setInputFileFormatClass(HoodieCombineHiveInputFormat.class);
  PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
  LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
  pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
  MapredWork mrwork = new MapredWork();
  mrwork.getMapWork().setPathToPartitionInfo(pt);
  Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
  Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
  jobConf = new JobConf(conf);
  // Add the paths
  FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
  jobConf.set(HAS_MAP_WORK, "true");
  // The following config tells Hive to choose ExecMapper to read the MAP_WORK
  jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
  // setting the split size to be 3 to create one split for 3 file groups
  jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "3");

  HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
  String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
  InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
  InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
  // Since the SPLIT_SIZE is 3, we should create only 1 split with all 3 file groups
  assertEquals(1, splits.length);
  RecordReader<NullWritable, ArrayWritable> recordReader =
      combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
  NullWritable nullWritable = recordReader.createKey();
  ArrayWritable arrayWritable = recordReader.createValue();
  int counter = 0;
  while (recordReader.next(nullWritable, arrayWritable)) {
    // read over all the splits
    counter++;
  }
  // should read out all 3 file groups (file0, file1, file2), 1000 records each
  assertEquals(3000, counter);
}
 
Example 17 - Project: big-c - File: PipesMapRunner.java
/**
 * Run the map task.
 * @param input the set of inputs
 * @param output the object to collect the outputs of the map
 * @param reporter the object to update with status
 */
@SuppressWarnings("unchecked")
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter) throws IOException {
  Application<K1, V1, K2, V2> application = null;
  try {
    RecordReader<FloatWritable, NullWritable> fakeInput =
        (!Submitter.getIsJavaRecordReader(job) &&
         !Submitter.getIsJavaMapper(job))
            ? (RecordReader<FloatWritable, NullWritable>) input : null;
    application = new Application<K1, V1, K2, V2>(job, fakeInput, output, 
                                                  reporter,
        (Class<? extends K2>) job.getOutputKeyClass(), 
        (Class<? extends V2>) job.getOutputValueClass());
  } catch (InterruptedException ie) {
    throw new RuntimeException("interrupted", ie);
  }
  DownwardProtocol<K1, V1> downlink = application.getDownlink();
  boolean isJavaInput = Submitter.getIsJavaRecordReader(job);
  downlink.runMap(reporter.getInputSplit(), 
                  job.getNumReduceTasks(), isJavaInput);
  boolean skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
  try {
    if (isJavaInput) {
      // allocate key & value instances that are re-used for all entries
      K1 key = input.createKey();
      V1 value = input.createValue();
      downlink.setInputTypes(key.getClass().getName(),
                             value.getClass().getName());
      
      while (input.next(key, value)) {
        // map pair to output
        downlink.mapItem(key, value);
        if(skipping) {
          //flush the streams on every record input if running in skip mode
          //so that we don't buffer other records surrounding a bad record.
          downlink.flush();
        }
      }
      downlink.endOfInput();
    }
    application.waitForFinish();
  } catch (Throwable t) {
    application.abort(t);
  } finally {
    application.cleanup();
  }
}
 
Example 18 - Project: big-c - File: MultithreadedMapRunner.java
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
  throws IOException {
  try {
    // Allocate key & value instances; these objects will not be reused
    // because execution of Mapper.map is not serialized.
    K1 key = input.createKey();
    V1 value = input.createValue();

    while (input.next(key, value)) {

      executorService.execute(new MapperInvokeRunable(key, value, output,
                              reporter));

      checkForExceptionsFromProcessingThreads();

      // Allocate new key & value instances as mapper is running in parallel
      key = input.createKey();
      value = input.createValue();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Finished dispatching all Mapper.map calls, job "
                + job.getJobName());
    }

    // Gracefully shut down the thread pool; it will let all scheduled
    // Runnables run to completion.
    executorService.shutdown();

    try {

      // Now waiting for all Runnables to end.
      while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Awaiting all running Mapper.map calls to finish, job "
                    + job.getJobName());
        }

        // NOTE: while Mapper.map dispatching has concluded, there may still
        // be map calls in progress, and exceptions can still be thrown.
        checkForExceptionsFromProcessingThreads();

      }

      // NOTE: a map call could still throw an exception after
      // awaitTermination() returns true. An edge case, but it
      // could happen.
      checkForExceptionsFromProcessingThreads();

    } catch (IOException ioEx) {
      // Force a shutdown of all threads in the thread pool and rethrow
      // the IOException
      executorService.shutdownNow();
      throw ioEx;
    } catch (InterruptedException iEx) {
      throw new RuntimeException(iEx);
    }

  } finally {
    mapper.close();
  }
}
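
The in-loop re-allocation above is the key difference from the sequential runners (examples 10 and 17), which reuse one key/value pair for the whole input. A minimal sketch of the aliasing bug that per-record allocation avoids; RecordReader<Text, Text>, the executor, and the MapTask runnable are illustrative assumptions:

// Broken: one shared, mutable pair handed to concurrent tasks. Every queued
// task ends up reading whatever next() wrote most recently.
Text key = reader.createKey();
Text value = reader.createValue();
while (reader.next(key, value)) {
  executor.execute(new MapTask(key, value)); // all tasks alias the same objects
}

// What MultithreadedMapRunner does instead: a fresh pair per dispatched record.
Text k = reader.createKey();
Text v = reader.createValue();
while (reader.next(k, v)) {
  executor.execute(new MapTask(k, v));
  k = reader.createKey();   // new containers so the in-flight task's data stays intact
  v = reader.createValue();
}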
 
Example 19 - Project: big-c - File: InputSampler.java
/**
 * Randomize the split order, then take the specified number of keys from
 * each split sampled, where each key is selected with the specified
 * probability and possibly replaced by a subsequently selected key when
 * the quota of keys from that split is satisfied.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);

  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // shuffle splits
  for (int i = 0; i < splits.length; ++i) {
    InputSplit tmp = splits[i];
    int j = r.nextInt(splits.length);
    splits[i] = splits[j];
    splits[j] = tmp;
  }
  // our target rate is in terms of the maximum number of sample splits,
  // but we accept the possibility of sampling additional splits to hit
  // the target sample keyset
  for (int i = 0; i < splitsToSample ||
                 (i < splits.length && samples.size() < numSamples); ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      if (r.nextDouble() <= freq) {
        if (samples.size() < numSamples) {
          samples.add(key);
        } else {
          // When exceeding the maximum number of samples, replace a
          // random element with this one, then adjust the frequency
          // to reflect the possibility of existing elements being
          // pushed out
          int ind = r.nextInt(numSamples);
          if (ind != numSamples) {
            samples.set(ind, key);
          }
          freq *= (numSamples - 1) / (double) numSamples;
        }
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 20 - Project: mnemonic - File: MneMapredChunkDataTest.java
@Test(enabled = true, dependsOnMethods = {"testWriteChunkData"})
public void testReadChunkData() throws Exception {
  List<String> partfns = new ArrayList<String>();
  long reccnt = 0L;
  long tsize = 0L;
  Checksum cs = new CRC32();
  cs.reset();
  File folder = new File(m_workdir.toString());
  File[] listfiles = folder.listFiles();
  for (int idx = 0; idx < listfiles.length; ++idx) {
    if (listfiles[idx].isFile()
            && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
            && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
      partfns.add(listfiles[idx].getName());
    }
  }
  Collections.sort(partfns); // keep the order for checksum
  for (int idx = 0; idx < partfns.size(); ++idx) {
    System.out.println(String.format("Verifying : %s", partfns.get(idx)));
    FileSplit split = new FileSplit(
            new Path(m_workdir, partfns.get(idx)), 0, 0L, new String[0]);
    InputFormat<NullWritable, MneDurableInputValue<DurableChunk<?>>> inputFormat =
            new MneInputFormat<MneDurableInputValue<DurableChunk<?>>, DurableChunk<?>>();
    RecordReader<NullWritable, MneDurableInputValue<DurableChunk<?>>> reader =
            inputFormat.getRecordReader(split, m_conf, null);
    MneDurableInputValue<DurableChunk<?>> dchkval = null;
    NullWritable dchkkey = reader.createKey();
    while (true) {
      dchkval = reader.createValue();
      if (reader.next(dchkkey, dchkval)) {
        byte b;
        for (int j = 0; j < dchkval.getValue().getSize(); ++j) {
          b = unsafe.getByte(dchkval.getValue().get() + j);
          cs.update(b);
        }
        tsize += dchkval.getValue().getSize();
        ++reccnt;
      } else {
        break;
      }
    }
    reader.close();
  }
  AssertJUnit.assertEquals(m_reccnt, reccnt);
  AssertJUnit.assertEquals(m_totalsize, tsize);
  AssertJUnit.assertEquals(m_checksum, cs.getValue());
  System.out.println(String.format("The checksum of chunk is %d", m_checksum));
}