Hmm, if you used streams you could do something like this. This is just a PoC :) — it still has some bugs and does not parse the CSV yet, but it should work as a starting point.
With this approach you can stream and parse a lot of CSVs without running into memory problems.
The blocking version would of course be easier, but it always loads the whole file into memory, which can be a problem if the files you want to compare are big.
const readline = require('readline');
const fs = require('fs');
/** Paths of the CSV files that will be streamed side by side. */
const csvMap = ['csv-file-1', 'csv-file-2', 'csv-file-3'];

/** One wrapper object per file; filled in as the line readers are created. */
const lineReaders = [];

/** Lines buffered from the current "row" across all open streams. */
const currentLines = [];

/** Lines selected by the comparison step (not populated yet in this PoC). */
const pickedSet = [];

console.log('starting');
// Create one paused line reader per CSV file. Each reader emits its header
// first, then pauses itself after every body line so asyncLoop() can drain
// the buffered lines in lockstep across all files.
// FIX: use forEach (map was used purely for side effects) and reuse the
// top-level readline/fs imports instead of re-requiring them per file.
csvMap.forEach((fileName) => {
  const lineReader = readline.createInterface({
    input: fs.createReadStream(fileName),
    crlfDelay: Infinity
  }).pause();
  // Per-stream bookkeeping consumed by asyncLoop / waitForAllCSVs.
  const lineReaderWrapper = {
    fileName: fileName,
    lineNo: 0,
    header: {},     // replaced by the raw header line on first 'line' event
    closed: false,  // flipped by the 'close' handler below
    lineReader,
  };
  lineReader.on('line', (line) => {
    console.log('reading:' + fileName + ' line number: ' + lineReaderWrapper.lineNo);
    if (Object.keys(lineReaderWrapper.header).length === 0) {
      // First line of the file: header should be parsed here.
      console.log('header: ' + line + "\n");
      lineReaderWrapper.header = line;
      ++lineReaderWrapper.lineNo;
    } else {
      // Either we parse the CSV here or we just push it as a string.
      console.log('body: ' + line + "\n");
      currentLines.push(line);
      ++lineReaderWrapper.lineNo;
      // Pause until every other stream has buffered its line too.
      lineReader.pause();
    }
  }).on('close', () => {
    console.log('closed stream' + fileName + ' line number: ' + lineReaderWrapper.lineNo);
    lineReaderWrapper.closed = true;
  });
  lineReader.resume();
  lineReaders.push(lineReaderWrapper);
});
/**
 * Polling loop: once every open stream has buffered one line, process the
 * buffered lines, clear the buffer, resume all streams, and reschedule.
 * Stops rescheduling itself when every stream has closed.
 */
const asyncLoop = () => {
  if (!atLeastOneStreamIsStillOpen(lineReaders)) {
    return; // all streams closed — nothing left to do
  }
  if (!waitForAllCSVs()) {
    // FIX: the original fell through and processed an incomplete buffer
    // here (and scheduled a second timer). Retry later and bail out.
    setTimeout(asyncLoop, 100);
    return;
  }
  currentLines.forEach((line) => {
    // Here we read our buffered lines and could do some comparing.
    console.log('buffered lines: ' + line);
  });
  // FIX: clear the buffer, otherwise old lines are reprocessed every tick
  // and waitForAllCSVs' length comparison never holds again.
  currentLines.length = 0;
  lineReaders.forEach((lineReaderWrapper) => {
    // Resume every stream so each one buffers its next line.
    lineReaderWrapper.lineReader.resume();
  });
  setTimeout(asyncLoop, 100);
};
setTimeout(asyncLoop, 100);
/**
 * Returns true while at least one CSV stream has not closed yet.
 *
 * FIX: the original reducer (`!closed & currentValue` seeded with `true`)
 * actually computed "ALL streams still open" — and with bitwise `&` — so the
 * loop stopped as soon as the first (shortest) file finished. It also ignored
 * the argument passed at the call site; `readers` now defaults to the
 * module-level `lineReaders`, keeping both call styles working.
 *
 * @param {Array<{closed: boolean}>} [readers=lineReaders] reader wrappers to inspect
 * @returns {boolean} true if any wrapper is still open
 */
function atLeastOneStreamIsStillOpen(readers = lineReaders) {
  return readers.some((lineReaderWrapper) => !lineReaderWrapper.closed);
}
/**
 * Returns true once every still-open stream has buffered exactly one line,
 * i.e. the number of open readers equals the number of buffered lines.
 *
 * FIX: the original reduce callback had its parameters reversed —
 * `(lineReaderWrapper, currentValue)` instead of `(accumulator, element)` —
 * so it incremented the wrapper object, not the count. Parameters with
 * defaults are added so the function is testable in isolation while the
 * existing zero-argument call site keeps working.
 *
 * @param {Array<{closed: boolean}>} [readers=lineReaders] reader wrappers to count
 * @param {string[]} [lines=currentLines] lines buffered for the current round
 * @returns {boolean} true when all open streams have pushed their content
 */
function waitForAllCSVs(readers = lineReaders, lines = currentLines) {
  const countOpenReaders = readers.filter(
    (lineReaderWrapper) => !lineReaderWrapper.closed
  ).length;
  return countOpenReaders === lines.length;
}