com.google.common.collect.Table#put ( )源码实例Demo

下面列出了com.google.common.collect.Table#put ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: tutorials   文件: GuavaTableUnitTest.java
@Test
public void givenArrayTable_whenGet_returnsSuccessfully() {
    final List<String> universityRowTable = Lists.newArrayList("Mumbai", "Harvard");
    final List<String> courseColumnTables = Lists.newArrayList("Chemical", "IT", "Electrical");
    final Table<String, String, Integer> universityCourseSeatTable = ArrayTable.create(universityRowTable, courseColumnTables);
    universityCourseSeatTable.put("Mumbai", "Chemical", 120);
    universityCourseSeatTable.put("Mumbai", "IT", 60);
    universityCourseSeatTable.put("Harvard", "Electrical", 60);
    universityCourseSeatTable.put("Harvard", "IT", 120);

    final int seatCount = universityCourseSeatTable.get("Mumbai", "IT");

    assertThat(seatCount).isEqualTo(60);
}
 
源代码2 项目: joyqueue   文件: DefaultMessageFetcher.java
@Override
public void fetchPartitionAsync(BrokerNode brokerNode, final String topic, final String app, final short partition,
                                final long index, int count, long timeout, final PartitionFetchListener listener) {
    Table<String, Short, Long> partitionTable = HashBasedTable.create();
    partitionTable.put(topic, partition, index);
    batchFetchPartitionsAsync(brokerNode, partitionTable, app, count, timeout, new BatchPartitionFetchListener() {
        @Override
        public void onMessage(Table<String, Short, FetchMessageData> fetchMessageTable) {
            FetchMessageData fetchMessageData = fetchMessageTable.get(topic, partition);
            listener.onMessage(fetchMessageData);
        }

        @Override
        public void onException(Throwable cause) {
            listener.onException(cause);
        }
    });
}
 
源代码3 项目: SPDS   文件: AbstractBoomerangResults.java
public Table<Statement, Val, W> asStatementValWeightTable(ForwardQuery query) {
    final Table<Statement, Val, W> results = HashBasedTable.create();
    WeightedPAutomaton<Statement, INode<Val>, W> callAut = queryToSolvers.getOrCreate(query).getCallAutomaton();
    for (Entry<Transition<Statement, INode<Val>>, W> e : callAut.getTransitionsToFinalWeights().entrySet()) {
        Transition<Statement, INode<Val>> t = e.getKey();
        W w = e.getValue();
        if (t.getLabel().equals(Statement.epsilon()))
            continue;
        if (t.getStart().fact().value() instanceof Local
                && !t.getLabel().getMethod().equals(t.getStart().fact().m()))
            continue;
        if (t.getLabel().getUnit().isPresent())
            results.put(t.getLabel(), t.getStart().fact(), w);
    }
    return results;
}
 
源代码4 项目: SPDS   文件: WeightedBoomerang.java
public Table<Statement, Val, W> getResults(Query seed) {
    final Table<Statement, Val, W> results = HashBasedTable.create();
    WeightedPAutomaton<Statement, INode<Val>, W> fieldAut = queryToSolvers.getOrCreate(seed).getCallAutomaton();
    for (Entry<Transition<Statement, INode<Val>>, W> e : fieldAut.getTransitionsToFinalWeights().entrySet()) {
        Transition<Statement, INode<Val>> t = e.getKey();
        W w = e.getValue();
        if (t.getLabel().equals(Statement.epsilon()))
            continue;
        if (t.getStart().fact().value() instanceof Local
                && !t.getLabel().getMethod().equals(t.getStart().fact().m()))
            continue;
        if (t.getLabel().getUnit().isPresent())
            results.put(t.getLabel(), t.getStart().fact(), w);
    }
    return results;
}
 
