commands-queue.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. "use strict";
  2. var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
  3. if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter");
  4. if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it");
  5. return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver);
  6. };
  7. var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
  8. if (kind === "m") throw new TypeError("Private method is not writable");
  9. if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a setter");
  10. if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot write private member to an object whose class did not declare it");
  11. return (kind === "a" ? f.call(receiver, value) : f ? f.value = value : state.set(receiver, value)), value;
  12. };
  13. var _RedisCommandsQueue_instances, _a, _RedisCommandsQueue_flushQueue, _RedisCommandsQueue_maxLength, _RedisCommandsQueue_waitingToBeSent, _RedisCommandsQueue_waitingForReply, _RedisCommandsQueue_onShardedChannelMoved, _RedisCommandsQueue_pubSub, _RedisCommandsQueue_chainInExecution, _RedisCommandsQueue_decoder, _RedisCommandsQueue_pushPubSubCommand;
  14. Object.defineProperty(exports, "__esModule", { value: true });
  15. const LinkedList = require("yallist");
  16. const errors_1 = require("../errors");
  17. const decoder_1 = require("./RESP2/decoder");
  18. const encoder_1 = require("./RESP2/encoder");
  19. const pub_sub_1 = require("./pub-sub");
  20. const PONG = Buffer.from('pong');
  21. class RedisCommandsQueue {
  22. get isPubSubActive() {
  23. return __classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").isActive;
  24. }
  25. constructor(maxLength, onShardedChannelMoved) {
  26. _RedisCommandsQueue_instances.add(this);
  27. _RedisCommandsQueue_maxLength.set(this, void 0);
  28. _RedisCommandsQueue_waitingToBeSent.set(this, new LinkedList());
  29. _RedisCommandsQueue_waitingForReply.set(this, new LinkedList());
  30. _RedisCommandsQueue_onShardedChannelMoved.set(this, void 0);
  31. _RedisCommandsQueue_pubSub.set(this, new pub_sub_1.PubSub());
  32. _RedisCommandsQueue_chainInExecution.set(this, void 0);
  33. _RedisCommandsQueue_decoder.set(this, new decoder_1.default({
  34. returnStringsAsBuffers: () => {
  35. return !!__classPrivateFieldGet(this, _RedisCommandsQueue_waitingForReply, "f").head?.value.returnBuffers ||
  36. __classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").isActive;
  37. },
  38. onReply: reply => {
  39. if (__classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").isActive && Array.isArray(reply)) {
  40. if (__classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").handleMessageReply(reply))
  41. return;
  42. const isShardedUnsubscribe = pub_sub_1.PubSub.isShardedUnsubscribe(reply);
  43. if (isShardedUnsubscribe && !__classPrivateFieldGet(this, _RedisCommandsQueue_waitingForReply, "f").length) {
  44. const channel = reply[1].toString();
  45. __classPrivateFieldGet(this, _RedisCommandsQueue_onShardedChannelMoved, "f").call(this, channel, __classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").removeShardedListeners(channel));
  46. return;
  47. }
  48. else if (isShardedUnsubscribe || pub_sub_1.PubSub.isStatusReply(reply)) {
  49. const head = __classPrivateFieldGet(this, _RedisCommandsQueue_waitingForReply, "f").head.value;
  50. if ((Number.isNaN(head.channelsCounter) && reply[2] === 0) ||
  51. --head.channelsCounter === 0) {
  52. __classPrivateFieldGet(this, _RedisCommandsQueue_waitingForReply, "f").shift().resolve();
  53. }
  54. return;
  55. }
  56. if (PONG.equals(reply[0])) {
  57. const { resolve, returnBuffers } = __classPrivateFieldGet(this, _RedisCommandsQueue_waitingForReply, "f").shift(), buffer = (reply[1].length === 0 ? reply[0] : reply[1]);
  58. resolve(returnBuffers ? buffer : buffer.toString());
  59. return;
  60. }
  61. }
  62. const { resolve, reject } = __classPrivateFieldGet(this, _RedisCommandsQueue_waitingForReply, "f").shift();
  63. if (reply instanceof errors_1.ErrorReply) {
  64. reject(reply);
  65. }
  66. else {
  67. resolve(reply);
  68. }
  69. }
  70. }));
  71. __classPrivateFieldSet(this, _RedisCommandsQueue_maxLength, maxLength, "f");
  72. __classPrivateFieldSet(this, _RedisCommandsQueue_onShardedChannelMoved, onShardedChannelMoved, "f");
  73. }
  74. addCommand(args, options) {
  75. if (__classPrivateFieldGet(this, _RedisCommandsQueue_maxLength, "f") && __classPrivateFieldGet(this, _RedisCommandsQueue_waitingToBeSent, "f").length + __classPrivateFieldGet(this, _RedisCommandsQueue_waitingForReply, "f").length >= __classPrivateFieldGet(this, _RedisCommandsQueue_maxLength, "f")) {
  76. return Promise.reject(new Error('The queue is full'));
  77. }
  78. else if (options?.signal?.aborted) {
  79. return Promise.reject(new errors_1.AbortError());
  80. }
  81. return new Promise((resolve, reject) => {
  82. const node = new LinkedList.Node({
  83. args,
  84. chainId: options?.chainId,
  85. returnBuffers: options?.returnBuffers,
  86. resolve,
  87. reject
  88. });
  89. if (options?.signal) {
  90. const listener = () => {
  91. __classPrivateFieldGet(this, _RedisCommandsQueue_waitingToBeSent, "f").removeNode(node);
  92. node.value.reject(new errors_1.AbortError());
  93. };
  94. node.value.abort = {
  95. signal: options.signal,
  96. listener
  97. };
  98. // AbortSignal type is incorrent
  99. options.signal.addEventListener('abort', listener, {
  100. once: true
  101. });
  102. }
  103. if (options?.asap) {
  104. __classPrivateFieldGet(this, _RedisCommandsQueue_waitingToBeSent, "f").unshiftNode(node);
  105. }
  106. else {
  107. __classPrivateFieldGet(this, _RedisCommandsQueue_waitingToBeSent, "f").pushNode(node);
  108. }
  109. });
  110. }
  111. subscribe(type, channels, listener, returnBuffers) {
  112. return __classPrivateFieldGet(this, _RedisCommandsQueue_instances, "m", _RedisCommandsQueue_pushPubSubCommand).call(this, __classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").subscribe(type, channels, listener, returnBuffers));
  113. }
  114. unsubscribe(type, channels, listener, returnBuffers) {
  115. return __classPrivateFieldGet(this, _RedisCommandsQueue_instances, "m", _RedisCommandsQueue_pushPubSubCommand).call(this, __classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").unsubscribe(type, channels, listener, returnBuffers));
  116. }
  117. resubscribe() {
  118. const commands = __classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").resubscribe();
  119. if (!commands.length)
  120. return;
  121. return Promise.all(commands.map(command => __classPrivateFieldGet(this, _RedisCommandsQueue_instances, "m", _RedisCommandsQueue_pushPubSubCommand).call(this, command)));
  122. }
  123. extendPubSubChannelListeners(type, channel, listeners) {
  124. return __classPrivateFieldGet(this, _RedisCommandsQueue_instances, "m", _RedisCommandsQueue_pushPubSubCommand).call(this, __classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").extendChannelListeners(type, channel, listeners));
  125. }
  126. extendPubSubListeners(type, listeners) {
  127. return __classPrivateFieldGet(this, _RedisCommandsQueue_instances, "m", _RedisCommandsQueue_pushPubSubCommand).call(this, __classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").extendTypeListeners(type, listeners));
  128. }
  129. getPubSubListeners(type) {
  130. return __classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").getTypeListeners(type);
  131. }
  132. getCommandToSend() {
  133. const toSend = __classPrivateFieldGet(this, _RedisCommandsQueue_waitingToBeSent, "f").shift();
  134. if (!toSend)
  135. return;
  136. let encoded;
  137. try {
  138. encoded = (0, encoder_1.default)(toSend.args);
  139. }
  140. catch (err) {
  141. toSend.reject(err);
  142. return;
  143. }
  144. __classPrivateFieldGet(this, _RedisCommandsQueue_waitingForReply, "f").push({
  145. resolve: toSend.resolve,
  146. reject: toSend.reject,
  147. channelsCounter: toSend.channelsCounter,
  148. returnBuffers: toSend.returnBuffers
  149. });
  150. __classPrivateFieldSet(this, _RedisCommandsQueue_chainInExecution, toSend.chainId, "f");
  151. return encoded;
  152. }
  153. onReplyChunk(chunk) {
  154. __classPrivateFieldGet(this, _RedisCommandsQueue_decoder, "f").write(chunk);
  155. }
  156. flushWaitingForReply(err) {
  157. __classPrivateFieldGet(this, _RedisCommandsQueue_decoder, "f").reset();
  158. __classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").reset();
  159. __classPrivateFieldGet(_a, _a, "m", _RedisCommandsQueue_flushQueue).call(_a, __classPrivateFieldGet(this, _RedisCommandsQueue_waitingForReply, "f"), err);
  160. if (!__classPrivateFieldGet(this, _RedisCommandsQueue_chainInExecution, "f"))
  161. return;
  162. while (__classPrivateFieldGet(this, _RedisCommandsQueue_waitingToBeSent, "f").head?.value.chainId === __classPrivateFieldGet(this, _RedisCommandsQueue_chainInExecution, "f")) {
  163. __classPrivateFieldGet(this, _RedisCommandsQueue_waitingToBeSent, "f").shift();
  164. }
  165. __classPrivateFieldSet(this, _RedisCommandsQueue_chainInExecution, undefined, "f");
  166. }
  167. flushAll(err) {
  168. __classPrivateFieldGet(this, _RedisCommandsQueue_decoder, "f").reset();
  169. __classPrivateFieldGet(this, _RedisCommandsQueue_pubSub, "f").reset();
  170. __classPrivateFieldGet(_a, _a, "m", _RedisCommandsQueue_flushQueue).call(_a, __classPrivateFieldGet(this, _RedisCommandsQueue_waitingForReply, "f"), err);
  171. __classPrivateFieldGet(_a, _a, "m", _RedisCommandsQueue_flushQueue).call(_a, __classPrivateFieldGet(this, _RedisCommandsQueue_waitingToBeSent, "f"), err);
  172. }
  173. }
  174. _a = RedisCommandsQueue, _RedisCommandsQueue_maxLength = new WeakMap(), _RedisCommandsQueue_waitingToBeSent = new WeakMap(), _RedisCommandsQueue_waitingForReply = new WeakMap(), _RedisCommandsQueue_onShardedChannelMoved = new WeakMap(), _RedisCommandsQueue_pubSub = new WeakMap(), _RedisCommandsQueue_chainInExecution = new WeakMap(), _RedisCommandsQueue_decoder = new WeakMap(), _RedisCommandsQueue_instances = new WeakSet(), _RedisCommandsQueue_flushQueue = function _RedisCommandsQueue_flushQueue(queue, err) {
  175. while (queue.length) {
  176. queue.shift().reject(err);
  177. }
  178. }, _RedisCommandsQueue_pushPubSubCommand = function _RedisCommandsQueue_pushPubSubCommand(command) {
  179. if (command === undefined)
  180. return;
  181. return new Promise((resolve, reject) => {
  182. __classPrivateFieldGet(this, _RedisCommandsQueue_waitingToBeSent, "f").push({
  183. args: command.args,
  184. channelsCounter: command.channelsCounter,
  185. returnBuffers: true,
  186. resolve: () => {
  187. command.resolve();
  188. resolve();
  189. },
  190. reject: err => {
  191. command.reject?.();
  192. reject(err);
  193. }
  194. });
  195. });
  196. };
  197. exports.default = RedisCommandsQueue;