下面列出了org.apache.log4j.spi.ErrorCode#com.amazonaws.services.kinesis.model.PutRecordRequest 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Exposes the {@link AsyncHandler} that receives the outcome of an asynchronous put-record call.
 *
 * <p>A successful result is pushed into {@code resultMonoProcessor}, which is then completed.
 * A failure is not signalled anywhere by this handler.
 *
 * @return the handler bean
 */
@Bean
public AsyncHandler<PutRecordRequest, PutRecordResult> asyncHandler() {
    return new AsyncHandler<PutRecordRequest, PutRecordResult>() {

        @Override
        public void onSuccess(PutRecordRequest request, PutRecordResult result) {
            // Publish the single result, then terminate the processor.
            ProcessorConfiguration.this.resultMonoProcessor.onNext(result);
            ProcessorConfiguration.this.resultMonoProcessor.onComplete();
        }

        @Override
        public void onError(Exception exception) {
            // No-op: the exception is neither logged nor forwarded to the processor.
        }
    };
}
/**
 * Sends a completed aggregated record to the destination Kinesis stream.
 *
 * <p>A {@code null} record means there is nothing to send yet and the call returns
 * immediately. A failure while publishing is logged and otherwise ignored.
 *
 * @param logger The LambdaLogger from the input Context
 * @param aggRecord The aggregated record to transmit, or null if the record isn't full yet.
 */
private void checkAndForwardRecords(LambdaLogger logger, AggRecord aggRecord) {
    if (aggRecord == null) {
        return;
    }

    logger.log("Forwarding " + aggRecord.getNumUserRecords() + " as an aggregated record.");

    // The request is built outside the try block, so a failure here propagates to the caller.
    PutRecordRequest forwardRequest = aggRecord.toPutRecordRequest(DESTINATION_STREAM_NAME);
    try {
        PutRecordResult outcome = this.kinesisForwarder.putRecord(forwardRequest);
        logger.log("Successfully published record Seq #" + outcome.getSequenceNumber() + " to shard " + outcome.getShardId());
    } catch (Exception e) {
        logger.log("ERROR: Failed to forward Kinesis records to destination stream: " + e.getMessage());
    }
}
/**
 * Sends one tuple towards Kinesis, either immediately or through the batch buffer.
 *
 * <p>In batch mode a full buffer is flushed before the tuple is added to it; otherwise the
 * tuple is converted to a key/value pair and published as a single record. The send counter
 * is incremented once the tuple has been handled.
 *
 * @param tuple the tuple to publish
 * @throws RuntimeException wrapping any {@code AmazonClientException} raised while sending
 */