@Override
public FetchPartitionMessageResponse decode(JoyQueueHeader header, ByteBuf buffer) throws Exception {
    Table<String, Short, FetchPartitionMessageAckData> data = HashBasedTable.create();
    short topicSize = buffer.readShort();
    for (int i = 0; i < topicSize; i++) {
        String topic = Serializer.readString(buffer, Serializer.SHORT_SIZE);
        int partitionSize = buffer.readShort();
        for (int j = 0; j < partitionSize; j++) {
            short partition = buffer.readShort();
            short messageSize = buffer.readShort();
            List<BrokerMessage> messages = Lists.newArrayListWithCapacity(messageSize);
            for (int k = 0; k < messageSize; k++) {
                messages.add(Serializer.readBrokerMessage(buffer));
            }
            JoyQueueCode code = JoyQueueCode.valueOf(buffer.readInt());
            FetchPartitionMessageAckData fetchPartitionMessageAckData = new FetchPartitionMessageAckData(messages, code);
            data.put(topic, partition, fetchPartitionMessageAckData);
        }
    }

    FetchPartitionMessageResponse fetchPartitionMessageResponse = new FetchPartitionMessageResponse();
    fetchPartitionMessageResponse.setData(data);
    return fetchPartitionMessageResponse;
}
 
源代码6 项目: Alink   文件: AnnotationUtils.java
@SuppressWarnings("unchecked")
private static Table<String, IOType, Wrapper<AlgoOperator>> loadIoOpClasses() {
    Reflections reflections = new Reflections("com.alibaba.alink");
    Table<String, IOType, Wrapper<AlgoOperator>> table = HashBasedTable.create();
    for (Class<?> clazz : reflections.getTypesAnnotatedWith(IoOpAnnotation.class)) {
        if (!AlgoOperator.class.isAssignableFrom(clazz)) {
            LOG.error("Class annotated with @IoOpAnnotation should be subclass of AlgoOperator: {}",
                    clazz.getCanonicalName());
            continue;
        }

        IoOpAnnotation annotation = clazz.getAnnotation(IoOpAnnotation.class);
        String name = annotation.name();
        IOType ioType = annotation.ioType();
        boolean hasTimestamp = annotation.hasTimestamp();

        Wrapper<AlgoOperator> origin = table.put(name, ioType,
                new Wrapper<>((Class<? extends AlgoOperator>) clazz, hasTimestamp));


        if (origin != null) {
            LOG.error("Multiple IO Operator class with same name {} and IOType: {}: {} and {}",
                    name, ioType, origin.clazz.getCanonicalName(), clazz.getCanonicalName());
        }
    }

    return ImmutableTable.copyOf(table);
}
 
源代码7 项目: chronus   文件: ExecutorServiceImpl.java
@Override
public List<TaskAssignResultEntity> addNewTask(List<TaskAssignResultEntity> taskAssignResultEntityList, Table<String, Integer, TaskAssignResultEntity> loadCreateErrorTable) {
    for (TaskAssignResultEntity taskAssignResultEntity : taskAssignResultEntityList) {
        if (taskAssignResultEntity.getAssignNum() <= 0) {
            continue;
        }
        TaskManager taskManager = taskManagerTable.get(taskAssignResultEntity.getTaskName(), taskAssignResultEntity.getSeqNo());
        if (taskManager == null) {
            TaskEntity taskEntity = taskService.selectTaskInfoByTaskName(currentNode.getCluster(), taskAssignResultEntity.getTaskName());
            if (taskEntity == null) {
                continue;
            }
            try {
                taskManager = taskManagerFactory.create(taskEntity, taskAssignResultEntity);
                taskManagerTable.put(taskAssignResultEntity.getTaskName(), taskAssignResultEntity.getSeqNo(), taskManager);
                taskAssignResultEntity.setState(TaskCreateStateEnum.SUCC.getState());
                log.info("TaskName:{}, seqNo:{}, taskItems:{} 已创建!", taskAssignResultEntity.getTaskName(), taskAssignResultEntity.getSeqNo(), taskAssignResultEntity.getTaskItems());
            } catch (Exception e) {
                log.error("创建任务管理器异常,currentNode:{},taskAssignResultEntity:{}", currentNode, taskAssignResultEntity, e);
                taskAssignResultEntity.setState(TaskCreateStateEnum.FAIL.getState());
                loadCreateErrorTable.put(taskAssignResultEntity.getTaskName(), taskAssignResultEntity.getSeqNo(), taskAssignResultEntity);
            }
        } else {
            log.error("创建任务管理器异常,当前id已存在!,taskAssignResultEntity:{}", taskAssignResultEntity);
        }
    }
    return taskAssignResultEntityList;
}
 
