Skip to content

Commit f4be262

Browse files
authored
Merge d09adf3 into 03e4e82
2 parents 03e4e82 + d09adf3 commit f4be262

File tree

18 files changed

+729
-18
lines changed

18 files changed

+729
-18
lines changed

Diff for: ydb/core/fq/libs/actors/clusters_from_connections.cpp

+15
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.h>
66
#include <yql/essentials/utils/url_builder.h>
77
#include <ydb/library/actors/http/http.h>
8+
#include <ydb/core/fq/libs/common/iceberg_processor.h>
89

910
#include <util/generic/hash.h>
1011
#include <util/string/builder.h>
@@ -156,6 +157,9 @@ void FillGenericClusterConfigBase(
156157
case NYql::EGenericDataSourceKind::POSTGRESQL:
157158
clusterCfg.SetProtocol(NYql::EGenericProtocol::NATIVE);
158159
break;
160+
case NYql::EGenericDataSourceKind::ICEBERG:
161+
clusterCfg.SetProtocol(NYql::EGenericProtocol::NATIVE);
162+
break;
159163
default:
160164
ythrow yexception() << "Unexpected data source kind: '"
161165
<< NYql::EGenericDataSourceKind_Name(dataSourceKind) << "'";
@@ -342,6 +346,17 @@ void AddClustersFromConnections(
342346
break;
343347
}
344348

349+
case FederatedQuery::ConnectionSetting::kIceberg: {
350+
const auto& db = conn.content().setting().iceberg();
351+
auto& clusterConfig = *gatewaysConfig.MutableGeneric()->AddClusterMapping();
352+
353+
clusterConfig.SetName(connectionName);
354+
NFq::FillIcebergGenericClusterConfig(common, db, clusterConfig);
355+
FillClusterAuth(clusterConfig, db.warehouse_auth(), authToken, accountIdSignatures);
356+
clusters.emplace(connectionName, GenericProviderName);
357+
break;
358+
}
359+
345360
// Do not replace with default. Adding a new connection should cause a compilation error
346361
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
347362
break;

Diff for: ydb/core/fq/libs/cloud_audit/yq_cloud_audit_service.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ std::string MapConnectionType(const FederatedQuery::ConnectionSetting::Connectio
7171
return "MySQLCluster";
7272
case FederatedQuery::ConnectionSetting::ConnectionCase::kLogging:
7373
return "Logging";
74+
case FederatedQuery::ConnectionSetting::ConnectionCase::kIceberg:
75+
return "Iceberg";
7476
case FederatedQuery::ConnectionSetting::ConnectionCase::CONNECTION_NOT_SET:
7577
Y_ENSURE(false, "Invalid connection case " << i32(connectionCase));
7678
}

Diff for: ydb/core/fq/libs/common/iceberg_processor.cpp

+259
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
#include <util/string/builder.h>
2+
#include <util/string/strip.h>
3+
#include <contrib/libs/fmt/include/fmt/format.h>
4+
#include <ydb/core/external_sources/iceberg_fields.h>
5+
#include <ydb/core/fq/libs/result_formatter/result_formatter.h>
6+
#include <ydb/core/fq/libs/common/util.h>
7+
8+
#include "iceberg_processor.h"
9+
10+
namespace NFq {
11+
12+
// S3 region written into every Iceberg warehouse config; currently fixed,
// there is no per-connection override in the proto.
constexpr char VALUE_DEFAULT_REGION[] = "ru-central1";
13+
14+
// Strips '/' characters from both ends of `str` (note: despite the name,
// leading slashes are removed as well). Returns an empty string when the
// input is empty or consists solely of slashes.
TString RemoveTrailingSlashes(const TString& str) {
    const auto begin = str.find_first_not_of('/');
    if (begin == TString::npos) {
        // Covers both the empty input and an all-slash input.
        return "";
    }
    const auto end = str.find_last_not_of('/');
    return str.substr(begin, end - begin + 1);
}
28+
29+
// Collecting constructor: validation problems are appended to `issues`
// instead of being thrown (see DoOnError).
TIcebergProcessor::TIcebergProcessor(const FederatedQuery::Iceberg& config, NYql::TIssues& issues)
    : Config_(config)
    , Issues_(&issues)
{ }

// Throwing constructor: with no issue sink, the first validation problem
// raises a yexception (see DoOnError).
TIcebergProcessor::TIcebergProcessor(const FederatedQuery::Iceberg& config)
    : Config_(config)
    , Issues_(nullptr)
{ }
38+
39+
void TIcebergProcessor::TIcebergProcessor::Process() {
40+
if (!Config_.has_warehouse_auth()
41+
|| Config_.warehouse_auth().identity_case() == FederatedQuery::IamAuth::IDENTITY_NOT_SET) {
42+
DoOnPropertyRequiredError("warehouse.auth");
43+
}
44+
45+
ProcessSkipAuth();
46+
}
47+
48+
void TIcebergProcessor::ProcessSkipAuth() {
49+
if (!Config_.has_warehouse()) {
50+
DoOnPropertyRequiredError("warehouse");
51+
} else {
52+
ProcessWarehouse(Config_.warehouse());
53+
}
54+
55+
if (!Config_.has_catalog()) {
56+
DoOnPropertyRequiredError("catalog");
57+
} else {
58+
ProcessCatalog(Config_.catalog());
59+
}
60+
}
61+
62+
// Reports a missing required property as "<property> has to be set"
// (collected into Issues_ or thrown, depending on the constructor used).
void TIcebergProcessor::DoOnPropertyRequiredError(const TString& property) {
    DoOnError(property, "has to be set");
}
65+
66+
void TIcebergProcessor::DoOnError(const TString& property, const TString& msg) {
67+
if (!Issues_) {
68+
throw yexception() << property << ": " << msg;
69+
}
70+
71+
auto m = TStringBuilder()
72+
<< "content.setting.iceberg."
73+
<< property << " "
74+
<< msg;
75+
76+
Issues_->AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, m));
77+
}
78+
79+
// Dispatches warehouse validation by type. S3 is the only warehouse
// flavor handled here; anything else is reported as a missing type.
void TIcebergProcessor::ProcessWarehouse(const FederatedQuery::IcebergWarehouse& warehouse) {
    if (!warehouse.has_s3()) {
        DoOnPropertyRequiredError("warehouse.type");
        return;
    }
    ProcessWarehouseS3(warehouse.s3());
}
86+
87+
// Validates the S3 warehouse section and, when valid and a callback is
// registered, invokes it with a normalized "<bucket>[/<path>]" uri
// (both components slash-trimmed).
void TIcebergProcessor::ProcessWarehouseS3(const FederatedQuery::IcebergWarehouse_S3& s3) {
    TString bucket;
    if (s3.has_bucket()) {
        bucket = RemoveTrailingSlashes(s3.bucket());
    }
    if (bucket.empty()) {
        // Missing or slash-only bucket — required.
        DoOnPropertyRequiredError("warehouse.s3.bucket");
    }

    if (OnS3Callback_ && !HasErrors()) {
        TStringBuilder uri;
        uri << bucket;

        const auto path = RemoveTrailingSlashes(s3.path());
        if (!path.empty()) {
            uri << "/" << path;
        }

        OnS3Callback_(s3, uri);
    }
}
106+
107+
// Validates a Hadoop catalog: a non-empty directory is required.
// On success, forwards the config to the registered callback (if any).
void TIcebergProcessor::ProcessCatalogHadoop(const FederatedQuery::IcebergCatalog_Hadoop& hadoop) {
    const bool directoryOk = hadoop.has_directory() && !hadoop.directory().empty();
    if (!directoryOk) {
        DoOnPropertyRequiredError("hadoop.directory");
    }

    if (OnHadoopCallback_ && !HasErrors()) {
        OnHadoopCallback_(hadoop);
    }
}
117+
118+
// Validates a Hive Metastore catalog: both a non-empty uri and a non-empty
// database name are required. On success, forwards the config to the
// registered callback (if any).
void TIcebergProcessor::ProcessCatalogHive(const FederatedQuery::IcebergCatalog_HiveMetastore& hive) {
    const bool uriOk = hive.has_uri() && !hive.uri().empty();
    if (!uriOk) {
        DoOnPropertyRequiredError("hive_metastore.uri");
    }

    const bool databaseOk = hive.has_database_name() && !hive.database_name().empty();
    if (!databaseOk) {
        DoOnPropertyRequiredError("hive_metastore.database");
    }

    if (OnHiveCallback_ && !HasErrors()) {
        OnHiveCallback_(hive);
    }
}
133+
134+
// Dispatches catalog validation by type: Hive Metastore or Hadoop.
// An unset/unknown catalog type is reported as a required-property error.
void TIcebergProcessor::ProcessCatalog(const FederatedQuery::IcebergCatalog& catalog) {
    if (catalog.has_hive()) {
        ProcessCatalogHive(catalog.hive());
        return;
    }
    if (catalog.has_hadoop()) {
        ProcessCatalogHadoop(catalog.hadoop());
        return;
    }
    DoOnPropertyRequiredError("catalog.type");
}
143+
144+
// Renders the WITH-properties text for a CREATE EXTERNAL DATA SOURCE
// statement from an Iceberg connection config. Runs the full validation
// pass (Process), so an invalid config throws — this overload of
// TIcebergProcessor has no issue sink.
// NOTE(review): the hive/hadoop sections emit the key `database_name` as a
// literal rather than a named constant like CATALOG_TYPE — presumably it must
// match the external-source property name; confirm against the consumer.
TString MakeIcebergCreateExternalDataSourceProperties(const NConfig::TCommonConfig& yqConfig, const FederatedQuery::Iceberg& config) {
    using namespace fmt::literals;
    using namespace NKikimr::NExternalSource::NIceberg;

    TIcebergProcessor processor(config);

    // warehouse configuration: filled by the S3 callback once validation of
    // the warehouse section succeeds.
    TString warehouseSection;

    processor.SetDoOnWarehouseS3([&warehouseSection, &yqConfig](const FederatedQuery::IcebergWarehouse_S3&, const TString& uri) {
        warehouseSection = fmt::format(
            R"(
                {warehouse_type}={warehouse_type_value},
                {warehouse_s3_region}={warehouse_s3_region_value},
                {warehouse_s3_endpoint}={warehouse_s3_endpoint_value},
                {warehouse_s3_uri}={warehouse_s3_uri_value}
            )",
            "warehouse_type"_a = WAREHOUSE_TYPE,
            "warehouse_type_value"_a = EncloseAndEscapeString(VALUE_S3, '"'),
            "warehouse_s3_region"_a = WAREHOUSE_S3_REGION,
            // Region is fixed; see VALUE_DEFAULT_REGION.
            "warehouse_s3_region_value"_a = EncloseAndEscapeString(VALUE_DEFAULT_REGION, '"'),
            "warehouse_s3_endpoint"_a = WAREHOUSE_S3_ENDPOINT,
            "warehouse_s3_endpoint_value"_a = EncloseAndEscapeString(yqConfig.GetObjectStorageEndpoint(), '"'),
            "warehouse_s3_uri"_a = WAREHOUSE_S3_URI,
            "warehouse_s3_uri_value"_a = EncloseAndEscapeString(uri, '"')
        );
    });

    // catalog configuration: exactly one of the two callbacks fills this,
    // depending on which catalog type the config carries.
    TString catalogSection;

    processor.SetDoOnCatalogHive([&catalogSection](const FederatedQuery::IcebergCatalog_HiveMetastore& hive) {
        catalogSection = fmt::format(
            R"(
                {catalog_type}={catalog_type_value},
                {catalog_hive_uri}={catalog_hive_uri_value},
                database_name={database_name}
            )",
            "catalog_type"_a = CATALOG_TYPE,
            "catalog_type_value"_a = EncloseAndEscapeString(VALUE_HIVE, '"'),
            "catalog_hive_uri"_a = CATALOG_HIVE_URI,
            "catalog_hive_uri_value"_a = EncloseAndEscapeString(hive.uri(), '"'),
            "database_name"_a = EncloseAndEscapeString(hive.database_name(), '"')
        );
    });

    processor.SetDoOnCatalogHadoop([&catalogSection](const FederatedQuery::IcebergCatalog_Hadoop& hadoop) {
        catalogSection = fmt::format(
            R"(
                {catalog_type}={catalog_type_value},
                database_name={database_name}
            )",
            "catalog_type"_a = CATALOG_TYPE,
            "catalog_type_value"_a = EncloseAndEscapeString(VALUE_HADOOP, '"'),
            // For Hadoop catalogs the directory plays the role of the database.
            "database_name"_a = EncloseAndEscapeString(hadoop.directory(), '"')
        );
    });

    // Validates (throwing on error) and triggers the callbacks above.
    processor.Process();

    // common configuration for all warehouses and catalogs
    TString commonSection = fmt::format(
        R"(
            source_type="Iceberg",
            use_tls="{use_tls}"
        )",
        "use_tls"_a = !yqConfig.GetDisableSslForGenericDataSources() ? "true" : "false"
    );

    // merge config
    auto r = fmt::format(
        R"(
            {common_section},
            {warehouse_section},
            {catalog_section}
        )",
        "common_section"_a = commonSection,
        "warehouse_section"_a = warehouseSection,
        "catalog_section"_a = catalogSection
    );

    return r;
}
227+
228+
// Populates a generic provider cluster config from an Iceberg connection:
// sets the ICEBERG kind, fills DataSourceOptions for the warehouse/catalog,
// and sets the database name from the catalog. Uses ProcessSkipAuth, i.e.
// auth is validated/filled elsewhere — presumably by the caller via
// FillClusterAuth (see clusters_from_connections.cpp); confirm at call site.
// Throws on an invalid config (no issue sink is supplied).
void FillIcebergGenericClusterConfig(const NConfig::TCommonConfig& yqConfig, const FederatedQuery::Iceberg& config, ::NYql::TGenericClusterConfig& cluster) {
    using namespace NKikimr::NExternalSource::NIceberg;

    TIcebergProcessor processor(config);
    cluster.SetKind(NYql::EGenericDataSourceKind::ICEBERG);

    auto& options = *cluster.MutableDataSourceOptions();

    // S3 warehouse: record type, endpoint, fixed region, and normalized uri.
    processor.SetDoOnWarehouseS3([&options, &yqConfig](const FederatedQuery::IcebergWarehouse_S3&, const TString& uri) {
        options[WAREHOUSE_TYPE] = VALUE_S3;
        options[WAREHOUSE_S3_ENDPOINT] = yqConfig.GetObjectStorageEndpoint();
        options[WAREHOUSE_S3_REGION] = VALUE_DEFAULT_REGION;
        options[WAREHOUSE_S3_URI] = uri;
    });

    // Hive Metastore catalog: record type/uri; the hive database becomes the
    // cluster's database name.
    processor.SetDoOnCatalogHive([&options, &cluster](const FederatedQuery::IcebergCatalog_HiveMetastore& hive) {
        options[CATALOG_TYPE] = VALUE_HIVE;
        options[CATALOG_HIVE_URI] = hive.uri();

        cluster.SetDatabaseName(hive.database_name());
    });

    // Hadoop catalog: record type; the directory becomes the database name.
    processor.SetDoOnCatalogHadoop([&options, &cluster](const FederatedQuery::IcebergCatalog_Hadoop& hadoop) {
        options[CATALOG_TYPE] = VALUE_HADOOP;

        cluster.SetDatabaseName(hadoop.directory());
    });

    // Validate (throwing on error) and trigger the callbacks above.
    processor.ProcessSkipAuth();
}
258+
259+
} // NFq

