๐ ๋ฐ์ดํฐ ์ ์ฌ / ๋ณํ ์๋๋ฆฌ์ค _ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ์ฌ๋ ๋ฐ์ดํฐ์ FTP์๋ฒ์ ํ์ผ๋ฐ์ดํฐ
๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ ์ฌ๋ ๋ฐ์ดํฐ์ FTP์๋ฒ์ ํ์ผ๋ฐ์ดํฐ๋ฅผ ํฉ์ณ DB์ nifi ์คํค๋ง์ company_20230216 ํ ์ด๋ธ์ ์ ์ฌ
์ ์ฌํ ๋ฐ์ดํฐ๋ฅผ cvs ํ์ ๋ณ๊ฒฝํ์ฌ FTP์๋ฒ์ ํ์ผ ์ง์ ํ๋ค.
(DB ๋ฐ์ดํฐ + ํ์ผ ๋ฐ์ดํฐ)
1. postgres ๋ฐ์ดํฐ๋ฒ ์ด์ค์ nifi ์คํค๋ง์ ovs_expns_kor_entprs ํ
์ด๋ธ์์ ๋๋ผ๋ช
์ด '๋ผ์ค์ค'์ธ ๋ฐ์ดํฐ๋ง ์กฐํ
2. ์กฐํํ ๋ฐ์ดํฐ๋ฅผ AVRO ์์ JSON์ผ๋ก ๋ ์ฝ๋ ํ์ ๋ณํ
3. ๋ณํ๋ JSON ๋ฐ์ดํฐ๋ฅผ ํ์ค์ฉ ๋ถ๋ฆฌ ( json ๋ถ๋ฆฌ ๊ธฐ์ค์ $.* ๋ก )
4. ๋ถ๋ฆฌํ JSON ๋ฐ์ดํฐ์์ country_nm, exp_plc_nm, region_se ํญ๋ชฉ์ ํ๋ก์ฐํ์ผ ์์ฑ์ผ๋ก ๋ฑ๋ก
5. FTP ์๋ฒ /nifi_data/raw_dataset ๋๋ ํ ๋ฆฌ์์ load_20230216.json ๋ฐ์ดํฐ ๋ถ๋ฌ์ค๊ธฐ
6. ๋ถ๋ฌ์จ JSON ๋ฐ์ดํฐ์์ entprs_eng_nm, entprs_kor_nm, exp_yy ํญ๋ชฉ์ ํ๋ก์ฐํ์ผ ์์ฑ์ผ๋ก ๋ฑ๋ก
7. postgres ๋ฐ์ดํฐ๋ฒ ์ด์ค์ nifi ์คํค๋ง์ company_20230216 ํ
์ด๋ธ์ 4, 6๋ฒ์์ ๋ฑ๋กํ ์์ฑ๋ค์ ๊ฐ์ผ๋ก ์ปฌ๋ผ๋ช
๊ณผ ๋งคํํ์ฌ ๋ฐ์ดํฐ ์ ์ฌ
์ ์ฌ์ผ์(load_de) ์ปฌ๋ผ์๋ ์ค๋๋ ์ง(YYYYMMDD) ๊ฐ ์ ์ฌ
โป ์ปฌ๋ผ์ ๋ณด (region_se, country_nm, exp_plc_nm, entprs_kor_nm, entprs_eng_nm, exp_yy, load_de)
8. ์ ์ฌ์๋ฃ ํ ํ๋์ JSON ๋ฐ์ดํฐ๋ก ํฉ์น๊ธฐ
9. ํฉ์น ๋ฐ์ดํฐ๋ฅผ JSON ์์ CSV๋ก ๋ ์ฝ๋ ํ์ ๋ณํ
10. ํ์ผ๋ช
์ ์ด๋์
_20230216.csv ๋ก ๋ณ๊ฒฝ
11. FTP ์๋ฒ /nifi_data/result_dataset ๋๋ ํ ๋ฆฌ์ ์ด๋์
ํด๋ ํ์์ ํ์ผ ์ ์ฌ
์๋๋ฆฌ์ค ์คํ Flow
- ExecuteSQL Processor
์ค์ผ์ฅด์ 10 min์ผ๋ก ์ค์
๊ธฐ๋ณธ '0 sec'๋ก ์คํ ์์ผฐ๋๋ ๋ฐ์ดํฐ๊ฐ ๋น ๋ฅด๊ฒ ๊ณ์ ์กฐํ๋์ด์ ์๊ฐ์ ์ง์ ํด๋์๋ค
Database Connection Pooling Service : ์ฐ๊ฒฐํ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ปจํธ๋กค๋ฌ๋ฅผ ์ค์ ํ๋ค.
SQL select query : ๋ฐ์ดํฐ๋ฅผ ์กฐํํ SQL query๋ฅผ ์์ฑ ํด์ค๋ค.
select * from nifi.ovs_expns_kor_entprs
where country_nm = '๋ผ์ค์ค'
ExecuteSQL์์ country_nm='๋ผ์ค์ค'์ธ ๋ฐ์ดํฐ๋ฅผ ์กฐํํด์ flowfile ์์ฑ
- ConvertAvroToJSON Processor
์กฐํํ Avro ๋ฐ์ดํฐ๋ฅผ json ํ์์ผ๋ก ๋ณ๊ฒฝํ๋ค
ConvertRecord ํ๋ก์ธ์ค๋ฅผ ํ์ฉํด์ ํ์ผ ํ์์ ๋ณํ ํ ์์๋ค.
(Avro์ ๋ํ ์ปจํธ๋กค๋ฌ๋ฅผ ์์ฑ ํด์ผํจ.)
- SplitJson Processor
๋ณํ๋ Json ๋ฐ์ดํฐ๋ฅผ jsonํ์์ผ๋ก ('$.*') ๊ตฌ๋ถํ์ฌ ๋ฐ์ดํฐ๋ฅผ ๋ถ๋ฆฌ
- EvaluateJsonPath Processor
๋ถ๋ฆฌํ JSON ๋ฐ์ดํฐ์์ country_nm, exp_plc_nm, region_se ํญ๋ชฉ์ ํ๋ก์ฐํ์ผ ์์ฑ์ผ๋ก ๋ฑ๋ก
๋ฐ์ดํฐ์ ์์ฑ์ ์์ฑํ์ฌ flow์์ ๋ฐ์ดํฐ ๋ฎ์ด์์ด์ง๊ฑฐ๋ ์ฌ๋ผ์ ธ๋ ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํ ์ ์๊ฒ ํด์ค๋ค(?!)
- FetchFTP Processor
FTP์๋ฒ /nifi_data/raw_dataset ๋๋ ํ ๋ฆฌ์์ load_20230216.json ํ์ผ ๋ฐ์ดํฐ ๋ถ๋ฌ์ค๊ธฐ
- EvaluateJsonPath Processor
๋ถ๋ฌ์จ ๋ฐ์ดํฐ EvaluateJsonPath ํ๋ก์ธ์ค๋ก entprs_eng_nm, entprs_kor_nm, exp_yy ํญ๋ชฉ ์์ฑ ์์ฑ
- UpdateAttribute Processor
postgres ๋ฐ์ดํฐ๋ฒ ์ด์ค์ nifi.company_20230216 ํ
์ด๋ธ์ ์์ฑ ๊ฐ์ผ๋ก, ํ
์ด๋ธ์ ์ปฌ๋ผ๊ณผ ๋งค์นญํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํ๋ค
์ด๋, ์ ์ฌ์ผ์ load_de ์ปฌ๋ผ์๋ ์ค๋๋ ์ง(YYYYMMDD) ํ์์ผ๋ก ๊ฐ ์ ์ฌ๋๋๋ก ์์ฑ์ ์์ฑํด์ค๋ค.
์ ๊ท์ ' ${now():format('yyyyMMdd') ' ์ ์ฌ์ฉํด์ค ์ ์๋ค.
- PutSQL Processor
JDBC Connection Pool : ์ฐ๊ฒฐํ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ปจํธ๋กค๋ฌ ์ ํ
SQL Statement : ์คํํ INSERT or UPDATE ์ฟผ๋ฆฌ๋ฌธ์ ์์ฑํ๋ค.
Support Fragmented Transactions :
true์ด๋ฉด ๋จ์ผ ํธ๋์ญ์ ์ ์ฒ๋ฆฌํ๊ณ , false์ด๋ฉด ์ฌ๋ฌ๊ฐ์ flowfile ๋ค์ค ํธ๋์ญ์ ์ ์ฒ๋ฆฌํ๋ค
PutSQL ํ๋ก์ธ์ค๋ก ๋ฐ์ดํฐ๋ฒ ์ด์ค์ insert ์ฟผ๋ฆฌ๋ฅผ ์์ฑํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํ๋ค.
insert into nifi.company_20230216(region_se,country_nm,exp_plc_nm, entprs_kor_nm,entprs_eng_nm,exp_yy,load_de)
values('${region}','${country}','${exp_plc}','${entprs_kor}','${entprs_eng}','${exp}','${time}')
๋ฐ์ดํฐ๊ฐ ์ ์ฌ ๋ ๋ชจ์ต์ด๋ค.
- MergeRecord Processor
๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ฐ์ดํฐ๊ฐ ์ ์ฌ๋์๋์ง ํ์ธํ๊ณ
MergeRecord ํ๋ก์ธ์ค๋ฅผ ํ์ฉํ์ฌ ํ๋ก์ฐ์ ๋ฐ์ดํฐํ์ผ์ ๋ด์ฉ์ ํ๋์ json ํ์ผ๋ก ํฉ์น๋ค.
- ConvertRecord Processor
๋ฐ์ดํฐ ์ ์ฌ ํ jsonํ์ผ์ ํ์์ ํ๋์ cvs๋ก ํ์์ ๋ณํํ๋ค
- UpdateAttribute Processor
UpdateAttribute ํ๋ก์ธ์ค๋ก ํ์ผ๋ช ์ br_20230216.csv ๋ก ๋ณ๊ฒฝ
time ์์ฑ์ ์ฌ์ฉํ์ฌ ์ค๋ ๋ ์ง๊ฐ ๋ค์ด๊ฐ๋๋ก ํด๋ณด์๋ค!
์์ฑ ํ์ฉ!!
- PutFTP Processor
FTP ์๋ฒ /nifi_data/result_dataset ์์น์ ํ์ผ์ด ์ ์ฅ๋๋๋ก ํ๋ค.
ํ์ผ์ด ์ง์ ๋ ๊ฒฝ๋ก์ ์ ์ฅ ๋์๋ค.
์ง์ ๋ ๊ฒฝ๋ก์ ์ ์ฅ๋ ํ์ผ์ ์ด์ด๋ณด๋ฉด ์ด๋ ๊ฒ ๋ฐ์ดํฐ๊ฐ ๋์จ๋ค.
3๊ฐ ์ปฌ๋ผ์ ๋ํ ๋ฐ์ดํฐ๊ฐ ๋์ค๋๊ฒ ๋ง๋ค
FTP์๋ฒ์์ ๊ฐ์ ธ์จ flowํ์ผ์ ๋ํ ์ปฌ๋ผ์ 3๊ฐ์ง ์ด๋ฏ๋ก ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํ๋ค ํด๋
FTP์๋ฒ์ ์ฌ๋ ค์ง๋ ๋ฐ์ดํฐ๋ entprs_eng_nm, entprs_kor_nm, exp_yy ํญ๋ชฉ๋ง ๋์ค๋๊ฒ!