๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

Apache NiFi

๋ฐ์ดํ„ฐ ์ ์žฌ ์—ฐ์Šตํ•˜๊ธฐ 04 _ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ ์žฌ๋œ ๋ฐ์ดํ„ฐ์™€ FTP์„œ๋ฒ„์˜ ํŒŒ์ผ๋ฐ์ดํ„ฐ

 

 

 

๐Ÿ“– ๋ฐ์ดํ„ฐ ์ ์žฌ / ๋ณ€ํ™˜ ์‹œ๋‚˜๋ฆฌ์˜ค _ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ ์žฌ๋œ ๋ฐ์ดํ„ฐ์™€ FTP์„œ๋ฒ„์˜ ํŒŒ์ผ๋ฐ์ดํ„ฐ 

 

 

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์ ์žฌ๋œ ๋ฐ์ดํ„ฐ์™€ FTP์„œ๋ฒ„์˜ ํŒŒ์ผ๋ฐ์ดํ„ฐ๋ฅผ ํ•ฉ์ณ DB์˜ nifi ์Šคํ‚ค๋งˆ์˜ company_20230216 ํ…Œ์ด๋ธ”์— ์ ์žฌ

์ ์žฌํ•œ ๋ฐ์ดํ„ฐ๋ฅผ cvs ํ˜•์‹ ๋ณ€๊ฒฝํ•˜์—ฌ FTP์„œ๋ฒ„์— ํŒŒ์ผ ์ง€์ •ํ•œ๋‹ค.

(DB ๋ฐ์ดํ„ฐ + ํŒŒ์ผ ๋ฐ์ดํ„ฐ)

 

 

 

1. postgres ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ nifi ์Šคํ‚ค๋งˆ์˜ ovs_expns_kor_entprs ํ…Œ์ด๋ธ”์—์„œ ๋‚˜๋ผ๋ช…์ด '๋ผ์˜ค์Šค'์ธ ๋ฐ์ดํ„ฐ๋งŒ ์กฐํšŒ

2. ์กฐํšŒํ•œ ๋ฐ์ดํ„ฐ๋ฅผ AVRO ์—์„œ JSON์œผ๋กœ ๋ ˆ์ฝ”๋“œ ํ˜•์‹ ๋ณ€ํ™˜

3. ๋ณ€ํ™˜๋œ JSON ๋ฐ์ดํ„ฐ๋ฅผ ํ•œ์ค„์”ฉ ๋ถ„๋ฆฌ ( json ๋ถ„๋ฆฌ ๊ธฐ์ค€์€ $.* ๋กœ )


4. ๋ถ„๋ฆฌํ•œ JSON ๋ฐ์ดํ„ฐ์—์„œ country_nm, exp_plc_nm, region_se ํ•ญ๋ชฉ์„ ํ”Œ๋กœ์šฐํŒŒ์ผ ์†์„ฑ์œผ๋กœ ๋“ฑ๋ก

5. FTP ์„œ๋ฒ„ /nifi_data/raw_dataset ๋””๋ ‰ํ† ๋ฆฌ์—์„œ load_20230216.json ๋ฐ์ดํ„ฐ ๋ถˆ๋Ÿฌ์˜ค๊ธฐ

6. ๋ถˆ๋Ÿฌ์˜จ JSON ๋ฐ์ดํ„ฐ์—์„œ entprs_eng_nm, entprs_kor_nm, exp_yy ํ•ญ๋ชฉ์„ ํ”Œ๋กœ์šฐํŒŒ์ผ ์†์„ฑ์œผ๋กœ ๋“ฑ๋ก

7. postgres ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ nifi ์Šคํ‚ค๋งˆ์˜ company_20230216 ํ…Œ์ด๋ธ”์— 4, 6๋ฒˆ์—์„œ ๋“ฑ๋กํ•œ ์†์„ฑ๋“ค์˜ ๊ฐ’์œผ๋กœ ์ปฌ๋Ÿผ๋ช…๊ณผ ๋งคํ•‘ํ•˜์—ฌ ๋ฐ์ดํ„ฐ ์ ์žฌ
   ์ ์žฌ์ผ์ž(load_de) ์ปฌ๋Ÿผ์—๋Š” ์˜ค๋Š˜๋‚ ์งœ(YYYYMMDD) ๊ฐ’ ์ ์žฌ

