write.js 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. 'use strict'
  2. const events = require('events')
  3. const contentPath = require('./path')
  4. const fs = require('fs/promises')
  5. const { moveFile } = require('@npmcli/fs')
  6. const { Minipass } = require('minipass')
  7. const Pipeline = require('minipass-pipeline')
  8. const Flush = require('minipass-flush')
  9. const path = require('path')
  10. const ssri = require('ssri')
  11. const uniqueFilename = require('unique-filename')
  12. const fsm = require('fs-minipass')
  13. module.exports = write
  14. // In-flight move operations, keyed by destination path, so concurrent writes of the same content share a single move
  15. const moveOperations = new Map()
  /**
   * Store an in-memory payload in the content cache.
   *
   * The payload is hashed with ssri, optionally checked against the caller's
   * expected size and integrity, then written to a temporary file and moved
   * to its content path once for every algorithm present in the digest.
   *
   * @param {string} cache - Root directory of the cache.
   * @param {Buffer|string} data - Payload to store; `data.length` is used as its size.
   * @param {object} [opts] - Options: `algorithms`, `size`, `integrity`
   *   (and anything `makeTmp` / `ssri.checkData` read from it).
   * @returns {Promise<{integrity: object, size: number}>} The computed digest and `data.length`.
   * @throws {Error} `EBADSIZE` when `opts.size` is a number that differs from `data.length`.
   * @throws {Error} `EINTEGRITY` when `opts.integrity` is set and the data does not match it.
   */
  async function write (cache, data, opts = {}) {
    const { algorithms, size, integrity } = opts

    // Size validation only applies when the caller supplied a numeric size.
    const sizeMismatch = typeof size === 'number' && data.length !== size
    if (sizeMismatch) {
      throw sizeError(size, data.length)
    }

    const hashOptions = algorithms ? { algorithms } : {}
    const sri = ssri.fromData(data, hashOptions)

    const integrityMismatch = integrity && !ssri.checkData(data, integrity, opts)
    if (integrityMismatch) {
      throw checksumError(integrity, sri)
    }

    for (const algo in sri) {
      const staged = await makeTmp(cache, opts)
      const digest = sri[algo].toString()
      try {
        // 'wx' makes the write fail if the temporary file already exists.
        await fs.writeFile(staged.target, data, { flag: 'wx' })
        await moveToDestination(staged, cache, digest, opts)
      } finally {
        // Anything that was not moved into place is leftover and must go.
        if (!staged.moved) {
          await fs.rm(staged.target, { recursive: true, force: true })
        }
      }
    }

    return { integrity: sri, size: data.length }
  }
  39. module.exports.stream = writeStream
  40. // writes proxied to the 'inputStream' that is passed to the Promise
  41. // 'end' is deferred until content is handled.
  /**
   * Writable stream that stores whatever is written to it as cache content.
   *
   * Chunks are forwarded to an internal Minipass stream; the first write
   * starts `handleContent` on that stream. The flush step waits for that
   * work to settle before signalling completion, emitting `integrity` and
   * `size` events from its result on the way.
   */
  class CacacheWriteStream extends Flush {
    /**
     * @param {string} cache - Root directory of the cache.
     * @param {object} opts - Options handed on to `handleContent`.
     */
    constructor (cache, opts) {
      super()
      this.opts = opts
      this.cache = cache

      // Internal stream that receives the chunks; its error and drain events
      // are re-emitted on this stream.
      const source = new Minipass()
      source.on('error', (err) => this.emit('error', err))
      source.on('drain', () => this.emit('drain'))
      this.inputStream = source

      // Promise for the content handling; stays null until the first write.
      this.handleContentP = null
    }

    /**
     * Forward a chunk to the internal stream, starting the content handling
     * on the first call.
     *
     * @returns {*} Whatever the internal stream's `write` returns.
     */
    write (chunk, encoding, cb) {
      if (!this.handleContentP) {
        const { inputStream, cache, opts } = this
        const pending = handleContent(inputStream, cache, opts)
        this.handleContentP = pending
        pending.catch((error) => this.emit('error', error))
      }
      return this.inputStream.write(chunk, encoding, cb)
    }

    /**
     * End the internal stream, then complete once the content handling has
     * settled. Calls back with an `ENODATA` error when nothing was written.
     *
     * @param {Function} cb - Completion callback; receives an error on failure.
     */
    flush (cb) {
      const onInputEnded = () => {
        const pending = this.handleContentP
        if (!pending) {
          const emptyError = Object.assign(
            new Error('Cache input stream was empty'),
            { code: 'ENODATA' }
          )
          // empty streams are probably emitting end right away.
          // defer this one tick by rejecting a promise on it.
          return Promise.reject(emptyError).catch(cb)
        }

        const onStored = (res) => {
          if (res.integrity) {
            this.emit('integrity', res.integrity)
          }
          if (res.size !== null) {
            this.emit('size', res.size)
          }
          cb()
        }
        const onFailed = (er) => cb(er)

        // eslint-disable-next-line promise/catch-or-return
        pending.then(onStored, onFailed)
      }

      this.inputStream.end(onInputEnded)
    }
  }
  /**
   * Create a writable stream that stores its input as cache content.
   *
   * @param {string} cache - Root directory of the cache.
   * @param {object} [opts] - Options passed to the stream unchanged.
   * @returns {CacacheWriteStream} A new write stream bound to `cache`.
   */
  function writeStream (cache, opts = {}) {
    const stream = new CacacheWriteStream(cache, opts)
    return stream
  }
  /**
   * Drain a stream into a temporary file and move that file to its content
   * path, removing the temporary file if it was not moved.
   *
   * @param {object} inputStream - Stream carrying the content.
   * @param {string} cache - Root directory of the cache.
   * @param {object} opts - Options passed to `makeTmp`, `pipeToTmp` and `moveToDestination`.
   * @returns {Promise<{integrity: *, size: *}>} The result reported by `pipeToTmp`.
   */
  async function handleContent (inputStream, cache, opts) {
    const staged = await makeTmp(cache, opts)
    try {
      const written = await pipeToTmp(inputStream, cache, staged.target, opts)
      await moveToDestination(staged, cache, written.integrity, opts)
      return written
    } finally {
      // `moved` is flipped by moveToDestination on success; otherwise the
      // temporary file is leftover and gets removed.
      const leftover = !staged.moved
      if (leftover) {
        await fs.rm(staged.target, { recursive: true, force: true })
      }
    }
  }
  /**
   * Pipe a stream into a newly created temporary file and report the
   * content's integrity and size.
   *
   * When `opts.integrityEmitter` is set, integrity and size are taken from
   * that emitter's `integrity` and `size` events. Otherwise the data is piped
   * through an ssri integrity stream configured from `opts.integrity`,
   * `opts.algorithms` and `opts.size`, and the values it emits are used.
   *
   * @param {object} inputStream - Stream carrying the content.
   * @param {string} cache - Root directory of the cache (not used here).
   * @param {string} tmpTarget - Path of the temporary file to create.
   * @param {object} opts - Options, see above.
   * @returns {Promise<{integrity: *, size: *}>}
   */
  async function pipeToTmp (inputStream, cache, tmpTarget, opts) {
    // 'wx' makes opening fail if the temporary file already exists.
    const outStream = new fsm.WriteStream(tmpTarget, { flags: 'wx' })

    const emitter = opts.integrityEmitter
    if (emitter) {
      const firstArg = (args) => args[0]
      // we need to create these all simultaneously since they can fire in any order
      const integrityP = events.once(emitter, 'integrity').then(firstArg)
      const sizeP = events.once(emitter, 'size').then(firstArg)
      const pipedP = new Pipeline(inputStream, outStream).promise()
      const [integrity, size] = await Promise.all([integrityP, sizeP, pipedP])
      return { integrity, size }
    }

    // Filled in by the hash stream's events while the pipeline runs.
    const result = { integrity: undefined, size: undefined }
    const { integrity: expectedIntegrity, algorithms, size: expectedSize } = opts
    const hashStream = ssri.integrityStream({
      integrity: expectedIntegrity,
      algorithms,
      size: expectedSize,
    })
    hashStream.on('integrity', (value) => {
      result.integrity = value
    })
    hashStream.on('size', (value) => {
      result.size = value
    })

    await new Pipeline(inputStream, hashStream, outStream).promise()
    return result
  }
  /**
   * Pick a unique temporary file path under `<cache>/tmp` and make sure its
   * parent directory exists. The file itself is not created.
   *
   * @param {string} cache - Root directory of the cache.
   * @param {object} opts - Options; `opts.tmpPrefix` is passed to `uniqueFilename`.
   * @returns {Promise<{target: string, moved: boolean}>} Descriptor with the
   *   chosen path and `moved` set to false.
   */
  async function makeTmp (cache, opts) {
    const tmpRoot = path.join(cache, 'tmp')
    const target = uniqueFilename(tmpRoot, opts.tmpPrefix)
    const parentDir = path.dirname(target)
    await fs.mkdir(parentDir, { recursive: true })
    return { target, moved: false }
  }
  /**
   * Move a temporary file to the content path derived from `sri`.
   *
   * While a move to a given destination is registered in `moveOperations`,
   * further calls for that destination return the registered promise instead
   * of starting another move; in that case the caller's `tmp.moved` is not
   * touched. The entry is removed once the move settles.
   *
   * @param {{target: string, moved: boolean}} tmp - Temporary file descriptor;
   *   `moved` is set to true after this call's own move succeeds.
   * @param {string} cache - Root directory of the cache.
   * @param {*} sri - Integrity value passed to `contentPath`.
   * @returns {Promise<boolean|undefined>} true after a successful move,
   *   undefined when a "destination file exists" error was swallowed.
   */
  async function moveToDestination (tmp, cache, sri) {
    const destination = contentPath(cache, sri)
    if (moveOperations.has(destination)) {
      return moveOperations.get(destination)
    }

    const performMove = async () => {
      await moveFile(tmp.target, destination, { overwrite: false })
      tmp.moved = true
      return tmp.moved
    }

    const tolerateExisting = (err) => {
      const alreadyThere = err.message.startsWith('The destination file exists')
      if (alreadyThere) {
        // Swallowed on purpose: the promise then resolves to undefined.
        return
      }
      // NOTE(review): every other failure is rethrown with its code replaced
      // by 'EEXIST', which looks inverted -- confirm this is intended.
      throw Object.assign(err, { code: 'EEXIST' })
    }

    const forget = () => {
      moveOperations.delete(destination)
    }

    const destDir = path.dirname(destination)
    const pending = fs.mkdir(destDir, { recursive: true })
      .then(performMove)
      .catch(tolerateExisting)
      .finally(forget)

    moveOperations.set(destination, pending)
    return pending
  }
  /**
   * Build the error reported when inserted data does not have the expected size.
   *
   * @param {number} expected - Size the caller asked for.
   * @param {number} found - Size of the data actually supplied.
   * @returns {Error} Error with `expected`, `found` and `code` set to 'EBADSIZE'.
   */
  function sizeError (expected, found) {
    /* eslint-disable-next-line max-len */
    const message = `Bad data size: expected inserted data to be ${expected} bytes, but got ${found} instead`
    return Object.assign(new Error(message), {
      expected,
      found,
      code: 'EBADSIZE',
    })
  }
  /**
   * Build the error reported when data does not match the expected integrity.
   *
   * @param {*} expected - Integrity the caller asked for.
   * @param {*} found - Integrity computed from the data.
   * @returns {Error} Error with `code` set to 'EINTEGRITY', plus `expected` and `found`.
   */
  function checksumError (expected, found) {
    // NOTE(review): the message spans three lines; any leading whitespace on
    // the "Wanted"/"Found" lines is not recoverable from this view -- confirm.
    const message = `Integrity check failed:\nWanted: ${expected}\nFound: ${found}`
    return Object.assign(new Error(message), {
      code: 'EINTEGRITY',
      expected,
      found,
    })
  }
  183. }