org.apache.log4j.spi.ErrorCode#com.amazonaws.services.kinesis.model.PutRecordRequest源码实例Demo

下面列出了org.apache.log4j.spi.ErrorCode#com.amazonaws.services.kinesis.model.PutRecordRequest 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

/**
 * Builds the AsyncHandler used for putRecordAsync calls: routes the Kinesis
 * result (or failure) into the configuration's result MonoProcessor so
 * subscribers are always completed, never left hanging.
 */
@Bean
public AsyncHandler<PutRecordRequest, PutRecordResult> asyncHandler() {
	return new AsyncHandler<PutRecordRequest, PutRecordResult>() {

		@Override
		public void onError(Exception exception) {
			// Propagate the failure instead of swallowing it silently;
			// otherwise subscribers of the processor block forever waiting
			// for a terminal signal.
			ProcessorConfiguration.this.resultMonoProcessor.onError(exception);
		}

		@Override
		public void onSuccess(PutRecordRequest request, PutRecordResult putRecordsResult) {
			ProcessorConfiguration.this.resultMonoProcessor.onNext(putRecordsResult);
			ProcessorConfiguration.this.resultMonoProcessor.onComplete();
		}

	};
}
 
/**
 * Check if the input aggregated record is complete and if so, forward it to the
 * configured destination Kinesis stream. Failures are logged and swallowed so
 * a single bad forward does not abort the surrounding Lambda invocation.
 *
 * @param logger The LambdaLogger from the input Context
 * @param aggRecord The aggregated record to transmit or null if the record isn't full yet.
 */
private void checkAndForwardRecords(LambdaLogger logger, AggRecord aggRecord)
{
    if (aggRecord == null)
    {
        // Record not full yet; nothing to forward.
        return;
    }

    logger.log("Forwarding " + aggRecord.getNumUserRecords() + " as an aggregated record.");

    PutRecordRequest request = aggRecord.toPutRecordRequest(DESTINATION_STREAM_NAME);
    try
    {
        PutRecordResult result = this.kinesisForwarder.putRecord(request);
        logger.log("Successfully published record Seq #" + result.getSequenceNumber() + " to shard " + result.getShardId());
    }
    catch (Exception e)
    {
        // Log the full exception (not just getMessage(), which can be null
        // for some AWS SDK exceptions and would yield a useless log entry).
        logger.log("ERROR: Failed to forward Kinesis records to destination stream: " + e);
    }
}
 
/**
 * Emits one tuple to Kinesis: either buffers it for a batched flush, or sends
 * it immediately as an individual put. Increments the send counter either way.
 */
public void processTuple(T tuple)
{
  try {
    if (isBatchProcessing) {
      // Flush first when the batch buffer is already full, then enqueue.
      if (putRecordsRequestEntryList.size() == batchSize) {
        flushRecords();
        logger.debug("flushed {} records.", batchSize);
      }
      addRecord(tuple);
    } else {
      Pair<String, V> keyValue = tupleToKeyValue(tuple);
      PutRecordRequest request = new PutRecordRequest();
      request.setStreamName(streamName);
      request.setPartitionKey(keyValue.first);
      request.setData(ByteBuffer.wrap(getRecord(keyValue.second)));
      client.putRecord(request);
    }
    sendCount++;
  } catch (AmazonClientException e) {
    throw new RuntimeException(e);
  }
}
 
源代码4 项目: aws-big-data-blog   文件: StreamSource.java
/**
 * Process the input file and send PutRecordRequests to Amazon Kinesis.
 * 
 * This function serves to Isolate StreamSource logic so subclasses
 * can process input files differently.
 * 
 * @param inputStream
 *        the input stream to process
 * @param iteration
 *        the iteration if looping over file
 * @throws IOException
 *         throw exception if error processing inputStream.
 */
