123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.ROOT = exports.withRunTree = exports.isTraceableFunction = exports.getCurrentRunTree = void 0;
- exports.traceable = traceable;
- const node_async_hooks_1 = require("node:async_hooks");
- const run_trees_js_1 = require("./run_trees.cjs");
- const env_js_1 = require("./env.cjs");
- const traceable_js_1 = require("./singletons/traceable.cjs");
- const constants_js_1 = require("./singletons/constants.cjs");
- const asserts_js_1 = require("./utils/asserts.cjs");
- traceable_js_1.AsyncLocalStorageProviderSingleton.initializeGlobalInstance(new node_async_hooks_1.AsyncLocalStorage());
/**
 * Normalizes the positional arguments of a traced call into a key-value map
 * that can be stored as the run's inputs.
 *
 * - more than one argument            -> `{ args: rawInputs }`
 * - no argument / a nullish argument  -> `{}`
 * - a single key-value map            -> that same map (not copied)
 * - any other single value            -> `{ input: value }`
 *
 * @param rawInputs Array of positional arguments.
 * @returns Key-value map describing the inputs.
 */
const runInputsToMap = (rawInputs) => {
    // Arity is checked first: a nullish first argument must not cause the
    // remaining arguments to be silently dropped from the trace.
    if (rawInputs.length > 1) {
        return { args: rawInputs };
    }
    const firstInput = rawInputs[0];
    if (firstInput == null) {
        return {};
    }
    if ((0, asserts_js_1.isKVMap)(firstInput)) {
        return firstInput;
    }
    return { input: firstInput };
};
/**
 * Applies the user-supplied `processInputs` hook to the run inputs.
 * If the hook throws, the failure is logged and the unprocessed inputs are
 * returned so tracing can proceed.
 *
 * @param inputs Inputs to hand to the hook.
 * @param processInputs Hook invoked with `inputs`.
 * @returns The hook's result, or `inputs` when the hook throws.
 */
const handleRunInputs = (inputs, processInputs) => {
    let result = inputs;
    try {
        result = processInputs(inputs);
    }
    catch (err) {
        console.error("Error occurred during processInputs. Sending raw inputs:", err);
    }
    return result;
};
/**
 * Shapes a traced function's output into a key-value map and runs the
 * user-supplied `processOutputs` hook over it.
 *
 * Outputs that are not already a key-value map are wrapped as
 * `{ outputs: rawOutputs }`. If the hook throws, the failure is logged and
 * the (possibly wrapped) unprocessed outputs are returned.
 *
 * @param rawOutputs Value produced by the traced function.
 * @param processOutputs Hook invoked with the key-value form of the outputs.
 * @returns The hook's result, or the unprocessed map when the hook throws.
 */
const handleRunOutputs = (rawOutputs, processOutputs) => {
    const asMap = (0, asserts_js_1.isKVMap)(rawOutputs)
        ? rawOutputs
        : { outputs: rawOutputs };
    let result = asMap;
    try {
        result = processOutputs(asMap);
    }
    catch (err) {
        console.error("Error occurred during processOutputs. Sending raw outputs:", err);
    }
    return result;
};
/**
 * Splits the raw call arguments into attachments and remaining inputs using
 * the optional `extractAttachments` hook.
 *
 * @param rawInputs Array of positional arguments of the traced call.
 * @param extractAttachments Optional hook called with the spread arguments;
 *   expected to return an `[attachments, remainingArgs]` pair.
 * @returns `[attachments, remainingArgs]` from the hook, or
 *   `[undefined, rawInputs]` when no hook is given or the hook fails
 *   (the failure is logged).
 */
