node-multiprocess.js 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. /**
  2. * FaceAPI Demo for NodeJS
  3. * - Starts multiple worker processes and uses them as worker pool to process all input images
  4. * - Images are enumerated in main process and sent for processing to worker processes via ipc
  5. */
  6. const fs = require('fs');
  7. const path = require('path');
  8. const log = require('@vladmandic/pilogger'); // this is my simple logger with few extra features
  9. const child_process = require('child_process');
  10. // note that main process does not need to import faceapi or tfjs at all as processing is done in a worker process
  11. const imgPathRoot = './demo'; // modify to include your sample images
  12. const numWorkers = 4; // how many workers will be started
  13. const workers = []; // this holds worker processes
  14. const images = []; // this holds queue of enumerated images
  15. const t = []; // timers
  16. let numImages;
  17. // trigered by main when worker sends ready message
  18. // if image pool is empty, signal worker to exit otherwise dispatch image to worker and remove image from queue
  19. async function detect(worker) {
  20. if (!t[2]) t[2] = process.hrtime.bigint(); // first time do a timestamp so we can measure initial latency
  21. if (images.length === numImages) worker.send({ test: true }); // for first image in queue just measure latency
  22. if (images.length === 0) worker.send({ exit: true }); // nothing left in queue
  23. else {
  24. log.state('Main: dispatching to worker:', worker.pid);
  25. worker.send({ image: images[0] });
  26. images.shift();
  27. }
  28. }
  29. // loop that waits for all workers to complete
  30. function waitCompletion() {
  31. const activeWorkers = workers.reduce((any, worker) => (any += worker.connected ? 1 : 0), 0);
  32. if (activeWorkers > 0) setImmediate(() => waitCompletion());
  33. else {
  34. t[1] = process.hrtime.bigint();
  35. log.info('Processed:', numImages, 'images in', 'total:', Math.trunc(Number(t[1] - t[0]) / 1000000), 'ms', 'working:', Math.trunc(Number(t[1] - t[2]) / 1000000), 'ms', 'average:', Math.trunc(Number(t[1] - t[2]) / numImages / 1000000), 'ms');
  36. }
  37. }
  38. function measureLatency() {
  39. t[3] = process.hrtime.bigint();
  40. const latencyInitialization = Math.trunc(Number(t[2] - t[0]) / 1000 / 1000);
  41. const latencyRoundTrip = Math.trunc(Number(t[3] - t[2]) / 1000 / 1000);
  42. log.info('Latency: worker initializtion: ', latencyInitialization, 'message round trip:', latencyRoundTrip);
  43. }
  44. async function main() {
  45. log.header();
  46. log.info('FaceAPI multi-process test');
  47. // enumerate all images into queue
  48. const dir = fs.readdirSync(imgPathRoot);
  49. for (const imgFile of dir) {
  50. if (imgFile.toLocaleLowerCase().endsWith('.jpg')) images.push(path.join(imgPathRoot, imgFile));
  51. }
  52. numImages = images.length;
  53. t[0] = process.hrtime.bigint();
  54. // manage worker processes
  55. for (let i = 0; i < numWorkers; i++) {
  56. // create worker process
  57. workers[i] = await child_process.fork('demo/node-multiprocess-worker.js', ['special']);
  58. // parse message that worker process sends back to main
  59. // if message is ready, dispatch next image in queue
  60. // if message is processing result, just print how many faces were detected
  61. // otherwise it's an unknown message
  62. workers[i].on('message', (msg) => {
  63. if (msg.ready) detect(workers[i]);
  64. else if (msg.image) log.data('Main: worker finished:', workers[i].pid, 'detected faces:', msg.detected.length);
  65. else if (msg.test) measureLatency();
  66. else log.data('Main: worker message:', workers[i].pid, msg);
  67. });
  68. // just log when worker exits
  69. workers[i].on('exit', (msg) => log.state('Main: worker exit:', workers[i].pid, msg));
  70. // just log which worker was started
  71. log.state('Main: started worker:', workers[i].pid);
  72. }
  73. // wait for all workers to complete
  74. waitCompletion();
  75. }
  76. main();