langchain.js 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. // These `@langchain/core` imports are intentionally not peer dependencies
  2. // to avoid package manager issues around circular dependencies.
  3. // eslint-disable-next-line import/no-extraneous-dependencies
  4. import { CallbackManager } from "@langchain/core/callbacks/manager";
  5. // eslint-disable-next-line import/no-extraneous-dependencies
  6. import { LangChainTracer } from "@langchain/core/tracers/tracer_langchain";
  7. // eslint-disable-next-line import/no-extraneous-dependencies
  8. import { Runnable, patchConfig, getCallbackManagerForConfig, } from "@langchain/core/runnables";
  9. import { getCurrentRunTree, isTraceableFunction, } from "./traceable.js";
  10. import { isAsyncIterable, isIteratorLike } from "./utils/asserts.js";
  11. /**
  12. * Converts the current run tree active within a traceable-wrapped function
  13. * into a LangChain compatible callback manager. This is useful to handoff tracing
  14. * from LangSmith to LangChain Runnables and LLMs.
  15. *
  16. * @param {RunTree | undefined} currentRunTree Current RunTree from within a traceable-wrapped function. If not provided, the current run tree will be inferred from AsyncLocalStorage.
  17. * @returns {CallbackManager | undefined} Callback manager used by LangChain Runnable objects.
  18. */
  19. export async function getLangchainCallbacks(currentRunTree) {
  20. const runTree = currentRunTree ?? getCurrentRunTree();
  21. if (!runTree)
  22. return undefined;
  23. // TODO: CallbackManager.configure() is only async due to LangChainTracer
  24. // factory being unnecessarily async.
  25. let callbacks = await CallbackManager.configure();
  26. if (!callbacks && runTree.tracingEnabled) {
  27. callbacks = new CallbackManager();
  28. }
  29. let langChainTracer = callbacks?.handlers.find((handler) => handler?.name === "langchain_tracer");
  30. if (!langChainTracer && runTree.tracingEnabled) {
  31. langChainTracer = new LangChainTracer();
  32. callbacks?.addHandler(langChainTracer);
  33. }
  34. const runMap = new Map();
  35. // find upward root run
  36. let rootRun = runTree;
  37. const rootVisited = new Set();
  38. while (rootRun.parent_run) {
  39. if (rootVisited.has(rootRun.id))
  40. break;
  41. rootVisited.add(rootRun.id);
  42. rootRun = rootRun.parent_run;
  43. }
  44. const queue = [rootRun];
  45. const visited = new Set();
  46. while (queue.length > 0) {
  47. const current = queue.shift();
  48. if (!current || visited.has(current.id))
  49. continue;
  50. visited.add(current.id);
  51. runMap.set(current.id, current);
  52. if (current.child_runs) {
  53. queue.push(...current.child_runs);
  54. }
  55. }
  56. if (callbacks != null) {
  57. Object.assign(callbacks, { _parentRunId: runTree.id });
  58. }
  59. if (langChainTracer != null) {
  60. if ("updateFromRunTree" in langChainTracer &&
  61. typeof langChainTracer === "function") {
  62. // eslint-disable-next-line @typescript-eslint/ban-ts-comment
  63. // @ts-ignore @langchain/core can use a different version of LangSmith
  64. langChainTracer.updateFromRunTree(runTree);
  65. }
  66. else {
  67. Object.assign(langChainTracer, {
  68. runMap,
  69. client: runTree.client,
  70. projectName: runTree.project_name || langChainTracer.projectName,
  71. exampleId: runTree.reference_example_id || langChainTracer.exampleId,
  72. });
  73. }
  74. }
  75. return callbacks;
  76. }
  77. /**
  78. * RunnableTraceable is a Runnable that wraps a traceable function.
  79. * This allows adding Langsmith traced functions into LangChain sequences.
  80. */
  81. export class RunnableTraceable extends Runnable {
  82. constructor(fields) {
  83. super(fields);
  84. Object.defineProperty(this, "lc_serializable", {
  85. enumerable: true,
  86. configurable: true,
  87. writable: true,
  88. value: false
  89. });
  90. Object.defineProperty(this, "lc_namespace", {
  91. enumerable: true,
  92. configurable: true,
  93. writable: true,
  94. value: ["langchain_core", "runnables"]
  95. });
  96. Object.defineProperty(this, "func", {
  97. enumerable: true,
  98. configurable: true,
  99. writable: true,
  100. value: void 0
  101. });
  102. if (!isTraceableFunction(fields.func)) {
  103. throw new Error("RunnableTraceable requires a function that is wrapped in traceable higher-order function");
  104. }
  105. this.func = fields.func;
  106. }
  107. async invoke(input, options) {
  108. const [config] = this._getOptionsList(options ?? {}, 1);
  109. const callbacks = await getCallbackManagerForConfig(config);
  110. return (await this.func(patchConfig(config, { callbacks }), input));
  111. }
  112. async *_streamIterator(input, options) {
  113. const result = await this.invoke(input, options);
  114. if (isAsyncIterable(result)) {
  115. for await (const item of result) {
  116. yield item;
  117. }
  118. return;
  119. }
  120. if (isIteratorLike(result)) {
  121. while (true) {
  122. const state = result.next();
  123. if (state.done)
  124. break;
  125. yield state.value;
  126. }
  127. return;
  128. }
  129. yield result;
  130. }
  131. static from(func) {
  132. return new RunnableTraceable({ func });
  133. }
  134. }