โ€ป ์ปฌ๋Ÿผ์ •๋ณด (region_se, country_nm, exp_plc_nm, entprs_kor_nm, entprs_eng_nm, exp_yy, load_de)

8. ์ ์žฌ์™„๋ฃŒ ํ›„ ํ•˜๋‚˜์˜ JSON ๋ฐ์ดํ„ฐ๋กœ ํ•ฉ์น˜๊ธฐ

9. ํ•ฉ์นœ ๋ฐ์ดํ„ฐ๋ฅผ JSON ์—์„œ CSV๋กœ ๋ ˆ์ฝ”๋“œ ํ˜•์‹ ๋ณ€ํ™˜

10. ํŒŒ์ผ๋ช…์„ ์ด๋‹ˆ์…œ_20230216.csv ๋กœ ๋ณ€๊ฒฝ

11. FTP ์„œ๋ฒ„ /nifi_data/result_dataset ๋””๋ ‰ํ† ๋ฆฌ์— ์ด๋‹ˆ์…œ ํด๋” ํ•˜์œ„์— ํŒŒ์ผ ์ ์žฌ

 

 

 

 

 

 

 

์‹œ๋‚˜๋ฆฌ์˜ค ์‹คํ–‰ Flow

 

 

 

 

 

 

 

- ExecuteSQL Processor

 

์Šค์ผ€์ฅด์„ 10 min์œผ๋กœ ์„ค์ •

๊ธฐ๋ณธ '0 sec'๋กœ ์‹คํ–‰ ์‹œ์ผฐ๋”๋‹ˆ ๋ฐ์ดํ„ฐ๊ฐ€ ๋น ๋ฅด๊ฒŒ ๊ณ„์† ์กฐํšŒ๋˜์–ด์„œ ์‹œ๊ฐ„์„ ์ง€์ •ํ•ด๋‘์—ˆ๋‹ค

 

 

Database Connection Pooling Service : ์—ฐ๊ฒฐํ•  ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ปจํŠธ๋กค๋Ÿฌ๋ฅผ ์„ค์ •ํ•œ๋‹ค.

SQL select query : ๋ฐ์ดํ„ฐ๋ฅผ ์กฐํšŒํ•  SQL query๋ฅผ ์ž‘์„ฑ ํ•ด์ค€๋‹ค.

select * from nifi.ovs_expns_kor_entprs
where country_nm = '๋ผ์˜ค์Šค'

ExecuteSQL์—์„œ country_nm='๋ผ์˜ค์Šค'์ธ ๋ฐ์ดํ„ฐ๋ฅผ ์กฐํšŒํ•ด์„œ flowfile ์ƒ์„ฑ

 

 

 

 

 

- ConvertAvroToJSON Processor

 

์กฐํšŒํ•œ Avro ๋ฐ์ดํ„ฐ๋ฅผ json ํ˜•์‹์œผ๋กœ ๋ณ€๊ฒฝํ•œ๋‹ค

 

 

ConvertRecord ํ”„๋กœ์„ธ์Šค๋ฅผ ํ™œ์šฉํ•ด์„œ ํŒŒ์ผ ํ˜•์‹์„ ๋ณ€ํ™˜ ํ•  ์ˆ˜์žˆ๋‹ค.

(Avro์— ๋Œ€ํ•œ ์ปจํŠธ๋กค๋Ÿฌ๋ฅผ ์ƒ์„ฑ ํ•ด์•ผํ•จ.)

 

 

 

 

 

 

 

- SplitJson Processor

 