源代码8 项目: tracecompass   文件: TmfEventMatching.java
private void processDependency(@NonNull IEventMatchingKey eventKey, @NonNull TmfEventDependency dep) {
    getProcessingUnit().addMatch(eventKey, dep);
    String sourceHost = dep.getSource().getTrace().getHostId();
    String destHost = dep.getDestination().getTrace().getHostId();
    Table<String, String, TmfEventDependency> lastMatches = getLastMatchTable(eventKey);
    lastMatches.put(sourceHost, destHost, dep);

    // Do some cleanup of events waiting to be matched
    cleanupList(eventKey, lastMatches.row(sourceHost), dep.getSource(), evDep -> evDep.getSource().getTimestamp().toNanos(), fUnmatchedOut);
    cleanupList(eventKey, lastMatches.column(destHost), dep.getDestination(), evDep -> evDep.getDestination().getTimestamp().toNanos(), fUnmatchedIn);
}
 
源代码9 项目: EasySRL   文件: SeenRules.java
private int addToTable(final Table<Category, Category, List<RuleProduction>> tab, int maxID, final Category left,
		final Category right) {
	// Check if any rules can apply to this pair of categories.
	List<RuleProduction> combinators = Combinator.getRules(left, right, Combinator.STANDARD_COMBINATORS);
	if (combinators.size() == 0) {
		return maxID;
	} else if (combinators.size() == 1) {
		combinators = Collections.singletonList(combinators.get(0));
	}
	maxID = Math.max(left.getID(), maxID);
	maxID = Math.max(right.getID(), maxID);
	tab.put(left, right, combinators);
	return maxID;
}
 
源代码10 项目: joyqueue   文件: BrokerMessageConverter.java
public static Table<String, Short, FetchMessageData> convert(String app, Table<String, Short, FetchPartitionMessageAckData> topicMessageTable) {
    Table<String, Short, FetchMessageData> result = HashBasedTable.create();
    if (topicMessageTable == null || topicMessageTable.isEmpty()) {
        return result;
    }
    for (Map.Entry<String, Map<Short, FetchPartitionMessageAckData>> topicEntry : topicMessageTable.rowMap().entrySet()) {
        String topic = topicEntry.getKey();
        Map<Short, FetchPartitionMessageAckData> partitionMap = topicEntry.getValue();
        for (Map.Entry<Short, FetchPartitionMessageAckData> partitionEntry : partitionMap.entrySet()) {
            result.put(topic, partitionEntry.getKey(),
                    new FetchMessageData(convert(topic, app, partitionEntry.getValue().getMessages()), partitionEntry.getValue().getCode()));
        }
    }
    return result;
}
 
源代码11 项目: joyqueue   文件: ConsumerLocalIndexStore.java
protected Table<String, String, Map<Short, LocalIndexData>> doRead(File persistFile) throws Exception {
    String json = FileUtils.readFileToString(persistFile);
    Table<String, String, Map<Short, LocalIndexData>> result = HashBasedTable.create();

    if (StringUtils.isBlank(json)) {
        return result;
    }

    Gson gson = new GsonBuilder().create();
    Map<String, Map<String, Map<String, Map<String, Object>>>> map = gson.fromJson(json, Map.class);

    for (Map.Entry<String, Map<String, Map<String, Map<String, Object>>>> appEntry : map.entrySet()) {
        String app = appEntry.getKey();

        for (Map.Entry<String, Map<String, Map<String, Object>>> topicEntry : appEntry.getValue().entrySet()) {
            String topic = topicEntry.getKey();
            Map<Short, LocalIndexData> partitions = Maps.newHashMap();

            for (Map.Entry<String, Map<String, Object>> partitionEntry : topicEntry.getValue().entrySet()) {
                short partition = Short.valueOf(partitionEntry.getKey());
                Map<String, Object> values = partitionEntry.getValue();

                long index = Double.valueOf(String.valueOf(values.get("index"))).longValue();
                long updateTime = Double.valueOf(String.valueOf(values.get("updateTime"))).longValue();
                long createTime = Double.valueOf(String.valueOf(values.get("createTime"))).longValue();
                partitions.put(partition, new LocalIndexData(index, updateTime, createTime));
            }

            result.put(app, topic, partitions);
        }
    }
    return result;
}
 
