- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.BulkWriter = exports.BulkWriterError = exports.DEFAULT_JITTER_FACTOR = exports.DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT = exports.DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT = exports.RETRY_MAX_BATCH_SIZE = void 0;
- const assert = require("assert");
- const backoff_1 = require("./backoff");
- const rate_limiter_1 = require("./rate-limiter");
- const timestamp_1 = require("./timestamp");
- const util_1 = require("./util");
- const write_batch_1 = require("./write-batch");
- const validate_1 = require("./validate");
- const logger_1 = require("./logger");
- const trace_util_1 = require("./telemetry/trace-util");
- /*!
- * The maximum number of writes that can be in a single batch.
- */
- const MAX_BATCH_SIZE = 20;
- /*!
- * The maximum number of writes that can be in a single batch that is being retried.
- */
- exports.RETRY_MAX_BATCH_SIZE = 10;
- /*!
- * The starting maximum number of operations per second as allowed by the
- * 500/50/5 rule.
- *
- * https://firebase.google.com/docs/firestore/best-practices#ramping_up_traffic.
- */
- exports.DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT = 500;
- /*!
- * The maximum number of operations per second as allowed by the 500/50/5 rule.
- * By default the rate limiter will not exceed this value.
- *
- * https://firebase.google.com/docs/firestore/best-practices#ramping_up_traffic.
- */
- exports.DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT = 10000;
- /*!
- * The default jitter to apply to the exponential backoff used in retries. For
- * example, a factor of 0.3 means a 30% jitter is applied.
- */
- exports.DEFAULT_JITTER_FACTOR = 0.3;
- /*!
- * The rate by which to increase the capacity as specified by the 500/50/5 rule.
- */
- const RATE_LIMITER_MULTIPLIER = 1.5;
- /*!
- * How often the operations per second capacity should increase in milliseconds
- * as specified by the 500/50/5 rule.
- */
- const RATE_LIMITER_MULTIPLIER_MILLIS = 5 * 60 * 1000;
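- // Taken together, the defaults above implement the 500/50/5 ramp-up: start
- // at 500 ops/s and multiply the capacity by 1.5 every 5 minutes
- // (500 -> 750 -> 1125 -> ...), by default never exceeding 10,000 ops/s.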
- /*!
- * The default maximum number of pending operations that can be enqueued onto a
- * BulkWriter instance. An operation is considered pending if BulkWriter has
- * sent it via RPC and is awaiting the result. BulkWriter buffers additional
- * writes after this many pending operations in order to avoid running out of memory.
- */
- const DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT = 500;
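- // For example, with the default of 500 pending operations, the 501st write
- // is buffered and only sent once one of the outstanding writes settles
- // (see `_enqueue()` and `_processBufferedOps()` below).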
- /**
- * Represents a single write for BulkWriter, encapsulating operation dispatch
- * and error handling.
- * @private
- * @internal
- */
- class BulkWriterOperation {
- /**
- * @param ref The document reference being written to.
- * @param type The type of operation that created this write.
- * @param sendFn A callback to invoke when the operation should be sent.
- * @param errorFn The user provided global error callback.
- * @param successFn The user provided global success callback.
- */
- constructor(ref, type, sendFn, errorFn, successFn) {
- this.ref = ref;
- this.type = type;
- this.sendFn = sendFn;
- this.errorFn = errorFn;
- this.successFn = successFn;
- this.deferred = new util_1.Deferred();
- this.failedAttempts = 0;
- this._backoffDuration = 0;
- /** Whether flush() was called when this was the last enqueued operation. */
- this._flushed = false;
- }
- get promise() {
- return this.deferred.promise;
- }
- get backoffDuration() {
- return this._backoffDuration;
- }
- markFlushed() {
- this._flushed = true;
- }
- get flushed() {
- return this._flushed;
- }
- onError(error) {
- ++this.failedAttempts;
- try {
- const bulkWriterError = new BulkWriterError(error.code, error.message, this.ref, this.type, this.failedAttempts);
- const shouldRetry = this.errorFn(bulkWriterError);
- (0, logger_1.logger)('BulkWriter.errorFn', null, 'Ran error callback on error code:', error.code, ', shouldRetry:', shouldRetry, ' for document:', this.ref.path);
- if (shouldRetry) {
- this.lastStatus = error.code;
- this.updateBackoffDuration();
- this.sendFn(this);
- }
- else {
- this.deferred.reject(bulkWriterError);
- }
- }
- catch (userCallbackError) {
- this.deferred.reject(userCallbackError);
- }
- }
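- // Determines how long to back off before this operation is retried:
- // RESOURCE_EXHAUSTED jumps straight to the maximum backoff delay, the first
- // retry otherwise uses the initial backoff delay, and each subsequent retry
- // multiplies the previous delay by the backoff factor (see './backoff').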
- updateBackoffDuration() {
- if (this.lastStatus === 8 /* StatusCode.RESOURCE_EXHAUSTED */) {
- this._backoffDuration = backoff_1.DEFAULT_BACKOFF_MAX_DELAY_MS;
- }
- else if (this._backoffDuration === 0) {
- this._backoffDuration = backoff_1.DEFAULT_BACKOFF_INITIAL_DELAY_MS;
- }
- else {
- this._backoffDuration *= backoff_1.DEFAULT_BACKOFF_FACTOR;
- }
- }
- onSuccess(result) {
- try {
- this.successFn(this.ref, result);
- this.deferred.resolve(result);
- }
- catch (userCallbackError) {
- this.deferred.reject(userCallbackError);
- }
- }
- }
- /**
- * Used to represent a batch on the BatchQueue.
- *
- * @private
- * @internal
- */
- class BulkCommitBatch extends write_batch_1.WriteBatch {
- constructor(firestore, maxBatchSize) {
- super(firestore);
- // The set of document reference paths present in the WriteBatch.
- this.docPaths = new Set();
- // An array of pending write operations. Only contains writes that have not
- // been resolved.
- this.pendingOps = [];
- this._maxBatchSize = maxBatchSize;
- }
- get maxBatchSize() {
- return this._maxBatchSize;
- }
- setMaxBatchSize(size) {
- assert(this.pendingOps.length <= size, 'New batch size cannot be less than the number of enqueued writes');
- this._maxBatchSize = size;
- }
- has(documentRef) {
- return this.docPaths.has(documentRef.path);
- }
- async bulkCommit(options = {}) {
- return this._firestore._traceUtil.startActiveSpan(trace_util_1.SPAN_NAME_BULK_WRITER_COMMIT, async () => {
- var _a;
- const tag = (_a = options === null || options === void 0 ? void 0 : options.requestTag) !== null && _a !== void 0 ? _a : (0, util_1.requestTag)();
- // Capture the error stack to preserve stack tracing across async calls.
- const stack = Error().stack;
- let response;
- try {
- (0, logger_1.logger)('BulkCommitBatch.bulkCommit', tag, `Sending next batch with ${this._opCount} writes`);
- const retryCodes = (0, util_1.getRetryCodes)('batchWrite');
- response = await this._commit({ retryCodes, methodName: 'batchWrite', requestTag: tag });
- }
- catch (err) {
- // Map the failure to each individual write's result.
- const ops = Array.from({ length: this.pendingOps.length });
- response = {
- writeResults: ops.map(() => {
- return {};
- }),
- status: ops.map(() => err),
- };
- }
- for (let i = 0; i < (response.writeResults || []).length; ++i) {
- // Since delete operations currently do not have write times, use a
- // sentinel Timestamp value.
- // TODO(b/158502664): Use actual delete timestamp.
- const DELETE_TIMESTAMP_SENTINEL = timestamp_1.Timestamp.fromMillis(0);
- const status = (response.status || [])[i];
- if (status.code === 0 /* StatusCode.OK */) {
- const updateTime = timestamp_1.Timestamp.fromProto(response.writeResults[i].updateTime || DELETE_TIMESTAMP_SENTINEL);
- this.pendingOps[i].onSuccess(new write_batch_1.WriteResult(updateTime));
- }
- else {
- const error = new (require('google-gax/build/src/fallback').GoogleError)(status.message || undefined);
- error.code = status.code;
- this.pendingOps[i].onError((0, util_1.wrapError)(error, stack));
- }
- }
- }, {
- [trace_util_1.ATTRIBUTE_KEY_DOC_COUNT]: this._opCount,
- });
- }
- /**
- * Helper that updates the data structures associated with the operation.
- */
- processLastOperation(op) {
- assert(!this.docPaths.has(op.ref.path), 'Batch should not contain writes to the same document');
- this.docPaths.add(op.ref.path);
- this.pendingOps.push(op);
- }
- }
- /**
- * Used to represent a buffered BulkWriterOperation.
- *
- * @private
- * @internal
- */
- class BufferedOperation {
- constructor(operation, sendFn) {
- this.operation = operation;
- this.sendFn = sendFn;
- }
- }
- /**
- * The error thrown when a BulkWriter operation fails.
- *
- * @class BulkWriterError
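- *
- * @example
- * ```
- * // Sketch of inspecting a failed write; `bulkWriter` and `documentRef`
- * // are assumed to be set up as in the BulkWriter examples below.
- * bulkWriter.create(documentRef, {foo: 'bar'}).catch(err => {
- *   console.log('Failed with code', err.code, 'after', err.failedAttempts, 'attempts');
- * });
- * ```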
- */
- class BulkWriterError extends Error {
- /**
- * @private
- * @internal
- */
- constructor(
- /** The status code of the error. */
- code,
- /** The error message of the error. */
- message,
- /** The document reference the operation was performed on. */
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- documentRef,
- /** The type of operation performed. */
- operationType,
- /** How many times this operation has been attempted unsuccessfully. */
- failedAttempts) {
- super(message);
- this.code = code;
- this.message = message;
- this.documentRef = documentRef;
- this.operationType = operationType;
- this.failedAttempts = failedAttempts;
- }
- }
- exports.BulkWriterError = BulkWriterError;
- /**
- * A Firestore BulkWriter that can be used to perform a large number of writes
- * in parallel.
- *
- * @class BulkWriter
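- *
- * @example
- * ```
- * // A minimal usage sketch; assumes `firestore` is an initialized
- * // Firestore client, as in the method examples below.
- * let bulkWriter = firestore.bulkWriter();
- *
- * bulkWriter.create(firestore.doc('col/doc1'), {foo: 'bar'});
- * bulkWriter.update(firestore.doc('col/doc2'), {bar: 'baz'});
- * bulkWriter.delete(firestore.doc('col/doc3'));
- *
- * await bulkWriter.close();
- * ```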
- */
- class BulkWriter {
- // Visible for testing.
- /**
- * @private
- * @internal
- */
- _getBufferedOperationsCount() {
- return this._bufferedOperations.length;
- }
- // Visible for testing.
- /**
- * @private
- * @internal
- */
- _setMaxBatchSize(size) {
- assert(this._bulkCommitBatch.pendingOps.length === 0, 'BulkCommitBatch should be empty');
- this._maxBatchSize = size;
- this._bulkCommitBatch = new BulkCommitBatch(this.firestore, size);
- }
- // Visible for testing.
- /**
- * @private
- * @internal
- */
- _setMaxPendingOpCount(newMax) {
- this._maxPendingOpCount = newMax;
- }
- /** @private */
- constructor(firestore, options) {
- var _a, _b;
- this.firestore = firestore;
- /**
- * The maximum number of writes that can be in a single batch.
- * Visible for testing.
- * @private
- * @internal
- */
- this._maxBatchSize = MAX_BATCH_SIZE;
- /**
- * The batch that is currently used to schedule operations. Once this batch
- * reaches maximum capacity, a new batch is created.
- * @private
- * @internal
- */
- this._bulkCommitBatch = new BulkCommitBatch(this.firestore, this._maxBatchSize);
- /**
- * A pointer to the tail of all active BulkWriter operations. This pointer
- * is advanced every time a new write is enqueued.
- * @private
- * @internal
- */
- this._lastOp = Promise.resolve();
- /**
- * Whether this BulkWriter instance has started to close. Afterwards, no
- * new operations can be enqueued, except for retry operations scheduled by
- * the error handler.
- * @private
- * @internal
- */
- this._closing = false;
- /**
- * The number of pending operations enqueued on this BulkWriter instance.
- * An operation is considered pending if BulkWriter has sent it via RPC and
- * is awaiting the result.
- * @private
- * @internal
- */
- this._pendingOpsCount = 0;
- /**
- * An array containing buffered BulkWriter operations after the maximum number
- * of pending operations has been enqueued.
- * @private
- * @internal
- */
- this._bufferedOperations = [];
- /**
- * Whether a custom error handler has been set. BulkWriter only swallows
- * errors if an error handler is set. Otherwise, an UnhandledPromiseRejection
- * is thrown by Node if an operation promise is rejected without being
- * handled.
- * @private
- * @internal
- */
- this._errorHandlerSet = false;
- /**
- * The maximum number of pending operations that can be enqueued onto this
- * BulkWriter instance. Once this number of writes has been enqueued,
- * subsequent writes are buffered.
- * @private
- * @internal
- */
- this._maxPendingOpCount = DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT;
- /**
- * The user-provided callback to be run every time a BulkWriter operation
- * successfully completes.
- * @private
- * @internal
- */
- this._successFn = () => { };
- /**
- * The user-provided callback to be run every time a BulkWriter operation
- * fails.
- * @private
- * @internal
- */
- this._errorFn = error => {
- const isRetryableDeleteError = error.operationType === 'delete' &&
- error.code === 13 /* StatusCode.INTERNAL */;
- const retryCodes = (0, util_1.getRetryCodes)('batchWrite');
- return ((retryCodes.includes(error.code) || isRetryableDeleteError) &&
- error.failedAttempts < backoff_1.MAX_RETRY_ATTEMPTS);
- };
- this.firestore._incrementBulkWritersCount();
- validateBulkWriterOptions(options);
- if ((options === null || options === void 0 ? void 0 : options.throttling) === false) {
- this._rateLimiter = new rate_limiter_1.RateLimiter(Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY);
- }
- else {
- let startingRate = exports.DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT;
- let maxRate = exports.DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT;
- if (typeof (options === null || options === void 0 ? void 0 : options.throttling) !== 'boolean') {
- if (((_a = options === null || options === void 0 ? void 0 : options.throttling) === null || _a === void 0 ? void 0 : _a.maxOpsPerSecond) !== undefined) {
- maxRate = options.throttling.maxOpsPerSecond;
- }
- if (((_b = options === null || options === void 0 ? void 0 : options.throttling) === null || _b === void 0 ? void 0 : _b.initialOpsPerSecond) !== undefined) {
- startingRate = options.throttling.initialOpsPerSecond;
- }
- // The initial validation step ensures that maxOpsPerSecond is at least
- // initialOpsPerSecond when both are provided. So if maxRate is less than
- // startingRate here, initialOpsPerSecond was not set and maxOpsPerSecond
- // is below the default starting rate.
- if (maxRate < startingRate) {
- startingRate = maxRate;
- }
- // Ensure that the batch size is not larger than the number of allowed
- // operations per second.
- if (startingRate < this._maxBatchSize) {
- this._maxBatchSize = startingRate;
- }
- }
- this._rateLimiter = new rate_limiter_1.RateLimiter(startingRate, RATE_LIMITER_MULTIPLIER, RATE_LIMITER_MULTIPLIER_MILLIS, maxRate);
- }
- }
- /**
- * Create a document with the provided data. This single operation will fail
- * if a document exists at its location.
- *
- * @param {DocumentReference} documentRef A reference to the document to be
- * created.
- * @param {T} data The object to serialize as the document.
- * @throws {Error} If the provided input is not a valid Firestore document.
- * @returns {Promise<WriteResult>} A promise that resolves with the result of
- * the write. If the write fails, the promise is rejected with a
- * [BulkWriterError]{@link BulkWriterError}.
- *
- * @example
- * ```
- * let bulkWriter = firestore.bulkWriter();
- * let documentRef = firestore.collection('col').doc();
- *
- * bulkWriter
- * .create(documentRef, {foo: 'bar'})
- * .then(result => {
- * console.log('Successfully executed write at: ', result);
- * })
- * .catch(err => {
- * console.log('Write failed with: ', err);
- * });
- * ```
- */
- create(documentRef, data) {
- this._verifyNotClosed();
- return this._enqueue(documentRef, 'create', bulkCommitBatch => bulkCommitBatch.create(documentRef, data));
- }
- /**
- * Delete a document from the database.
- *
- * @param {DocumentReference} documentRef A reference to the document to be
- * deleted.
- * @param {Precondition=} precondition A precondition to enforce for this
- * delete.
- * @param {Timestamp=} precondition.lastUpdateTime If set, enforces that the
- * document was last updated at lastUpdateTime. Fails the batch if the
- * document doesn't exist or was last updated at a different time.
- * @returns {Promise<WriteResult>} A promise that resolves with the result of
- * the delete. If the delete fails, the promise is rejected with a
- * [BulkWriterError]{@link BulkWriterError}.
- *
- * @example
- * ```
- * let bulkWriter = firestore.bulkWriter();
- * let documentRef = firestore.doc('col/doc');
- *
- * bulkWriter
- * .delete(documentRef)
- * .then(result => {
- * console.log('Successfully deleted document');
- * })
- * .catch(err => {
- * console.log('Delete failed with: ', err);
- * });
- * ```
- */
- delete(documentRef, precondition) {
- this._verifyNotClosed();
- return this._enqueue(documentRef, 'delete', bulkCommitBatch => bulkCommitBatch.delete(documentRef, precondition));
- }
- /**
- * Write to the document referred to by the provided
- * [DocumentReference]{@link DocumentReference}. If the document does not
- * exist yet, it will be created. If you pass [SetOptions]{@link SetOptions},
- * the provided data can be merged into the existing document.
- *
- * @param {DocumentReference} documentRef A reference to the document to be
- * set.
- * @param {T} data The object to serialize as the document.
- * @param {SetOptions=} options An object to configure the set behavior.
- * @throws {Error} If the provided input is not a valid Firestore document.
- * @param {boolean=} options.merge - If true, set() merges the values
- * specified in its data argument. Fields omitted from this set() call remain
- * untouched. If your input sets any field to an empty map, all nested fields
- * are overwritten.
- * @param {Array.<string|FieldPath>=} options.mergeFields - If provided, set()
- * only replaces the specified field paths. Any field path that is not
- * specified is ignored and remains untouched. If your input sets any field to
- * an empty map, all nested fields are overwritten.
- * @returns {Promise<WriteResult>} A promise that resolves with the result of
- * the write. If the write fails, the promise is rejected with a
- * [BulkWriterError]{@link BulkWriterError}.
- *
- * @example
- * ```
- * let bulkWriter = firestore.bulkWriter();
- * let documentRef = firestore.collection('col').doc();
- *
- * bulkWriter
- * .set(documentRef, {foo: 'bar'})
- * .then(result => {
- * console.log('Successfully executed write at: ', result);
- * })
- * .catch(err => {
- * console.log('Write failed with: ', err);
- * });
- * ```
- */
- set(documentRef, data, options) {
- this._verifyNotClosed();
- return this._enqueue(documentRef, 'set', bulkCommitBatch => {
- if (options) {
- return bulkCommitBatch.set(documentRef, data, options);
- }
- else {
- return bulkCommitBatch.set(documentRef, data);
- }
- });
- }
- /**
- * Update fields of the document referred to by the provided
- * [DocumentReference]{@link DocumentReference}. If the document doesn't yet
- * exist, the update fails and the entire batch will be rejected.
- *
- * The update() method accepts either an object with field paths encoded as
- * keys and field values encoded as values, or a variable number of arguments
- * that alternate between field paths and field values. Nested fields can be
- * updated by providing dot-separated field path strings or by providing
- * FieldPath objects.
- *
- * A Precondition restricting this update can be specified as the last
- * argument.
- *
- * @param {DocumentReference} documentRef A reference to the document to be
- * updated.
- * @param {UpdateData|string|FieldPath} dataOrField An object containing the
- * fields and values with which to update the document or the path of the
- * first field to update.
- * @param {...(Precondition|*|string|FieldPath)} preconditionOrValues - An
- * alternating list of field paths and values to update or a Precondition to
- * restrict this update.
- * @throws {Error} If the provided input is not valid Firestore data.
- * @returns {Promise<WriteResult>} A promise that resolves with the result of
- * the write. If the write fails, the promise is rejected with a
- * [BulkWriterError]{@link BulkWriterError}.
- *
- * @example
- * ```
- * let bulkWriter = firestore.bulkWriter();
- * let documentRef = firestore.doc('col/doc');
- *
- * bulkWriter
- * .update(documentRef, {foo: 'bar'})
- * .then(result => {
- * console.log('Successfully executed write at: ', result);
- * })
- * .catch(err => {
- * console.log('Write failed with: ', err);
- * });
- * ```
- */
- update(documentRef, dataOrField, ...preconditionOrValues) {
- this._verifyNotClosed();
- return this._enqueue(documentRef, 'update', bulkCommitBatch => bulkCommitBatch.update(documentRef, dataOrField, ...preconditionOrValues));
- }
- /**
- * Callback function set by {@link BulkWriter#onWriteResult} that is run
- * every time a {@link BulkWriter} operation successfully completes.
- *
- * @callback BulkWriter~successCallback
- * @param {DocumentReference} documentRef The document reference the
- * operation was performed on
- * @param {WriteResult} result The server write time of the operation.
- */
- /**
- * Attaches a listener that is run every time a BulkWriter operation
- * successfully completes.
- *
- * @param {BulkWriter~successCallback} successCallback A callback to be
- * called every time a BulkWriter operation successfully completes.
- * @example
- * ```
- * let bulkWriter = firestore.bulkWriter();
- *
- * bulkWriter
- * .onWriteResult((documentRef, result) => {
- * console.log(
- * 'Successfully executed write on document: ',
- * documentRef,
- * ' at: ',
- * result
- * );
- * });
- * ```
- */
- onWriteResult(successCallback) {
- this._successFn = successCallback;
- }
- /**
- * Callback function set by {@link BulkWriter#onWriteError} that is run when
- * a write fails in order to determine whether {@link BulkWriter} should
- * retry the operation.
- *
- * @callback BulkWriter~shouldRetryCallback
- * @param {BulkWriterError} error The error object with information about the
- * operation and error.
- * @returns {boolean} Whether or not to retry the failed operation. Returning
- * `true` retries the operation. Returning `false` will stop the retry loop.
- */
- /**
- * Attaches an error handler listener that is run every time a BulkWriter
- * operation fails.
- *
- * BulkWriter has a default error handler that retries UNAVAILABLE and
- * ABORTED errors up to a maximum of 10 failed attempts. When an error
- * handler is specified, the default error handler will be overwritten.
- *
- * @param shouldRetryCallback {BulkWriter~shouldRetryCallback} A callback to
- * be called every time a BulkWriter operation fails. Returning `true` will
- * retry the operation. Returning `false` will stop the retry loop.
- * @example
- * ```
- * let bulkWriter = firestore.bulkWriter();
- *
- * bulkWriter
- * .onWriteError((error) => {
- * if (
- * error.code === GrpcStatus.UNAVAILABLE &&
- * error.failedAttempts < MAX_RETRY_ATTEMPTS
- * ) {
- * return true;
- * } else {
- * console.log('Failed write at document: ', error.documentRef);
- * return false;
- * }
- * });
- * ```
- */
- onWriteError(shouldRetryCallback) {
- this._errorHandlerSet = true;
- this._errorFn = shouldRetryCallback;
- }
- /**
- * Commits all writes that have been enqueued up to this point in parallel.
- *
- * Returns a Promise that resolves when all currently queued operations have
- * been committed. The Promise will never be rejected since the results for
- * each individual operation are conveyed via their individual Promises.
- *
- * The Promise resolves immediately if there are no pending writes. Otherwise,
- * the Promise waits for all previously issued writes, but it does not wait
- * for writes that were added after the method is called. If you want to wait
- * for additional writes, call `flush()` again.
- *
- * @return {Promise<void>} A promise that resolves when all enqueued writes
- * up to this point have been committed.
- *
- * @example
- * ```
- * let bulkWriter = firestore.bulkWriter();
- *
- * bulkWriter.create(documentRef, {foo: 'bar'});
- * bulkWriter.update(documentRef2, {foo: 'bar'});
- * bulkWriter.delete(documentRef3);
- * await bulkWriter.flush().then(() => {
- * console.log('Executed all writes');
- * });
- * ```
- */
- flush() {
- this._verifyNotClosed();
- this._scheduleCurrentBatch(/* flush= */ true);
- // Mark the most recent operation as flushed to ensure that the batch
- // containing it will be sent once it's popped from the buffer.
- if (this._bufferedOperations.length > 0) {
- this._bufferedOperations[this._bufferedOperations.length - 1].operation.markFlushed();
- }
- return this._lastOp;
- }
- /**
- * Commits all enqueued writes and marks the BulkWriter instance as closed.
- *
- * After calling `close()`, calling any method will throw an error. Any
- * retries scheduled as part of an `onWriteError()` handler will be run
- * before the `close()` promise resolves.
- *
- * Returns a Promise that resolves when there are no more pending writes. The
- * Promise will never be rejected. Calling this method will send all requests.
- * The promise resolves immediately if there are no pending writes.
- *
- * @return {Promise<void>} A promise that resolves when all enqueued writes
- * up to this point have been committed.
- *
- * @example
- * ```
- * let bulkWriter = firestore.bulkWriter();
- *
- * bulkWriter.create(documentRef, {foo: 'bar'});
- * bulkWriter.update(documentRef2, {foo: 'bar'});
- * bulkWriter.delete(documentRef3);
- * await bulkWriter.close().then(() => {
- * console.log('Executed all writes');
- * });
- * ```
- */
- close() {
- this._verifyNotClosed();
- this.firestore._decrementBulkWritersCount();
- const flushPromise = this.flush();
- this._closing = true;
- return flushPromise;
- }
- /**
- * Throws an error if the BulkWriter instance has been closed.
- * @private
- * @internal
- */
- _verifyNotClosed() {
- if (this._closing) {
- throw new Error('BulkWriter has already been closed.');
- }
- }
- /**
- * Sends the current batch and resets `this._bulkCommitBatch`.
- *
- * @param flush If true, keeps re-sending batches until no more operations
- * are enqueued. This allows retries to resolve as part of a `flush()` or
- * `close()` call.
- * @private
- * @internal
- */
- _scheduleCurrentBatch(flush = false) {
- if (this._bulkCommitBatch._opCount === 0)
- return;
- const pendingBatch = this._bulkCommitBatch;
- this._bulkCommitBatch = new BulkCommitBatch(this.firestore, this._maxBatchSize);
- // Use the write with the longest backoff duration when determining backoff.
- const highestBackoffDuration = pendingBatch.pendingOps.reduce((prev, cur) => (prev.backoffDuration > cur.backoffDuration ? prev : cur)).backoffDuration;
- const backoffMsWithJitter = BulkWriter._applyJitter(highestBackoffDuration);
- const delayedExecution = new util_1.Deferred();
- if (backoffMsWithJitter > 0) {
- (0, backoff_1.delayExecution)(() => delayedExecution.resolve(), backoffMsWithJitter);
- }
- else {
- delayedExecution.resolve();
- }
- delayedExecution.promise.then(() => this._sendBatch(pendingBatch, flush));
- }
- /**
- * Sends the provided batch once the rate limiter does not require any delay.
- * @private
- * @internal
- */
- async _sendBatch(batch, flush = false) {
- const tag = (0, util_1.requestTag)();
- // Send the batch if it does not require any delay, or schedule another
- // attempt after the appropriate timeout.
- const underRateLimit = this._rateLimiter.tryMakeRequest(batch._opCount);
- if (underRateLimit) {
- await batch.bulkCommit({ requestTag: tag });
- if (flush)
- this._scheduleCurrentBatch(flush);
- }
- else {
- const delayMs = this._rateLimiter.getNextRequestDelayMs(batch._opCount);
- (0, logger_1.logger)('BulkWriter._sendBatch', tag, `Backing off for ${delayMs} milliseconds`);
- (0, backoff_1.delayExecution)(() => this._sendBatch(batch, flush), delayMs);
- }
- }
- /**
- * Adds a 30% jitter to the provided backoff.
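- * For example, with the default factor of 0.3, a backoff of 1000ms becomes a
- * random delay in the range [700ms, 1300ms], capped at the maximum backoff
- * delay.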
- *
- * @private
- * @internal
- */
- static _applyJitter(backoffMs) {
- if (backoffMs === 0)
- return 0;
- // Random value in [-0.3, 0.3].
- const jitter = exports.DEFAULT_JITTER_FACTOR * (Math.random() * 2 - 1);
- return Math.min(backoff_1.DEFAULT_BACKOFF_MAX_DELAY_MS, backoffMs + jitter * backoffMs);
- }
- /**
- * Schedules and runs the provided operation on the next available batch.
- * @private
- * @internal
- */
- _enqueue(ref, type, enqueueOnBatchCallback) {
- const bulkWriterOp = new BulkWriterOperation(ref, type, this._sendFn.bind(this, enqueueOnBatchCallback), this._errorFn.bind(this), this._successFn.bind(this));
- // Swallow the error if the developer has set an error listener. This
- // prevents UnhandledPromiseRejections from being thrown if a floating
- // BulkWriter operation promise fails when an error handler is specified.
- //
- // This is done here in order to chain the caught promise onto `lastOp`,
- // which ensures that flush() resolves after the operation promise.
- const userPromise = bulkWriterOp.promise.catch(err => {
- if (!this._errorHandlerSet) {
- throw err;
- }
- else {
- return bulkWriterOp.promise;
- }
- });
- // Advance the `_lastOp` pointer. This ensures that `_lastOp` only resolves
- // when both the previous and the current write resolve.
- this._lastOp = this._lastOp.then(() => (0, util_1.silencePromise)(userPromise));
- // Schedule the operation if the BulkWriter has fewer than the maximum
- // number of allowed pending operations, or add the operation to the
- // buffer.
- if (this._pendingOpsCount < this._maxPendingOpCount) {
- this._pendingOpsCount++;
- this._sendFn(enqueueOnBatchCallback, bulkWriterOp);
- }
- else {
- this._bufferedOperations.push(new BufferedOperation(bulkWriterOp, () => {
- this._pendingOpsCount++;
- this._sendFn(enqueueOnBatchCallback, bulkWriterOp);
- }));
- }
- // Chain the BulkWriter operation promise with the buffer processing logic
- // in order to ensure that it runs and that subsequent operations are
- // enqueued before the next batch is scheduled in `_sendBatch()`.
- return userPromise
- .then(res => {
- this._pendingOpsCount--;
- this._processBufferedOps();
- return res;
- })
- .catch(err => {
- this._pendingOpsCount--;
- this._processBufferedOps();
- throw err;
- });
- }
- /**
- * Manages the pending operation counter and schedules the next BulkWriter
- * operation if we're under the maximum limit.
- * @private
- * @internal
- */
- _processBufferedOps() {
- if (this._pendingOpsCount < this._maxPendingOpCount &&
- this._bufferedOperations.length > 0) {
- const nextOp = this._bufferedOperations.shift();
- nextOp.sendFn();
- }
- }
- /**
- * Schedules the provided operations on current BulkCommitBatch.
- * Sends the BulkCommitBatch if it reaches maximum capacity.
- *
- * @private
- * @internal
- */
- _sendFn(enqueueOnBatchCallback, op) {
- // A backoff duration greater than 0 implies that this batch is a retry.
- // Retried writes are sent with a batch size of 10 in order to guarantee
- // that the batch is under the 10MiB limit.
- if (op.backoffDuration > 0) {
- if (this._bulkCommitBatch.pendingOps.length >= exports.RETRY_MAX_BATCH_SIZE) {
- this._scheduleCurrentBatch(/* flush= */ false);
- }
- this._bulkCommitBatch.setMaxBatchSize(exports.RETRY_MAX_BATCH_SIZE);
- }
- if (this._bulkCommitBatch.has(op.ref)) {
- // Create a new batch since the backend doesn't support batches with two
- // writes to the same document.
- this._scheduleCurrentBatch();
- }
- enqueueOnBatchCallback(this._bulkCommitBatch);
- this._bulkCommitBatch.processLastOperation(op);
- if (this._bulkCommitBatch._opCount === this._bulkCommitBatch.maxBatchSize) {
- this._scheduleCurrentBatch();
- }
- else if (op.flushed) {
- // If flush() was called before this operation was enqueued into a batch,
- // we still need to schedule it.
- this._scheduleCurrentBatch(/* flush= */ true);
- }
- }
- }
- exports.BulkWriter = BulkWriter;
- /**
- * Validates the use of 'value' as BulkWriterOptions.
- *
- * @private
- * @internal
- * @param value The BulkWriterOptions object to validate.
- * @throws if the input is not a valid BulkWriterOptions object.
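- *
- * Accepted shapes (illustrative): `undefined`, `{throttling: boolean}`, or
- * `{throttling: {initialOpsPerSecond?: number, maxOpsPerSecond?: number}}`
- * with positive integers and `initialOpsPerSecond <= maxOpsPerSecond`.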
- */
- function validateBulkWriterOptions(value) {
- if ((0, validate_1.validateOptional)(value, { optional: true })) {
- return;
- }
- const argName = 'options';
- if (!(0, util_1.isObject)(value)) {
- throw new Error(`${(0, validate_1.invalidArgumentMessage)(argName, 'bulkWriter() options argument')} Input is not an object.`);
- }
- const options = value;
- if (options.throttling === undefined ||
- typeof options.throttling === 'boolean') {
- return;
- }
- if (options.throttling.initialOpsPerSecond !== undefined) {
- (0, validate_1.validateInteger)('initialOpsPerSecond', options.throttling.initialOpsPerSecond, {
- minValue: 1,
- });
- }
- if (options.throttling.maxOpsPerSecond !== undefined) {
- (0, validate_1.validateInteger)('maxOpsPerSecond', options.throttling.maxOpsPerSecond, {
- minValue: 1,
- });
- if (options.throttling.initialOpsPerSecond !== undefined &&
- options.throttling.initialOpsPerSecond >
- options.throttling.maxOpsPerSecond) {
- throw new Error(`${(0, validate_1.invalidArgumentMessage)(argName, 'bulkWriter() options argument')} "maxOpsPerSecond" cannot be less than "initialOpsPerSecond".`);
- }
- }
- }
- //# sourceMappingURL=bulk-writer.js.map