Adding catalog for sourceDetails and targetDetails #173 #174

Merged
10 commits merged on Mar 31, 2025
64 changes: 52 additions & 12 deletions src/dataflow_pipeline.py
@@ -168,8 +168,13 @@ def write_bronze(self):
self.cdc_apply_changes()
else:
target_path = None if self.uc_enabled else bronze_dataflow_spec.targetDetails["path"]
target_cl = bronze_dataflow_spec.targetDetails.get('catalog', None)
target_cl_name = f"{target_cl}." if target_cl is not None else ''
target_db_name = bronze_dataflow_spec.targetDetails['database']
target_table_name = bronze_dataflow_spec.targetDetails['table']

target_table = (
f"{bronze_dataflow_spec.targetDetails['database']}.{bronze_dataflow_spec.targetDetails['table']}"
f"{target_cl_name}{target_db_name}.{target_table_name}"
if self.uc_enabled and self.dpm_enabled
else bronze_dataflow_spec.targetDetails['table']
)
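The same three-line catalog-qualification pattern recurs in write_silver, write_bronze_with_dqe, cdc_apply_changes, and create_streaming_table below. A minimal sketch of a shared helper each call site could use instead; the function name and signature are assumptions for illustration, not part of this PR:

```python
def qualified_table_name(details: dict, uc_enabled: bool, dpm_enabled: bool) -> str:
    """Build the target identifier, prefixing the optional catalog when
    Unity Catalog and dpm are both enabled."""
    if uc_enabled and dpm_enabled:
        catalog = details.get('catalog')
        prefix = f"{catalog}." if catalog is not None else ''
        return f"{prefix}{details['database']}.{details['table']}"
    return details['table']
```

Each method could then call qualified_table_name(spec.targetDetails, self.uc_enabled, self.dpm_enabled) in place of the repeated block.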
@@ -192,8 +197,12 @@ def write_silver(self):
self.cdc_apply_changes()
else:
target_path = None if self.uc_enabled else silver_dataflow_spec.targetDetails["path"]
target_cl = silver_dataflow_spec.targetDetails.get('catalog', None)
target_cl_name = f"{target_cl}." if target_cl is not None else ''
target_db_name = silver_dataflow_spec.targetDetails['database']
target_table_name = silver_dataflow_spec.targetDetails['table']
target_table = (
f"{silver_dataflow_spec.targetDetails['database']}.{silver_dataflow_spec.targetDetails['table']}"
f"{target_cl_name}{target_db_name}.{target_table_name}"
if self.uc_enabled and self.dpm_enabled
else silver_dataflow_spec.targetDetails['table']
)
@@ -239,12 +248,14 @@ def apply_custom_transform_fun(self, input_df):
def get_silver_schema(self):
"""Get Silver table Schema."""
silver_dataflow_spec: SilverDataflowSpec = self.dataflowSpec
source_cl = silver_dataflow_spec.sourceDetails.get('catalog', None)
source_cl_name = f"{source_cl}." if source_cl is not None else ''
source_database = silver_dataflow_spec.sourceDetails["database"]
source_table = silver_dataflow_spec.sourceDetails["table"]
select_exp = silver_dataflow_spec.selectExp
where_clause = silver_dataflow_spec.whereClause
raw_delta_table_stream = self.spark.readStream.table(
f"{source_database}.{source_table}"
f"{source_cl_name}{source_database}.{source_table}"
).selectExpr(*select_exp) if self.uc_enabled else self.spark.readStream.load(
path=silver_dataflow_spec.sourceDetails["path"],
format="delta"
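For illustration, this is how the source identifier is assembled on the read side; the dictionary values below are made up:

```python
# A 'catalog' entry yields a three-level Unity Catalog name;
# without one, the existing two-level "database.table" form is kept.
source_details = {'catalog': 'main', 'database': 'silver_db', 'table': 'events'}  # example values
catalog = source_details.get('catalog')
prefix = f"{catalog}." if catalog is not None else ''
print(f"{prefix}{source_details['database']}.{source_details['table']}")  # main.silver_db.events
```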
@@ -272,12 +283,15 @@ def __apply_where_clause(self, where_clause, raw_delta_table_stream):
def read_silver(self) -> DataFrame:
"""Read Silver tables."""
silver_dataflow_spec: SilverDataflowSpec = self.dataflowSpec

source_cl = silver_dataflow_spec.sourceDetails.get('catalog', None)
source_cl_name = f"{source_cl}." if source_cl is not None else ''
source_database = silver_dataflow_spec.sourceDetails["database"]
source_table = silver_dataflow_spec.sourceDetails["table"]
select_exp = silver_dataflow_spec.selectExp
where_clause = silver_dataflow_spec.whereClause
raw_delta_table_stream = self.spark.readStream.table(
f"{source_database}.{source_table}"
f"{source_cl_name}{source_database}.{source_table}"
).selectExpr(*select_exp) if self.uc_enabled else self.spark.readStream.load(
path=silver_dataflow_spec.sourceDetails["path"],
format="delta"
@@ -323,8 +337,12 @@ def write_bronze_with_dqe(self):
self.cdc_apply_changes()
else:
target_path = None if self.uc_enabled else bronzeDataflowSpec.targetDetails["path"]
target_cl = bronzeDataflowSpec.targetDetails.get('catalog', None)
target_cl_name = f"{target_cl}." if target_cl is not None else ''
target_db_name = bronzeDataflowSpec.targetDetails['database']
target_table_name = bronzeDataflowSpec.targetDetails['table']
target_table = (
f"{bronzeDataflowSpec.targetDetails['database']}.{bronzeDataflowSpec.targetDetails['table']}"
f"{target_cl_name}{target_db_name}.{target_table_name}"
if self.uc_enabled and self.dpm_enabled
else bronzeDataflowSpec.targetDetails['table']
)
@@ -389,11 +407,12 @@ def write_bronze_with_dqe(self):
quarantineTargetDetails['cluster_by'])

target_path = None if self.uc_enabled else bronzeDataflowSpec.quarantineTargetDetails["path"]
bronze_cl = bronzeDataflowSpec.quarantineTargetDetails.get('catalog', None)
bronze_cl_name = f"{bronze_cl}." if bronze_cl is not None else ''
bronze_db = bronzeDataflowSpec.quarantineTargetDetails['database']
bronze_table = bronzeDataflowSpec.quarantineTargetDetails['table']

target_table = (
f"{bronze_db}.{bronze_table}"
f"{bronze_cl_name}{bronze_db}.{bronze_table}"
if self.uc_enabled and self.dpm_enabled
else bronze_table
)
@@ -460,8 +479,14 @@ def cdc_apply_changes(self):
apply_as_truncates = None
if cdc_apply_changes.apply_as_truncates:
apply_as_truncates = expr(cdc_apply_changes.apply_as_truncates)

target_cl = self.dataflowSpec.targetDetails.get('catalog', None)
target_cl_name = f"{target_cl}." if target_cl is not None else ''
target_db_name = self.dataflowSpec.targetDetails['database']
target_table_name = self.dataflowSpec.targetDetails['table']

target_table = (
f"{self.dataflowSpec.targetDetails['database']}.{self.dataflowSpec.targetDetails['table']}"
f"{target_cl_name}{target_db_name}.{target_table_name}"
if self.uc_enabled and self.dpm_enabled
else self.dataflowSpec.targetDetails['table']
)
@@ -518,10 +543,16 @@ def modify_schema_for_cdc_changes(self, cdc_apply_changes):

def create_streaming_table(self, struct_schema, target_path=None):
expect_all_dict, expect_all_or_drop_dict, expect_all_or_fail_dict = self.get_dq_expectations()

target_cl = self.dataflowSpec.targetDetails.get('catalog', None)
target_cl_name = f"{target_cl}." if target_cl is not None else ''
target_db_name = self.dataflowSpec.targetDetails['database']
target_table_name = self.dataflowSpec.targetDetails['table']

target_table = (
f"{self.dataflowSpec.targetDetails['database']}.{self.dataflowSpec.targetDetails['table']}"
f"{target_cl_name}{target_db_name}.{target_table_name}"
if self.uc_enabled and self.dpm_enabled
else self.dataflowSpec.targetDetails['table']
else target_table_name
)
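# Note (assumed DLT behavior, not stated in this PR): when dpm is disabled,
# the unqualified target_table_name is passed to dlt.create_streaming_table()
# and resolved against the pipeline's configured target catalog and schema.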
dlt.create_streaming_table(
name=target_table,
@@ -613,14 +644,23 @@ def _launch_dlt_flow(
quarantine_input_view_name = None
if isinstance(dataflowSpec, BronzeDataflowSpec) and dataflowSpec.quarantineTargetDetails is not None \
and dataflowSpec.quarantineTargetDetails != {}:

qrt_cl = dataflowSpec.quarantineTargetDetails.get('catalog', None)
qrt_cl_str = f"{qrt_cl}_" if qrt_cl is not None else ''
qrt_db = dataflowSpec.quarantineTargetDetails['database'].replace('.', '_')
qrt_table = dataflowSpec.quarantineTargetDetails['table']
quarantine_input_view_name = (
f"{dataflowSpec.quarantineTargetDetails['table']}"
f"{qrt_cl_str}{qrt_db}_{qrt_table}"
f"_{layer}_quarantine_inputView"
)
quarantine_input_view_name = quarantine_input_view_name.replace(".", "")
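# e.g. with hypothetical values catalog='main', database='demo', table='orders'
# and layer='bronze', this yields 'main_demo_orders_bronze_quarantine_inputView'.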
else:
logger.info("quarantine_input_view_name set to None")
target_view_name = f"{dataflowSpec.targetDetails['table']}_{layer}_inputView"
target_cl = dataflowSpec.targetDetails.get('catalog', None)
target_cl_str = f"{target_cl}_" if target_cl is not None else ''
target_db = dataflowSpec.targetDetails['database'].replace('.', '_')
target_table = dataflowSpec.targetDetails['table']
target_view_name = f"{target_cl_str}{target_db}_{target_table}_{layer}_inputView"
target_view_name = target_view_name.replace(".", "")
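# e.g. with hypothetical values catalog='main', database='demo.db', table='orders'
# and layer='bronze', this yields 'main_demo_db_orders_bronze_inputView'; the
# final replace(".", "") strips any dots left in the table name.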
dlt_data_flow = DataflowPipeline(
spark,
9 changes: 7 additions & 2 deletions src/pipeline_readers.py
@@ -87,16 +87,21 @@ def read_dlt_delta(self) -> DataFrame:
logger.info("In read_dlt_cloud_files func")

if self.reader_config_options and len(self.reader_config_options) > 0:

source_cl = self.source_details.get('source_catalog', None)
source_cl_name = f"{source_cl}." if source_cl is not None else ''
return (
self.spark.readStream.options(**self.reader_config_options).table(
f"""{self.source_details["source_database"]}
f"""{source_cl_name}{self.source_details["source_database"]}
.{self.source_details["source_table"]}"""
)
)
else:
source_cl = self.source_details.get('source_catalog', None)
source_cl_name = f"{source_cl}." if source_cl is not None else ''
return (
self.spark.readStream.table(
f"""{self.source_details["source_database"]}
f"""{source_cl_name}{self.source_details["source_database"]}
.{self.source_details["source_table"]}"""
)
)
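A small usage sketch of the reader change; the source_details values below are invented for illustration:

```python
# With a 'source_catalog' entry the stream reads a three-level UC name;
# omitting it preserves the original two-level behavior.
source_details = {
    "source_catalog": "main",       # optional
    "source_database": "bronze_db",
    "source_table": "customers",
}
catalog = source_details.get("source_catalog")
prefix = f"{catalog}." if catalog is not None else ""
print(f"{prefix}{source_details['source_database']}.{source_details['source_table']}")
# main.bronze_db.customers
```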