源代码12 项目: tutorials   文件: GuavaTableUnitTest.java
@Test
public void givenTable_whenColumnMap_returnsSuccessfully() {
    final Table<String, String, Integer> universityCourseSeatTable = HashBasedTable.create();
    universityCourseSeatTable.put("Mumbai", "Chemical", 120);
    universityCourseSeatTable.put("Mumbai", "IT", 60);
    universityCourseSeatTable.put("Harvard", "Electrical", 60);
    universityCourseSeatTable.put("Harvard", "IT", 120);

    final Map<String, Map<String, Integer>> courseKeyUniversitySeatMap = universityCourseSeatTable.columnMap();

    assertThat(courseKeyUniversitySeatMap).hasSize(3);
    assertThat(courseKeyUniversitySeatMap.get("IT")).hasSize(2);
    assertThat(courseKeyUniversitySeatMap.get("Electrical")).hasSize(1);
    assertThat(courseKeyUniversitySeatMap.get("Chemical")).hasSize(1);
}
 
源代码13 项目: joyqueue   文件: ConsumerClient.java
protected FetchPartitionMessageRequest buildPartitionTopicMessageCommand(Map<String, Short> partitions, String app, int count) {
    Table<String, Short, FetchPartitionMessageData> partitionMap = HashBasedTable.create();
    for (Map.Entry<String, Short> entry : partitions.entrySet()) {
        partitionMap.put(entry.getKey(), entry.getValue(), new FetchPartitionMessageData(count, FetchPartitionMessageRequest.NONE_INDEX));
    }

    FetchPartitionMessageRequest fetchPartitionMessageRequest = new FetchPartitionMessageRequest();
    fetchPartitionMessageRequest.setPartitions(partitionMap);
    fetchPartitionMessageRequest.setApp(app);
    return fetchPartitionMessageRequest;
}
 
源代码14 项目: joyqueue   文件: ConsumerClient.java
protected FetchPartitionMessageRequest buildPartitionTopicMessageCommand(Table<String, Short, Long> partitions, String app, int count) {
    Table<String, Short, FetchPartitionMessageData> partitionMap = HashBasedTable.create();
    for (Map.Entry<String, Map<Short, Long>> topicEntry : partitions.rowMap().entrySet()) {
        String topic = topicEntry.getKey();
        for (Map.Entry<Short, Long> partitionEntry : topicEntry.getValue().entrySet()) {
            partitionMap.put(topic, partitionEntry.getKey(), new FetchPartitionMessageData(count, partitionEntry.getValue()));
        }
    }

    FetchPartitionMessageRequest fetchPartitionMessageRequest = new FetchPartitionMessageRequest();
    fetchPartitionMessageRequest.setPartitions(partitionMap);
    fetchPartitionMessageRequest.setApp(app);
    return fetchPartitionMessageRequest;
}
 
源代码15 项目: blueflood   文件: DMetadataIO.java
@Override
public Table<Locator, String, String> getAllValues( Set<Locator> locators ) throws IOException {

    Timer.Context ctx = Instrumentation.getReadTimerContext( CassandraModel.CF_METRICS_METADATA_NAME );

    Session session = DatastaxIO.getSession();

    try {

        Map<Locator, ResultSetFuture> futures = new HashMap<Locator, ResultSetFuture>();

        for( Locator l : locators) {

            BoundStatement bound = getValue.bind( l.toString() );

            futures.put( l, session.executeAsync( bound ) );
        }

        Table<Locator, String, String> metaTable = HashBasedTable.create();

        for( Map.Entry<Locator, ResultSetFuture> future : futures.entrySet() ) {

            try {
                ResultSet result = future.getValue().getUninterruptibly();


                for ( Row row : result ) {
                    if ( LOG.isTraceEnabled() ) {
                        LOG.trace( "Read metrics_metadata: " +
                                row.getString( KEY ) +
                                row.getString( COLUMN1 ) +
                                serDes.deserialize( row.getBytes( VALUE ) ) );
                    }

                    metaTable.put( Locator.createLocatorFromDbKey( row.getString( KEY ) ), row.getString( COLUMN1 ), serDes.deserialize( row.getBytes( VALUE ) ) );
                }
            }
            catch (Exception e ) {

                Instrumentation.markReadError();
                LOG.error( String.format( "error accessing metadatat for %s", future.getKey() ), e );
            }
        }

        return metaTable;
    }
    finally {
        ctx.stop();
    }
}
 
