Ethan Hur's blog

Nodejs Mysql 과 Stream

2018-01-25

Node.js Stream 을 이용하여 .pipe() 을 거는 코딩을 했다. 꿀잼이었다.

Node.js 의 Stream 문서에 나와있는 인터페이스를 잘 활용하면 어렵지 않게 할 수 있다.

Stream 에는 크게 Readable Stream 과 Writable Stream 이 있고, 이 두개를 잘 합친 Duplex 와 Transform 이 있다.

Node.js Mysql Stream

내가 하고 싶은 작업은 mysql DB 의 row 들을 csv 포맷으로 S3에 덤프를 뜨는 것이었다.

그러기 위해서 Readable Stream 2개를 연결해야 하는데, 커스텀 포매팅이 필요했다.

이를 Transform stream 을 이용하여 구현하였다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
const Transform = require('stream').Transform;
let s3 = new AWS.S3({...});
let transformStream = new Transform({
writableObjectMode: true
});
transformStream._transform = function transfrom(chunk, encoding, callback) {
let line = Object.keys(chunk).map((key) => {
if(chunk[key] instanceof Date) return moment(chunk[key]).format('YYYY-MM-DD HH:mm:ss');
return chunk[key];
}).join(',') + '\n';
this.push(line);
callback(null);
}
function mysqlToS3(sql, cb) {
conn = mysql.createConnection(...);
conn.query(sql)
.stream()
.pipe(transformStream);
s3.upload({
Bucket: BUCKET_NAME,
Key: KEY_NAME,
Body: transformStream,
ACL: '...'
}, (err, data) => {
console.log(data);
cb(err);
});
}

이런 식으로 pipe 을 이용하여 row 들을 S3로 stream 할 수 있었다.

Transform stream 을 쓰지 않을 땐 이걸 file 로 떨군 뒤 다시 읽었는데, 생각해보니 굳이 그럴 필요가 없어서 이 방법이 더 좋았던 것 같다.

objectMode and mysql

stream 의 옵션 중 하나인 objectMode는 stream 을 object 로 전달할 것인가? 아니면 Buffer/string 으로 전달할 것인가? 에 대한 옵션이다.

writableObjectMode: true

위의 코드에서 이 옵션을 제거하게 되면 동작을 하지 않는다.

잘 생각해보면 이유를 알 수 있다.

objectMode 를 끄게 되면 serialized 된 포맷으로 전송되게 된다. 이 data stream 을 보내는 주체는 Mysql Server 이며, 따라서 objectMode: false 인 경우 chunk 를 사용자가 직접 파싱해야 한다.

그 파서는 Mysql data Parser 와 똑같다. 즉, Mysql spec 을 잘 이해하고 있어야 해당 chunk 를 파싱할 수 있는 것이다.

따라서 nodejs 의 mysql 모듈의 경우는 강제적으로 objectMode: true 로 세팅해버린다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Query.prototype.stream = function(options) {
var self = this,
stream;
options = options || {};
options.objectMode = true; //// 요기다
stream = new Readable(options);
stream._read = function() {
self._connection && self._connection.resume();
};
this.on('result',function(row,i) {
if (!stream.push(row)) self._connection.pause();
stream.emit('result',row,i); // replicate old emitter
});
/* 이하 생략 */

잘 생각해보면 맞는 구현인 것 같다. mysql chunk 를 직접 파싱하는 일은 절대 필요가 없다.

Further Study

Node.js Stream 역시 EventEmitter 의 wrapper 이다. 따라서 EventEmitter 를 더 보아야 더 깊은 이해가 가능할 듯 하다.

Tags: Node.js