Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-12582. TypedTable support using different codec #8073

Merged
merged 1 commit into from
Mar 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang3.ClassUtils;

/**
Expand Down Expand Up @@ -61,6 +62,7 @@ private CodecMap(Map<Class<?>, Codec<?>> map) {
}

/**
 * Looks up the registered {@link Codec} for the given class.
 *
 * @param clazz the type whose codec is requested; must not be null.
 * @param <T> the codec's value type.
 * @return the codec registered for {@code clazz}, or null if none is registered.
 */
<T> Codec<T> get(Class<T> clazz) {
  Objects.requireNonNull(clazz, "clazz == null");
  final Codec<?> codec = map.get(clazz);
  // Safe: the map is populated so that the codec stored under a Class<T>
  // key is always a Codec<T>; suppress at the smallest possible scope.
  @SuppressWarnings("unchecked")
  final Codec<T> casted = (Codec<T>) codec;
  return casted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
Class<KEY> keyType, Class<VALUE> valueType,
TableCache.CacheType cacheType) throws IOException;

/**
 * Gets table store with implicit key/value conversion using the
 * explicitly supplied codecs (instead of codec-registry lookup by class).
 *
 * @param name - table name
 * @param keyCodec - key codec
 * @param valueCodec - value codec
 * @param cacheType - cache type
 * @return - Table Store
 * @throws IOException if the underlying table cannot be opened
 */
<KEY, VALUE> TypedTable<KEY, VALUE> getTable(
String name, Codec<KEY> keyCodec, Codec<VALUE> valueCodec, TableCache.CacheType cacheType) throws IOException;

/**
* Lists the Known list of Tables in a DB.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ public <K, V> TypedTable<K, V> getTable(String name,
valueType);
}

@Override
public <K, V> TypedTable<K, V> getTable(
    String name, Codec<K> keyCodec, Codec<V> valueCodec, TableCache.CacheType cacheType) throws IOException {
  // Resolve the raw (byte[]-based) table first, then wrap it so that the
  // supplied codecs perform the key/value conversion.
  final RDBTable rawTable = getTable(name);
  return new TypedTable<>(rawTable, keyCodec, valueCodec, cacheType);
}

@Override
public <K, V> Table<K, V> getTable(String name,
Class<K> keyType, Class<V> valueType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hadoop.hdds.utils.db.cache.PartialTableCache;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
import org.apache.hadoop.hdds.utils.db.cache.TableNoCache;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.function.CheckedBiFunction;

Expand Down Expand Up @@ -88,19 +89,27 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
*/
TypedTable(RDBTable rawTable, CodecRegistry codecRegistry, Class<KEY> keyType, Class<VALUE> valueType,
CacheType cacheType) throws IOException {
this.rawTable = Objects.requireNonNull(rawTable, "rawTable==null");
Objects.requireNonNull(codecRegistry, "codecRegistry == null");

Objects.requireNonNull(keyType, "keyType == null");
this.keyCodec = codecRegistry.getCodecFromClass(keyType);
Objects.requireNonNull(keyCodec, "keyCodec == null");
this(rawTable, codecRegistry.getCodecFromClass(keyType), codecRegistry.getCodecFromClass(valueType),
cacheType);
}