public void processTuple(T tuple) {
    try {
        if (!isBatchProcessing) {
            // Unbatched: one PutRecord call per tuple.
            Pair<String, V> entry = tupleToKeyValue(tuple);
            PutRecordRequest single = new PutRecordRequest();
            single.setStreamName(streamName);
            single.setPartitionKey(entry.first);
            single.setData(ByteBuffer.wrap(getRecord(entry.second)));
            client.putRecord(single);
        } else {
            // Batched: make room first when the buffer has reached the batch size.
            if (putRecordsRequestEntryList.size() == batchSize) {
                flushRecords();
                logger.debug( "flushed {} records.", batchSize );
            }
            addRecord(tuple);
        }
        sendCount++;
    } catch (AmazonClientException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Process the input file and send PutRecordRequests to Amazon Kinesis.
 *
 * <p>This function serves to isolate StreamSource logic so subclasses
 * can process input files differently. Each line is parsed as a
 * {@code KinesisMessageModel} (to obtain the partition key) and the raw line is
 * sent as the record payload. The stream is decoded and the payload encoded as
 * UTF-8 explicitly, so the bytes sent do not depend on the platform default charset.
 *
 * @param inputStream
 *        the input stream to process; closed when this method returns
 * @param iteration
 *        the iteration if looping over file (not used by this implementation)
 * @throws IOException
 *         throw exception if error processing inputStream.
 */
protected void processInputStream(InputStream inputStream, int iteration) throws IOException {
    try (BufferedReader br = new BufferedReader(
            new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8))) {
        String line;
        int lines = 0;
        while ((line = br.readLine()) != null) {
            // Parsing also validates the line; a malformed line aborts processing with an exception.
            KinesisMessageModel kinesisMessageModel = objectMapper.readValue(line, KinesisMessageModel.class);

            PutRecordRequest putRecordRequest = new PutRecordRequest();
            putRecordRequest.setStreamName(config.KINESIS_INPUT_STREAM);
            putRecordRequest.setData(
                    ByteBuffer.wrap(line.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
            putRecordRequest.setPartitionKey(Integer.toString(kinesisMessageModel.getUserid()));
            kinesisClient.putRecord(putRecordRequest);
            lines++;
        }
        LOG.info("Added " + lines + " records to stream source.");
    }
}
/**
 * Worker loop: takes events from {@code eventsQueue} and publishes each one to the
 * configured Kinesis stream as a single record.
 *
 * <p>The loop runs until the thread is interrupted. An interrupt while waiting on the
 * queue restores the interrupt flag and ends the loop, so the worker can be shut down.
 * Any other failure is logged and the loop moves on to the next event.
 */
public void run() {
    while (true) {
        try {
            // get message from queue - blocking so code will wait here for work to do
            Event event = eventsQueue.take();

            PutRecordRequest put = new PutRecordRequest();
            put.setStreamName(this.streamName);
            put.setData(event.getData());
            put.setPartitionKey(event.getPartitionKey());

            PutRecordResult result = kinesisClient.putRecord(put);
            logger.info("{}: {}", result.getSequenceNumber(), this);
        } catch (InterruptedException e) {
            // Interruption is the shutdown signal: preserve the flag and stop instead of spinning.
            Thread.currentThread().interrupt();
            return;
        } catch (Exception e) {
            // didn't get record - move on to next
            logger.error("Failed to publish event to Kinesis stream " + this.streamName, e);
        }
    }
}
/**
 * Invoked for every logging call routed to this appender.
 *
 * <p>The event is formatted with the configured layout, encoded with the configured
 * encoding and handed to the asynchronous Kinesis client under a random partition key.
 * Whether this call blocks depends on how the asynchronous client is configured, which is
 * not visible here. If the appender failed to initialize, the event is dropped after
 * reporting the problem. Any failure while formatting or scheduling the event is reported
 * through the logger and the error handler, and the event is dropped.
 *
 * @param logEvent the event to publish
 */
@Override
public void append(LoggingEvent logEvent) {
    if (initializationFailed) {
        error("Check the configuration and whether the configured stream " + streamName
            + " exists and is active. Failed to initialize kinesis log4j appender: " + name);
        return;
    }

    try {
        String formatted = layout.format(logEvent);
        ByteBuffer payload = ByteBuffer.wrap(formatted.getBytes(encoding));

        // A random partition key is used for each record.
        PutRecordRequest request = new PutRecordRequest()
            .withPartitionKey(UUID.randomUUID().toString())
            .withStreamName(streamName)
            .withData(payload);
        kinesisClient.putRecordAsync(request, asyncCallHander);
    } catch (Exception e) {
        String failure = "Failed to schedule log entry for publishing into Kinesis stream: " + streamName;
        LOGGER.error(failure);
        errorHandler.error(failure, e, ErrorCode.WRITE_FAILURE, logEvent);
    }
}
/**
 * Process the input file and send PutRecordRequests to Amazon Kinesis.
 *
 * <p>This function serves to isolate StreamSource logic so subclasses
 * can process input files differently. Every line is parsed as a
 * {@code KinesisMessageModel} to derive the partition key, and the unmodified line
 * becomes the record payload. Decoding and encoding both use UTF-8 explicitly so the
 * result is the same regardless of the platform default charset.
 *
 * @param inputStream
 *        the input stream to process; closed when this method returns
 * @param iteration
 *        the iteration if looping over file (not used by this implementation)
 * @throws IOException
 *         throw exception if error processing inputStream.
 */
protected void processInputStream(InputStream inputStream, int iteration) throws IOException {
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
    try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, utf8))) {
        String line;
        int lines = 0;
        while ((line = br.readLine()) != null) {
            // A line that cannot be parsed raises an exception and stops processing.
            KinesisMessageModel kinesisMessageModel = objectMapper.readValue(line, KinesisMessageModel.class);

            PutRecordRequest putRecordRequest = new PutRecordRequest();
            putRecordRequest.setStreamName(config.KINESIS_INPUT_STREAM);
            putRecordRequest.setData(ByteBuffer.wrap(line.getBytes(utf8)));
            putRecordRequest.setPartitionKey(Integer.toString(kinesisMessageModel.getUserid()));
            kinesisClient.putRecord(putRecordRequest);
            lines++;
        }
        LOG.info("Added " + lines + " records to stream source.");
    }
}
/**
 * Adds a record to the in-memory stream named by the request.
 *
 * @param putRecordRequest request carrying the stream name, payload and partition key
 * @return the result produced by the target stream
 * @throws AmazonClientException if no stream with the requested name exists
 */