๋ณ€ํ™˜๋œ Json ๋ฐ์ดํ„ฐ๋ฅผ jsonํ˜•์‹์œผ๋กœ ('$.*') ๊ตฌ๋ถ„ํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ๋ถ„๋ฆฌ

 

 

 

 

 

- EvaluateJsonPath Processor

 

 

๋ถ„๋ฆฌํ•œ JSON ๋ฐ์ดํ„ฐ์—์„œ country_nm, exp_plc_nm, region_se ํ•ญ๋ชฉ์„ ํ”Œ๋กœ์šฐํŒŒ์ผ ์†์„ฑ์œผ๋กœ ๋“ฑ๋ก

 

๋ฐ์ดํ„ฐ์˜ ์†์„ฑ์„ ์ƒ์„ฑํ•˜์—ฌ flow์•ˆ์— ๋ฐ์ดํ„ฐ ๋ฎ์–ด์”Œ์–ด์ง€๊ฑฐ๋‚˜ ์‚ฌ๋ผ์ ธ๋„ ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๊ฒŒ ํ•ด์ค€๋‹ค(?!)

 

 

 

 

 

 

 

- FetchFTP Processor

FTP์„œ๋ฒ„ /nifi_data/raw_dataset ๋””๋ ‰ํ† ๋ฆฌ์—์„œ load_20230216.json ํŒŒ์ผ ๋ฐ์ดํ„ฐ ๋ถˆ๋Ÿฌ์˜ค๊ธฐ

 

 

 

 

 

- EvaluateJsonPath Processor

 

๋ถˆ๋Ÿฌ์˜จ ๋ฐ์ดํ„ฐ EvaluateJsonPath ํ”„๋กœ์„ธ์Šค๋กœ entprs_eng_nm, entprs_kor_nm, exp_yy ํ•ญ๋ชฉ ์†์„ฑ ์ƒ์„ฑ

 

 

 

 

 

 

 

- UpdateAttribute Processor

 

postgres ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ nifi.company_20230216 ํ…Œ์ด๋ธ”์— ์†์„ฑ ๊ฐ’์œผ๋กœ, ํ…Œ์ด๋ธ”์˜ ์ปฌ๋Ÿผ๊ณผ ๋งค์นญํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ์ ์žฌํ•œ๋‹ค
์ด๋•Œ, ์ ์žฌ์ผ์ž load_de ์ปฌ๋Ÿผ์—๋Š” ์˜ค๋Š˜๋‚ ์งœ(YYYYMMDD) ํ˜•์‹์œผ๋กœ ๊ฐ’ ์ ์žฌ๋˜๋„๋ก ์†์„ฑ์„ ์ƒ์„ฑํ•ด์ค€๋‹ค.

 

์ •๊ทœ์‹ ' ${now():format('yyyyMMdd') ' ์„ ์‚ฌ์šฉํ•ด์ค„ ์ˆ˜ ์žˆ๋‹ค.

 

 

 

 

 

 

 

- PutSQL Processor

 

JDBC Connection Pool : ์—ฐ๊ฒฐํ•  ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ปจํŠธ๋กค๋Ÿฌ ์„ ํƒ

SQL Statement : ์‹คํ–‰ํ•  INSERT or UPDATE ์ฟผ๋ฆฌ๋ฌธ์„ ์ž‘์„ฑํ•œ๋‹ค.

Support Fragmented Transactions : 

true์ด๋ฉด ๋‹จ์ผ ํŠธ๋žœ์žญ์…˜์„ ์ฒ˜๋ฆฌํ•˜๊ณ , false์ด๋ฉด ์—ฌ๋Ÿฌ๊ฐœ์˜ flowfile ๋‹ค์ค‘ ํŠธ๋žœ์žญ์…˜์„ ์ฒ˜๋ฆฌํ•œ๋‹ค

 

PutSQL ํ”„๋กœ์„ธ์Šค๋กœ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— insert ์ฟผ๋ฆฌ๋ฅผ ์ž‘์„ฑํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ์ ์žฌํ•œ๋‹ค.

 