Objects.requireNonNull(valueType, "valueType == null");
this.valueCodec = codecRegistry.getCodecFromClass(valueType);
Objects.requireNonNull(valueCodec, "valueCodec == null");
/**
* Create a TypedTable from the raw table with the specified cache type.
*
* @param rawTable The underlying (untyped) table in RocksDB.
* @param keyCodec The key codec.
* @param valueCodec The value codec.
* @param cacheType How to cache the entries?
* @throws IOException
*/
public TypedTable(
RDBTable rawTable, Codec<KEY> keyCodec, Codec<VALUE> valueCodec, CacheType cacheType) throws IOException {
this.rawTable = Objects.requireNonNull(rawTable, "rawTable==null");
this.keyCodec = Objects.requireNonNull(keyCodec, "keyCodec == null");
this.valueCodec = Objects.requireNonNull(valueCodec, "valueCodec == null");

this.info = getClassSimpleName(getClass()) + "-" + getName()
+ "(" + getClassSimpleName(keyType) + "->" + getClassSimpleName(valueType) + ")";
this.info = getClassSimpleName(getClass()) + "-" + getName() + "(" + getClassSimpleName(keyCodec.getTypeClass())
+ "->" + getClassSimpleName(valueCodec.getTypeClass()) + ")";

this.supportCodecBuffer = keyCodec.supportCodecBuffer()
&& valueCodec.supportCodecBuffer();
Expand All @@ -109,8 +118,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
if (cacheType == CacheType.FULL_CACHE) {
cache = new FullTableCache<>(threadNamePrefix);
//fill cache
try (TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> tableIterator =
iterator()) {
try (TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> tableIterator = iterator()) {

while (tableIterator.hasNext()) {
KeyValue< KEY, VALUE > kv = tableIterator.next();
Expand All @@ -122,8 +130,10 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
CacheValue.get(EPOCH_DEFAULT, kv.getValue()));
}
}
} else {
} else if (cacheType == CacheType.PARTIAL_CACHE) {
cache = new PartialTableCache<>(threadNamePrefix);
} else {
cache = TableNoCache.instance();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ public CacheResult<VALUE> lookup(CacheKey<KEY> cachekey) {
CacheValue<VALUE> cachevalue = cache.get(cachekey);
statsRecorder.recordValue(cachevalue);
if (cachevalue == null) {
return new CacheResult<>(CacheResult.CacheStatus.MAY_EXIST,
null);
return (CacheResult<VALUE>) MAY_EXIST;
} else {
if (cachevalue.getCacheValue() != null) {
return new CacheResult<>(CacheResult.CacheStatus.EXISTS, cachevalue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
@Private
@Evolving
public interface TableCache<KEY, VALUE> {
CacheResult<?> MAY_EXIST = new CacheResult<>(CacheResult.CacheStatus.MAY_EXIST, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add also a mayExist() method. This is like the Collections.emptyList() trick.

static <T> CacheResult<T> mayExist() {
  return (CacheResult<T>) MAY_EXIST;
}


/**
* Return the value for the key if it is present, otherwise return null.
Expand Down Expand Up @@ -113,7 +114,8 @@ public interface TableCache<KEY, VALUE> {
enum CacheType {
FULL_CACHE, // This mean's the table maintains full cache. Cache and DB
// state are same.
PARTIAL_CACHE // This is partial table cache, cache state is partial state
PARTIAL_CACHE, // This is partial table cache, cache state is partial state
// compared to DB state.
NO_CACHE
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.utils.db.cache;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import org.apache.hadoop.hdds.annotation.InterfaceAudience.Private;
import org.apache.hadoop.hdds.annotation.InterfaceStability.Evolving;

/**
 * No-op {@link TableCache} implementation: key/value entries are never
 * cached, so every lookup falls through to the underlying table.
 * Stateless and therefore safe to share as a singleton across tables.
 *
 * @param <KEY> the table key type.
 * @param <VALUE> the table value type.
 */
@Private
@Evolving
public final class TableNoCache<KEY, VALUE> implements TableCache<KEY, VALUE> {
  /** Stats for a cache that never stores anything: all counters stay zero. */
  public static final CacheStats EMPTY_STAT = new CacheStats(0, 0, 0);

  /** Shared stateless singleton; reused for all key/value type combinations. */
  private static final TableCache<?, ?> NO_CACHE_INSTANCE = new TableNoCache<>();

  /**
   * @return the shared no-op cache instance.
   */
  @SuppressWarnings("unchecked") // safe: the singleton holds no state of K or V.
  public static <K, V> TableCache<K, V> instance() {
    return (TableCache<K, V>) NO_CACHE_INSTANCE;
  }

  private TableNoCache() {
  }

  @Override
  public CacheValue<VALUE> get(CacheKey<KEY> cachekey) {
    return null; // nothing is ever cached
  }

  @Override
  public void loadInitial(CacheKey<KEY> key, CacheValue<VALUE> value) {
    // no-op: initial load is intentionally discarded
  }

  @Override
  public void put(CacheKey<KEY> cacheKey, CacheValue<VALUE> value) {
    // no-op: puts are intentionally discarded
  }

  @Override
  public void cleanup(List<Long> epochs) {
    // no-op: there is nothing to clean up
  }

  @Override
  public int size() {
    return 0;
  }

  @Override
  public Iterator<Map.Entry<CacheKey<KEY>, CacheValue<VALUE>>> iterator() {
    return Collections.emptyIterator();
  }

  @VisibleForTesting
  @Override
  public void evictCache(List<Long> epochs) {
    // no-op: there is nothing to evict
  }

  @Override
  @SuppressWarnings("unchecked") // safe: MAY_EXIST carries a null value, so V is unconstrained.
  public CacheResult<VALUE> lookup(CacheKey<KEY> cachekey) {
    return (CacheResult<VALUE>) MAY_EXIST;
  }

  @VisibleForTesting
  @Override
  public NavigableMap<Long, Set<CacheKey<KEY>>> getEpochEntries() {
    return Collections.emptyNavigableMap();
  }

  @Override
  public CacheStats getStats() {
    return EMPTY_STAT;
  }

  @Override
  public CacheType getCacheType() {
    return CacheType.NO_CACHE;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -41,6 +42,7 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -68,7 +70,8 @@ public class TestRDBTableStore {
"First", "Second", "Third",
"Fourth", "Fifth",
"Sixth", "Seventh",
"Eighth", "Ninth");
"Eighth", "Ninth",
"Ten");
private final List<String> prefixedFamilies = Arrays.asList(
"PrefixFirst",
"PrefixTwo", "PrefixThree",
Expand Down Expand Up @@ -304,6 +307,19 @@ public void batchDelete() throws Exception {
}
}

/**
 * Verifies that the same column family can be opened with different codecs:
 * once via class-based codec lookup, then again with explicitly supplied
 * codecs, reading back the value written through the first view.
 */
@Test
public void putGetTypedTableCodec() throws Exception {
  final String key = "test1";
  final String value = "123";
  try (Table<String, String> stringTable = rdbStore.getTable("Ten", String.class, String.class)) {
    stringTable.put(key, value);
    assertFalse(stringTable.isEmpty());
    assertEquals(value, stringTable.get(key));
  }
  // Re-open the same data with a ByteString value codec and caching disabled.
  try (Table<String, ByteString> byteStringTable = rdbStore.getTable(
      "Ten", StringCodec.get(), ByteStringCodec.get(), TableCache.CacheType.NO_CACHE)) {
    assertEquals(value, byteStringTable.get(key).toStringUtf8());
  }
}

@Test
public void forEachAndIterator() throws Exception {
final int iterCount = 100;
Expand Down
Loading