Node worker threads


Liron Navon
·Jan 26, 2019

Worker threads have been available since Node version 10.5 behind the --experimental-worker flag, but as of version 11.7 the feature flag is no longer needed and we can start using them directly.

What are worker threads?

The main argument against Node is that it is single-threaded and therefore cannot make use of all the machine's resources. With worker threads, we can create multiple threads, delegate work to them from the main thread, and keep the main thread free to process new requests faster.

Node is known as an asynchronous runtime for JavaScript. In Node we normally work concurrently (with promises and callbacks), but here we will see how to work in parallel, and why and when we should do so. You can read more about the difference between concurrency and parallelism here.

When to use worker threads?

As the documentation states:

Workers (threads) are useful for performing CPU-intensive JavaScript operations. They will not help much with I/O-intensive work. Node.js’s built-in asynchronous I/O operations are more efficient than Workers can be.
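To see why CPU-intensive work is the problem, here is a minimal sketch that measures how late a 0ms timer fires while the main thread is stuck in a synchronous loop. The helper names busySum and measureTimerDelay are made up for this illustration, and the loop size is an arbitrary choice.

```javascript
// hypothetical CPU-bound task: add up the integers from 1 to n in a tight loop
function busySum(n) {
  let total = 0;
  for (let i = 1; i <= n; i++) total += i;
  return total;
}

// measure how long a 0ms timer actually waits when the main thread is busy
function measureTimerDelay(work) {
  return new Promise((resolve) => {
    const scheduled = Date.now();
    setTimeout(() => resolve(Date.now() - scheduled), 0);
    work(); // runs synchronously, so the timer cannot fire until it returns
  });
}

measureTimerDelay(() => busySum(1e8)).then((delayMs) =>
  console.log(`the timer waited ${delayMs}ms for the loop to finish`)
);
```

The delay grows with the loop size, because callbacks (including incoming requests) have to wait until the synchronous work returns. That is the kind of work worth moving to a worker.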

Let's get started!

To work with worker threads, let's first make sure we have Node version 11.7.0 or higher (check with the command “node -v”); you can update it with nvm.

We are going to do a time-consuming, CPU-heavy operation: sorting a very large array with millions of elements. The worker_threads API exports a few variables and functions; here is a short, simple description of each:

Worker (class): A class that creates a new worker thread. Its constructor accepts a path to a script and an options object that can include a “workerData” value, and it should be called with “new” to spawn a thread. We can use the resulting instance to post messages to the worker, listen to events from it, or terminate it (to restart a worker, spawn a new one).

workerData (any): Inside the worker, this variable contains a copy of the data that was passed through the “workerData” option of the “Worker” constructor.

isMainThread (boolean): A variable that tells us whether the code is running on the main thread. You can use it to make decisions in a script that may run either way (remember that a worker thread can spawn another worker thread, in which case neither of them is on the main thread).

parentPort (object): The child thread can use this object to emit messages to the parent thread.
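To make the four exports concrete, here is a minimal single-file sketch of two-way messaging. It assumes the “eval: true” option of the Worker constructor so the worker source can live in a string; that is only to keep the sketch self-contained. The askWorker helper and the message shape are invented for this illustration.

```javascript
const { Worker } = require("worker_threads");

// worker source kept inline (eval: true) only so the sketch fits in one file;
// in a real project this would be its own script
const workerSource = `
  const { parentPort, workerData, isMainThread } = require("worker_threads");
  // answer every message from the parent with a small report
  parentPort.on("message", (n) => {
    parentPort.postMessage({
      doubled: n * 2,
      greeting: workerData.greeting,
      isMainThread
    });
  });
`;

// hypothetical helper: send one number to a fresh worker, resolve with its reply
function askWorker(n) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, {
      eval: true,
      workerData: { greeting: "hello from the main thread" }
    });
    worker.once("message", (reply) => {
      // the worker keeps listening forever, so we stop it once we have the answer
      worker.terminate();
      resolve(reply);
    });
    worker.once("error", reject);
    worker.postMessage(n);
  });
}

askWorker(21).then((reply) => console.log(reply));
```

The reply carries doubled: 42, the greeting passed through workerData, and isMainThread: false, since the handler ran inside the worker.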

OK, now that we know what the worker_threads API offers, let's look at an example that puts it to work on the sorting task; the code is commented throughout.

index.js is in our main thread.

const { Worker } = require("worker_threads");
const path = require("path");

// create some big array
const elements = 1000000;
const bigArray = Array(elements)
  .fill()
  .map(() => Math.random());

// we get the path of the script
const workerScript = path.join(__dirname, "./sorter.js");

// create a new worker from our script
const worker = new Worker(workerScript, { workerData: bigArray });

// receive events from the worker
worker.on("message", (sortedArray) => console.log('message:', sortedArray[0]));
worker.on("error", (error) => console.error("error", error));
worker.on("exit", () => console.log("exit"));

sorter.js is a script made to work with the worker_threads API as a worker.

const { parentPort, workerData, isMainThread } = require("worker_threads");

// CPU consuming function (sorting a big array)
function sortBigArray(bigArray) {
  return bigArray.sort((a, b) => a - b);
}

// check that the sorter was called as a worker thread
if (!isMainThread) {
  // make sure we got an array of data
  if (!Array.isArray(workerData)) {
    // we can throw an error to emit the "error" event from the worker
    throw new Error("workerData must be an array of numbers");
  }
  // we post a message through the parent port, to emit the "message" event
  parentPort.postMessage(sortBigArray(workerData));
}

You can run it with: node index.js. Our worker can now sort a simple array, but in order to see a difference, we need to scale this operation across multiple workers.

Let’s see the difference

That’s nice, but unfortunately threads in Node.js are pretty heavy, so using a single worker is usually not worth it. We can do better by distributing the load over multiple workers. We are going to run an operation that may take a minute or two with large inputs, so let’s install ora to show a nice loader in the terminal; this is our only dependency. (The script loads it with require, so it needs a CommonJS release of ora; versions 6 and later are ESM-only.)

npm init
npm install ora --save

Here is a way to compare the speed of sorting millions of random numbers (change the elements variable to the number you want; 5 million is a nice start) with one worker, with 8 workers (I have 8 CPUs on my Mac), and with no worker at all. I had to increase the memory limit in order to handle so many floating-point numbers (it started to crash above 10 million elements). You can increase the memory by passing the flag --max-old-space-size with the size in megabytes, for example node --max-old-space-size=8192 index.js for 8 GB.

const { Worker } = require("worker_threads");
const path = require("path");
const os = require("os");
const ora = require("ora");

// this is how we can get the number of CPUs the computer has,
// using a larger number may result in the app crashing
const cpuCount = os.cpus().length;

// create some big array
const elements = 5000000;
console.log(`generating ${elements} random numbers...`)
const bigArray = Array(elements)
  .fill()
  .map(() => Math.random());

// we get the path of the script
const workerScript = path.join(__dirname, "./sorter.js");

// we turn the worker activation into a promise
const sortArrayWithWorker = arr => {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerScript, { workerData: arr });
    worker.on("message", resolve);
    worker.on("error", reject);
  });
};

// this function will distribute the array across workers
async function distributeLoadAcrossWorkers(workers) {
  // how many elements each worker should sort
  const segmentsPerWorker = Math.round(bigArray.length / workers);
  const promises = Array(workers)
    .fill()
    .map((_, index) => {
      let arrayToSort;
      if (index === 0) {
        // the first segment
        arrayToSort = bigArray.slice(0, segmentsPerWorker);
      } else if (index === workers - 1) {
        // the last segment
        arrayToSort = bigArray.slice(segmentsPerWorker * index);
      } else {
        // intermediate segments
        arrayToSort = bigArray.slice(segmentsPerWorker * index, segmentsPerWorker * (index + 1));
      }
      return sortArrayWithWorker(arrayToSort);
    });
  // join the sorted segments back into one array
  // (each segment is sorted, but plain concatenation does not interleave them)
  const segmentsResults = await Promise.all(promises);
  return segmentsResults.reduce((acc, arr) => acc.concat(arr), []);
}

// this is the main function (it's only to allow the use of async await for simplicity)
async function run() {
  const spinner = ora("Loading unicorns").start();
  spinner.color = "yellow";
  spinner.text = "sorting... this may take a while...";

  // sort with a single worker
  const start1 = Date.now();
  const result1 = await distributeLoadAcrossWorkers(1);
  console.log(
    `sorted ${result1.length} items, with 1 worker in ${Date.now() - start1}ms`
  );

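  // sort with no worker at all (on a copy, because sort() works in place and
  // the next test should still receive unsorted data)
  const start2 = Date.now();
  const result2 = bigArray.slice().sort((a, b) => a - b);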
  console.log(
    `sorted ${result2.length} items, without workers in ${Date.now() - start2}ms`
  );

  // sort with multiple workers, based on the cpu count
  const start3 = Date.now();
  const result3 = await distributeLoadAcrossWorkers(cpuCount);
  console.log(
    `sorted ${result3.length} items, with ${cpuCount} workers in ${Date.now() - start3}ms`
  );

  spinner.stop();
  console.log("\n done");
}

run();

This is the output I got:

# for 5 million items:
sorted 5000000 items, with 1 worker in 11296ms (11.2 seconds)
sorted 5000000 items, without workers in 7045ms (7 seconds)
sorted 5000000 items, with 8 workers in 2049ms (2 seconds)

# for 20 million items
sorted 20000000 items, with 1 worker in 63597ms (63.5 seconds)
sorted 20000000 items, without workers in 48189ms (48 seconds)
sorted 20000000 items, with 8 workers in 8747ms (8.7 seconds)

Well, this is a big difference. Notice that a single worker was slower than the main thread in this test (the array has to be copied into the worker and back, and workers are given limited resources), but it does not block the main thread. Of course, promises can work just as well at keeping the main thread responsive for I/O-bound work, but a lot of promises running at the same time on the same thread may make it slow and unresponsive, and distributing the load over a bunch of workers can save us a lot of precious time.
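One caveat about the benchmark: distributeLoadAcrossWorkers joins the sorted segments with concat, so with more than one worker the result is sorted inside each segment but not across the whole array. A merge step is needed to finish the job. Here is a minimal sketch of one way to do it, merging the segments pairwise; mergeTwo and mergeSortedSegments are names made up for this example, not part of the original script, and the merge time is not included in the numbers above.

```javascript
// merge two ascending arrays into one ascending array
function mergeTwo(left, right) {
  const merged = new Array(left.length + right.length);
  let i = 0;
  let j = 0;
  let k = 0;
  while (i < left.length && j < right.length) {
    merged[k++] = left[i] <= right[j] ? left[i++] : right[j++];
  }
  // one side is exhausted, copy whatever is left of the other
  while (i < left.length) merged[k++] = left[i++];
  while (j < right.length) merged[k++] = right[j++];
  return merged;
}

// hypothetical helper: merge any number of sorted segments, two at a time
function mergeSortedSegments(segments) {
  if (segments.length === 0) return [];
  let current = segments;
  while (current.length > 1) {
    const next = [];
    for (let i = 0; i < current.length; i += 2) {
      // an odd segment at the end is carried over to the next round as-is
      next.push(i + 1 < current.length ? mergeTwo(current[i], current[i + 1]) : current[i]);
    }
    current = next;
  }
  return current[0];
}

// three sorted segments become one sorted array: [0, 1, 2, 3, 4, 5]
console.log(mergeSortedSegments([[1, 4], [2, 3], [0, 5]]));
```

In the benchmark script, this would replace the reduce/concat line, for example with return mergeSortedSegments(segmentsResults).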

In a real-world scenario you should manage your own pool of workers so that the number of running threads does not exceed the number of CPUs. We can achieve this with worker-threads-pool; we only need to change our “sortArrayWithWorker” function and initialize the thread pool like so:

const Pool = require("worker-threads-pool");
const pool = new Pool({ max: cpuCount });

const sortArrayWithWorker = arr => {
  return new Promise((resolve, reject) => {
    pool.acquire(workerScript, { workerData: arr }, (err, worker) => {
      if (err) {
        return reject(err);
      }
      worker.once("message", resolve);
      worker.once("error", reject);
    });
  });
};

Now we can safely request more workers than there are CPUs available without fear of oversubscribing them (the pool will manage the threads for us and keep a steady number running).

const result = await distributeLoadAcrossWorkers(100);

Running the threads like this will actually make them slower. Before, running the 5 million example with 8 real threads took 2049 milliseconds; now, with “100” threads, it took me 4302 milliseconds. That is still much faster than running it with no worker threads at all, but as we increase the number, the process gets slower since the pool has to handle more instances.
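To get a feel for what a pool does, here is a minimal sketch of the underlying idea: a queue that lets only a fixed number of async jobs run at once. This is not how worker-threads-pool is implemented; createLimiter and fakeJob are invented for the illustration, and a timer stands in for a real worker.

```javascript
// hypothetical helper: returns a function that runs async tasks
// with at most `limit` of them in flight at any moment
function createLimiter(limit) {
  let active = 0;
  const queue = [];

  const next = () => {
    if (active >= limit || queue.length === 0) return;
    active++;
    const { task, resolve, reject } = queue.shift();
    // Promise.resolve().then(task) also catches tasks that throw synchronously
    Promise.resolve()
      .then(task)
      .then(resolve, reject)
      .finally(() => {
        active--;
        next(); // a slot is free, start the next queued task
      });
  };

  return (task) =>
    new Promise((resolve, reject) => {
      queue.push({ task, resolve, reject });
      next();
    });
}

// usage: 5 jobs are requested, but never more than 2 run together
const limit = createLimiter(2);
let running = 0;
let peak = 0;
const fakeJob = (id) =>
  limit(async () => {
    running++;
    peak = Math.max(peak, running);
    await new Promise((done) => setTimeout(done, 10)); // stand-in for a worker
    running--;
    return id;
  });

Promise.all([1, 2, 3, 4, 5].map(fakeJob)).then((ids) =>
  console.log("finished jobs:", ids, "peak concurrency:", peak)
);
```

Asking for 100 "threads" from a pool capped at 8 works the same way: the extra requests wait in the queue, which is why the run above gets slower as the number grows instead of faster.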

Conclusion

Worker threads are great! Unfortunately, they are still very heavy and should be used with care; don’t just throw them around as a way to optimize stuff. They can be used to increase the speed of an HTML parser, for instance, or to speed up bundling.