源代码16 项目: tracecompass   文件: StateSystemDataProvider.java
@Override
public @NonNull TmfModelResponse<@NonNull TimeGraphModel> fetchRowModel(Map<String, Object> fetchParameters, @Nullable IProgressMonitor monitor) {
    Table<ITmfStateSystem, Integer, Long> table = HashBasedTable.create();
    // Get the quarks to display
    Collection<Long> selectedItems = DataProviderParameterUtils.extractSelectedItems(fetchParameters);
    synchronized (fEntryBuilder) {
        if (selectedItems == null) {
            // No selected items, take them all
            selectedItems = fIDToDisplayQuark.keySet();
        }
        for (Long id : selectedItems) {
            Pair<ITmfStateSystem, Integer> pair = fIDToDisplayQuark.get(id);
            if (pair != null) {
                table.put(pair.getFirst(), pair.getSecond(), id);
            }
        }
    }
    List<@NonNull ITimeGraphRowModel> allRows = new ArrayList<>();
    try {
        List<Long> times = DataProviderParameterUtils.extractTimeRequested(fetchParameters);
        for (Entry<ITmfStateSystem, Map<Integer, Long>> ssEntry : table.rowMap().entrySet()) {
            ITmfStateSystem ss = Objects.requireNonNull(ssEntry.getKey());
            List<@NonNull ITimeGraphRowModel> rows = getRowModels(ss, ssEntry.getValue(), times, fetchParameters, monitor);
            if (monitor != null && monitor.isCanceled()) {
                return new TmfModelResponse<>(null, Status.CANCELLED, CommonStatusMessage.TASK_CANCELLED);
            }
            synchronized (fEntryBuilder) {
                // Add the SS
                Long ssId = fSsToId.get(ss);
                if (ssId != null && selectedItems.contains(ssId)) {
                    TimeGraphRowModel ssRow = new TimeGraphRowModel(ssId, new ArrayList<>());
                    List<@NonNull ITimeGraphState> states = ssRow.getStates();
                    states.add(new TimeGraphState(ss.getStartTime(), ss.getCurrentEndTime() - ss.getStartTime(), Integer.MAX_VALUE));
                    rows.add(ssRow);
                }
            }
            allRows.addAll(rows);
        }

        synchronized (fEntryBuilder) {
            for (ModuleEntryModel module : fModuleEntryModelList) {
                if (selectedItems.contains(module.getId())) {
                    allRows.add(getModuleRowModels(module));
                }
            }
        }
        if (monitor != null && monitor.isCanceled()) {
            return new TmfModelResponse<>(null, Status.CANCELLED, CommonStatusMessage.TASK_CANCELLED);
        }
        return new TmfModelResponse<>(new TimeGraphModel(allRows), Status.COMPLETED, CommonStatusMessage.COMPLETED);
    } catch (IndexOutOfBoundsException | TimeRangeException | StateSystemDisposedException e) {
        return new TmfModelResponse<>(null, Status.FAILED, CommonStatusMessage.STATE_SYSTEM_FAILED);
    }
}
 
源代码17 项目: twill   文件: YarnTwillRunnerService.java
/**
 * Creates a {@link Runnable} for renewing {@link SecureStore} for running applications.
 *
 * @param scheduler the schedule to schedule next renewal execution
 * @param renewer the {@link SecureStoreRenewer} to use for renewal
 * @param retryRuns if non-empty, only the given set of application name and run id that need to have
 *                  secure store renewed; if empty, renew all running applications
 * @param retryDelay the delay before retrying applications that are failed to have secure store renewed
 * @param timeUnit the unit for the {@code delay} and {@code failureDelay}.
 * @return a {@link Runnable}
 */
