com.codahale.metrics.json.MetricsModule#kafka.javaapi.producer.Producer Source Code Examples

Listed below are example usages of com.codahale.metrics.json.MetricsModule#kafka.javaapi.producer.Producer, collected from open-source projects; each project reference points to the full source on GitHub.
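All of the snippets below share the same legacy (Kafka 0.8-era) producer lifecycle: build a java.util.Properties object, wrap it in kafka.producer.ProducerConfig, construct kafka.javaapi.producer.Producer, and hand it kafka.producer.KeyedMessage instances. As a minimal, self-contained sketch of that pattern (the broker address and topic name are placeholders):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MinimalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // placeholder broker list
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("demo-topic", "key", "hello")); // placeholder topic
        producer.close();
    }
}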

Example 1  Project: lambda-arch  File: IoTDataProducer.java
public static void main(String[] args) throws Exception {
	//read config file
	Properties prop = PropertyFileReader.readPropertyFile();		
	String zookeeper = prop.getProperty("com.iot.app.kafka.zookeeper");
	String brokerList = prop.getProperty("com.iot.app.kafka.brokerlist");
	String topic = prop.getProperty("com.iot.app.kafka.topic");
	logger.info("Using Zookeeper=" + zookeeper + " ,Broker-list=" + brokerList + " and topic " + topic);

	// set producer properties
	Properties properties = new Properties();
	properties.put("zookeeper.connect", zookeeper);
	properties.put("metadata.broker.list", brokerList);
	properties.put("request.required.acks", "1");
	properties.put("serializer.class", "com.iot.app.kafka.util.IoTDataEncoder");
	//generate event
	Producer<String, IoTData> producer = new Producer<String, IoTData>(new ProducerConfig(properties));
	IoTDataProducer iotProducer = new IoTDataProducer();
	iotProducer.generateIoTEvent(producer, topic);		
}
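The serializer.class entry above points at a custom encoder bundled with the project, whose source is not part of this listing. As a hedged sketch, a 0.8-style encoder for IoTData could look like the following; the Jackson-based JSON mapping is an assumption for illustration, not the project's actual implementation:

import com.fasterxml.jackson.databind.ObjectMapper;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class IoTDataEncoderSketch implements Encoder<IoTData> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public IoTDataEncoderSketch(VerifiableProperties props) {
        // The 0.8 producer instantiates encoders reflectively through this constructor.
    }

    @Override
    public byte[] toBytes(IoTData event) {
        try {
            return MAPPER.writeValueAsBytes(event); // illustrative JSON serialization
        } catch (Exception e) {
            throw new RuntimeException("Unable to serialize IoTData", e);
        }
    }
}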
 
Example 2  Project: Transwarp-Sample-Code  File: kafkaProducer.java
/**
 * Read the configuration file, create a thread pool, and run the tasks.
 */
public void go() {
    Constant constant = new Constant();
    kafkaProperties kafkaProperties = new kafkaProperties();
    ProducerConfig config = new ProducerConfig(kafkaProperties.properties());

    ExecutorService executorService = Executors.newFixedThreadPool(Integer.parseInt(constant.THREAD_POOL_SIZE));

    String topic = constant.TOPIC_NAME;
    Task[] tasks = new Task[Integer.parseInt(constant.THREAD_NUM)];
    String[] folders = constant.FILE_FOLDERS.split(";");
    int batchSize = Integer.parseInt(constant.BATCH_SIZE);
    CopyOnWriteArrayList<String> fileList = addFiles(folders);

    for (int i = 0; i < tasks.length; ++i) {
        tasks[i] = new Task(i, topic, new Producer<String, String>(config), fileList, batchSize);
    }

    for (Task task : tasks) {
        executorService.execute(task);
    }
    executorService.shutdown();
}
 
Example 3  Project: mod-kafka  File: KafkaProducerFactory.java
/**
 * Creates kafka producers based on given message serializer type.
 *
 * @param serializerType    message serializer type
 * @param properties        properties to be used to send a message
 * @return                  created kafka producer
 */
public Producer createProducer(MessageSerializerType serializerType, Properties properties) {
    Producer producer;

    switch (serializerType) {
        case BYTE_SERIALIZER:
            producer = new Producer<String, byte[]>(new ProducerConfig(properties));
            break;
        case STRING_SERIALIZER:
            producer = new Producer<String, String>(new ProducerConfig(properties));
            break;
        default:
            throw new IllegalArgumentException("Incorrect serializer class specified...");
    }
    return producer;
}
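A hypothetical call site for this factory; the enum constant comes from the switch above, while the factory instance, broker address, and topic name are placeholders:

Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092"); // placeholder broker list
props.put("serializer.class", "kafka.serializer.StringEncoder");
Producer producer = factory.createProducer(MessageSerializerType.STRING_SERIALIZER, props);
producer.send(new KeyedMessage<String, String>("demo-topic", "hello")); // placeholder topic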
 
Example 4
public void execute(JavaPairRDD<String, byte[]> inputMessage) {
    JavaPairRDD<String, byte[]> partitionedRDD;
    if (config.getLocalMode()) {
        partitionedRDD = inputMessage;
    } else {
        // Helps scale beyond the number of input partitions in Kafka
        partitionedRDD = inputMessage.repartition(config.getRepartitionCount());
    }

    partitionedRDD.foreachPartition(prdd -> {
        // You can choose binary or string encoder
        Producer validProducer = ConnectionManager.getKafkaSingletonConnectionWithBinaryEncoder(config);
        prdd.forEachRemaining(records -> {
            byte[] msg = records._2();
            try {
                // TODO: Add your logic here to process data
                // As default we are just publishing back to another kafka topic
                logger.info("Processing event=" + new String(msg));
                publishMessagesToKafka(validProducer, msg);
            } catch (Exception e){
                logger.error("Error processing message:" + msg);
            }
        });
    });
}
 
Example 5  Project: flume-plugin  File: KafkaChannel.java
@Override
public void start() {
    try {
        LOGGER.info("Starting Kafka Channel: " + getName());
        producer = new Producer<String, byte[]>(new ProducerConfig(kafkaConf));
        // We always have just one topic being read by one thread
        LOGGER.info("Topic = " + topic.get());
        topicCountMap.put(topic.get(), 1);
        counter.start();
        super.start();
    } catch (Exception e) {
        LOGGER.error("Could not start producer");
        throw new FlumeException("Unable to create Kafka Connections. " +
                "Check whether Kafka Brokers are up and that the " +
                "Flume agent can connect to it.", e);
    }
}
 
Example 6  Project: twill  File: SimpleKafkaPublisher.java
@Override
public ListenableFuture<Integer> send() {
  try {
    int size = messages.size();
    Producer<Integer, ByteBuffer> kafkaProducer = producer.get();
    if (kafkaProducer == null) {
      return Futures.immediateFailedFuture(new IllegalStateException("No kafka producer available."));
    }
    kafkaProducer.send(messages);
    return Futures.immediateFuture(size);
  } catch (Exception e) {
    return Futures.immediateFailedFuture(e);
  } finally {
    messages.clear();
  }
}
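A hedged caller's view of the future returned above; `publisher` is a hypothetical SimpleKafkaPublisher instance, and blocking on get() is for illustration only:

ListenableFuture<Integer> result = publisher.send();
int submitted = result.get(); // failures surface as an ExecutionException wrapping the cause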
 
Example 7  Project: KafkaExample  File: ProducerDemo.java
private static Producer<String, String> initProducer() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKER_LIST);
    // props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("serializer.class", StringEncoder.class.getName());
    props.put("partitioner.class", HashPartitioner.class.getName());
//    props.put("compression.codec", "0");
    props.put("producer.type", "sync");
    props.put("batch.num.messages", "1");
    props.put("queue.buffering.max.messages", "1000000");
    props.put("queue.enqueue.timeout.ms", "20000000");

    
    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);
    return producer;
  }
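A hypothetical companion to initProducer(): with producer.type=sync and batch.num.messages=1 each send() call is performed synchronously, and the configured HashPartitioner routes every message by the hash of its key (the topic name is a placeholder):

Producer<String, String> producer = initProducer();
for (int i = 0; i < 3; i++) {
    producer.send(new KeyedMessage<String, String>("demo-topic", "key-" + i, "value-" + i));
}
producer.close();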
 
Example 8  Project: iot-traffic-monitor  File: IoTDataProducer.java
public static void main(String[] args) throws Exception {
	//read config file
	Properties prop = PropertyFileReader.readPropertyFile();		
	String zookeeper = prop.getProperty("com.iot.app.kafka.zookeeper");
	String brokerList = prop.getProperty("com.iot.app.kafka.brokerlist");
	String topic = prop.getProperty("com.iot.app.kafka.topic");
	logger.info("Using Zookeeper=" + zookeeper + " ,Broker-list=" + brokerList + " and topic " + topic);

	// set producer properties
	Properties properties = new Properties();
	properties.put("zookeeper.connect", zookeeper);
	properties.put("metadata.broker.list", brokerList);
	properties.put("request.required.acks", "1");
	properties.put("serializer.class", "com.iot.app.kafka.util.IoTDataEncoder");
	//generate event
	Producer<String, IoTData> producer = new Producer<String, IoTData>(new ProducerConfig(properties));
	IoTDataProducer iotProducer = new IoTDataProducer();
	iotProducer.generateIoTEvent(producer, topic);		
}
 
Example 9  Project: ingestion  File: KafkaSink.java
@Override
public void configure(Context context) {
    topic = context.getString(CONF_TOPIC);
    if (topic == null) {
        throw new ConfigurationException("Kafka topic must be specified.");
    }

    writeBody = context.getBoolean(CONF_WRITE_BODY, DEFAULT_WRITE_BODY);

    ImmutableMap<String, String> subProperties = context.getSubProperties(CONF_KAFKA);
    Properties properties = new Properties();
    properties.putAll(subProperties);

    producer = new Producer<String, String>(new ProducerConfig(properties));

    mapper = new ObjectMapper();
}
 
Example 10  Project: pulsar  File: PulsarKafkaProducer.java
private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
    try {
        pulsarProducerBuilder.messageRoutingMode(MessageRoutingMode.CustomPartition);
        pulsarProducerBuilder.messageRouter(new MessageRouter() {
            private static final long serialVersionUID = 1L;

            @Override
            public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                // https://kafka.apache.org/08/documentation.html#producerapi
                // The default partitioner is based on the hash of the key.
                return partitioner.partition(msg.getKey(), metadata.numPartitions());
            }
        });
        log.info("Creating producer for topic {} with config {}", topic, pulsarProducerBuilder.toString());
        return pulsarProducerBuilder.clone().topic(topic).create();
    } catch (PulsarClientException e) {
        throw new RuntimeException(e);
    }
}
 
Example 11  Project: metrics-kafka  File: KafkaReporter.java
private KafkaReporter(MetricRegistry registry, String name,
		TimeUnit rateUnit, TimeUnit durationUnit, boolean showSamples, MetricFilter filter,
		String topic, ProducerConfig config, String prefix,
		String hostName, String ip) {
	super(registry, name, filter, rateUnit, durationUnit);
	this.topic = topic;
	this.config = config;
	this.prefix = prefix;
	this.hostName = hostName;
	this.ip = ip;
	
	this.mapper = new ObjectMapper().registerModule(
			new MetricsModule(rateUnit, durationUnit, showSamples));

	producer = new Producer<String, String>(config);

	kafkaExecutor = Executors
			.newSingleThreadExecutor(new ThreadFactoryBuilder()
					.setNameFormat("kafka-producer-%d").build());
}
 
Example 12  Project: geowave  File: StageToKafkaDriver.java
@Override
protected void processFile(
    final URL file,
    final String typeName,
    final GeoWaveAvroFormatPlugin<?, ?> plugin,
    final StageKafkaData<?> runData) {

  try {
    final Producer<String, Object> producer =
        (Producer<String, Object>) runData.getProducer(typeName, plugin);
    try (final CloseableIterator<?> avroRecords = plugin.toAvroObjects(file)) {
      while (avroRecords.hasNext()) {
        final Object avroRecord = avroRecords.next();
        final KeyedMessage<String, Object> data = new KeyedMessage<>(typeName, avroRecord);
        producer.send(data);
      }
    }
  } catch (final Exception e) {
    LOGGER.info(
        "Unable to send file [" + file.getPath() + "] to Kafka topic: " + e.getMessage(),
        e);
  }
}
 
Example 13  Project: cep  File: KafkaSink.java
/**
 * Creates a new KafkaSink.
 * This method initializes and starts a new Kafka producer that will be
 * used to produce messages to kafka topics.
 * @param properties KafkaSink properties. You should provide a kafka_brokers property
 *                   set to the Kafka host address. If none is provided, localhost will
 *                   be used.
 */
public KafkaSink(Map<String, Object> properties) {
    super(properties);
    // The producer config attributes
    Properties props = new Properties();
    if (properties != null && properties.get("kafka_brokers") != null) {
        props.put("metadata.broker.list", properties.get("kafka_brokers"));
    } else {
        props.put("metadata.broker.list", "127.0.0.1:9092");
    }
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.send.max.retries", "60");
    props.put("retry.backoff.ms", "1000");
    props.put("producer.type", "async");
    props.put("queue.buffering.max.messages", "10000");
    props.put("queue.buffering.max.ms", "500");
    props.put("partitioner.class", "net.redborder.cep.sinks.kafka.SimplePartitioner");

    // Initialize the producer
    ProducerConfig config = new ProducerConfig(props);
    producer = new Producer<>(config);
}
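For context, a hedged sketch of the send path such a sink typically exposes; the method name is illustrative:

// With producer.type=async (configured above), send() only enqueues the message;
// a background thread drains the queue, flushing at most every queue.buffering.max.ms milliseconds.
void forward(String topic, String message) {
    producer.send(new KeyedMessage<String, String>(topic, message));
}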
 
Example 14  Project: eagle  File: KafkaStreamProxyProducerImpl.java
public KafkaStreamProxyProducerImpl(String streamId, StreamSinkConfig streamConfig) {
    Preconditions.checkNotNull(streamConfig, "Stream sink config for " + streamId + " is null");
    this.streamId = streamId;
    this.config = (KafkaStreamSinkConfig) streamConfig;
    Properties properties = new Properties();
    Preconditions.checkNotNull(config.getBrokerList(), "brokerList is null");
    properties.put("metadata.broker.list", config.getBrokerList());
    properties.put("serializer.class", config.getSerializerClass());
    properties.put("key.serializer.class", config.getKeySerializerClass());
    // new added properties for async producer
    properties.put("producer.type", config.getProducerType());
    properties.put("batch.num.messages", config.getNumBatchMessages());
    properties.put("request.required.acks", config.getRequestRequiredAcks());
    properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs());
    ProducerConfig producerConfig = new ProducerConfig(properties);
    this.producer = new Producer(producerConfig);
}
 
Example 15  Project: mt-flume  File: KafkaSink.java
@Override
public synchronized void start() {
    super.start();

    Properties props = new Properties();
    props.put("metadata.broker.list", brokerList);
    props.put("request.required.acks", String.valueOf(requestRequiredAcks));
    props.put("request.timeout.ms", String.valueOf(requestTimeoutms));
    props.put("serializer.class", serializerClass);
    props.put("partitioner.class", partitionerClass);
    props.put("producer.type", producerType);
    props.put("batch.num.messages", String.valueOf(batchNumMessages));
    props.put("queue.buffering.max.messages", String.valueOf(queueBufferingMaxMessages));
    props.put("topic.metadata.refresh.interval.ms", "30000");

    producer = new Producer<String, String>(new ProducerConfig(props));
}
 
Example 16  Project: yuzhouwan  File: KafkaUtils.java
static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        try {
//            props.put("zk.connect", p.getProperty("kafka.zk.connect"));   // not need zk in new version
            props.put("key.serializer.class", p.getProperty("kafka.key.serializer.class"));
            props.put("serializer.class", p.getProperty("kafka.serializer.class"));
            props.put("metadata.broker.list", p.getProperty("kafka.metadata.broker.list"));
            props.put("request.required.acks", p.getProperty("kafka.request.required.acks"));
            props.put("producer.type", p.getProperty("kafka.async"));
            props.put("partitioner.class", PARTITIONER_CLASS_NAME);

            props.put("queue.buffering.max.ms", p.getProperty("kafka.queue.buffering.max.ms"));
            props.put("queue.buffering.max.messages", p.getProperty("kafka.queue.buffering.max.messages"));
            props.put("queue.enqueue.timeout.ms", p.getProperty("kafka.queue.enqueue.timeout.ms"));
            // 4,100,000,000 / 24 / 60 / 60 ≈ 47454; 47454 / 24 ≈ 1977
            props.put("batch.num.messages", p.getProperty("kafka.batch.num.messages"));
            props.put("send.buffer.bytes", p.getProperty("kafka.send.buffer.bytes"));
//            props.put("compression.type", "lz4");
        } catch (Exception e) {
            _log.error("Connect with kafka failed {}!", e.getMessage());
            throw new RuntimeException(e);
        }
        _log.info("Connect with kafka successfully!");
        return new Producer<>(new ProducerConfig(props));
    }
 
Example 17  Project: Decision  File: StreamToActionBusCallback.java
public StreamToActionBusCallback(Set<StreamAction> activeActions, String streamName,
        Producer<String, byte[]> avroProducer,
        Serializer<StratioStreamingMessage, Event> javaToSiddhiSerializer,
        Serializer<StratioStreamingMessage, byte[]> javaToAvroSerializer) {
    super(activeActions);
    this.streamName = streamName;
    this.avroProducer = avroProducer;
    this.javaToSiddhiSerializer = javaToSiddhiSerializer;
    this.javaToAvroSerializer = javaToAvroSerializer;
}
 
Example 18  Project: incubator-iotdb  File: KafkaProducer.java
public KafkaProducer() {

    Properties props = new Properties();
    props.put("metadata.broker.list", "127.0.0.1:9092");
    props.put("zk.connect", "127.0.0.1:2181");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("key.serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "-1");

    producer = new Producer<>(new ProducerConfig(props));
  }
 
Example 19  Project: Decision  File: KafkaConfiguration.java
@Bean
public Producer<String, byte[]> avroProducer() {

    Properties properties = new Properties();

    properties.put("metadata.broker.list", configurationContext.getKafkaHostsQuorum());
    properties.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");
    properties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

    return new Producer<String, byte[]>(new ProducerConfig(properties));
}
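Note the mismatch above: the two serializer entries use the new client's keys ("value.serializer" / "key.serializer" via the org.apache.kafka.clients.producer.ProducerConfig constants), but the old Scala ProducerConfig passed to kafka.javaapi.producer.Producer reads "serializer.class" and "key.serializer.class" and ignores unknown entries, so this producer falls back to the default kafka.serializer.DefaultEncoder, which happens to match the byte[] value type.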
 
Example 20
@Override
public void initialize(TSDB tsdb) {
  this.tsdb = tsdb;
  config = new KafkaRpcPluginConfig(tsdb.getConfig());

  setKafkaConfig();
  producer = new Producer<String, byte[]>(producer_config);
  LOG.info("Initialized kafka requeue publisher.");
}
 
Example 21  Project: metrics-kafka  File: TopicReporter.java
private void send(Producer producer, String header, String topic, String message) {
    final Long time = TimeUnit.MILLISECONDS.toSeconds(clock.time() - startTime);
    try {
        producer.send(new KeyedMessage(topic, format("%s\n%d,%s", header, time, message).getBytes("UTF-8")));
    } catch (UnsupportedEncodingException e) {
        throw new RuntimeException(e);
    }
}
 
Example 22  Project: Transwarp-Sample-Code  File: kafkaProducer.java
Task(int num, String topic, Producer producer, CopyOnWriteArrayList<String> fileList, int batchSize) {
    this.num = num;
    this.topic = topic;
    this.producer = producer;
    this.fileList = fileList;
    this.batchSize = batchSize;
}
 
Example 23  Project: mod-kafka  File: StringMessageHandler.java
/**
 * {@inheritDoc}
 */
@Override
public void send(Producer producer, String topic, String partition, JsonObject message) {
    producer.send(new KeyedMessage<String, String>(
            topic,
            partition,
            message.getString(PAYLOAD)));
}
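Note: in the 0.8 API the three-argument KeyedMessage constructor is (topic, key, message), so the partition parameter here actually travels as the message key; it influences the target partition only through whatever partitioner the producer is configured with.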
 
Example 24  Project: blog_demos  File: KafkaProducer.java
/**
 * Prevent instantiation from outside
 */
private KafkaProducer(){
    super();

    try {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", Constants.ZK_HOST + ":2181");
        props.put("metadata.broker.list", Constants.BROKER_HOST + ":9092");
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    } catch (Exception e) {
        e.printStackTrace();
    }
}
 
Example 25  Project: blog_demos  File: MessageServiceImpl.java
@PostConstruct
public void init(){
    try {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "hostb1:2181,hostb1:2181,hostb1:2181");
        props.put("metadata.broker.list", "hostb1:9092,hostb1:9092,hostb1:9092");
        props.put("partitioner.class","com.bolingcavalry.service.BusinessPartition");
        producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
    } catch (Exception e) {
        e.printStackTrace();
    }
}
 
Example 26  Project: mod-kafka  File: KafkaMessageProcessor.java
/**
 * Returns an initialized instance of kafka producer.
 *
 * @return initialized kafka producer
 */
private Producer createProducer() {
    final Properties props = new Properties();

    final String brokerList = getOptionalStringConfig(BROKER_LIST, DEFAULT_BROKER_LIST);
    final int requestAcks = getOptionalIntConfig(REQUEST_ACKS, DEFAULT_REQUEST_ACKS);

    props.put(BROKER_LIST, brokerList);
    props.put(SERIALIZER_CLASS, serializerType.getValue());
    props.put(REQUEST_ACKS, String.valueOf(requestAcks));
    props.put(KEY_SERIALIZER_CLASS, DEFAULT_KEY_SERIALIZER_CLASS);     // always use String serializer for the key

    return kafkaProducerFactory.createProducer(serializerType, props);
}
 
Example 27  Project: geowave  File: StageKafkaData.java
private synchronized Producer<String, T> getProducerCreateIfNull(
    final String typeName,
    final GeoWaveAvroFormatPlugin<?, ?> plugin) {
  if (!cachedProducers.containsKey(typeName)) {
    final ProducerConfig producerConfig = new ProducerConfig(properties);

    final Producer<String, T> producer = new Producer<>(producerConfig);

    cachedProducers.put(typeName, producer);
  }
  return cachedProducers.get(typeName);
}
 
Example 28  File: ConnectionManager.java
public static Producer getSingletonConnectionWithStringEncoder(StreamingConfig streamingConfig) {
    if (null == stringProducer) {
        synchronized (ConnectionManager.class) {
            Properties kafkaProperties = getBaseProducerKafkaProperties(streamingConfig);
            kafkaProperties.put("serializer.class", "kafka.serializer.stringEncoder");
            ProducerConfig config = new ProducerConfig(kafkaProperties);
            stringProducer = new Producer<String, String>(config);
        }
    }

    return stringProducer;
}
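Note that the null check is not repeated inside the synchronized block, so two threads racing past the outer check can each construct a producer; the binary variant below has the same shape.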
 
Example 29  File: ConnectionManager.java
public static Producer getKafkaSingletonConnectionWithBinaryEncoder(StreamingConfig streamingConfig) {
    if (null == binaryProducer) {
        synchronized (ConnectionManager.class) {
            Properties kafkaProperties = getBaseProducerKafkaProperties(streamingConfig);
            kafkaProperties.put("serializer.class", "kafka.serializer.DefaultEncoder");

            ProducerConfig config = new ProducerConfig(kafkaProperties);
            binaryProducer = new Producer<String, String>(config);
        }
    }

    return binaryProducer;
}
 
Example 30  Project: koper  File: KafkaSender.java
@Override
public void afterPropertiesSet() throws Exception {
    if (properties == null) {
        throw new IllegalArgumentException("Properties are required when init KafkaSender!");
    }
    this.producer = new Producer<>(new ProducerConfig(this.properties));
}
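Since afterPropertiesSet() comes from Spring's InitializingBean, the sender is meant to be wired as a bean. A hypothetical Java-config sketch; the setter name and broker address are assumptions:

@Bean
public KafkaSender kafkaSender() {
    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092"); // placeholder broker list
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    KafkaSender sender = new KafkaSender();
    sender.setProperties(props); // assumed setter for the injected Properties
    return sender;
}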