insert into nifi.company_20230216(region_se,country_nm,exp_plc_nm, entprs_kor_nm,entprs_eng_nm,exp_yy,load_de)
values('${region}','${country}','${exp_plc}','${entprs_kor}','${entprs_eng}','${exp}','${time}')

 

๋ฐ์ดํ„ฐ๊ฐ€ ์ ์žฌ ๋œ ๋ชจ์Šต์ด๋‹ค.

 

 

 

 

 

 

 

 

 

- MergeRecord Processor

 

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ๋ฐ์ดํ„ฐ๊ฐ€ ์ ์žฌ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•˜๊ณ  
MergeRecord ํ”„๋กœ์„ธ์Šค๋ฅผ ํ™œ์šฉํ•˜์—ฌ ํ”Œ๋กœ์šฐ์˜ ๋ฐ์ดํ„ฐํŒŒ์ผ์˜ ๋‚ด์šฉ์„ ํ•˜๋‚˜์˜ json ํŒŒ์ผ๋กœ ํ•ฉ์นœ๋‹ค.

 

 

 

 

 

 

 

- ConvertRecord Processor

 

๋ฐ์ดํ„ฐ ์ ์žฌ ํ›„  jsonํŒŒ์ผ์˜ ํ˜•์‹์„ ํ•˜๋‚˜์˜ cvs๋กœ ํ˜•์‹์„ ๋ณ€ํ™˜ํ•œ๋‹ค

 

 

 

 

 

 

- UpdateAttribute Processor

 

UpdateAttribute ํ”„๋กœ์„ธ์Šค๋กœ ํŒŒ์ผ๋ช…์„ br_20230216.csv ๋กœ ๋ณ€๊ฒฝ

time ์†์„ฑ์„ ์‚ฌ์šฉํ•˜์—ฌ ์˜ค๋Š˜ ๋‚ ์งœ๊ฐ€ ๋“ค์–ด๊ฐ€๋„๋ก ํ•ด๋ณด์•˜๋‹ค!

์†์„ฑ ํ™œ์šฉ!!

 

 

 

 

 

 

 

- PutFTP Processor

 

 

FTP ์„œ๋ฒ„ /nifi_data/result_dataset ์œ„์น˜์— ํŒŒ์ผ์ด ์ €์žฅ๋˜๋„๋ก ํ•œ๋‹ค.

 

 

 

 

 

 

 

 

 

 

ํŒŒ์ผ์ด ์ง€์ •๋œ ๊ฒฝ๋กœ์— ์ €์žฅ ๋˜์—ˆ๋‹ค.

 

 

 

 

 

์ง€์ •๋œ ๊ฒฝ๋กœ์— ์ €์žฅ๋œ ํŒŒ์ผ์„ ์—ด์–ด๋ณด๋ฉด ์ด๋ ‡๊ฒŒ ๋ฐ์ดํ„ฐ๊ฐ€ ๋‚˜์˜จ๋‹ค.

3๊ฐœ ์ปฌ๋Ÿผ์— ๋Œ€ํ•œ ๋ฐ์ดํ„ฐ๊ฐ€ ๋‚˜์˜ค๋Š”๊ฒŒ ๋งž๋‹ค

 

FTP์„œ๋ฒ„์—์„œ ๊ฐ€์ ธ์˜จ flowํŒŒ์ผ์— ๋Œ€ํ•œ ์ปฌ๋Ÿผ์€ 3๊ฐ€์ง€ ์ด๋ฏ€๋กœ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋ฐ์ดํ„ฐ๋ฅผ ์ ์žฌํ–ˆ๋‹ค ํ•ด๋„

FTP์„œ๋ฒ„์— ์˜ฌ๋ ค์ง€๋Š” ๋ฐ์ดํ„ฐ๋Š”  entprs_eng_nm, entprs_kor_nm, exp_yy ํ•ญ๋ชฉ๋งŒ ๋‚˜์˜ค๋Š”๊ฒƒ!