private Runnable createSecureStoreUpdateRunnable(final ScheduledExecutorService scheduler,
                                                 final SecureStoreRenewer renewer,
                                                 final Multimap<String, RunId> retryRuns,
                                                 final long retryDelay, final TimeUnit timeUnit) {
  return new Runnable() {
    @Override
    public void run() {
      // Collects the set of running application runs
      Table<String, RunId, YarnTwillController> liveApps;

      synchronized (YarnTwillRunnerService.this) {
        if (retryRuns.isEmpty()) {
          liveApps = HashBasedTable.create(controllers);
        } else {
          // If this is a renew retry, only renew the one in the retryRuns set
          liveApps = HashBasedTable.create();
          for (Table.Cell<String, RunId, YarnTwillController> cell : controllers.cellSet()) {
            if (retryRuns.containsEntry(cell.getRowKey(), cell.getColumnKey())) {
              liveApps.put(cell.getRowKey(), cell.getColumnKey(), cell.getValue());
            }
          }
        }
      }

      Multimap<String, RunId> failureRenews = renewSecureStore(liveApps, renewer, false);

      if (!failureRenews.isEmpty()) {
        // If there are failure during the renewal, schedule a retry with a new Runnable.
        LOG.info("Schedule to retry on secure store renewal for applications {} in {} {}",
                 failureRenews.keySet(), retryDelay, timeUnit.name().toLowerCase());
        try {
          scheduler.schedule(
            createSecureStoreUpdateRunnable(scheduler, renewer, failureRenews, retryDelay, timeUnit),
            retryDelay, timeUnit);
        } catch (RejectedExecutionException e) {
          // If the renewal is stopped, the scheduler will be stopped,
          // hence this exception will be thrown and can be safely ignore.
        }
      }
    }
  };
}
 
源代码18 项目: nifi   文件: AbstractEnrichProcessor.java
/**
 * This method returns the parsed record string in the form of
 * a map of two strings, consisting of a iteration aware attribute
 * names and its values
 *

 * @param  rawResult the raw query results to be parsed
 * @param queryParser The parsing mechanism being used to parse the data into groups
 * @param queryRegex The regex to be used to split the query results into groups. The regex MUST implement at least on named capture group "KEY" to be used to populate the table rows
 * @param lookupKey The regular expression number or the column of a split to be used for matching
 * @return  Table with attribute names and values where each Table row uses the value of the KEY named capture group specified in @param queryRegex
 */
protected Table<String, String, String> parseBatchResponse(String rawResult, String queryParser, String queryRegex, int lookupKey, String schema) {
    // Note the hardcoded record0.
    //  Since iteration is done within the parser and Multimap is used, the record number here will always be 0.
    // Consequentially, 0 is hardcoded so that batched and non batched attributes follow the same naming
    // conventions
    final String recordPosition = ".record0";

    final Table<String, String, String> results = HashBasedTable.create();

    switch (queryParser) {
        case "Split":
            Scanner scanner = new Scanner(rawResult);
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                // Time to Split the results...
                String[] splitResult = line.split(queryRegex);

                for (int r = 0; r < splitResult.length; r++) {
                    results.put(splitResult[ lookupKey - 1 ], "enrich." + schema + recordPosition + ".group" + String.valueOf(r), splitResult[r]);
                }
            }
            break;
        case "RegEx":
        // prepare the regex
        Pattern p;
        // Regex is multiline. Each line should include a KEY for lookup
        p = Pattern.compile(queryRegex, Pattern.MULTILINE);

        Matcher matcher = p.matcher(rawResult);
        while (matcher.find()) {
            try {
                // Note that RegEx matches capture group 0 is usually broad but starting with it anyway
                // for the sake of purity
                for (int r = 0; r <= matcher.groupCount(); r++) {
                    results.put(matcher.group(lookupKey), "enrich." + schema + recordPosition + ".group" + String.valueOf(r), matcher.group(r));
                }
            } catch (IndexOutOfBoundsException e) {
                getLogger().warn("Could not find capture group {} while processing result. You may want to review your " +
                        "Regular Expression to match against the content \"{}\"", new Object[]{lookupKey, rawResult});
            }
        }
        break;
    }

    return results;
}
 
