The world is going realtime, and so should your tasks. With the Streams API, you can stream data from your tasks to the outside world in realtime. This is useful for a variety of use cases, including AI.
How it works
The Streams API is a simple API that allows you to send data from your tasks to the outside world in realtime using the metadata system. You can send any kind of data that is streamed in realtime, but the most common use case is to send streaming output from streaming LLM providers, like OpenAI.
Usage
To use the Streams API, you need to register a stream with a specific key using metadata.stream
. The following example uses the OpenAI SDK with stream: true
to stream the output of the LLM model in realtime:
import { task, metadata } from "@trigger.dev/sdk/v3";
import OpenAI from "openai";
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
export type STREAMS = {
openai: OpenAI.ChatCompletionChunk; // The type of the chunk is determined by the provider
};
export const myTask = task({
id: "my-task",
run: async (payload: { prompt: string }) => {
const completion = await openai.chat.completions.create({
messages: [{ role: "user", content: payload.prompt }],
model: "gpt-3.5-turbo",
stream: true,
});
// Register the stream with the key "openai"
// This will "tee" the stream and send it to the metadata system
const stream = await metadata.stream("openai", completion);
let text = "";
// You can read the returned stream as an async iterator
for await (const chunk of stream) {
logger.log("Received chunk", { chunk });
// The type of the chunk is determined by the provider
text += chunk.choices.map((choice) => choice.delta?.content).join("");
}
return { text };
},
});
You can then subscribe to the stream using the runs.subscribeToRun
method:
runs.subscribeToRun
should be used from your backend or another task. To subscribe to a run from
your frontend, you can use our React hooks.
import { runs } from "@trigger.dev/sdk/v3";
import type { myTask, STREAMS } from "./trigger/my-task";
// Somewhere in your backend
async function subscribeToStream(runId: string) {
// Use a for-await loop to subscribe to the stream
for await (const part of runs.subscribeToRun<typeof myTask>(runId).withStreams<STREAMS>()) {
switch (part.type) {
case "run": {
console.log("Received run", part.run);
break;
}
case "openai": {
// part.chunk is of type OpenAI.ChatCompletionChunk
console.log("Received OpenAI chunk", part.chunk);
break;
}
}
}
}
You can register and subscribe to multiple streams in the same task. Let’s add a stream from the response body of a fetch request:
import { task, metadata } from "@trigger.dev/sdk/v3";
import OpenAI from "openai";
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
export type STREAMS = {
openai: OpenAI.ChatCompletionChunk; // The type of the chunk is determined by the provider
fetch: string; // The response body will be an array of strings
};
export const myTask = task({
id: "my-task",
run: async (payload: { prompt: string }) => {
const completion = await openai.chat.completions.create({
messages: [{ role: "user", content: payload.prompt }],
model: "gpt-3.5-turbo",
stream: true,
});
// Register the stream with the key "openai"
await metadata.stream("openai", completion);
const response = await fetch("https://jsonplaceholder.typicode.com/posts");
if (!response.body) {
return;
}
// Register the stream with the key "fetch"
// Pipe the response.body through a TextDecoderStream to convert it to a string
await metadata.stream("fetch", response.body.pipeThrough(new TextDecoderStream()));
},
});
You may notice above that we aren’t consuming either of the streams in the task. In the
background, we’ll wait until all streams are consumed before the task is considered complete (with
a max timeout of 60 seconds). If you have a longer running stream, make sure to consume it in the
task.
And then subscribing to the streams:
import { runs } from "@trigger.dev/sdk/v3";
import type { myTask, STREAMS } from "./trigger/my-task";
// Somewhere in your backend
async function subscribeToStream(runId: string) {
// Use a for-await loop to subscribe to the stream
for await (const part of runs.subscribeToRun<typeof myTask>(runId).withStreams<STREAMS>()) {
switch (part.type) {
case "run": {
console.log("Received run", part.run);
break;
}
case "openai": {
// part.chunk is of type OpenAI.ChatCompletionChunk
console.log("Received OpenAI chunk", part.chunk);
break;
}
case "fetch": {
// part.chunk is a string
console.log("Received fetch chunk", part.chunk);
break;
}
}
}
}
React hooks
If you’re building a frontend application, you can use our React hooks to subscribe to streams. Here’s an example of how you can use the useRealtimeRunWithStreams
hook to subscribe to a stream:
import { useRealtimeRunWithStreams } from "@trigger.dev/react-hooks";
import type { myTask, STREAMS } from "./trigger/my-task";
// Somewhere in your React component
function MyComponent({ runId, publicAccessToken }: { runId: string; publicAccessToken: string }) {
const { run, streams } = useRealtimeRunWithStreams<typeof myTask, STREAMS>(runId, {
accessToken: publicAccessToken,
});
if (!run) {
return <div>Loading...</div>;
}
return (
<div>
<h1>Run ID: {run.id}</h1>
<h2>Streams:</h2>
<ul>
{Object.entries(streams).map(([key, value]) => (
<li key={key}>
<strong>{key}</strong>: {JSON.stringify(value)}
</li>
))}
</ul>
</div>
);
}
Read more about using the React hooks in the React hooks documentation.
Usage with the ai
SDK
The ai SDK provides a higher-level API for working with AI models. You can use the ai
SDK with the Streams API by using the streamText
method:
import { openai } from "@ai-sdk/openai";
import { logger, metadata, runs, schemaTask } from "@trigger.dev/sdk/v3";
import { streamText } from "ai";
import { z } from "zod";
export type STREAMS = {
openai: string;
};
export const aiStreaming = schemaTask({
id: "ai-streaming",
description: "Stream data from the AI sdk",
schema: z.object({
model: z.string().default("o1-preview"),
prompt: z.string().default("Hello, how are you?"),
}),
run: async ({ model, prompt }) => {
logger.info("Running OpenAI model", { model, prompt });
const result = streamText({
model: openai(model),
prompt,
});
// pass the textStream to the metadata system
const stream = await metadata.stream("openai", result.textStream);
let text = "";
for await (const chunk of stream) {
logger.log("Received chunk", { chunk });
text += chunk; // chunk is a string
}
return { text };
},
});
And then render the stream in your frontend:
import { useRealtimeRunWithStreams } from "@trigger.dev/react-hooks";
import type { aiStreaming, STREAMS } from "./trigger/ai-streaming";
function MyComponent({ runId, publicAccessToken }: { runId: string; publicAccessToken: string }) {
const { streams } = useRealtimeRunWithStreams<typeof aiStreaming, STREAMS>(runId, {
accessToken: publicAccessToken,
});
if (!streams.openai) {
return <div>Loading...</div>;
}
const text = streams.openai.join(""); // `streams.openai` is an array of strings
return (
<div>
<h2>OpenAI response:</h2>
<p>{text}</p>
</div>
);
}
When calling streamText
, you can provide a tools
object that allows the LLM to use additional tools. You can then access the tool call and results using the fullStream
method:
import { openai } from "@ai-sdk/openai";
import { logger, metadata, runs, schemaTask } from "@trigger.dev/sdk/v3";
import { streamText, tool, type TextStreamPart } from "ai";
import { z } from "zod";
const tools = {
getWeather: tool({
description: "Get the weather in a location",
parameters: z.object({
location: z.string().describe("The location to get the weather for"),
}),
execute: async ({ location }) => ({
location,
temperature: 72 + Math.floor(Math.random() * 21) - 10,
}),
}),
};
export type STREAMS = {
// Give the stream a type of TextStreamPart along with the tools
openai: TextStreamPart<{ getWeather: typeof tools.getWeather }>;
};
export const aiStreamingWithTools = schemaTask({
id: "ai-streaming-with-tools",
description: "Stream data from the AI SDK and use tools",
schema: z.object({
model: z.string().default("gpt-4o-mini"),
prompt: z
.string()
.default(
"Based on the temperature, will I need to wear extra clothes today in San Fransico? Please be detailed."
),
}),
run: async ({ model, prompt }) => {
logger.info("Running OpenAI model", { model, prompt });
const result = streamText({
model: openai(model),
prompt,
tools, // Pass in the tools to use
maxSteps: 5, // Allow streamText to repeatedly call the model
});
// pass the fullStream to the metadata system
const stream = await metadata.stream("openai", result.fullStream);
let text = "";
for await (const chunk of stream) {
logger.log("Received chunk", { chunk });
// chunk is a TextStreamPart
if (chunk.type === "text-delta") {
text += chunk.textDelta;
}
}
return { text };
},
});
Now you can get access to the tool call and results in your frontend:
import { useRealtimeRunWithStreams } from "@trigger.dev/react-hooks";
import type { aiStreamingWithTools, STREAMS } from "./trigger/ai-streaming";
function MyComponent({ runId, publicAccessToken }: { runId: string; publicAccessToken: string }) {
const { streams } = useRealtimeRunWithStreams<typeof aiStreamingWithTools, STREAMS>(runId, {
accessToken: publicAccessToken,
});
if (!streams.openai) {
return <div>Loading...</div>;
}
// streams.openai is an array of TextStreamPart
const toolCall = streams.openai.find(
(stream) => stream.type === "tool-call" && stream.toolName === "getWeather"
);
const toolResult = streams.openai.find((stream) => stream.type === "tool-result");
const textDeltas = streams.openai.filter((stream) => stream.type === "text-delta");
const text = textDeltas.map((delta) => delta.textDelta).join("");
const weatherLocation = toolCall ? toolCall.args.location : undefined;
const weather = toolResult ? toolResult.result.temperature : undefined;
return (
<div>
<h2>OpenAI response:</h2>
<p>{text}</p>
<h2>Weather:</h2>
<p>
{weatherLocation
? `The weather in ${weatherLocation} is ${weather} degrees.`
: "No weather data"}
</p>
</div>
);
}
As you can see above, we defined a tool which will be used in the aiStreamingWithTools
task. You can also define a Trigger.dev task that can be used as a tool, and will automatically be invoked with triggerAndWait
when the tool is called. This is done using the toolTask
function:
import { openai } from "@ai-sdk/openai";
import { logger, metadata, runs, schemaTask, toolTask } from "@trigger.dev/sdk/v3";
import { streamText, tool, type TextStreamPart } from "ai";
import { z } from "zod";
export const getWeather = toolTask({
id: "get-weather",
description: "Get the weather for a location",
// Define the parameters for the tool, which becomes the task payload
parameters: z.object({
location: z.string(),
}),
run: async ({ location }) => {
// return mock data
return {
location,
temperature: 72 + Math.floor(Math.random() * 21) - 10,
};
},
});
export type STREAMS = {
// Give the stream a type of TextStreamPart along with the tools
openai: TextStreamPart<{ getWeather: typeof getWeather.tool }>;
};
export const aiStreamingWithTools = schemaTask({
id: "ai-streaming-with-tools",
description: "Stream data from the AI SDK and use tools",
schema: z.object({
model: z.string().default("gpt-4o-mini"),
prompt: z
.string()
.default(
"Based on the temperature, will I need to wear extra clothes today in San Fransico? Please be detailed."
),
}),
run: async ({ model, prompt }) => {
logger.info("Running OpenAI model", { model, prompt });
const result = streamText({
model: openai(model),
prompt,
tools: {
getWeather: getWeather.tool, // pass weatherTask.tool as a tool
},
maxSteps: 5, // Allow streamText to repeatedly call the model
});
// pass the fullStream to the metadata system
const stream = await metadata.stream("openai", result.fullStream);
let text = "";
for await (const chunk of stream) {
logger.log("Received chunk", { chunk });
// chunk is a TextStreamPart
if (chunk.type === "text-delta") {
text += chunk.textDelta;
}
}
return { text };
},
});
Examples
Learn more