๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

Apache NiFi

API ๋ฐ์ดํ„ฐ ์ ์žฌ ์—ฐ์Šตํ•˜๊ธฐ 03_Open API ํ˜ธ์ถœํ•œ ๋ฐ์ดํ„ฐ DB์ ์žฌ ํ›„ ์ˆ˜์ •ํ•˜๊ธฐ

 

 

๐Ÿ“– API ๋ฐ์ดํ„ฐ ์ ์žฌ ์‹œ๋‚˜๋ฆฌ์˜ค

 

 

1. API๋กœ ์ˆ˜์ง‘ํ•˜์—ฌ FTP ์„œ๋ฒ„ /nifi_dataset/api_dataset ์— ์ ์žฌํ•œ ๋ฐ์ดํ„ฐ๋ฅผ get ํ•˜๊ธฐ

2. jsonํฌ๋งท์˜ ๋ฐ์ดํ„ฐ๋ฅผ csv๋กœ ๋ณ€ํ™˜

3. ๋ ˆ์ฝ”๋“œ ๊ฑด์ˆ˜ ์ถ”์ถœ

4. ์ถ”์ถœํ•œ ๋ ˆ์ฝ”๋“œ ๊ฑด์ˆ˜๋ฅผ data_co ์ปฌ๋Ÿผ์—, 20230215 ๊ฐ’์ด load_de ์ปฌ๋Ÿผ์— ๋“ค์–ด๊ฐ€๋„๋ก ํ•˜์—ฌ
postgres ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋‚ด nifi ์Šคํ‚ค๋งˆ์˜ lfr_price_rate_year ํ…Œ์ด๋ธ”์— ์ ์žฌ

5. ๋ ˆ์ฝ”๋“œ ์ ์žฌ ํ›„ level_no ์ปฌ๋Ÿผ์˜ ๊ฐ’์ด 1์ธ ๋ฐ์ดํ„ฐ๋งŒ load_de ์ปฌ๋Ÿผ์˜ ๊ฐ’์„ 0์œผ๋กœ ์—…๋ฐ์ดํŠธ

 

 

 

 

 

 

 

์‹คํ–‰ ๋œ Flow

 

 

 

 

 

์ ์žฌ๋œ br_20230215.json ํŒŒ์ผ์„ ๋ถˆ๋Ÿฌ์˜จ๋‹ค.

 

 

 

 

GetFTP ํ”„๋กœ์„ธ์Šค๋ฅผ ์‚ฌ์šฉํ•ด์„œ ํŒŒ์ผ์„ ๋ถˆ๋Ÿฌ์˜จ๋‹ค.

 

 

 

 

 

ConvertRecord๋ฅผ ์‚ฌ์šฉํ•ด์„œ jsonํŒŒ์ผ์„ ์ฝ์–ด์™€์„œ csv ํŒŒ์ผ๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค.

 

 

 

 

 

CalculateRecordstats ํ”„๋กœ์„ธ์Šค๋กœ ๋ ˆ์ฝ”๋“œ ๊ฑด์ˆ˜ ์ธก์ •ํ•œ๋‹ค.

 

 

 

 

 

UpdateAttribute ํ”„๋กœ์„ธ์Šค๋กœ time์†์„ฑ์„ ์ƒ์„ฑํ•œ๋‹ค.

 

 

 

 

 

 

 

 

QueryRecord - ํ”Œ๋กœ์šฐ ํŒŒ์ผ ๋‚ด์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ฟผ๋ฆฌ ํ•  ๋•Œ ์‚ฌ์šฉํ•˜๋Š” ํ”„๋กœ์„ธ์Šค

์ถ”์ถœํ•œ ๋ ˆ์ฝ”๋“œ ๊ฑด์ˆ˜๋ฅผ data_co ์ปฌ๋Ÿผ์—, 20230215 ๊ฐ’์ด load_de ์ปฌ๋Ÿผ์— ๋“ค์–ด๊ฐ€๊ฒŒ ํ•œ๋‹ค.

 

์ƒ์„ฑํ•œ ์†์„ฑ์„ ํ…Œ์ด๋ธ”์˜ ์ปฌ๋Ÿผ์œผ๋กœ ์‚ฌ์šฉํ•œ๋‹ค

 

 

 

record.count๋Š” CalculateRecordstats ํ”„๋กœ์„ธ์Šค์˜ Data Provenance๋ฅผ ํ†ตํ•ด ์†์„ฑ์„ ํ™•์ธํ•œ๋‹ค.

 

 

 

 

 

 

 

 

PutDatabaseRecord ํ”„๋กœ์„ธ์Šค์— DB๋ฅผ ์ ์žฌ์‹œํ‚ค๊ธฐ ์œ„ํ•œ ์†์„ฑ๊ฐ’์„ ์ž…๋ ฅํ•œ๋‹ค.

 

PostgreSQL ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋‚ด nifi ์Šคํ‚ค๋งˆ์˜ lfr_price_rate_year ํ…Œ์ด๋ธ”์— ์ ์žฌํ•œ๋‹ค.

 

 

 

 

 

DB์— ๋ฐ์ดํ„ฐ๊ฐ€ ์ ์žฌ๋œ ๊ฒƒ์„ ํ™•์ธํ•œ๋‹ค.

