下面列出了怎么用com.esotericsoftware.kryo.Serializer的API类实例代码及写法,或者点击链接到github查看源代码。
private TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
MergeResult<Class<?>, SerializableSerializer<?>> reconfiguredDefaultKryoSerializers,
MergeResult<Class<?>, Class<? extends Serializer<?>>> reconfiguredDefaultKryoSerializerClasses,
MergeResult<String, KryoRegistration> reconfiguredRegistrations) {
if (reconfiguredDefaultKryoSerializers.isOrderedSubset() &&
reconfiguredDefaultKryoSerializerClasses.isOrderedSubset() &&
reconfiguredRegistrations.isOrderedSubset()) {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
// reconfigure a new KryoSerializer
KryoSerializer<T> reconfiguredSerializer = new KryoSerializer<>(
snapshotData.getTypeClass(),
reconfiguredDefaultKryoSerializers.getMerged(),
reconfiguredDefaultKryoSerializerClasses.getMerged(),
reconfiguredRegistrations.getMerged());
return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredSerializer);
}
@SuppressWarnings("unchecked")
private static List<RegistrationBlock> buildRegistrationBlocks(NamespaceConfig config) {
List<Pair<Class<?>[], Serializer<?>>> types = new ArrayList<>();
List<RegistrationBlock> blocks = new ArrayList<>();
blocks.addAll(Namespaces.BASIC.registeredBlocks);
for (NamespaceTypeConfig type : config.getTypes()) {
try {
if (type.getId() == null) {
types.add(Pair.of(new Class[]{type.getType()}, type.getSerializer() != null ? type.getSerializer().newInstance() : null));
} else {
blocks.add(new RegistrationBlock(type.getId(), Collections.singletonList(Pair.of(new Class[]{type.getType()}, type.getSerializer().newInstance()))));
}
} catch (InstantiationException | IllegalAccessException e) {
throw new ConfigurationException("Failed to instantiate serializer from configuration", e);
}
}
blocks.add(new RegistrationBlock(FLOATING_ID, types));
return blocks;
}
@Test
public void testSQLRef() throws Exception {
SQLRef sqlRef = new SQLRef(new HBaseRowLocation(new byte[] {0, 1, 2,3,4,5,6,7,8,9}));
Output output = new Output(new byte[30],30);
Serializer serializer = kryo.getSerializer(SQLRef.class);
serializer.write(kryo, output, sqlRef);
byte[] bytes = output.toBytes();
assertNotNull(bytes);
Input input = new Input(new ByteArrayInputStream(bytes), bytes.length);
SQLRef out = (SQLRef) serializer.read(kryo, input, SQLRef.class);
assertNotNull(out);
assertEquals(sqlRef, out);
}
/**
* Set union, the set with the same key will be reduced(union) together.
* @param mapData map set data
* @param elementSerializer element object Kryo serializer
* @param elementType element object class
* @return the set with the same key will be reduced together.
* @throws Mp4jException
*/
public <T> Map<String, Set<T>> allreduceMapSetUnion(Map<String, Set<T>> mapData, Serializer<T> elementSerializer, Class<T> elementType) throws Mp4jException {
Operand operand = Operands.OBJECT_OPERAND(new ProcessCommSlave.Mp4jSetSerializer<>(elementSerializer, elementType), elementType);
IOperator operator = new IObjectOperator<Set<T>>() {
@Override
public Set<T> apply(Set<T> o1, Set<T> o2) {
for (T val : o2) {
o1.add(val);
}
return o1;
}
};
return allreduceMap(mapData, operand, operator);
}
private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> parseKryoSerializers(
ClassLoader classLoader,
List<String> kryoSerializers) {
return kryoSerializers.stream()
.map(v -> Arrays.stream(v.split(","))
.map(p -> p.split(":"))
.collect(
Collectors.toMap(
arr -> arr[0], // entry key
arr -> arr[1] // entry value
)
)
)
.collect(Collectors.toMap(
m -> loadClass(m.get("class"), classLoader, "Could not load class for kryo serialization"),
m -> loadClass(m.get("serializer"), classLoader, "Could not load serializer's class"),
(m1, m2) -> {
throw new IllegalArgumentException("Duplicated serializer for class: " + m1);
},
LinkedHashMap::new
));
}
private boolean checkForAndApplySerializerOverride(final Map<Class<?>, Serializer<?>> serializerOverrides,
final Kryo kryo, Class<?> targetClass) {
if (serializerOverrides.containsKey(targetClass)) {
final Serializer<?> ser = serializerOverrides.get(targetClass);
if (null == ser) {
// null means use Kryo's default serializer
log.debug("Registering {} with default serializer per overrides", targetClass);
kryo.register(targetClass);
} else {
// nonnull means use that serializer
log.debug("Registering {} with serializer {} per overrides", targetClass, ser);
kryo.register(targetClass, ser);
}
return true;
}
return false;
}
private TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
MergeResult<Class<?>, SerializableSerializer<?>> reconfiguredDefaultKryoSerializers,
MergeResult<Class<?>, Class<? extends Serializer<?>>> reconfiguredDefaultKryoSerializerClasses,
MergeResult<String, KryoRegistration> reconfiguredRegistrations) {
if (reconfiguredDefaultKryoSerializers.isOrderedSubset() &&
reconfiguredDefaultKryoSerializerClasses.isOrderedSubset() &&
reconfiguredRegistrations.isOrderedSubset()) {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
// reconfigure a new KryoSerializer
KryoSerializer<T> reconfiguredSerializer = new KryoSerializer<>(
snapshotData.getTypeClass(),
reconfiguredDefaultKryoSerializers.getMerged(),
reconfiguredDefaultKryoSerializerClasses.getMerged(),
reconfiguredRegistrations.getMerged());
return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredSerializer);
}
private TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
MergeResult<Class<?>, SerializableSerializer<?>> reconfiguredDefaultKryoSerializers,
MergeResult<Class<?>, Class<? extends Serializer<?>>> reconfiguredDefaultKryoSerializerClasses,
MergeResult<String, KryoRegistration> reconfiguredRegistrations) {
if (reconfiguredDefaultKryoSerializers.isOrderedSubset() &&
reconfiguredDefaultKryoSerializerClasses.isOrderedSubset() &&
reconfiguredRegistrations.isOrderedSubset()) {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
// reconfigure a new KryoSerializer
KryoSerializer<T> reconfiguredSerializer = new KryoSerializer<>(
snapshotData.getTypeClass(),
reconfiguredDefaultKryoSerializers.getMerged(),
reconfiguredDefaultKryoSerializerClasses.getMerged(),
reconfiguredRegistrations.getMerged());
return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredSerializer);
}
public void write (Kryo kryo, Output output) {
output.writeInt(objectID, true);
output.writeInt(cachedMethod.methodClassID, true);
output.writeByte(cachedMethod.methodIndex);
Serializer[] serializers = cachedMethod.serializers;
Object[] args = this.args;
for (int i = 0, n = serializers.length; i < n; i++) {
Serializer serializer = serializers[i];
if (serializer != null)
kryo.writeObjectOrNull(output, args[i], serializer);
else
kryo.writeClassAndObject(output, args[i]);
}
output.writeByte(responseData);
}
/**
* 快速写入
*
* @param kryo
* @param output
* @param defaultRegistration
* @param value
*/
public static void fastWrite(Kryo kryo, Output output, Registration defaultRegistration, Object value) {
if (value == null) {
kryo.writeClass(output, null);
return;
}
Class<?> type = value.getClass();
if (defaultRegistration.getType().equals(type)) {
if (defaultRegistration.getId() == FastClassResolver.NAME) {
((FastClassResolver) kryo.getClassResolver()).writeName(output, type, defaultRegistration);
} else {
output.writeVarInt(defaultRegistration.getId() + 2, true);
}
kryo.writeObject(output, value, defaultRegistration.getSerializer());
} else {
Registration registration = kryo.writeClass(output, value.getClass());
Serializer<?> serializer = registration.getSerializer();
kryo.writeObject(output, value, serializer);
}
}
public KryoRegistration(Class<?> registeredClass, Class<? extends Serializer<?>> serializerClass) {
this.registeredClass = Preconditions.checkNotNull(registeredClass);
this.serializerClass = Preconditions.checkNotNull(serializerClass);
this.serializableSerializerInstance = null;
this.serializerDefinitionType = SerializerDefinitionType.CLASS;
}
public static void writeRoaringToFile(File f,
RoaringBitmap roaring,
Serializer<RoaringBitmap> serializer) throws FileNotFoundException {
Kryo kryo = createKryo();
Output kryoOutputMap = new Output(new FileOutputStream(f));
kryo.writeObjectOrNull(kryoOutputMap, roaring, serializer);
kryoOutputMap.flush();
kryoOutputMap.close();
}
KryoSerializer(Class<T> type,
LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultSerializers,
LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses,
LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
this.type = checkNotNull(type, "Type class cannot be null.");
this.defaultSerializerClasses = checkNotNull(defaultSerializerClasses, "Default serializer classes cannot be null.");
this.defaultSerializers = checkNotNull(defaultSerializers, "Default serializers cannot be null.");
this.kryoRegistrations = checkNotNull(kryoRegistrations, "Kryo registrations cannot be null.");
}
KryoSerializer(Class<T> type,
LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultSerializers,
LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses,
LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
this.type = checkNotNull(type, "Type class cannot be null.");
this.defaultSerializerClasses = checkNotNull(defaultSerializerClasses, "Default serializer classes cannot be null.");
this.defaultSerializers = checkNotNull(defaultSerializers, "Default serializers cannot be null.");
this.kryoRegistrations = checkNotNull(kryoRegistrations, "Kryo registrations cannot be null.");
}
@SuppressWarnings("unchecked")
private static LinkedOptionalMap<Class<?>, Class<? extends Serializer<?>>> readDefaultKryoSerializerClasses(
DataInputView in,
ClassLoader cl) throws IOException {
return readOptionalMap(in, new ClassResolverByName(cl), new ClassResolverByName<Serializer<?>>(cl));
}
/**
* Utility method that takes lists of registered types and their serializers, and resolve
* them into a single list such that the result will resemble the final registration
* result in Kryo.
*/
private static LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(
Class<?> serializedType,
LinkedHashSet<Class<?>> registeredTypes,
LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses,
LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers) {
final LinkedHashMap<String, KryoRegistration> kryoRegistrations = new LinkedHashMap<>();
kryoRegistrations.put(serializedType.getName(), new KryoRegistration(serializedType));
for (Class<?> registeredType : checkNotNull(registeredTypes)) {
kryoRegistrations.put(registeredType.getName(), new KryoRegistration(registeredType));
}
for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> registeredTypeWithSerializerClassEntry :
checkNotNull(registeredTypesWithSerializerClasses).entrySet()) {
kryoRegistrations.put(
registeredTypeWithSerializerClassEntry.getKey().getName(),
new KryoRegistration(
registeredTypeWithSerializerClassEntry.getKey(),
registeredTypeWithSerializerClassEntry.getValue()));
}
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypeWithSerializerEntry :
checkNotNull(registeredTypesWithSerializers).entrySet()) {
kryoRegistrations.put(
registeredTypeWithSerializerEntry.getKey().getName(),
new KryoRegistration(
registeredTypeWithSerializerEntry.getKey(),
registeredTypeWithSerializerEntry.getValue()));
}
// add Avro support if flink-avro is available; a dummy otherwise
AvroUtils.getAvroUtils().addAvroGenericDataArrayRegistration(kryoRegistrations);
return kryoRegistrations;
}
@Override
public void send(Output output, MetaData metaData) throws IOException, Mp4jException {
Kryo kryo = KryoUtils.getKryo();
try {
switch (container) {
case ARRAY:
Serializer arrSerializer = new IntOperand.Mp4jIntArraySerializer(metaData.convertToArrayMetaData());
if (compress) {
arrSerializer = new DeflateSerializer(arrSerializer);
}
arrSerializer.write(kryo, output, null);
break;
case MAP:
Serializer mapSerializer = new IntOperand.Mp4jIntMapSerializer(metaData.convertToMapMetaData());
if (compress) {
mapSerializer = new DeflateSerializer(mapSerializer);
}
mapSerializer.write(kryo, output, null);
break;
default:
throw new Mp4jException("unsupported container:" + container);
}
} catch (Exception e) {
LOG.error("send exception", e);
throw new Mp4jException(e);
} finally {
output.close();
}
}
@Override
public void send(Output output, MetaData metaData) throws IOException, Mp4jException {
Kryo kryo = KryoUtils.getKryo();
try {
switch (container) {
case ARRAY:
Serializer arrSerializer = new LongOperand.Mp4jLongArraySerializer(metaData.convertToArrayMetaData());
if (compress) {
arrSerializer = new DeflateSerializer(arrSerializer);
}
arrSerializer.write(kryo, output, null);
break;
case MAP:
Serializer mapSerializer = new LongOperand.Mp4jLongMapSerializer(metaData.convertToMapMetaData());
if (compress) {
mapSerializer = new DeflateSerializer(mapSerializer);
}
mapSerializer.write(kryo, output, null);
break;
default:
throw new Mp4jException("unsupported container:" + container);
}
} catch (Exception e) {
LOG.error("send exception", e);
throw new Mp4jException(e);
} finally {
output.close();
}
}
@Override
public Serializer getDefaultSerializer(Class type) {
if (type == null) {
throw new IllegalArgumentException("type cannot be null.");
}
if (!type.isArray() && !ReflectionUtils.checkZeroArgConstructor(type)) {
if (logger.isWarnEnabled()) {
logger.warn(type + " has no zero-arg constructor and this will affect the serialization performance");
}
return new JavaSerializer();
}
return super.getDefaultSerializer(type);
}
@SuppressWarnings("unchecked")
private static LinkedOptionalMap<Class<?>, Class<? extends Serializer<?>>> readDefaultKryoSerializerClasses(
DataInputView in,
ClassLoader cl) throws IOException {
return readOptionalMap(in, new ClassResolverByName(cl), new ClassResolverByName<Serializer<?>>(cl));
}
public UserSerializationBuilder register(Class clazz, Serializer serializer) {
com.esotericsoftware.kryo.Serializer previousSerializer = maps.put(clazz, serializer);
if (previousSerializer != null) {
throw new IllegalArgumentException("There is already " + previousSerializer
+ " configured for class " + clazz);
}
return this;
}
@Override
public void send(Output output, MetaData metaData) throws IOException, Mp4jException {
Kryo kryo = KryoUtils.getKryo();
try {
switch (container) {
case ARRAY:
Serializer arrSerializer = new ObjectOperand.Mp4jObjectArraySerializer(
metaData.convertToArrayMetaData(), serializer, type);
if (compress) {
arrSerializer = new DeflateSerializer(arrSerializer);
}
arrSerializer.write(kryo, output, null);
break;
case MAP:
Serializer mapSerializer = new ObjectOperand.Mp4jObjectMapSerializer(
metaData.convertToMapMetaData(), serializer, type);
if (compress) {
mapSerializer = new DeflateSerializer(mapSerializer);
}
mapSerializer.write(kryo, output, null);
break;
default:
throw new Mp4jException("unsupported container:" + container);
}
} catch (Exception e) {
LOG.error("send exception", e);
throw new Mp4jException(e);
} finally {
output.close();
}
}
public Serializer<?> getSerializer(Kryo kryo) {
switch (serializerDefinitionType) {
case UNSPECIFIED:
return null;
case CLASS:
return ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, registeredClass);
case INSTANCE:
return serializableSerializerInstance.getSerializer();
default:
// this should not happen; adding as a guard for the future
throw new IllegalStateException(
"Unrecognized Kryo registration serializer definition type: " + serializerDefinitionType);
}
}
/**
* Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer
*
* @param type The class of the types serialized with the given serializer.
* @param serializerClass The class of the serializer to use.
*/
@SuppressWarnings("rawtypes")
public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) {
if (type == null || serializerClass == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
@SuppressWarnings("unchecked")
Class<? extends Serializer<?>> castedSerializerClass = (Class<? extends Serializer<?>>) serializerClass;
registeredTypesWithKryoSerializerClasses.put(type, castedSerializerClass);
}
@Override
public void send(Output output, MetaData metaData) throws IOException, Mp4jException {
Kryo kryo = KryoUtils.getKryo();
try {
switch (container) {
case ARRAY:
Serializer arrSerializer = new ShortOperand.Mp4jShortArraySerializer(metaData.convertToArrayMetaData());
if (compress) {
arrSerializer = new DeflateSerializer(arrSerializer);
}
arrSerializer.write(kryo, output, null);
break;
case MAP:
Serializer mapSerializer = new ShortOperand.Mp4jShortMapSerializer(metaData.convertToMapMetaData());
if (compress) {
mapSerializer = new DeflateSerializer(mapSerializer);
}
mapSerializer.write(kryo, output, null);
break;
default:
throw new Mp4jException("unsupported container:" + container);
}
} catch (Exception e) {
LOG.error("send exception", e);
throw new Mp4jException(e);
} finally {
output.close();
}
}
@Override
public void send(Output output, MetaData metaData) throws IOException, Mp4jException {
Kryo kryo = KryoUtils.getKryo();
try {
switch (container) {
case ARRAY:
Serializer arrSerializer = new Mp4jDoubleArraySerializer(metaData.convertToArrayMetaData());
if (compress) {
arrSerializer = new DeflateSerializer(arrSerializer);
}
arrSerializer.write(kryo, output, null);
break;
case MAP:
Serializer mapSerializer = new Mp4jDoubleMapSerializer(metaData.convertToMapMetaData());
if (compress) {
mapSerializer = new DeflateSerializer(mapSerializer);
}
mapSerializer.write(kryo, output, null);
break;
default:
throw new Mp4jException("unsupported container:" + container);
}
} catch (Exception e) {
LOG.error("send exception", e);
throw new Mp4jException(e);
} finally {
output.close();
}
}
@Override
public Serializer getDefaultSerializer(Class type) {
if (_override) {
return new SerializableSerializer();
} else {
return super.getDefaultSerializer(type);
}
}
KryoSerializerSnapshot(Class<T> typeClass,
LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers,
LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses,
LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
this.snapshotData = createFrom(typeClass, defaultKryoSerializers, defaultKryoSerializerClasses, kryoRegistrations);
}
private TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(KryoSerializer<T> newSerializer) {
// merge the default serializers
final MergeResult<Class<?>, SerializableSerializer<?>> reconfiguredDefaultKryoSerializers = mergeRightIntoLeft(
snapshotData.getDefaultKryoSerializers(),
optionalMapOf(newSerializer.getDefaultKryoSerializers(), Class::getName));
if (reconfiguredDefaultKryoSerializers.hasMissingKeys()) {
logMissingKeys(reconfiguredDefaultKryoSerializers);
return TypeSerializerSchemaCompatibility.incompatible();
}
// merge default serializer classes
final MergeResult<Class<?>, Class<? extends Serializer<?>>> reconfiguredDefaultKryoSerializerClasses = mergeRightIntoLeft(
snapshotData.getDefaultKryoSerializerClasses(),
optionalMapOf(newSerializer.getDefaultKryoSerializerClasses(), Class::getName));
if (reconfiguredDefaultKryoSerializerClasses.hasMissingKeys()) {
logMissingKeys(reconfiguredDefaultKryoSerializerClasses);
return TypeSerializerSchemaCompatibility.incompatible();
}
// merge registration
final MergeResult<String, KryoRegistration> reconfiguredRegistrations = mergeRightIntoLeft(
snapshotData.getKryoRegistrations(),
optionalMapOf(newSerializer.getKryoRegistrations(), Function.identity()));
if (reconfiguredRegistrations.hasMissingKeys()) {
logMissingKeys(reconfiguredRegistrations);
return TypeSerializerSchemaCompatibility.incompatible();
}
// there are no missing keys, now we have to decide whether we are compatible as-is or we require reconfiguration.
return resolveSchemaCompatibility(
reconfiguredDefaultKryoSerializers,
reconfiguredDefaultKryoSerializerClasses,
reconfiguredRegistrations);
}
/**
* Adds a new Kryo default serializer to the Runtime.
*
* @param type The class of the types serialized with the given serializer.
* @param serializerClass The class of the serializer to use.
*/
public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
if (type == null || serializerClass == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
defaultKryoSerializerClasses.put(type, serializerClass);
}