download.ts

import { Readable } from 'stream';

import type { Document, ObjectId } from '../bson';
import type { Collection } from '../collection';
import type { FindCursor } from '../cursor/find_cursor';
import {
  MongoGridFSChunkError,
  MongoGridFSStreamError,
  MongoInvalidArgumentError,
  MongoRuntimeError
} from '../error';
import type { FindOptions } from '../operations/find';
import type { ReadPreference } from '../read_preference';
import type { Sort } from '../sort';
import type { Callback } from '../utils';
import type { GridFSChunk } from './upload';

/** @public */
export interface GridFSBucketReadStreamOptions {
  sort?: Sort;
  skip?: number;
  /**
   * 0-indexed non-negative byte offset from the beginning of the file
   */
  start?: number;
  /**
   * 0-indexed non-negative byte offset to the end of the file contents
   * to be returned by the stream. `end` is non-inclusive
   */
  end?: number;
}

/** @public */
export interface GridFSBucketReadStreamOptionsWithRevision extends GridFSBucketReadStreamOptions {
  /** The revision number relative to the oldest file with the given filename. 0
   * gets you the oldest file, 1 gets you the 2nd oldest, -1 gets you the
   * newest. */
  revision?: number;
}

/** @public */
export interface GridFSFile {
  _id: ObjectId;
  length: number;
  chunkSize: number;
  filename: string;
  contentType?: string;
  aliases?: string[];
  metadata?: Document;
  uploadDate: Date;
}

/** @internal */
export interface GridFSBucketReadStreamPrivate {
  bytesRead: number;
  bytesToTrim: number;
  bytesToSkip: number;
  chunks: Collection<GridFSChunk>;
  cursor?: FindCursor<GridFSChunk>;
  expected: number;
  files: Collection<GridFSFile>;
  filter: Document;
  init: boolean;
  expectedEnd: number;
  file?: GridFSFile;
  options: {
    sort?: Sort;
    skip?: number;
    start: number;
    end: number;
  };
  readPreference?: ReadPreference;
}

/**
 * A readable stream that enables you to read buffers from GridFS.
 *
 * Do not instantiate this class directly. Use `openDownloadStream()` instead.
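 *
 * @example
 * A minimal sketch of typical use, assuming a `Db` instance `db` from a connected client and
 * an `ObjectId` `id` of a previously uploaded file (neither is defined in this file):
 * ```ts
 * import { createWriteStream } from 'fs';
 * import { GridFSBucket } from 'mongodb';
 *
 * const bucket = new GridFSBucket(db);
 * // openDownloadStream() constructs the GridFSBucketReadStream for you
 * bucket.openDownloadStream(id).pipe(createWriteStream('./downloaded-file'));
 * ```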
 * @public
 */
export class GridFSBucketReadStream extends Readable {
  /** @internal */
  s: GridFSBucketReadStreamPrivate;

  /**
   * Fires when the stream loaded the file document corresponding to the provided id.
   * @event
   */
  static readonly FILE = 'file' as const;

  /**
   * @param chunks - Handle for chunks collection
   * @param files - Handle for files collection
   * @param readPreference - The read preference to use
   * @param filter - The filter to use to find the file document
   * @internal
   */
  constructor(
    chunks: Collection<GridFSChunk>,
    files: Collection<GridFSFile>,
    readPreference: ReadPreference | undefined,
    filter: Document,
    options?: GridFSBucketReadStreamOptions
  ) {
    super({ emitClose: true });
    this.s = {
      bytesToTrim: 0,
      bytesToSkip: 0,
      bytesRead: 0,
      chunks,
      expected: 0,
      files,
      filter,
      init: false,
      expectedEnd: 0,
      options: {
        start: 0,
        end: 0,
        ...options
      },
      readPreference
    };
  }

  /**
   * Reads from the cursor and pushes to the stream.
   * Private Impl, do not call directly
   * @internal
   */
  override _read(): void {
    if (this.destroyed) return;
    waitForFile(this, () => doRead(this));
  }

  /**
   * Sets the 0-based offset in bytes to start streaming from. Throws
   * an error if this stream has entered flowing mode
   * (e.g. if you've already called `on('data')`)
   *
   * @param start - 0-based offset in bytes to start streaming from
   */
  start(start = 0): this {
    throwIfInitialized(this);
    this.s.options.start = start;
    return this;
  }

  /**
   * Sets the 0-based offset in bytes at which to stop streaming (`end` is
   * non-inclusive). Throws an error if this stream has entered flowing mode
   * (e.g. if you've already called `on('data')`)
   *
   * @param end - Offset in bytes to stop reading at
   */
  end(end = 0): this {
    throwIfInitialized(this);
    this.s.options.end = end;
    return this;
  }
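
  // Illustrative only (none of these identifiers are defined in this file): given a
  // GridFSBucket `bucket` and a file ObjectId `id`, the setters above stream a byte
  // range, here bytes [0, 100) of the file:
  //
  //   bucket.openDownloadStream(id).start(0).end(100).pipe(process.stdout);
  //
  // Passing { start: 0, end: 100 } to openDownloadStream() is the equivalent options form.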

  /**
   * Marks this stream as aborted (will never push another `data` event)
   * and kills the underlying cursor. Will emit the 'end' event, and then
   * the 'close' event once the cursor is successfully killed.
   */
  async abort(): Promise<void> {
    this.push(null);
    this.destroy();
    await this.s.cursor?.close();
  }
}

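/**
 * Throws a MongoGridFSStreamError if the stream has already been initialized, since the
 * start/end/sort/skip options cannot be changed once the file lookup has begun.
 */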
function throwIfInitialized(stream: GridFSBucketReadStream): void {
  if (stream.s.init) {
    throw new MongoGridFSStreamError('Options cannot be changed after the stream is initialized');
  }
}

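/**
 * Reads the next chunk document from the cursor, verifies that its sequence number (`n`)
 * and byte length match what the file document promises, trims it to the requested
 * start/end byte range when necessary, and pushes the resulting buffer to the stream.
 */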
function doRead(stream: GridFSBucketReadStream): void {
  if (stream.destroyed) return;
  if (!stream.s.cursor) return;
  if (!stream.s.file) return;

  const handleReadResult = ({
    error,
    doc
  }: { error: Error; doc: null } | { error: null; doc: any }) => {
    if (stream.destroyed) {
      return;
    }
    if (error) {
      stream.destroy(error);
      return;
    }
    if (!doc) {
      stream.push(null);
      stream.s.cursor?.close().then(
        () => null,
        error => stream.destroy(error)
      );
      return;
    }

    if (!stream.s.file) return;

    const bytesRemaining = stream.s.file.length - stream.s.bytesRead;
    const expectedN = stream.s.expected++;
    const expectedLength = Math.min(stream.s.file.chunkSize, bytesRemaining);
    if (doc.n > expectedN) {
      return stream.destroy(
        new MongoGridFSChunkError(
          `ChunkIsMissing: Got unexpected n: ${doc.n}, expected: ${expectedN}`
        )
      );
    }

    if (doc.n < expectedN) {
      return stream.destroy(
        new MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected: ${expectedN}`)
      );
    }

    let buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer;

    if (buf.byteLength !== expectedLength) {
      if (bytesRemaining <= 0) {
        return stream.destroy(
          new MongoGridFSChunkError(
            `ExtraChunk: Got unexpected n: ${doc.n}, expected file length ${stream.s.file.length} bytes but already read ${stream.s.bytesRead} bytes`
          )
        );
      }

      return stream.destroy(
        new MongoGridFSChunkError(
          `ChunkIsWrongSize: Got unexpected length: ${buf.byteLength}, expected: ${expectedLength}`
        )
      );
    }

    stream.s.bytesRead += buf.byteLength;

    if (buf.byteLength === 0) {
      return stream.push(null);
    }

    let sliceStart = null;
    let sliceEnd = null;

    if (stream.s.bytesToSkip != null) {
      sliceStart = stream.s.bytesToSkip;
      stream.s.bytesToSkip = 0;
    }

    const atEndOfStream = expectedN === stream.s.expectedEnd - 1;
    const bytesLeftToRead = stream.s.options.end - stream.s.bytesToSkip;
    if (atEndOfStream && stream.s.bytesToTrim != null) {
      sliceEnd = stream.s.file.chunkSize - stream.s.bytesToTrim;
    } else if (stream.s.options.end && bytesLeftToRead < doc.data.byteLength) {
      sliceEnd = bytesLeftToRead;
    }

    if (sliceStart != null || sliceEnd != null) {
      buf = buf.slice(sliceStart || 0, sliceEnd || buf.byteLength);
    }

    stream.push(buf);
    return;
  };

  stream.s.cursor.next().then(
    doc => handleReadResult({ error: null, doc }),
    error => handleReadResult({ error, doc: null })
  );
}

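/**
 * Looks up the file document matching the stream's filter, validates the `start`/`end`
 * options against it, builds the chunks cursor (sorted by `n`, skipping whole chunks when
 * `start` allows it), and emits the 'file' event so queued reads can begin.
 */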
function init(stream: GridFSBucketReadStream): void {
  const findOneOptions: FindOptions = {};
  if (stream.s.readPreference) {
    findOneOptions.readPreference = stream.s.readPreference;
  }
  if (stream.s.options && stream.s.options.sort) {
    findOneOptions.sort = stream.s.options.sort;
  }
  if (stream.s.options && stream.s.options.skip) {
    findOneOptions.skip = stream.s.options.skip;
  }

  const handleReadResult = ({
    error,
    doc
  }: { error: Error; doc: null } | { error: null; doc: any }) => {
    if (error) {
      return stream.destroy(error);
    }

    if (!doc) {
      const identifier = stream.s.filter._id
        ? stream.s.filter._id.toString()
        : stream.s.filter.filename;
      const errmsg = `FileNotFound: file ${identifier} was not found`;
      // TODO(NODE-3483)
      const err = new MongoRuntimeError(errmsg);
      err.code = 'ENOENT'; // TODO: NODE-3338 set property as part of constructor
      return stream.destroy(err);
    }

    // If document is empty, kill the stream immediately and don't
    // execute any reads
    if (doc.length <= 0) {
      stream.push(null);
      return;
    }

    if (stream.destroyed) {
      // If user destroys the stream before we have a cursor, wait
      // until the query is done to say we're 'closed' because we can't
      // cancel a query.
      stream.destroy();
      return;
    }

    try {
      stream.s.bytesToSkip = handleStartOption(stream, doc, stream.s.options);
    } catch (error) {
      return stream.destroy(error);
    }

    const filter: Document = { files_id: doc._id };

    // Currently (MongoDB 3.4.4) the skip function does not use the index;
    // it retrieves all the documents first and then skips them. (CS-25811)
    // As a workaround we use $gte on the "n" field.
    if (stream.s.options && stream.s.options.start != null) {
      const skip = Math.floor(stream.s.options.start / doc.chunkSize);
      if (skip > 0) {
        filter['n'] = { $gte: skip };
      }
    }
    stream.s.cursor = stream.s.chunks.find(filter).sort({ n: 1 });

    if (stream.s.readPreference) {
      stream.s.cursor.withReadPreference(stream.s.readPreference);
    }

    stream.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
    stream.s.file = doc as GridFSFile;

    try {
      stream.s.bytesToTrim = handleEndOption(stream, doc, stream.s.cursor, stream.s.options);
    } catch (error) {
      return stream.destroy(error);
    }

    stream.emit(GridFSBucketReadStream.FILE, doc);
    return;
  };

  stream.s.files.findOne(stream.s.filter, findOneOptions).then(
    doc => handleReadResult({ error: null, doc }),
    error => handleReadResult({ error, doc: null })
  );
}

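/**
 * Defers the pending read until the file document has been fetched: kicks off init() on
 * first use and invokes the callback once the 'file' event fires (or immediately if the
 * file document is already available).
 */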
function waitForFile(stream: GridFSBucketReadStream, callback: Callback): void {
  if (stream.s.file) {
    return callback();
  }

  if (!stream.s.init) {
    init(stream);
    stream.s.init = true;
  }

  stream.once('file', () => {
    callback();
  });
}

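/**
 * Validates the `start` option and positions the stream just before it: `bytesRead` and
 * `expected` are advanced past the whole chunks that precede `start`, and the remainder
 * (the bytes to skip within the first chunk actually read) is returned as `bytesToSkip`.
 *
 * Illustrative numbers (not from the source): with chunkSize 4 and start 10, two whole
 * chunks are skipped, bytesRead begins at 8, expected begins at 2, and 2 is returned.
 */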
function handleStartOption(
  stream: GridFSBucketReadStream,
  doc: Document,
  options: GridFSBucketReadStreamOptions
): number {
  if (options && options.start != null) {
    if (options.start > doc.length) {
      throw new MongoInvalidArgumentError(
        `Stream start (${options.start}) must not be more than the length of the file (${doc.length})`
      );
    }
    if (options.start < 0) {
      throw new MongoInvalidArgumentError(`Stream start (${options.start}) must not be negative`);
    }
    if (options.end != null && options.end < options.start) {
      throw new MongoInvalidArgumentError(
        `Stream start (${options.start}) must not be greater than stream end (${options.end})`
      );
    }

    stream.s.bytesRead = Math.floor(options.start / doc.chunkSize) * doc.chunkSize;
    stream.s.expected = Math.floor(options.start / doc.chunkSize);

    return options.start - stream.s.bytesRead;
  }
  throw new MongoInvalidArgumentError('Start option must be defined');
}

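/**
 * Validates the `end` option and bounds the cursor: the cursor is limited to the chunks
 * that contain bytes below `end`, `expectedEnd` becomes the index one past the last chunk
 * to read, and the number of trailing bytes to trim from that last chunk is returned.
 *
 * Illustrative numbers (not from the source): with chunkSize 4, start 0 and end 10, the
 * cursor is limited to 3 chunks, expectedEnd is 3, and 2 bytes are trimmed from the last chunk.
 */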
function handleEndOption(
  stream: GridFSBucketReadStream,
  doc: Document,
  cursor: FindCursor<GridFSChunk>,
  options: GridFSBucketReadStreamOptions
) {
  if (options && options.end != null) {
    if (options.end > doc.length) {
      throw new MongoInvalidArgumentError(
        `Stream end (${options.end}) must not be more than the length of the file (${doc.length})`
      );
    }
    if (options.end < 0) {
      throw new MongoInvalidArgumentError(`Stream end (${options.end}) must not be negative`);
    }

    const start = options.start != null ? Math.floor(options.start / doc.chunkSize) : 0;
    cursor.limit(Math.ceil(options.end / doc.chunkSize) - start);
    stream.s.expectedEnd = Math.ceil(options.end / doc.chunkSize);

    return Math.ceil(options.end / doc.chunkSize) * doc.chunkSize - options.end;
  }
  throw new MongoInvalidArgumentError('End option must be defined');
}