const handleRunAttachments = (rawInputs, extractAttachments) => {
    if (extractAttachments) {
        try {
            const extracted = extractAttachments(...rawInputs);
            const [attachments, remainingArgs] = extracted;
            return [attachments, remainingArgs];
        }
        catch (err) {
            console.error("Error occurred during extractAttachments:", err);
        }
    }
    // No hook, or the hook failed: pass the arguments through untouched.
    return [undefined, rawInputs];
};
/**
 * Prepares a run tree for a traced call, or returns `undefined` when tracing
 * is not enabled for it.
 *
 * When tracing is enabled the run tree is mutated in place:
 * - `attachments` and `inputs` are filled from the call arguments (via the
 *   `extractAttachments` and `processInputs` hooks);
 * - if `getInvocationParams` yields a non-nullish value, it is merged into
 *   `extra.metadata`, with metadata already on the run tree taking precedence.
 *
 * @param runTree Run tree candidate for this call.
 * @param inputs Array of positional arguments of the traced call.
 * @param getInvocationParams Optional hook called with the spread arguments.
 * @param processInputs Hook applied to the extracted inputs.
 * @param extractAttachments Hook splitting attachments from the arguments.
 * @returns The same `runTree`, or `undefined` if tracing is disabled.
 */
const getTracingRunTree = (runTree, inputs, getInvocationParams, processInputs, extractAttachments) => {
    const tracingOn = (0, env_js_1.isTracingEnabled)(runTree.tracingEnabled);
    if (!tracingOn) {
        return undefined;
    }
    const [attachments, remainingArgs] = handleRunAttachments(inputs, extractAttachments);
    runTree.attachments = attachments;
    runTree.inputs = handleRunInputs(remainingArgs, processInputs);
    const invocationParams = getInvocationParams == null
        ? undefined
        : getInvocationParams(...inputs);
    if (invocationParams == null) {
        return runTree;
    }
    if (runTree.extra == null) {
        runTree.extra = {};
    }
    // Existing metadata is spread last so it wins over invocation params.
    runTree.extra.metadata = {
        ...invocationParams,
        ...runTree.extra.metadata,
    };
    return runTree;
};
/**
 * Wraps a promise (or thenable) in a Proxy that remembers how it settled so
 * the value can later be serialized through `toJSON`.
 *
 * Idea: the settled state is stored outside the promise, but only once the
 * promise is "consumed", i.e. once `then`/`catch` is called on the proxy
 * (which is also what `await` does).
 *
 * `toJSON()` returns:
 * - `undefined` while no settlement has been observed,
 * - the fulfillment value after a fulfillment was observed,
 * - `{ error }` after a rejection was observed.
 *
 * @param arg Promise or thenable to wrap.
 * @returns Proxy around `arg` with state-recording `then`/`catch` and `toJSON`.
 */
const getSerializablePromise = (arg) => {
    const proxyState = { current: undefined };
    const promiseProxy = new Proxy(arg, {
        get(target, prop, receiver) {
            if (prop === "then") {
                const boundThen = arg[prop].bind(arg);
                return (onFulfilled, onRejected) => boundThen((value) => {
                    proxyState.current = ["resolve", value];
                    // Like native `then`, a non-callable handler passes the
                    // value through instead of raising a TypeError.
                    return typeof onFulfilled === "function"
                        ? onFulfilled(value)
                        : value;
                }, (error) => {
                    proxyState.current = ["reject", error];
                    if (typeof onRejected === "function") {
                        return onRejected(error);
                    }
                    // No rejection handler: keep the chain rejected.
                    throw error;
                });
            }
            // Plain thenables may have no `catch`; only wrap it when present.
            if (prop === "catch" && typeof arg[prop] === "function") {
                const boundCatch = arg[prop].bind(arg);
                return (onRejected) => boundCatch((error) => {
                    proxyState.current = ["reject", error];
                    if (typeof onRejected === "function") {
                        return onRejected(error);
                    }
                    throw error;
                });
            }
            if (prop === "toJSON") {
                return () => {
                    if (!proxyState.current) {
                        return undefined;
                    }
                    const [type, value] = proxyState.current;
                    return type === "resolve" ? value : { error: value };
                };
            }
            return Reflect.get(target, prop, receiver);
        },
    });
    return promiseProxy;
};
/**
 * Wraps a "lazy" argument (readable stream, async iterable, iterator or
 * thenable) so that the values it produces while being consumed are recorded
 * and exposed through a `toJSON()` method. Any other argument is returned
 * unchanged.
 *
 * - Readable stream: piped through a recording TransformStream; `toJSON()`
 *   returns the chunks seen so far.
 * - Async iterable: proxied so every `next()` promise is recorded;
 *   `toJSON()` returns the values of the settled results.
 * - Iterator (non-array): proxied so `next`/`return`/`throw` results are
 *   recorded; `toJSON()` returns their values.
 * - Thenable: wrapped with `getSerializablePromise`.
 *
 * @param arg Argument passed to a traced function.
 * @returns The wrapped argument, or `arg` itself when no wrapping applies.
 */
