ToDo #781 » glue-job-flashsalesreports-nec-recovery.py

武田 遼河, 2024/11/21 19:35

 
import sys
import re
import boto3
from decimal import *
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pytz
from pyspark.sql import DataFrame
import time

# Initialize Glue context and job
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Glue job initialization
job = Job(glueContext)
job.init("RecoveryJob", args={})

# Redshift connection options
redshift_connection_options = {
    "url": "jdbc:redshift://redshift.crlryjxenjmk.ap-northeast-1.redshift.amazonaws.com:5439/redshift_db",
    "user": "admin",
    "password": "Password123!",
    "truncate_table": "ON",
    "usestagingtable": "OFF",
    "redshiftTmpDir": "s3://data-platform-prd-glue-temp"
}

# Function to load a table from Redshift and create a temporary view
def load_redshift_table(dbtable, view_name):
    connection_options = redshift_connection_options.copy()
    connection_options["dbtable"] = dbtable
    df = glueContext.create_dynamic_frame_from_options("redshift", connection_options).toDF()
    df.createOrReplaceTempView(view_name)
    return df

# Load necessary tables from Redshift
load_redshift_table("dwh.dwh_forecast_tbl", "tbl_forecast")
load_redshift_table("dwh.dwh_jiseki_2019", "tbl_jiseki_2019")
load_redshift_table("dwh.dwh_mst_calendar", "tbl_calendar")
load_redshift_table("public.nec_tbl_401", "tbl_nec_401")

# Date range for processing
start_date = datetime.strptime('20241115', '%Y%m%d')
end_date = datetime.strptime('20241118', '%Y%m%d')
current_date = start_date

while current_date <= end_date:
    try:
        # Calculate previous year date
        ly_today = current_date.replace(year=current_date.year - 1)

        # Calculate 2019 date
        today_2019 = ly_today.replace(year=2019)
        # Fix: use pre_year_cal_date_ymd to look up the previous year's date
        pre_year_date_query = f"""
            SELECT pre_year_cal_date_ymd
            FROM tbl_calendar
            WHERE cal_date_ymd = '{current_date.strftime('%Y%m%d')}'
        """
        pre_year_date_df = spark.sql(pre_year_date_query)
        pre_year_date = datetime.strptime(pre_year_date_df.collect()[0][0], '%Y%m%d')

        # Create parameter set for current date
        params = {
            "today": current_date.strftime('%Y%m%d'),
            "ly_today": ly_today.strftime('%Y%m%d'),
            "2019_today": today_2019.strftime('%Y%m%d'),
            "pre_year_date": pre_year_date.strftime('%Y%m%d')
        }

        # Main SQL query with tbl_forecast as the base
        sql_nec = f"""
        SELECT
            fc.date AS date,
            fc.mst_shop_shop_cd AS shop_code,
            COALESCE(n1.n1_amount_counter, 0) AS actual_sales,
            COALESCE(n2.n2_customer_counter, 0) AS actual_customer_num,
            COALESCE(fc.sales_budget, 0) AS budget_sales,
            COALESCE(fc.customer_budget, 0) AS budget_customer_num,
            COALESCE(n4.n4_amount_counter, 0) AS last_year_sales,
            COALESCE(n5.n5_customer_counter, 0) AS last_year_customer_num,
            COALESCE(n6.n6_amount_counter, 0) AS last_year_sales_2,
            COALESCE(n7.n7_customer_counter, 0) AS last_year_customer_num_2,
            COALESCE(n8.sales_2019, 0) AS sales_2019,
            COALESCE(n8.customer_2019, 0) AS customer_2019_num,
            COALESCE(n9.sales_2019, 0) AS sales_2019_2,
            COALESCE(n9.customer_2019, 0) AS customer_2019_num_2
        FROM
            tbl_forecast fc
        LEFT JOIN
            (
                SELECT
                    tz.transaction_date,
                    tz.store_code,
                    SUM(tz.amount_counter) AS n1_amount_counter
                FROM tbl_nec_401 tz
                WHERE REPLACE(tz.transaction_date, '-', '') = '{params["today"]}'
                GROUP BY tz.transaction_date, tz.store_code
            ) n1
        ON SUBSTRING(n1.store_code, 1, 6) = fc.mst_shop_shop_cd
        LEFT JOIN
            (
                SELECT
                    tz.store_code,
                    SUM(tz.customer_counter) AS n2_customer_counter
                FROM tbl_nec_401 tz
                WHERE REPLACE(tz.transaction_date, '-', '') = '{params["today"]}'
                GROUP BY tz.store_code
            ) n2
        ON SUBSTRING(n2.store_code, 1, 6) = fc.mst_shop_shop_cd
        LEFT JOIN
            (
                SELECT
                    tz.store_code,
                    SUM(tz.amount_counter) AS n4_amount_counter
                FROM tbl_nec_401 tz
                WHERE REPLACE(tz.transaction_date, '-', '') = '{params["ly_today"]}'
                GROUP BY tz.store_code
            ) n4
        ON SUBSTRING(n4.store_code, 1, 6) = fc.mst_shop_shop_cd
        LEFT JOIN
            (
                SELECT
                    tz.store_code,
                    SUM(tz.customer_counter) AS n5_customer_counter
                FROM tbl_nec_401 tz
                WHERE REPLACE(tz.transaction_date, '-', '') = '{params["ly_today"]}'
                GROUP BY tz.store_code
            ) n5
        ON SUBSTRING(n5.store_code, 1, 6) = fc.mst_shop_shop_cd
        LEFT JOIN
            (
                SELECT
                    tz.store_code,
                    SUM(tz.amount_counter) AS n6_amount_counter
                FROM tbl_nec_401 tz
                WHERE REPLACE(tz.transaction_date, '-', '') = '{params["pre_year_date"]}'
                GROUP BY tz.store_code
            ) n6
        ON SUBSTRING(n6.store_code, 1, 6) = fc.mst_shop_shop_cd
        LEFT JOIN
            (
                SELECT
                    tz.store_code,
                    SUM(tz.customer_counter) AS n7_customer_counter
                FROM tbl_nec_401 tz
                WHERE REPLACE(tz.transaction_date, '-', '') = '{params["pre_year_date"]}'
                GROUP BY tz.store_code
            ) n7
        ON SUBSTRING(n7.store_code, 1, 6) = fc.mst_shop_shop_cd
        LEFT JOIN
            (
                SELECT
                    js.store_code,
                    js.sales_2019,
                    js.customer_2019
                FROM tbl_jiseki_2019 js
                WHERE js.date = '{params["2019_today"]}'
            ) n8
        ON SUBSTRING(n8.store_code, 1, 6) = fc.mst_shop_shop_cd
        LEFT JOIN
            (
                SELECT
                    js.store_code,
                    js.sales_2019,
                    js.customer_2019
                FROM tbl_jiseki_2019 js
                WHERE js.date = (
                    SELECT year_2019_cal_date_ymd
                    FROM tbl_calendar
                    WHERE cal_date_ymd = '{params["today"]}'
                )
            ) n9
        ON SUBSTRING(n9.store_code, 1, 6) = fc.mst_shop_shop_cd
        WHERE
            fc.date = '{params["today"]}'
            AND fc.mst_shop_shop_cd > '001000'
        """

        # Execute the SQL query
        df_result_nec = spark.sql(sql_nec)
        # Write the result to MySQL
193
        def write_to_mysql(df: DataFrame):
194
            for attempt in range(5):
195
                try:
196
                    df.write.format("jdbc") \
197
                        .option(
198
                            "url", 
199
                            "jdbc:mysql://data-platform-prd-aurora.cluster-cy9zohy3hxcg.ap-northeast-1.rds.amazonaws.com:3306/dm"
200
                        ) \
201
                        .option("driver", "com.mysql.jdbc.Driver") \
202
                        .option("dbtable", "flash_sales_reports") \
203
                        .option("user", "root") \
204
                        .option("password", "Password123!") \
205
                        .option("characterEncoding", "UTF8") \
206
                        .mode("Append") \
207
                        .save()
208
                    print(f"Successfully wrote data for date {params['today']} to MySQL.")
209
                    break
210
                except Exception as e:
211
                    print(f"Attempt {attempt + 1} failed: {str(e)}")
212
                    if attempt < 4:
213
                        time.sleep(5)  # Wait before retrying
214
                    else:
215
                        raise
216
        
217
        # Write the result to MySQL
218
        write_to_mysql(df_result_nec)
219
    
220
    except Exception as e:
221
        print(f"Error processing date {current_date.strftime('%Y%m%d')}: {str(e)}")
222
    
223
    # Move to the next date
224
    current_date += timedelta(days=1)
225

    
226
# Commit the Glue job
227
job.commit()
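
For reference, a minimal sketch of how a recovery run like this could be started and monitored from outside Glue, assuming the script is deployed as a Glue job named glue-job-flashsalesreports-nec-recovery (the job name and region here are assumptions, not taken from the actual deployment):

import boto3
import time

# Assumed job name and region; adjust to the actual deployment.
GLUE_JOB_NAME = "glue-job-flashsalesreports-nec-recovery"

glue = boto3.client("glue", region_name="ap-northeast-1")

# Start a run of the recovery job.
run = glue.start_job_run(JobName=GLUE_JOB_NAME)
run_id = run["JobRunId"]

# Poll until the run reaches a terminal state.
while True:
    state = glue.get_job_run(JobName=GLUE_JOB_NAME, RunId=run_id)["JobRun"]["JobRunState"]
    print(f"Job run {run_id}: {state}")
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"):
        break
    time.sleep(30)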