@Override
public PutRecordResult putRecord(PutRecordRequest putRecordRequest)
        throws AmazonClientException {
    InternalStream target = this.getStream(putRecordRequest.getStreamName());
    if (target == null) {
        throw new AmazonClientException("This stream does not exist!");
    }
    return target.putRecord(putRecordRequest.getData(), putRecordRequest.getPartitionKey());
}
/**
 * Builds a stub Kinesis client whose {@code putRecord} does no work and returns an
 * empty {@link PutRecordResult}.
 *
 * @param kinesisProperties not used by this implementation
 * @return the stub client
 */
public com.amazonaws.services.kinesis.AmazonKinesis build(AmazonKinesis kinesisProperties) {
    com.amazonaws.services.kinesis.AmazonKinesis stub = new AbstractAmazonKinesis() {
        @Override
        public PutRecordResult putRecord(PutRecordRequest request) {
            // Intentionally does nothing with the request.
            return new PutRecordResult();
        }
    };
    return stub;
}
/**
 * Encodes the given spans into one message and wraps it in a call that targets the
 * configured stream.
 *
 * @param list the encoded spans to send
 * @return a call holding the prepared put-record request
 * @throws IllegalStateException if this sender has already been closed
 */
@Override
public Call<Void> sendSpans(List<byte[]> list) {
    if (closeCalled) {
        throw new IllegalStateException("closed");
    }

    byte[] encoded = BytesMessageEncoder.forEncoding(encoding()).encode(list);
    PutRecordRequest request = new PutRecordRequest()
        .withStreamName(streamName)
        .withData(ByteBuffer.wrap(encoded))
        .withPartitionKey(getPartitionKey());
    return new KinesisCall(request);
}
/**
 * Publishes one event to Kinesis and reports running throughput.
 *
 * <p>The target stream comes from the {@code "stream"} entry of the producer config, falling
 * back to this producer's default stream. A publishing failure is logged and does not stop
 * the producer. When a non-zero record limit is configured and reached, the producer shuts
 * down and the JVM exits.
 *
 * @param event the event payload
 * @param producerConfig per-call configuration; may contain a {@code "stream"} override
 */
