λ³Έλ¬Έ λ°”λ‘œκ°€κΈ°

Apache NiFi

μ •ν˜• 데이터 적재 μ—°μŠ΅ν•˜κΈ° 02

 

 

πŸ“– μ •ν˜• 데이터 적재 μ‹œλ‚˜λ¦¬μ˜€ 02

 

 



1. FTP μ„œλ²„μ˜ /nifi_data/raw_dataset λ””λ ‰ν† λ¦¬μ—μ„œ μ΄λ‹ˆμ…œ_20161231.csv λ°μ΄ν„°νŒŒμΌμ„ λ‘œμ»¬ NiFi둜 μˆ˜μ§‘

2. λ ˆμ½”λ“œ κ±΄μˆ˜ μΈ‘μ •

3. μ˜€λŠ˜λ‚ μ§œ (년도4μžλ¦¬μ›”μΌ YYYYMMDD) ν˜•νƒœλ‘œ time μ΄λ¦„μ˜ μ†μ„± μƒμ„±

3. λ ˆμ½”λ“œμ˜ μ»¬λŸΌ μ€‘  κ΅¬λΆ„, κ΅­κ°€λͺ…, μ§„μΆœμ§€μ—­, νšŒμ‚¬λͺ…(κ΅­λ¬Έ), νšŒμ‚¬λͺ…(영문), μ§„μΆœλ…„λ„, μ—…μ’…1 μ»¬λŸΌ μΆ”μΆœ 
   + μœ„μ—μ„œ μΈ‘μ •ν•œ λ ˆμ½”λ“œ κ±΄μˆ˜λ₯Ό λ°μ΄ν„° κ±΄μˆ˜ (data_co) μ»¬λŸΌμ— λ„£κΈ°
   + time μ†μ„±μ˜ κ°’을 μ μž¬μΌμž(load_de)μ»¬λŸΌμ— λ„£κΈ° 

 

3번 ν•­λͺ©μ— λŒ€ν•œ 힌트)
SELECT
.........
'${속성λͺ…}' AS "μΆ”κ°€ν• μ»¬λŸΌλͺ…",
.......
FROM flowfile

 

3번 ν•­λͺ©μ˜ μΆ”μΆœλŒ€μƒ μ»¬λŸΌμ€ ovs_expns_kor_entprs ν…Œμ΄λΈ”μ˜ μ»¬λŸΌλͺ…κ³Ό λ§€ν•‘ν•˜μ‹œλ©΄ λ©λ‹ˆλ‹€!

 



4. μ„œλ²„DB postgresql DB nifi μŠ€ν‚€λ§ˆ λ‚΄μ˜ ovs_expns_kor_entprs ν…Œμ΄λΈ”에 μΆ”μΆœν•œ λ ˆμ½”λ“œ μ μž¬

5. λ ˆμ½”λ“œ μ μž¬ μ™„λ£Œ ν›„ νŒŒμΌλͺ…을 μ΄λ‹ˆμ…œ_μ˜€λŠ˜λ‚ μ§œ.csv둜 λ³€κ²½

6. μ μž¬ μ™„λ£Œλœ νŒŒμΌμ„ FTP μ„œλ²„μ˜ /nifi_data/result_dataset κ²½λ‘œ ν•˜μœ„에 λ³ΈμΈμ˜ μ΄λ‹ˆμ…œλ‘œ ν΄λ”λͺ…을 μ§€μ •ν•˜μ—¬ ν•΄λ‹Ήν΄λ”에 λ°μ΄ν„°νŒŒμΌ μ μž¬

 

 

 

 

 

 

 

 

 

μ‹€ν–‰λœ flowλŠ” μ•„λž˜μ™€ κ°™λ‹€.

 

 

 

 

 

 

 

 

둜컬둜 μˆ˜μ§‘ ν•  NiFi 데이터

br_20161231.csv 파일

 

 

 

 

 

 

 

 

 

 

GetFTP ν”„λ‘œμ„ΈμŠ€λ‘œ FTP μ„œλ²„μ˜ /nifi_data/raw_dataset μœ„μΉ˜μ— μžˆλŠ” br_20161231.csv 파일 데이터λ₯Ό λΆˆλŸ¬μ˜¨λ‹€.

 

CalculateRecordStats ν”„λ‘œμ„ΈμŠ€λ‘œ λ ˆμ½”λ“œ 건수λ₯Ό μΈ‘μ •

 

UpdateAttribute ν”„λ‘œμ„ΈμŠ€λ‘œ 속성을 μ„€μ •ν•΄μ£Όμ–΄

μ˜€λŠ˜λ‚ μ§œ(YYYMMDD) ν˜•νƒœλ‘œ time μ΄λ¦„μ˜ 속성을 μƒμ„±ν•˜μ—¬ μ€€λ‹€.

 

 

 

 

 

QueryRecord ν”„λ‘œμ„ΈμŠ€λ‘œ λ ˆμ½”λ“œμ˜ 컬럼 쀑

ꡬ뢄, κ΅­κ°€λͺ…, μ§„μΆœμ§€μ—­, νšŒμ‚¬λͺ…(κ΅­λ¬Έ), νšŒμ‚¬λͺ…(영문), μ§„μΆœλ…„λ„, μ—…μ’…1 컬럼 7개λ₯Ό μΆ”μΆœν•œλ‹€.

 

  • μœ„μ—μ„œ μΈ‘μ •ν•œ λ ˆμ½”λ“œ 건수λ₯Ό 데이터 건수(data_co) μ»¬λŸΌμ— λ„£λŠ”λ‹€.
    • 1. CalculateRecordStats ν”„λ‘œμ„ΈμŠ€μ˜ View Dadta Provenance
    • 2. Provence Eventμ—μ„œ ATTIBUTE 의 λ‚΄μš©μ— 'read.count' 속성을 ν™•μΈν•œλ‹€.
  • time μ†μ„±μ˜ 값을 적재일자(load_de) μ»¬λŸΌμ— λ„£λŠ”λ‹€.

 

 

 

 

 

SELECT
"ꡬ뢄" AS region_se,
"κ΅­κ°€λͺ…" AS country_nm,
"μ§„μΆœμ§€μ—­" AS exp_plc_nm,
"νšŒμ‚¬λͺ…(κ΅­λ¬Έ)" AS entprs_kor_nm,
"νšŒμ‚¬λͺ…(영문)" AS entprs_eng_nm,
"μ§„μΆœλ…„λ„" AS exp_yy,
"μ—…μ’…1" AS busns_ty_1,
'${record.count}' AS data_co,
'${time}' AS load_de
FROM flowfile

 

 

 

 

 

ovs_expns_kor_entprs ν…Œμ΄λΈ”μ— 데이터λ₯Ό 적재 μ‹œν‚€λ„λ‘ μ„€μ •ν•΄μ€€λ‹€.

 

 

 

 

 

 

UpdateAttribute ν”„λ‘œμ„ΈμŠ€μ— FTPμ„œλ²„μ— 파일λͺ…을 'μ΄λ‹ˆμ…œ_μ˜€λŠ˜λ‚ μ§œ.csv' ν˜•μ‹μœΌλ‘œ λ³€κ²½λ˜κ²Œ 속성 μ„€μ •ν•œλ‹€.

 

 

 

 

 

DB에 적재 μ™„λ£Œλœ νŒŒμΌμ„ FTP μ„œλ²„μ˜ /nifi_data/result_dataset/boram κ²½λ‘œμ— λ°μ΄ν„°νŒŒμΌμ„ 적재 μ‹œν‚¨λ‹€.