put.js 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. 'use strict'
  2. const index = require('./entry-index')
  3. const memo = require('./memoization')
  4. const write = require('./content/write')
  5. const Flush = require('minipass-flush')
  6. const { PassThrough } = require('minipass-collect')
  7. const Pipeline = require('minipass-pipeline')
  /**
   * Build the effective option set for a put operation.
   *
   * A default `algorithms` list of `['sha512']` is laid down first, then every
   * own enumerable property of `opts` is copied over it, so a caller-supplied
   * `algorithms` wins. The input object is never mutated.
   *
   * @param {object} [opts] - caller-supplied options
   * @returns {object} a new object holding the defaults overlaid with `opts`
   */
  const putOpts = (opts) => {
    // Recreated on every call so callers never share the same array instance.
    const defaults = { algorithms: ['sha512'] }
    return { ...defaults, ...opts }
  }
  module.exports = putData

  /**
   * Store `data` in the cache under `key`.
   *
   * The content is written first; an index entry pointing at the resulting
   * integrity value (and recording its size) is inserted afterwards. When the
   * caller passed a truthy `memoize` option, the entry and data are also handed
   * to the memoization layer.
   *
   * @param {*} cache - cache identifier forwarded to the write/index/memo helpers
   * @param {*} key - key under which the index entry is inserted
   * @param {*} data - content to write
   * @param {object} [opts={}] - options; `memoize` is read here, the rest is
   *   forwarded (after defaults are applied) to the helpers
   * @returns {Promise<*>} the `integrity` value reported by the content write
   */
  async function putData (cache, key, data, opts = {}) {
    // Read the flag from the caller's options before defaults are merged in.
    const shouldMemoize = opts.memoize
    const fullOpts = putOpts(opts)

    const { integrity, size } = await write(cache, data, fullOpts)
    const entry = await index.insert(cache, key, integrity, { ...fullOpts, size })

    if (shouldMemoize) {
      memo.put(cache, entry, data, fullOpts)
    }

    return integrity
  }
  module.exports.stream = putStream

  /**
   * Create a writable pipeline that stores whatever is written to it in the
   * cache under `key`.
   *
   * The pipeline is assembled from up to three stages, in this order:
   *   1. an optional collecting pass-through (only when `opts.memoize` is truthy),
   *   2. the content write stream,
   *   3. a flush stage that inserts the index entry, optionally memoizes, and
   *      emits `integrity` and `size` on the pipeline.
   *
   * If the content stream emitted `error`, the flush stage does nothing.
   *
   * @param {*} cache - cache identifier forwarded to the write/index/memo helpers
   * @param {*} key - key under which the index entry is inserted
   * @param {object} [opts={}] - options; `memoize` is read here, the rest is
   *   forwarded (after defaults are applied) to the helpers
   * @returns {Pipeline} the assembled pipeline
   */
  function putStream (cache, key, opts = {}) {
    // Read the flag from the caller's options before defaults are merged in.
    const shouldMemoize = opts.memoize
    const fullOpts = putOpts(opts)

    // Values captured from stream events and consumed by the flush stage.
    let contentIntegrity
    let contentSize
    let writeError
    let collected

    const pipeline = new Pipeline()

    // The memoizer goes in first: it has to end before the later stages so
    // the collected data is available by the time the flush stage runs.
    if (shouldMemoize) {
      const memoizer = new PassThrough()
      memoizer.on('collect', (data) => {
        collected = data
      })
      pipeline.push(memoizer)
    }

    // The content stream is write-only, not a passthrough:
    // no data comes out of it.
    const contentStream = write.stream(cache, fullOpts)
      .on('integrity', (value) => {
        contentIntegrity = value
      })
      .on('size', (value) => {
        contentSize = value
      })
      .on('error', (err) => {
        writeError = err
      })
    pipeline.push(contentStream)

    // Final stage: write the index entry, memoize if requested, then announce
    // the hash and size on the pipeline.
    const finalizer = new Flush({
      async flush () {
        if (writeError) {
          return
        }
        const entry = await index.insert(cache, key, contentIntegrity, {
          ...fullOpts,
          size: contentSize,
        })
        if (shouldMemoize && collected) {
          memo.put(cache, entry, collected, fullOpts)
        }
        pipeline.emit('integrity', contentIntegrity)
        pipeline.emit('size', contentSize)
      },
    })
    pipeline.push(finalizer)

    return pipeline
  }