traceable.js 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. import { AsyncLocalStorage } from "node:async_hooks";
  2. import { RunTree, isRunTree, isRunnableConfigLike, } from "./run_trees.js";
  3. import { isTracingEnabled } from "./env.js";
  4. import { ROOT, AsyncLocalStorageProviderSingleton, } from "./singletons/traceable.js";
  5. import { _LC_CONTEXT_VARIABLES_KEY } from "./singletons/constants.js";
  6. import { isKVMap, isReadableStream, isAsyncIterable, isIteratorLike, isThenable, isGenerator, isPromiseMethod, } from "./utils/asserts.js";
  7. AsyncLocalStorageProviderSingleton.initializeGlobalInstance(new AsyncLocalStorage());
  /**
   * Normalizes the positional arguments of a traced call into the key/value
   * map that is stored as the run's inputs.
   *
   * - more than one argument          -> `{ args: rawInputs }`
   * - no argument, or a single nullish -> `{}`
   * - a single value accepted by `isKVMap` -> that value itself (not copied)
   * - any other single value          -> `{ input: value }`
   *
   * @param rawInputs Array of positional arguments passed to the traced function
   * @returns Key/value map describing the inputs
   */
  const runInputsToMap = (rawInputs) => {
    // Arity is checked first: a nullish first argument must not make the
    // remaining arguments disappear from the recorded inputs.
    if (rawInputs.length > 1) {
      return { args: rawInputs };
    }
    const firstInput = rawInputs[0];
    if (firstInput == null) {
      return {};
    }
    if (isKVMap(firstInput)) {
      return firstInput;
    }
    return { input: firstInput };
  };
  /**
   * Runs the user-supplied `processInputs` hook over the inputs of a run.
   * The hook is best-effort: if it throws, the error is logged and the
   * unprocessed inputs are used instead.
   *
   * @param inputs Inputs about to be recorded on the run
   * @param processInputs Hook that transforms the inputs
   * @returns The hook's result, or `inputs` unchanged when the hook throws
   */
  const handleRunInputs = (inputs, processInputs) => {
    let processed = inputs;
    try {
      processed = processInputs(inputs);
    } catch (err) {
      console.error("Error occurred during processInputs. Sending raw inputs:", err);
    }
    return processed;
  };
  /**
   * Shapes the raw result of a traced call into a key/value map and runs the
   * user-supplied `processOutputs` hook over it.
   *
   * Values accepted by `isKVMap` are used as-is; anything else is wrapped as
   * `{ outputs: rawOutputs }`. The hook is best-effort: if it throws, the
   * error is logged and the un-processed map is returned.
   *
   * @param rawOutputs Value produced by the traced function
   * @param processOutputs Hook that transforms the outputs map
   * @returns The hook's result, or the un-processed map when the hook throws
   */
  const handleRunOutputs = (rawOutputs, processOutputs) => {
    const outputs = isKVMap(rawOutputs) ? rawOutputs : { outputs: rawOutputs };
    let processed = outputs;
    try {
      processed = processOutputs(outputs);
    } catch (err) {
      console.error("Error occurred during processOutputs. Sending raw outputs:", err);
    }
    return processed;
  };
  /**
   * Splits the raw call arguments into attachments and the remaining inputs
   * using the optional `extractAttachments` hook.
   *
   * @param rawInputs Positional arguments of the traced call
   * @param extractAttachments Optional hook called with the spread arguments;
   * expected to return an `[attachments, remainingArgs]` pair
   * @returns `[attachments, remainingArgs]`; `[undefined, rawInputs]` when no
   * hook is given or when the hook (or unpacking its result) throws
   */
  const handleRunAttachments = (rawInputs, extractAttachments) => {
    if (extractAttachments) {
      try {
        // Unpacking stays inside the try so a malformed return value is
        // handled exactly like a throwing hook.
        const [attachments, remainingArgs] = extractAttachments(...rawInputs);
        return [attachments, remainingArgs];
      } catch (err) {
        console.error("Error occurred during extractAttachments:", err);
      }
    }
    return [undefined, rawInputs];
  };
  /**
   * Prepares a run tree for a traced call, or opts out of tracing.
   *
   * When `isTracingEnabled(runTree.tracingEnabled)` is falsy, nothing is
   * touched and `undefined` is returned. Otherwise the run tree is mutated in
   * place: `attachments` and `inputs` are filled from the call arguments, and
   * any invocation params are merged into `extra.metadata` (metadata already
   * on the run tree wins over invocation params with the same key).
   *
   * @param runTree Run tree to populate
   * @param inputs Positional arguments of the traced call
   * @param getInvocationParams Optional hook called with the spread arguments
   * @param processInputs Hook applied to the inputs before they are stored
   * @param extractAttachments Optional hook splitting attachments from inputs
   * @returns The same `runTree`, or `undefined` when tracing is disabled
   */
  const getTracingRunTree = (runTree, inputs, getInvocationParams, processInputs, extractAttachments) => {
    const tracingOn = isTracingEnabled(runTree.tracingEnabled);
    if (!tracingOn) {
      return undefined;
    }

    const extraction = handleRunAttachments(inputs, extractAttachments);
    runTree.attachments = extraction[0];
    runTree.inputs = handleRunInputs(extraction[1], processInputs);

    const invocationParams = getInvocationParams == null
      ? undefined
      : getInvocationParams(...inputs);
    if (invocationParams == null) {
      return runTree;
    }

    if (runTree.extra == null) {
      runTree.extra = {};
    }
    // Existing metadata is spread last so it overrides invocation params.
    runTree.extra.metadata = { ...invocationParams, ...runTree.extra.metadata };
    return runTree;
  };
  // idea: store the state of the promise outside
  // but only when the promise is "consumed"
  /**
   * Wraps a thenable in a Proxy that remembers how it settled, so that the
   * settled value can later be serialized through `toJSON()`.
   *
   * The outcome is only recorded once somebody consumes the proxy through
   * `then` / `catch` (which includes `await`); until then `toJSON()` returns
   * `undefined`. After consumption `toJSON()` returns the fulfilled value, or
   * `{ error }` for a rejection.
   *
   * Missing or non-callable handlers are treated the way native promises
   * treat them: the value (or the rejection) passes through unchanged.
   *
   * @param arg Promise or thenable to wrap
   * @returns Proxy around `arg` exposing an additional `toJSON` method
   */
  const getSerializablePromise = (arg) => {
    const proxyState = { current: undefined };
    const promiseProxy = new Proxy(arg, {
      get(target, prop, receiver) {
        if (prop === "then") {
          const boundThen = arg[prop].bind(arg);
          return (resolve, reject) => {
            return boundThen((value) => {
              proxyState.current = ["resolve", value];
              // `p.then()` / `p.then(undefined, onRejected)` are valid:
              // without a fulfilment handler the value passes through.
              return typeof resolve === "function" ? resolve(value) : value;
            }, (error) => {
              proxyState.current = ["reject", error];
              if (typeof reject === "function") {
                return reject(error);
              }
              // No rejection handler: keep the chain rejected with the
              // original error.
              throw error;
            });
          };
        }
        // Plain thenables may not implement `catch`; only intercept it when
        // there is something to bind.
        if (prop === "catch" && typeof arg[prop] === "function") {
          const boundCatch = arg[prop].bind(arg);
          return (reject) => {
            return boundCatch((error) => {
              proxyState.current = ["reject", error];
              if (typeof reject === "function") {
                return reject(error);
              }
              throw error;
            });
          };
        }
        if (prop === "toJSON") {
          return () => {
            if (!proxyState.current) {
              return undefined;
            }
            const [type, value] = proxyState.current;
            return type === "resolve" ? value : { error: value };
          };
        }
        return Reflect.get(target, prop, receiver);
      },
    });
    return promiseProxy;
  };
  /**
   * Wraps a "lazy" argument so that whatever the traced function ends up
   * consuming from it can later be serialized through a `toJSON()` method.
   * Only what was actually consumed through the wrapper is recorded.
   *
   * - ReadableStream: piped through a TransformStream that records every
   *   chunk; `toJSON()` returns the recorded chunks.
   * - async iterable: proxied so that results of `next` / `return` / `throw`
   *   are wrapped with `getSerializablePromise` and recorded; `toJSON()`
   *   returns the truthy `value`s of the results settled so far.
   * - non-array iterator-like object: proxied so that results of `next` /
   *   `return` / `throw` are recorded; `toJSON()` returns the truthy `value`s.
   * - thenable: wrapped with `getSerializablePromise`.
   * - anything else: returned unchanged.
   *
   * @param arg A single argument of a traced call
   * @returns The wrapped argument, or `arg` itself when no wrapping applies
   */
  const convertSerializableArg = (arg) => {
    if (isReadableStream(arg)) {
      const proxyState = [];
      const transform = new TransformStream({
        start: () => void 0,
        transform: (chunk, controller) => {
          proxyState.push(chunk);
          controller.enqueue(chunk);
        },
        flush: () => void 0,
      });
      const pipeThrough = arg.pipeThrough(transform);
      Object.assign(pipeThrough, { toJSON: () => proxyState });
      return pipeThrough;
    }
    if (isAsyncIterable(arg)) {
      const proxyState = { current: [] };
      return new Proxy(arg, {
        get(target, prop, receiver) {
          if (prop === Symbol.asyncIterator) {
            return () => {
              const boundIterator = arg[Symbol.asyncIterator].bind(arg);
              const iterator = boundIterator();
              return new Proxy(iterator, {
                get(target, prop, receiver) {
                  if (prop === "next" || prop === "return" || prop === "throw") {
                    // Forward to the method that was actually requested.
                    // Routing `return`/`throw` to `next` would advance the
                    // source instead of closing it or injecting the error.
                    const method = iterator[prop];
                    if (typeof method !== "function") {
                      // `return` and `throw` are optional in the iterator
                      // protocol; expose them only if the source has them.
                      return Reflect.get(target, prop, receiver);
                    }
                    const bound = method.bind(iterator);
                    return (...args) => {
                      const wrapped = getSerializablePromise(bound(...args));
                      proxyState.current.push(wrapped);
                      return wrapped;
                    };
                  }
                  return Reflect.get(target, prop, receiver);
                },
              });
            };
          }
          if (prop === "toJSON") {
            return () => {
              const recorded = proxyState.current;
              const serialized = recorded.map((next) => next.toJSON());
              const chunks = serialized.reduce((memo, next) => {
                if (next?.value)
                  memo.push(next.value);
                return memo;
              }, []);
              return chunks;
            };
          }
          return Reflect.get(target, prop, receiver);
        },
      });
    }
    if (!Array.isArray(arg) && isIteratorLike(arg)) {
      const proxyState = [];
      return new Proxy(arg, {
        get(target, prop, receiver) {
          if (prop === "next" || prop === "return" || prop === "throw") {
            const bound = arg[prop]?.bind(arg);
            return (...args) => {
              const next = bound?.(...args);
              if (next != null)
                proxyState.push(next);
              return next;
            };
          }
          if (prop === "toJSON") {
            return () => {
              const chunks = proxyState.reduce((memo, next) => {
                if (next.value)
                  memo.push(next.value);
                return memo;
              }, []);
              return chunks;
            };
          }
          return Reflect.get(target, prop, receiver);
        },
      });
    }
    if (isThenable(arg)) {
      return getSerializablePromise(arg);
    }
    return arg;
  };
  /**
   * Higher-order function that takes function as input and returns a
   * "TraceableFunction" - a wrapped version of the input that
   * automatically handles tracing. If the returned traceable function calls any
   * traceable functions, those are automatically traced as well.
   *
   * The returned TraceableFunction can accept a run tree or run tree config as
   * its first argument. If omitted, it will default to the caller's run tree,
   * or will be treated as a root run.
   *
   * Besides the run tree options, `config` may carry: `aggregator` (reduces
   * streamed chunks into the recorded output), `argsConfigPath` (`[index, path]`
   * locating a per-call config inside the arguments), `__finalTracedIteratorKey`
   * (key of the returned object holding the async iterable to trace),
   * `processInputs` / `processOutputs` / `extractAttachments` hooks,
   * `getInvocationParams` and `on_end`.
   *
   * When tracing is disabled no run tree is created; the wrapped function still
   * runs and its result is returned.
   *
   * @param wrappedFunc Targeted function to be traced
   * @param config Additional metadata such as name, tags or providing
   * a custom LangSmith client instance
   * @returns The traceable wrapper around `wrappedFunc`; it carries the run tree
   * config under the non-enumerable `"langsmith:traceable"` property
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  export function traceable(wrappedFunc, config) {
    const { aggregator, argsConfigPath, __finalTracedIteratorKey, processInputs, processOutputs, extractAttachments, ...runTreeConfig } = config ?? {};
    const processInputsFn = processInputs ?? ((x) => x);
    const processOutputsFn = processOutputs ?? ((x) => x);
    const extractAttachmentsFn = extractAttachments ?? ((...x) => [undefined, runInputsToMap(x)]);
    const traceableFunc = (...args) => {
      let ensuredConfig;
      try {
        // Pull an optional per-call config out of the arguments; it is removed
        // from `args` so the wrapped function does not see it.
        let runtimeConfig;
        if (argsConfigPath) {
          const [index, path] = argsConfigPath;
          if (index === args.length - 1 && !path) {
            runtimeConfig = args.pop();
          }
          else if (index <= args.length &&
            typeof args[index] === "object" &&
            args[index] !== null) {
            if (path) {
              const { [path]: extracted, ...rest } = args[index];
              runtimeConfig = extracted;
              args[index] = rest;
            }
            else {
              runtimeConfig = args[index];
              args.splice(index, 1);
            }
          }
        }
        // Per-call config overrides the static config; tags are unioned and
        // metadata is shallow-merged.
        ensuredConfig = {
          name: wrappedFunc.name || "<lambda>",
          ...runTreeConfig,
          ...runtimeConfig,
          tags: [
            ...new Set([
              ...(runTreeConfig?.tags ?? []),
              ...(runtimeConfig?.tags ?? []),
            ]),
          ],
          metadata: {
            ...runTreeConfig?.metadata,
            ...runtimeConfig?.metadata,
          },
        };
      }
      catch (err) {
        console.warn(`Failed to extract runtime config from args for ${runTreeConfig?.name ?? wrappedFunc.name}`, err);
        ensuredConfig = {
          name: wrappedFunc.name || "<lambda>",
          ...runTreeConfig,
        };
      }
      const asyncLocalStorage = AsyncLocalStorageProviderSingleton.getInstance();
      // TODO: deal with possible nested promises and async iterables
      const processedArgs = args;
      for (let i = 0; i < processedArgs.length; i++) {
        processedArgs[i] = convertSerializableArg(processedArgs[i]);
      }
      // Resolve the run tree for this call (undefined when tracing is disabled)
      // together with the arguments handed to the wrapped function.
      const [currentRunTree, rawInputs] = (() => {
        const [firstArg, ...restArgs] = processedArgs;
        // used for handoff between LangChain.JS and traceable functions
        if (isRunnableConfigLike(firstArg)) {
          return [
            getTracingRunTree(RunTree.fromRunnableConfig(firstArg, ensuredConfig), restArgs, config?.getInvocationParams, processInputsFn, extractAttachmentsFn),
            restArgs,
          ];
        }
        // deprecated: legacy CallbackManagerRunTree used in runOnDataset
        // override ALS and do not pass-through the run tree
        if (isRunTree(firstArg) &&
          "callbackManager" in firstArg &&
          firstArg.callbackManager != null) {
          return [firstArg, restArgs];
        }
        // when ALS is unreliable, users can manually
        // pass in the run tree
        if (firstArg === ROOT || isRunTree(firstArg)) {
          const currentRunTree = getTracingRunTree(firstArg === ROOT
            ? new RunTree(ensuredConfig)
            : firstArg.createChild(ensuredConfig), restArgs, config?.getInvocationParams, processInputsFn, extractAttachmentsFn);
          return [currentRunTree, [currentRunTree, ...restArgs]];
        }
        // Node.JS uses AsyncLocalStorage (ALS) and AsyncResource
        // to allow storing context
        const prevRunFromStore = asyncLocalStorage.getStore();
        if (isRunTree(prevRunFromStore)) {
          return [
            getTracingRunTree(prevRunFromStore.createChild(ensuredConfig), processedArgs, config?.getInvocationParams, processInputsFn, extractAttachmentsFn),
            processedArgs,
          ];
        }
        const currentRunTree = getTracingRunTree(new RunTree(ensuredConfig), processedArgs, config?.getInvocationParams, processInputsFn, extractAttachmentsFn);
        // If a context var is set by LangChain outside of a traceable,
        // it will be an object with a single property and we should copy
        // context vars over into the new run tree.
        // `currentRunTree` is undefined when tracing is disabled; in that case
        // there is nothing to copy onto, and assigning would throw.
        if (currentRunTree !== undefined &&
          prevRunFromStore !== undefined &&
          _LC_CONTEXT_VARIABLES_KEY in prevRunFromStore) {
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          currentRunTree[_LC_CONTEXT_VARIABLES_KEY] =
            prevRunFromStore[_LC_CONTEXT_VARIABLES_KEY];
        }
        return [currentRunTree, processedArgs];
      })();
      return asyncLocalStorage.run(currentRunTree, () => {
        // Started before the wrapped function runs; awaited in handleEnd().
        const postRunPromise = currentRunTree?.postRun();
        // Applies the optional aggregator to streamed chunks; falls back to the
        // raw chunk list when there is no aggregator or it throws.
        async function handleChunks(chunks) {
          if (aggregator !== undefined) {
            try {
              return await aggregator(chunks);
            }
            catch (e) {
              console.error(`[ERROR]: LangSmith aggregation failed: `, e);
            }
          }
          return chunks;
        }
        // Re-emits a ReadableStream while collecting its chunks, ending the run
        // once the source is exhausted or the consumer cancels.
        function tapReadableStreamForTracing(stream, snapshot) {
          const reader = stream.getReader();
          let finished = false;
          const chunks = [];
          const tappedStream = new ReadableStream({
            async start(controller) {
              // eslint-disable-next-line no-constant-condition
              while (true) {
                const result = await (snapshot
                  ? snapshot(() => reader.read())
                  : reader.read());
                if (result.done) {
                  finished = true;
                  await currentRunTree?.end(handleRunOutputs(await handleChunks(chunks), processOutputsFn));
                  await handleEnd();
                  controller.close();
                  break;
                }
                chunks.push(result.value);
                // Add new_token event for streaming LLM runs
                if (currentRunTree && currentRunTree.run_type === "llm") {
                  currentRunTree.addEvent({
                    name: "new_token",
                    kwargs: { token: result.value },
                  });
                }
                controller.enqueue(result.value);
              }
            },
            async cancel(reason) {
              if (!finished)
                await currentRunTree?.end(undefined, "Cancelled");
              await currentRunTree?.end(handleRunOutputs(await handleChunks(chunks), processOutputsFn));
              await handleEnd();
              return reader.cancel(reason);
            },
          });
          return tappedStream;
        }
        // Re-yields an async iterator while collecting its values; the run is
        // ended in `finally`, so early exits by the consumer are covered too.
        async function* wrapAsyncIteratorForTracing(iterator, snapshot) {
          let finished = false;
          const chunks = [];
          try {
            while (true) {
              const { value, done } = await (snapshot
                ? snapshot(() => iterator.next())
                : iterator.next());
              if (done) {
                finished = true;
                break;
              }
              chunks.push(value);
              // Add new_token event for streaming LLM runs
              if (currentRunTree && currentRunTree.run_type === "llm") {
                currentRunTree.addEvent({
                  name: "new_token",
                  kwargs: { token: value },
                });
              }
              yield value;
            }
          }
          catch (e) {
            await currentRunTree?.end(undefined, String(e));
            throw e;
          }
          finally {
            if (!finished)
              await currentRunTree?.end(undefined, "Cancelled");
            await currentRunTree?.end(handleRunOutputs(await handleChunks(chunks), processOutputsFn));
            await handleEnd();
          }
        }
        // Streams get a tapped copy; other async iterables are patched in place
        // so that iterating them goes through the tracing generator.
        function wrapAsyncGeneratorForTracing(iterable, snapshot) {
          if (isReadableStream(iterable)) {
            return tapReadableStreamForTracing(iterable, snapshot);
          }
          const iterator = iterable[Symbol.asyncIterator]();
          const wrappedIterator = wrapAsyncIteratorForTracing(iterator, snapshot);
          iterable[Symbol.asyncIterator] = () => wrappedIterator;
          return iterable;
        }
        // Final bookkeeping: `on_end` callback, then wait for postRun and
        // patch the run.
        async function handleEnd() {
          const onEnd = config?.on_end;
          if (onEnd) {
            if (!currentRunTree) {
              console.warn("Can not call 'on_end' if currentRunTree is undefined");
            }
            else {
              onEnd(currentRunTree);
            }
          }
          await postRunPromise;
          await currentRunTree?.patchRun();
        }
        // Drains a sync iterator, keeping every result including the final
        // `done` one so the iterator can be replayed to the caller.
        function gatherAll(iterator) {
          const chunks = [];
          // eslint-disable-next-line no-constant-condition
          while (true) {
            const next = iterator.next();
            chunks.push(next);
            if (next.done)
              break;
          }
          return chunks;
        }
        let returnValue;
        try {
          returnValue = wrappedFunc(...rawInputs);
        }
        catch (err) {
          // Synchronous throws go through the same rejection path as async ones.
          returnValue = Promise.reject(err);
        }
        if (isAsyncIterable(returnValue)) {
          const snapshot = AsyncLocalStorage.snapshot();
          return wrapAsyncGeneratorForTracing(returnValue, snapshot);
        }
        if (!Array.isArray(returnValue) &&
          typeof returnValue === "object" &&
          returnValue != null &&
          __finalTracedIteratorKey !== undefined &&
          isAsyncIterable(returnValue[__finalTracedIteratorKey])) {
          const snapshot = AsyncLocalStorage.snapshot();
          return {
            ...returnValue,
            [__finalTracedIteratorKey]: wrapAsyncGeneratorForTracing(returnValue[__finalTracedIteratorKey], snapshot),
          };
        }
        const tracedPromise = new Promise((resolve, reject) => {
          Promise.resolve(returnValue)
            .then(async (rawOutput) => {
            if (isAsyncIterable(rawOutput)) {
              const snapshot = AsyncLocalStorage.snapshot();
              return resolve(wrapAsyncGeneratorForTracing(rawOutput, snapshot));
            }
            if (!Array.isArray(rawOutput) &&
              typeof rawOutput === "object" &&
              rawOutput != null &&
              __finalTracedIteratorKey !== undefined &&
              isAsyncIterable(rawOutput[__finalTracedIteratorKey])) {
              const snapshot = AsyncLocalStorage.snapshot();
              return {
                ...rawOutput,
                [__finalTracedIteratorKey]: wrapAsyncGeneratorForTracing(rawOutput[__finalTracedIteratorKey], snapshot),
              };
            }
            if (isGenerator(wrappedFunc) && isIteratorLike(rawOutput)) {
              // Sync generators are drained eagerly, recorded, then replayed.
              const chunks = gatherAll(rawOutput);
              try {
                await currentRunTree?.end(handleRunOutputs(await handleChunks(chunks.reduce((memo, { value, done }) => {
                  if (!done || typeof value !== "undefined") {
                    memo.push(value);
                  }
                  return memo;
                }, [])), processOutputsFn));
                await handleEnd();
              }
              catch (e) {
                console.error("Error occurred during handleEnd:", e);
              }
              return (function* () {
                for (const ret of chunks) {
                  if (ret.done)
                    return ret.value;
                  yield ret.value;
                }
              })();
            }
            try {
              await currentRunTree?.end(handleRunOutputs(rawOutput, processOutputsFn));
              await handleEnd();
            }
            finally {
              // Returning from `finally` discards any error thrown while
              // ending the run, so the caller always receives the output.
              // eslint-disable-next-line no-unsafe-finally
              return rawOutput;
            }
          }, async (error) => {
            await currentRunTree?.end(undefined, String(error));
            await handleEnd();
            throw error;
          })
            .then(resolve, reject);
        });
        if (typeof returnValue !== "object" || returnValue === null) {
          return tracedPromise;
        }
        // Object results keep their own properties, but promise methods are
        // routed to the traced promise.
        return new Proxy(returnValue, {
          get(target, prop, receiver) {
            if (isPromiseMethod(prop)) {
              return tracedPromise[prop].bind(tracedPromise);
            }
            return Reflect.get(target, prop, receiver);
          },
        });
      });
    };
    Object.defineProperty(traceableFunc, "langsmith:traceable", {
      value: runTreeConfig,
    });
    return traceableFunc;
  }
  545. export { getCurrentRunTree, isTraceableFunction, withRunTree, ROOT, } from "./singletons/traceable.js";