下面列出了怎么用org.springframework.data.redis.core.RedisCallback的API类实例代码及写法,或者点击链接到github查看源代码。
public long calculateDAU(Date start, Date end) {
if (start == null || end == null) {
throw new IllegalArgumentException("参数不能为空!");
}
// 整理该日期范围内的key
List<byte[]> keyList = new ArrayList<>();
Calendar calendar = Calendar.getInstance();
calendar.setTime(start);
while (!calendar.getTime().after(end)) {
String key = RedisKeyUtil.getDAUKey(df.format(calendar.getTime()));
keyList.add(key.getBytes());
calendar.add(Calendar.DATE, 1);
}
// 进行OR运算
return (long) redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
String redisKey = RedisKeyUtil.getDAUKey(df.format(start), df.format(end));
connection.bitOp(RedisStringCommands.BitOperation.OR,
redisKey.getBytes(), keyList.toArray(new byte[0][0]));
return connection.bitCount(redisKey.getBytes());
}
});
}
public void on(SmsVerificationCodeMismatchEvent event) {
String key = toKey(event.getMobile(), event.getScope());
List<Object> attempts = redisTemplate.executePipelined((RedisCallback<Long>) connection -> {
StringRedisConnection conn = (StringRedisConnection) connection;
conn.sAdd(key, event.toString());
long expires = Duration.between(event.getWhen(), event.getExpiresAt()).getSeconds();
conn.expire(key, expires);
conn.sCard(key);
return null;
});
log.debug("Got Redis pipeline {}",
attempts.stream().map(Object::toString).collect(joining(DELIMITER)));
if (attempts.size() == 3) {
if (toAttempts(attempts) >= threshold) {
log.info("Too many failure verification attempts for {} {}", event.getMobile(), event.getScope());
remove(key);
domainEventPublisher.publish(new TooManyFailureSmsVerificationAttemptsEvent(UUID.randomUUID().toString(),
clock.now(),
event.getMobile(),
event.getScope()));
}
}
}
@RequestMapping("/memoryInfo")
@ResponseBody
public String getMemoryInfo() {
Map<String, Object> map = new HashMap<>();
Object o = redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
return connection.info("memory").get("used_memory");
}
});
map.put("used_memory", o);
map.put("create_time", System.currentTimeMillis());
return JSON.toJSONString(map);
}
/**
* 保存用户验证码,和randomStr绑定
* @param deviceId
* 客户端生成
* @param imageCode
* 验证码信息
*/
@Override
public void saveImageCode(String deviceId, String imageCode) {
String text = imageCode.toLowerCase().toString();
redisTemplate.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
// redis info
connection.set(buildKey(deviceId).getBytes(), imageCode.getBytes());
connection.expire(buildKey(deviceId).getBytes(), 60*5);
connection.close();
return "";
}
});
}
/**
* 获取验证码
* @param deviceId
* 前端唯一标识/手机号
*/
@Override
public String getCode(String deviceId) {
String code = "" ;
try {
code = redisTemplate.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
// redis info
byte[] temp = "".getBytes();
temp = connection.get(buildKey(deviceId).getBytes()) ;
connection.close();
return new String(temp);
}
});
} catch (Exception e) {
throw new AuthenticationException("验证码不存在"){};
}
return code ;
}
@Override
protected OAuth2Authentication remove(final String code) {
OAuth2Authentication oAuth2Authentication = redisTemplate.execute(new RedisCallback<OAuth2Authentication>() {
@Override
public OAuth2Authentication doInRedis(RedisConnection connection) throws DataAccessException {
byte[] keyByte = codeKey(code).getBytes();
byte[] valueByte = connection.get(keyByte);
if (valueByte != null) {
connection.del(keyByte);
return SerializationUtils.deserialize(valueByte);
}
return null;
}
});
return oAuth2Authentication;
}
private boolean setRedis(final String key, final long expire) {
try {
boolean status = redisTemplate.execute((RedisCallback<Boolean>) connection -> {
String uuid = UUID.randomUUID().toString();
lockFlag.set(uuid);
byte[] keyByte = redisTemplate.getStringSerializer().serialize(key);
byte[] uuidByte = redisTemplate.getStringSerializer().serialize(uuid);
boolean result = connection.set(keyByte, uuidByte, Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.ifAbsent());
return result;
});
return status;
} catch (Exception e) {
log.error("set redisDistributeLock occured an exception", e);
}
return false;
}
/**
* 释放锁
* @param key 锁的key
* @return 成功/失败
*/
public boolean releaseLock(String key) {
// 释放锁的时候,有可能因为持锁之后方法执行时间大于锁的有效期,此时有可能已经被另外一个线程持有锁,所以不能直接删除
try {
// 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
// spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,所以只能拿到原redis的connection来执行脚本
Boolean result = redisTemplate.execute((RedisCallback<Boolean>) connection -> {
byte[] scriptByte = redisTemplate.getStringSerializer().serialize(UNLOCK_LUA);
return connection.eval(scriptByte, ReturnType.BOOLEAN, 1
, redisTemplate.getStringSerializer().serialize(key)
, redisTemplate.getStringSerializer().serialize(lockFlag.get()));
});
return result;
} catch (Exception e) {
log.error("release redisDistributeLock occured an exception", e);
} finally {
lockFlag.remove();
}
return false;
}
@Override
public void addRecords(HostAndPort hostAndPort, List<StoreRecord> records) {
template.executePipelined((RedisCallback<Void>) connection -> {
for (StoreRecord record : records) {
byte[] keyName = getKeyName(hostAndPort.toInstanceId(), record.getSchemeName())
.getBytes(Charset.defaultCharset());
byte[] value = JsonUtils.toJsonString(record)
.getBytes(Charset.defaultCharset());
long score = record.getTimestamp();
long expire = score - timeoutTtl;
connection.zRemRangeByScore(keyName, 0, expire); // Expire timeout record
connection.zAdd(keyName, score, value);
}
return null;
});
}
/**
* 根据key获取对象
*
* @param keyPatten the key patten
* @return the keys values
*/
public Map<String, Object> getKeysValues(final String keyPatten) {
log.debug("[redisTemplate redis] getValues() patten={} ", keyPatten);
return redisTemplate.execute((RedisCallback<Map<String, Object>>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
Map<String, Object> maps = new HashMap<>(16);
Set<String> keys = redisTemplate.keys(keyPatten + "*");
if (!CollectionUtils.isEmpty(keys)) {
for (String key : keys) {
byte[] bKeys = serializer.serialize(key);
byte[] bValues = connection.get(bKeys);
Object value = OBJECT_SERIALIZER.deserialize(bValues);
maps.put(key, value);
}
}
return maps;
});
}
private boolean setRedis(final String key, final long expire) {
try {
boolean status = redisTemplate.execute((RedisCallback<Boolean>) connection -> {
String uuid = UUID.randomUUID().toString();
lockFlag.set(uuid);
byte[] keyByte = redisTemplate.getStringSerializer().serialize(key);
byte[] uuidByte = redisTemplate.getStringSerializer().serialize(uuid);
boolean result = connection.set(keyByte, uuidByte, Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.ifAbsent());
return result;
});
return status;
} catch (Exception e) {
log.error("set redisDistributeLock occured an exception", e);
}
return false;
}
@Test
public void getWaitingTasks() {
doReturn(redisTemplate).when(stringRqueueRedisTemplate).getRedisTemplate();
doReturn(queueConfigList).when(rqueueSystemManagerService).getSortedQueueConfigs();
doReturn(Arrays.asList(100L, 110L))
.when(redisTemplate)
.executePipelined(any(RedisCallback.class));
List<List<Object>> response = rqueueQDetailService.getWaitingTasks();
assertEquals(3, response.size());
List<Object> headers = Arrays.asList("Queue", "Queue [LIST]", "Size");
List<Object> row = Arrays.asList(queueConfig.getName(), queueConfig.getQueueName(), 100L);
List<Object> row2 = Arrays.asList(queueConfig2.getName(), queueConfig2.getQueueName(), 110L);
assertEquals(Arrays.asList(headers, row, row2), response);
}
@Override
public String get(final String key){
String result = redisTemplate.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
byte[] value = connection.get(serializer.serialize(key));
return serializer.deserialize(value);
}
});
return result;
}
@Override
public long rpush(final String key, Object obj) {
final String value = JSON.toJSONString(obj);
long result = redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
long count = connection.rPush(serializer.serialize(key), serializer.serialize(value));
return count;
}
});
return result;
}
/**
* command GETSET key value
*/
public String getSet(final String key, final String value) {
final org.springframework.data.redis.serializer.RedisSerializer redisSerializer = template.getKeySerializer();
final org.springframework.data.redis.serializer.RedisSerializer redisValueSerializer = template.getValueSerializer();
return template.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
byte[] b = connection.getSet(redisSerializer.serialize(key), redisValueSerializer.serialize(value));
return template.getStringSerializer().deserialize(b);
}
});
}
@Override
public void setBytes(String key, byte[] bytes) {
redisTemplate.opsForHash().getOperations().execute((RedisCallback<List<byte[]>>) redis -> {
redis.set(_key(key).getBytes(), bytes);
redis.hSet(region.getBytes(), key.getBytes(), bytes);
return null;
});
}
/**
* 批量操作
* @param callback 业务回调
* @param redisTemplate RedisTemplate
*/
public static void batchConn(Consumer<RedisConnection> callback, RedisTemplate<String, String> redisTemplate) {
redisTemplate.executePipelined((RedisCallback<Long>) (connection) -> {
connection.openPipeline();
callback.accept(connection);
return null;
});
}
/**
* 删除key
*
* @param keys the keys
* @return the long
*/
public long del(final String... keys) {
return redisTemplate.execute((RedisCallback<Long>) connection -> {
long result = 0;
for (String key : keys) {
result = connection.del(key.getBytes(DEFAULT_CHARSET));
}
return result;
});
}
public Boolean pExpire(final String key, final long timeout) {
final org.springframework.data.redis.serializer.RedisSerializer redisSerializer = template.getKeySerializer();
return template.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
return connection.pExpire(redisSerializer.serialize(key), timeout);
}
});
}
/**
* command GETSET key value
*/
public String getSet(final String key, final String value) {
final org.springframework.data.redis.serializer.RedisSerializer redisSerializer = template.getKeySerializer();
final org.springframework.data.redis.serializer.RedisSerializer redisValueSerializer = template.getValueSerializer();
return template.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
byte[] b = connection.getSet(redisSerializer.serialize(key), redisValueSerializer.serialize(value));
return template.getStringSerializer().deserialize(b);
}
});
}
@RequestMapping("/keysSize")
@ResponseBody
public String getKeysSize() {
Map<String, Object> map = new HashMap<>();
Object o = redisTemplate.execute(new RedisCallback() {
public Long doInRedis(RedisConnection connection) throws DataAccessException {
return connection.dbSize();
}
});;
map.put("dbSize", o);
map.put("create_time", System.currentTimeMillis());
return JSON.toJSONString(map);
}
/**
* 根据key获取对象
*
* @param key the key
* @return the string
*/
public String get(final String key) {
String resultStr = redisTemplate.execute((RedisCallback<String>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
byte[] keys = serializer.serialize(key);
byte[] values = connection.get(keys);
return serializer.deserialize(values);
});
log.debug("[redisTemplate redis]取出 缓存 url:{} ", key);
return resultStr;
}
/**
* 一次性添加数组到 过期时间的 缓存,不用多次连接,节省开销
*
* @param keys the keys
* @param values the values
*/
public void set(final String[] keys, final String[] values) {
redisTemplate.execute((RedisCallback<Long>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
for (int i = 0; i < keys.length; i++) {
byte[] bKeys = serializer.serialize(keys[i]);
byte[] bValues = serializer.serialize(values[i]);
connection.set(bKeys, bValues);
log.debug("[redisTemplate redis]放入 缓存 url:{}", keys[i]);
}
return 1L;
});
}
/**
* 存储code到redis,并设置过期时间,10分钟<br>
* value为OAuth2Authentication序列化后的字节<br>
* 因为OAuth2Authentication没有无参构造函数<br>
* redisTemplate.opsForValue().set(key, value, timeout, unit);
* 这种方式直接存储的话,redisTemplate.opsForValue().get(key)的时候有些问题,
* 所以这里采用最底层的方式存储,get的时候也用最底层的方式获取
*/
@Override
protected void store(String code, OAuth2Authentication authentication) {
redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
connection.set(codeKey(code).getBytes(), SerializationUtils.serialize(authentication),
Expiration.from(10, TimeUnit.MINUTES), SetOption.UPSERT);
return 1L;
}
});
}
/**
* Clears this cache instance
*/
@Override
public void clear() {
RedisTemplate redisTemplate = getRedisTemplate();
redisTemplate.execute((RedisCallback) connection -> {
connection.flushDb();
return null;
});
logger.debug("Clear all the cached query result from redis");
}
/**
* 清空redis存储的数据
*
* @return the string
*/
public String flushDB() {
return redisTemplate.execute((RedisCallback<String>) connection -> {
connection.flushDb();
return "ok";
});
}
/**
* 获取Redis List 序列化
*
* @param key
* @param targetClass
* @param <T>
* @return
*/
public <T> List<T> getListCache(final String key, Class<T> targetClass) {
byte[] result = redisTemplate.execute(new RedisCallback<byte[]>() {
@Override
public byte[] doInRedis(RedisConnection connection) throws DataAccessException {
return connection.get(key.getBytes());
}
});
if (result == null) {
return null;
}
return ProtoStuffSerializerUtil.deserializeList(result, targetClass);
}
/**
* 添加到带有 过期时间的 缓存
*
* @param key redis主键
* @param value 值
* @param time 过期时间
*/
public void setExpire(final byte[] key, final byte[] value, final long time) {
redisTemplate.execute((RedisCallback<Long>) connection -> {
connection.set(key, value);
connection.expire(key, time);
log.debug("[redisTemplate redis]放入 缓存 url:{} ========缓存时间为{}秒", key, time);
return 1L;
});
}
@Override
public List<byte[]> getBytes(Collection<String> keys) {
return redisTemplate.opsForHash().getOperations().execute((RedisCallback<List<byte[]>>) redis -> {
byte[][] bytes = keys.stream().map(k -> k.getBytes()).toArray(byte[][]::new);
return redis.hMGet(region.getBytes(), bytes);
});
}
@Override
public V hGet(String key, String field) {
return redisTemplate.execute(new RedisCallback<V>() {
@Override
public V doInRedis(RedisConnection connection) {
byte[] res = connection.hGet(serializer.serialize(key), hkeySerializer.serialize(field));
if (res != null && res.length > 0) {
return hvalueSerializer.deserialize(res);
} else {
return null;
}
}
});
}