Diff for: ydb/core/fq/libs/common/iceberg_processor.h

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#pragma once
2+
3+
#include <ydb/public/api/protos/draft/fq.pb.h>
4+
#include <ydb/core/fq/libs/config/yq_issue.h>
5+
#include <yql/essentials/providers/common/proto/gateways_config.pb.h>
6+
#include <ydb/core/fq/libs/config/protos/fq_config.pb.h>
7+
8+
namespace NFq {
9+
10+
// Validates a FederatedQuery::Iceberg connection config and dispatches the
// validated warehouse/catalog sections to user-registered callbacks.
// Errors are either collected into an NYql::TIssues sink (two-arg ctor) or
// thrown as yexception (one-arg ctor). Callbacks fire only when the config
// section they cover validated cleanly (see the .cpp for details).
class TIcebergProcessor {
public:
    // Throwing mode: the first validation error raises yexception.
    explicit TIcebergProcessor(const FederatedQuery::Iceberg& config);

    // Collecting mode: validation errors are appended to `issues`.
    TIcebergProcessor(const FederatedQuery::Iceberg& config, NYql::TIssues& issues);

    ~TIcebergProcessor() = default;

    // Full validation: warehouse auth + warehouse + catalog.
    void Process();

    // Same as Process() but skips the warehouse-auth check.
    void ProcessSkipAuth();