์ ์žฌ ํ›„ level_no ์ปฌ๋Ÿผ์˜ ๊ฐ’์ด 1์ธ ๋ฐ์ดํ„ฐ๋งŒ load_de ์ปฌ๋Ÿผ์˜ ๊ฐ’์„ 0์œผ๋กœ ์—…๋ฐ์ดํŠธ ํ•ด๋ณธ๋‹ค.

 

 

 

 

 

 

PutSQL - ํ”Œ๋กœ์šฐ ๋‚ด์—์„œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ INSERT ๋˜๋Š” UPDATE ๋ช…๋ น์„ ์‹คํ–‰ํ•˜๋Š” ๊ฒฝ์šฐ ์‚ฌ์šฉํ•˜๋Š” ํ”„๋กœ์„ธ์Šค

JDBC Connection Pool : ์—ฐ๊ฒฐํ•  ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ ์ปจํŠธ๋กค๋Ÿฌ ์„œ๋น„์Šค ์„ ํƒ

SQL Statement : ์‹คํ–‰ํ•  INSERT ๋˜๋Š” UPDATE SQL๋ฌธ ์ž‘์„ฑ

 

 

SQL Statement ์†์„ฑ์— ์ž‘์„ฑํ•œ update ์ฟผ๋ฆฌ๋ฌธ

* ์Šคํ‚ค๋งˆ๊ฐ€ ์กด์žฌํ•˜๋Š” ๊ฒฝ์šฐ

ํ…Œ์ด๋ธ”๋ช… ์•ž์— ์Šคํ‚ค๋งˆ๋ช…์„ ๊ผญ ์ ์–ด ์ฃผ์–ด์•ผํ•œ๋‹ค. 

 

 

 

 

 

 

Flow ์‹คํ–‰์ด ๋ชจ๋‘ ์™„๋ฃŒ๋˜๊ณ ,  DB์—์„œ ํ™•์ธ์„ ํ•œ๋ฒˆ ํ•ด๋ณด์•˜๋‹ค.

level_no์˜ ๊ฐ’์ด '1'์ธ ๋ฐ์ดํ„ฐ๋งŒ  load_de ์ปฌ๋Ÿผ์˜ ๊ฐ’์„ 0์œผ๋กœ ์ˆ˜์ •๋œ ๊ฒƒ์„ ํ™•์ธํ•จ.

 

 

 

 

 

 

 

 

 

 

 

ํ›„๊ธฐ

 

 

PutSQL ํ”„๋กœ์„ธ์Šค๋ฅผ ์ฒ˜์Œ ์‚ฌ์šฉํ•˜์—ฌ ์—ฐ์Šต ํ•ด๋ณด์•˜๋‹ค.

Flow๋ฅผ ๋งŒ๋“ค๋•Œ ์–ด๋–ค ํ”„๋กœ์„ธ์Šค๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ์ ์ ˆํ•˜๊ณ  ์กฐ๊ธˆ ๋” ๊ฐ„๊ฒฐํ•˜๊ฒŒ ์‚ฌ์šฉ๋ ์ง€ ๊ณ ๋ฏผ๋„ ํ•ด๋ณด์•˜๋‹ค

 

๋ฌธ์ œ์ ์ด๋ผ๊ณ  ํ•˜๋ฉด,

PutSQL ํ”„๋กœ์„ธ์Šค๋ฅผ ์„ค์ •ํ•  ๋•Œ, SQL statement์— SQL๋ฌธ์„ ์ž‘์„ฑํ•ด์ฃผ์—ˆ๋Š”๋ฐ

์ด๋•Œ๋Š” from fileflow ๊ฐ€ ์•„๋‹Œ "FROM ํ…Œ์ด๋ธ”๋ช…"์„ ์ ์–ด ์ฃผ์–ด DB์—์„œ ์ˆ˜์ •์ด ๋˜๋Š”์ง€ ํ™•์ธ ์ฐจ ์ฟผ๋ฆฌ๋ฅผ ์ž‘์„ฑํ•˜์—ฌ ์‹คํ–‰์‹œ์ผฐ๋”๋‹ˆ

๋ฐ์ดํ„ฐ๊ฐ€ ์ˆ˜์ •๋จ์„ ํ™•์ธ ํ•˜์˜€๋‹ค.

๊ทธ๋Œ€๋กœ update ์ฟผ๋ฆฌ๋ฌธ์„ ์ž‘์„ฑ ํ•ด์ฃผ์—ˆ๋Š”๋ฐ, ์ด๋•Œ ์Šคํ‚ค๋งˆ ๋ช…์„ ์ ์–ด์ฃผ์ง€ ์•Š์•„์„œ flow์—์„œ๋Š” ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•˜๊ฒŒ ๋˜์—ˆ๋˜ ๊ฒƒ.

๊ตฌ๊ธ€์„ ์ฐพ์•„๋ณด์•˜์ง€๋งŒ ํ•ด๊ฒฐ์ด ๋˜์ง€ ์•Š์•„์„œ ์‚ฌ์ˆ˜๋‹˜์—๊ฒŒ ์งˆ๋ฌธํ•˜์—ฌ ํ•ด๊ฒฐ์ด ์™„!!!!

์Šคํ‚ค๋งˆ๊ฐ€ ์กด์žฌํ•  ๋•Œ๋Š” ๊ผญ ํ…Œ์ด๋ธ”๋ช… ์•ž์— ์Šคํ‚ค๋งˆ๋ช…์„ ์ ์–ด์ฃผ์ž!