源代码19 项目: joyqueue   文件: DefaultConsumerIndexManager.java
protected Map<BrokerNode, Table<String, Short, List<CommitAckData>>> buildCommitAckParams(Map<String, List<ConsumeReply>> ackMap, String app) {
    Map<BrokerNode, Table<String, Short, List<CommitAckData>>> result = Maps.newHashMap();

    for (Map.Entry<String, List<ConsumeReply>> entry : ackMap.entrySet()) {
        String topic = entry.getKey();
        TopicMetadata topicMetadata = clusterManager.fetchTopicMetadata(topic, app);

        if (topicMetadata == null) {
            logger.warn("topic {} metadata is null", topic);
            continue;
        }

        for (ConsumeReply consumeReply : entry.getValue()) {
            PartitionMetadata partitionMetadata = topicMetadata.getPartition(consumeReply.getPartition());
            if (partitionMetadata == null) {
                partitionMetadata = topicMetadata.getPartitions().get(0);
            }

            BrokerNode leader = partitionMetadata.getLeader();
            if (leader == null) {
                logger.warn("topic {}, partition {}, leader is null", topic, consumeReply.getPartition());
                continue;
            }

            Table<String, Short, List<CommitAckData>> topicConsumeAckTable = result.get(leader);
            if (topicConsumeAckTable == null) {
                topicConsumeAckTable = HashBasedTable.create();
                result.put(leader, topicConsumeAckTable);
            }

            List<CommitAckData> commitAckList = topicConsumeAckTable.get(topic, consumeReply.getPartition());
            if (commitAckList == null) {
                commitAckList = Lists.newLinkedList();
                topicConsumeAckTable.put(topic, consumeReply.getPartition(), commitAckList);
            }

            commitAckList.add(new CommitAckData(consumeReply.getPartition(), consumeReply.getIndex(), consumeReply.getRetryType()));
        }
    }

    return result;
}
 
源代码20 项目: cuba   文件: EntitySerialization.java
protected JsonObject serializeEntity(Entity entity, @Nullable View view, Set<Entity> cyclicReferences) {
    JsonObject jsonObject = new JsonObject();
    MetaClass metaClass = entity.getMetaClass();
    if (!metadataTools.isEmbeddable(metaClass)) {
        jsonObject.addProperty(ENTITY_NAME_PROP, metaClass.getName());
        if (serializeInstanceName) {
            String instanceName = null;
            try {
                instanceName = metadataTools.getInstanceName(entity);
            } catch (Exception ignored) {
                // todo trace logging
            }
            jsonObject.addProperty(INSTANCE_NAME_PROP, instanceName);
        }
        writeIdField(entity, jsonObject);
        if (compactRepeatedEntities) {
            Table<Object, MetaClass, Entity> processedObjects = context.get().getProcessedEntities();
            if (processedObjects.get(entity.getId(), metaClass) == null) {
                processedObjects.put(entity.getId(), metaClass, entity);
                writeFields(entity, jsonObject, view, cyclicReferences);
            }
        } else {
            if (!cyclicReferences.contains(entity)) {
                cyclicReferences.add(entity);
                writeFields(entity, jsonObject, view, cyclicReferences);
            }
        }
    } else {
        writeFields(entity, jsonObject, view, cyclicReferences);
    }

    if (globalConfig.getRestRequiresSecurityToken()) {
        if (entity instanceof BaseGenericIdEntity || entity instanceof EmbeddableEntity) {
            SecurityState securityState = getSecurityState(entity);
            if (securityState != null) {
                byte[] securityToken = getSecurityToken(securityState);
                if (securityToken != null) {
                    jsonObject.addProperty("__securityToken", Base64.getEncoder().encodeToString(securityToken));
                }
            }
        }
    }

    return jsonObject;
}