123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516 |
- import { Runnable, _coerceToDict } from "./base.js";
- import { getCallbackManagerForConfig } from "./config.js";
- import { Document } from "../documents/index.js";
- import { ChatPromptValue, StringPromptValue } from "../prompt_values.js";
- import { RunLogPatch, RunLog, } from "../tracers/log_stream.js";
- import { AIMessage, AIMessageChunk, ChatMessage, ChatMessageChunk, FunctionMessage, FunctionMessageChunk, HumanMessage, HumanMessageChunk, SystemMessage, SystemMessageChunk, ToolMessage, ToolMessageChunk, isBaseMessage, } from "../messages/index.js";
- import { GenerationChunk, ChatGenerationChunk, RUN_KEY } from "../outputs.js";
- import { convertEventStreamToIterableReadableDataStream } from "../utils/event_source_parse.js";
- import { IterableReadableStream, concat } from "../utils/stream.js";
- function isSuperset(set, subset) {
- for (const elem of subset) {
- if (!set.has(elem)) {
- return false;
- }
- }
- return true;
- }
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- function revive(obj) {
- if (Array.isArray(obj))
- return obj.map(revive);
- if (typeof obj === "object") {
- // eslint-disable-next-line no-instanceof/no-instanceof
- if (!obj || obj instanceof Date) {
- return obj;
- }
- const keysArr = Object.keys(obj);
- const keys = new Set(keysArr);
- if (isSuperset(keys, new Set(["page_content", "metadata"]))) {
- return new Document({
- pageContent: obj.page_content,
- metadata: obj.metadata,
- });
- }
- if (isSuperset(keys, new Set(["content", "type", "additional_kwargs"]))) {
- if (obj.type === "HumanMessage" || obj.type === "human") {
- return new HumanMessage({
- content: obj.content,
- });
- }
- if (obj.type === "SystemMessage" || obj.type === "system") {
- return new SystemMessage({
- content: obj.content,
- });
- }
- if (obj.type === "ChatMessage" || obj.type === "generic") {
- return new ChatMessage({
- content: obj.content,
- role: obj.role,
- });
- }
- if (obj.type === "FunctionMessage" || obj.type === "function") {
- return new FunctionMessage({
- content: obj.content,
- name: obj.name,
- });
- }
- if (obj.type === "ToolMessage" || obj.type === "tool") {
- return new ToolMessage({
- content: obj.content,
- tool_call_id: obj.tool_call_id,
- status: obj.status,
- artifact: obj.artifact,
- });
- }
- if (obj.type === "AIMessage" || obj.type === "ai") {
- return new AIMessage({
- content: obj.content,
- });
- }
- if (obj.type === "HumanMessageChunk") {
- return new HumanMessageChunk({
- content: obj.content,
- });
- }
- if (obj.type === "SystemMessageChunk") {
- return new SystemMessageChunk({
- content: obj.content,
- });
- }
- if (obj.type === "ChatMessageChunk") {
- return new ChatMessageChunk({
- content: obj.content,
- role: obj.role,
- });
- }
- if (obj.type === "FunctionMessageChunk") {
- return new FunctionMessageChunk({
- content: obj.content,
- name: obj.name,
- });
- }
- if (obj.type === "ToolMessageChunk") {
- return new ToolMessageChunk({
- content: obj.content,
- tool_call_id: obj.tool_call_id,
- status: obj.status,
- artifact: obj.artifact,
- });
- }
- if (obj.type === "AIMessageChunk") {
- return new AIMessageChunk({
- content: obj.content,
- });
- }
- }
- if (isSuperset(keys, new Set(["text", "generation_info", "type"]))) {
- if (obj.type === "ChatGenerationChunk") {
- return new ChatGenerationChunk({
- message: revive(obj.message),
- text: obj.text,
- generationInfo: obj.generation_info,
- });
- }
- else if (obj.type === "ChatGeneration") {
- return {
- message: revive(obj.message),
- text: obj.text,
- generationInfo: obj.generation_info,
- };
- }
- else if (obj.type === "GenerationChunk") {
- return new GenerationChunk({
- text: obj.text,
- generationInfo: obj.generation_info,
- });
- }
- else if (obj.type === "Generation") {
- return {
- text: obj.text,
- generationInfo: obj.generation_info,
- };
- }
- }
- if (isSuperset(keys, new Set(["tool", "tool_input", "log", "type"]))) {
- if (obj.type === "AgentAction") {
- return {
- tool: obj.tool,
- toolInput: obj.tool_input,
- log: obj.log,
- };
- }
- }
- if (isSuperset(keys, new Set(["return_values", "log", "type"]))) {
- if (obj.type === "AgentFinish") {
- return {
- returnValues: obj.return_values,
- log: obj.log,
- };
- }
- }
- if (isSuperset(keys, new Set(["generations", "run", "type"]))) {
- if (obj.type === "LLMResult") {
- return {
- generations: revive(obj.generations),
- llmOutput: obj.llm_output,
- [RUN_KEY]: obj.run,
- };
- }
- }
- if (isSuperset(keys, new Set(["messages"]))) {
- // TODO: Start checking for type: ChatPromptValue and ChatPromptValueConcrete
- // when LangServe bug is fixed
- return new ChatPromptValue({
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- messages: obj.messages.map((msg) => revive(msg)),
- });
- }
- if (isSuperset(keys, new Set(["text"]))) {
- // TODO: Start checking for type: StringPromptValue
- // when LangServe bug is fixed
- return new StringPromptValue(obj.text);
- }
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- const innerRevive = (key) => [
- key,
- revive(obj[key]),
- ];
- const rtn = Object.fromEntries(keysArr.map(innerRevive));
- return rtn;
- }
- return obj;
- }
- function deserialize(str) {
- const obj = JSON.parse(str);
- return revive(obj);
- }
- function removeCallbacksAndSignal(options) {
- const rest = { ...options };
- delete rest.callbacks;
- delete rest.signal;
- return rest;
- }
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- function serialize(input) {
- if (Array.isArray(input))
- return input.map(serialize);
- if (isBaseMessage(input)) {
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- const serializedMessage = {
- content: input.content,
- type: input._getType(),
- additional_kwargs: input.additional_kwargs,
- name: input.name,
- example: false,
- };
- if (ToolMessage.isInstance(input)) {
- serializedMessage.tool_call_id = input.tool_call_id;
- }
- else if (ChatMessage.isInstance(input)) {
- serializedMessage.role = input.role;
- }
- return serializedMessage;
- }
- if (typeof input === "object") {
- // eslint-disable-next-line no-instanceof/no-instanceof
- if (!input || input instanceof Date) {
- return input;
- }
- const keysArr = Object.keys(input);
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- const innerSerialize = (key) => [
- key,
- serialize(input[key]),
- ];
- const rtn = Object.fromEntries(keysArr.map(innerSerialize));
- return rtn;
- }
- return input;
- }
- /**
- * Client for interacting with LangChain runnables
- * that are hosted as LangServe endpoints.
- *
- * Allows you to interact with hosted runnables using the standard
- * `.invoke()`, `.stream()`, `.streamEvents()`, etc. methods that
- * other runnables support.
- *
- * @deprecated LangServe is no longer actively developed - please consider using LangGraph Platform.
- *
- * @param url - The base URL of the LangServe endpoint.
- * @param options - Optional configuration for the remote runnable, including timeout and headers.
- * @param fetch - Optional custom fetch implementation.
- * @param fetchRequestOptions - Optional additional options for fetch requests.
- */
- export class RemoteRunnable extends Runnable {
- constructor(fields) {
- super(fields);
- Object.defineProperty(this, "url", {
- enumerable: true,
- configurable: true,
- writable: true,
- value: void 0
- });
- Object.defineProperty(this, "options", {
- enumerable: true,
- configurable: true,
- writable: true,
- value: void 0
- });
- // Wrap the default fetch call due to issues with illegal invocations
- // from the browser:
- // https://stackoverflow.com/questions/69876859/why-does-bind-fix-failed-to-execute-fetch-on-window-illegal-invocation-err
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- Object.defineProperty(this, "fetchImplementation", {
- enumerable: true,
- configurable: true,
- writable: true,
- value: (...args) =>
- // @ts-expect-error Broad typing to support a range of fetch implementations
- fetch(...args)
- });
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- Object.defineProperty(this, "fetchRequestOptions", {
- enumerable: true,
- configurable: true,
- writable: true,
- value: void 0
- });
- Object.defineProperty(this, "lc_namespace", {
- enumerable: true,
- configurable: true,
- writable: true,
- value: ["langchain", "schema", "runnable", "remote"]
- });
- const { url, options, fetch: fetchImplementation, fetchRequestOptions, } = fields;
- this.url = url.replace(/\/$/, ""); // remove trailing slash
- this.options = options;
- this.fetchImplementation = fetchImplementation ?? this.fetchImplementation;
- this.fetchRequestOptions = fetchRequestOptions;
- }
- async post(path, body, signal) {
- return this.fetchImplementation(`${this.url}${path}`, {
- method: "POST",
- body: JSON.stringify(serialize(body)),
- signal: signal ?? AbortSignal.timeout(this.options?.timeout ?? 60000),
- ...this.fetchRequestOptions,
- headers: {
- "Content-Type": "application/json",
- ...this.fetchRequestOptions?.headers,
- ...this.options?.headers,
- },
- });
- }
- async _invoke(input, options, _) {
- const [config, kwargs] = this._separateRunnableConfigFromCallOptions(options);
- const response = await this.post("/invoke", {
- input,
- config: removeCallbacksAndSignal(config),
- kwargs: kwargs ?? {},
- }, config.signal);
- if (!response.ok) {
- throw new Error(`${response.status} Error: ${await response.text()}`);
- }
- return revive((await response.json()).output);
- }
- async invoke(input, options) {
- return this._callWithConfig(this._invoke, input, options);
- }
- async _batch(inputs, options, _, batchOptions) {
- if (batchOptions?.returnExceptions) {
- throw new Error("returnExceptions is not supported for remote clients");
- }
- const configsAndKwargsArray = options?.map((opts) => this._separateRunnableConfigFromCallOptions(opts));
- const [configs, kwargs] = configsAndKwargsArray?.reduce(([pc, pk], [c, k]) => [
- [...pc, c],
- [...pk, k],
- ], [[], []]) ?? [undefined, undefined];
- const response = await this.post("/batch", {
- inputs,
- config: (configs ?? [])
- .map(removeCallbacksAndSignal)
- .map((config) => ({ ...config, ...batchOptions })),
- kwargs,
- }, options?.[0]?.signal);
- if (!response.ok) {
- throw new Error(`${response.status} Error: ${await response.text()}`);
- }
- const body = await response.json();
- if (!body.output)
- throw new Error("Invalid response from remote runnable");
- return revive(body.output);
- }
- async batch(inputs, options, batchOptions) {
- if (batchOptions?.returnExceptions) {
- throw Error("returnExceptions is not supported for remote clients");
- }
- return this._batchWithConfig(this._batch.bind(this), inputs, options, batchOptions);
- }
- async *_streamIterator(input, options) {
- const [config, kwargs] = this._separateRunnableConfigFromCallOptions(options);
- const callbackManager_ = await getCallbackManagerForConfig(options);
- const runManager = await callbackManager_?.handleChainStart(this.toJSON(), _coerceToDict(input, "input"), config.runId, undefined, undefined, undefined, config.runName);
- delete config.runId;
- let finalOutput;
- let finalOutputSupported = true;
- try {
- const response = await this.post("/stream", {
- input,
- config: removeCallbacksAndSignal(config),
- kwargs,
- }, config.signal);
- if (!response.ok) {
- const json = await response.json();
- const error = new Error(`RemoteRunnable call failed with status code ${response.status}: ${json.message}`);
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- error.response = response;
- throw error;
- }
- const { body } = response;
- if (!body) {
- throw new Error("Could not begin remote stream. Please check the given URL and try again.");
- }
- const runnableStream = convertEventStreamToIterableReadableDataStream(body);
- for await (const chunk of runnableStream) {
- const deserializedChunk = deserialize(chunk);
- yield deserializedChunk;
- if (finalOutputSupported) {
- if (finalOutput === undefined) {
- finalOutput = deserializedChunk;
- }
- else {
- try {
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- finalOutput = concat(finalOutput, deserializedChunk);
- }
- catch {
- finalOutput = undefined;
- finalOutputSupported = false;
- }
- }
- }
- }
- }
- catch (err) {
- await runManager?.handleChainError(err);
- throw err;
- }
- await runManager?.handleChainEnd(finalOutput ?? {});
- }
- async *streamLog(input, options, streamOptions) {
- const [config, kwargs] = this._separateRunnableConfigFromCallOptions(options);
- const callbackManager_ = await getCallbackManagerForConfig(options);
- const runManager = await callbackManager_?.handleChainStart(this.toJSON(), _coerceToDict(input, "input"), config.runId, undefined, undefined, undefined, config.runName);
- delete config.runId;
- // The type is in camelCase but the API only accepts snake_case.
- const camelCaseStreamOptions = {
- include_names: streamOptions?.includeNames,
- include_types: streamOptions?.includeTypes,
- include_tags: streamOptions?.includeTags,
- exclude_names: streamOptions?.excludeNames,
- exclude_types: streamOptions?.excludeTypes,
- exclude_tags: streamOptions?.excludeTags,
- };
- let runLog;
- try {
- const response = await this.post("/stream_log", {
- input,
- config: removeCallbacksAndSignal(config),
- kwargs,
- ...camelCaseStreamOptions,
- diff: false,
- }, config.signal);
- const { body, ok } = response;
- if (!ok) {
- throw new Error(`${response.status} Error: ${await response.text()}`);
- }
- if (!body) {
- throw new Error("Could not begin remote stream log. Please check the given URL and try again.");
- }
- const runnableStream = convertEventStreamToIterableReadableDataStream(body);
- for await (const log of runnableStream) {
- const chunk = revive(JSON.parse(log));
- const logPatch = new RunLogPatch({ ops: chunk.ops });
- yield logPatch;
- if (runLog === undefined) {
- runLog = RunLog.fromRunLogPatch(logPatch);
- }
- else {
- runLog = runLog.concat(logPatch);
- }
- }
- }
- catch (err) {
- await runManager?.handleChainError(err);
- throw err;
- }
- await runManager?.handleChainEnd(runLog?.state.final_output);
- }
- _streamEvents(input, options, streamOptions) {
- // eslint-disable-next-line @typescript-eslint/no-this-alias
- const outerThis = this;
- const generator = async function* () {
- const [config, kwargs] = outerThis._separateRunnableConfigFromCallOptions(options);
- const callbackManager_ = await getCallbackManagerForConfig(options);
- const runManager = await callbackManager_?.handleChainStart(outerThis.toJSON(), _coerceToDict(input, "input"), config.runId, undefined, undefined, undefined, config.runName);
- delete config.runId;
- // The type is in camelCase but the API only accepts snake_case.
- const camelCaseStreamOptions = {
- include_names: streamOptions?.includeNames,
- include_types: streamOptions?.includeTypes,
- include_tags: streamOptions?.includeTags,
- exclude_names: streamOptions?.excludeNames,
- exclude_types: streamOptions?.excludeTypes,
- exclude_tags: streamOptions?.excludeTags,
- };
- const events = [];
- try {
- const response = await outerThis.post("/stream_events", {
- input,
- config: removeCallbacksAndSignal(config),
- kwargs,
- ...camelCaseStreamOptions,
- diff: false,
- }, config.signal);
- const { body, ok } = response;
- if (!ok) {
- throw new Error(`${response.status} Error: ${await response.text()}`);
- }
- if (!body) {
- throw new Error("Could not begin remote stream events. Please check the given URL and try again.");
- }
- const runnableStream = convertEventStreamToIterableReadableDataStream(body);
- for await (const log of runnableStream) {
- const chunk = revive(JSON.parse(log));
- const event = {
- event: chunk.event,
- name: chunk.name,
- run_id: chunk.run_id,
- tags: chunk.tags,
- metadata: chunk.metadata,
- data: chunk.data,
- };
- yield event;
- events.push(event);
- }
- }
- catch (err) {
- await runManager?.handleChainError(err);
- throw err;
- }
- await runManager?.handleChainEnd(events);
- };
- return generator();
- }
- streamEvents(input, options, streamOptions) {
- if (options.version !== "v1" && options.version !== "v2") {
- throw new Error(`Only versions "v1" and "v2" of the events schema is currently supported.`);
- }
- if (options.encoding !== undefined) {
- throw new Error("Special encodings are not supported for this runnable.");
- }
- const eventStream = this._streamEvents(input, options, streamOptions);
- return IterableReadableStream.fromAsyncGenerator(eventStream);
- }
- }
|