bulk-writer.js 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.BulkWriter = exports.BulkWriterError = exports.DEFAULT_JITTER_FACTOR = exports.DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT = exports.DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT = exports.RETRY_MAX_BATCH_SIZE = void 0;
  4. const assert = require("assert");
  5. const backoff_1 = require("./backoff");
  6. const rate_limiter_1 = require("./rate-limiter");
  7. const timestamp_1 = require("./timestamp");
  8. const util_1 = require("./util");
  9. const write_batch_1 = require("./write-batch");
  10. const validate_1 = require("./validate");
  11. const logger_1 = require("./logger");
  12. const trace_util_1 = require("./telemetry/trace-util");
  13. /*!
  14. * The maximum number of writes that can be in a single batch.
  15. */
  16. const MAX_BATCH_SIZE = 20;
  17. /*!
  18. * The maximum number of writes can be can in a single batch that is being retried.
  19. */
  20. exports.RETRY_MAX_BATCH_SIZE = 10;
  21. /*!
  22. * The starting maximum number of operations per second as allowed by the
  23. * 500/50/5 rule.
  24. *
  25. * https://firebase.google.com/docs/firestore/best-practices#ramping_up_traffic.
  26. */
  27. exports.DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT = 500;
  28. /*!
  29. * The maximum number of operations per second as allowed by the 500/50/5 rule.
  30. * By default the rate limiter will not exceed this value.
  31. *
  32. * https://firebase.google.com/docs/firestore/best-practices#ramping_up_traffic.
  33. */
  34. exports.DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT = 10000;
  35. /*!
  36. * The default jitter to apply to the exponential backoff used in retries. For
  37. * example, a factor of 0.3 means a 30% jitter is applied.
  38. */
  39. exports.DEFAULT_JITTER_FACTOR = 0.3;
  40. /*!
  41. * The rate by which to increase the capacity as specified by the 500/50/5 rule.
  42. */
  43. const RATE_LIMITER_MULTIPLIER = 1.5;
  44. /*!
  45. * How often the operations per second capacity should increase in milliseconds
  46. * as specified by the 500/50/5 rule.
  47. */
  48. const RATE_LIMITER_MULTIPLIER_MILLIS = 5 * 60 * 1000;
  49. /*!
  50. * The default maximum number of pending operations that can be enqueued onto a
  51. * BulkWriter instance. An operation is considered pending if BulkWriter has
  52. * sent it via RPC and is awaiting the result. BulkWriter buffers additional
  53. * writes after this many pending operations in order to avoiding going OOM.
  54. */
  55. const DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT = 500;
  56. /**
  57. * Represents a single write for BulkWriter, encapsulating operation dispatch
  58. * and error handling.
  59. * @private
  60. * @internal
  61. */
  62. class BulkWriterOperation {
  63. /**
  64. * @param ref The document reference being written to.
  65. * @param type The type of operation that created this write.
  66. * @param sendFn A callback to invoke when the operation should be sent.
  67. * @param errorFn The user provided global error callback.
  68. * @param successFn The user provided global success callback.
  69. */
  70. constructor(ref, type, sendFn, errorFn, successFn) {
  71. this.ref = ref;
  72. this.type = type;
  73. this.sendFn = sendFn;
  74. this.errorFn = errorFn;
  75. this.successFn = successFn;
  76. this.deferred = new util_1.Deferred();
  77. this.failedAttempts = 0;
  78. this._backoffDuration = 0;
  79. /** Whether flush() was called when this was the last enqueued operation. */
  80. this._flushed = false;
  81. }
  82. get promise() {
  83. return this.deferred.promise;
  84. }
  85. get backoffDuration() {
  86. return this._backoffDuration;
  87. }
  88. markFlushed() {
  89. this._flushed = true;
  90. }
  91. get flushed() {
  92. return this._flushed;
  93. }
  94. onError(error) {
  95. ++this.failedAttempts;
  96. try {
  97. const bulkWriterError = new BulkWriterError(error.code, error.message, this.ref, this.type, this.failedAttempts);
  98. const shouldRetry = this.errorFn(bulkWriterError);
  99. (0, logger_1.logger)('BulkWriter.errorFn', null, 'Ran error callback on error code:', error.code, ', shouldRetry:', shouldRetry, ' for document:', this.ref.path);
  100. if (shouldRetry) {
  101. this.lastStatus = error.code;
  102. this.updateBackoffDuration();
  103. this.sendFn(this);
  104. }
  105. else {
  106. this.deferred.reject(bulkWriterError);
  107. }
  108. }
  109. catch (userCallbackError) {
  110. this.deferred.reject(userCallbackError);
  111. }
  112. }
  113. updateBackoffDuration() {
  114. if (this.lastStatus === 8 /* StatusCode.RESOURCE_EXHAUSTED */) {
  115. this._backoffDuration = backoff_1.DEFAULT_BACKOFF_MAX_DELAY_MS;
  116. }
  117. else if (this._backoffDuration === 0) {
  118. this._backoffDuration = backoff_1.DEFAULT_BACKOFF_INITIAL_DELAY_MS;
  119. }
  120. else {
  121. this._backoffDuration *= backoff_1.DEFAULT_BACKOFF_FACTOR;
  122. }
  123. }
  124. onSuccess(result) {
  125. try {
  126. this.successFn(this.ref, result);
  127. this.deferred.resolve(result);
  128. }
  129. catch (userCallbackError) {
  130. this.deferred.reject(userCallbackError);
  131. }
  132. }
  133. }
  134. /**
  135. * Used to represent a batch on the BatchQueue.
  136. *
  137. * @private
  138. * @internal
  139. */
  140. class BulkCommitBatch extends write_batch_1.WriteBatch {
  141. constructor(firestore, maxBatchSize) {
  142. super(firestore);
  143. // The set of document reference paths present in the WriteBatch.
  144. this.docPaths = new Set();
  145. // An array of pending write operations. Only contains writes that have not
  146. // been resolved.
  147. this.pendingOps = [];
  148. this._maxBatchSize = maxBatchSize;
  149. }
  150. get maxBatchSize() {
  151. return this._maxBatchSize;
  152. }
  153. setMaxBatchSize(size) {
  154. assert(this.pendingOps.length <= size, 'New batch size cannot be less than the number of enqueued writes');
  155. this._maxBatchSize = size;
  156. }
  157. has(documentRef) {
  158. return this.docPaths.has(documentRef.path);
  159. }
  160. async bulkCommit(options = {}) {
  161. return this._firestore._traceUtil.startActiveSpan(trace_util_1.SPAN_NAME_BULK_WRITER_COMMIT, async () => {
  162. var _a;
  163. const tag = (_a = options === null || options === void 0 ? void 0 : options.requestTag) !== null && _a !== void 0 ? _a : (0, util_1.requestTag)();
  164. // Capture the error stack to preserve stack tracing across async calls.
  165. const stack = Error().stack;
  166. let response;
  167. try {
  168. (0, logger_1.logger)('BulkCommitBatch.bulkCommit', tag, `Sending next batch with ${this._opCount} writes`);
  169. const retryCodes = (0, util_1.getRetryCodes)('batchWrite');
  170. response = await this._commit({ retryCodes, methodName: 'batchWrite', requestTag: tag });
  171. }
  172. catch (err) {
  173. // Map the failure to each individual write's result.
  174. const ops = Array.from({ length: this.pendingOps.length });
  175. response = {
  176. writeResults: ops.map(() => {
  177. return {};
  178. }),
  179. status: ops.map(() => err),
  180. };
  181. }
  182. for (let i = 0; i < (response.writeResults || []).length; ++i) {
  183. // Since delete operations currently do not have write times, use a
  184. // sentinel Timestamp value.
  185. // TODO(b/158502664): Use actual delete timestamp.
  186. const DELETE_TIMESTAMP_SENTINEL = timestamp_1.Timestamp.fromMillis(0);
  187. const status = (response.status || [])[i];
  188. if (status.code === 0 /* StatusCode.OK */) {
  189. const updateTime = timestamp_1.Timestamp.fromProto(response.writeResults[i].updateTime || DELETE_TIMESTAMP_SENTINEL);
  190. this.pendingOps[i].onSuccess(new write_batch_1.WriteResult(updateTime));
  191. }
  192. else {
  193. const error = new (require('google-gax/build/src/fallback').GoogleError)(status.message || undefined);
  194. error.code = status.code;
  195. this.pendingOps[i].onError((0, util_1.wrapError)(error, stack));
  196. }
  197. }
  198. }, {
  199. [trace_util_1.ATTRIBUTE_KEY_DOC_COUNT]: this._opCount,
  200. });
  201. }
  202. /**
  203. * Helper to update data structures associated with the operation and returns
  204. * the result.
  205. */
  206. processLastOperation(op) {
  207. assert(!this.docPaths.has(op.ref.path), 'Batch should not contain writes to the same document');
  208. this.docPaths.add(op.ref.path);
  209. this.pendingOps.push(op);
  210. }
  211. }
  212. /**
  213. * Used to represent a buffered BulkWriterOperation.
  214. *
  215. * @private
  216. * @internal
  217. */
  218. class BufferedOperation {
  219. constructor(operation, sendFn) {
  220. this.operation = operation;
  221. this.sendFn = sendFn;
  222. }
  223. }
  224. /**
  225. * The error thrown when a BulkWriter operation fails.
  226. *
  227. * @class BulkWriterError
  228. */
  229. class BulkWriterError extends Error {
  230. /**
  231. * @private
  232. * @internal
  233. */
  234. constructor(
  235. /** The status code of the error. */
  236. code,
  237. /** The error message of the error. */
  238. message,
  239. /** The document reference the operation was performed on. */
  240. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  241. documentRef,
  242. /** The type of operation performed. */
  243. operationType,
  244. /** How many times this operation has been attempted unsuccessfully. */
  245. failedAttempts) {
  246. super(message);
  247. this.code = code;
  248. this.message = message;
  249. this.documentRef = documentRef;
  250. this.operationType = operationType;
  251. this.failedAttempts = failedAttempts;
  252. }
  253. }
  254. exports.BulkWriterError = BulkWriterError;
  255. /**
  256. * A Firestore BulkWriter that can be used to perform a large number of writes
  257. * in parallel.
  258. *
  259. * @class BulkWriter
  260. */
  261. class BulkWriter {
  262. // Visible for testing.
  263. /**
  264. * @private
  265. * @internal
  266. */
  267. _getBufferedOperationsCount() {
  268. return this._bufferedOperations.length;
  269. }
  270. // Visible for testing.
  271. /**
  272. * @private
  273. * @internal
  274. */
  275. _setMaxBatchSize(size) {
  276. assert(this._bulkCommitBatch.pendingOps.length === 0, 'BulkCommitBatch should be empty');
  277. this._maxBatchSize = size;
  278. this._bulkCommitBatch = new BulkCommitBatch(this.firestore, size);
  279. }
  280. // Visible for testing.
  281. /**
  282. * @private
  283. * @internal
  284. */
  285. _setMaxPendingOpCount(newMax) {
  286. this._maxPendingOpCount = newMax;
  287. }
  288. /** @private */
  289. constructor(firestore, options) {
  290. var _a, _b;
  291. this.firestore = firestore;
  292. /**
  293. * The maximum number of writes that can be in a single batch.
  294. * Visible for testing.
  295. * @private
  296. * @internal
  297. */
  298. this._maxBatchSize = MAX_BATCH_SIZE;
  299. /**
  300. * The batch that is currently used to schedule operations. Once this batch
  301. * reaches maximum capacity, a new batch is created.
  302. * @private
  303. * @internal
  304. */
  305. this._bulkCommitBatch = new BulkCommitBatch(this.firestore, this._maxBatchSize);
  306. /**
  307. * A pointer to the tail of all active BulkWriter operations. This pointer
  308. * is advanced every time a new write is enqueued.
  309. * @private
  310. * @internal
  311. */
  312. this._lastOp = Promise.resolve();
  313. /**
  314. * Whether this BulkWriter instance has started to close. Afterwards, no
  315. * new operations can be enqueued, except for retry operations scheduled by
  316. * the error handler.
  317. * @private
  318. * @internal
  319. */
  320. this._closing = false;
  321. /**
  322. * The number of pending operations enqueued on this BulkWriter instance.
  323. * An operation is considered pending if BulkWriter has sent it via RPC and
  324. * is awaiting the result.
  325. * @private
  326. * @internal
  327. */
  328. this._pendingOpsCount = 0;
  329. /**
  330. * An array containing buffered BulkWriter operations after the maximum number
  331. * of pending operations has been enqueued.
  332. * @private
  333. * @internal
  334. */
  335. this._bufferedOperations = [];
  336. /**
  337. * Whether a custom error handler has been set. BulkWriter only swallows
  338. * errors if an error handler is set. Otherwise, an UnhandledPromiseRejection
  339. * is thrown by Node if an operation promise is rejected without being
  340. * handled.
  341. * @private
  342. * @internal
  343. */
  344. this._errorHandlerSet = false;
  345. /**
  346. * The maximum number of pending operations that can be enqueued onto this
  347. * BulkWriter instance. Once the this number of writes have been enqueued,
  348. * subsequent writes are buffered.
  349. * @private
  350. * @internal
  351. */
  352. this._maxPendingOpCount = DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT;
  353. /**
  354. * The user-provided callback to be run every time a BulkWriter operation
  355. * successfully completes.
  356. * @private
  357. * @internal
  358. */
  359. this._successFn = () => { };
  360. /**
  361. * The user-provided callback to be run every time a BulkWriter operation
  362. * fails.
  363. * @private
  364. * @internal
  365. */
  366. this._errorFn = error => {
  367. const isRetryableDeleteError = error.operationType === 'delete' &&
  368. error.code === 13 /* StatusCode.INTERNAL */;
  369. const retryCodes = (0, util_1.getRetryCodes)('batchWrite');
  370. return ((retryCodes.includes(error.code) || isRetryableDeleteError) &&
  371. error.failedAttempts < backoff_1.MAX_RETRY_ATTEMPTS);
  372. };
  373. this.firestore._incrementBulkWritersCount();
  374. validateBulkWriterOptions(options);
  375. if ((options === null || options === void 0 ? void 0 : options.throttling) === false) {
  376. this._rateLimiter = new rate_limiter_1.RateLimiter(Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY);
  377. }
  378. else {
  379. let startingRate = exports.DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT;
  380. let maxRate = exports.DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT;
  381. if (typeof (options === null || options === void 0 ? void 0 : options.throttling) !== 'boolean') {
  382. if (((_a = options === null || options === void 0 ? void 0 : options.throttling) === null || _a === void 0 ? void 0 : _a.maxOpsPerSecond) !== undefined) {
  383. maxRate = options.throttling.maxOpsPerSecond;
  384. }
  385. if (((_b = options === null || options === void 0 ? void 0 : options.throttling) === null || _b === void 0 ? void 0 : _b.initialOpsPerSecond) !== undefined) {
  386. startingRate = options.throttling.initialOpsPerSecond;
  387. }
  388. // The initial validation step ensures that the maxOpsPerSecond is
  389. // greater than initialOpsPerSecond. If this inequality is true, that
  390. // means initialOpsPerSecond was not set and maxOpsPerSecond is less
  391. // than the default starting rate.
  392. if (maxRate < startingRate) {
  393. startingRate = maxRate;
  394. }
  395. // Ensure that the batch size is not larger than the number of allowed
  396. // operations per second.
  397. if (startingRate < this._maxBatchSize) {
  398. this._maxBatchSize = startingRate;
  399. }
  400. }
  401. this._rateLimiter = new rate_limiter_1.RateLimiter(startingRate, RATE_LIMITER_MULTIPLIER, RATE_LIMITER_MULTIPLIER_MILLIS, maxRate);
  402. }
  403. }
  404. /**
  405. * Create a document with the provided data. This single operation will fail
  406. * if a document exists at its location.
  407. *
  408. * @param {DocumentReference} documentRef A reference to the document to be
  409. * created.
  410. * @param {T} data The object to serialize as the document.
  411. * @throws {Error} If the provided input is not a valid Firestore document.
  412. * @returns {Promise<WriteResult>} A promise that resolves with the result of
  413. * the write. If the write fails, the promise is rejected with a
  414. * [BulkWriterError]{@link BulkWriterError}.
  415. *
  416. * @example
  417. * ```
  418. * let bulkWriter = firestore.bulkWriter();
  419. * let documentRef = firestore.collection('col').doc();
  420. *
  421. * bulkWriter
  422. * .create(documentRef, {foo: 'bar'})
  423. * .then(result => {
  424. * console.log('Successfully executed write at: ', result);
  425. * })
  426. * .catch(err => {
  427. * console.log('Write failed with: ', err);
  428. * });
  429. * });
  430. * ```
  431. */
  432. create(documentRef, data) {
  433. this._verifyNotClosed();
  434. return this._enqueue(documentRef, 'create', bulkCommitBatch => bulkCommitBatch.create(documentRef, data));
  435. }
  436. /**
  437. * Delete a document from the database.
  438. *
  439. * @param {DocumentReference} documentRef A reference to the document to be
  440. * deleted.
  441. * @param {Precondition=} precondition A precondition to enforce for this
  442. * delete.
  443. * @param {Timestamp=} precondition.lastUpdateTime If set, enforces that the
  444. * document was last updated at lastUpdateTime. Fails the batch if the
  445. * document doesn't exist or was last updated at a different time.
  446. * @returns {Promise<WriteResult>} A promise that resolves with the result of
  447. * the delete. If the delete fails, the promise is rejected with a
  448. * [BulkWriterError]{@link BulkWriterError}.
  449. *
  450. * @example
  451. * ```
  452. * let bulkWriter = firestore.bulkWriter();
  453. * let documentRef = firestore.doc('col/doc');
  454. *
  455. * bulkWriter
  456. * .delete(documentRef)
  457. * .then(result => {
  458. * console.log('Successfully deleted document');
  459. * })
  460. * .catch(err => {
  461. * console.log('Delete failed with: ', err);
  462. * });
  463. * });
  464. * ```
  465. */
  466. delete(documentRef, precondition) {
  467. this._verifyNotClosed();
  468. return this._enqueue(documentRef, 'delete', bulkCommitBatch => bulkCommitBatch.delete(documentRef, precondition));
  469. }
  470. /**
  471. * Write to the document referred to by the provided
  472. * [DocumentReference]{@link DocumentReference}. If the document does not
  473. * exist yet, it will be created. If you pass [SetOptions]{@link SetOptions}.,
  474. * the provided data can be merged into the existing document.
  475. *
  476. * @param {DocumentReference} documentRef A reference to the document to be
  477. * set.
  478. * @param {T} data The object to serialize as the document.
  479. * @param {SetOptions=} options An object to configure the set behavior.
  480. * @throws {Error} If the provided input is not a valid Firestore document.
  481. * @param {boolean=} options.merge - If true, set() merges the values
  482. * specified in its data argument. Fields omitted from this set() call remain
  483. * untouched. If your input sets any field to an empty map, all nested fields
  484. * are overwritten.
  485. * @param {Array.<string|FieldPath>=} options.mergeFields - If provided, set()
  486. * only replaces the specified field paths. Any field path that is not
  487. * specified is ignored and remains untouched. If your input sets any field to
  488. * an empty map, all nested fields are overwritten.
  489. * @returns {Promise<WriteResult>} A promise that resolves with the result of
  490. * the write. If the write fails, the promise is rejected with a
  491. * [BulkWriterError]{@link BulkWriterError}.
  492. *
  493. *
  494. * @example
  495. * ```
  496. * let bulkWriter = firestore.bulkWriter();
  497. * let documentRef = firestore.collection('col').doc();
  498. *
  499. * bulkWriter
  500. * .set(documentRef, {foo: 'bar'})
  501. * .then(result => {
  502. * console.log('Successfully executed write at: ', result);
  503. * })
  504. * .catch(err => {
  505. * console.log('Write failed with: ', err);
  506. * });
  507. * });
  508. * ```
  509. */
  510. set(documentRef, data, options) {
  511. this._verifyNotClosed();
  512. return this._enqueue(documentRef, 'set', bulkCommitBatch => {
  513. if (options) {
  514. return bulkCommitBatch.set(documentRef, data, options);
  515. }
  516. else {
  517. return bulkCommitBatch.set(documentRef, data);
  518. }
  519. });
  520. }
  521. /**
  522. * Update fields of the document referred to by the provided
  523. * [DocumentReference]{@link DocumentReference}. If the document doesn't yet
  524. * exist, the update fails and the entire batch will be rejected.
  525. *
  526. * The update() method accepts either an object with field paths encoded as
  527. * keys and field values encoded as values, or a variable number of arguments
  528. * that alternate between field paths and field values. Nested fields can be
  529. * updated by providing dot-separated field path strings or by providing
  530. * FieldPath objects.
  531. *
  532. *
  533. * A Precondition restricting this update can be specified as the last
  534. * argument.
  535. *
  536. * @param {DocumentReference} documentRef A reference to the document to be
  537. * updated.
  538. * @param {UpdateData|string|FieldPath} dataOrField An object containing the
  539. * fields and values with which to update the document or the path of the
  540. * first field to update.
  541. * @param {...(Precondition|*|string|FieldPath)} preconditionOrValues - An
  542. * alternating list of field paths and values to update or a Precondition to
  543. * restrict this update
  544. * @throws {Error} If the provided input is not valid Firestore data.
  545. * @returns {Promise<WriteResult>} A promise that resolves with the result of
  546. * the write. If the write fails, the promise is rejected with a
  547. * [BulkWriterError]{@link BulkWriterError}.
  548. *
  549. * @example
  550. * ```
  551. * let bulkWriter = firestore.bulkWriter();
  552. * let documentRef = firestore.doc('col/doc');
  553. *
  554. * bulkWriter
  555. * .update(documentRef, {foo: 'bar'})
  556. * .then(result => {
  557. * console.log('Successfully executed write at: ', result);
  558. * })
  559. * .catch(err => {
  560. * console.log('Write failed with: ', err);
  561. * });
  562. * });
  563. * ```
  564. */
  565. update(documentRef, dataOrField, ...preconditionOrValues) {
  566. this._verifyNotClosed();
  567. return this._enqueue(documentRef, 'update', bulkCommitBatch => bulkCommitBatch.update(documentRef, dataOrField, ...preconditionOrValues));
  568. }
  569. /**
  570. * Callback function set by {@link BulkWriter#onWriteResult} that is run
  571. * every time a {@link BulkWriter} operation successfully completes.
  572. *
  573. * @callback BulkWriter~successCallback
  574. * @param {DocumentReference} documentRef The document reference the
  575. * operation was performed on
  576. * @param {WriteResult} result The server write time of the operation.
  577. */
  578. /**
  579. * Attaches a listener that is run every time a BulkWriter operation
  580. * successfully completes.
  581. *
  582. * @param {BulkWriter~successCallback} successCallback A callback to be
  583. * called every time a BulkWriter operation successfully completes.
  584. * @example
  585. * ```
  586. * let bulkWriter = firestore.bulkWriter();
  587. *
  588. * bulkWriter
  589. * .onWriteResult((documentRef, result) => {
  590. * console.log(
  591. * 'Successfully executed write on document: ',
  592. * documentRef,
  593. * ' at: ',
  594. * result
  595. * );
  596. * });
  597. * ```
  598. */
  599. onWriteResult(successCallback) {
  600. this._successFn = successCallback;
  601. }
  602. /**
  603. * Callback function set by {@link BulkWriter#onWriteError} that is run when
  604. * a write fails in order to determine whether {@link BulkWriter} should
  605. * retry the operation.
  606. *
  607. * @callback BulkWriter~shouldRetryCallback
  608. * @param {BulkWriterError} error The error object with information about the
  609. * operation and error.
  610. * @returns {boolean} Whether or not to retry the failed operation. Returning
  611. * `true` retries the operation. Returning `false` will stop the retry loop.
  612. */
  613. /**
  614. * Attaches an error handler listener that is run every time a BulkWriter
  615. * operation fails.
  616. *
  617. * BulkWriter has a default error handler that retries UNAVAILABLE and
  618. * ABORTED errors up to a maximum of 10 failed attempts. When an error
  619. * handler is specified, the default error handler will be overwritten.
  620. *
  621. * @param shouldRetryCallback {BulkWriter~shouldRetryCallback} A callback to
  622. * be called every time a BulkWriter operation fails. Returning `true` will
  623. * retry the operation. Returning `false` will stop the retry loop.
  624. * @example
  625. * ```
  626. * let bulkWriter = firestore.bulkWriter();
  627. *
  628. * bulkWriter
  629. * .onWriteError((error) => {
  630. * if (
  631. * error.code === GrpcStatus.UNAVAILABLE &&
  632. * error.failedAttempts < MAX_RETRY_ATTEMPTS
  633. * ) {
  634. * return true;
  635. * } else {
  636. * console.log('Failed write at document: ', error.documentRef);
  637. * return false;
  638. * }
  639. * });
  640. * ```
  641. */
  642. onWriteError(shouldRetryCallback) {
  643. this._errorHandlerSet = true;
  644. this._errorFn = shouldRetryCallback;
  645. }
  646. /**
  647. * Commits all writes that have been enqueued up to this point in parallel.
  648. *
  649. * Returns a Promise that resolves when all currently queued operations have
  650. * been committed. The Promise will never be rejected since the results for
  651. * each individual operation are conveyed via their individual Promises.
  652. *
  653. * The Promise resolves immediately if there are no pending writes. Otherwise,
  654. * the Promise waits for all previously issued writes, but it does not wait
  655. * for writes that were added after the method is called. If you want to wait
  656. * for additional writes, call `flush()` again.
  657. *
  658. * @return {Promise<void>} A promise that resolves when all enqueued writes
  659. * up to this point have been committed.
  660. *
  661. * @example
  662. * ```
  663. * let bulkWriter = firestore.bulkWriter();
  664. *
  665. * bulkWriter.create(documentRef, {foo: 'bar'});
  666. * bulkWriter.update(documentRef2, {foo: 'bar'});
  667. * bulkWriter.delete(documentRef3);
  668. * await flush().then(() => {
  669. * console.log('Executed all writes');
  670. * });
  671. * ```
  672. */
  673. flush() {
  674. this._verifyNotClosed();
  675. this._scheduleCurrentBatch(/* flush= */ true);
  676. // Mark the most recent operation as flushed to ensure that the batch
  677. // containing it will be sent once it's popped from the buffer.
  678. if (this._bufferedOperations.length > 0) {
  679. this._bufferedOperations[this._bufferedOperations.length - 1].operation.markFlushed();
  680. }
  681. return this._lastOp;
  682. }
  683. /**
  684. * Commits all enqueued writes and marks the BulkWriter instance as closed.
  685. *
  686. * After calling `close()`, calling any method will throw an error. Any
  687. * retries scheduled as part of an `onWriteError()` handler will be run
  688. * before the `close()` promise resolves.
  689. *
  690. * Returns a Promise that resolves when there are no more pending writes. The
  691. * Promise will never be rejected. Calling this method will send all requests.
  692. * The promise resolves immediately if there are no pending writes.
  693. *
  694. * @return {Promise<void>} A promise that resolves when all enqueued writes
  695. * up to this point have been committed.
  696. *
  697. * @example
  698. * ```
  699. * let bulkWriter = firestore.bulkWriter();
  700. *
  701. * bulkWriter.create(documentRef, {foo: 'bar'});
  702. * bulkWriter.update(documentRef2, {foo: 'bar'});
  703. * bulkWriter.delete(documentRef3);
  704. * await close().then(() => {
  705. * console.log('Executed all writes');
  706. * });
  707. * ```
  708. */
  709. close() {
  710. this._verifyNotClosed();
  711. this.firestore._decrementBulkWritersCount();
  712. const flushPromise = this.flush();
  713. this._closing = true;
  714. return flushPromise;
  715. }
  716. /**
  717. * Throws an error if the BulkWriter instance has been closed.
  718. * @private
  719. * @internal
  720. */
  721. _verifyNotClosed() {
  722. if (this._closing) {
  723. throw new Error('BulkWriter has already been closed.');
  724. }
  725. }
  726. /**
  727. * Sends the current batch and resets `this._bulkCommitBatch`.
  728. *
  729. * @param flush If provided, keeps re-sending operations until no more
  730. * operations are enqueued. This allows retries to resolve as part of a
  731. * `flush()` or `close()` call.
  732. * @private
  733. * @internal
  734. */
  735. _scheduleCurrentBatch(flush = false) {
  736. if (this._bulkCommitBatch._opCount === 0)
  737. return;
  738. const pendingBatch = this._bulkCommitBatch;
  739. this._bulkCommitBatch = new BulkCommitBatch(this.firestore, this._maxBatchSize);
  740. // Use the write with the longest backoff duration when determining backoff.
  741. const highestBackoffDuration = pendingBatch.pendingOps.reduce((prev, cur) => (prev.backoffDuration > cur.backoffDuration ? prev : cur)).backoffDuration;
  742. const backoffMsWithJitter = BulkWriter._applyJitter(highestBackoffDuration);
  743. const delayedExecution = new util_1.Deferred();
  744. if (backoffMsWithJitter > 0) {
  745. (0, backoff_1.delayExecution)(() => delayedExecution.resolve(), backoffMsWithJitter);
  746. }
  747. else {
  748. delayedExecution.resolve();
  749. }
  750. delayedExecution.promise.then(() => this._sendBatch(pendingBatch, flush));
  751. }
  752. /**
  753. * Sends the provided batch once the rate limiter does not require any delay.
  754. * @private
  755. * @internal
  756. */
  757. async _sendBatch(batch, flush = false) {
  758. const tag = (0, util_1.requestTag)();
  759. // Send the batch if it is does not require any delay, or schedule another
  760. // attempt after the appropriate timeout.
  761. const underRateLimit = this._rateLimiter.tryMakeRequest(batch._opCount);
  762. if (underRateLimit) {
  763. await batch.bulkCommit({ requestTag: tag });
  764. if (flush)
  765. this._scheduleCurrentBatch(flush);
  766. }
  767. else {
  768. const delayMs = this._rateLimiter.getNextRequestDelayMs(batch._opCount);
  769. (0, logger_1.logger)('BulkWriter._sendBatch', tag, `Backing off for ${delayMs} seconds`);
  770. (0, backoff_1.delayExecution)(() => this._sendBatch(batch, flush), delayMs);
  771. }
  772. }
  773. /**
  774. * Adds a 30% jitter to the provided backoff.
  775. *
  776. * @private
  777. * @internal
  778. */
  779. static _applyJitter(backoffMs) {
  780. if (backoffMs === 0)
  781. return 0;
  782. // Random value in [-0.3, 0.3].
  783. const jitter = exports.DEFAULT_JITTER_FACTOR * (Math.random() * 2 - 1);
  784. return Math.min(backoff_1.DEFAULT_BACKOFF_MAX_DELAY_MS, backoffMs + jitter * backoffMs);
  785. }
  786. /**
  787. * Schedules and runs the provided operation on the next available batch.
  788. * @private
  789. * @internal
  790. */
  791. _enqueue(ref, type, enqueueOnBatchCallback) {
  792. const bulkWriterOp = new BulkWriterOperation(ref, type, this._sendFn.bind(this, enqueueOnBatchCallback), this._errorFn.bind(this), this._successFn.bind(this));
  793. // Swallow the error if the developer has set an error listener. This
  794. // prevents UnhandledPromiseRejections from being thrown if a floating
  795. // BulkWriter operation promise fails when an error handler is specified.
  796. //
  797. // This is done here in order to chain the caught promise onto `lastOp`,
  798. // which ensures that flush() resolves after the operation promise.
  799. const userPromise = bulkWriterOp.promise.catch(err => {
  800. if (!this._errorHandlerSet) {
  801. throw err;
  802. }
  803. else {
  804. return bulkWriterOp.promise;
  805. }
  806. });
  807. // Advance the `_lastOp` pointer. This ensures that `_lastOp` only resolves
  808. // when both the previous and the current write resolve.
  809. this._lastOp = this._lastOp.then(() => (0, util_1.silencePromise)(userPromise));
  810. // Schedule the operation if the BulkWriter has fewer than the maximum
  811. // number of allowed pending operations, or add the operation to the
  812. // buffer.
  813. if (this._pendingOpsCount < this._maxPendingOpCount) {
  814. this._pendingOpsCount++;
  815. this._sendFn(enqueueOnBatchCallback, bulkWriterOp);
  816. }
  817. else {
  818. this._bufferedOperations.push(new BufferedOperation(bulkWriterOp, () => {
  819. this._pendingOpsCount++;
  820. this._sendFn(enqueueOnBatchCallback, bulkWriterOp);
  821. }));
  822. }
  823. // Chain the BulkWriter operation promise with the buffer processing logic
  824. // in order to ensure that it runs and that subsequent operations are
  825. // enqueued before the next batch is scheduled in `_sendBatch()`.
  826. return userPromise
  827. .then(res => {
  828. this._pendingOpsCount--;
  829. this._processBufferedOps();
  830. return res;
  831. })
  832. .catch(err => {
  833. this._pendingOpsCount--;
  834. this._processBufferedOps();
  835. throw err;
  836. });
  837. }
  838. /**
  839. * Manages the pending operation counter and schedules the next BulkWriter
  840. * operation if we're under the maximum limit.
  841. * @private
  842. * @internal
  843. */
  844. _processBufferedOps() {
  845. if (this._pendingOpsCount < this._maxPendingOpCount &&
  846. this._bufferedOperations.length > 0) {
  847. const nextOp = this._bufferedOperations.shift();
  848. nextOp.sendFn();
  849. }
  850. }
  851. /**
  852. * Schedules the provided operations on current BulkCommitBatch.
  853. * Sends the BulkCommitBatch if it reaches maximum capacity.
  854. *
  855. * @private
  856. * @internal
  857. */
  858. _sendFn(enqueueOnBatchCallback, op) {
  859. // A backoff duration greater than 0 implies that this batch is a retry.
  860. // Retried writes are sent with a batch size of 10 in order to guarantee
  861. // that the batch is under the 10MiB limit.
  862. if (op.backoffDuration > 0) {
  863. if (this._bulkCommitBatch.pendingOps.length >= exports.RETRY_MAX_BATCH_SIZE) {
  864. this._scheduleCurrentBatch(/* flush= */ false);
  865. }
  866. this._bulkCommitBatch.setMaxBatchSize(exports.RETRY_MAX_BATCH_SIZE);
  867. }
  868. if (this._bulkCommitBatch.has(op.ref)) {
  869. // Create a new batch since the backend doesn't support batches with two
  870. // writes to the same document.
  871. this._scheduleCurrentBatch();
  872. }
  873. enqueueOnBatchCallback(this._bulkCommitBatch);
  874. this._bulkCommitBatch.processLastOperation(op);
  875. if (this._bulkCommitBatch._opCount === this._bulkCommitBatch.maxBatchSize) {
  876. this._scheduleCurrentBatch();
  877. }
  878. else if (op.flushed) {
  879. // If flush() was called before this operation was enqueued into a batch,
  880. // we still need to schedule it.
  881. this._scheduleCurrentBatch(/* flush= */ true);
  882. }
  883. }
  884. }
  885. exports.BulkWriter = BulkWriter;
  886. /**
  887. * Validates the use of 'value' as BulkWriterOptions.
  888. *
  889. * @private
  890. * @internal
  891. * @param value The BulkWriterOptions object to validate.
  892. * @throws if the input is not a valid BulkWriterOptions object.
  893. */
  894. function validateBulkWriterOptions(value) {
  895. if ((0, validate_1.validateOptional)(value, { optional: true })) {
  896. return;
  897. }
  898. const argName = 'options';
  899. if (!(0, util_1.isObject)(value)) {
  900. throw new Error(`${(0, validate_1.invalidArgumentMessage)(argName, 'bulkWriter() options argument')} Input is not an object.`);
  901. }
  902. const options = value;
  903. if (options.throttling === undefined ||
  904. typeof options.throttling === 'boolean') {
  905. return;
  906. }
  907. if (options.throttling.initialOpsPerSecond !== undefined) {
  908. (0, validate_1.validateInteger)('initialOpsPerSecond', options.throttling.initialOpsPerSecond, {
  909. minValue: 1,
  910. });
  911. }
  912. if (options.throttling.maxOpsPerSecond !== undefined) {
  913. (0, validate_1.validateInteger)('maxOpsPerSecond', options.throttling.maxOpsPerSecond, {
  914. minValue: 1,
  915. });
  916. if (options.throttling.initialOpsPerSecond !== undefined &&
  917. options.throttling.initialOpsPerSecond >
  918. options.throttling.maxOpsPerSecond) {
  919. throw new Error(`${(0, validate_1.invalidArgumentMessage)(argName, 'bulkWriter() options argument')} "maxOpsPerSecond" cannot be less than "initialOpsPerSecond".`);
  920. }
  921. }
  922. }
  923. //# sourceMappingURL=bulk-writer.js.map