index.js 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. 'use strict'
  2. const idGeneratorFactory = require('./id-generator')
  3. const purgeAbandoned = require('./purge-abandoned')
  4. /**
  5. * Returns a message tracker object that keeps track of which message
  6. * identifiers correspond to which message handlers. Also handles keeping track
  7. * of abandoned messages.
  8. *
  9. * @param {object} options
  10. * @param {string} options.id An identifier for the tracker.
  11. * @param {object} options.parser An object that will be used to parse messages.
  12. *
  13. * @returns {MessageTracker}
  14. */
  15. module.exports = function messageTrackerFactory (options) {
  16. if (Object.prototype.toString.call(options) !== '[object Object]') {
  17. throw Error('options object is required')
  18. }
  19. if (!options.id || typeof options.id !== 'string') {
  20. throw Error('options.id string is required')
  21. }
  22. if (!options.parser || Object.prototype.toString.call(options.parser) !== '[object Object]') {
  23. throw Error('options.parser object is required')
  24. }
  25. let currentID = 0
  26. const nextID = idGeneratorFactory()
  27. const messages = new Map()
  28. const abandoned = new Map()
  29. /**
  30. * @typedef {object} MessageTracker
  31. * @property {string} id The identifier of the tracker as supplied via the options.
  32. * @property {object} parser The parser object given by the the options.
  33. */
  34. const tracker = {
  35. id: options.id,
  36. parser: options.parser
  37. }
  38. /**
  39. * Count of messages awaiting response.
  40. *
  41. * @alias pending
  42. * @memberof! MessageTracker#
  43. */
  44. Object.defineProperty(tracker, 'pending', {
  45. get () {
  46. return messages.size
  47. }
  48. })
  49. /**
  50. * Move a specific message to the abanded track.
  51. *
  52. * @param {integer} msgID The identifier for the message to move.
  53. *
  54. * @memberof MessageTracker
  55. * @method abandon
  56. */
  57. tracker.abandon = function abandonMessage (msgID) {
  58. if (messages.has(msgID) === false) return false
  59. const toAbandon = messages.get(msgID)
  60. abandoned.set(msgID, {
  61. age: currentID,
  62. message: toAbandon.message,
  63. cb: toAbandon.callback
  64. })
  65. return messages.delete(msgID)
  66. }
  67. /**
  68. * @typedef {object} Tracked
  69. * @property {object} message The tracked message. Usually the outgoing
  70. * request object.
  71. * @property {Function} callback The handler to use when receiving a
  72. * response to the tracked message.
  73. */
  74. /**
  75. * Retrieves the message handler for a message. Removes abandoned messages
  76. * that have been given time to be resolved.
  77. *
  78. * @param {integer} msgID The identifier for the message to get the handler for.
  79. *
  80. * @memberof MessageTracker
  81. * @method fetch
  82. */
  83. tracker.fetch = function fetchMessage (msgID) {
  84. const tracked = messages.get(msgID)
  85. if (tracked) {
  86. purgeAbandoned(msgID, abandoned)
  87. return tracked
  88. }
  89. // We sent an abandon request but the server either wasn't able to process
  90. // it or has not received it yet. Therefore, we received a response for the
  91. // abandoned message. So we must return the abandoned message's callback
  92. // to be processed normally.
  93. const abandonedMsg = abandoned.get(msgID)
  94. if (abandonedMsg) {
  95. return { message: abandonedMsg, callback: abandonedMsg.cb }
  96. }
  97. return null
  98. }
  99. /**
  100. * Removes all message tracks, cleans up the abandoned track, and invokes
  101. * a callback for each message purged.
  102. *
  103. * @param {function} cb A function with the signature `(msgID, handler)`.
  104. *
  105. * @memberof MessageTracker
  106. * @method purge
  107. */
  108. tracker.purge = function purgeMessages (cb) {
  109. messages.forEach((val, key) => {
  110. purgeAbandoned(key, abandoned)
  111. tracker.remove(key)
  112. cb(key, val.callback)
  113. })
  114. }
  115. /**
  116. * Removes a message from all tracking.
  117. *
  118. * @param {integer} msgID The identifier for the message to remove from tracking.
  119. *
  120. * @memberof MessageTracker
  121. * @method remove
  122. */
  123. tracker.remove = function removeMessage (msgID) {
  124. if (messages.delete(msgID) === false) {
  125. abandoned.delete(msgID)
  126. }
  127. }
  128. /**
  129. * Add a message handler to be tracked.
  130. *
  131. * @param {object} message The message object to be tracked. This object will
  132. * have a new property added to it: `messageId`.
  133. * @param {function} callback The handler for the message.
  134. *
  135. * @memberof MessageTracker
  136. * @method track
  137. */
  138. tracker.track = function trackMessage (message, callback) {
  139. currentID = nextID()
  140. // This side effect is not ideal but the client doesn't attach the tracker
  141. // to itself until after the `.connect` method has fired. If this can be
  142. // refactored later, then we can possibly get rid of this side effect.
  143. message.messageId = currentID
  144. messages.set(currentID, { callback, message })
  145. }
  146. return tracker
  147. }