moveAndMaybeCompressFile.js 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. const debug = require('debug')('streamroller:moveAndMaybeCompressFile');
  2. const fs = require('fs-extra');
  3. const zlib = require('zlib');
  4. const _parseOption = function(rawOptions){
  5. const defaultOptions = {
  6. mode: parseInt("0600", 8),
  7. compress: false,
  8. };
  9. const options = Object.assign({}, defaultOptions, rawOptions);
  10. debug(`_parseOption: moveAndMaybeCompressFile called with option=${JSON.stringify(options)}`);
  11. return options;
  12. };
  13. const moveAndMaybeCompressFile = async (
  14. sourceFilePath,
  15. targetFilePath,
  16. options
  17. ) => {
  18. options = _parseOption(options);
  19. if (sourceFilePath === targetFilePath) {
  20. debug(`moveAndMaybeCompressFile: source and target are the same, not doing anything`);
  21. return;
  22. }
  23. if (await fs.pathExists(sourceFilePath)) {
  24. debug(
  25. `moveAndMaybeCompressFile: moving file from ${sourceFilePath} to ${targetFilePath} ${
  26. options.compress ? "with" : "without"
  27. } compress`
  28. );
  29. if (options.compress) {
  30. await new Promise((resolve, reject) => {
  31. let isCreated = false;
  32. // to avoid concurrency, the forked process which can create the file will proceed (using flags wx)
  33. const writeStream = fs.createWriteStream(targetFilePath, { mode: options.mode, flags: "wx" })
  34. // wait until writable stream is valid before proceeding to read
  35. .on("open", () => {
  36. isCreated = true;
  37. const readStream = fs.createReadStream(sourceFilePath)
  38. // wait until readable stream is valid before piping
  39. .on("open", () => {
  40. readStream.pipe(zlib.createGzip()).pipe(writeStream);
  41. })
  42. .on("error", (e) => {
  43. debug(`moveAndMaybeCompressFile: error reading ${sourceFilePath}`, e);
  44. // manually close writable: https://nodejs.org/api/stream.html#readablepipedestination-options
  45. writeStream.destroy(e);
  46. });
  47. })
  48. .on("finish", () => {
  49. debug(`moveAndMaybeCompressFile: finished compressing ${targetFilePath}, deleting ${sourceFilePath}`);
  50. // delete sourceFilePath
  51. fs.unlink(sourceFilePath)
  52. .then(resolve)
  53. .catch((e) => {
  54. debug(`moveAndMaybeCompressFile: error deleting ${sourceFilePath}, truncating instead`, e);
  55. // fallback to truncate
  56. fs.truncate(sourceFilePath)
  57. .then(resolve)
  58. .catch((e) => {
  59. debug(`moveAndMaybeCompressFile: error truncating ${sourceFilePath}`, e);
  60. reject(e);
  61. });
  62. });
  63. })
  64. .on("error", (e) => {
  65. if (!isCreated) {
  66. debug(`moveAndMaybeCompressFile: error creating ${targetFilePath}`, e);
  67. // do not do anything if handled by another forked process
  68. reject(e);
  69. } else {
  70. debug(`moveAndMaybeCompressFile: error writing ${targetFilePath}, deleting`, e);
  71. // delete targetFilePath (taking as nothing happened)
  72. fs.unlink(targetFilePath)
  73. .then(() => { reject(e); })
  74. .catch((e) => {
  75. debug(`moveAndMaybeCompressFile: error deleting ${targetFilePath}`, e);
  76. reject(e);
  77. });
  78. }
  79. });
  80. }).catch(() => {});
  81. } else {
  82. debug(`moveAndMaybeCompressFile: renaming ${sourceFilePath} to ${targetFilePath}`);
  83. try {
  84. await fs.move(sourceFilePath, targetFilePath, { overwrite: true });
  85. } catch (e) {
  86. debug(`moveAndMaybeCompressFile: error renaming ${sourceFilePath} to ${targetFilePath}`, e);
  87. debug(`moveAndMaybeCompressFile: trying copy+truncate instead`);
  88. await fs.copy(sourceFilePath, targetFilePath, { overwrite: true });
  89. await fs.truncate(sourceFilePath);
  90. }
  91. }
  92. }
  93. };
  94. module.exports = moveAndMaybeCompressFile;