diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Proto2Codec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Proto2Codec.java
index 4148495cce5..3a9d38863bb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Proto2Codec.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Proto2Codec.java
@@ -17,15 +17,24 @@
 package org.apache.hadoop.hdds.utils.db;
 
-import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.ExtensionRegistryLite;
+import com.google.protobuf.Message;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
+import com.google.protobuf.WireFormat;
 import jakarta.annotation.Nonnull;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.ratis.util.function.CheckedFunction;
 
 /**
@@ -33,7 +42,7 @@
  */
 public final class Proto2Codec<M extends MessageLite>
     implements Codec<M> {
-  private static final ConcurrentMap<Class<? extends MessageLite>,
+  private static final ConcurrentMap<Pair<Class<? extends MessageLite>, Set<String>>,
       Codec<? extends MessageLite>> CODECS = new ConcurrentHashMap<>();
 
@@ -41,17 +50,30 @@ public final class Proto2Codec
   /**
    * @return the {@link Codec} for the given class.
    */
   public static <T extends MessageLite> Codec<T> get(T t) {
-    final Codec<?> codec = CODECS.computeIfAbsent(t.getClass(),
-        key -> new Proto2Codec<>(t));
+    return get(t, Collections.emptySet());
+  }
+
+  /**
+   * @return the {@link Codec} for the given class, which skips the named
+   *         fields when decoding.
+   */
+  public static <T extends MessageLite> Codec<T> get(T t, Set<String> fieldsToBeSkipped) {
+    final Codec<?> codec = CODECS.computeIfAbsent(Pair.of(t.getClass(), fieldsToBeSkipped),
+        key -> new Proto2Codec<>(t, fieldsToBeSkipped));
     return (Codec<T>) codec;
   }
 
   private final Class<M> clazz;
   private final Parser<M> parser;
+  private final Descriptors.Descriptor descriptor;
+  private final Supplier<Message.Builder> builderSupplier;
+  private final Set<String> fieldsToBeSkipped;
 
-  private Proto2Codec(M m) {
+  private Proto2Codec(M m, Set<String> fieldsToBeSkipped) {
     this.clazz = (Class<M>) m.getClass();
     this.parser = (Parser<M>) m.getParserForType();
+    this.descriptor = ((Message) m).getDescriptorForType();
+    this.fieldsToBeSkipped = fieldsToBeSkipped;
+    this.builderSupplier = ((Message) m)::newBuilderForType;
   }
 
   @Override
@@ -83,19 +105,124 @@
   public M fromCodecBuffer(@Nonnull CodecBuffer buffer)
       throws IOException {
     try (InputStream in = buffer.getInputStream()) {
-      return parser.parseFrom(in);
+      if (this.fieldsToBeSkipped.isEmpty()) {
+        return parser.parseFrom(in);
+      } else {
+        return parse(CodedInputStream.newInstance(in));
+      }
     }
   }
 
+  private Object getValue(CodedInputStream input,
+      Descriptors.FieldDescriptor field) throws IOException {
+    Object value;
+    switch (field.getType()) {
+    case DOUBLE:
+      value = input.readDouble();
+      break;
+    case FLOAT:
+      value = input.readFloat();
+      break;
+    case INT64:
+      value = input.readInt64();
+      break;
+    case UINT64:
+      value = input.readUInt64();
+      break;
+    case INT32:
+      value = input.readInt32();
+      break;
+    case FIXED64:
+      value = input.readFixed64();
+      break;
+    case FIXED32:
+      value = input.readFixed32();
+      break;
+    case BOOL:
+      value = input.readBool();
+      break;
+    case STRING:
+      value = input.readString();
+      break;
+    case GROUP:
+    case MESSAGE:
+      value = DynamicMessage.newBuilder(field.getMessageType());
+      input.readMessage((MessageLite.Builder) value,
+          ExtensionRegistryLite.getEmptyRegistry());
+      value = ((MessageLite.Builder) value).build();
+      break;
+    case BYTES:
+      value = input.readBytes();
+      break;
+    case UINT32:
+      value = input.readUInt32();
+      break;
+    case ENUM:
+      value = field.getEnumType().findValueByNumber(input.readEnum());
+      break;
+    case SFIXED32:
+      value = input.readSFixed32();
+      break;
+    case SFIXED64:
+      value = input.readSFixed64();
+      break;
+    case SINT32:
+      value = input.readSInt32();
+      break;
+    case SINT64:
+      value = input.readSInt64();
+      break;
+    default:
+      throw new UnsupportedOperationException(
+          "Unsupported field type: " + field.getType());
+    }
+    return value;
+  }
+
+  private M parse(CodedInputStream codedInputStream) throws IOException {
+    Message.Builder builder = this.builderSupplier.get();
+    while (!codedInputStream.isAtEnd()) {
+      int tag = codedInputStream.readTag();
+      if (tag == 0) {
+        break;
+      }
+      int fieldNumber = WireFormat.getTagFieldNumber(tag);
+      final Descriptors.FieldDescriptor field = descriptor.findFieldByNumber(fieldNumber);
+      if (field != null && !this.fieldsToBeSkipped.contains(field.getName())) {
+        Object value = getValue(codedInputStream, field);
+        if (field.isRepeated()) {
+          builder.addRepeatedField(field, value);
+        } else {
+          builder.setField(field, value);
+        }
+      } else {
+        codedInputStream.skipField(tag);
+      }
+    }
+    return (M) builder.build();
+  }
+
   @Override
   public byte[] toPersistedFormat(M message) {
     return message.toByteArray();
   }
 
+  @Override
   public M fromPersistedFormat(byte[] bytes)
-      throws InvalidProtocolBufferException {
-    return parser.parseFrom(bytes);
+      throws IOException {
+    if (fieldsToBeSkipped.isEmpty()) {
+      return parser.parseFrom(bytes);
+    } else {
+      return parse(CodedInputStream.newInstance(bytes));
+    }
   }
 
   @Override
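
For context, here is a minimal sketch of the decode-side skipping the new overload enables, using protobuf's bundled `Timestamp` well-known type as a stand-in for an Ozone proto (the example class, message choice, and skipped field name are illustrative, not part of the patch):

```java
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.Proto2Codec;

public final class SkipFieldExample {
  public static void main(String[] args) throws IOException {
    // Codec that decodes Timestamp but drops the "nanos" field; matching is
    // by proto field name, as in Proto2Codec#parse.
    Codec<Timestamp> codec =
        Proto2Codec.get(Timestamp.getDefaultInstance(), Collections.singleton("nanos"));

    Timestamp original = Timestamp.newBuilder().setSeconds(42).setNanos(7).build();
    byte[] bytes = codec.toPersistedFormat(original);    // full encoding
    Timestamp decoded = codec.fromPersistedFormat(bytes);

    System.out.println(decoded.getSeconds()); // 42
    System.out.println(decoded.getNanos());   // 0 -- skipped during decode
  }
}
```

Note that skipping happens only on the decode path; `toPersistedFormat` still writes the full message, so the stored bytes are unchanged and the same row can later be decoded with a different (or empty) skip set.
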
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index 5a07558cd54..c44ff534a6f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -165,6 +165,19 @@ default VALUE getReadCopy(KEY key) throws IOException {
   TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator()
       throws IOException;
 
+  /**
+   * Returns an iterator for this metadata store that decodes keys and values
+   * with the given codecs instead of the table defaults.
+   * @param keyCodec codec to decode keys with.
+   * @param valueCodec codec to decode values with.
+   * @param prefix iterate only the entries whose keys start with this prefix.
+   * @return TableIterator over the decoded entries.
+   * @throws IOException on failure.
+   */
+  default TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(
+      Codec<KEY> keyCodec, Codec<VALUE> valueCodec, KEY prefix) throws IOException {
+    throw new NotImplementedException("iterator is not implemented");
+  }
+
   /**
    * Returns a prefixed iterator for this metadata store.
    * @param prefix
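
A hedged sketch of how a caller might combine this new entry point with a field-skipping value codec. It assumes a `Table<String, OmKeyInfo>` backed by `TypedTable` (the base `Table` default above just throws), assumes `StringCodec.get()` as the key codec, and uses "acls" as an assumed KeyInfo field name; `scanWithoutAcls` itself is hypothetical:

```java
import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.StringCodec;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;

public final class SlimScanExample {
  // Uses the OmKeyInfo.newCodec(boolean, Set) overload added later in this patch.
  static void scanWithoutAcls(Table<String, OmKeyInfo> keyTable, String keyPrefix)
      throws IOException {
    Codec<OmKeyInfo> slimCodec = OmKeyInfo.newCodec(true, Collections.singleton("acls"));
    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> it =
             keyTable.iterator(StringCodec.get(), slimCodec, keyPrefix)) {
      while (it.hasNext()) {
        Table.KeyValue<String, OmKeyInfo> kv = it.next();
        // The value is decoded with the "acls" field skipped.
        System.out.println(kv.getKey() + " -> " + kv.getValue());
      }
    }
  }

  private SlimScanExample() { }
}
```
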
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 24676ac33b5..6131b68da8f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -141,12 +141,16 @@ private byte[] encodeValue(VALUE value) throws IOException {
     return value == null ? null : valueCodec.toPersistedFormat(value);
   }
 
-  private KEY decodeKey(byte[] key) throws IOException {
-    return key == null ? null : keyCodec.fromPersistedFormat(key);
+  private KEY decodeKey(Codec<KEY> kCodec, byte[] key) throws IOException {
+    return key == null ? null : kCodec.fromPersistedFormat(key);
   }
 
   private VALUE decodeValue(byte[] value) throws IOException {
-    return value == null ? null : valueCodec.fromPersistedFormat(value);
+    return decodeValue(valueCodec, value);
+  }
+
+  private VALUE decodeValue(Codec<VALUE> vCodec, byte[] value) throws IOException {
+    return value == null ? null : vCodec.fromPersistedFormat(value);
   }
 
   @Override
@@ -418,12 +422,12 @@ public Table.KeyValueIterator iterator() throws IOException {
   }
 
   @Override
-  public Table.KeyValueIterator<KEY, VALUE> iterator(KEY prefix)
+  public Table.KeyValueIterator<KEY, VALUE> iterator(Codec<KEY> kCodec, Codec<VALUE> vCodec, KEY prefix)
       throws IOException {
     if (supportCodecBuffer) {
       final CodecBuffer prefixBuffer = encodeKeyCodecBuffer(prefix);
       try {
-        return newCodecBufferTableIterator(rawTable.iterator(prefixBuffer));
+        return newCodecBufferTableIterator(kCodec, vCodec, rawTable.iterator(prefixBuffer));
       } catch (Throwable t) {
         if (prefixBuffer != null) {
           prefixBuffer.release();
         }
@@ -432,10 +436,15 @@ public Table.KeyValueIterator iterator(KEY prefix)
       }
     } else {
       final byte[] prefixBytes = encodeKey(prefix);
-      return new TypedTableIterator(rawTable.iterator(prefixBytes));
+      return new TypedTableIterator(kCodec, vCodec, rawTable.iterator(prefixBytes));
     }
   }
 
+  @Override
+  public Table.KeyValueIterator<KEY, VALUE> iterator(KEY prefix) throws IOException {
+    return iterator(keyCodec, valueCodec, prefix);
+  }
+
   @Override
   public String getName() {
     return rawTable.getName();
   }
 
@@ -499,7 +508,8 @@ public List getRangeKVs(
         rawTable.getRangeKVs(startKeyBytes, count, prefixBytes, filters);
 
     List<TypedKeyValue> rangeKVs = new ArrayList<>();
-    rangeKVBytes.forEach(byteKV -> rangeKVs.add(new TypedKeyValue(byteKV)));
+    rangeKVBytes.forEach(byteKV -> rangeKVs.add(new TypedKeyValue(keyCodec,
+        valueCodec, byteKV)));
 
     return rangeKVs;
   }
 
@@ -520,7 +530,8 @@ public List getSequentialRangeKVs(
         prefixBytes, filters);
 
     List<TypedKeyValue> rangeKVs = new ArrayList<>();
-    rangeKVBytes.forEach(byteKV -> rangeKVs.add(new TypedKeyValue(byteKV)));
+    rangeKVBytes.forEach(byteKV -> rangeKVs.add(new TypedKeyValue(
+        keyCodec, valueCodec, byteKV)));
 
     return rangeKVs;
   }
 
@@ -558,19 +569,23 @@ TableCache getCache() {
 
   public final class TypedKeyValue implements KeyValue<KEY, VALUE> {
     private final KeyValue<byte[], byte[]> rawKeyValue;
+    private final Codec<KEY> kCodec;
+    private final Codec<VALUE> vCodec;
 
-    private TypedKeyValue(KeyValue<byte[], byte[]> rawKeyValue) {
+    private TypedKeyValue(Codec<KEY> kCodec, Codec<VALUE> vCodec,
+        KeyValue<byte[], byte[]> rawKeyValue) {
+      this.kCodec = kCodec;
+      this.vCodec = vCodec;
       this.rawKeyValue = rawKeyValue;
     }
 
     @Override
     public KEY getKey() throws IOException {
-      return decodeKey(rawKeyValue.getKey());
+      return decodeKey(kCodec, rawKeyValue.getKey());
    }
 
     @Override
     public VALUE getValue() throws IOException {
-      return decodeValue(rawKeyValue.getValue());
+      return decodeValue(vCodec, rawKeyValue.getValue());
     }
 
     public byte[] getRawKey() throws IOException {
       return rawKeyValue.getKey();
     }
 
     public byte[] getRawValue() throws IOException {
       return rawKeyValue.getValue();
     }
   }
 
-  RawIterator<CodecBuffer> newCodecBufferTableIterator(
-      TableIterator<CodecBuffer, ? extends KeyValue<CodecBuffer, CodecBuffer>> i) {
+  RawIterator<CodecBuffer> newCodecBufferTableIterator(Codec<KEY> kCodec,
+      Codec<VALUE> vCodec, TableIterator<CodecBuffer, ? extends KeyValue<CodecBuffer, CodecBuffer>> i) {
     return new RawIterator<CodecBuffer>(i) {
       @Override
       AutoCloseSupplier<CodecBuffer> convert(KEY key) throws IOException {
@@ -616,9 +631,14 @@ KeyValue convert(KeyValue raw)
    * Table Iterator implementation for strongly typed tables.
    */
   public class TypedTableIterator extends RawIterator<byte[]> {
-    TypedTableIterator(
+    private final Codec<KEY> kCodec;
+    private final Codec<VALUE> vCodec;
+
+    TypedTableIterator(Codec<KEY> kCodec, Codec<VALUE> vCodec,
         TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> rawIterator) {
       super(rawIterator);
+      this.kCodec = kCodec;
+      this.vCodec = vCodec;
     }
 
     @Override
@@ -629,7 +649,7 @@ AutoCloseSupplier convert(KEY key) throws IOException {
 
     @Override
     KeyValue<KEY, VALUE> convert(KeyValue<byte[], byte[]> raw) {
-      return new TypedKeyValue(raw);
+      return new TypedKeyValue(kCodec, vCodec, raw);
     }
   }
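
The structural change here is that `TypedKeyValue` and the iterators no longer close over the table's fixed `keyCodec`/`valueCodec` but carry the codecs chosen for the current iteration. A standalone sketch of that decode-on-access pattern (illustration only, not Ozone code):

```java
import java.io.IOException;
import org.apache.hadoop.hdds.utils.db.Codec;

// Raw bytes are kept as-is and decoded only when requested, with the codecs
// injected per iteration rather than fixed per table.
final class LazyKeyValue<K, V> {
  private final byte[] rawKey;
  private final byte[] rawValue;
  private final Codec<K> keyCodec;
  private final Codec<V> valueCodec;

  LazyKeyValue(byte[] rawKey, byte[] rawValue,
      Codec<K> keyCodec, Codec<V> valueCodec) {
    this.rawKey = rawKey;
    this.rawValue = rawValue;
    this.keyCodec = keyCodec;
    this.valueCodec = valueCodec;
  }

  K getKey() throws IOException {
    return keyCodec.fromPersistedFormat(rawKey);     // decoded on access
  }

  V getValue() throws IOException {
    return valueCodec.fromPersistedFormat(rawValue); // decoded on access
  }
}
```

Because decoding is deferred, a scan that only inspects keys never pays for value decoding at all, and a field-skipping value codec only does its partial parse for the entries actually read.
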
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDirectoryInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDirectoryInfo.java
index 51158df3778..086926d3a22 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDirectoryInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDirectoryInfo.java
@@ -17,16 +17,22 @@
 package org.apache.hadoop.ozone.om.helpers;
 
+import com.google.protobuf.Descriptors;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.utils.db.Codec;
 import org.apache.hadoop.hdds.utils.db.CopyObject;
 import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
 import org.apache.hadoop.hdds.utils.db.Proto2Codec;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DirectoryInfo;
 
 /**
@@ -36,11 +42,20 @@
  */
 public class OmDirectoryInfo extends WithParentObjectId
     implements CopyObject<OmDirectoryInfo> {
-  private static final Codec<OmDirectoryInfo> CODEC = new DelegatedCodec<>(
-      Proto2Codec.get(DirectoryInfo.getDefaultInstance()),
-      OmDirectoryInfo::getFromProtobuf,
-      OmDirectoryInfo::getProtobuf,
-      OmDirectoryInfo.class);
+  // CODEC_MAP must be initialized before CODEC, since newCodec() reads it.
+  private static final Map<String, Codec<OmDirectoryInfo>> CODEC_MAP = new ConcurrentHashMap<>();
+  private static final Codec<OmDirectoryInfo> CODEC = newCodec(Collections.emptySet());
+  public static final Set<String> FIELDS_LIST =
+      OzoneManagerProtocolProtos.DirectoryInfo.getDescriptor().getFields()
+          .stream().map(Descriptors.FieldDescriptor::getName).collect(Collectors.toSet());
+
+  public static Codec<OmDirectoryInfo> newCodec(Set<String> ignoredFields) {
+    return CODEC_MAP.computeIfAbsent(String.join(",", ignoredFields),
+        k -> new DelegatedCodec<>(
+            Proto2Codec.get(DirectoryInfo.getDefaultInstance(), ignoredFields),
+            OmDirectoryInfo::getFromProtobuf,
+            OmDirectoryInfo::getProtobuf,
+            OmDirectoryInfo.class));
+  }
 
   public static Codec<OmDirectoryInfo> getCodec() {
     return CODEC;
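
Since `Proto2Codec#parse` silently skips unknown names, the descriptor-derived `FIELDS_LIST` gives callers a way to validate a skip set up front. A hypothetical guard (the "acls" field name and the helper are illustrative):

```java
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;

public final class DirectoryCodecExample {
  // Reject field names that are not in DirectoryInfo's descriptor before
  // they are silently ignored by the codec.
  static Codec<OmDirectoryInfo> slimDirectoryCodec() {
    Set<String> ignored = Collections.singleton("acls");
    if (!OmDirectoryInfo.FIELDS_LIST.containsAll(ignored)) {
      throw new IllegalArgumentException("Unknown DirectoryInfo field(s): " + ignored);
    }
    return OmDirectoryInfo.newCodec(ignored); // cached under the joined field names
  }

  private DirectoryCodecExample() { }
}
```
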
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 4dffb14b11d..4b3ca3b849b 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -18,13 +18,18 @@
 package org.apache.hadoop.ozone.om.helpers;
 
 import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Descriptors;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FileChecksum;
@@ -55,16 +60,20 @@
 public final class OmKeyInfo extends WithParentObjectId
     implements CopyObject<OmKeyInfo>, WithTags {
   private static final Logger LOG = LoggerFactory.getLogger(OmKeyInfo.class);
-
-  private static final Codec<OmKeyInfo> CODEC_TRUE = newCodec(true);
-  private static final Codec<OmKeyInfo> CODEC_FALSE = newCodec(false);
-
-  private static Codec<OmKeyInfo> newCodec(boolean ignorePipeline) {
-    return new DelegatedCodec<>(
-        Proto2Codec.get(KeyInfo.getDefaultInstance()),
-        OmKeyInfo::getFromProtobuf,
-        k -> k.getProtobuf(ignorePipeline, ClientVersion.CURRENT_VERSION),
-        OmKeyInfo.class);
+  private static final Map<String, Codec<OmKeyInfo>> CODEC_MAP = new ConcurrentHashMap<>();
+  private static final Codec<OmKeyInfo> CODEC_TRUE = newCodec(true, Collections.emptySet());
+  private static final Codec<OmKeyInfo> CODEC_FALSE = newCodec(false, Collections.emptySet());
+
+  public static final Set<String> FIELDS_LIST = KeyInfo.getDescriptor().getFields().stream()
+      .map(Descriptors.FieldDescriptor::getName).collect(Collectors.toSet());
+
+  public static Codec<OmKeyInfo> newCodec(boolean ignorePipeline, Set<String> ignoredFields) {
+    return CODEC_MAP.computeIfAbsent(String.join(",", ignoredFields) + "," + ignorePipeline,
+        key -> new DelegatedCodec<>(
+            Proto2Codec.get(KeyInfo.getDefaultInstance(), ignoredFields),
+            OmKeyInfo::getFromProtobuf,
+            k -> k.getProtobuf(ignorePipeline, ClientVersion.CURRENT_VERSION),
+            OmKeyInfo.class));
   }
 
   public static Codec<OmKeyInfo> getCodec(boolean ignorePipeline) {
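
A hedged usage sketch of the new overload; "keyLocationList" is an assumed KeyInfo field name used purely for illustration:

```java
import java.util.Collections;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;

public final class KeyCodecExample {
  public static void main(String[] args) {
    // Drops pipeline info on encode and skips the assumed "keyLocationList"
    // field on decode. The instance is cached under the string key
    // "keyLocationList,true" (joined ignored fields + "," + ignorePipeline).
    Codec<OmKeyInfo> codec =
        OmKeyInfo.newCodec(true, Collections.singleton("keyLocationList"));
    System.out.println(codec);
  }
}
```

One consequence of keying the cache by the joined field names is that two equal sets with different iteration orders produce different keys; the only cost is a duplicate codec instance, since `Proto2Codec`'s own cache is keyed by the `Pair` of class and set.
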