@Override
public void logEvent(String event, Map<String, Object> producerConfig) {
    String configuredStream = (String) producerConfig.get("stream");
    String targetStream = (configuredStream != null) ? configuredStream : this.streamName;

    // The counter is advanced before the send, so failed sends are counted as well.
    sequenceNumber.getAndIncrement();
    try {
        PutRecordRequest request = new PutRecordRequest();
        request.setStreamName(targetStream);
        request.setData(generateData(event));
        request.setPartitionKey(TIMESTAMP);
        kinesisClient.putRecord(request);
    } catch (Exception ex) {
        // Best effort: log the failure and carry on.
        log.error("Error while publishing events : ", ex);
    }

    long elapsedMillis = System.currentTimeMillis() - startTimeFull;
    log.info("Events Published : " + sequenceNumber + " events in " + (elapsedMillis / 1000) + " secs");

    boolean limitReached = this.maxRecords != 0 && sequenceNumber.intValue() == maxRecords;
    if (limitReached) {
        shutdown();
        System.exit(0);
    }
}
/**
 * Publishes the preset records, or generates records when none were supplied.
 *
 * <p>Each preset message is sent as its own record, with the message text used as both
 * payload and partition key.
 */
@Override
public void run() {
    if (records == null) {
        generateRecords();
        return;
    }

    for (String message : records) {
        PutRecordRequest request = new PutRecordRequest();
        request.setStreamName(streamName);
        request.setData(ByteBuffer.wrap(message.getBytes()));
        request.setPartitionKey(message);
        client.putRecord(request);
    }
}
/**
 * Adds a record to the in-memory stream named by the request.
 *
 * @param putRecordRequest request carrying the stream name, payload and partition key
 * @return the result produced by the target stream
 * @throws AmazonClientException if no stream with the requested name exists
 */
@Override
public PutRecordResult putRecord(PutRecordRequest putRecordRequest) throws AmazonServiceException, AmazonClientException {
    InternalStream destination = this.getStream(putRecordRequest.getStreamName());
    if (destination == null) {
        throw new AmazonClientException("This stream does not exist!");
    }
    return destination.putRecord(putRecordRequest.getData(), putRecordRequest.getPartitionKey());
}
/**
 * Sends the buffered content to the input stream as one record and empties the buffer.
 *
 * <p>A fresh random UUID is used as the partition key for each flush. The buffer is cleared
 * only after the put call returns.
 *
 * @throws IOException declared by the signature; presumably raised by {@code bufferToBytes()}
 */
private void flushBuffer() throws IOException {
    PutRecordRequest request = new PutRecordRequest()
        .withStreamName(config.KINESIS_INPUT_STREAM)
        .withData(ByteBuffer.wrap(bufferToBytes()))
        .withPartitionKey(UUID.randomUUID().toString());
    kinesisClient.putRecord(request);
    buffer.clear();
}
/**
 * Publishes a text message to the given stream and logs the assigned sequence number.
 *
 * @param topic name of the stream to publish to
 * @param msg message text, sent as bytes in the platform default charset
 */
public void sendMessage(String topic, String msg) {
    PutRecordRequest request = new PutRecordRequest()
        .withStreamName(topic)
        .withPartitionKey("fakePartitionKey")
        .withData(ByteBuffer.wrap(msg.getBytes()));
    String sequenceNumber = kinesisClient.putRecord(request).getSequenceNumber();
    LOG.info("added record: {}", sequenceNumber);
}
/**
 * Callback for a log record that was sent to Kinesis successfully.
 *
 * <p>Counts the success and, when debug logging is enabled, emits a progress line each time
 * the total number of completed requests (successful plus failed) is a multiple of 3000.
 *
 * @param request the request that succeeded (unused)
 * @param result the result returned for it (unused)
 */
@Override
public void onSuccess(PutRecordRequest request, PutRecordResult result) {
    successfulRequestCount++;
    if (!logger.isDebugEnabled()) {
        return;
    }

    long completed = successfulRequestCount + failedRequestCount;
    if (completed % 3000 != 0) {
        return;
    }

    String elapsed = PeriodFormat.getDefault().print(new Period(startTime, DateTime.now()));
    logger.debug("Appender (" + appenderName + ") made " + successfulRequestCount
        + " successful put requests out of total " + completed + " in "
        + elapsed + " since start");
}
/**
 * Emits the buffer through the superclass and, on success, publishes the resulting S3 file
 * name to the manifest Kinesis stream.
 *
 * @param buffer the records to emit
 * @return an empty list when both steps succeed; otherwise the buffer's records, so the
 *         caller can treat them as failed
 * @throws IOException if the superclass emit fails with an I/O error
 */
