pub-sub.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. "use strict";
  2. var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
  3. if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter");
  4. if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it");
  5. return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver);
  6. };
  7. var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
  8. if (kind === "m") throw new TypeError("Private method is not writable");
  9. if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a setter");
  10. if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot write private member to an object whose class did not declare it");
  11. return (kind === "a" ? f.call(receiver, value) : f ? f.value = value : state.set(receiver, value)), value;
  12. };
  13. var _PubSub_instances, _a, _PubSub_channelsArray, _PubSub_listenersSet, _PubSub_subscribing, _PubSub_isActive, _PubSub_listeners, _PubSub_extendChannelListeners, _PubSub_unsubscribeCommand, _PubSub_updateIsActive, _PubSub_emitPubSubMessage;
  14. Object.defineProperty(exports, "__esModule", { value: true });
  15. exports.PubSub = exports.PubSubType = void 0;
  16. var PubSubType;
  17. (function (PubSubType) {
  18. PubSubType["CHANNELS"] = "CHANNELS";
  19. PubSubType["PATTERNS"] = "PATTERNS";
  20. PubSubType["SHARDED"] = "SHARDED";
  21. })(PubSubType || (exports.PubSubType = PubSubType = {}));
  22. const COMMANDS = {
  23. [PubSubType.CHANNELS]: {
  24. subscribe: Buffer.from('subscribe'),
  25. unsubscribe: Buffer.from('unsubscribe'),
  26. message: Buffer.from('message')
  27. },
  28. [PubSubType.PATTERNS]: {
  29. subscribe: Buffer.from('psubscribe'),
  30. unsubscribe: Buffer.from('punsubscribe'),
  31. message: Buffer.from('pmessage')
  32. },
  33. [PubSubType.SHARDED]: {
  34. subscribe: Buffer.from('ssubscribe'),
  35. unsubscribe: Buffer.from('sunsubscribe'),
  36. message: Buffer.from('smessage')
  37. }
  38. };
  39. class PubSub {
  40. constructor() {
  41. _PubSub_instances.add(this);
  42. _PubSub_subscribing.set(this, 0);
  43. _PubSub_isActive.set(this, false);
  44. _PubSub_listeners.set(this, {
  45. [PubSubType.CHANNELS]: new Map(),
  46. [PubSubType.PATTERNS]: new Map(),
  47. [PubSubType.SHARDED]: new Map()
  48. });
  49. }
  50. static isStatusReply(reply) {
  51. return (COMMANDS[PubSubType.CHANNELS].subscribe.equals(reply[0]) ||
  52. COMMANDS[PubSubType.CHANNELS].unsubscribe.equals(reply[0]) ||
  53. COMMANDS[PubSubType.PATTERNS].subscribe.equals(reply[0]) ||
  54. COMMANDS[PubSubType.PATTERNS].unsubscribe.equals(reply[0]) ||
  55. COMMANDS[PubSubType.SHARDED].subscribe.equals(reply[0]));
  56. }
  57. static isShardedUnsubscribe(reply) {
  58. return COMMANDS[PubSubType.SHARDED].unsubscribe.equals(reply[0]);
  59. }
  60. get isActive() {
  61. return __classPrivateFieldGet(this, _PubSub_isActive, "f");
  62. }
  63. subscribe(type, channels, listener, returnBuffers) {
  64. var _b;
  65. const args = [COMMANDS[type].subscribe], channelsArray = __classPrivateFieldGet(_a, _a, "m", _PubSub_channelsArray).call(_a, channels);
  66. for (const channel of channelsArray) {
  67. let channelListeners = __classPrivateFieldGet(this, _PubSub_listeners, "f")[type].get(channel);
  68. if (!channelListeners || channelListeners.unsubscribing) {
  69. args.push(channel);
  70. }
  71. }
  72. if (args.length === 1) {
  73. // all channels are already subscribed, add listeners without issuing a command
  74. for (const channel of channelsArray) {
  75. __classPrivateFieldGet(_a, _a, "m", _PubSub_listenersSet).call(_a, __classPrivateFieldGet(this, _PubSub_listeners, "f")[type].get(channel), returnBuffers).add(listener);
  76. }
  77. return;
  78. }
  79. __classPrivateFieldSet(this, _PubSub_isActive, true, "f");
  80. __classPrivateFieldSet(this, _PubSub_subscribing, (_b = __classPrivateFieldGet(this, _PubSub_subscribing, "f"), _b++, _b), "f");
  81. return {
  82. args,
  83. channelsCounter: args.length - 1,
  84. resolve: () => {
  85. var _b;
  86. __classPrivateFieldSet(this, _PubSub_subscribing, (_b = __classPrivateFieldGet(this, _PubSub_subscribing, "f"), _b--, _b), "f");
  87. for (const channel of channelsArray) {
  88. let listeners = __classPrivateFieldGet(this, _PubSub_listeners, "f")[type].get(channel);
  89. if (!listeners) {
  90. listeners = {
  91. unsubscribing: false,
  92. buffers: new Set(),
  93. strings: new Set()
  94. };
  95. __classPrivateFieldGet(this, _PubSub_listeners, "f")[type].set(channel, listeners);
  96. }
  97. __classPrivateFieldGet(_a, _a, "m", _PubSub_listenersSet).call(_a, listeners, returnBuffers).add(listener);
  98. }
  99. },
  100. reject: () => {
  101. var _b;
  102. __classPrivateFieldSet(this, _PubSub_subscribing, (_b = __classPrivateFieldGet(this, _PubSub_subscribing, "f"), _b--, _b), "f");
  103. __classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_updateIsActive).call(this);
  104. }
  105. };
  106. }
  107. extendChannelListeners(type, channel, listeners) {
  108. var _b;
  109. if (!__classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_extendChannelListeners).call(this, type, channel, listeners))
  110. return;
  111. __classPrivateFieldSet(this, _PubSub_isActive, true, "f");
  112. __classPrivateFieldSet(this, _PubSub_subscribing, (_b = __classPrivateFieldGet(this, _PubSub_subscribing, "f"), _b++, _b), "f");
  113. return {
  114. args: [
  115. COMMANDS[type].subscribe,
  116. channel
  117. ],
  118. channelsCounter: 1,
  119. resolve: () => { var _b, _c; return __classPrivateFieldSet(this, _PubSub_subscribing, (_c = __classPrivateFieldGet(this, _PubSub_subscribing, "f"), _b = _c--, _c), "f"), _b; },
  120. reject: () => {
  121. var _b;
  122. __classPrivateFieldSet(this, _PubSub_subscribing, (_b = __classPrivateFieldGet(this, _PubSub_subscribing, "f"), _b--, _b), "f");
  123. __classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_updateIsActive).call(this);
  124. }
  125. };
  126. }
  127. extendTypeListeners(type, listeners) {
  128. var _b;
  129. const args = [COMMANDS[type].subscribe];
  130. for (const [channel, channelListeners] of listeners) {
  131. if (__classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_extendChannelListeners).call(this, type, channel, channelListeners)) {
  132. args.push(channel);
  133. }
  134. }
  135. if (args.length === 1)
  136. return;
  137. __classPrivateFieldSet(this, _PubSub_isActive, true, "f");
  138. __classPrivateFieldSet(this, _PubSub_subscribing, (_b = __classPrivateFieldGet(this, _PubSub_subscribing, "f"), _b++, _b), "f");
  139. return {
  140. args,
  141. channelsCounter: args.length - 1,
  142. resolve: () => { var _b, _c; return __classPrivateFieldSet(this, _PubSub_subscribing, (_c = __classPrivateFieldGet(this, _PubSub_subscribing, "f"), _b = _c--, _c), "f"), _b; },
  143. reject: () => {
  144. var _b;
  145. __classPrivateFieldSet(this, _PubSub_subscribing, (_b = __classPrivateFieldGet(this, _PubSub_subscribing, "f"), _b--, _b), "f");
  146. __classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_updateIsActive).call(this);
  147. }
  148. };
  149. }
  150. unsubscribe(type, channels, listener, returnBuffers) {
  151. const listeners = __classPrivateFieldGet(this, _PubSub_listeners, "f")[type];
  152. if (!channels) {
  153. return __classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_unsubscribeCommand).call(this, [COMMANDS[type].unsubscribe],
  154. // cannot use `this.#subscribed` because there might be some `SUBSCRIBE` commands in the queue
  155. // cannot use `this.#subscribed + this.#subscribing` because some `SUBSCRIBE` commands might fail
  156. NaN, () => listeners.clear());
  157. }
  158. const channelsArray = __classPrivateFieldGet(_a, _a, "m", _PubSub_channelsArray).call(_a, channels);
  159. if (!listener) {
  160. return __classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_unsubscribeCommand).call(this, [COMMANDS[type].unsubscribe, ...channelsArray], channelsArray.length, () => {
  161. for (const channel of channelsArray) {
  162. listeners.delete(channel);
  163. }
  164. });
  165. }
  166. const args = [COMMANDS[type].unsubscribe];
  167. for (const channel of channelsArray) {
  168. const sets = listeners.get(channel);
  169. if (sets) {
  170. let current, other;
  171. if (returnBuffers) {
  172. current = sets.buffers;
  173. other = sets.strings;
  174. }
  175. else {
  176. current = sets.strings;
  177. other = sets.buffers;
  178. }
  179. const currentSize = current.has(listener) ? current.size - 1 : current.size;
  180. if (currentSize !== 0 || other.size !== 0)
  181. continue;
  182. sets.unsubscribing = true;
  183. }
  184. args.push(channel);
  185. }
  186. if (args.length === 1) {
  187. // all channels has other listeners,
  188. // delete the listeners without issuing a command
  189. for (const channel of channelsArray) {
  190. __classPrivateFieldGet(_a, _a, "m", _PubSub_listenersSet).call(_a, listeners.get(channel), returnBuffers).delete(listener);
  191. }
  192. return;
  193. }
  194. return __classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_unsubscribeCommand).call(this, args, args.length - 1, () => {
  195. for (const channel of channelsArray) {
  196. const sets = listeners.get(channel);
  197. if (!sets)
  198. continue;
  199. (returnBuffers ? sets.buffers : sets.strings).delete(listener);
  200. if (sets.buffers.size === 0 && sets.strings.size === 0) {
  201. listeners.delete(channel);
  202. }
  203. }
  204. });
  205. }
  206. reset() {
  207. __classPrivateFieldSet(this, _PubSub_isActive, false, "f");
  208. __classPrivateFieldSet(this, _PubSub_subscribing, 0, "f");
  209. }
  210. resubscribe() {
  211. var _b;
  212. const commands = [];
  213. for (const [type, listeners] of Object.entries(__classPrivateFieldGet(this, _PubSub_listeners, "f"))) {
  214. if (!listeners.size)
  215. continue;
  216. __classPrivateFieldSet(this, _PubSub_isActive, true, "f");
  217. __classPrivateFieldSet(this, _PubSub_subscribing, (_b = __classPrivateFieldGet(this, _PubSub_subscribing, "f"), _b++, _b), "f");
  218. const callback = () => { var _b, _c; return __classPrivateFieldSet(this, _PubSub_subscribing, (_c = __classPrivateFieldGet(this, _PubSub_subscribing, "f"), _b = _c--, _c), "f"), _b; };
  219. commands.push({
  220. args: [
  221. COMMANDS[type].subscribe,
  222. ...listeners.keys()
  223. ],
  224. channelsCounter: listeners.size,
  225. resolve: callback,
  226. reject: callback
  227. });
  228. }
  229. return commands;
  230. }
  231. handleMessageReply(reply) {
  232. if (COMMANDS[PubSubType.CHANNELS].message.equals(reply[0])) {
  233. __classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_emitPubSubMessage).call(this, PubSubType.CHANNELS, reply[2], reply[1]);
  234. return true;
  235. }
  236. else if (COMMANDS[PubSubType.PATTERNS].message.equals(reply[0])) {
  237. __classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_emitPubSubMessage).call(this, PubSubType.PATTERNS, reply[3], reply[2], reply[1]);
  238. return true;
  239. }
  240. else if (COMMANDS[PubSubType.SHARDED].message.equals(reply[0])) {
  241. __classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_emitPubSubMessage).call(this, PubSubType.SHARDED, reply[2], reply[1]);
  242. return true;
  243. }
  244. return false;
  245. }
  246. removeShardedListeners(channel) {
  247. const listeners = __classPrivateFieldGet(this, _PubSub_listeners, "f")[PubSubType.SHARDED].get(channel);
  248. __classPrivateFieldGet(this, _PubSub_listeners, "f")[PubSubType.SHARDED].delete(channel);
  249. __classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_updateIsActive).call(this);
  250. return listeners;
  251. }
  252. getTypeListeners(type) {
  253. return __classPrivateFieldGet(this, _PubSub_listeners, "f")[type];
  254. }
  255. }
  256. exports.PubSub = PubSub;
  257. _a = PubSub, _PubSub_subscribing = new WeakMap(), _PubSub_isActive = new WeakMap(), _PubSub_listeners = new WeakMap(), _PubSub_instances = new WeakSet(), _PubSub_channelsArray = function _PubSub_channelsArray(channels) {
  258. return (Array.isArray(channels) ? channels : [channels]);
  259. }, _PubSub_listenersSet = function _PubSub_listenersSet(listeners, returnBuffers) {
  260. return (returnBuffers ? listeners.buffers : listeners.strings);
  261. }, _PubSub_extendChannelListeners = function _PubSub_extendChannelListeners(type, channel, listeners) {
  262. const existingListeners = __classPrivateFieldGet(this, _PubSub_listeners, "f")[type].get(channel);
  263. if (!existingListeners) {
  264. __classPrivateFieldGet(this, _PubSub_listeners, "f")[type].set(channel, listeners);
  265. return true;
  266. }
  267. for (const listener of listeners.buffers) {
  268. existingListeners.buffers.add(listener);
  269. }
  270. for (const listener of listeners.strings) {
  271. existingListeners.strings.add(listener);
  272. }
  273. return false;
  274. }, _PubSub_unsubscribeCommand = function _PubSub_unsubscribeCommand(args, channelsCounter, removeListeners) {
  275. return {
  276. args,
  277. channelsCounter,
  278. resolve: () => {
  279. removeListeners();
  280. __classPrivateFieldGet(this, _PubSub_instances, "m", _PubSub_updateIsActive).call(this);
  281. },
  282. reject: undefined // use the same structure as `subscribe`
  283. };
  284. }, _PubSub_updateIsActive = function _PubSub_updateIsActive() {
  285. __classPrivateFieldSet(this, _PubSub_isActive, (__classPrivateFieldGet(this, _PubSub_listeners, "f")[PubSubType.CHANNELS].size !== 0 ||
  286. __classPrivateFieldGet(this, _PubSub_listeners, "f")[PubSubType.PATTERNS].size !== 0 ||
  287. __classPrivateFieldGet(this, _PubSub_listeners, "f")[PubSubType.SHARDED].size !== 0 ||
  288. __classPrivateFieldGet(this, _PubSub_subscribing, "f") !== 0), "f");
  289. }, _PubSub_emitPubSubMessage = function _PubSub_emitPubSubMessage(type, message, channel, pattern) {
  290. const keyString = (pattern ?? channel).toString(), listeners = __classPrivateFieldGet(this, _PubSub_listeners, "f")[type].get(keyString);
  291. if (!listeners)
  292. return;
  293. for (const listener of listeners.buffers) {
  294. listener(message, channel);
  295. }
  296. if (!listeners.strings.size)
  297. return;
  298. const channelString = pattern ? channel.toString() : keyString, messageString = channelString === '__redis__:invalidate' ?
  299. // https://github.com/redis/redis/pull/7469
  300. // https://github.com/redis/redis/issues/7463
  301. (message === null ? null : message.map(x => x.toString())) :
  302. message.toString();
  303. for (const listener of listeners.strings) {
  304. listener(messageString, channelString);
  305. }
  306. };