upload.js

  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.GridFSBucketWriteStream = void 0;
  4. const stream_1 = require("stream");
  5. const bson_1 = require("../bson");
  6. const error_1 = require("../error");
  7. const write_concern_1 = require("./../write_concern");
  8. /**
  9. * A writable stream that enables you to write buffers to GridFS.
  10. *
  11. * Do not instantiate this class directly. Use `openUploadStream()` instead.
  12. * @public
  13. */
  14. class GridFSBucketWriteStream extends stream_1.Writable {
  15. /**
  16. * @param bucket - Handle for this stream's corresponding bucket
  17. * @param filename - The value of the 'filename' key in the files doc
  18. * @param options - Optional settings.
  19. * @internal
  20. */
  21. constructor(bucket, filename, options) {
  22. super();
  23. /**
  24. * The document containing information about the inserted file.
  25. * This property is defined _after_ the finish event has been emitted.
  26. * It will remain `null` if an error occurs.
  27. *
  28. * @example
  29. * ```ts
  30. * fs.createReadStream('file.txt')
  31. * .pipe(bucket.openUploadStream('file.txt'))
  32. * .on('finish', function () {
  33. * console.log(this.gridFSFile)
  34. * })
  35. * ```
  36. */
  37. this.gridFSFile = null;
  38. options = options ?? {};
  39. this.bucket = bucket;
  40. this.chunks = bucket.s._chunksCollection;
  41. this.filename = filename;
  42. this.files = bucket.s._filesCollection;
  43. this.options = options;
  44. this.writeConcern = write_concern_1.WriteConcern.fromOptions(options) || bucket.s.options.writeConcern;
  45. // Signals the write is all done
  46. this.done = false;
  47. this.id = options.id ? options.id : new bson_1.ObjectId();
  48. // properly inherit the default chunksize from parent
  49. this.chunkSizeBytes = options.chunkSizeBytes || this.bucket.s.options.chunkSizeBytes;
  50. this.bufToStore = Buffer.alloc(this.chunkSizeBytes);
  51. this.length = 0;
  52. this.n = 0;
  53. this.pos = 0;
  54. this.state = {
  55. streamEnd: false,
  56. outstandingRequests: 0,
  57. errored: false,
  58. aborted: false
  59. };
  60. if (!this.bucket.s.calledOpenUploadStream) {
  61. this.bucket.s.calledOpenUploadStream = true;
  62. checkIndexes(this).then(() => {
  63. this.bucket.s.checkedIndexes = true;
  64. this.bucket.emit('index');
  65. }, () => null);
  66. }
  67. }
  68. /**
  69. * @internal
  70. *
  71. * The stream is considered constructed when the indexes are done being created
  72. */
  73. _construct(callback) {
  74. if (this.bucket.s.checkedIndexes) {
  75. return process.nextTick(callback);
  76. }
  77. this.bucket.once('index', callback);
  78. }
  79. /**
  80. * @internal
  81. * Write a buffer to the stream.
  82. *
  83. * @param chunk - Buffer to write
  84. * @param encoding - Optional encoding for the buffer
     * @param callback - Function to call when the chunk was added to the buffer, or, if this chunk caused a flush, when the entire chunk was persisted to MongoDB.
  86. */
  87. _write(chunk, encoding, callback) {
  88. doWrite(this, chunk, encoding, callback);
  89. }
  90. /** @internal */
  91. _final(callback) {
  92. if (this.state.streamEnd) {
  93. return process.nextTick(callback);
  94. }
  95. this.state.streamEnd = true;
  96. writeRemnant(this, callback);
  97. }
  98. /**
  99. * Places this write stream into an aborted state (all future writes fail)
  100. * and deletes all chunks that have already been written.
  101. */
  102. async abort() {
  103. if (this.state.streamEnd) {
  104. // TODO(NODE-3485): Replace with MongoGridFSStreamClosed
  105. throw new error_1.MongoAPIError('Cannot abort a stream that has already completed');
  106. }
  107. if (this.state.aborted) {
  108. // TODO(NODE-3485): Replace with MongoGridFSStreamClosed
  109. throw new error_1.MongoAPIError('Cannot call abort() on a stream twice');
  110. }
  111. this.state.aborted = true;
  112. await this.chunks.deleteMany({ files_id: this.id });
  113. }
  114. }
  115. exports.GridFSBucketWriteStream = GridFSBucketWriteStream;
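/**
 * Reports `error` to `callback` on the next tick and marks the stream as errored.
 * If the stream has already errored, the callback is invoked without an error so
 * that only the first failure is surfaced.
 */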
function handleError(stream, error, callback) {
    if (stream.state.errored) {
        process.nextTick(callback);
        return;
    }
    stream.state.errored = true;
    process.nextTick(callback, error);
}
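/** Builds a GridFS chunk document ({ _id, files_id, n, data }) for insertion into the chunks collection. */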
function createChunkDoc(filesId, n, data) {
    return {
        _id: new bson_1.ObjectId(),
        files_id: filesId,
        n,
        data
    };
}
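/**
 * Ensures the chunks collection has the unique { files_id: 1, n: 1 } index,
 * creating it when neither the collection nor the index exists yet.
 */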
async function checkChunksIndex(stream) {
    const index = { files_id: 1, n: 1 };
    let indexes;
    try {
        indexes = await stream.chunks.listIndexes().toArray();
    }
    catch (error) {
        if (error instanceof error_1.MongoError && error.code === error_1.MONGODB_ERROR_CODES.NamespaceNotFound) {
            indexes = [];
        }
        else {
            throw error;
        }
    }
    const hasChunksIndex = !!indexes.find(index => {
        const keys = Object.keys(index.key);
        if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) {
            return true;
        }
        return false;
    });
    if (!hasChunksIndex) {
        await stream.chunks.createIndex(index, {
            ...stream.writeConcern,
            background: true,
            unique: true
        });
    }
}
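/**
 * Finishes the upload once the stream has ended, every chunk insert has settled,
 * and no error has occurred: inserts the files-collection document, stores it on
 * `stream.gridFSFile`, and invokes the callback. A no-op for earlier calls.
 */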
function checkDone(stream, callback) {
    if (stream.done) {
        return process.nextTick(callback);
    }
    if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) {
        // Set done so we do not trigger duplicate createFilesDoc
        stream.done = true;
        // Create a new files doc
        const gridFSFile = createFilesDoc(stream.id, stream.length, stream.chunkSizeBytes, stream.filename, stream.options.contentType, stream.options.aliases, stream.options.metadata);
        if (isAborted(stream, callback)) {
            return;
        }
        stream.files.insertOne(gridFSFile, { writeConcern: stream.writeConcern }).then(() => {
            stream.gridFSFile = gridFSFile;
            callback();
        }, error => handleError(stream, error, callback));
        return;
    }
    process.nextTick(callback);
}
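/**
 * Ensures the files collection has the { filename: 1, uploadDate: 1 } index
 * (skipped when the collection already contains documents), then verifies the
 * chunks index via checkChunksIndex.
 */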
async function checkIndexes(stream) {
    const doc = await stream.files.findOne({}, { projection: { _id: 1 } });
    if (doc != null) {
        // If at least one document exists assume the collection has the required index
        return;
    }
    const index = { filename: 1, uploadDate: 1 };
    let indexes;
    try {
        indexes = await stream.files.listIndexes().toArray();
    }
    catch (error) {
        if (error instanceof error_1.MongoError && error.code === error_1.MONGODB_ERROR_CODES.NamespaceNotFound) {
            indexes = [];
        }
        else {
            throw error;
        }
    }
    const hasFileIndex = !!indexes.find(index => {
        const keys = Object.keys(index.key);
        if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) {
            return true;
        }
        return false;
    });
    if (!hasFileIndex) {
        await stream.files.createIndex(index, { background: false });
    }
    await checkChunksIndex(stream);
}
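/** Builds the files-collection document; contentType, aliases, and metadata are included only when provided. */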
function createFilesDoc(_id, length, chunkSize, filename, contentType, aliases, metadata) {
    const ret = {
        _id,
        length,
        chunkSize,
        uploadDate: new Date(),
        filename
    };
    if (contentType) {
        ret.contentType = contentType;
    }
    if (aliases) {
        ret.aliases = aliases;
    }
    if (metadata) {
        ret.metadata = metadata;
    }
    return ret;
}
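/**
 * Copies the incoming data into the internal buffer, flushing a full chunk
 * document to MongoDB each time the buffer reaches chunkSizeBytes. The callback
 * fires once the data is buffered, or after the flushed inserts complete when
 * the write crosses a chunk boundary.
 */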
function doWrite(stream, chunk, encoding, callback) {
    if (isAborted(stream, callback)) {
        return;
    }
    const inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    stream.length += inputBuf.length;
    // Input is small enough to fit in our buffer
    if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
        inputBuf.copy(stream.bufToStore, stream.pos);
        stream.pos += inputBuf.length;
        process.nextTick(callback);
        return;
    }
    // Otherwise, buffer is too big for current chunk, so we need to flush
    // to MongoDB.
    let inputBufRemaining = inputBuf.length;
    let spaceRemaining = stream.chunkSizeBytes - stream.pos;
    let numToCopy = Math.min(spaceRemaining, inputBuf.length);
    let outstandingRequests = 0;
    while (inputBufRemaining > 0) {
        const inputBufPos = inputBuf.length - inputBufRemaining;
        inputBuf.copy(stream.bufToStore, stream.pos, inputBufPos, inputBufPos + numToCopy);
        stream.pos += numToCopy;
        spaceRemaining -= numToCopy;
        let doc;
        if (spaceRemaining === 0) {
            doc = createChunkDoc(stream.id, stream.n, Buffer.from(stream.bufToStore));
            ++stream.state.outstandingRequests;
            ++outstandingRequests;
            if (isAborted(stream, callback)) {
                return;
            }
            stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then(() => {
                --stream.state.outstandingRequests;
                --outstandingRequests;
                if (!outstandingRequests) {
                    checkDone(stream, callback);
                }
            }, error => handleError(stream, error, callback));
            spaceRemaining = stream.chunkSizeBytes;
            stream.pos = 0;
            ++stream.n;
        }
        inputBufRemaining -= numToCopy;
        numToCopy = Math.min(spaceRemaining, inputBufRemaining);
    }
}
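/**
 * Flushes whatever partial chunk remains in the internal buffer when the stream
 * ends, then completes the upload via checkDone.
 */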
function writeRemnant(stream, callback) {
    // Buffer is empty, so don't bother to insert
    if (stream.pos === 0) {
        return checkDone(stream, callback);
    }
    ++stream.state.outstandingRequests;
    // Create a new buffer to make sure the buffer isn't bigger than it needs
    // to be.
    const remnant = Buffer.alloc(stream.pos);
    stream.bufToStore.copy(remnant, 0, 0, stream.pos);
    const doc = createChunkDoc(stream.id, stream.n, remnant);
    // If the stream was aborted, do not write remnant
    if (isAborted(stream, callback)) {
        return;
    }
    stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then(() => {
        --stream.state.outstandingRequests;
        checkDone(stream, callback);
    }, error => handleError(stream, error, callback));
}
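/**
 * Returns true and fails the callback with a MongoAPIError when the stream has
 * been aborted, letting callers bail out before touching the collections.
 */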
function isAborted(stream, callback) {
    if (stream.state.aborted) {
        process.nextTick(callback, new error_1.MongoAPIError('Stream has been aborted'));
        return true;
    }
    return false;
}
//# sourceMappingURL=upload.js.map