const convertSerializableArg = (arg) => {
    if ((0, asserts_js_1.isReadableStream)(arg)) {
        const proxyState = [];
        const transform = new TransformStream({
            start: () => void 0,
            transform: (chunk, controller) => {
                proxyState.push(chunk);
                controller.enqueue(chunk);
            },
            flush: () => void 0,
        });
        const pipeThrough = arg.pipeThrough(transform);
        Object.assign(pipeThrough, { toJSON: () => proxyState });
        return pipeThrough;
    }
    if ((0, asserts_js_1.isAsyncIterable)(arg)) {
        const proxyState = { current: [] };
        return new Proxy(arg, {
            get(target, prop, receiver) {
                if (prop === Symbol.asyncIterator) {
                    return () => {
                        const boundIterator = arg[Symbol.asyncIterator].bind(arg);
                        const iterator = boundIterator();
                        return new Proxy(iterator, {
                            get(target, prop, receiver) {
                                if (prop === "next") {
                                    const boundNext = iterator.next.bind(iterator);
                                    return (...args) => {
                                        const wrapped = getSerializablePromise(boundNext(...args));
                                        proxyState.current.push(wrapped);
                                        return wrapped;
                                    };
                                }
                                // `return`/`throw` must reach the real method
                                // so the source is actually closed, rather
                                // than being advanced with another `next()`.
                                if ((prop === "return" || prop === "throw") &&
                                    typeof iterator[prop] === "function") {
                                    return iterator[prop].bind(iterator);
                                }
                                return Reflect.get(target, prop, receiver);
                            },
                        });
                    };
                }
                if (prop === "toJSON") {
                    return () => {
                        const chunks = [];
                        for (const pending of proxyState.current) {
                            const settled = pending.toJSON();
                            // Compare against undefined (not truthiness) so
                            // falsy chunks such as 0 or "" are kept.
                            if (settled?.value !== undefined) {
                                chunks.push(settled.value);
                            }
                        }
                        return chunks;
                    };
                }
                return Reflect.get(target, prop, receiver);
            },
        });
    }
    if (!Array.isArray(arg) && (0, asserts_js_1.isIteratorLike)(arg)) {
        const proxyState = [];
        return new Proxy(arg, {
            get(target, prop, receiver) {
                // Only wrap methods the iterator really has; otherwise the
                // proxy would advertise a `return`/`throw` that yields
                // `undefined` and breaks the iterator protocol.
                if ((prop === "next" || prop === "return" || prop === "throw") &&
                    typeof arg[prop] === "function") {
                    const bound = arg[prop].bind(arg);
                    return (...args) => {
                        const next = bound(...args);
                        if (next != null) {
                            proxyState.push(next);
                        }
                        return next;
                    };
                }
                if (prop === "toJSON") {
                    return () => {
                        const chunks = [];
                        for (const next of proxyState) {
                            if (next.value !== undefined) {
                                chunks.push(next.value);
                            }
                        }
                        return chunks;
                    };
                }
                return Reflect.get(target, prop, receiver);
            },
        });
    }
    if ((0, asserts_js_1.isThenable)(arg)) {
        return getSerializablePromise(arg);
    }
    return arg;
};
/**
 * Higher-order function that takes function as input and returns a
 * "TraceableFunction" - a wrapped version of the input that
 * automatically handles tracing. If the returned traceable function calls any
 * traceable functions, those are automatically traced as well.
 *
 * The returned TraceableFunction can accept a run tree or run tree config as
 * its first argument. If omitted, it will default to the caller's run tree,
 * or will be treated as a root run.
 *
 * Options read from `config` in this function:
 * - `aggregator`: reduces the collected stream chunks before they are
 *   recorded as outputs.
 * - `argsConfigPath`: `[index, path]` locating a runtime config inside the
 *   call arguments; the config is removed from the arguments.
 * - `__finalTracedIteratorKey`: key of a returned object whose value is an
 *   async iterable that should be traced.
 * - `processInputs` / `processOutputs` / `extractAttachments` /
 *   `getInvocationParams`: hooks forwarded to the input/output handlers.
 * - `on_end`: callback invoked with the run tree when the run finishes.
 * All remaining keys are used as the run tree config.
 *
 * @param wrappedFunc Targeted function to be traced
 * @param config Additional metadata such as name, tags or providing
 *     a custom LangSmith client instance
 * @returns The traceable wrapper; the run tree config is attached to it under
 *     the non-enumerable `"langsmith:traceable"` property.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function traceable(wrappedFunc, config) {
    const { aggregator, argsConfigPath, __finalTracedIteratorKey, processInputs, processOutputs, extractAttachments, ...runTreeConfig } = config ?? {};
    const processInputsFn = processInputs ?? ((x) => x);
    const processOutputsFn = processOutputs ?? ((x) => x);
    const extractAttachmentsFn = extractAttachments ?? ((...x) => [undefined, runInputsToMap(x)]);
    const traceableFunc = (...args) => {
        let ensuredConfig;
        try {
            // Pull an optional per-call config out of the arguments.
            let runtimeConfig;
            if (argsConfigPath) {
                const [index, path] = argsConfigPath;
                if (index === args.length - 1 && !path) {
                    runtimeConfig = args.pop();
                }
                else if (index <= args.length &&
                    typeof args[index] === "object" &&
                    args[index] !== null) {
                    if (path) {
                        const { [path]: extracted, ...rest } = args[index];
                        runtimeConfig = extracted;
                        args[index] = rest;
                    }
                    else {
                        runtimeConfig = args[index];
                        args.splice(index, 1);
                    }
                }
            }
            // Runtime config overrides static config; tags are unioned and
            // metadata is merged with runtime values taking precedence.
            ensuredConfig = {
                name: wrappedFunc.name || "<lambda>",
                ...runTreeConfig,
                ...runtimeConfig,
                tags: [
                    ...new Set([
                        ...(runTreeConfig?.tags ?? []),
                        ...(runtimeConfig?.tags ?? []),
                    ]),
                ],
                metadata: {
                    ...runTreeConfig?.metadata,
                    ...runtimeConfig?.metadata,
                },
            };
        }
        catch (err) {
            console.warn(`Failed to extract runtime config from args for ${runTreeConfig?.name ?? wrappedFunc.name}`, err);
            ensuredConfig = {
                name: wrappedFunc.name || "<lambda>",
                ...runTreeConfig,
            };
        }
        const asyncLocalStorage = traceable_js_1.AsyncLocalStorageProviderSingleton.getInstance();
        // TODO: deal with possible nested promises and async iterables
        const processedArgs = args;
        for (let i = 0; i < processedArgs.length; i++) {
            processedArgs[i] = convertSerializableArg(processedArgs[i]);
        }
        // Resolve the run tree for this call and the arguments that will be
        // passed to the wrapped function.
        const [currentRunTree, rawInputs] = (() => {
            const [firstArg, ...restArgs] = processedArgs;
            // used for handoff between LangChain.JS and traceable functions
            if ((0, run_trees_js_1.isRunnableConfigLike)(firstArg)) {
                return [
                    getTracingRunTree(run_trees_js_1.RunTree.fromRunnableConfig(firstArg, ensuredConfig), restArgs, config?.getInvocationParams, processInputsFn, extractAttachmentsFn),
                    restArgs,
                ];
            }
            // deprecated: legacy CallbackManagerRunTree used in runOnDataset
            // override ALS and do not pass-through the run tree
            if ((0, run_trees_js_1.isRunTree)(firstArg) &&
                "callbackManager" in firstArg &&
                firstArg.callbackManager != null) {
                return [firstArg, restArgs];
            }
            // when ALS is unreliable, users can manually
            // pass in the run tree
            if (firstArg === traceable_js_1.ROOT || (0, run_trees_js_1.isRunTree)(firstArg)) {
                const currentRunTree = getTracingRunTree(firstArg === traceable_js_1.ROOT
                    ? new run_trees_js_1.RunTree(ensuredConfig)
                    : firstArg.createChild(ensuredConfig), restArgs, config?.getInvocationParams, processInputsFn, extractAttachmentsFn);
                return [currentRunTree, [currentRunTree, ...restArgs]];
            }
            // Node.JS uses AsyncLocalStorage (ALS) and AsyncResource
            // to allow storing context
            const prevRunFromStore = asyncLocalStorage.getStore();
            if ((0, run_trees_js_1.isRunTree)(prevRunFromStore)) {
                return [
                    getTracingRunTree(prevRunFromStore.createChild(ensuredConfig), processedArgs, config?.getInvocationParams, processInputsFn, extractAttachmentsFn),
                    processedArgs,
                ];
            }
            const currentRunTree = getTracingRunTree(new run_trees_js_1.RunTree(ensuredConfig), processedArgs, config?.getInvocationParams, processInputsFn, extractAttachmentsFn);
            // If a context var is set by LangChain outside of a traceable,
            // it will be an object with a single property and we should copy
            // context vars over into the new run tree.
            // `currentRunTree` is undefined when tracing is disabled, and the
            // `in` operator throws on null/primitives, so guard both before
            // touching either value.
            if (currentRunTree !== undefined &&
                typeof prevRunFromStore === "object" &&
                prevRunFromStore !== null &&
                constants_js_1._LC_CONTEXT_VARIABLES_KEY in prevRunFromStore) {
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
                currentRunTree[constants_js_1._LC_CONTEXT_VARIABLES_KEY] =
                    prevRunFromStore[constants_js_1._LC_CONTEXT_VARIABLES_KEY];
            }
            return [currentRunTree, processedArgs];
        })();
        return asyncLocalStorage.run(currentRunTree, () => {
            const postRunPromise = currentRunTree?.postRun();
            // Applies the optional aggregator to the collected chunks; on
            // aggregator failure the raw chunks are used.
            async function handleChunks(chunks) {
                if (aggregator !== undefined) {
                    try {
                        return await aggregator(chunks);
                    }
                    catch (e) {
                        console.error(`[ERROR]: LangSmith aggregation failed: `, e);
                    }
                }
                return chunks;
            }
            // Re-emits a ReadableStream while recording its chunks and
            // ending the run once the source is exhausted or cancelled.
            function tapReadableStreamForTracing(stream, snapshot) {
                const reader = stream.getReader();
                let finished = false;
                const chunks = [];
                const tappedStream = new ReadableStream({
                    async start(controller) {
                        // eslint-disable-next-line no-constant-condition
                        while (true) {
                            const result = await (snapshot
                                ? snapshot(() => reader.read())
                                : reader.read());
                            if (result.done) {
                                finished = true;
                                await currentRunTree?.end(handleRunOutputs(await handleChunks(chunks), processOutputsFn));
                                await handleEnd();
                                controller.close();
                                break;
                            }
                            chunks.push(result.value);
                            // Add new_token event for streaming LLM runs
                            if (currentRunTree && currentRunTree.run_type === "llm") {
                                currentRunTree.addEvent({
                                    name: "new_token",
                                    kwargs: { token: result.value },
                                });
                            }
                            controller.enqueue(result.value);
                        }
                    },
                    async cancel(reason) {
                        if (!finished)
                            await currentRunTree?.end(undefined, "Cancelled");
                        await currentRunTree?.end(handleRunOutputs(await handleChunks(chunks), processOutputsFn));
                        await handleEnd();
                        return reader.cancel(reason);
                    },
                });
                return tappedStream;
            }
            // Re-yields an async iterator while recording its chunks; the run
            // is ended in `finally`, whether the consumer finished, stopped
            // early, or the source threw.
            async function* wrapAsyncIteratorForTracing(iterator, snapshot) {
                let finished = false;
                const chunks = [];
                try {
                    while (true) {
                        const { value, done } = await (snapshot
                            ? snapshot(() => iterator.next())
                            : iterator.next());
                        if (done) {
                            finished = true;
                            break;
                        }
                        chunks.push(value);
                        // Add new_token event for streaming LLM runs
                        if (currentRunTree && currentRunTree.run_type === "llm") {
                            currentRunTree.addEvent({
                                name: "new_token",
                                kwargs: { token: value },
                            });
                        }
                        yield value;
                    }
                }
                catch (e) {
                    await currentRunTree?.end(undefined, String(e));
                    throw e;
                }
                finally {
                    if (!finished)
                        await currentRunTree?.end(undefined, "Cancelled");
                    await currentRunTree?.end(handleRunOutputs(await handleChunks(chunks), processOutputsFn));
                    await handleEnd();
                }
            }
            // Streams get a tapped copy; other async iterables are patched in
            // place so the original object (and its other members) is kept.
            function wrapAsyncGeneratorForTracing(iterable, snapshot) {
                if ((0, asserts_js_1.isReadableStream)(iterable)) {
                    return tapReadableStreamForTracing(iterable, snapshot);
                }
                const iterator = iterable[Symbol.asyncIterator]();
                const wrappedIterator = wrapAsyncIteratorForTracing(iterator, snapshot);
                iterable[Symbol.asyncIterator] = () => wrappedIterator;
                return iterable;
            }
            // Fires `on_end`, then waits for the run to be posted and patched.
            async function handleEnd() {
                const onEnd = config?.on_end;
                if (onEnd) {
                    if (!currentRunTree) {
                        console.warn("Can not call 'on_end' if currentRunTree is undefined");
                    }
                    else {
                        onEnd(currentRunTree);
                    }
                }
                await postRunPromise;
                await currentRunTree?.patchRun();
            }
            // Drains a synchronous iterator, keeping every result including
            // the final `done` one.
            function gatherAll(iterator) {
                const chunks = [];
                // eslint-disable-next-line no-constant-condition
                while (true) {
                    const next = iterator.next();
                    chunks.push(next);
                    if (next.done)
                        break;
                }
                return chunks;
            }
            let returnValue;
            try {
                returnValue = wrappedFunc(...rawInputs);
            }
            catch (err) {
                // Route synchronous throws through the same rejection path
                // as asynchronous failures.
                returnValue = Promise.reject(err);
            }
            if ((0, asserts_js_1.isAsyncIterable)(returnValue)) {
                const snapshot = node_async_hooks_1.AsyncLocalStorage.snapshot();
                return wrapAsyncGeneratorForTracing(returnValue, snapshot);
            }
            if (!Array.isArray(returnValue) &&
                typeof returnValue === "object" &&
                returnValue != null &&
                __finalTracedIteratorKey !== undefined &&
                (0, asserts_js_1.isAsyncIterable)(returnValue[__finalTracedIteratorKey])) {
                const snapshot = node_async_hooks_1.AsyncLocalStorage.snapshot();
                return {
                    ...returnValue,
                    [__finalTracedIteratorKey]: wrapAsyncGeneratorForTracing(returnValue[__finalTracedIteratorKey], snapshot),
                };
            }
            const tracedPromise = new Promise((resolve, reject) => {
                Promise.resolve(returnValue)
                    .then(async (rawOutput) => {
                    if ((0, asserts_js_1.isAsyncIterable)(rawOutput)) {
                        const snapshot = node_async_hooks_1.AsyncLocalStorage.snapshot();
                        return resolve(wrapAsyncGeneratorForTracing(rawOutput, snapshot));
                    }
                    if (!Array.isArray(rawOutput) &&
                        typeof rawOutput === "object" &&
                        rawOutput != null &&
                        __finalTracedIteratorKey !== undefined &&
                        (0, asserts_js_1.isAsyncIterable)(rawOutput[__finalTracedIteratorKey])) {
                        const snapshot = node_async_hooks_1.AsyncLocalStorage.snapshot();
                        return {
                            ...rawOutput,
                            [__finalTracedIteratorKey]: wrapAsyncGeneratorForTracing(rawOutput[__finalTracedIteratorKey], snapshot),
                        };
                    }
                    if ((0, asserts_js_1.isGenerator)(wrappedFunc) && (0, asserts_js_1.isIteratorLike)(rawOutput)) {
                        // Sync generators are drained eagerly, recorded, and
                        // then replayed to the caller from the recording.
                        const chunks = gatherAll(rawOutput);
                        try {
                            await currentRunTree?.end(handleRunOutputs(await handleChunks(chunks.reduce((memo, { value, done }) => {
                                if (!done || typeof value !== "undefined") {
                                    memo.push(value);
                                }
                                return memo;
                            }, [])), processOutputsFn));
                            await handleEnd();
                        }
                        catch (e) {
                            console.error("Error occurred during handleEnd:", e);
                        }
                        return (function* () {
                            for (const ret of chunks) {
                                if (ret.done)
                                    return ret.value;
                                yield ret.value;
                            }
                        })();
                    }
                    try {
                        await currentRunTree?.end(handleRunOutputs(rawOutput, processOutputsFn));
                        await handleEnd();
                    }
                    finally {
                        // Returning from `finally` deliberately discards any
                        // tracing error so the caller still gets the output.
                        // eslint-disable-next-line no-unsafe-finally
                        return rawOutput;
                    }
                }, async (error) => {
                    await currentRunTree?.end(undefined, String(error));
                    await handleEnd();
                    throw error;
                })
                    .then(resolve, reject);
            });
            if (typeof returnValue !== "object" || returnValue === null) {
                return tracedPromise;
            }
            // Object results keep their own members, but promise methods are
            // redirected to the traced promise.
            return new Proxy(returnValue, {
                get(target, prop, receiver) {
                    if ((0, asserts_js_1.isPromiseMethod)(prop)) {
                        return tracedPromise[prop].bind(tracedPromise);
                    }
                    return Reflect.get(target, prop, receiver);
                },
            });
        });
    };
    Object.defineProperty(traceableFunc, "langsmith:traceable", {
        value: runTreeConfig,
    });
    return traceableFunc;
}
- var traceable_js_2 = require("./singletons/traceable.cjs");
- Object.defineProperty(exports, "getCurrentRunTree", { enumerable: true, get: function () { return traceable_js_2.getCurrentRunTree; } });
- Object.defineProperty(exports, "isTraceableFunction", { enumerable: true, get: function () { return traceable_js_2.isTraceableFunction; } });
- Object.defineProperty(exports, "withRunTree", { enumerable: true, get: function () { return traceable_js_2.withRunTree; } });
- Object.defineProperty(exports, "ROOT", { enumerable: true, get: function () { return traceable_js_2.ROOT; } });
|