    // Called with the S3 warehouse config and the normalized
    // "<bucket>[/<path>]" uri when the warehouse section is valid.
    void SetDoOnWarehouseS3(std::function<
        void(const FederatedQuery::IcebergWarehouse_S3&, const TString&)> callback) {
        OnS3Callback_ = callback;
    }

    // Called with the Hive Metastore catalog config when it is valid.
    void SetDoOnCatalogHive(std::function<
        void(const FederatedQuery::IcebergCatalog_HiveMetastore&)> callback) {
        OnHiveCallback_ = callback;
    }

    // Called with the Hadoop catalog config when it is valid.
    void SetDoOnCatalogHadoop(std::function<
        void(const FederatedQuery::IcebergCatalog_Hadoop&)> callback) {
        OnHadoopCallback_ = callback;
    }

private:
    // Reports "<property> has to be set" via DoOnError.
    void DoOnPropertyRequiredError(const TString& property);

    // Throws (no sink) or records a BAD_REQUEST issue (with sink).
    void DoOnError(const TString& property, const TString& msg);

    void ProcessWarehouse(const FederatedQuery::IcebergWarehouse& warehouse);

    void ProcessWarehouseS3(const FederatedQuery::IcebergWarehouse_S3& s3);

    void ProcessCatalog(const FederatedQuery::IcebergCatalog& catalog);

