下面列出了怎么用com.google.common.collect.MigrateMap的API类实例代码及写法,或者点击链接到github查看源代码。
public void start() {
super.start();
batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() {
public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
return MemoryClientIdentityBatch.create(clientIdentity);
}
});
cursors = new MapMaker().makeMap();
destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>() {
public List<ClientIdentity> apply(String destination) {
return Lists.newArrayList();
}
});
}
public void start() {
if (!isStart()) {
super.start();
// 如果存在provider,则启动metrics service
loadCanalMetrics();
metrics.setServerPort(metricsPort);
metrics.initialize();
canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
public CanalInstance apply(String destination) {
return canalInstanceGenerator.generate(destination);
}
});
// lastRollbackPostions = new MapMaker().makeMap();
}
}
public FileMixedLogPositionManager(File dataDir, long period, MemoryLogPositionManager memoryLogPositionManager){
if (dataDir == null) {
throw new NullPointerException("null dataDir");
}
if (period <= 0) {
throw new IllegalArgumentException("period must be positive, given: " + period);
}
if (memoryLogPositionManager == null) {
throw new NullPointerException("null memoryLogPositionManager");
}
this.dataDir = dataDir;
this.period = period;
this.memoryLogPositionManager = memoryLogPositionManager;
this.dataFileCaches = MigrateMap.makeComputingMap(new Function<String, File>() {
public File apply(String destination) {
return getDataFile(destination);
}
});
this.executorService = Executors.newScheduledThreadPool(1);
this.persistTasks = Collections.synchronizedSet(new HashSet<String>());
}
@Override
public void start() {
super.start();
cursors = MigrateMap.makeComputingMap(new Function<ClientIdentity, Position>() {
@Override
public Position apply(ClientIdentity clientIdentity) {
MysqlReaderPosition readerPosition = taskPositionManager.getPosition(canalReader.getReaderType());
if (readerPosition == null) {
return nullCursor; // 返回一个空对象标识,避免出现异常
} else {
return taskPositionManager.translateToLogPosition(readerPosition);
}
}
});
destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>() {
@Override
public List<ClientIdentity> apply(String destination) {
logger.info("ClientIdentity is initialized for destination {}.", destination);
return Lists.newArrayList(new ClientIdentity(destination, (short) 1001, ""));
}
});
}
protected void doApply(List<Record> records) {
Map<List<String>, List<Record>> buckets = MigrateMap.makeComputingMap(new Function<List<String>, List<Record>>() {
public List<Record> apply(List<String> names) {
return Lists.newArrayList();
}
});
// 根据目标库的不同,划分为多个bucket
for (Record record : records) {
buckets.get(Arrays.asList(record.getSchemaName(), record.getTableName())).add(record);
}
JdbcTemplate jdbcTemplate = new JdbcTemplate(context.getTargetDs());
for (final List<Record> batchRecords : buckets.values()) {
TableSqlUnit sqlUnit = getSqlUnit(batchRecords.get(0));
if (context.isBatchApply()) {
applierByBatch(jdbcTemplate, batchRecords, sqlUnit);
} else {
applyOneByOne(jdbcTemplate, batchRecords, sqlUnit);
}
}
}
public void start() {
super.start();
dbType = YuGongUtils.judgeDbType(context.getTargetDs());
tableCache = MigrateMap.makeComputingMap(new Function<List<String>, Table>() {
public Table apply(List<String> names) {
if (names.size() != 2) {
throw new YuGongException("names[" + names.toString() + "] is not valid");
}
return TableMetaGenerator.getTableMeta(context.getTargetDs(),
context.isIgnoreSchema() ? null : names.get(0),
names.get(1));
}
});
selectSqlCache = MigrateMap.makeMap();
}
protected void doApply(List<Record> records) {
Map<List<String>, List<Record>> buckets = MigrateMap.makeComputingMap(new Function<List<String>, List<Record>>() {
public List<Record> apply(List<String> names) {
return Lists.newArrayList();
}
});
// 根据目标库的不同,划分为多个bucket
for (Record record : records) {
buckets.get(Arrays.asList(record.getSchemaName(), record.getTableName())).add(record);
}
JdbcTemplate jdbcTemplate = new JdbcTemplate(context.getTargetDs());
for (final List<Record> batchRecords : buckets.values()) {
List<Record> queryRecords = null;
if (context.isBatchApply()) {
queryRecords = queryByBatch(jdbcTemplate, batchRecords);
} else {
queryRecords = queryOneByOne(jdbcTemplate, batchRecords);
}
diff(batchRecords, queryRecords);
}
}
public void start() {
super.start();
batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() {
public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
return MemoryClientIdentityBatch.create(clientIdentity);
}
});
cursors = new MapMaker().makeMap();
destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>() {
public List<ClientIdentity> apply(String destination) {
return Lists.newArrayList();
}
});
}
public void start() {
if (!isStart()) {
super.start();
// 如果存在provider,则启动metrics service
loadCanalMetrics();
metrics.setServerPort(metricsPort);
metrics.initialize();
canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
public CanalInstance apply(String destination) {
return canalInstanceGenerator.generate(destination);
}
});
// lastRollbackPostions = new MapMaker().makeMap();
}
}
public FileMixedLogPositionManager(File dataDir, long period, MemoryLogPositionManager memoryLogPositionManager){
if (dataDir == null) {
throw new NullPointerException("null dataDir");
}
if (period <= 0) {
throw new IllegalArgumentException("period must be positive, given: " + period);
}
if (memoryLogPositionManager == null) {
throw new NullPointerException("null memoryLogPositionManager");
}
this.dataDir = dataDir;
this.period = period;
this.memoryLogPositionManager = memoryLogPositionManager;
this.dataFileCaches = MigrateMap.makeComputingMap(new Function<String, File>() {
public File apply(String destination) {
return getDataFile(destination);
}
});
this.executorService = Executors.newScheduledThreadPool(1);
this.persistTasks = Collections.synchronizedSet(new HashSet<String>());
}
public void start() {
super.start();
dbType = YuGongUtils.judgeDbType(context.getTargetDs());
insertSqlCache = MigrateMap.makeMap();
updateSqlCache = MigrateMap.makeMap();
deleteSqlCache = MigrateMap.makeMap();
}
public void start() {
super.start();
dataSources = MigrateMap.makeComputingMap(new Function<DataSourceConfig, DataSource>() {
public DataSource apply(DataSourceConfig config) {
return createDataSource(config.getUrl(),
config.getUsername(),
config.getPassword(),
config.getType(),
config.getProperties());
}
});
}
public void start() {
super.start();
dbType = YuGongUtils.judgeDbType(context.getTargetDs());
applierSqlCache = MigrateMap.makeMap();
}