From 7e8fde0de46223a6cbe63a5799b3168e4f0a27d1 Mon Sep 17 00:00:00 2001
From: TC0607
Date: Thu, 6 Feb 2025 16:47:50 +0800
Subject: [PATCH] Create Stamp_Version_1

---
 Stamp_Version_1 | 555 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 555 insertions(+)
 create mode 100644 Stamp_Version_1

diff --git a/Stamp_Version_1 b/Stamp_Version_1
new file mode 100644
index 00000000..cf8c317f
--- /dev/null
+++ b/Stamp_Version_1
@@ -0,0 +1,555 @@
+# Imports for the names referenced below; `spark`, BigQuery credentials and the
+# pandas_gbq project are assumed to be provided by the Dataproc notebook environment
+# (getOrCreate() simply reuses the existing session when one is already running).
+import re
+from datetime import timedelta
+from functools import reduce
+
+import pandas as pd
+import pandas_gbq
+from dateutil.relativedelta import relativedelta
+from google.cloud import bigquery
+from pyspark.sql import SparkSession, functions as F
+from pyspark.sql.types import IntegerType, DoubleType
+
+spark = SparkSession.builder.getOrCreate()
+
+
+class StampPipeline:
+
+    product_criteria_fields = ['product_sku_external_reference','product_division','product_department','product_category','product_subcategory','product_class','product_brand']
+
+    def __init__(self, offer_id):
+        self.offer_id = offer_id
+        self.offer_summary = self.get_offer_summary()
+        self.process_offer_summary()
+        self.stamp_type = self.get_stamp_type()
+        self.stamp_pre_transaction_table_name = 'project_yuu_stamp_publish.yuu_stamp_intermediate_pre_transaction'
+        self.stamp_pro_transaction_table_name = 'project_yuu_stamp_publish.yuu_stamp_intermediate_pro_transaction'
+        self.stamp_reward_issued_table_name = 'project_yuu_stamp_publish.yuu_stamp_intermediate_reward_issued'
+        self.stamp_redemption_table_name = 'project_yuu_stamp_publish.yuu_stamp_intermediate_redemption'
+        # self.stamp_total_pre_transaction_table_name = 'project_yuu_stamp_publish.yuu_stamp_intermediate_total_pre_transaction'
+        # self.stamp_total_pro_transaction_table_name = 'project_yuu_stamp_publish.yuu_stamp_intermediate_total_pro_transaction'
+        self.spend_band = self.offer_summary['spend_band']
+
+    def read_sql(self, query):
+        sdf = spark.read.format("bigquery").load(query)
+        for col in sdf.columns:
+            sdf = sdf.withColumnRenamed(col, col.lower())
+        return sdf
+
+    def run(self):
+        # 1.1 Check pre-transaction records for this offer id. If none exist yet, build the pre period and publish to stamp_pre_transaction_table_name
+        if self.get_pre_transaction_rows():
+            stamp_pre_transaction_sdf = self.get_pre_transaction()
+            self.sdf_to_gcp(stamp_pre_transaction_sdf, self.stamp_pre_transaction_table_name)
+
+        # 1.2 Check pro-transaction records for this offer id. Resume from the day after the last refresh date; if none is found, load the full promotion window. Publish to stamp_pro_transaction_table_name
+        transaction_last_refresh_date = self.get_pro_transaction_last_refresh_date()
+        if transaction_last_refresh_date:
+            start_date = transaction_last_refresh_date + timedelta(days=1)
+            stamp_pro_transaction_sdf = self.get_pro_transaction(start_date, self.end_date)
+            self.sdf_to_gcp(stamp_pro_transaction_sdf, self.stamp_pro_transaction_table_name)
+        else:
+            stamp_pro_transaction_sdf = self.get_pro_transaction(self.start_date, self.end_date)
+            self.sdf_to_gcp(stamp_pro_transaction_sdf, self.stamp_pro_transaction_table_name)
+
+        # 1.3 Check rewards issued for this offer id. Resume from the day after the last refresh date; if none is found, load the full window. Publish to stamp_reward_issued_table_name
+        reward_last_refresh_date = self.get_reward_last_refresh_date()
+        if reward_last_refresh_date:
+            start_date = reward_last_refresh_date + timedelta(days=1)
+            stamp_reward_issued_sdf = self.get_stamp_reward_issued(start_date, self.end_date)
+            self.sdf_to_gcp(stamp_reward_issued_sdf, self.stamp_reward_issued_table_name)
+        else:
+            stamp_reward_issued_sdf = self.get_stamp_reward_issued(self.start_date, self.end_date)
+            self.sdf_to_gcp(stamp_reward_issued_sdf, self.stamp_reward_issued_table_name)
+
+        # 1.4 Check redemptions for this offer id. Resume from the day after the last refresh date; if none is found, load the full window. Publish to stamp_redemption_table_name
+        redemption_last_refresh_date = self.get_redemption_last_refresh_date()
+        if redemption_last_refresh_date:
+            start_date = redemption_last_refresh_date + timedelta(days=1)
+            stamp_redemption_issued_sdf = self.get_stamp_redemption(start_date, self.end_date)
+            self.sdf_to_gcp(stamp_redemption_issued_sdf, self.stamp_redemption_table_name)
+        else:
+            stamp_redemption_issued_sdf = self.get_stamp_redemption(self.start_date, self.end_date)
+            self.sdf_to_gcp(stamp_redemption_issued_sdf, self.stamp_redemption_table_name)
+
+        # 1.5 Delete old records from GCP (TODO: keeping for future use if there is a data failure in extraction or loading)
+        # self.delete_from_gcp_table(self.stamp_pro_transaction_table_name, f"OFFER_ID = '{self.offer_id}' AND TIME_PERIOD BETWEEN '{self.start_date}' AND '{self.end_date}'")
+        # self.delete_from_gcp_table(self.stamp_reward_issued_table_name, f"OFFER_ID = '{self.offer_id}' AND TIME_PERIOD BETWEEN '{self.start_date}' AND '{self.end_date}'")
+        # self.delete_from_gcp_table(self.stamp_redemption_table_name, f"OFFER_ID = '{self.offer_id}' AND TIME_PERIOD BETWEEN '{self.start_date}' AND '{self.end_date}'")
+        # self.delete_from_gcp_table(self.stamp_total_pro_transaction_table_name, f"OFFER_ID = '{self.offer_id}' AND TIME_PERIOD BETWEEN '{self.start_date}' AND '{self.end_date}'")
+
+        # 1.6 Upload log for loading status (TODO: indicate the stamp data loading status)
+        # log_sdf = self.get_log_sdf(offer_usage=1, offer_basket=1)
+        # self.sdf_to_azure(log_sdf, 'PUBLISH_YUU.YUU_OE_LOG')
+        # Indicate errors and loading progress of each offer each day, and which part of the progress failed
+
+    # Function for uploading data tables onto GCP
+    def sdf_to_gcp(self, sdf, table_name):
+        sdf.write\
+            .format("bigquery") \
+            .option("table", table_name)\
+            .option("temporaryGcsBucket", "dataproc-temp-asia-southeast1-180559739536-birf5lra") \
+            .option("maxStrLength", "3000")\
+            .mode("append").save()  # APPEND/OVERWRITE
+
+    # Function for deleting rows matching particular conditions from a GCP table
+    def delete_from_gcp_table(self, table_name, condition):
+        client = bigquery.Client()
+        delete_sql = f"DELETE FROM {table_name} WHERE {condition}"
+        refresh_job = client.query(delete_sql)
+        refresh_job.result()
+
+    # Function for checking whether any pre-transaction records exist for this offer id
+    def get_pre_transaction_rows(self):
+        # Returns True when no pre-transaction rows have been loaded yet, False otherwise
+        query = f"""
+        SELECT
+            COUNT(*) AS number_of_records
+        FROM `project_yuu_stamp_publish.yuu_stamp_intermediate_pre_transaction`
+        WHERE
+            1 = 1
+            AND OFFER_ID = '{self.offer_id}'
+        """
+        df = pandas_gbq.read_gbq(query)
+        return df.iloc[0,0] == 0
+
+    # Function for retrieving the latest refresh date in the pro-transaction table for this offer id; returns a date if found, otherwise None
+    def get_pro_transaction_last_refresh_date(self):
+        query = f"""
+        SELECT
+            MAX(time_period) AS Latest_Refresh_Date
+        FROM `project_yuu_stamp_publish.yuu_stamp_intermediate_pro_transaction`
+        WHERE
+            1 = 1
+            AND OFFER_ID = '{self.offer_id}'
+        """
+        df = pandas_gbq.read_gbq(query)
+        if pd.notnull(df.iloc[0,0]):
+            return df.iloc[0,0]
+
+    # Function for retrieving the latest refresh date in the reward table for this offer id; returns a date if found, otherwise None
+    def get_reward_last_refresh_date(self):
+        query = f"""
+        SELECT
+            MAX(time_period) AS Latest_Refresh_Date
+        FROM `project_yuu_stamp_publish.yuu_stamp_intermediate_reward_issued`
+        WHERE
+            1 = 1
+            AND OFFER_ID = '{self.offer_id}'
+        """
+        df = pandas_gbq.read_gbq(query)
+        if pd.notnull(df.iloc[0,0]):
+            return df.iloc[0,0]
+
+    # Function for retrieving the latest refresh date in the redemption table for this offer id; returns a date if found, otherwise None
+    def get_redemption_last_refresh_date(self):
+        query = f"""
+        SELECT
+            MAX(time_period) AS Latest_Refresh_Date
+        FROM `project_yuu_stamp_publish.yuu_stamp_intermediate_redemption`
+        WHERE
+            1 = 1
+            AND OFFER_ID = '{self.offer_id}'
+        """
+        df = pandas_gbq.read_gbq(query)
+        if pd.notnull(df.iloc[0,0]):
+            return df.iloc[0,0]
+
+    # Functions:
+    # 1) get the details of the offer id and join with the offer details to get the relevant spend threshold
+    # 2) join with the long SKU list from SEHK
+    def get_offer_summary(self):
+        query = f"""
+        SELECT
+            *
+        FROM `dfgp-prd-datalake-01.project_yuu_sharepoint_analyse.anfield_md_offer_master`
+        WHERE
+            1 = 1
+            AND PROMOTIONAL_EXECUTION = 'MMS'
+            AND STATUS != 'Cancelled'
+            AND OFFER_ID = '{self.offer_id}'
+        """
+        ma = f"""
+        SELECT
+            offer_id
+            ,spend_threshold
+        FROM `dfgp-prd-datalake-01.project_yuu_ma_offer_analyse.hkma_tx_base_offer_details`
+        WHERE
+            1 = 1
+            AND OFFER_ID = '{self.offer_id}'
+        """
+        sku_df = pd.read_csv("gs://yuhk-prd-datalake-01-gcs-userdata/04-publish/Yuu_Rewards/yuuStamp/long_SKU_list_stamp_offer.csv")
+        sku_sdf = spark.createDataFrame(sku_df)
+        sku_sdf = sku_sdf.withColumnRenamed('product_sku_external_reference', 'product_sku_external_reference_long')
+        sdf = self.read_sql(query)
+        ma_sdf = self.read_sql(ma)
+        sdf = sdf.join(ma_sdf, on='offer_id', how='left')
+        sdf = sdf.withColumn('spend_band', F.coalesce(
+            sdf.spend_band.cast(IntegerType()),
+            ma_sdf.spend_threshold.cast(IntegerType()) * (1 - sdf.return_percent_value.cast(DoubleType()))
+        ))
+        sdf = sdf.withColumn('stamp_spend_band', F.when(F.col('spend_band').isNotNull(), 1).otherwise(0))
+        sdf = sdf.withColumn('stamp_sku', F.when(F.col('product_sku_external_reference').isNotNull() | F.col('product_brand').isNotNull() | F.col('product_category').isNotNull() | F.col('product_subcategory').isNotNull() | F.col('product_class').isNotNull(), 1).otherwise(0))
+        sdf_long_sku_list = sdf.filter(F.col('product_sku_external_reference') == 'Please refer to sheet')
+        sdf_long_sku_list = sdf_long_sku_list.join(sku_sdf, on='external_reference_no', how='left')
+        sdf_long_sku_list = sdf_long_sku_list.drop('product_sku_external_reference')
+        sdf_long_sku_list = sdf_long_sku_list.withColumnRenamed('product_sku_external_reference_long', 'product_sku_external_reference')
+        sdf_normal_sku_list = sdf.filter(F.col('product_sku_external_reference') != 'Please refer to sheet')
+        sdf = sdf_normal_sku_list.unionByName(sdf_long_sku_list)
+        # TODO: data cleansing from offer summary
+        str_columns = [item[0] for item in sdf.dtypes if item[1].startswith('string')]
+        for c in str_columns:
+            sdf = sdf.withColumn(c, F.when(F.trim(sdf[c]) != "", F.trim(sdf[c])).otherwise(None))
+            sdf = sdf.withColumn(c, F.regexp_replace(c, r'[^a-zA-Z0-9\,\-]', ''))
+        return sdf.first().asDict()
+
+    def process_offer_summary(self):
+        self.banner = self.offer_summary['business_unit']
+        self.offer_type = self.offer_summary['promotional_execution']
+        self.offer_id = self.offer_summary['offer_id']
+        self.transaction_criteria = self.get_transaction_criteria()
+        self.transaction_banner_criteria = self.get_transaction_banner_criteria()
+        self.product_criteria = self.get_product_criteria()
+
+        # Get the offer start date
+        offer_last_refresh_date = self.get_offer_last_refresh_date()
+        if offer_last_refresh_date:
+            self.start_date = offer_last_refresh_date
+        else:
+            self.start_date = self.offer_summary['offer_availability_start_date']
+
+        # Get the offer end date; fall back to the availability end date so end_date is always set
+        if self.offer_summary['offer_redemption_expiry_date']:
+            self.end_date = self.offer_summary['offer_redemption_expiry_date']
+        elif self.offer_summary['offer_redemption_validity_period_days'] is not None and self.offer_summary['offer_redemption_validity_period_days'].isdigit():
+            self.end_date = self.offer_summary['offer_availability_end_date'] + timedelta(days=int(self.offer_summary['offer_redemption_validity_period_days']))
+        else:
+            self.end_date = self.offer_summary['offer_availability_end_date']
+        pre_period_no_of_days = (self.end_date - self.offer_summary['offer_availability_start_date']).days
+
+        if self.banner == 'IKHK' and self.offer_type in ('MMS',):
+            # IKEA uses the same days last year as the pre period
+            self.pre_start_date = self.offer_summary['offer_availability_start_date'] + relativedelta(years=-1)
+            self.pre_end_date = self.end_date + relativedelta(years=-1)
+        else:
+            self.pre_start_date = self.offer_summary['offer_availability_start_date'] + timedelta(days=pre_period_no_of_days * -1)
+            self.pre_end_date = self.offer_summary['offer_availability_start_date'] + timedelta(days=-1)
+
+    def get_offer_last_refresh_date(self):
+        date = f"""
+        SELECT
+            MAX(TIME_PERIOD) AS Offer_Last_Refresh_Date
+        FROM `project_yuu_stamp_publish.yuu_stamp_intermediate_pro_transaction`
+        WHERE
+            1 = 1
+            AND OFFER_ID = '{self.offer_id}'
+        """
+        # return date
+        df = pandas_gbq.read_gbq(date)
+        if pd.notnull(df.iloc[0,0]):
+            return df.iloc[0,0]
+
+    def get_stamp_type(self):
+        spend_band = self.offer_summary['spend_band']
+        # product_criteria is an empty dict when no product fields are set, so check truthiness rather than None
+        product = self.product_criteria
+        if spend_band is not None and product:
+            return 'stamp_spend_band_sku'
+        elif spend_band is not None:
+            return 'stamp_spend_band'
+        elif product:
+            return 'stamp_sku'
+        return None
+
+    def get_transaction_criteria(self):
+        # Transaction-level filter
+        field = ['business_unit','location_group', 'brand', 'country']
+        transaction_criteria = {k: v for k, v in self.offer_summary.items() if v and k in field}
+        if 'business_unit' in transaction_criteria.keys():
+            transaction_criteria['sponsor_business_unit'] = transaction_criteria.pop('business_unit')
+        if 'location_group' in transaction_criteria.keys():
+            transaction_criteria['location_external_reference'] = transaction_criteria.pop('location_group')
+        return transaction_criteria
+
+    def filter_transaction_criteria(self, sdf):
+        for k, v in self.transaction_criteria.items():
+            # Handling for the field "brand"
+            if k == 'brand':
+                sdf = self.filter_transaction_criteria_brand(sdf, k, v)
+            elif k == 'country':
+                sdf = self.filter_transaction_criteria_country(sdf, k, v)
+            else:
+                sdf = sdf.filter(sdf[k].isin(v.split(',')))
+        return sdf
+
+    def filter_transaction_criteria_brand(self, sdf, k, v):
+        # WEHK handling for the brand criteria
+        if self.banner == 'WEHK':
+            sdf = sdf.filter(sdf['store_format'].isin(v.split(',')))
+        # MXHK handling for the brand criteria
+        if self.banner == 'MXHK':
+            query = "SELECT location_brand store_format, maxim_location brand FROM `dfgp-prd-datalake-01.banner_hkmx_analyse.hkmx_md_sharepoint_cuisine_type`"
+            mx_brand_sdf = self.read_sql(query)
+            sdf = sdf.join(mx_brand_sdf, on='store_format', how='left')
+            values = [x.strip() for x in v.split(',')]
+            sdf = sdf.filter(sdf[k].isin(values))
+        return sdf
+
+    def filter_transaction_criteria_country(self, sdf, k, v):
+        v_char = re.sub(r'[^a-zA-Z0-9\,\-]', '', v)
+        if self.banner == 'SEHK':
+            if v_char == 'hongkong':
+                sdf = sdf.filter(sdf[k] == 'HK')
+            elif v_char == 'macau':
+                sdf = sdf.filter(sdf[k] == 'MO')
+        return sdf
+
+    def get_transaction_banner_criteria(self):
+        # Transaction-level filter
+        field = ['business_unit','location_group', 'brand']
+        transaction_banner_criteria = {k: v for k, v in self.offer_summary.items() if v and k in field}
+        if 'business_unit' in transaction_banner_criteria.keys():
+            transaction_banner_criteria['sponsor_business_unit'] = transaction_banner_criteria.pop('business_unit')
+        if 'location_group' in transaction_banner_criteria.keys():
+            transaction_banner_criteria['location_external_reference'] = transaction_banner_criteria.pop('location_group')
+        return transaction_banner_criteria
+
+    def filter_transaction_banner_criteria(self, sdf):
+        for k, v in self.transaction_banner_criteria.items():
+            # Handling for the field "brand"
+            if k == 'brand':
+                sdf = self.filter_transaction_criteria_brand(sdf, k, v)
+            else:
+                sdf = sdf.filter(sdf[k].isin(v.split(',')))
+        return sdf
+
+    # NOTE: currently unused; duplicates filter_transaction_criteria_brand above
+    def filter_transaction_brand_criteria_brand(self, sdf, k, v):
+        # WEHK handling for the brand criteria
+        if self.banner == 'WEHK':
+            sdf = sdf.filter(sdf['store_format'].isin(v.split(',')))
+        # MXHK handling for the brand criteria
+        if self.banner == 'MXHK':
+            query = "SELECT location_brand store_format, maxim_location brand FROM `dfgp-prd-datalake-01.banner_hkmx_analyse.hkmx_md_sharepoint_cuisine_type`"
+            mx_brand_sdf = self.read_sql(query)
+            sdf = sdf.join(mx_brand_sdf, on='store_format', how='left')
+            values = [x.strip() for x in v.split(',')]
+            sdf = sdf.filter(sdf[k].isin(values))
+        return sdf
+
+    def get_product_criteria(self):
+        product_criteria = {}
+        for k, v in self.offer_summary.items():
+            if k in self.product_criteria_fields and v:
+                if k == 'product_sku_external_reference':
+                    product_criteria['product_item_external_reference'] = v
+                else:
+                    product_criteria[k] = v
+        return product_criteria
+
+    def filter_product_criteria(self, sdf):
+        product_filter_list = []
+        for k, v in self.product_criteria.items():  # Build one condition per product criterion
+            product_filter_list.append(sdf[k].isin(v.split(',')))
+        product_filter = reduce(lambda x, y: x | y, product_filter_list)  # Combine the product criteria with OR
+        sdf = sdf.filter(product_filter)
+        return sdf
+
+    def get_transaction_product_level_sdf(self, start_date, end_date):
+        query = f"""
+        SELECT
+            a.*
+            ,b.product_item_external_reference
+            ,b.final_unit_price
+            ,c.product_division
+            ,c.rpoduct_department product_department
+            ,c.product_category
+            ,c.product_subcategory
+            ,c.product_class
+            ,c.product_brand
+        FROM `dfgp-prd-datalake-01.project_yuu_loy_publish.anfield_loy_sales_metrics` a
+        LEFT JOIN `dfgp-prd-datalake-01.project_yuu_alp_analyse.anfield_tx_transaction_product_info` b ON CAST(a.transaction_external_reference AS STRING) = CAST(b.app_transaction_id AS STRING)
+        LEFT JOIN `dfgp-prd-datalake-01.project_yuu_alp_analyse.anfield_md_product` c ON c.product_item_external_reference = b.PRODUCT_ITEM_EXTERNAL_REFERENCE
+        WHERE
+            1 = 1
+            AND b.final_unit_price > 0
+            AND CAST(a.time_period AS DATE) BETWEEN '{start_date}' AND '{end_date}'
+        """
+        sdf = self.read_sql(query)
+        sdf = self.filter_transaction_banner_criteria(sdf)
+
+        sdf = self.filter_product_criteria(sdf)
+        product_sdf = sdf.groupBy(['transaction_external_reference']).agg(
+            F.sum('final_unit_price').alias('offer_product_sales'),
+            F.count('final_unit_price').alias('offer_product_unit')
+        )
+        product_sdf = product_sdf.filter(product_sdf.offer_product_sales > 0)
+        return product_sdf
+
+    def get_transaction(self, start_date, end_date):
+        query = f"""
+        SELECT
+            *
+        FROM `dfgp-prd-datalake-01.project_yuu_loy_publish.anfield_loy_sales_metrics`
+        WHERE
+            1 = 1
+            AND TRANSACTION_RETAIL_VALUE > 0
+            AND CAST(time_period AS DATE) BETWEEN '{start_date}' AND '{end_date}'
+        """
+        sdf = self.read_sql(query)
+        sdf = self.filter_transaction_banner_criteria(sdf)
+        sdf = sdf.withColumn('offer_id', F.lit(self.offer_id))
+
+        # If the offer has product criteria, run the product criteria functions
+        # print(self.product_criteria)
+        if self.product_criteria:
+            # product_sdf = self.get_transaction_product_level_sdf(start_date, end_date, spend_band)
+            product_sdf = self.get_transaction_product_level_sdf(start_date, end_date)
+            sdf = sdf.join(product_sdf, on='transaction_external_reference', how='inner').select(
+                sdf['*'],
+                product_sdf['offer_product_sales'],
+                product_sdf['offer_product_unit'],
+            )
+        else:
+            sdf = sdf.withColumn('offer_product_sales', sdf.transaction_retail_value)
+            # sdf = sdf.withColumn('offer_product_unit', F.lit(0))
+            # Each row of anfield_loy_sales_metrics is one transaction, so one unit per row;
+            # an aggregate such as countDistinct cannot be used inside withColumn
+            sdf = sdf.withColumn('offer_product_unit', F.lit(1))
+
+        # print(self.stamp_type)
+        if self.stamp_type == 'stamp_sku':
+            sdf = sdf.withColumn('no_of_stamp', sdf.offer_product_unit)
+
+        elif self.stamp_type == 'stamp_spend_band':
+            sdf = sdf.filter(sdf.offer_product_sales > self.spend_band)
+            sdf = sdf.withColumn('no_of_stamp', F.floor(sdf.offer_product_sales/self.spend_band))
+
+        elif self.stamp_type == 'stamp_spend_band_sku':
+            sdf = sdf.filter(sdf.offer_product_sales > self.spend_band)
+            sdf = sdf.withColumn('no_of_stamp',
+                F.least(
+                    F.floor(sdf.offer_product_sales/self.spend_band)
+                    ,sdf.offer_product_unit
+                )
+            )
+
+        return sdf
+
+    # Incremental sync notes: e.g. 2 Jul run: first check the offer id's max loaded date, then run
+    # max loaded date + 7 days <= run date
+    # pre period of offer id max date e.g. 30 June, + 7 days ==> >= 7 July (today) then pass
+    def get_pro_transaction(self, start_date, end_date):
+        pro_sdf = self.get_transaction(start_date, end_date)
+        pro_grouped_sdf = pro_sdf.groupBy(['member_id', 'offer_id', 'time_period']).agg(
+            F.sum(pro_sdf.no_of_stamp).alias('no_of_stamp'),
+            F.sum(pro_sdf.offer_product_sales).alias('sales'),
+            F.sum(pro_sdf.offer_product_unit).alias('units'),
+            F.count(pro_sdf.transaction_external_reference).alias('transaction_count'),
+            F.sum(pro_sdf.point_amount).alias('point_amount')
+        )
+        pro_grouped_sdf = pro_grouped_sdf.withColumn('update_date', F.current_date())
+        pro_grouped_sdf = pro_grouped_sdf.withColumn('period_type', F.lit('promotion_period'))
+        return pro_grouped_sdf
+
+    def get_pre_transaction(self):
+        pre_sdf = self.get_transaction(self.pre_start_date, self.pre_end_date)
+        pre_grouped_sdf = pre_sdf.groupBy(['member_id', 'offer_id', 'time_period']).agg(
+            F.sum(pre_sdf.no_of_stamp).alias('no_of_stamp'),
+            F.sum(pre_sdf.offer_product_sales).alias('sales'),
+            F.sum(pre_sdf.offer_product_unit).alias('units'),
+            F.count(pre_sdf.transaction_external_reference).alias('transaction_count'),
+            F.sum(pre_sdf.point_amount).alias('point_amount')
+        )
+        pre_grouped_sdf = pre_grouped_sdf.withColumn('update_date', F.current_date())
+        pre_grouped_sdf = pre_grouped_sdf.withColumn('period_type', F.lit('pre_period'))
+        return pre_grouped_sdf
+
+    def get_stamp_reward_issued(self, start_date, end_date):
+        query = f"""
+        SELECT
+            Member.member_account_id as member_id
+            ,Reward.sponsor_business_unit
+            ,Store.store_format
+            ,Reward.Reward_Type_External_Reference AS offer_id
+            ,CAST(Reward.Issued_Timestamp AS DATE) AS time_period
+        FROM `dfgp-prd-datalake-01.project_yuu_loy_publish.anfield_loy_reward_status_analytics` AS Reward
+        LEFT JOIN (
+            SELECT
+                member_id
+                ,member_account_id
+            FROM `dfgp-prd-datalake-01.project_yuu_loy_publish.anfield_loy_member_account`
+            WHERE
+                1 = 1
+                AND ID_TYPE_EXTERNAL_REFERENCE = 'AUTO_GEN_ID') AS Member
+            ON CAST(Reward.member_id AS STRING) = CAST(Member.member_id AS STRING)
+        LEFT JOIN `dfgp-prd-datalake-01.project_yuu_alp_analyse.anfield_md_store_profile` AS Store
+            ON Reward.REDEEMED_LOCATION = Store.LOCATION_EXTERNAL_REFERENCE
+        WHERE
+            1 = 1
+            AND Reward.Reward_Type_External_Reference = '{self.offer_id}'
+            AND CAST(Reward.Issued_Timestamp AS DATE) BETWEEN '{start_date}' AND '{end_date}'
+        """
+        reward_sdf = self.read_sql(query)
+        reward_sdf = self.filter_transaction_banner_criteria(reward_sdf)
+        # reward_sdf = reward_sdf.filter(reward_sdf.offer_id == self.offer_id)
+        # reward_sdf = reward_sdf.filter(reward_sdf.time_period.between(start_date, end_date))
+        reward_sdf = reward_sdf.groupBy('member_id', 'offer_id', 'time_period').agg(F.count('time_period').alias('no_of_rewards'))
+        return reward_sdf
+
+    def get_stamp_redemption(self, start_date, end_date):
+        redemption = f"""
+        SELECT
+            Member.member_account_id AS member_id
+            ,Redemption.sponsor_business_unit
+            ,Store.store_format
+            ,Redemption.Reward_Type_External_Reference AS offer_id
+            ,CAST(Redemption.Redeemed_Timestamp AS DATE) AS time_period
+            ,Redemption.redeemed_location AS location_external_reference
+        FROM `dfgp-prd-datalake-01.project_yuu_loy_publish.anfield_loy_reward_status_analytics` AS Redemption
+        LEFT JOIN (
+            SELECT
+                member_id
+                ,member_account_id
+            FROM `dfgp-prd-datalake-01.project_yuu_loy_publish.anfield_loy_member_account`
+            WHERE
+                1 = 1
+                AND ID_TYPE_EXTERNAL_REFERENCE = 'AUTO_GEN_ID') AS Member
+            ON CAST(Redemption.member_id AS STRING) = CAST(Member.member_id AS STRING)
+        LEFT JOIN `dfgp-prd-datalake-01.project_yuu_alp_analyse.anfield_md_store_profile` AS Store
+            ON Redemption.REDEEMED_LOCATION = Store.LOCATION_EXTERNAL_REFERENCE
+        WHERE
+            1 = 1
+            AND Redeemed_Timestamp IS NOT NULL
+            AND Redemption.Reward_Type_External_Reference = '{self.offer_id}'
+            AND CAST(Redemption.Redeemed_Timestamp AS DATE) BETWEEN '{start_date}' AND '{end_date}'
+        """
+        sales = f"""
+        SELECT
+            member_id
+            ,sponsor_business_unit
+            ,store_format
+            ,CAST(TIME_PERIOD AS DATE) AS time_period
+            ,LOCATION_EXTERNAL_REFERENCE AS location_external_reference
+            ,TRANSACTION_RETAIL_VALUE AS sales
+            ,TRANSACTION_EXTERNAL_REFERENCE AS transaction_id
+        FROM `dfgp-prd-datalake-01.project_yuu_loy_publish.anfield_loy_sales_metrics`
+        WHERE
+            1 = 1
+            AND Transaction_Retail_Value > 0
+            AND CAST(TIME_PERIOD AS DATE) BETWEEN '{start_date}' AND '{end_date}'
+        """
+        redemption_sdf = self.read_sql(redemption)
+        redemption_sdf = self.filter_transaction_banner_criteria(redemption_sdf)
+        # redemption_sdf = redemption_sdf.filter(redemption_sdf.offer_id == self.offer_id)
+        # redemption_processed_sdf = redemption_sdf.filter(redemption_sdf.time_period.between(start_date, end_date))
+        redemption_processed_sdf = redemption_sdf.groupBy('sponsor_business_unit', 'offer_id', 'member_id', 'time_period', 'location_external_reference', 'store_format').agg(F.count('time_period').alias('no_of_redemption'))
+        sales_sdf = self.read_sql(sales)
+        sales_processed_sdf = self.filter_transaction_banner_criteria(sales_sdf)
+        # sales_processed_sdf = sales_sdf.filter(sales_sdf.time_period.between(start_date, end_date))
+        associated_sales_sdf = (
+            sales_processed_sdf.groupBy('sponsor_business_unit', 'member_id', 'time_period', 'location_external_reference', 'store_format')
+            .agg(
+                F.sum('sales').alias('sales'),
+                F.count_distinct('transaction_id').alias('transaction_count')
+            )
+        )
+        stamp_redemption_final_sdf = redemption_processed_sdf.join(
+            associated_sales_sdf,
+            on=['member_id', 'time_period', 'location_external_reference'],
+            how='left'
+        ).fillna(0).fillna("N/A")
+        stamp_redemption_final_sdf = stamp_redemption_final_sdf.withColumnRenamed('sales', 'associated_sales')
+        stamp_redemption_final_sdf = stamp_redemption_final_sdf.withColumnRenamed('transaction_count', 'associated_transaction_count')
+        stamp_redemption_final_sdf = (
+            stamp_redemption_final_sdf.groupBy('member_id', 'offer_id', 'time_period')
+            .agg(
+                F.sum('no_of_redemption').alias('no_of_redemption'),
+                F.sum('associated_sales').alias('associated_sales'),
+                F.sum('associated_transaction_count').alias('associated_transaction_count')
+            )
+        )
+        return stamp_redemption_final_sdf
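
Usage sketch (not part of the diff above): a minimal driver showing how the class is intended to be invoked, assuming the same Dataproc notebook environment the module relies on (an active Spark session with the BigQuery connector and pandas_gbq credentials). The offer id shown is a placeholder, not one taken from the source.

if __name__ == "__main__":
    offer_ids = ['MMS-0001']  # placeholder; in practice offer ids come from anfield_md_offer_master
    for offer_id in offer_ids:
        pipeline = StampPipeline(offer_id)  # pulls the offer summary and derives stamp_type plus the date window
        print(offer_id, pipeline.stamp_type, pipeline.start_date, pipeline.end_date)
        pipeline.run()  # loads pre/pro transactions, rewards issued and redemptions into the intermediate tables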