multiprocess.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. const debug = require('debug')('log4js:multiprocess');
  2. const net = require('net');
  3. const LoggingEvent = require('../LoggingEvent');
  4. const END_MSG = '__LOG4JS__';
  /**
   * Master side of the multiprocess appender.
   *
   * Starts a TCP server on config.loggerPort (default 5000) and
   * config.loggerHost (default 'localhost'). Every message received from a
   * worker is terminated by END_MSG; each complete message is deserialised,
   * tagged with the sender's address/port and handed to actualAppender.
   *
   * @param {object} config - reads `loggerPort` and `loggerHost`.
   * @param {Function} actualAppender - called with every log event.
   * @param {object} levels - only `levels.ERROR` is read, for socket-error events.
   * @returns {Function} appender for local events; it also exposes
   *   `shutdown(cb)`, which closes the server.
   */
  function logServer(config, actualAppender, levels) {
    /**
     * Turns a serialised (utf-8 string) event into a logging event and
     * records which remote socket it arrived on.
     */
    function toLoggingEvent(socket, serialised) {
      debug('(master) deserialising log event');
      const parsed = LoggingEvent.deserialise(serialised);
      parsed.remoteAddress = socket.remoteAddress;
      parsed.remotePort = socket.remotePort;
      return parsed;
    }

    /** Wires up one worker connection. */
    function onConnection(clientSocket) {
      debug('(master) connection received');
      clientSocket.setEncoding('utf8');

      // Text received so far that has not yet been terminated by END_MSG.
      let pending = '';

      const forward = (serialised) => {
        debug('(master) deserialising log event and sending to actual appender');
        actualAppender(toLoggingEvent(clientSocket, serialised));
      };

      // Used for both 'data' (chunk is a string) and 'end' (no chunk).
      const onChunk = (chunk) => {
        debug('(master) chunk of data received');
        pending += chunk || '';
        let markerAt = pending.indexOf(END_MSG);
        // A single chunk may hold several complete messages: drain them all.
        while (markerAt > -1) {
          forward(pending.slice(0, markerAt));
          pending = pending.slice(markerAt + END_MSG.length);
          // One trace line per scan of the remaining text.
          debug('(master) chunk of data received');
          markerAt = pending.indexOf(END_MSG);
        }
      };

      // Socket errors are reported through the appender as an ERROR event.
      const onSocketError = (error) => {
        actualAppender({
          startTime: new Date(),
          categoryName: 'log4js',
          level: levels.ERROR,
          data: ['A worker log process hung up unexpectedly', error],
          remoteAddress: clientSocket.remoteAddress,
          remotePort: clientSocket.remotePort
        });
      };

      clientSocket.on('data', onChunk);
      clientSocket.on('end', onChunk);
      clientSocket.on('error', onSocketError);
    }

    const server = net.createServer(onConnection);

    const port = config.loggerPort || 5000;
    const host = config.loggerHost || 'localhost';
    server.listen(port, host, (e) => {
      debug('(master) master server listening, error was ', e);
      // allow the process to exit, if this is the only socket active
      server.unref();
    });

    /** Events logged in the master process bypass the socket entirely. */
    const app = function (event) {
      debug('(master) log event sent directly to actual appender (local event)');
      return actualAppender(event);
    };

    app.shutdown = (cb) => {
      debug('(master) master shutdown called, closing server');
      server.close(cb);
    };

    return app;
  }
  /**
   * Worker side of the multiprocess appender.
   *
   * Opens a socket to config.loggerHost (default 'localhost') on
   * config.loggerPort (default 5000) and writes each serialised log event
   * followed by END_MSG. Events logged while the socket is not writable are
   * queued and written once the socket connects. A new socket is created
   * whenever the current one emits 'close'.
   *
   * @param {object} config - reads `loggerPort` and `loggerHost`.
   * @returns {Function} appender taking a logging event; it also exposes
   *   `shutdown(cb)`.
   */
  function workerAppender(config) {
    const buffer = [];
    let canWrite = false;
    let shutdownAttempts = 3;
    let socket;

    /** Sends one event, then the end-of-message marker, on the current socket. */
    const write = (loggingEvent) => {
      debug('(worker) Writing log event to socket');
      socket.write(loggingEvent.serialise(), 'utf8');
      socket.write(END_MSG, 'utf8');
    };

    /** Writes out queued events in the order they were logged. */
    const emptyBuffer = () => {
      debug('(worker) emptying worker buffer');
      for (let queued = buffer.shift(); queued; queued = buffer.shift()) {
        write(queued);
      }
    };

    /** (Re)creates the connection to the master and attaches its listeners. */
    function createSocket() {
      const host = config.loggerHost || 'localhost';
      const port = config.loggerPort || 5000;
      debug(
        `(worker) worker appender creating socket to ${host}:${port}`
      );

      const connection = net.createConnection(port, host);
      socket = connection;

      connection.on('connect', () => {
        debug('(worker) worker socket connected');
        emptyBuffer();
        canWrite = true;
      });
      connection.on('timeout', connection.end.bind(connection));
      connection.on('error', (e) => {
        debug('connection error', e);
        canWrite = false;
        emptyBuffer();
      });
      // A closed socket is replaced straight away.
      connection.on('close', createSocket);
    }

    createSocket();

    const log = function (loggingEvent) {
      if (!canWrite) {
        debug('(worker) worker buffering log event because it cannot write at the moment');
        buffer.push(loggingEvent);
        return;
      }
      write(loggingEvent);
    };

    /**
     * Ends the socket, passing cb to socket.end. While events are still
     * queued it first waits 100ms and tries again, at most three times.
     */
    log.shutdown = (cb) => {
      debug('(worker) worker shutdown called');
      const nothingToWaitFor = !buffer.length || !shutdownAttempts;
      if (nothingToWaitFor) {
        // Drop the 'close' listener so ending the socket does not reconnect.
        socket.removeAllListeners('close');
        socket.end(cb);
        return;
      }
      debug('(worker) worker buffer has items, waiting 100ms to empty');
      shutdownAttempts -= 1;
      setTimeout(() => {
        log.shutdown(cb);
      }, 100);
    };

    return log;
  }
  /**
   * Builds the appender for the configured mode: the log server when
   * config.mode is 'master', the socket-writing worker appender otherwise.
   *
   * @param {object} config - appender configuration; `mode` selects the role.
   * @param {Function} appender - passed to the log server in master mode; not used otherwise.
   * @param {object} levels - passed to the log server in master mode; not used otherwise.
   * @returns {Function} the master or worker appender function.
   */
  function createAppender(config, appender, levels) {
    const isMaster = config.mode === 'master';
    debug(isMaster ? 'Creating master appender' : 'Creating worker appender');
    return isMaster
      ? logServer(config, appender, levels)
      : workerAppender(config);
  }
  /**
   * Configuration entry point for the multiprocess appender.
   *
   * In master mode the name in config.appender is resolved through
   * findAppender and the result becomes the appender that receives every
   * event. In any other mode no lookup is performed.
   *
   * @param {object} config - appender configuration (`mode`, `appender`, plus
   *   whatever the created appender reads).
   * @param {object} layouts - not used by this appender.
   * @param {Function} findAppender - called with config.appender in master mode.
   * @param {object} levels - passed through to createAppender.
   * @returns {Function} the appender produced by createAppender.
   * @throws {Error} in master mode when config.appender is missing, or when
   *   findAppender returns nothing for it.
   */
  function configure(config, layouts, findAppender, levels) {
    let appender;
    debug(`configure with mode = ${config.mode}`);
    if (config.mode === 'master') {
      if (!config.appender) {
        // Hand the object to debug's %o formatter: interpolating it into a
        // template literal would only ever print "[object Object]".
        debug('no appender found in config %o', config);
        throw new Error('multiprocess master must have an "appender" defined');
      }
      debug(`actual appender is ${config.appender}`);
      appender = findAppender(config.appender);
      if (!appender) {
        debug(`actual appender "${config.appender}" not found`);
        throw new Error(`multiprocess master appender "${config.appender}" not defined`);
      }
    }
    return createAppender(config, appender, levels);
  }
  156. module.exports.configure = configure;