λ°μ΄ν° μ μ¬/λ³ν μ°μ΅νκΈ° 05_μ κ·μμ μ¬μ©νμ¬ μμ± μμ± , νμΌ ν΄μ ν μμ±μΌλ‘ λ°μ΄ν° μμ νκ³ DB μ μ¬λ λ°μ΄ν° Delete ν INSERT νμ¬ λ°μ΄ν° μ μ¬ ν νμΌλ‘ μμΆνμ¬ SFTP μλ²μ μ μ¬
π λ°μ΄ν° μ μ¬ / λ³ν μλ리μ€
1. FTP μλ² (172.30.1.3) /nifi_data/raw_dataset/ λλ ν 리μμ business_district_data.zip νμΌ μμ§ (μλ³Έ μ μ§)
2. μ€λλ μ§(yyyymmdd) κ°μ΄ λ€μ΄κ° time μ΄λ¦μ μμ±μ μμ± (κ°μ μ«μμ¬μ©νμ§ μκ³ Expression Language λ₯Ό μ¬μ©νμ¬ μμ±)
3. μμ§ν zip νμΌ μμΆν΄μ
4. νμΌ λͺ
μ μ€λ λ μ§κ°μ΄ λ€μ΄κ° νμΌλ€λ§ λΆλ¦¬
5. λ°μ΄ν° 건μ μΈ‘μ
6. λ°μ΄ν° μμ
1) νμΌμ 컬λΌλͺ
μ μλμ κ°μΌλ‘ λ³κ²½
μκ°μ
μλ²νΈ house_no
μνΈλͺ
cmpny_nm
μ§μ λͺ
point_nm
μκΆμ
μ’
λλΆλ₯μ½λ industry_lgcls_cd
μκΆμ
μ’
λλΆλ₯λͺ
industry_lgcls_nm
μκΆμ
μ’
μ€λΆλ₯μ½λ industry_mdcls_cd
μκΆμ
μ’
μ€λΆλ₯λͺ
industry_mdcls_nm
μκΆμ
μ’
μλΆλ₯μ½λ industry_smcls_cd
μκΆμ
μ’
μλΆλ₯λͺ
industry_smcls_nm
νμ€μ°μ
λΆλ₯μ½λ std_cls_cd
νμ€μ°μ
λΆλ₯λͺ
std_cls_nm
μλμ½λ sd_cd
μλλͺ
sd_nm
μꡰꡬμ½λ sgg_cd
μꡰꡬλͺ
sgg_nm
νμ λμ½λ hd_cd
νμ λλͺ
hd_nm
λ²μ λμ½λ bd_cd
λ²μ λλͺ
bd_nm
μ§λ²μ½λ addr_cd
λμ§κ΅¬λΆμ½λ site_cd
λμ§κ΅¬λΆλͺ
site_se_nm
μ§λ²λ³Έλ²μ§ addr_main_no
μ§λ²λΆλ²μ§ addr_sub_no
μ§λ²μ£Όμ addr_data
λλ‘λͺ
μ½λ road_cd
λλ‘λͺ
road_nm
건물본λ²μ§ bud_main_no
건물λΆλ²μ§ bud_sub_no
건물κ΄λ¦¬λ²νΈ bud_mng_no
건물λͺ
bud_nm
λλ‘λͺ
μ£Όμ bud_addr
ꡬμ°νΈλ²νΈ old_zip_cd
μ μ°νΈλ²νΈ new_zip_cd
λμ 보 dong_info
μΈ΅μ 보 flor_info
νΈμ 보 room_info
κ²½λ logtd
μλ lattd
2) νμΌμ μ»¬λΌ μ€ κ²½λ, μλ컬λΌμ κ°μμ . μ 곡백μΌλ‘ λ체
3) 5μμ μΈ‘μ ν λ°μ΄ν° 건μμ κ°μ΄ λ€μ΄κ°λ data_co μ»¬λΌ μμ±
4) 2μμ μμ±ν time μμ±μ κ°μ΄ λ€μ΄κ°λ load_de μ»¬λΌ μμ±
9. postgresql DB nifi μ€ν€λ§ λ΄μ μ΄λμ
_business_dist_data ν
μ΄λΈμ λ°μ΄ν°λ₯Ό DELETE
10. postgresql DB nifi μ€ν€λ§ λ΄μ μ΄λμ
_business_dist_data ν
μ΄λΈμ μμ§ν λ°μ΄ν° INSERT
11. μ μ¬ μλ£λ λ°μ΄ν°λ€μ zip ν¬λ§·μΌλ‘ μμΆ
12. μμΆλ νμΌμ νμΌλͺ
μ μ΄λμ
_μ€λλ μ§.zip λ‘ λ³κ²½
10. νμΌμ SFTPμλ²μ /data01/result_data/ λλ ν 리 νμμ μ΄λμ
λ‘ ν΄λλ₯Ό λ§λ€μ΄ μ μ¬
μλλ¦¬μ€ Flow
GetFTP νλ‘μΈμ€λ‘ FTP μλ² (172.30.1.3) /nifi_data/raw_dataset/ λλ ν 리μμ business_district_data.zip νμΌ μμ§
(μλ³Έ μ μ§)
UpdateAttribute νλ‘μΈμ€λ‘ μμ± μμ±
μ€λλ μ§(yyyymmdd) κ°μ΄ λ€μ΄κ° time μ΄λ¦μ μμ±μ μμ±
(κ°μ μ«μμ¬μ©νμ§ μκ³ Expression Language λ₯Ό μ¬μ©νμ¬ μμ±)
${now():format('yyyyMMdd')}
UnpackContent
- νλ‘μ° λ΄μμ μμΆνμΌμ ν΄μ ν λ μ¬μ©νλ€. νλ‘μ° λ΄μμλ§ μμΆν΄μ λ μνλ‘ λ¨μ μλ€.
Packaging Format : μμΆν ν¬λ§· μ§μ ( zip , tar)
File Filter : νν°λ§ν νμΌ μ§μ .* (λͺ¨λ νμΌ)
RouteOnAttribute
- νλ‘μ°νμΌ λΆλ¦¬ νλ‘μΈμ€
- νλ‘μ° νμΌ λ΄μ λ°μ΄ν°κ° μ¬λ¬κ°μΈ κ²½μ° νλ‘μ°κ° κ°μ§ μμ±μ κΈ°μ€μΌλ‘ νλ‘μ°νμΌμ λΆλ¦¬ ν λ μ¬μ©
Routing Strategy : λΆκΈ°ν κΈ°μ€ μ ν
filename μ΄λΌλ μμ±μ μμ±νμ¬, filenameμ ${time} μμ±(μ€λλ μ§) ν¬ν¨νλ μ΄λ¦μ κ°μ§ νμΌλ€λ§ λΆλ¦¬νλλ‘ νλ€.
${filename:contains(${time})}
CalculateRecordStats
- cvs(μ νλ°μ΄ν°)λ₯Ό μμ§ λ° μ μ¬νλ κ²½μ° λ°μ΄ν° 건μλ₯Ό μΈ‘μ ν μ μλ νλ‘μΈμ€
cvsλ°μ΄ν°κ° νμμ λ§κ² λμ΄μλμ§ κ²μ¦ν λλ μ¬μ©ν μ μλ€.
QueryRecord
- νλ‘μ°νμΌ λ΄μ λ°μ΄ν°λ₯Ό 쿼리ν λ μ¬μ©νλ νλ‘μΈμ€
μ νλ°μ΄ν°(cvs), λ°μ νλ°μ΄ν°(json,xml)μμλ§ μ¬μ©μ΄ κ°λ₯νλ€.
- μ¬μ©μμ§μ μμ±μ λ§λ€μ΄ μ¬μ©νμ¬ μμ±μ κ°μΌλ‘ λ°μ΄ν°λ² μ΄μ€μμ μ‘°ννλ κ²κ³Ό λμΌνκ² SELECTλ¬Έμ μμ± κ°λ₯
(λμ ! FROMμ μλ ν μ΄λΈλͺ μ΄ μλ flowfileμ μ μ΄μ€λ€. νλ‘μ°νμΌμμ μ‘°ννλκ²μ΄λ―λ‘..!)
select
"μκ°μ
μλ²νΈ" AS house_no,
"μνΈλͺ
" AS cmpny_nm,
"μ§μ λͺ
" AS point_nm,
"μκΆμ
μ’
λλΆλ₯μ½λ" AS industry_lgcls_cd,
"μκΆμ
μ’
λλΆλ₯λͺ
" AS industry_lgcls_nm,
"μκΆμ
μ’
μ€λΆλ₯μ½λ" AS industry_mdcls_cd,
"μκΆμ
μ’
μ€λΆλ₯λͺ
" AS industry_mdcls_nm,
"μκΆμ
μ’
μλΆλ₯μ½λ" AS industry_smcls_cd,
"μκΆμ
μ’
μλΆλ₯λͺ
" AS industry_smcls_nm,
"νμ€μ°μ
λΆλ₯μ½λ" AS std_cls_cd,
"νμ€μ°μ
λΆλ₯λͺ
" AS std_cls_nm,
"μλμ½λ" AS sd_cd,
"μλλͺ
" AS sd_nm,
"μꡰꡬμ½λ" AS sgg_cd,
"μꡰꡬλͺ
" AS sgg_nm,
"νμ λμ½λ" AS hd_cd,
"νμ λλͺ
" AS hd_nm,
"λ²μ λμ½λ" AS bd_cd,
"λ²μ λλͺ
" AS bd_nm,
"μ§λ²μ½λ" AS addr_cd,
"λμ§κ΅¬λΆμ½λ" AS site_cd,
"λμ§κ΅¬λΆλͺ
" AS site_se_nm,
"μ§λ²λ³Έλ²μ§" AS addr_main_no,
"μ§λ²λΆλ²μ§" AS addr_sub_no,
"μ§λ²μ£Όμ" AS addr_data,
"λλ‘λͺ
μ½λ" AS road_cd,
"λλ‘λͺ
" AS road_nm,
"건물본λ²μ§" AS bud_main_no,
"건물λΆλ²μ§" AS bud_sub_no,
"건물κ΄λ¦¬λ²νΈ" AS bud_mng_no,
"건물λͺ
" AS bud_nm,
"λλ‘λͺ
μ£Όμ" AS bud_addr,
"ꡬμ°νΈλ²νΈ" AS old_zip_cd,
"μ μ°νΈλ²νΈ" AS new_zip_cd,
"λμ 보" AS dong_info,
"μΈ΅μ 보" AS flor_info,
"νΈμ 보" AS room_info,
replace("κ²½λ", '.',' ') AS logtd,
replace("μλ", '.',' ') AS lattd,
'${record.count}' AS data_co,
'${time}' AS load_de
from flowfile
νμΌμ 컬λΌλͺ μ μμ κ°μ΄ λ³κ²½ν΄μ£Όμλ€.
μ»¬λΌ μ€ κ²½λ, μλ 컬λΌμ κ°μμ '.' λ₯Ό ' '(곡백)μΌλ‘ λ체νμλ€.
μΈ‘μ ν λ°μ΄ν° 건μμ κ°μ΄ λ€μ΄κ°λ data_co μ»¬λΌ μμ±
time μμ±μ κ°μ΄ λ€μ΄κ°λ load_de μ»¬λΌ μμ±
PutSQL
- νλ‘μ° λ΄μμ λ°μ΄ν°λ² μ΄μ€μ INSERT λλ UPDATE λͺ λ Ήμ μ€νν λ μ¬μ©
- Support Fragmented Transactions : λ¨νΈνλ νΈλμμ μ§μ μ¬λΆ
trueμΌ κ²½μ° νλ‘μΈμκ° κ°μ₯ λ¨Όμ ν΄λΉ FlowFileμ fragment.identifier λ° fragment.count νΉμ±μ νμΈνμ¬
fragment.count κ°μ΄ 1λ³΄λ€ ν¬λ©΄ fragment.identifierλ₯Ό κ°μ§ λͺ¨λ FlowFilesλ₯Ό λ¨μΌ νΈλμμ μΌλ‘ μ²λ¦¬νκ³
falseλ‘ μ€μ νλ©΄ μ¬λ¬ κ°μ νλ‘μ°νμΌμ λν μ λ°μ΄νΈ μ€ν μ λ 립μ μΌλ‘ μνλ¨
SQL Stataement : μ€νν 쿼리문μ μμ±νλ€. Deleteλ μ€ν κ°λ₯!
Delete from nifi.br_business_dist_data
postgresql DB nifi μ€ν€λ§ λ΄μ br_business_dist_data ν
μ΄λΈμ λ°μ΄ν°λ₯Ό DELETE
PutDatabaseRecord
λ°μ΄ν°λ² μ΄μ€μ νλ‘μ°νμΌμ λ°μ΄ν°λ₯Ό μ μ¬ν λ μ¬μ©νλ νλ‘μΈμ€
postgresql DB nifi μ€ν€λ§ λ΄μ br_business_dist_data ν
μ΄λΈμ μμ§ν λ°μ΄ν° INSERT
MergeContent
-νλ‘μ°μ λ€μ΄μ€λ λ°μ΄ν°νμΌλ€μ μμΆν λ μ¬μ©νλ νλ‘μΈμ€. νμΌ λ¨μμ κ²°ν©
Merge Strategy : μμΆλ°©μ
Merge Format : μμΆ ν¬λ§·
Attribute Strategy : μμ± μ μ§ λ°©μ
Minimum Number of Entries : λ μ½λ μ΅μκ°
Maximum Number of Entries : λ μ½λ μ΅λκ°
Minimum Group Size : κ²°ν© μ΅μ μ¬μ΄μ¦
Max Bin Age : μ΅λ κ²½κ³Ό μκ°
nifi μ€ν€λ§ λ΄μ br_business_dist_data ν μ΄λΈμ μ μ¬ μλ£λ λ°μ΄ν°λ€μ zip ν¬λ§·μΌλ‘ μμΆ
UpdateAttribute
- νλ‘μ°νμΌμ΄ κ°μ§κ³ μλ μμ±μ κ°μ λ³κ²½ λλ μλ‘μ΄ μμ±μ νλ‘μ°νμΌμ λ±λ‘ν μ μλ νλ‘μΈμ€
μ£Όλ‘ νμΌλͺ μ λ³κ²½νκ±°λ νμΌμ μ¬μ νμν λ³μλ₯Ό μμ±ν λ μ¬μ©νλ€.
Store State : μνλ₯Ό μ μ₯ν μ§ μ¬λΆλ₯Ό μ ν
Cache Value Lookup Cache Size : μΊμμ μ μ₯ν νμ€ μ‘°ν κ° μλ₯Ό μ§μ
μμΆλ νμΌμ νμΌλͺ μ br_μ€λλ μ§.zip λ‘ λ³κ²½
br_${time}.zip
PutSFTP
- μμ§ν λ°μ΄ν°νμΌμ λ€λ₯Έ μλ² λ΄ νΉμ λλ ν 리μ SFTPλ‘ μ κ·Όνμ¬ μ μ¬νλ κ²½μ° μ¬μ©νλ νλ‘μΈμ€
- ν΄λΉ νλ‘μΈμ€λ κ°μ μλ² λ΄μμ μμ νλ κ²½μ°λ§ μν¨νλ©° λ€λ₯Έ μλ² λ΄ λλ ν 리μ μ μ¬λ λΆκ°νλ€.
Hostname : μλ²μ νΈμ€νΈλͺ
Port : μλ² ν¬νΈλ²νΈ
Username : SFTP μλ²μ κ³μ λͺ
Remote Path : μλ²μ μ μ¬ν λλ ν 리λͺ (κ²½λ‘)
Create Directory : μλ²μ μμ±ν λλ ν λ¦¬κ° μλ κ²½μ° μμ±ν μ§ μ¬λΆ true/false
Confict Resolution : μ μ¬μ μ€λ³΅λ νμΌμ΄ μλ κ²½μ° μ²λ¦¬ν λ°©λ² μ ν
νμΌμ SFTPμλ²μ /data01/result_data/ λλ ν 리 νμμ μ΄λμ λ‘ ν΄λλ₯Ό λ§λ€μ΄ μ μ¬
SFTP μλ²μ μ€μ ν λλ ν λ¦¬κ° μμ±λκ³ νμΌμ΄ μλ²μ μ μ¬λ λͺ¨μ΅μ΄λ€.
νκΈ°
Linuxλ₯Ό μ²μμΌλ‘ νμ©νμ¬ μλ리μ€λ₯Ό λ°λΌ λ°μ΄ν°λ₯Ό μ μ¬ ν΄λ³΄μλ€.
νλ‘κ·Έλ¨μ μ ν νλλ° μκ°μ΄ κ±Έλ Έκ³ , μμ§ λ¦¬λ μ€ νκ²½μ μ΅μν΄μ§μ§ λͺ»νλ€.
λ°μ΄ν°λ₯Ό μμ νλ λΆλΆμμ κ²½λμ μλ 컬λΌμ κ°μμ .μ 곡백μΌλ‘ λ체ν λ
DBμμ μ€νν΄λ³΄μμ λλ λκΈΈλ, κ·Έλλ‘ μΏΌλ¦¬λ₯Ό 볡μ¬ν΄μ λΆμ¬λ£κΈ°λ₯Ό νλλ° flowfileμμ " "λ₯Ό λΆμ¬μ
replace ν¨μλ₯Ό μ€νν΄μΌ νμλ€.
postgresql DB nifi μ€ν€λ§ λ΄μ br_business_dist_data ν μ΄λΈμ λ°μ΄ν°λ₯Ό DELETEλ₯Ό ν΄μ€λ, PutSQL νλ‘μΈμ€λ₯Ό μ¬μ©ν΄μΌ νκ³ ,
postgresql DB nifi μ€ν€λ§ λ΄μ br_business_dist_data ν μ΄λΈμ μμ§ν λ°μ΄ν° INSERTν λλ PutdatabaseRecord νλ‘μΈμ€λ₯Ό μ¬μ©νμΌ νλ€.
λλ λμ λ°κΏμ μ¬μ©νλ€... μ΄μ λ PutdatabaseRecord νλ‘μΈμ€μ deleteλ₯Ό ν μ μλλ‘ μ νν μ μλ μμ±μ΄ μμκΈ° κΈ° λλ¬Έμ μ¬μ©νμλ€. μ λ§ λ¨μν μ΄μ γ ..
UnpackContent, RouteOnAttribute, MergeContent, PutSFTP νλ‘μΈμ€λ₯Ό μ²μ μ¬μ©ν΄ 보μλλ°, μ¬μ©νλ©΄μ μ΄λ€ νλ‘μΈμ€μΈμ§ μμ보면μ μ μ©νλλΌ μκ°μ΄ λ μ€λ κ±Έλ Έλκ±° κ°λ€