@Override
public List<byte[]> emit(final UnmodifiableBuffer<byte[]> buffer) throws IOException {
    // Delegate first (the original note says this step writes the objects to Amazon S3).
    List<byte[]> notWritten = super.emit(buffer);
    if (!notWritten.isEmpty()) {
        return buffer.getRecords();
    }

    // The manifest record's payload is simply the name of the S3 file.
    String manifestEntry = getS3FileName(buffer.getFirstSequenceNumber(), buffer.getLastSequenceNumber());

    PutRecordRequest manifestRequest = new PutRecordRequest();
    manifestRequest.setData(ByteBuffer.wrap(manifestEntry.getBytes()));
    manifestRequest.setStreamName(manifestStream);
    // Use constant partition key to ensure file order
    manifestRequest.setPartitionKey(manifestStream);

    List<byte[]> failed;
    try {
        kinesisClient.putRecord(manifestRequest);
        LOG.info("S3ManifestEmitter emitted record downstream: " + manifestEntry);
        failed = Collections.emptyList();
    } catch (Exception e) {
        LOG.error(e);
        failed = buffer.getRecords();
    }
    return failed;
}
/**
 * Publishes the current buffer contents as a single Kinesis record, then clears the buffer.
 *
 * <p>The partition key is a newly generated random UUID. If the put call throws, the buffer
 * is left untouched.
 *
 * @throws IOException declared by the signature; presumably raised by {@code bufferToBytes()}
 */
private void flushBuffer() throws IOException {
    PutRecordRequest flushRequest = new PutRecordRequest()
        .withStreamName(config.KINESIS_INPUT_STREAM)
        .withData(ByteBuffer.wrap(bufferToBytes()))
        .withPartitionKey(UUID.randomUUID().toString());
    kinesisClient.putRecord(flushRequest);
    buffer.clear();
}
/**
 * Verifies that a failed asynchronous put is reported on the producer's error channel.
 *
 * <p>The Kinesis client is replaced by a mock that fails every {@code putRecordAsync} call
 * through the supplied handler. The test then checks that the "ec.0.errors" channel receives
 * an {@code ErrorMessage} whose payload is an {@code AwsRequestFailureException} carrying
 * the original exception and the data of the request that was sent.
 */