protected void processInputStream(InputStream inputStream, int iteration) throws IOException {
    // Explicit UTF-8: the no-arg InputStreamReader/getBytes() use the JVM's
    // platform-default charset, which can corrupt non-ASCII payloads.
    try (BufferedReader br = new BufferedReader(
            new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8))) {
        String line;
        int lines = 0;
        while ((line = br.readLine()) != null) {
            KinesisMessageModel kinesisMessageModel = objectMapper.readValue(line, KinesisMessageModel.class);

            PutRecordRequest putRecordRequest = new PutRecordRequest();
            putRecordRequest.setStreamName(config.KINESIS_INPUT_STREAM);
            putRecordRequest.setData(ByteBuffer.wrap(line.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
            putRecordRequest.setPartitionKey(Integer.toString(kinesisMessageModel.getUserid()));
            kinesisClient.putRecord(putRecordRequest);
            lines++;
        }
        LOG.info("Added " + lines + " records to stream source.");
    }
}
 
源代码5 项目: aws-big-data-blog   文件: ProducerBase.java
/**
 * Continuously drains events from the queue and publishes each one to the
 * configured Kinesis stream. Runs until the thread is interrupted.
 */
public void run() {
	while (true) {
		try {
			// Blocking take: the thread parks here until work is available.
			Event event = eventsQueue.take();

			PutRecordRequest put = new PutRecordRequest();
			put.setStreamName(this.streamName);

			put.setData(event.getData());
			put.setPartitionKey(event.getPartitionKey());

			PutRecordResult result = kinesisClient.putRecord(put);
			logger.info(result.getSequenceNumber() + ": {}", this);

		} catch (InterruptedException e) {
			// Restore the interrupt flag and exit: previously the broad catch
			// swallowed interruption, so this producer could never be stopped.
			Thread.currentThread().interrupt();
			return;
		} catch (Exception e) {
			// Failed to publish this record - log it and move on to the next.
			logger.error("Failed to publish record to Kinesis", e);
		}
	}
}
 
源代码6 项目: kinesis-log4j-appender   文件: KinesisAppender.java
/**
 * This method is called whenever a logging happens via logger.log(..) API
 * calls. Implementation for this appender will take in log events instantly
 * as long as the buffer is not full (as per user configuration). This call
 * will block if internal buffer is full until internal threads create some
 * space by publishing some of the records.
 * 
 * If there is any error in parsing logevents, those logevents would be
 * dropped.
 */
@Override
public void append(LoggingEvent logEvent) {
  if (initializationFailed) {
    error("Check the configuration and whether the configured stream " + streamName
        + " exists and is active. Failed to initialize kinesis log4j appender: " + name);
    return;
  }
  try {
    String message = layout.format(logEvent);
    ByteBuffer data = ByteBuffer.wrap(message.getBytes(encoding));
    // Random partition key spreads log records evenly across shards.
    kinesisClient.putRecordAsync(new PutRecordRequest().withPartitionKey(UUID.randomUUID().toString())
        .withStreamName(streamName).withData(data), asyncCallHander);
  } catch (Exception e) {
    // Pass the exception to the logger so the failure cause isn't lost.
    LOGGER.error("Failed to schedule log entry for publishing into Kinesis stream: " + streamName, e);
    errorHandler.error("Failed to schedule log entry for publishing into Kinesis stream: " + streamName, e,
        ErrorCode.WRITE_FAILURE, logEvent);
  }
}
 
源代码7 项目: amazon-kinesis-connectors   文件: StreamSource.java
/**
 * Process the input file and send PutRecordRequests to Amazon Kinesis.
 * 
 * This function serves to Isolate StreamSource logic so subclasses
 * can process input files differently.
 * 
 * @param inputStream
 *        the input stream to process
 * @param iteration
 *        the iteration if looping over file
 * @throws IOException
 *         throw exception if error processing inputStream.
 */
protected void processInputStream(InputStream inputStream, int iteration) throws IOException {
    // Explicit UTF-8: the no-arg InputStreamReader/getBytes() use the JVM's
    // platform-default charset, which can corrupt non-ASCII payloads.
    try (BufferedReader br = new BufferedReader(
            new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8))) {
        String line;
        int lines = 0;
        while ((line = br.readLine()) != null) {
            KinesisMessageModel kinesisMessageModel = objectMapper.readValue(line, KinesisMessageModel.class);

            PutRecordRequest putRecordRequest = new PutRecordRequest();
            putRecordRequest.setStreamName(config.KINESIS_INPUT_STREAM);
            putRecordRequest.setData(ByteBuffer.wrap(line.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
            putRecordRequest.setPartitionKey(Integer.toString(kinesisMessageModel.getUserid()));
            kinesisClient.putRecord(putRecordRequest);
            lines++;
        }
        LOG.info("Added " + lines + " records to stream source.");
    }
}
 
源代码8 项目: presto   文件: MockKinesisClient.java
@Override
public PutRecordResult putRecord(PutRecordRequest putRecordRequest)
        throws AmazonClientException
{
    // Mimic the real client: reject puts against unknown streams up front.
    InternalStream stream = this.getStream(putRecordRequest.getStreamName());
    if (stream == null) {
        throw new AmazonClientException("This stream does not exist!");
    }
    // Delegate the actual record storage to the in-memory stream.
    return stream.putRecord(putRecordRequest.getData(), putRecordRequest.getPartitionKey());
}
 
/**
 * Builds a no-op Kinesis client whose putRecord discards the record and
 * returns an empty result. Useful as a stub when publishing is disabled.
 */
public com.amazonaws.services.kinesis.AmazonKinesis build(AmazonKinesis kinesisProperties) {
    return new AbstractAmazonKinesis() {
        @Override
        public PutRecordResult putRecord(PutRecordRequest request) {
            // Intentionally a no-op stub.
            return new PutRecordResult();
        }
    };
}
 
源代码10 项目: zipkin-aws   文件: KinesisSender.java
/**
 * Encodes the given spans into a single Kinesis record and returns a lazy
 * Call that publishes it when executed.
 */
@Override public Call<Void> sendSpans(List<byte[]> list) {
  if (closeCalled) throw new IllegalStateException("closed");

  // Collapse all spans into one payload using the sender's encoding.
  byte[] encoded = BytesMessageEncoder.forEncoding(encoding()).encode(list);

  PutRecordRequest request = new PutRecordRequest();
  request.setStreamName(streamName);
  request.setData(ByteBuffer.wrap(encoded));
  request.setPartitionKey(getPartitionKey());

  return new KinesisCall(request);
}
 
源代码11 项目: json-data-generator   文件: KinesisLogger.java
/**
 * Publishes one event to Kinesis, honoring an optional per-call "stream"
 * override in producerConfig, then logs throughput and shuts the JVM down
 * once maxRecords events have been sent.
 */
@Override
public void logEvent(String event, Map<String, Object> producerConfig) {

    // Allow callers to override the destination stream per event.
    String streamName = (String) producerConfig.get("stream");
    if (streamName == null) {
        streamName = this.streamName;
    }

    sequenceNumber.getAndIncrement();
    try {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(streamName);
        putRecordRequest.setData(generateData(event));
        putRecordRequest.setPartitionKey(TIMESTAMP);
        // Result intentionally ignored (previously an unused local).
        kinesisClient.putRecord(putRecordRequest);
    } catch (Exception ex) {
        //got interrupted while waiting
        log.error("Error while publishing events : ", ex);
    }
    long totalTimeElasped = System.currentTimeMillis() - startTimeFull;
    log.info("Events Published : " + sequenceNumber + " events in " + (totalTimeElasped / 1000) + " secs");
    if (this.maxRecords != 0 && sequenceNumber.intValue() == maxRecords) {
        shutdown();
        System.exit(0);
    }
}
 
源代码12 项目: attic-apex-malhar   文件: KinesisTestProducer.java
/**
 * Either generates fresh records (when none were supplied) or replays each
 * pre-built message to Kinesis, keyed by the message content itself.
 */
@Override
public void run()
{
  if (records == null) {
    generateRecords();
    return;
  }
  for (String msg : records) {
    PutRecordRequest request = new PutRecordRequest();
    request.setStreamName(streamName);
    request.setData(ByteBuffer.wrap(msg.getBytes()));
    // Using the message as its own partition key keeps puts deterministic.
    request.setPartitionKey(msg);
    client.putRecord(request);
  }
}
 
源代码13 项目: presto-kinesis   文件: MockKinesisClient.java
@Override
public PutRecordResult putRecord(PutRecordRequest putRecordRequest) throws AmazonServiceException, AmazonClientException
{
    // Fail fast, matching the real client, when the stream is unknown.
    InternalStream target = this.getStream(putRecordRequest.getStreamName());
    if (target == null) {
        throw new AmazonClientException("This stream does not exist!");
    }
    // Hand the data and partition key to the in-memory stream.
    return target.putRecord(putRecordRequest.getData(), putRecordRequest.getPartitionKey());
}
 
源代码14 项目: aws-big-data-blog   文件: BatchedStreamSource.java
/**
 * Sends the current buffer contents to Kinesis as one record, then clears
 * the buffer. A random partition key spreads batches across shards.
 */
private void flushBuffer() throws IOException {
    PutRecordRequest request = new PutRecordRequest();
    request.setStreamName(config.KINESIS_INPUT_STREAM);
    request.setData(ByteBuffer.wrap(bufferToBytes()));
    request.setPartitionKey(UUID.randomUUID().toString());
    kinesisClient.putRecord(request);
    buffer.clear();
}
 
源代码15 项目: flink   文件: KinesisPubsubClient.java
/**
 * Sends a single message to the given Kinesis stream (topic) and logs the
 * resulting sequence number.
 *
 * @param topic the destination stream name
 * @param msg   the payload, encoded as UTF-8 bytes
 */
public void sendMessage(String topic, String msg) {
	PutRecordRequest putRecordRequest = new PutRecordRequest();
	putRecordRequest.setStreamName(topic);
	putRecordRequest.setPartitionKey("fakePartitionKey");
	// Explicit charset: msg.getBytes() without one would depend on the
	// JVM's platform default and could corrupt non-ASCII payloads.
	putRecordRequest.withData(ByteBuffer.wrap(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
	PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
	LOG.info("added record: {}", putRecordResult.getSequenceNumber());
}
 
/**
 * This method is invoked when a log record is successfully sent to Kinesis.
 * Though this is not too useful for production use cases, it provides a good
 * debugging tool while tweaking parameters for the appender.
 */
@Override
public void onSuccess(PutRecordRequest request, PutRecordResult result) {
  successfulRequestCount++;
  long totalRequests = successfulRequestCount + failedRequestCount;
  // Emit a progress line every 3000 requests, but only when debug is on.
  if (logger.isDebugEnabled() && totalRequests % 3000 == 0) {
    logger.debug("Appender (" + appenderName + ") made " + successfulRequestCount
        + " successful put requests out of total " + totalRequests + " in "
        + PeriodFormat.getDefault().print(new Period(startTime, DateTime.now())) + " since start");
  }
}
 
/**
 * Emits the buffered records to Amazon S3 via the superclass and, on success,
 * publishes the resulting S3 file name to the manifest Kinesis stream so
 * downstream consumers can discover the new file.
 *
 * @param buffer the records to emit (read-only view)
 * @return the records that must be retried, or an empty list on full success
 * @throws IOException if the superclass emit fails with an I/O error
 */
@Override
public List<byte[]> emit(final UnmodifiableBuffer<byte[]> buffer) throws IOException {
    // Store the contents of buffer.getRecords because superclass will
    // clear the buffer on success
    List<byte[]> failed = super.emit(buffer);
    // calls S3Emitter to write objects to Amazon S3
    if (!failed.isEmpty()) {
        // S3 write failed for some records: hand everything back for retry.
        return buffer.getRecords();
    }
    String s3File = getS3FileName(buffer.getFirstSequenceNumber(), buffer.getLastSequenceNumber());
    // wrap the name of the Amazon S3 file as the record data
    // NOTE(review): s3File.getBytes() uses the platform-default charset;
    // presumably fine since S3 file names here are ASCII - confirm.
    ByteBuffer data = ByteBuffer.wrap(s3File.getBytes());
    // Put the list of file names to the manifest Amazon Kinesis stream
    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setData(data);
    putRecordRequest.setStreamName(manifestStream);
    // Use constant partition key to ensure file order
    putRecordRequest.setPartitionKey(manifestStream);
    try {
        kinesisClient.putRecord(putRecordRequest);
        LOG.info("S3ManifestEmitter emitted record downstream: " + s3File);
        return Collections.emptyList();
    } catch (Exception e) {
        // Manifest publish failed: report all records as failed so callers retry.
        LOG.error(e);
        return buffer.getRecords();
    }
}
 
/**
 * Flushes the accumulated buffer to Kinesis as a single record and empties
 * it afterwards. The random UUID partition key distributes load over shards.
 */
private void flushBuffer() throws IOException {
    PutRecordRequest flushRequest = new PutRecordRequest();
    flushRequest.setStreamName(config.KINESIS_INPUT_STREAM);
    flushRequest.setPartitionKey(UUID.randomUUID().toString());
    flushRequest.setData(ByteBuffer.wrap(bufferToBytes()));
    kinesisClient.putRecord(flushRequest);
    buffer.clear();
}
 
/**
 * Verifies that when the async putRecord call fails, the binder publishes an
 * ErrorMessage carrying an AwsRequestFailureException - with the original
 * cause and the exact failed request - to the producer's error channel.
 */
@Test
@SuppressWarnings("unchecked")
public void testProducerErrorChannel() throws Exception {
	KinesisTestBinder binder = getBinder();

	final RuntimeException putRecordException = new RuntimeException(
			"putRecordRequestEx");
	final AtomicReference<Object> sent = new AtomicReference<>();
	AmazonKinesisAsync amazonKinesisMock = mock(AmazonKinesisAsync.class);
	// Stub the async client: capture the outgoing payload, then report a
	// failure through the AsyncHandler instead of completing normally.
	BDDMockito
			.given(amazonKinesisMock.putRecordAsync(any(PutRecordRequest.class),
					any(AsyncHandler.class)))
			.willAnswer((Answer<Future<PutRecordResult>>) (invocation) -> {
				PutRecordRequest request = invocation.getArgument(0);
				sent.set(request.getData());
				AsyncHandler<?, ?> handler = invocation.getArgument(1);
				handler.onError(putRecordException);
				return mock(Future.class);
			});

	// Inject the mocked client into the binder under test.
	new DirectFieldAccessor(binder.getBinder()).setPropertyValue("amazonKinesis",
			amazonKinesisMock);

	ExtendedProducerProperties<KinesisProducerProperties> producerProps = createProducerProperties();
	producerProps.setErrorChannelEnabled(true);
	DirectChannel moduleOutputChannel = createBindableChannel("output",
			createProducerBindingProperties(producerProps));
	Binding<MessageChannel> producerBinding = binder.bindProducer("ec.0",
			moduleOutputChannel, producerProps);

	ApplicationContext applicationContext = TestUtils.getPropertyValue(
			binder.getBinder(), "applicationContext", ApplicationContext.class);
	SubscribableChannel ec = applicationContext.getBean("ec.0.errors",
			SubscribableChannel.class);
	final AtomicReference<Message<?>> errorMessage = new AtomicReference<>();
	final CountDownLatch latch = new CountDownLatch(1);
	// Capture the first message routed to the error channel.
	ec.subscribe((message) -> {
		errorMessage.set(message);
		latch.countDown();
	});

	String messagePayload = "oops";
	moduleOutputChannel.send(new GenericMessage<>(messagePayload.getBytes()));

	assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
	assertThat(errorMessage.get()).isInstanceOf(ErrorMessage.class);
	assertThat(errorMessage.get().getPayload())
			.isInstanceOf(AwsRequestFailureException.class);
	AwsRequestFailureException exception = (AwsRequestFailureException) errorMessage
			.get().getPayload();
	// The failure must preserve both the original cause and the request payload.
	assertThat(exception.getCause()).isSameAs(putRecordException);
	assertThat(((PutRecordRequest) exception.getRequest()).getData())
			.isSameAs(sent.get());
	producerBinding.unbind();
}
 
源代码20 项目: zipkin-aws   文件: KinesisSender.java
/** Wraps a prepared PutRecordRequest so it can be executed later as a Call. */
KinesisCall(PutRecordRequest message) {
  this.message = message;
}
 
源代码21 项目: zipkin-aws   文件: KinesisSender.java
/** Completes the pending callback once Kinesis acknowledges the record. */
@Override public void onSuccess(PutRecordRequest request, PutRecordResult result) {
  callback.onSuccess(null);
}
 
源代码22 项目: beam   文件: AmazonKinesisMock.java
/**
 * Not supported by this mock; always throws.
 *
 * @throws UnsupportedOperationException always - the idiomatic "not
 *         implemented" signal, and still a RuntimeException so any caller
 *         catching the previous raw RuntimeException remains compatible.
 */
@Override
public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
  throw new UnsupportedOperationException("Not implemented");
}
 
源代码23 项目: streams   文件: KinesisPersistWriter.java
/**
 * Writes a single datum to the configured Kinesis stream and records the
 * resulting sequence number back onto the datum.
 */
@Override
public void write(StreamsDatum entry) {

  String document = (String) TypeConverterUtil.getInstance().convert(entry.getDocument(), String.class);

  PutRecordRequest putRecordRequest = new PutRecordRequest()
      .withStreamName(config.getStream())
      .withPartitionKey(entry.getId())
      // Explicit charset: getBytes() without one uses the platform default
      // and can corrupt non-ASCII documents.
      .withData(ByteBuffer.wrap(document.getBytes(java.nio.charset.StandardCharsets.UTF_8)));

  PutRecordResult putRecordResult = client.putRecord(putRecordRequest);

  entry.setSequenceid(new BigInteger(putRecordResult.getSequenceNumber()));

  LOGGER.debug("Wrote {}", entry);
}
 
源代码24 项目: kinesis-aggregation   文件: AggRecord.java
/**
 * Convert the aggregated data in this record into a single PutRecordRequest.
 * This method has no side effects (i.e. it will not clear the current contents
 * of the aggregated record).
 * 
 * @param streamName
 *            The Kinesis stream name where this PutRecordRequest will be sent.
 * @return A PutRecordRequest containing all the current data in this aggregated
 *         record.
 */
public PutRecordRequest toPutRecordRequest(String streamName) {
	// Serialize the aggregated payload once, then attach all routing fields.
	ByteBuffer payload = ByteBuffer.wrap(toRecordBytes());
	return new PutRecordRequest()
			.withStreamName(streamName)
			.withExplicitHashKey(getExplicitHashKey())
			.withPartitionKey(getPartitionKey())
			.withData(payload);
}