    void ProcessCatalogHadoop(const FederatedQuery::IcebergCatalog_Hadoop& hadoop);

    void ProcessCatalogHive(const FederatedQuery::IcebergCatalog_HiveMetastore& hive);

    // True only in collecting mode once at least one issue was recorded;
    // in throwing mode errors never accumulate (the first one throws).
    bool HasErrors() const {
        return Issues_ && !Issues_->Empty();
    }

private:
    const FederatedQuery::Iceberg& Config_;   // not owned; must outlive the processor
    NYql::TIssues* Issues_;                   // nullptr => throwing mode
    std::function<void(const FederatedQuery::IcebergWarehouse_S3&, const TString&)> OnS3Callback_;
    std::function<void(const FederatedQuery::IcebergCatalog_HiveMetastore&)> OnHiveCallback_;
    std::function<void(const FederatedQuery::IcebergCatalog_Hadoop&)> OnHadoopCallback_;
};
63+
64+
// Builds the WITH-properties text for CREATE EXTERNAL DATA SOURCE from an
// Iceberg connection config; throws on an invalid config.
TString MakeIcebergCreateExternalDataSourceProperties(const NConfig::TCommonConfig& yqConfig,
                                                      const FederatedQuery::Iceberg& config);

// Fills a generic provider cluster config (kind, data source options,
// database name) from an Iceberg connection config; auth is not filled
// here. Throws on an invalid config.
void FillIcebergGenericClusterConfig(const NConfig::TCommonConfig& yqConfig,
                                     const FederatedQuery::Iceberg& config,
                                     NYql::TGenericClusterConfig& cluster);
70+
71+
} // NFq

0 commit comments

Comments
 (0)