@Test
@SuppressWarnings("unchecked")
public void testProducerErrorChannel() throws Exception {
KinesisTestBinder binder = getBinder();
final RuntimeException putRecordException = new RuntimeException(
"putRecordRequestEx");
// Captures the payload of the request handed to the mocked client.
final AtomicReference<Object> sent = new AtomicReference<>();
AmazonKinesisAsync amazonKinesisMock = mock(AmazonKinesisAsync.class);
// Every async put records the request data and immediately reports the failure to the handler.
BDDMockito
.given(amazonKinesisMock.putRecordAsync(any(PutRecordRequest.class),
any(AsyncHandler.class)))
.willAnswer((Answer<Future<PutRecordResult>>) (invocation) -> {
PutRecordRequest request = invocation.getArgument(0);
sent.set(request.getData());
AsyncHandler<?, ?> handler = invocation.getArgument(1);
handler.onError(putRecordException);
return mock(Future.class);
});
// Swap the binder's Kinesis client for the mock via direct field access.
new DirectFieldAccessor(binder.getBinder()).setPropertyValue("amazonKinesis",
amazonKinesisMock);
ExtendedProducerProperties<KinesisProducerProperties> producerProps = createProducerProperties();
// The error channel must be enabled for failures to be published as messages.
producerProps.setErrorChannelEnabled(true);
DirectChannel moduleOutputChannel = createBindableChannel("output",
createProducerBindingProperties(producerProps));
Binding<MessageChannel> producerBinding = binder.bindProducer("ec.0",
moduleOutputChannel, producerProps);
ApplicationContext applicationContext = TestUtils.getPropertyValue(
binder.getBinder(), "applicationContext", ApplicationContext.class);
// The error channel bean is looked up under the destination name plus ".errors".
SubscribableChannel ec = applicationContext.getBean("ec.0.errors",
SubscribableChannel.class);
final AtomicReference<Message<?>> errorMessage = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ec.subscribe((message) -> {
errorMessage.set(message);
latch.countDown();
});
String messagePayload = "oops";
moduleOutputChannel.send(new GenericMessage<>(messagePayload.getBytes()));
// Wait up to 10 seconds for the error message to arrive.
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(errorMessage.get()).isInstanceOf(ErrorMessage.class);
assertThat(errorMessage.get().getPayload())
.isInstanceOf(AwsRequestFailureException.class);
AwsRequestFailureException exception = (AwsRequestFailureException) errorMessage
.get().getPayload();
// The failure must carry the exact exception instance and the exact request data.
assertThat(exception.getCause()).isSameAs(putRecordException);
assertThat(((PutRecordRequest) exception.getRequest()).getData())
.isSameAs(sent.get());
producerBinding.unbind();
}
/**
 * Creates a call that wraps the given put-record request.
 *
 * @param message the request stored on this call
 */
KinesisCall(PutRecordRequest message) {
this.message = message;
}
/**
 * Success hook for the put-record call: notifies {@code callback} with {@code null}.
 * Neither the request nor the result is passed on.
 */
@Override public void onSuccess(PutRecordRequest request, PutRecordResult result) {
callback.onSuccess(null);
}
/**
 * Unsupported operation: this implementation does not provide {@code putRecord}.
 *
 * @param putRecordRequest ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always; this is a {@link RuntimeException}, so
 *         callers that catch {@code RuntimeException} keep working
 */
@Override
public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
    throw new UnsupportedOperationException("Not implemented");
}
/**
 * Writes one datum to the configured Kinesis stream.
 *
 * <p>The datum's document is converted to a string and sent as the record payload, with
 * the datum id as partition key. The sequence number returned by Kinesis is stored back on
 * the datum.
 *
 * @param entry the datum to publish; its sequence id is updated on success
 */
@Override
public void write(StreamsDatum entry) {
    String document = (String) TypeConverterUtil.getInstance().convert(entry.getDocument(), String.class);

    PutRecordRequest request = new PutRecordRequest();
    request.setStreamName(config.getStream());
    request.setPartitionKey(entry.getId());
    request.setData(ByteBuffer.wrap(document.getBytes()));

    String sequenceNumber = client.putRecord(request).getSequenceNumber();
    entry.setSequenceid(new BigInteger(sequenceNumber));
    LOGGER.debug("Wrote {}", entry);
}
/**
 * Builds a single PutRecordRequest from the data currently held by this aggregated record.
 *
 * <p>The request carries the serialized record bytes together with this record's explicit
 * hash key and partition key. Per the original documentation, the call does not clear the
 * aggregated record's contents.
 *
 * @param streamName
 *        The Kinesis stream name where this PutRecordRequest will be sent.
 * @return A PutRecordRequest containing all the current data in this aggregated
 *         record.
 */
public PutRecordRequest toPutRecordRequest(String streamName) {
    ByteBuffer payload = ByteBuffer.wrap(toRecordBytes());

    PutRecordRequest request = new PutRecordRequest();
    request.setStreamName(streamName);
    request.setExplicitHashKey(getExplicitHashKey());
    request.setPartitionKey(getPartitionKey());
    request.setData(payload);
    return request;
}